In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [Connected to cloud.                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [Connected to cloud.0% [2 InRelease gpgv 242 kB] [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [2 InRelease gpgv 242 kB] [3 InRelease 14.2 kB/88.7 kB 16%] [1 InRelease 20.                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [2 InRelease gpgv 242 kB] [3 InRelease 18.5 kB/88.7 kB 21%] [1 InRelease 54.0% [2 InRelease gpgv 242 kB] [3 InRel

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-09-27 15:52:04--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-09-27 15:52:04 (6.17 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session, with the downloaded Postgres JDBC jar on the
# driver classpath.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
# Load the Amazon Lawn and Garden review data into a Spark DataFrame
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"
# Register the remote file with Spark; SparkFiles.get then resolves its local path.
spark.sparkContext.addFile(url)
df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"),
    sep="\t",
    header=True,
    inferSchema=True,
    encoding="UTF-8",
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32787517| RED72VWWCOS7S|B008HDQYLQ|     348668413|Garden Weasel Gar...| Lawn and Garden|          1|            2|          8|   N|                Y|            One Star|I don't hate the ...| 2015-08-31|
|         US|   16374060| RZHWQ208LTEPV|B005OBZBD6|     264704759|10 Foot Mc4 Solar...| Lawn and Garden|          5|    

In [None]:
# Discard every row that contains at least one null value
cleaned_vine_df = df.na.drop()
cleaned_vine_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32787517| RED72VWWCOS7S|B008HDQYLQ|     348668413|Garden Weasel Gar...| Lawn and Garden|          1|            2|          8|   N|                Y|            One Star|I don't hate the ...| 2015-08-31|
|         US|   16374060| RZHWQ208LTEPV|B005OBZBD6|     264704759|10 Foot Mc4 Solar...| Lawn and Garden|          5|    

In [None]:
# Create the vine_table DataFrame: review id, rating, vote counts and the
# vine / verified-purchase flags
vine_df = cleaned_vine_df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RED72VWWCOS7S|          1|            2|          8|   N|                Y|
| RZHWQ208LTEPV|          5|            0|          0|   N|                Y|
|R37LBC3XAVLYOO|          5|            4|          5|   N|                Y|
|R3L7XJMA0MVJWC|          5|            0|          0|   N|                Y|
|R2I2GHSI7T1UBN|          1|            5|          6|   N|                Y|
|R2GFFKHK4I6VMX|          5|            0|          0|   N|                Y|
|R1R0UDX2XAN1S4|          4|            0|          0|   N|                Y|
|R22C8FMBSTFRY8|          5|            2|          2|   N|                Y|
|R118NNIQ75XPGO|          3|            0|          0|   N|                Y|
|R30HYXHZQ49621|          2|            0|          0|   N|     

In [None]:
# col() lets us refer to a column by name inside filter expressions
from pyspark.sql.functions import col

# Keep only the rows (reviews) that received 20 or more total votes
high_votes_df = vine_df.where(col("total_votes") >= 20)
high_votes_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|
|R2RKCSAG6GBA4A|          1|            8|         28|   N|                Y|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|
| R9QNQUL94RX1F|          3|           27|         33|   N|                Y|
| RTULFZTUS1VBP|          5|           51|         52|   N|     

In [None]:
# Add percent_helpful: helpful_votes as a percentage of total_votes, per review
helpful_votes_df = high_votes_df.withColumn(
    "percent_helpful",
    high_votes_df.helpful_votes / high_votes_df.total_votes * 100,
)

In [None]:
# Keep only the reviews where at least 50% of the votes were helpful
high_votes_df = helpful_votes_df.where(helpful_votes_df["percent_helpful"] >= 50)
high_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  percent_helpful|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|82.75862068965517|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|91.66666666666666|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|96.66666666666667|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|89.28571428571429|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|            100.0|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|83.33333333333334|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|97.67441860465115|
| R9QNQUL94RX1F|          3|           2

In [None]:
# Reviews written as part of the Vine program (vine flag is "Y")
vine_reviews = high_votes_df.where(high_votes_df.vine == "Y")
vine_reviews.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  percent_helpful|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R28DXTC3JQ9IY1|          4|           24|         26|   Y|                N| 92.3076923076923|
|R3AFZKLQXATHBU|          5|           44|         49|   Y|                N|89.79591836734694|
|R2RUUF2JPJPC0E|          4|           20|         22|   Y|                N| 90.9090909090909|
| RFZ2WUH4248AB|          2|           26|         27|   Y|                N|96.29629629629629|
|R1Q4LVHIFOWYFR|          5|           23|         28|   Y|                N|82.14285714285714|
| R4YEPTQED3X1Q|          5|           19|         20|   Y|                N|             95.0|
|R2Z7C8YCRSC9DP|          5|           22|         22|   Y|                N|            100.0|
|R3J8OI5CB74P5K|          1|           2

In [None]:
from pyspark.sql.functions import count

In [None]:
# Total number of Vine reviews
total_vine_rev = vine_reviews.select(vine_reviews.review_id).count()
total_vine_rev

386

In [None]:
# Number of 5-star reviews among the Vine reviews
five_star_vine = (
    vine_reviews
    .filter(vine_reviews["star_rating"] == 5)
    .count()
)
five_star_vine

176

In [None]:
# Share of Vine reviews that are 5-star, as a percentage
vine_percent = five_star_vine / total_vine_rev * 100
vine_percent

45.59585492227979

In [None]:
# Reviews written outside the Vine program (vine flag is "N")
non_vine_reviews = high_votes_df.where(high_votes_df.vine == "N")
non_vine_reviews.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  percent_helpful|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|82.75862068965517|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|91.66666666666666|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|96.66666666666667|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|89.28571428571429|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|            100.0|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|83.33333333333334|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|97.67441860465115|
| R9QNQUL94RX1F|          3|           2

In [None]:
# Total number of non-Vine reviews
total_nonvine_rev = non_vine_reviews.select(non_vine_reviews.review_id).count()
total_nonvine_rev

48702

In [None]:
# Number of 5-star reviews among the non-Vine reviews
five_star_non_vine = (
    non_vine_reviews
    .filter(non_vine_reviews["star_rating"] == 5)
    .count()
)
five_star_non_vine

24016

In [None]:
# Share of non-Vine reviews that are 5-star, as a percentage
non_vine_percent = five_star_non_vine / total_nonvine_rev * 100
non_vine_percent

49.3121432384707