In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-06-26 23:29:16--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-06-26 23:29:18 (1.22 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Apparel_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|Easy Tool Stainle...|         Apparel|          4|            0|          0|   N|                Y|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|V28 Women Cowl Ne...|         Apparel|          5|    

In [5]:
# Create the vine_table. DataFrame
vine_df = df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df.show() 


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1KKOXHNI8MSXU|          4|            0|          0|   N|                Y|
|R26SP2OPDK4HT7|          5|            1|          2|   N|                Y|
| RWQEDYAX373I1|          5|            0|          0|   N|                Y|
|R231YI7R4GPF6J|          5|            0|          0|   N|                Y|
|R3KO3W45DD0L1K|          5|            0|          0|   N|                Y|
|R1C4QH63NFL5NJ|          5|            0|          0|   N|                Y|
|R2GP65O1U9N7BP|          5|            0|          0|   N|                Y|
|R3O29CT5MQQ3XQ|          4|            0|          0|   N|                Y|
|R1ZECD2AA8QFF6|          5|            0|          0|   N|                Y|
|R2S79GCF6J89OA|          3|            0|          0|   N|     

In [6]:
# Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 to pick reviews that are more likely to be helpful and to avoid having division by zero errors later on.
from pyspark.sql.functions import col
total_votes_df = vine_df.filter(col("total_votes") >= 20)
total_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R35PT06NWP7LDP|          5|           30|         32|   N|                N|
|R2P76PJFUGXBHO|          1|            6|         45|   N|                N|
| RQW4AFOG9MR4Z|          5|           51|         52|   N|                N|
|R2SMUEBMGLAJQK|          5|           29|         36|   N|                Y|
|R1XVIZZALU5P6J|          5|          148|        150|   N|                Y|
|R11UFMG8M2488I|          4|           23|         26|   N|                N|
|R2OSHKSPXU68W5|          5|          136|        147|   N|                N|
|R3M02FSD3BLUPU|          5|           19|         21|   N|                N|
|R1P0LEEJHH09L3|          5|           32|         32|   N|                Y|
|R1V6UU1EHW3Q12|          5|           57|         62|   N|     

In [7]:
# Filter the new DataFrame or table created in Step 1 and create a new DataFrame or table to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.
vote_percent_df = total_votes_df.withColumn('vote_percent', col('helpful_votes')/col('total_votes')).alias('vote_percent').filter(col("vote_percent") >= 0.5)
vote_percent_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      vote_percent|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R35PT06NWP7LDP|          5|           30|         32|   N|                N|            0.9375|
| RQW4AFOG9MR4Z|          5|           51|         52|   N|                N|0.9807692307692307|
|R2SMUEBMGLAJQK|          5|           29|         36|   N|                Y|0.8055555555555556|
|R1XVIZZALU5P6J|          5|          148|        150|   N|                Y|0.9866666666666667|
|R11UFMG8M2488I|          4|           23|         26|   N|                N|0.8846153846153846|
|R2OSHKSPXU68W5|          5|          136|        147|   N|                N|0.9251700680272109|
|R3M02FSD3BLUPU|          5|           19|         21|   N|                N|0.9047619047619048|
|R1P0LEEJHH09L3|          5|  

In [38]:
#vine == 'Y'.
vine_Y_df = vote_percent_df.filter(col("vine") == "Y")
vine_Y_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      vote_percent|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
| R6U9701C3BGO6|          3|          139|        147|   Y|                N|0.9455782312925171|
|R1XK3ALB45D7N4|          5|           33|         34|   Y|                N|0.9705882352941176|
|R1IZCSTLX81D6C|          5|           31|         33|   Y|                N|0.9393939393939394|
|R2C8NC8EQLH4JF|          3|           45|         48|   Y|                N|            0.9375|
|R1JJ1YOJMOML1P|          5|           18|         21|   Y|                N|0.8571428571428571|
|R3TKG664L9MTXJ|          4|          164|        175|   Y|                N|0.9371428571428572|
|R2E942L5EX73FP|          5|           29|         33|   Y|                N|0.8787878787878788|
|R3CGRT9HYB3LI7|          5|  

In [34]:
# vine == 'N'.
vine_N_df = vote_percent_df.filter(col("vine") == "N")
vine_N_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      vote_percent|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R35PT06NWP7LDP|          5|           30|         32|   N|                N|            0.9375|
| RQW4AFOG9MR4Z|          5|           51|         52|   N|                N|0.9807692307692307|
|R2SMUEBMGLAJQK|          5|           29|         36|   N|                Y|0.8055555555555556|
|R1XVIZZALU5P6J|          5|          148|        150|   N|                Y|0.9866666666666667|
|R11UFMG8M2488I|          4|           23|         26|   N|                N|0.8846153846153846|
|R2OSHKSPXU68W5|          5|          136|        147|   N|                N|0.9251700680272109|
|R3M02FSD3BLUPU|          5|           19|         21|   N|                N|0.9047619047619048|
|R1P0LEEJHH09L3|          5|  

In [11]:
# Get the total number of reviews
total_review_count = total_votes_df.select("review_id").count()
total_review_count

46909

In [20]:
# number of 5 star reviews
five_star_reviews = total_votes_df.filter(col("star_rating") == 5).count()
five_star_reviews

23991

In [39]:
# number of paid 5 star reviews
paid_reviews = vine_Y_df.count()
paid_reviews

33

In [41]:
five_star_paid_reviews = vine_Y_df.filter(vine_Y_df.star_rating == 5).count()
five_star_paid_reviews

15

In [43]:
# percentage of paid 5 start reviews
paid_percent = (five_star_paid_reviews/paid_reviews) * 100
paid_percent

45.45454545454545

In [44]:
# number of unpaid 5 star reviews
five_star_unpaid_reviews = vine_N_df.filter(vine_N_df.star_rating == 5).count()
five_star_unpaid_reviews

23733

In [45]:
unpaid_reviews = vine_N_df.count()
unpaid_reviews

45388

In [46]:
# percentage of unpaid 5 star reviews
unpaid_percent = (five_star_unpaid_reviews/unpaid_reviews) * 100
unpaid_percent

52.28915131752886

In [47]:
total_five_star_percent = (five_star_reviews/total_review_count) * 100
total_five_star_percent

51.143703766867766