In [1]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# The RDS writes at the end of this notebook use the "org.postgresql.Driver"
# JDBC class, and nothing else in the notebook puts that driver on Spark's
# classpath. Ask Spark to resolve it from Maven when the session's JVM is
# launched. NOTE: this only takes effect if no SparkContext is already running
# in the kernel (restart the runtime before re-running if one is).
POSTGRES_JDBC_PACKAGE = "org.postgresql:postgresql:42.2.16"

spark = (
    SparkSession.builder
    .appName("AmazonReviewsAnalysis2")
    .config("spark.jars.packages", POSTGRES_JDBC_PACKAGE)
    .getOrCreate()
)

In [3]:
from pyspark import SparkFiles

# Load the gzipped Amazon US Pet Products reviews TSV from the public S3
# bucket into a DataFrame. addFile downloads the file for this Spark job, and
# SparkFiles.get returns the local path of that download by its file name.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Derive the file name from the URL so the two cannot drift apart.
# No schema is given, so every column is read as a string; later cells cast
# the columns each target table needs.
tsv_file_name = url.rsplit("/", 1)[-1]
pet_products_df = spark.read.csv(SparkFiles.get(tsv_file_name), sep="\t", header=True)
pet_products_df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   28794885| REAKC26P07MDN|B00Q0K9604|     510387886|(8-Pack) EZwhelp ...|    Pet Products|          5|            0|          0|   N|                Y|A great purchase ...|Best belly bands ...| 2015-08-31|
|         US|   11488901|R3NU7OMZ4HQIEG|B00MBW5O9W|     912374672|Warren Eckstein's...|    Pet Products|          2|    

In [4]:
# Number of review records loaded, before any cleaning.
raw_record_count = pet_products_df.count()
raw_record_count

2643619

In [5]:
# Clean the frame: drop every row with a null in any column, then drop rows
# that are exact duplicates across all columns.
pet_products_df = pet_products_df.na.drop().dropDuplicates()

In [6]:
# Project the columns intended for the review_id_table.
review_id_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_df = pet_products_df.select(*review_id_columns)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1001KC9GI6ARJ|   11232277|B0080YGXPE|     551772997| 2015-04-16|
|R100B4FHILXAWG|   18802434|B007TRREJC|      56313707| 2014-10-19|
|R1017Z5WWPSSLA|    5584111|B0006342AU|     735846008| 2015-02-02|
|R1019WUJBM9K3X|   32939702|B005JW5VRG|     534045437| 2015-01-20|
|R101IZCRPIN8SX|   50738127|B0029PY7SK|     383930100| 2014-12-03|
|R101KD4S1R47FR|   49683154|B002CM941K|     938528747| 2014-01-28|
|R102EATCBUUDL6|   42553668|B0018KVHWQ|     206118958| 2014-08-08|
|R102OI838NN3Z7|   11056168|B000YDUAMQ|     180607962| 2014-01-18|
|R102VOSABZP429|   45566045|B00TJ3N916|     521682976| 2015-07-31|
|R103GQY0RP7GOH|   45241970|B00CX6LJ22|     595786155| 2014-09-21|
|R103N9CYVCM7YY|   11637795|B00IBWZYVE|     882938526| 2014-06-17|
|R103RPAXH53V7L|   20885662|B0070S62U2|     690582944| 2015-06

In [7]:
# (column name, Spark type) pairs before casting.
review_id_column_types = review_id_df.dtypes
review_id_column_types

