In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 14.2 kB/88.                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 46.0 kB/88.0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 88.7 kB/88.0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.39                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:4 http://ppa.launchpad.net/c

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|     650634| R3EQSTM9PWRAL|B0091LBZSU|     578484426|Demograss Capsule...|Health & Personal...|          3|            0|          0|   N|                Y|         Three Stars|Only came with 30...|2015-08-31 00:00:00|
|         US|   19827510| RBWPRK17XKIXD|B00PWW3LQ6|     456433146|Viva L

In [4]:
# Load in a sql function to use columns
from pyspark.sql.functions import col

# Filter for only columns with total_votes equal or greater than 20
total_votes_20plus_df = df.filter(col("total_votes") >= 20)
total_votes_20plus_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    1717317|R2984F9VD9WDJB|B0013OVVK0|     895291357|Jarrow Formulas N...|Health & Personal...|          1|           37|         46|   N|                Y|... month 2000 mg...|took it upon my d...|2015-08-31 00:00:00|
|         US|   37405868|R218TGWCIAZYNY|B0065ZTKWS|     961894951|New Ma

In [5]:
# Filter the data for helpful_votes/total_votes is >= 50%
helpful_votes_50plus_df = total_votes_20plus_df.filter(col("helpful_votes")/col("total_votes") >= .50)
helpful_votes_50plus_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    1717317|R2984F9VD9WDJB|B0013OVVK0|     895291357|Jarrow Formulas N...|Health & Personal...|          1|           37|         46|   N|                Y|... month 2000 mg...|took it upon my d...|2015-08-31 00:00:00|
|         US|   37405868|R218TGWCIAZYNY|B0065ZTKWS|     961894951|New Ma

In [6]:
# Filter for data where a review was written as part of the Vine program
paid_vine_df = helpful_votes_50plus_df.filter(col("vine") == 'Y')
paid_vine_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   34746219| RG70K1HMY4LXX|B010H76PTU|     581545062|Omron 7 Series Wi...|Health & Personal...|          4|           37|         44|   Y|                N|Only tracks 1 per...|This product come...|2015-08-27 00:00:00|
|         US|   51401810|R1DHGTNXDXJ0GB|B00VAPRSP4|     373707856|Celluc

In [7]:
# Finding total # of review paid
paid_vine_df.count()

497

In [8]:
# Filter for data where a review was not part of the Vine program
unpaid_vine_df = helpful_votes_50plus_df.filter(col("vine") == 'N')
unpaid_vine_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    1717317|R2984F9VD9WDJB|B0013OVVK0|     895291357|Jarrow Formulas N...|Health & Personal...|          1|           37|         46|   N|                Y|... month 2000 mg...|took it upon my d...|2015-08-31 00:00:00|
|         US|   37405868|R218TGWCIAZYNY|B0065ZTKWS|     961894951|New Ma

In [9]:
# Finding total # of review unpaid
unpaid_vine_df.count()

120863

In [10]:
# Finding total # of 5-star reviews paid
paid_star_rating_df = paid_vine_df.filter(col("star_rating") == 5)
paid_star_rating_df.count()

220

In [11]:
# Finding total # of 5-star reviews unpaid
unpaid_star_rating_df = unpaid_vine_df.filter(col("star_rating") == 5)
unpaid_star_rating_df.count()

74470

In [12]:
# Finding percentage of 5-star reviews unpaid
percent_unpaid = (unpaid_star_rating_df.count()/unpaid_vine_df.count())*100
print("Percentage of 5-star reviews for unpaid Vine program was: %f" % percent_unpaid + "%")

Percentage of 5-star reviews for unpaid Vine program was: 61.615217%
