In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!wget -q https://jdbc.postgresql.org/download/postgresql-42.2.19.jar
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"
os.environ["SPARK_CLASSPATH"] = f"/content/postgresql-42.2.19.jar"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [1 InRelease 0 B/3,626 B 0%] [Wa0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [3 InRelease 2,586 B/88.7 k                                                                               Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease 

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.executor.memory", "70g").config("spark.driver.memory", "50g").config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","16g").appName("Extract2")\
.config("spark.jars", "/content/postgresql-42.2.19.jar").getOrCreate()
# Enable Arrow-based columnar data transfers
spark.conf.set("park.sqls.execution.arrow.enabled", "true")
# Increase timeout values
spark.conf.set("spark.sql.broadcastTimeout", "1000")
# disable the broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [None]:
# Download the Musical Instruments reviews TSV and load it into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"
from pyspark.sql.functions import col, lit
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
tsv_path = SparkFiles.get("amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz")
spark_df = spark.read.csv(tsv_path, sep="\t", header=True)

# Every column is read as a string; give the date and count columns proper types
column_types = {
    "review_date": "date",
    "star_rating": "int",
    "helpful_votes": "int",
    "total_votes": "int",
}
for column_name, spark_type in column_types.items():
    spark_df = spark_df.withColumn(column_name, col(column_name).cast(spark_type))

# Keep only rows with no null in any column
spark_df = spark_df.na.drop("any")
spark_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   45610553| RMDCHWD0Y5OZ9|B00HH62VB6|     618218723|AGPtek® 10 Isolat...|Musical Instruments|          3|            0|          1|   N|                N|         Three Stars|Works very good, ...| 2015-08-31|
|         US|   14640079| RZSL0BALIYUNU|B003LRN53I|     986692292|Sennheiser HD203 ...|Musical Instruments| 

In [None]:
# Inspect column names and types after the casts
spark_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [None]:
# Number of rows remaining after null rows were dropped
row_count = spark_df.count()
row_count

904663

In [None]:
# Remove rows that are exact duplicates across every column, then recount
spark_df = spark_df.distinct()
spark_df.count()

904663

# Transform DataFrames to fit the database schema

In [None]:
# Review ID table: review/customer/product identifiers plus the review date,
# tagged with the load batch number
review_id_df = (
    spark_df
    .select("review_id", "customer_id", "product_id", "product_parent", "review_date")
    .withColumn("batch_id", lit(2))
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+--------+
|     review_id|customer_id|product_id|product_parent|review_date|batch_id|
+--------------+-----------+----------+--------------+-----------+--------+
| RJAJML3BPHJW1|   47628330|B00B981XQC|     207269976| 2015-08-31|       2|
|R1B82PWGID1EA8|   12641855|B000EEL6LY|     678026488| 2015-08-31|       2|
|R25ZFQ35EW7IXO|    1988775|B0002CZV7I|     687252828| 2015-08-31|       2|
|R3KS3IDXEK68T6|   16149682|B004XNK7AI|     486980785| 2015-08-31|       2|
|R1VKRR7XSK5SKW|   19681020|B004PAWJDM|     395396084| 2015-08-31|       2|
| RS02E08LSFRFW|   44928285|B00178SNZS|     806365617| 2015-08-31|       2|
|R3EDFV16NVBQ5P|   50161886|B00EO4A7L0|     789342770| 2015-08-30|       2|
| R3CP22ACPQIHV|   12744882|B0002GX7ZM|     477030445| 2015-08-30|       2|
| R7I7M18YAKMV7|   18067281|B003FRMRC4|     313130939| 2015-08-30|       2|
| ROI1P4F99XHVC|   13518155|B001677QO4|     995106205| 2015-08-30|       2|
| RF9SWVPSR8

In [None]:
# Products table: exactly one row per product_id, tagged with the load batch.
# Deduplicating on product_id already makes every row unique, so no second
# full-row dropDuplicates (an extra shuffle) is needed.
# NOTE: dropDuplicates on a subset keeps an arbitrary row per product_id, so if a
# product appears with differing titles, which title survives is not deterministic.
products_df = (
    spark_df
    .select("product_id", "product_title", "product_category")
    .dropDuplicates(["product_id"])
    .withColumn("batch_id", lit(2))
)
products_df.show()

+----------+--------------------+-------------------+--------+
|product_id|       product_title|   product_category|batch_id|
+----------+--------------------+-------------------+--------+
|B000078JQ0|M-Audio Portable ...|Musical Instruments|       2|
|B0000C8CFP|Gemini CDJ 15X TO...|Musical Instruments|       2|
|B0002CZTIE|Gator 88 Note Key...|Musical Instruments|       2|
|B0002D02NA|Protec CTG234 Ele...|Musical Instruments|       2|
|B0002D03EI|Traveler Guitar O...|Musical Instruments|       2|
|B0002D049M|Wireless Solution...|Musical Instruments|       2|
|B0002D04B0|Zildjian A Custom...|Musical Instruments|       2|
|B0002D064U|Seymour Duncan - ...|Musical Instruments|       2|
|B0002D0DVG|Evans RGS Pad Bas...|Musical Instruments|       2|
|B0002D0GBS|Zildjian ZXT 10-I...|Musical Instruments|       2|
|B0002D0LRC|Gibson Gear PPAT-...|Musical Instruments|       2|
|B0002D0PJ6|Clayton Picks Bla...|Musical Instruments|       2|
|B0002DUS2A|Fender Stainless ...|Musical Instruments|  

In [None]:
# Customers table: number of reviews per customer_id, tagged with the load batch
customers_df = (
    spark_df
    .groupBy("customer_id")
    .agg({"customer_id": "count"})
    .withColumnRenamed("count(customer_id)", "customer_count")
    .withColumn("batch_id", lit(2))
)
customers_df.show()

+-----------+--------------+--------+
|customer_id|customer_count|batch_id|
+-----------+--------------+--------+
|   13282123|             3|       2|
|   16604143|             1|       2|
|   37485313|            13|       2|
|   20516380|             7|       2|
|    7382856|             1|       2|
|   48803349|             1|       2|
|    2082754|             1|       2|
|   37546075|            32|       2|
|   21073511|             1|       2|
|   26541525|             6|       2|
|   11270814|             1|       2|
|   13022232|             2|       2|
|   35465802|             1|       2|
|   20943978|             6|       2|
|   32347749|             5|       2|
|   26794699|             1|       2|
|   49956082|            14|       2|
|   17561156|             2|       2|
|   46998862|             1|       2|
|   51560727|             1|       2|
+-----------+--------------+--------+
only showing top 20 rows



In [None]:
# Reviews table: review text and purchase verification, deduplicated,
# tagged with the load batch
reviews_df = (
    spark_df
    .select("review_id", "verified_purchase", "review_headline", "review_body")
    .dropDuplicates()
    .withColumn("batch_id", lit(2))
)
reviews_df.show()

+--------------+-----------------+--------------------+--------------------+--------+
|     review_id|verified_purchase|     review_headline|         review_body|batch_id|
+--------------+-----------------+--------------------+--------------------+--------+
| RVDQ2JXRGP2OK|                Y|          Five Stars|I'm very happy wi...|       2|
|R2I3XUICPJ41US|                Y|Definitely worth it!|Great value! I ha...|       2|
|R11F02GBH7WDV7|                Y|Awesome bass - wo...|Love my Fender ba...|       2|
|R17U3Y9X16SURO|                Y| Wonderfully useful!|Wonderfully usefu...|       2|
|R3N7TSN3U7DAJD|                Y|          Five Stars|Well made and com...|       2|
| RZN00RE92E9W4|                Y|Excellent for hea...|Excellent picks f...|       2|
|R1RKEFDQO16XYT|                Y|          Five Stars|Grandson loves this.|       2|
|R3S07F2O4LVV4R|                Y|Is this some sort...|Is this some sort...|       2|
|R117WJPKTFRUYL|                Y|holds your guitar...

In [None]:
# Vine table: rating, vote counts and Vine flag per review, tagged with the load batch
vine_df = (
    spark_df
    .select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
    .withColumn("batch_id", lit(2))
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+--------+
|     review_id|star_rating|helpful_votes|total_votes|vine|batch_id|
+--------------+-----------+-------------+-----------+----+--------+
| RJAJML3BPHJW1|          5|            0|          1|   N|       2|
|R1B82PWGID1EA8|          5|            0|          0|   N|       2|
|R25ZFQ35EW7IXO|          3|            0|          2|   N|       2|
|R3KS3IDXEK68T6|          5|            0|          0|   N|       2|
|R1VKRR7XSK5SKW|          1|            0|          0|   N|       2|
| RS02E08LSFRFW|          5|            0|          0|   N|       2|
|R3EDFV16NVBQ5P|          5|            0|          0|   N|       2|
| R3CP22ACPQIHV|          5|            0|          0|   N|       2|
| R7I7M18YAKMV7|          5|            0|          0|   N|       2|
| ROI1P4F99XHVC|          4|            0|          0|   N|       2|
| RF9SWVPSR80M1|          4|            0|          0|   N|       2|
|R1GCQNEKUHYMS7|          5|      

# Load data into AWS RDS (PostgreSQL)

In [None]:
# Mount Google Drive so modules stored there can be imported
from google.colab import drive
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT, force_remount=True)

Mounted at /content/drive


In [None]:
# Put the Homework folder on Drive first on the import path so the `config`
# module there can be imported
import sys
config_dir = '/content/drive/My Drive/Colab_Notebooks/Homework/'
sys.path.insert(0, config_dir)

In [None]:
from config import server, database, port, username, password

## Run schema.sql in pgAdmin to create the tables before running the cells below

In [None]:
# Write each transformed DataFrame to its Postgres table on AWS RDS.
# Tables are written in the same order as before (review_id_table, products,
# customers, reviews, vine_table) in case schema.sql defines foreign keys between them.
# NOTE: mode "append" inserts rows on every run, so re-running this cell duplicates
# data (or fails on primary-key constraints, depending on schema.sql).
jdbc_url = f"jdbc:postgresql://{server}:{port}/{database}"


def write_table(df, table_name, mode="append"):
    """Write a Spark DataFrame to a Postgres table over JDBC.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Rows to write.
    table_name : str
        Target table; expected to already exist (see schema.sql).
    mode : str
        Spark save mode, "append" by default.
    """
    (df.write
        .mode(mode)
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table_name)
        .option("user", username)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .save())


for frame, table in [
    (review_id_df, "review_id_table"),
    (products_df, "products"),
    (customers_df, "customers"),
    (reviews_df, "reviews"),
    (vine_df, "vine_table"),
]:
    write_table(frame, table)

# Query Test

In [None]:
# Sanity-check the load: join reviews with vine_table inside Postgres
# and read the joined rows back into Spark
jdbc_url = f"jdbc:postgresql://{server}:{port}/{database}"
join_query = "(SELECT t1.*, t2.vine \
                     FROM reviews AS t1 INNER JOIN vine_table AS t2 ON t1.review_id = t2.review_id) AS my_table"
connection_props = {
    "user": f"{username}",
    "password": f"{password}",
    "driver": 'org.postgresql.Driver',
}
df = spark.read.jdbc(url=jdbc_url, table=join_query, properties=connection_props)
df.show()

In [None]:
# Number of rows returned by the reviews/vine_table join
joined_rows = df.count()
joined_rows

In [None]:
# Read review_id_table back from Postgres to confirm its schema and contents
df = (
    spark.read
    .format("jdbc")
    .options(
        url=f"jdbc:postgresql://{server}:{port}/{database}",
        dbtable="review_id_table",
        user=f"{username}",
        password=f"{password}",
        driver="org.postgresql.Driver",
    )
    .load()
)
df.printSchema()
df.show()