In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [Waiting for headers                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [2 InRelease 3,626 B0% [Waiting for headers] [1 InRelease 17.1 kB/88.7 kB 19%] [Waiting for headers                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [1 InRelease 17.1 kB/88.7 kB 19%] [Waiting for headers0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [1 InRelease 20.0 kB/88.7 k                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [2 InRelease gpgv 3,626 B] [4 InRe

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
# Load amazon data into pyspark
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...|2015-08-31 00:00:00|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...| 

In [4]:
# Filter data to show total votes equal to or greater than 20
filtered_df = df.filter("total_votes >= 20")

In [6]:
# Filter data to show only results where helpful votes make up 50% or more of the total votes
filtered_df2 = filtered_df.filter("helpful_votes/total_votes >= 0.5")
filtered_df2.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   44689470|R2WOW0TURNXB26|B00YY3UBV2|     310491927|            Garmin 1|        Wireless|          3|           54|         59|   N|                Y|Pretty Disappoint...|Bought this unit ...|2015-08-31 00:00:00|
|         US|     112342|R13VL62Y2HBQ0B|B010VFZJD6|     129632031|iTaste MVP3 PRO -...| 

In [12]:
# Select all paid reviews 
paid_df = filtered_df2.filter("vine == 'Y'").show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   48852155|R1MAOLI5FJHAFM|B013X0V11K|     610966690|BLU Studio 7.0 ll...|        Wireless|          4|          249|        261|   Y|                N|More than just a ...|Ever since the ve...|2015-08-31 00:00:00|
|         US|   11556116| R9PYAUDIBJVEC|B013X0V4VM|     672788343|BLU Studio C Supe...| 

In [13]:
# Select all unpaid reviews
unpaid_df = filtered_df2.filter("vine == 'N'").show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   44689470|R2WOW0TURNXB26|B00YY3UBV2|     310491927|            Garmin 1|        Wireless|          3|           54|         59|   N|                Y|Pretty Disappoint...|Bought this unit ...|2015-08-31 00:00:00|
|         US|     112342|R13VL62Y2HBQ0B|B010VFZJD6|     129632031|iTaste MVP3 PRO -...| 

In [10]:
total_paid = paid_df.select('review_id').count()
paid_five_star = paid_df.select('review_id').where(paid_df.star_rating==5).count()
percent_five_star_paid = paid_five_star/total_paid*100

print("Total number of paid reviews: ",total_paid)
print("Number of paid 5 star reviews: ", paid_five_star)
print("Percent of paid five star reviews: ", percent_five_star_paid)

Total number of paid reviews:  613
Number of paid 5 star reviews:  222
Percent of paid five star reviews:  36.215334420880914


In [11]:
total_unpaid = unpaid_df.select('review_id').count()
unpaid_five_star = unpaid_df.select('review_id').where(unpaid_df.star_rating==5).count()
percent_five_star_unpaid = unpaid_five_star/total_unpaid*100

print("Total number of unpaid reviews: ",total_unpaid)
print("Number of unpaid 5 star reviews: ", unpaid_five_star)
print("Percent of unpaid five star reviews: ", percent_five_star_unpaid)

Total number of unpaid reviews:  64968
Number of unpaid 5 star reviews:  30543
Percent of unpaid five star reviews:  47.01237532323606
