In [1]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open '$SPARK_VERSION-bin-hadoop2.7.tgz'


Exception: Unable to find py4j in /content/spark-3.2.3-bin-hadoop2.7\python, your SPARK_HOME may not be configured correctly

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ToyReview").getOrCreate()

In [3]:
from pyspark import SparkFiles
# Load file

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz" #enter correct address here

spark.sparkContext.addFile(url)


df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Toys_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

In [4]:
print(df.count())

4864249


In [5]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [6]:
review_df=df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
review_df.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RDIJS7QYB6XNR|   18778586|B00EDBY7X8|     122952789| 2015-08-31|
|R36ED1U38IELG8|   24769659|B00D7JFOPC|     952062646| 2015-08-31|
| R1UE3RPRGCOLD|   44331596|B002LHA74O|     818126353| 2015-08-31|
|R298788GS6I901|   23310293|B00ARPLCGY|     261944918| 2015-08-31|
|  RNX4EXOBBPN5|   38745832|B00UZOPOFW|     717410439| 2015-08-31|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [7]:
product_df=df.dropDuplicates(["product_id", "product_title"])
print(product_df.count())

664063


In [8]:
product_df1=df.select(["product_id", "product_title"])
product_df1.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00EDBY7X8|Monopoly Junior B...|
|B00D7JFOPC|56 Pieces of Wood...|
|B002LHA74O|Super Jumbo Playi...|
|B00ARPLCGY|Barbie Doll and F...|
|B00UZOPOFW|Emazing Lights eL...|
+----------+--------------------+
only showing top 5 rows



In [9]:
customer_df=df.groupBy("customer_id").agg({"customer_id": "count"})
customer_df.show()

+-----------+------------------+
|customer_id|count(customer_id)|
+-----------+------------------+
|   16989307|                 1|
|   45632184|                 2|
|   14703850|                13|
|   49645387|                 2|
|   16343477|                 1|
|   15554899|                 1|
|   17067926|                 1|
|   50843047|                 2|
|    4051424|                 1|
|   11487525|                 1|
|   19371753|                 1|
|   18634862|                 1|
|   14552054|                 1|
|   52695798|                 1|
|   49438424|                 3|
|   10854449|                 9|
|   48521319|                 1|
|   11839424|                 2|
|   27887950|                 1|
|   45392827|                 3|
+-----------+------------------+
only showing top 20 rows



In [10]:
vine_df=df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RDIJS7QYB6XNR|          5|            0|          0|   N|
|R36ED1U38IELG8|          5|            0|          0|   N|
| R1UE3RPRGCOLD|          2|            1|          1|   N|
|R298788GS6I901|          5|            0|          0|   N|
|  RNX4EXOBBPN5|          1|            1|          1|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [None]:
mode="append"
jdbc_url = "jdbc:postgresql://mod22-challenge.cx8qkn0ocxoj.us-east-1.rds.amazonaws.com:5432/toys"
config = {"user":"postgres",
          "password": "postgres",
          "driver":"org.postgresql.Driver"}

In [None]:
review_df.write.jdbc(url=jdbc_url, table='active_user', mode=mode, properties=config)

In [None]:
product_df.write.jdbc(url=jdbc_url, table='active_user', mode=mode, properties=config)

In [None]:
customer_df.write.jdbc(url=jdbc_url, table='active_user', mode=mode, properties=config)

In [None]:
vine_df.write.jdbc(url=jdbc_url, table='active_user', mode=mode, properties=config)