In [31]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.2'
# spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Waiting for header                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Waiting for header                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connecting to ppa.                                                                               Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:5 http://security.ubuntu.com/ubuntu focal-security InRelease
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/

In [None]:
# Get postgresql package
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-02-22 01:19:27--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2023-02-22 01:19:27 (4.56 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
# Import Spark and create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-HW-1").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# Extract the Amazon Data into Spark DataFrame

In [32]:
# Read in the data from an S3 Bucket
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_DVD_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Video_DVD_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")

df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   27288431| R33UPQQUZQEM8|B005T4ND06|     400024643|Yoga for Movement...|       Video DVD|          5|            3|          3|   N|                Y|This was a gift f...|This was a gift f...|2015-01-31 00:08:00|
|         US|   13722556|R3IKTNQQPD9662|B004EPZ070|     685335564|  Something Borrowed| 

In [33]:
# Get the number of rows in the DataFrame.
df.count()


5069140

# Transform the Data

## Create the "review_id_table".

In [34]:
from pyspark.sql.functions import to_date
# Create the "review_id_df" DataFrame with the appropriate columns and data types.
review_id_df = df.select(["review_id","customer_id","product_id", "product_parent","review_date"])
review_id_df.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
| R33UPQQUZQEM8|   27288431|B005T4ND06|     400024643|2015-01-31 00:08:00|
|R3IKTNQQPD9662|   13722556|B004EPZ070|     685335564|2015-01-31 00:08:00|
|R3U27V5QMCP27T|   20381037|B005S9EKCW|     922008804|2015-01-31 00:08:00|
|R2TOH2QKNK4IOC|   24852644|B00FC1ZCB4|     326560548|2015-01-31 00:08:00|
|R2XQG5NJ59UFMY|   15556113|B002ZG98Z0|     637495038|2015-01-31 00:08:00|
|R1N1KHBRR4ZTX3|    6132474|B00X8RONBO|     896602391|2015-01-31 00:08:00|
|R3OM9S0TCBP38K|   48049524|B000CEXFZG|     115883890|2015-01-31 00:08:00|
|R1W4S949ZRCTBW|    3282516|B00ID8H8EW|     977932459|2015-01-31 00:08:00|
|R18JL1NNQAZFV2|   51771179|B000TGJ8IU|     840084782|2015-01-31 00:08:00|
|R1LP6PR06OPYUX|   31816501|B00DPMPTDS|     262144920|2015-01-31 00:08:00|
| RZKBT035JA0UQ|   161649

## Create the "products" Table

In [35]:
# Create the "products_df" DataFrame that drops the duplicates in the "product_id" and "product_title columns. 
products_df = df.select(["product_id", "product_title"])
products_df = products_df.dropDuplicates()
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B001DE29SS|Chaplin (15th Ann...|
|B00ZJ2GL6G|Jurassic World 3D...|
|B00T5DM876|Pretty Little Lia...|
|B004VW4V76|         Lebanon, PA|
|B009D4SFEC|Murdoch Mysteries...|
|B000GG4Y5K|Numb3rs - The Com...|
|B00383XZP8|TCM Spotlight: Ch...|
|B00I9MS86O|Breaking Bad: The...|
|B00005K3O4|Killer Klowns Fro...|
|B00BD6KETC|     Psych: Season 7|
|B00WHZZBTQ|Heaven Adores You...|
|B000092Q5C|Remo Williams - T...|
|B004AXPWH4|10 MINUTESOLUTION...|
|B00FEP9PQG|Boardwalk Empire:...|
|B0002F6BJW|Fractured Flicker...|
|B0007LXP7W|The A-Team - Seas...|
|B005ET4NHI|  Where The Boys Are|
|B005EBOSO4|Muscle Energy Tec...|
|B003V5CTKU|Black Label Socie...|
|B000087F6L|      Fahrenheit 451|
+----------+--------------------+
only showing top 20 rows



## Create the "customers" Table

In [36]:
# Create the "customers_df" DataFrame that groups the data on the "customer_id" by the number of times a customer reviewed a product. 
from pyspark.sql.functions import desc

customers_df = df.groupby("customer_id").agg({"customer_id":"count"})
customers_df = customers_df.orderBy(desc("count(customer_id)"))
customers_df = customers_df.withColumnRenamed("count(customer_id)", "customer_count") 
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   43430756|          3582|
|   18116317|          2987|
|   52287429|          2747|
|   52496677|          2650|
|   51110953|          2624|
|   19792742|          2495|
|   20018062|          2492|
|   50068216|          2379|
|   14539589|          2269|
|   50881246|          2104|
|   12797924|          1755|
|   50913245|          1749|
|   23133606|          1731|
|   49382242|          1730|
|   51105456|          1695|
|   38002140|          1689|
|   45838740|          1618|
|   21779173|          1501|
|   49837360|          1499|
|   17590934|          1462|
+-----------+--------------+
only showing top 20 rows



## Create the "vine_table".

In [None]:
# Create the "vine_df" DataFrame that has the "review_id", "star_rating", "helpful_votes", "total_votes", and "vine" columns. 

# Load in a sql function to use columns
from pyspark.sql.functions import col

vine_df = df.select(["review_id","star_rating","helpful_votes", "total_votes","vine"])

# # Filter for only columns from Amazon's Vine program
# vine_table_df = vine_table_df.filter(col("vine")  == "Y")
vine_df.show() 

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RTIS3L2M1F5SM|          5|            0|          0|   N|
| R1ZV7R40OLHKD|          5|            0|          0|   N|
|R3BH071QLH8QMC|          1|            0|          1|   N|
|R127K9NTSXA2YH|          3|            0|          0|   N|
|R32ZWUXDJPW27Q|          4|            0|          0|   N|
|R3AQQ4YUKJWBA6|          1|            0|          0|   N|
|R2F0POU5K6F73F|          5|            0|          0|   N|
|R3VNR804HYSMR6|          5|            0|          0|   N|
| R3GZTM72WA2QH|          5|            0|          0|   N|
| RNQOY62705W1K|          4|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 10 rows



# Load

In [45]:
mode = "append"
jdbc_url="jdbc:postgresql://<endpoint>:5432/my_video_DVD_db"
config = {"user":"postgres", "password": "password", "driver":"org.postgresql.Driver"}

In [48]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

Py4JJavaError: ignored

In [None]:
# Write products_df to table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)


In [None]:
# Write customers_df to table in RDS
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)


In [None]:
# Write vine_df to table in RDS
vine_df.write.jdbc(url=jdbc_url, table='vine', mode=mode, properties=config)
