In [45]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Connec                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Waitin0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [1 InRelease 1                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [

In [46]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-01-03 23:15:35--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.2’


2023-01-03 23:15:35 (11.5 MB/s) - ‘postgresql-42.2.16.jar.2’ saved [1002883/1002883]

time: 416 ms (started: 2023-01-03 23:15:35 +00:00)


In [47]:
from pyspark.sql import SparkSession

# Build (or reuse) the session, putting the downloaded Postgres JDBC driver
# on the driver's classpath.
spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

time: 3.2 ms (started: 2023-01-03 23:15:39 +00:00)


### Load Amazon Data into a Spark DataFrame

In [48]:
from pyspark import SparkFiles


time: 812 µs (started: 2023-01-03 23:15:42 +00:00)


### Create DataFrames to match tables

In [49]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Electronics_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
# Look the added file up by its own name (the last path segment of the URL).
# An empty name would resolve to the SparkFiles root directory instead, so the
# reader would pick up every file in that directory rather than this dataset only.
reviews_file = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(reviews_file), sep="\t", header=True, inferSchema=True)
df.show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   41409413|R2MTG1GCZLR2DK|B00428R89M|     112201306|yoomall 5M Antenn...|     Electronics|          5|            0|          0|   N|                Y|          Five Stars|       As described.| 2015-08-31|
|         US|   49668221|R2HBOEM8LE9928|B000068O48|     734576678|Hosa GPM-103 3.5m...|     Electronics|          5|    

In [50]:
# Create the vine_table DataFrame: keep only the review id, the rating,
# the vote counts, and the vine / verified_purchase flags.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2MTG1GCZLR2DK|          5|            0|          0|   N|                Y|
|R2HBOEM8LE9928|          5|            0|          0|   N|                Y|
|R1P4RW1R9FDPEE|          5|            1|          1|   N|                Y|
|R1EBPM82ENI67M|          1|            0|          0|   N|                Y|
|R372S58V6D11AT|          5|            1|          1|   N|                Y|
|R1A4514XOYI1PD|          5|            1|          1|   N|                Y|
|R20D9EHB7N20V6|          5|            0|          0|   N|                Y|
|R1WUTD8MVSROJU|          5|            0|          0|   N|                Y|
|R1QCYLT25812DM|          4|            0|          0|   N|                Y|
| R904DQPBCEM7A|          4|            0|          0|   N|     

### PySpark Vine Review Analysis

In [51]:
# Column helpers from pyspark used by this analysis
from pyspark.sql.functions import col,when,count,lit
# Step 1
## Keep only the rows whose total_votes count is 20 or more. These reviews are
## more likely to be helpful, and the cut-off also guarantees a non-zero divisor
## when helpful_votes is later divided by total_votes.
total_votes_df = vine_df.where(col("total_votes") >= 20)
# Summary statistics of the filtered DataFrame
total_votes_df.summary().show()

+-------+--------------+------------------+------------------+------------------+-----+-----------------+
|summary|     review_id|       star_rating|     helpful_votes|       total_votes| vine|verified_purchase|
+-------+--------------+------------------+------------------+------------------+-----+-----------------+
|  count|         58554|             58554|             58554|             58554|58554|            58554|
|   mean|          null|3.4432318885131674|53.776496908836286|63.085920688595145| null|             null|
| stddev|          null|1.6395295123294913| 145.1312294528288|150.37446247023817| null|             null|
|    min|R1005XCXUSVE54|                 1|                 0|                20|    N|                N|
|    25%|          null|                 2|                19|                25| null|             null|
|    50%|          null|                 4|                27|                34| null|             null|
|    75%|          null|                 5|   

In [44]:
# Step 2
## Create a new DataFrame to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.

# percent_votes is the helpful/total ratio as a fraction (0.5 == 50%).
# total_votes is at least 20 after Step 1, so the divisor is never zero.
# The previous DataFrame-level .alias('percent_votes') was dropped: it named the
# whole DataFrame rather than the column and had no effect on this result.
percent_votes_df = (
    total_votes_df
    .withColumn('percent_votes', col('helpful_votes') / col('total_votes'))
    .filter(col('percent_votes') >= .5)
)
percent_votes_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R1FBO737KD9F2N|          5|           19|         23|   N|                Y|0.8260869565217391|
|R227GSNWI6BSZV|          1|           20|         20|   N|                Y|               1.0|
|R3SJTYZBYBG4EE|          4|           99|         99|   N|                Y|               1.0|
|R248FG65D76D5Y|          1|           42|         53|   N|                Y|0.7924528301886793|
|R3B6BXFKGW52SG|          1|           32|         32|   N|                Y|               1.0|
| ROTIV4VYL31R4|          5|           26|         26|   N|                Y|               1.0|
|R3VQ59LD2LSKY7|          5|           20|         25|   N|                Y|               0.8|
| RIIGLD8JB7PX8|          1|  

In [52]:
# Step 3
## Keep only the reviews written as part of the Vine program (paid), i.e. vine == "Y".
# NOTE(review): this rebinds vine_df, which an earlier cell defined as the column
# subset of the full dataset and which Step 1 filters from. After this cell runs,
# re-running Step 1 without first re-running that earlier cell would start from
# the paid-only rows.
vine_df = percent_votes_df.where(col("vine") == "Y")
vine_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R184FOUNZZ7KO8|          5|           15|         20|   Y|                N|              0.75|
| R82QWN2X2OCHB|          5|          176|        208|   Y|                N|0.8461538461538461|
|R1UYHBYE6790BU|          5|           44|         53|   Y|                N|0.8301886792452831|
|R2J3YLX1L4EH2B|          5|          299|        321|   Y|                N|0.9314641744548287|
|R3QDI539WTXKE2|          5|           26|         32|   Y|                N|            0.8125|
| RQTPWY6ND2NRV|          4|           37|         37|   Y|                N|               1.0|
| RQZSTE0KOBY2G|          4|           98|        109|   Y|                N|0.8990825688073395|
| RF2RNZEJO0K1N|          4|  

In [53]:
# Step 4
## Keep only the reviews that were NOT written as part of the Vine program (unpaid), i.e. vine == "N".
not_vine_df = percent_votes_df.where(col("vine") == "N")
not_vine_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R1FBO737KD9F2N|          5|           19|         23|   N|                Y|0.8260869565217391|
|R227GSNWI6BSZV|          1|           20|         20|   N|                Y|               1.0|
|R3SJTYZBYBG4EE|          4|           99|         99|   N|                Y|               1.0|
|R248FG65D76D5Y|          1|           42|         53|   N|                Y|0.7924528301886793|
|R3B6BXFKGW52SG|          1|           32|         32|   N|                Y|               1.0|
| ROTIV4VYL31R4|          5|           26|         26|   N|                Y|               1.0|
|R3VQ59LD2LSKY7|          5|           20|         25|   N|                Y|               0.8|
| RIIGLD8JB7PX8|          1|  

In [54]:
# Step 5
## For each review type (paid [vine="Y"] vs unpaid [vine="N"]) compute the total number
## of reviews, the number of 5-star reviews, and the percentage of 5-star reviews.

# count() only counts non-null values: when() without otherwise() yields null for
# every row that is not 5 stars, so this counts just the 5-star reviews.
five_star_count_expr = count(when(col("star_rating") == 5, True))
total_count_expr = count(col("vine"))

reviews_df = (
    percent_votes_df
    .groupBy("vine")
    .agg(
        total_count_expr.alias("total_reviews"),
        five_star_count_expr.alias("total_5*****_reviews"),
        (five_star_count_expr / total_count_expr * 100).alias("Percentage_5*****_total"),
    )
)
reviews_df.show()


+----+-------------+--------------------+-----------------------+
|vine|total_reviews|total_5*****_reviews|Percentage_5*****_total|
+----+-------------+--------------------+-----------------------+
|   Y|         1080|                 454|      42.03703703703704|
|   N|        49673|               23043|       46.3893865882874|
+----+-------------+--------------------+-----------------------+

time: 20.8 s (started: 2023-01-03 23:19:05 +00:00)
