<a href="https://colab.research.google.com/github/kk-deng/Big-Data-Challenge/blob/main/Big_Data_Level_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Config for Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()
from pyspark.sql import SparkSession 

spark = SparkSession.builder.appName("Basics").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# For connection to Postgres 
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-03-21 02:27:27--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-03-21 02:27:28 (4.35 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [16]:
from pyspark import SparkFiles

# Gzip-compressed, tab-separated Amazon review file for the Luggage category
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Luggage_v1_00.tsv.gz"

# Register the remote file with Spark, then look up its local copy by file name
spark.sparkContext.addFile(url)
local_reviews_path = SparkFiles.get("amazon_reviews_us_Luggage_v1_00.tsv.gz")

# The first line of the file carries the column names
spark_df = spark.read.csv(local_reviews_path, sep="\t", header=True)
spark_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   40884699| R9CO86UUJCAW5|B00VGTN02Y|     786681372|Teenage Mutant Ni...|         Luggage|          3|            0|          0|   N|                Y|my review of this...|my review of this...| 2015-08-31|
|         US|   23208852|R3PR8X6QGVJ8B1|B005KIWL0E|     618251799|Kenneth Cole Reac...|         Luggage|          5|    

In [17]:
# Number of rows in spark_df as loaded (baseline for the de-duplication check that follows)
spark_df.count()

348657

# Transform to match the table schema

In [18]:
# Remove duplicate rows (no column subset is given, so whole rows are compared),
# then count again; an unchanged count means the file had no exact duplicates.
spark_df = spark_df.dropDuplicates()
spark_df.count()

348657

In [19]:
# For Review ID table: keep only the identifying columns (column names are left unchanged)
review_id_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_df = spark_df.select(*review_id_columns)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RBD5TFCXJCAFY|   24170812|B00ICD5Q9M|     185207508| 2015-08-31|
|R1MCDDFVHB3X8P|   42610491|B004T2ZVPO|     961936464| 2015-08-31|
|R10HZY3WOJJZO3|   33703156|B002AS2EQ8|     687301761| 2015-08-30|
| R4KYVY5EMU2KA|   43687072|B00ICD5ZHU|     293482550| 2015-08-30|
|R1DTTWJF8FYBL1|    2199737|B000SKXRGQ|     940720910| 2015-08-30|
|R33T9I792ZC6GC|   12815344|B00NE7GDWA|      58174354| 2015-08-30|
|R3IWKYPCMKCE7J|   50497994|B00K1MGD0S|     458369720| 2015-08-30|
|R3P3XEHE1N4WWM|   14439583|B00C967F1U|     396288456| 2015-08-29|
|R1CMNW1EVU36K9|   14469453|B0109ZA5EQ|      97139436| 2015-08-29|
|R1KUPC2HFJ4M9B|   26127510|B00O60WW9W|     541224447| 2015-08-29|
|R1624GG6NZRKD1|   22232730|B00RG2DSI6|     336578846| 2015-08-28|
|R2B2RPBZOKGWA4|     110900|B013GCN4FQ|     622815633| 2015-08

In [20]:
# For Products table: one row per product.
# spark_df has one row per review, so a product reviewed several times would
# otherwise appear several times here. Keep a single row per product_id.
# NOTE(review): if a product_id ever carries more than one title, which title
# survives is arbitrary — confirm that is acceptable.
products_df = (
    spark_df
    .select("product_id", "product_title")
    .dropDuplicates(["product_id"])
)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00ICD5Q9M|Travelon Anti-The...|
|B004T2ZVPO|Organized Travel ...|
|B002AS2EQ8|Olympia Luggage  ...|
|B00ICD5ZHU|Travelon Anti-The...|
|B000SKXRGQ|Lewis N. Clark De...|
|B00NE7GDWA|Berchirly Small V...|
|B00K1MGD0S|Filson Dry Duffle...|
|B00C967F1U|Skyway Luggage Si...|
|B0109ZA5EQ|Passport Holder- ...|
|B00O60WW9W|Kids Backpack, ic...|
|B00RG2DSI6|Hanging Cosmetic ...|
|B013GCN4FQ|Demarkt Casual Fu...|
|B008VU8634|Herschel Supply C...|
|B00XL8KK8A|Witzman Men's Ret...|
|B00BS85OZ4|Horizon Dance Rel...|
|B006HMPV4A|Everest Cooler Lu...|
|B00IT05XB4|Tommy Hilfiger Mi...|
|B00T8OGJ7W|Floto Venezia Oli...|
|B00V7M5I8E|Alice Hot Selling...|
|B00LZXPJ9S|Canvas 13" Laptop...|
+----------+--------------------+
only showing top 20 rows



In [21]:
# For Customers table: count the rows (reviews) recorded for each customer_id
per_customer = spark_df.groupby("customer_id").agg({"customer_id": "count"})
# The dict-style aggregate names its column "count(customer_id)"; give it the table's name
customers_df = per_customer.withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   22670687|             1|
|   37872518|             1|
|    2313517|             2|
|   51699315|             3|
|    2858831|             1|
|   32705839|             2|
|   18914339|             1|
|   12716063|             1|
|   29867038|             1|
|   50345937|             1|
|   12014780|             1|
|   49012244|             1|
|   25180989|             2|
|    7551037|             1|
|   31216481|             1|
|   52582489|             1|
|   17767270|             1|
|   21012418|             6|
|   49882810|             3|
|   32104141|             1|
+-----------+--------------+
only showing top 20 rows



In [22]:
# For Review table: the review text, keyed by review_id
review_text_columns = ["review_id", "review_headline", "review_body"]
reviews_df = spark_df.select(*review_text_columns)
reviews_df.show()

+--------------+--------------------+--------------------+
|     review_id|     review_headline|         review_body|
+--------------+--------------------+--------------------+
| RBD5TFCXJCAFY|        Works great!|Works great, soli...|
|R1MCDDFVHB3X8P|It seems pretty d...|Very cute and col...|
|R10HZY3WOJJZO3|          Five Stars|Very happy with t...|
| R4KYVY5EMU2KA|          Five Stars|Well made; just w...|
|R1DTTWJF8FYBL1|... another one w...|I had another one...|
|R33T9I792ZC6GC| Great  weekend bag.|My new weekend ba...|
|R3IWKYPCMKCE7J|          Five Stars|Amazing bag, I us...|
|R3P3XEHE1N4WWM|Does What it is S...|Decent low-cost t...|
|R1CMNW1EVU36K9|Lots of places to...|This is a really ...|
|R1KUPC2HFJ4M9B|           Very cute|I got this for my...|
|R1624GG6NZRKD1|          Five Stars|Convenient and or...|
|R2B2RPBZOKGWA4|          Four Stars|Super fast shippi...|
|R15U77KMVQDHGK|          Five Stars|Love it! Herschel...|
|R1XMJSPEMZSN5G|Versatile and wel...|Sturdy and luggab..

In [24]:
# For Vine table: rating, vote counts and the vine flag, keyed by review_id
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_df = spark_df.select(*vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RBD5TFCXJCAFY|          5|            0|          0|   N|
|R1MCDDFVHB3X8P|          4|            0|          0|   N|
|R10HZY3WOJJZO3|          5|            3|          4|   N|
| R4KYVY5EMU2KA|          5|            1|          1|   N|
|R1DTTWJF8FYBL1|          5|            0|          0|   N|
|R33T9I792ZC6GC|          5|            0|          0|   N|
|R3IWKYPCMKCE7J|          5|            0|          0|   N|
|R3P3XEHE1N4WWM|          4|            0|          0|   N|
|R1CMNW1EVU36K9|          5|            2|          2|   N|
|R1KUPC2HFJ4M9B|          5|            3|          3|   N|
|R1624GG6NZRKD1|          5|            0|          0|   N|
|R2B2RPBZOKGWA4|          4|            0|          1|   N|
|R15U77KMVQDHGK|          5|            0|          0|   N|
|R1XMJSPEMZSN5G|          5|            

# Load data to AWS RDS

In [25]:
# Build the SQLAlchemy engine for the Postgres instance on AWS RDS.
import os
from getpass import getpass
from urllib.parse import quote_plus

from sqlalchemy import create_engine

server = "big-data-challenge-db.cifpjfaljnoi.ca-central-1.rds.amazonaws.com"
database = "postgres"
port = "5432"
username = "postgres"
# Do not keep the password in the notebook: take it from the RDS_PASSWORD
# environment variable, or prompt for it when the variable is not set.
password = os.environ.get("RDS_PASSWORD") or getpass("RDS password: ")

# "postgresql://" is the dialect name SQLAlchemy expects; the older "postgres://"
# alias is rejected by SQLAlchemy 1.4+. quote_plus() escapes characters such as
# "@" or "/" in the password that would otherwise break the URL.
conn = f"postgresql://{username}:{quote_plus(password)}@{server}:{port}/{database}"

# create_engine() only configures the engine; a connection is opened on first use
engine = create_engine(conn, echo=False)

  """)


In [26]:
# Review ID table: convert the Spark DataFrame to a pandas DataFrame
# (toPandas() brings all rows into the driver's memory), then preview the first rows.
pd_review_id_df = review_id_df.toPandas()
pd_review_id_df.head()

Unnamed: 0,review_id,customer_id,product_id,product_parent,review_date
0,RBD5TFCXJCAFY,24170812,B00ICD5Q9M,185207508,2015-08-31
1,R1MCDDFVHB3X8P,42610491,B004T2ZVPO,961936464,2015-08-31
2,R10HZY3WOJJZO3,33703156,B002AS2EQ8,687301761,2015-08-30
3,R4KYVY5EMU2KA,43687072,B00ICD5ZHU,293482550,2015-08-30
4,R1DTTWJF8FYBL1,2199737,B000SKXRGQ,940720910,2015-08-30


In [27]:
# Make review_id the index so to_sql() writes it as the table's leading column
pd_review_id_df = pd_review_id_df.set_index("review_id")
pd_review_id_df.head()

Unnamed: 0_level_0,customer_id,product_id,product_parent,review_date
review_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
RBD5TFCXJCAFY,24170812,B00ICD5Q9M,185207508,2015-08-31
R1MCDDFVHB3X8P,42610491,B004T2ZVPO,961936464,2015-08-31
R10HZY3WOJJZO3,33703156,B002AS2EQ8,687301761,2015-08-30
R4KYVY5EMU2KA,43687072,B00ICD5ZHU,293482550,2015-08-30
R1DTTWJF8FYBL1,2199737,B000SKXRGQ,940720910,2015-08-30


In [None]:
# Load the review-ID rows into the "review_id_table" table.
# - if_exists="append": the default ("fail") raises ValueError when the table is
#   already there; "append" inserts into it and still creates it when missing.
# - chunksize + method="multi": send batched multi-row INSERTs instead of one
#   INSERT statement per row.
# NOTE(review): assumes the table may have been created beforehand from a schema
# script — confirm. Re-running this cell inserts the same rows again.
pd_review_id_df.to_sql(
    "review_id_table",
    con=engine,
    if_exists="append",
    chunksize=1000,
    method="multi",
)

In [None]:
# Product table: convert to pandas and index by product_id.
# The column is named "product_id" (no trailing space); a key with a trailing
# space is not found and set_index() raises KeyError.
pd_products_df = products_df.toPandas().set_index("product_id")
pd_products_df.head()

In [None]:
# Load the product rows into the "products" table.
# - if_exists="append": the default ("fail") raises ValueError when the table is
#   already there; "append" inserts into it and still creates it when missing.
# - chunksize + method="multi": send batched multi-row INSERTs instead of one
#   INSERT statement per row.
# NOTE(review): assumes the table may have been created beforehand from a schema
# script — confirm. Re-running this cell inserts the same rows again.
pd_products_df.to_sql(
    "products",
    con=engine,
    if_exists="append",
    chunksize=1000,
    method="multi",
)

In [None]:
# Customers table: convert to pandas with customer_id as the index
pd_customers_df = customers_df.toPandas().set_index("customer_id")
pd_customers_df.head()

In [None]:
# Load the per-customer counts into the "customers" table.
# - if_exists="append": the default ("fail") raises ValueError when the table is
#   already there; "append" inserts into it and still creates it when missing.
# - chunksize + method="multi": send batched multi-row INSERTs instead of one
#   INSERT statement per row.
# NOTE(review): assumes the table may have been created beforehand from a schema
# script — confirm. Re-running this cell inserts the same rows again.
pd_customers_df.to_sql(
    "customers",
    con=engine,
    if_exists="append",
    chunksize=1000,
    method="multi",
)

In [None]:
# Reviews table: convert to pandas with review_id as the index
pd_reviews_df = reviews_df.toPandas().set_index("review_id")
pd_reviews_df.head()

In [None]:
# Load the review text rows into the "reviews" table.
# - if_exists="append": the default ("fail") raises ValueError when the table is
#   already there; "append" inserts into it and still creates it when missing.
# - chunksize + method="multi": send batched multi-row INSERTs instead of one
#   INSERT statement per row.
# NOTE(review): assumes the table may have been created beforehand from a schema
# script — confirm. Re-running this cell inserts the same rows again.
pd_reviews_df.to_sql(
    "reviews",
    con=engine,
    if_exists="append",
    chunksize=1000,
    method="multi",
)

In [None]:
# Vine table: convert to pandas with review_id as the index
pd_vine_df = vine_df.toPandas().set_index("review_id")
pd_vine_df.head()

In [None]:
# Load the vine rows into the "vine_table" table.
# - if_exists="append": the default ("fail") raises ValueError when the table is
#   already there; "append" inserts into it and still creates it when missing.
# - chunksize + method="multi": send batched multi-row INSERTs instead of one
#   INSERT statement per row.
# NOTE(review): assumes the table may have been created beforehand from a schema
# script — confirm. Re-running this cell inserts the same rows again.
pd_vine_df.to_sql(
    "vine_table",
    con=engine,
    if_exists="append",
    chunksize=1000,
    method="multi",
)