In [1]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Hit:4 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.

In [2]:
# Download the PostgreSQL JDBC driver jar (version 42.2.9) into the current directory.
# The SparkSession created below expects it at /content/postgresql-42.2.9.jar,
# so this assumes the notebook's working directory is /content — TODO confirm.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-01-28 14:22:28--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2023-01-28 14:22:29 (4.53 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Import Spark and build (or reuse) the SparkSession for this notebook.
from pyspark.sql import SparkSession

# The downloaded PostgreSQL JDBC jar is added to the driver classpath.
spark = (
    SparkSession.builder
    .appName("BigData-HW-1")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

# Extract the Amazon Data into a Spark DataFrame

In [4]:
# Fetch the Wireless reviews archive from the S3 bucket and load it into "df".
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# The file is tab-separated with a header row. No schema is supplied or inferred,
# so every column is loaded as a string.
df = (
    spark.read
    .option("sep", "\t")
    .option("header", True)
    .csv(SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"))
)

df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...| 2015-08-31|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...|        Wireless|          4|    

In [5]:
# Get the number of rows in the DataFrame.
# The value is the cell's last expression, so the notebook displays it directly.
df.count()

9002021

# Transform the Data

## Create the "review_id_table".

In [6]:
from pyspark.sql.functions import to_date
# Create the "review_id_df" DataFrame with the appropriate columns and data types.
# The source file is read without a schema, so every column in "df" is a string.
# The numeric identifiers are cast to integers and "review_date" is parsed into
# a real date so the frame carries typed columns. Values that cannot be
# converted become null.
# NOTE(review): the target column types of "review_id_table" are not visible in
# this notebook — confirm that INTEGER/DATE match the table definition.
review_id_df = df.select(
    "review_id",
    df["customer_id"].cast("int").alias("customer_id"),
    "product_id",
    df["product_parent"].cast("int").alias("product_parent"),
    to_date(df["review_date"]).alias("review_date"),
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R3W4P9UBGNGH1U|   16414143|B00YL0EKWE|     852431543| 2015-08-31|
|R15V54KBMTQWAY|   50800750|B00XK95RPQ|     516894650| 2015-08-31|
| RY8I449HNXSVF|   15184378|B00SXRXUKO|     984297154| 2015-08-31|
|R18TLJYCKJFLSR|   10203548|B009V5X1CE|     279912704| 2015-08-31|
|R1NK26SWS53B8Q|     488280|B00D93OVF0|     662791300| 2015-08-31|
|R11LOHEDYJALTN|   13334021|B00XVGJMDQ|     421688488| 2015-08-31|
|R3ALQVQB2P9LA7|   27520697|B00KQW1X1C|     554285554| 2015-08-31|
|R3MWLXLNO21PDQ|   48086021|B00IP1MQNK|     488006702| 2015-08-31|
|R2L15IS24CX0LI|   12738196|B00HVORET8|     389677711| 2015-08-31|
|R1DJ8976WPWVZU|   15867807|B00HX3G6J6|     299654876| 2015-08-31|
|R3MRWNNR8CBTB7|    1972249|B00U4NATNQ|     577878727| 2015-08-31|
|R1DS6DKTUXAQK3|   10956619|B00SZEFDH8|     654620704| 2015-08

## Create the "products" Table

In [7]:
# Create the "products_df" DataFrame: the "product_id" and "product_title" columns
# with duplicate (product_id, product_title) rows removed.
products_df = (
    df.select("product_id", "product_title")
    .distinct()
)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B010LVPJH6|LG G Watch Urbane...|
|B00K5ZNXZ4|Minisuit Sporty A...|
|B00S9RBQOK|iPhone 6 Plus Cas...|
|B0116N7GYC|Selfie Stick, Por...|
|B009UNH0CY|Plantronics Voyag...|
|B00L8GFYAG|Eallc New Quality...|
|B00R3LMTI0|Kaleidio [Wallop ...|
|B011R0VG36|Galaxy Note 4 Cas...|
|B00BXX0QVQ|iKross Black Dual...|
|B00F4AYI2M|Incipio DualPro C...|
|B00GPI3OHC|Retevis H-777 2 W...|
|B00Y9ZUVU6|Tiwkich 2 in 1 Du...|
|B00W65SYHS|LG G4 case, Caseo...|
|B00V5OU6CW|S5 Leather case,P...|
|B00LP3FSH6|Escort Coiled Sma...|
|B00MIO2KRC|Black Box G1W-C C...|
|B00V5FZM0M|   KoKo Cases 5/5S !|
|B00T1KO2TA|iPhone 6 & 6S Cas...|
|B00YU9XOTQ|Galaxy S5 Screen ...|
|B00PI7IGHE|Soyan Latest DZ09...|
+----------+--------------------+
only showing top 20 rows



## Create the "customers" Table

In [8]:
# Create the "customers_df" DataFrame: one row per "customer_id", with the number of
# reviews written by that customer in the "count" column.
# DataFrame transformations return new frames rather than modifying in place, so an
# un-assigned orderBy/select has no effect; no ordering is applied here because the
# frame is only displayed and then loaded into a database table.
# NOTE(review): if the "customers" table names this column differently
# (e.g. "customer_count"), rename it before the load — confirm against the schema.
customers_df = df.groupBy("customer_id").count()
customers_df.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|    1355626|    1|
|   15043009|    1|
|   22770141|    3|
|   45481568|    3|
|   25404882|    2|
|    4697081|    2|
|   13888158|    3|
|   18288553|    2|
|   22279960|    1|
|   25278739|    2|
|   15407188|    1|
|    5030775|   21|
|   13267537|    1|
|   29937804|    2|
|   20919693|    1|
|    8854514|    3|
|   49258595|    8|
|     234036|    1|
|   33346823|    1|
|    7403004|    1|
+-----------+-----+
only showing top 20 rows



## Create the "vine_table".

In [9]:
# Create the "vine_df" DataFrame holding, for each review, its "review_id",
# "star_rating", "helpful_votes", "total_votes" and "vine" columns.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3W4P9UBGNGH1U|          2|            1|          3|   N|
|R15V54KBMTQWAY|          4|            0|          0|   N|
| RY8I449HNXSVF|          5|            0|          0|   N|
|R18TLJYCKJFLSR|          5|            0|          0|   N|
|R1NK26SWS53B8Q|          5|            0|          0|   N|
|R11LOHEDYJALTN|          5|            0|          0|   N|
|R3ALQVQB2P9LA7|          4|            0|          0|   N|
|R3MWLXLNO21PDQ|          5|            0|          0|   N|
|R2L15IS24CX0LI|          5|            0|          0|   N|
|R1DJ8976WPWVZU|          3|            0|          0|   N|
|R3MRWNNR8CBTB7|          5|            0|          0|   N|
|R1DS6DKTUXAQK3|          5|            0|          0|   N|
| RWJM5E0TWUJD2|          5|            0|          0|   N|
|R1XTJKDYNCRGAC|          1|            

# Load

In [15]:
import os

# Connection settings shared by every JDBC write below.
# "append" adds rows to the existing tables instead of replacing them.
mode = "append"
jdbc_url = "jdbc:postgresql://mypostgresdb.ciinmofh3cpc.ap-northeast-1.rds.amazonaws.com:5432/mypostgresdb"

# Credentials are taken from the environment so they never have to be written
# into (or committed with) the notebook. Set RDS_USER / RDS_PASSWORD before
# running this cell; the fallbacks keep the previous placeholder values.
config = {
    "user": os.environ.get("RDS_USER", "root"),
    "password": os.environ.get("RDS_PASSWORD", ""),
    "driver": "org.postgresql.Driver",
}

In [16]:
# Load review_id_df into the "review_id_table" table in RDS.
review_id_writer = review_id_df.write.mode(mode)
review_id_writer.jdbc(url=jdbc_url, table="review_id_table", properties=config)

Py4JJavaError: ignored

In [12]:
# Load products_df into the "products" table in RDS.
products_writer = products_df.write.mode(mode)
products_writer.jdbc(url=jdbc_url, table="products", properties=config)

Py4JJavaError: ignored

In [None]:
# Load customers_df into the "customers" table in RDS.
customers_writer = customers_df.write.mode(mode)
customers_writer.jdbc(url=jdbc_url, table="customers", properties=config)

In [None]:
# Load vine_df into the "vine_table" table in RDS.
vine_writer = vine_df.write.mode(mode)
vine_writer.jdbc(url=jdbc_url, table="vine_table", properties=config)