In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.0'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.tgz
!pip install -q findspark
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Get:12 http://security.ubuntu.com/ubuntu bionic-security/universe amd64

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

Load Amazon Data into Spark Dataframe

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   32787517| RED72VWWCOS7S|B008HDQYLQ|     348668413|Garden Weasel Gar...| Lawn and Garden|          1|            2|          8|   N|                Y|            One Star|I don't hate the ...|2015-08-31 00:00:00|
|         US|   16374060| RZHWQ208LTEPV|B005OBZBD6|     264704759|10 Foot Mc4 Solar...| 

In [8]:
vine_table_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_table_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RED72VWWCOS7S|          1|            2|          8|   N|                Y|
| RZHWQ208LTEPV|          5|            0|          0|   N|                Y|
|R37LBC3XAVLYOO|          5|            4|          5|   N|                Y|
|R3L7XJMA0MVJWC|          5|            0|          0|   N|                Y|
|R2I2GHSI7T1UBN|          1|            5|          6|   N|                Y|
|R2GFFKHK4I6VMX|          5|            0|          0|   N|                Y|
|R1R0UDX2XAN1S4|          4|            0|          0|   N|                Y|
|R22C8FMBSTFRY8|          5|            2|          2|   N|                Y|
|R118NNIQ75XPGO|          3|            0|          0|   N|                Y|
|R30HYXHZQ49621|          2|            0|          0|   N|     

Steps 1 & 2

In [9]:
votes_df = vine_table_df.filter(df["total_votes"]>=20).filter(df["helpful_votes"]/df["total_votes"] >=0.5)
votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|
| R9QNQUL94RX1F|          3|           27|         33|   N|                Y|
| RTULFZTUS1VBP|          5|           51|         52|   N|                Y|
|R1BM9RBQWI62O2|          5|           43|         60|   N|     

In [10]:
vine_df = votes_df.filter(votes_df["vine"] == 'Y')
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28DXTC3JQ9IY1|          4|           24|         26|   Y|                N|
|R3AFZKLQXATHBU|          5|           44|         49|   Y|                N|
|R2RUUF2JPJPC0E|          4|           20|         22|   Y|                N|
| RFZ2WUH4248AB|          2|           26|         27|   Y|                N|
|R1Q4LVHIFOWYFR|          5|           23|         28|   Y|                N|
| R4YEPTQED3X1Q|          5|           19|         20|   Y|                N|
|R2Z7C8YCRSC9DP|          5|           22|         22|   Y|                N|
|R3J8OI5CB74P5K|          1|           22|         25|   Y|                N|
| RH39LMKN6AZDC|          5|           33|         40|   Y|                N|
| R8RD8K0ESJSRD|          5|           21|         21|   Y|     

In [11]:
non_vine_df = votes_df.filter(votes_df["vine"] == 'N')
non_vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|
| R9QNQUL94RX1F|          3|           27|         33|   N|                Y|
| RTULFZTUS1VBP|          5|           51|         52|   N|                Y|
|R1BM9RBQWI62O2|          5|           43|         60|   N|     

Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for the two types of review (paid vs unpaid).

In [15]:
# Paid reviews
paid_total_reviews = vine_df.count()
paid_5star_reviews = vine_df[vine_df['star_rating']==5].count()
paid_5star_percent = float(paid_5star_reviews)/float(paid_total_reviews)
print(f"Total Paid Reviews: {paid_total_reviews}")
print(f"Total Paid 5-Star Reviews: {paid_5star_reviews}")
print(f"Percent: {paid_5star_percent*100}%")

Total Paid Reviews: 386
Total Paid 5-Star Reviews: 176
Percent: 45.59585492227979%


In [19]:
# Unpaid reviews
unpaid_total_reviews = non_vine_df.count()
unpaid_5star_reviews = non_vine_df[non_vine_df['star_rating']==5].count()
unpaid_5star_percent = float(unpaid_5star_reviews)/float(unpaid_total_reviews)
print(f"Total Unpaid Reviews: {unpaid_total_reviews}")
print(f"Total Unpaid 5-Star Reviews: {unpaid_5star_reviews}")
print(f"Percent: {unpaid_5star_percent*100}%")

Total Unpaid Reviews: 48717
Total Unpaid 5-Star Reviews: 24026
Percent: 49.317486708951705%
