In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.3'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Co                                                                               Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 htt

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-12-21 09:43:06--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-12-21 09:43:06 (6.02 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Start (or reuse) the Spark session. The PostgreSQL JDBC jar is added to the driver
# classpath so the org.postgresql.Driver used by the JDBC writes can be loaded.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("BigData_Amazon_Reviews")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
# Base S3 prefix for the Amazon review TSV exports; each dataset file name is appended to it.
# NOTE(review): confirm this public bucket is still reachable before re-running.
from pyspark import SparkFiles

amazon_url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/"

In [5]:
# Dataset 1 --> Video Games
dataset1 = "amazon_reviews_us_Video_Games_v1_00.tsv.gz"
amazon_url1 = f"{amazon_url}{dataset1}"

# addFile fetches the remote file; SparkFiles.get then resolves its local path by file name.
spark.sparkContext.addFile(amazon_url1)
local_path1 = SparkFiles.get(dataset1)
video_games_df = spark.read.csv(local_path1, sep="\t", header=True, inferSchema=True)

# Show DataFrame
video_games_df.show(truncate=False)

+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                                                                                              |product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline                     

In [6]:
# Dataset 2 --> Digital Music
dataset2 = "amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv.gz"
amazon_url2 = f"{amazon_url}{dataset2}"

# addFile fetches the remote file; SparkFiles.get then resolves its local path by file name.
spark.sparkContext.addFile(amazon_url2)
local_path2 = SparkFiles.get(dataset2)
digital_music_df = spark.read.csv(local_path2, sep="\t", header=True, inferSchema=True)

# Show DataFrame
digital_music_df.show(truncate=False)

+-----------+-----------+--------------+----------+--------------+----------------------------------------------------------------------------+----------------------+-----------+-------------+-----------+----+-----------------+--------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                                               |product_category      |star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline                                         |review_body                                                                                                                                    

In [7]:
# Count the number of records (rows) in each source dataset.
datasets_to_count = (
    ("Video games", video_games_df),
    ("Digital music", digital_music_df),
)
for label, frame in datasets_to_count:
    print(f'Number of records in {label} dataset:', frame.count())

Number of records in Video games dataset: 1785997
Number of records in Digital music dataset: 1688884


In [8]:
# Stack both review datasets into one DataFrame. unionByName aligns columns by
# name rather than by position, so the result stays correct even if the two TSV
# headers ever list their columns in a different order.
amazon_df = video_games_df.unionByName(digital_music_df)
amazon_df.show(truncate=False)

+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                                                                                              |product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline                     

In [9]:
# Transform the dataset to fit the table review_id_table:
# keep the review key columns and cast review_date to a DATE column.
from pyspark.sql.types import DateType

review_columns = ['review_id', 'customer_id', 'product_id', 'product_parent', 'review_date']
review_id_df = amazon_df.select(*review_columns)
review_id_table = review_id_df.withColumn("review_date", review_id_df.review_date.cast(DateType()))

review_row_count = review_id_table.count()
print(f'Number of rows in review_id_table:{review_row_count}')
review_id_table.orderBy('review_id').show(truncate=False)

Number of rows in review_id_table:3474881
+--------------+-----------+----------+--------------+-----------+
|review_id     |customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R100007TERQ36I|20227669   |B002NIWBBI|430027531     |2015-07-22 |
|R10001PSW1P9MN|43072842   |B000TEVKF6|584630580     |2012-12-10 |
|R10002GHQRXG9Q|33776241   |B00IOMX5L2|501710886     |2014-06-05 |
|R10003I619LWL0|4317219    |B001REZLY8|816407837     |2015-01-10 |
|R100078OO83YQB|31850030   |B0016HM45K|670563392     |2008-12-29 |
|R10008N8QVGXBM|38876196   |B003Y3XPOI|738771843     |2011-08-11 |
|R10009MVS1J3KE|16982254   |B00137II62|220306261     |2014-04-19 |
|R10009VHWXV2UL|45142846   |B00LQ8IGVK|899670131     |2014-10-04 |
|R10009XH2FY9Q0|51098385   |B00000IOQV|845394808     |1999-11-28 |
|R1000AG0XB09WV|43615244   |B003DSCGDK|989074968     |2012-11-26 |
|R1000EJULTHQ16|12211018   |B002I0H7K6|22716581      |2012-02-05 |
|R1000GECIM9DZG|1043

In [10]:
# 'review_id_table' column list and its data types
# (review_id_df with review_date cast to DateType, so review_date shows as date).
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [11]:
# Transform the dataset to fit the table 'products' and report its row count.
# NOTE(review): distinct() dedupes (product_id, product_title) pairs, so a product_id
# appearing with two different titles yields two rows — confirm this is acceptable
# if the 'products' table keys on product_id.
products_df = amazon_df[['product_id', 'product_title']].distinct()
product_count = products_df.count()
print("Distinct count of products:", product_count)
products_df.sort('product_id').show(truncate=False)

Distinct count of products: 767340
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_id|product_title                                                                                                                                                               |
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0000118532|Bethesda The Evil Within Xbox One                                                                                                                                           |
|006056038X|Bitter Truth                                                                                                                                                                |
|006073132X|Freakonomics: A Rogue E

In [12]:
# 'products_df' column list and its data types (product_id and product_title).
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [13]:
# Transform the dataset to fit the 'customer' table: one row per customer_id with
# the number of review rows for that customer, then print the record count.
customersCounts = amazon_df.groupBy('customer_id').count()
customers_df = customersCounts.withColumnRenamed('count', 'customer_count')

print(f'Number of records in customers: {customers_df.count()}')
customers_df.sort('customer_id').show(truncate=False)

Number of records in customers: 1782219
+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|10018      |4             |
|10114      |2             |
|10128      |1             |
|10145      |1             |
|10146      |1             |
|10156      |1             |
|10164      |1             |
|10167      |1             |
|10192      |1             |
|10206      |8             |
|10267      |1             |
|10323      |1             |
|10348      |1             |
|10362      |1             |
|10382      |1             |
|10408      |1             |
|10453      |1             |
|10467      |1             |
|10468      |1             |
|10481      |1             |
+-----------+--------------+
only showing top 20 rows



In [14]:
# 'customers' table column list and its data types
# (customer_count comes from groupBy().count(), hence long and non-nullable).
customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



In [15]:
# Transform the dataset to fit 'vine_table' and report both the total and the
# distinct row counts (equal counts mean no fully duplicated review rows).
vine_table = amazon_df.select('review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine')
print('Count of records in Vine dataset:' + str(vine_table.count()))
# Label the second count as distinct; it previously reused the total-count label.
print('Count of distinct records in Vine dataset:' + str(vine_table.distinct().count()))
vine_table.orderBy('review_id').show(truncate=False)

Count of records in Vine dataset:3474881
Count of records in Vine dataset:3474881
+--------------+-----------+-------------+-----------+----+
|review_id     |star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R100007TERQ36I|5          |0            |0          |N   |
|R10001PSW1P9MN|5          |0            |0          |N   |
|R10002GHQRXG9Q|5          |0            |0          |N   |
|R10003I619LWL0|5          |1            |1          |N   |
|R100078OO83YQB|5          |0            |0          |N   |
|R10008N8QVGXBM|5          |0            |0          |N   |
|R10009MVS1J3KE|5          |0            |0          |N   |
|R10009VHWXV2UL|5          |0            |0          |N   |
|R10009XH2FY9Q0|5          |4            |4          |N   |
|R1000AG0XB09WV|5          |0            |0          |N   |
|R1000EJULTHQ16|5          |0            |0          |N   |
|R1000GECIM9DZG|5          |0            |0          |N   |
|R1000ID99V4TKO|5 

In [16]:
# 'vine_table' column list and its data types as inferred from the TSV.
vine_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [17]:
# Create the database connection properties.
# Credentials are read from environment variables when set, otherwise prompted for,
# so they never have to be typed into (and saved with) the notebook.
import os
from getpass import getpass

# NOTE: "append" adds rows on every run — re-running the load cells duplicates data.
mode = "append"

rds_endpoint = os.environ.get("RDS_ENDPOINT") or input("AWS RDS endpoint: ")
db_url = f"jdbc:postgresql://{rds_endpoint}:5432/postgres"

config = {
    "user": os.environ.get("RDS_USER") or input("RDS username: "),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [18]:
# Load dataframe 'review_id_table' into RDS table 'review_id_table'.
# Write review_id_table (review_date cast to DateType), not review_id_df: the raw
# frame still carries review_date with its TSV-inferred type, losing the DATE cast.
review_id_table.write.jdbc(url=db_url, table='review_id_table', mode=mode, properties=config)

In [19]:
# Load dataframe 'products_df' into RDS table 'products' using the shared save mode.
products_df.write.mode(mode).jdbc(url=db_url, table='products', properties=config)

In [20]:
# Load dataframe 'customers_df' into RDS table 'customers' using the shared save mode.
customers_df.write.mode(mode).jdbc(url=db_url, table='customers', properties=config)

In [21]:
# Load dataframe 'vine_table' into RDS table 'vine_table' using the shared save mode.
vine_table.write.mode(mode).jdbc(url=db_url, table='vine_table', properties=config)