<a href="https://colab.research.google.com/github/luisantoniococa/Spark_Big_Data_Amazon_Reviews_Analysis/blob/master/Amazon_reviews_home_entertainment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [39]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-04-15 16:15:24--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2020-04-15 16:15:25 (8.52 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [77]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
# url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Jewelry_v1_00.tsv.gz" # for jewerly
url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz'
spark.sparkContext.addFile(url)
homeenter_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

# Show DataFrame
homeenter_data_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|     179886| RY01SAV7HZ8QO|B00NTI0CQ2|     667358431|Aketek 1080P LED ...|Home Entertainment|          4|            0|          0|   N|                Y|good enough for m...|not the best pict...|2015-08-31 00:00:00|
|         US|   37293769|R1XX8SDGJ4MZ4L|B00BUCLVZU|     621695622|TiVo Mini with

In [78]:
# print the data schemata to identify the differents columns
homeenter_data_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [79]:
# Get the amount fo rows in the dataframe
homeenter_data_df.count()

705889

In [80]:
# Drop the na values and drop duplicates values 
dropna_df = homeenter_data_df.dropna()
dropduplicates_df = dropna_df.dropDuplicates()

dropduplicates_df.count()

705850

In [81]:
from pyspark.sql.functions import to_date
# review the DataFrame and transform the date column to date
review_id_df = dropduplicates_df.select(["review_id", "customer_id", "product_id", 
                                       "product_parent", 
                                       to_date("review_date", 'yyyy-MM-dd').alias("review_date")]) # using .alias works as in sql
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1022278GSR5QQ|   10950187|B003EX0BVS|     936344876| 2015-02-18|
|R102C1L8WG2MV1|   51793406|B00008N6X4|     207746434| 2004-01-14|
|R105S0WJL3H3PJ|   52343941|B004GTN0T4|     286560188| 2012-01-08|
|R106UUNJS7P59B|   49406745|B005MR6CNK|     682823016| 2013-08-08|
|R107KOKFAOFQF1|   22590545|B001VKY7WU|     929595423| 2010-02-17|
|R1090POSWG4QEM|   50537528|B001TK3D4U|     345027888| 2009-10-24|
|R10A30JROMZ7HB|   34585522|B00F9YYHSI|     909572429| 2013-12-06|
|R10BBL6EZ574ZR|   41784316|B00J90X0RC|     720890071| 2014-08-05|
|R10CXFW068Q5CH|   45099585|B00AWKBZ0M|     361085603| 2013-09-16|
|R10DDPNYQ32Z8R|   16224312|B001TK3D4K|     965006755| 2010-02-20|
|R10H9S9FU2TNUI|   17635926|B006ZH0MP0|      55958725| 2013-05-23|
|R10IHWCT4WAE4R|   45190365|B004UETB20|     312156652| 2012-03

In [0]:
# finding duplicates values by ID and dropping them

products_df = homeenter_data_df.select(['product_id', 'product_title'])
products_df = products_df.drop_duplicates(subset = ['product_id'])
# changed drop_duplicates() for dropDuplicates() important to add the subset 


In [83]:
new_df = products_df.groupBy('product_id').count().orderBy('count', ascending=False)
new_df.show()

+----------+-----+
|product_id|count|
+----------+-----+
|B00TKOSUDQ|    1|
|B00UNL08XA|    1|
|B00QNR0LDA|    1|
|B00VIRGK6C|    1|
|B00SMBFPDG|    1|
|B00QGKNP32|    1|
|B00LBENEFM|    1|
|B000083CUF|    1|
|B00NMRSYJ2|    1|
|B0014F9U6U|    1|
|B00RV9TKKY|    1|
|B004QXJJIG|    1|
|B00ZOV5M1M|    1|
|B004Y45RXI|    1|
|B00RVFIQZI|    1|
|B00C2CZU18|    1|
|B01309NF40|    1|
|B005O88CY4|    1|
|B00SWHC39E|    1|
|B00BP5N574|    1|
+----------+-----+
only showing top 20 rows



In [84]:
# check for duplicates since sql table has a unique condition for product_id Column
# products_df.filter(products_df['product_id'] == 'B004NMC1M6').show()

+----------+-------------+
|product_id|product_title|
+----------+-------------+
+----------+-------------+



In [85]:
# reviews df for sql table 
reviews_df = homeenter_data_df.select(['review_id','review_headline','review_body'])
reviews_df.show(10)

+--------------+--------------------+--------------------+
|     review_id|     review_headline|         review_body|
+--------------+--------------------+--------------------+
| RY01SAV7HZ8QO|good enough for m...|not the best pict...|
|R1XX8SDGJ4MZ4L|Tell the Cable Co...|Not only do my Ti...|
|R149Q3B5L33NN5|    Works perfectly!|Works perfectly! ...|
|R2ZVD69Z6KPJ4O|It doesn't work. ...|It doesn't work. ...|
|R1DIKG2G33ZLNP|         As pictured|I received the it...|
|R3L6FGKAW0EYFI|Only lasts 3-4 ye...|I bought this TV ...|
| RAO0QZH5VC6VI|            One Star|    Waste of $$$$$$$|
|R25IK0UAHWNB22|         Three Stars|Nice but all thre...|
|R2A9IHKZMTMAL1|Yes...exactly wha...|Oh, yeah...doesn'...|
| R5XVKTHL6SITI|Fantastic sound. ...|Fantastic sound. ...|
+--------------+--------------------+--------------------+
only showing top 10 rows



In [86]:
customer_df = homeenter_data_df.groupby('customer_id').agg({'customer_id': 'count'}).withColumnRenamed('count(customer_id)','customer_count')
customer_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   10142992|             1|
|   16457323|             6|
|   11935383|             1|
|   46277736|             1|
|   13671072|             1|
|   21453814|             1|
|   17684885|             1|
|   20415768|             1|
|   15212710|             1|
|    5220924|             1|
|   46253451|             6|
|     971908|             1|
|   32829933|             1|
|   51221518|             1|
|   12002637|             2|
|   16105308|             1|
|     135867|             1|
|   47425808|             1|
|   43138273|             1|
|   16411995|             1|
+-----------+--------------+
only showing top 20 rows



In [87]:
customer_df.count()

610080

In [88]:
vine_df = homeenter_data_df.select(['review_id','star_rating','helpful_votes','total_votes','vine'])
vine_df.show(10)


+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RY01SAV7HZ8QO|          4|            0|          0|   N|
|R1XX8SDGJ4MZ4L|          5|            0|          0|   N|
|R149Q3B5L33NN5|          5|            0|          0|   N|
|R2ZVD69Z6KPJ4O|          1|            0|          2|   N|
|R1DIKG2G33ZLNP|          4|            0|          0|   N|
|R3L6FGKAW0EYFI|          1|            1|          1|   N|
| RAO0QZH5VC6VI|          1|            0|          0|   N|
|R25IK0UAHWNB22|          3|            0|          0|   N|
|R2A9IHKZMTMAL1|          5|            1|          2|   N|
| R5XVKTHL6SITI|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 10 rows



## Pushing created dfs into AWS RDS postgres connected DB

In [0]:
mode = 'append'
jdbc_url="jdbc:postgresql://<endpoint>:5432/Amazon_reviews_db"
config = {"user":"<user>", 
          "password": "<password>", 
          "driver":"org.postgresql.Driver"}

In [0]:
# write the review_id_df to the table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)


In [0]:
# write the products_df to table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [0]:
# Write customers_df to table in RDS

customer_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config) 

In [0]:
# Write vine_df to table in RDS
vine_df.write.jdbc(url=jdbc_url, table='vines', mode=mode, properties=config)