In [1]:
# Import dependencies
import os

# Install Spark
spark_version = "spark-3.3.2"
os.environ["SPARK_VERSION"] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 14.2 kB/114                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
0% [Waiting for headers] [1 InRelease 92.4 kB/114 kB 81%] [Waiting for headers]                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers]                                                                    Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers]                                                                    Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net              

In [2]:
# Start a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M17-Amazon-Challenge-D2").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
# Load in the Music Dataset
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   10140119|R3LI5TRP3YIDQL|B00TXH4OLC|     384427924|Whatever's for Us...|           Music|          5|            0|          0|   N|                Y|          Five Stars|Love this CD alon...|2015-08-31 00:00:00|
|         US|   27664622|R3LGC3EKEG84PX|B00B6QXN6U|     831769051|Same Trailer Diff...| 

In [4]:
# Create the vine_table DataFrame
# vine_df = df.select([])
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])

vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3LI5TRP3YIDQL|          5|            0|          0|   N|                Y|
|R3LGC3EKEG84PX|          5|            0|          0|   N|                Y|
| R9PYL3OYH55QY|          5|            0|          1|   N|                Y|
|R3PWBAWUS4NT0Q|          3|            0|          0|   N|                Y|
|R15LYP3O51UU9E|          5|            0|          0|   N|                Y|
|R1AD7L0CC3DSRI|          5|            0|          0|   N|                Y|
|R32FE8Y45QV434|          5|            0|          0|   N|                Y|
|R3NM4MZ4XWL43Q|          5|            1|          2|   N|                Y|
|R3H4FXX6Q7I37D|          4|            0|          0|   N|                Y|
|R30L5PET7LFFDC|          5|            1|          1|   N|     

1. Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 to pick reviews that are more likely to be helpful and to avoid having division by zero errors later on.

In [5]:
# Create total_votes dataframe
import pyspark.sql.functions as func
from pyspark.sql.functions import col

total_votes_df = vine_df.filter("total_votes>=20")

total_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2SHXRL6SL1GC9|          3|           25|         26|   N|                Y|
|R2ZC033X86YOY8|          5|           25|         26|   N|                N|
|R2736RJGCOSL80|          5|           19|         20|   N|                Y|
| RRY5DJ6J9BKAX|          5|           19|         21|   N|                Y|
|R2P4PJJ2ROTPBM|          5|           46|         48|   N|                N|
| RO8RAEGB66BKR|          4|           46|         46|   N|                N|
| RRFZ7QZTRJC59|          5|          292|        300|   N|                N|
| RFN4PNRUD1UW2|          4|           21|         22|   N|                N|
| RO7EBJEP7IHIX|          5|           26|         26|   N|                N|
|R1CVS4MK9RTNNP|          2|           11|         22|   N|     

2. Filter the new DataFrame or table created in Step 1 and create a new DataFrame or table to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.

In [6]:
# Filter total_votes dataframe and create new dataframe
divide_df = total_votes_df.withColumn("divide", total_votes_df["helpful_votes"]/total_votes_df["total_votes"])

fifty_df = divide_df.filter("divide>=0.5").select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])

fifty_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2SHXRL6SL1GC9|          3|           25|         26|   N|                Y|
|R2ZC033X86YOY8|          5|           25|         26|   N|                N|
|R2736RJGCOSL80|          5|           19|         20|   N|                Y|
| RRY5DJ6J9BKAX|          5|           19|         21|   N|                Y|
|R2P4PJJ2ROTPBM|          5|           46|         48|   N|                N|
| RO8RAEGB66BKR|          4|           46|         46|   N|                N|
| RRFZ7QZTRJC59|          5|          292|        300|   N|                N|
| RFN4PNRUD1UW2|          4|           21|         22|   N|                N|
| RO7EBJEP7IHIX|          5|           26|         26|   N|                N|
|R1CVS4MK9RTNNP|          2|           11|         22|   N|     

3. Filter the DataFrame or table created in Step 2, and create a new DataFrame or table that retrieves all the rows where a review was written as part of the Vine program (paid), vine == 'Y'.

In [7]:
# Filter and create new dataframe for paid vine program
paid_df = fifty_df.filter(col("vine") == "Y")
paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R84VUCDBCI29U|          3|           18|         28|   Y|                N|
| R4V3ICFDTIDIF|          4|           20|         21|   Y|                N|
|R1JZ0JAPW83WFS|          4|           54|         58|   Y|                N|
|R1XH7EA97FAVP7|          3|           35|         44|   Y|                N|
|R1482JAU1ZR7QH|          4|           15|         22|   Y|                N|
|R1GGYGVTHP84DG|          4|           25|         27|   Y|                N|
| RXGU9DSKZJSP0|          3|           21|         22|   Y|                N|
+--------------+-----------+-------------+-----------+----+-----------------+



4. Repeat Step 3, but this time retrieve all the rows where the review was not part of the Vine program (unpaid), vine == 'N'.

In [8]:
# Repeat step 3 for unpaid vine program
unpaid_df = fifty_df.filter(col("vine") == "N")
unpaid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2SHXRL6SL1GC9|          3|           25|         26|   N|                Y|
|R2ZC033X86YOY8|          5|           25|         26|   N|                N|
|R2736RJGCOSL80|          5|           19|         20|   N|                Y|
| RRY5DJ6J9BKAX|          5|           19|         21|   N|                Y|
|R2P4PJJ2ROTPBM|          5|           46|         48|   N|                N|
| RO8RAEGB66BKR|          4|           46|         46|   N|                N|
| RRFZ7QZTRJC59|          5|          292|        300|   N|                N|
| RFN4PNRUD1UW2|          4|           21|         22|   N|                N|
| RO7EBJEP7IHIX|          5|           26|         26|   N|                N|
|R1CVS4MK9RTNNP|          2|           11|         22|   N|     

5. Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for the two types of review (paid vs unpaid).




> a. Paid review type





In [9]:
# Determine the total number of reviews
paid_df.count()

7

In [10]:
# Determine the total number of 5 star reviews
paid_df.select("star_rating").where(paid_df.star_rating>4).count()

0

In [11]:
# Determine the percentage of 5-star reviews for the paid type
rating_df = paid_df.groupBy("star_rating").agg(func.count("star_rating"))

rating_df.withColumn("percentage", func.round((rating_df["count(star_rating)"]/7) * 100)).show()

+-----------+------------------+----------+
|star_rating|count(star_rating)|percentage|
+-----------+------------------+----------+
|          3|                 3|      43.0|
|          4|                 4|      57.0|
+-----------+------------------+----------+





> b. Unpaid review type



In [12]:
# Determine the total number of reviews
unpaid_df.count()

105979

In [13]:
# Determine the total number of 5 star reviews
unpaid_df.select("star_rating").where(paid_df.star_rating>4).count()

67580

In [14]:
# Determine the percentage of 5-star reviews for the unpaid type
unpaid_rating_df = unpaid_df.groupBy("star_rating").agg(func.count("star_rating"))

unpaid_rating_df.withColumn("percentage", func.round((unpaid_rating_df["count(star_rating)"]/105979) * 100)).show()

+-----------+------------------+----------+
|star_rating|count(star_rating)|percentage|
+-----------+------------------+----------+
|          1|              9157|       9.0|
|          3|              7897|       7.0|
|          5|             67580|      64.0|
|          4|             15997|      15.0|
|          2|              5348|       5.0|
+-----------+------------------+----------+