[('review_id', 'string'),
 ('customer_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('review_date', 'string')]

In [8]:
from pyspark.sql.types import DateType, IntegerType

# Cast the string columns to their target types; review_id and product_id stay
# strings. NOTE: Spark's cast yields null (rather than failing) for values that
# cannot be converted.
review_id_casts = {
    "customer_id": IntegerType(),
    "product_parent": IntegerType(),
    "review_date": DateType(),
}
for column_name, target_type in review_id_casts.items():
    review_id_df = review_id_df.withColumn(column_name, review_id_df[column_name].cast(target_type))
review_id_df.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [9]:
# Project the columns intended for the products table.
product_columns = ["product_id", "product_title"]
products_df = pet_products_df.select(*product_columns)
products_df.show(n=10)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B0080YGXPE|AETERTEK AT-918C ...|
|B007TRREJC|   Happy Pet Harness|
|B0006342AU|SmartyKat Organic...|
|B005JW5VRG|Marina Naturals F...|
|B0029PY7SK|ThunderShirt Clas...|
|B002CM941K|Hyperflite Jawz D...|
|B0018KVHWQ|EasyClip Quiet Ai...|
|B000YDUAMQ|Aqueon 06088 Filt...|
|B00TJ3N916|MLB PET JERSEY. -...|
|B00CX6LJ22|Dogwidgets DW-3 R...|
+----------+--------------------+
only showing top 10 rows



In [10]:
# (column name, Spark type) pairs; both product columns are strings.
product_column_types = products_df.dtypes
product_column_types

[('product_id', 'string'), ('product_title', 'string')]

In [11]:
# Keep exactly one row per product_id. De-duplicating on the whole
# (product_id, product_title) pair still leaves several rows for a product
# whose title differs between reviews, which conflicts with a products table
# keyed by product_id (assumed from "match the products schema" — TODO
# confirm against the table DDL). Which title survives for such a product is
# not deterministic.
products_df = products_df.dropDuplicates(["product_id"])

In [12]:
# One row per customer_id with the number of (cleaned) review rows for it.
customer_groups = pet_products_df.groupBy("customer_id")
customers_df = customer_groups.count()
customers_df.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   20214846|   15|
|   31120744|    4|
|   20223672|    2|
|   39043742|    1|
|   15231334|    9|
|   44700871|    2|
|   26617857|    7|
|   51750322|    1|
|   42655488|    1|
|   48278478|    1|
|   11417844|    1|
|   34795930|    4|
|   12369110|    8|
|   20437114|    1|
|   25490618|    3|
|   31754174|    6|
|   20460479|    7|
|   22219042|   30|
|    4909697|    2|
|   15741843|    9|
+-----------+-----+
only showing top 20 rows



In [13]:
# Replace the generic "count" aggregate name with customer_count.
# (withColumnRenamed is a no-op if "count" is already gone, so re-running is safe.)
customers_df = customers_df.withColumnRenamed(existing="count", new="customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   20214846|            15|
|   31120744|             4|
|   20223672|             2|
|   39043742|             1|
|   15231334|             9|
|   44700871|             2|
|   26617857|             7|
|   51750322|             1|
|   42655488|             1|
|   48278478|             1|
|   11417844|             1|
|   34795930|             4|
|   12369110|             8|
|   20437114|             1|
|   25490618|             3|
|   31754174|             6|
|   20460479|             7|
|   22219042|            30|
|    4909697|             2|
|   15741843|             9|
+-----------+--------------+
only showing top 20 rows



In [14]:
# (column name, Spark type) pairs before casting.
customer_column_types = customers_df.dtypes
customer_column_types

[('customer_id', 'string'), ('customer_count', 'bigint')]

In [15]:
# customer_id arrives as a string and customer_count as the bigint produced by
# count(); cast both to 32-bit integers.
for column_name in ("customer_id", "customer_count"):
    customers_df = customers_df.withColumn(column_name, customers_df[column_name].cast(IntegerType()))
customers_df.dtypes

[('customer_id', 'int'), ('customer_count', 'int')]

In [17]:
# Project the rating/vote/vine columns intended for the vine table.
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_df = pet_products_df.select(vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1001KC9GI6ARJ|          4|            1|          1|   N|
|R100B4FHILXAWG|          2|            0|          0|   N|
|R1017Z5WWPSSLA|          5|            0|          0|   N|
|R1019WUJBM9K3X|          5|            0|          0|   N|
|R101IZCRPIN8SX|          4|            0|          0|   N|
|R101KD4S1R47FR|          5|            0|          0|   N|
|R102EATCBUUDL6|          5|            0|          1|   N|
|R102OI838NN3Z7|          5|            0|          0|   N|
|R102VOSABZP429|          5|            2|          2|   N|
|R103GQY0RP7GOH|          5|            0|          0|   N|
|R103N9CYVCM7YY|          3|            4|          6|   N|
|R103RPAXH53V7L|          5|            0|          0|   N|
|R103WGJI3K7XR0|          5|            6|          8|   N|
|R103WY0OHHL7SE|          5|            

In [18]:
# The three numeric columns were read as strings; cast them to integers.
# review_id and vine stay strings.
for column_name in ("star_rating", "helpful_votes", "total_votes"):
    vine_df = vine_df.withColumn(column_name, vine_df[column_name].cast(IntegerType()))
vine_df.dtypes

[('review_id', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string')]

In [16]:
# Configure settings for RDS.
# Credentials come from the environment (or an interactive prompt) instead of
# being hard-coded: a password committed in a notebook is readable by anyone
# who can see the file. NOTE(review): rotate the RDS password that was
# previously hard-coded in this cell.
import getpass
import os

mode = "append"
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://database-1.cskyuknpbcn2.us-east-2.rds.amazonaws.com:5432/amazon_reviews_db2",
)
config = {
    "user": os.environ.get("RDS_USER", "root"),
    "password": os.environ.get("RDS_PASSWORD") or getpass.getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Append review_id_df to the review_id_table table in RDS.
review_id_df.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

Py4JJavaError: ignored

In [None]:
# Append products_df to the products table in RDS.
products_df.write.mode(mode).jdbc(jdbc_url, "products", properties=config)

Py4JJavaError: ignored

In [None]:
# Append customers_df to the customers table in RDS.
customers_df.write.mode(mode).jdbc(jdbc_url, "customers", properties=config)

Py4JJavaError: ignored

In [19]:
# Write DataFrame to vine_table table in RDS (appends, using the JDBC URL and
# connection properties from the RDS config cell).
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

Py4JJavaError: ignored