In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Get:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic I

In [2]:
# Create the SparkSession for this notebook, or reuse one that is already running
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("VineReview")
spark = session_builder.getOrCreate()

In [3]:
from pyspark import SparkFiles

# Gzipped, tab-separated Amazon review dataset for the Kitchen category.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Kitchen_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# SparkFiles.get() expects the *name* of a file added with addFile(). An empty
# name resolves to the SparkFiles root directory, so the reader would load every
# file in that directory instead of just this dataset.
dataset_file = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(dataset_file), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   37000337|R3DT59XH7HXR9K|B00303FI0G|     529320574|Arthur Court Pape...|         Kitchen|          5|            0|          0|   N|                Y|Beautiful. Looks ...|Beautiful.  Looks...|2015-08-31 00:00:00|
|         US|   15272914|R1LFS11BNASSU8|B00JCZKZN6|     274237558|Olde Thompson Bav...| 

In [4]:
# Build the vine_table DataFrame: keep only the columns needed for the Vine analysis
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"]
vine_df = df.select(*vine_columns)
vine_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3DT59XH7HXR9K|          5|            0|          0|   N|                Y|
|R1LFS11BNASSU8|          5|            0|          1|   N|                Y|
|R296RT05AG0AF6|          5|            0|          0|   N|                Y|
|R3V37XDZ7ZCI3L|          5|            0|          1|   N|                Y|
|R14GU232NQFYX2|          5|            0|          0|   N|                Y|
| RZQH4V7L2O1PL|          1|            1|          1|   N|                Y|
|R1F8JMOSPJ3KO7|          5|            0|          0|   N|                Y|
|R1ZISGY2BWW4Z5|          5|            0|          0|   N|                Y|
|R17PW4I3AE5WZW|          5|            0|          0|   N|                Y|
|R3D93G1KTP6A8P|          3|            0|          0|   N|     

In [5]:
# Deliverable 2
from pyspark.sql.functions import col

# Keep only reviews that received at least 20 votes in total
has_enough_votes = col("total_votes") >= 20
clean_total_votes = vine_df.where(has_enough_votes)
clean_total_votes.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R3GZ7CK2O1PPG0|          2|           10|         30|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|     

In [6]:
# Keep only reviews where at least half of the votes were "helpful" votes
helpful_ratio = col("helpful_votes") / clean_total_votes["total_votes"]
clean_helpful_votes = clean_total_votes.where(helpful_ratio >= 0.5)
clean_helpful_votes.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|5          |20           |20         |N   |Y                |
|R3FB6BERWPEIJP|4          |40           |43         |N   |Y                |
|R1D4Z38STRDQXK|5          |53           |56         |N   |Y                |
|R1XMWJZICINIFX|3          |20           |21         |N   |Y                |
|R20QKY1GABXFLM|1          |272          |297        |N   |Y                |
|R328FA1E6FY3F5|5          |17           |20         |N   |N                |
|R3DH22AA5WGLLS|5          |30           |30         |N   |N                |
|R1E0E5EFZSLJCS|1          |66           |80         |N   |Y                |
|R1TXZQWEHYWEWN|2          |48           |51         |N   |Y                |
|RQQAI8YL3UCY2 |5          |23           |25         |N   |Y    

In [7]:
# Reviews flagged vine == 'Y' (written as part of the Vine program)
is_vine_review = col("vine") == 'Y'
vine_program_df = clean_helpful_votes.where(is_vine_review)
vine_program_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1Z71RW4J9IK93|          5|           20|         22|   Y|                N|
|R3FVB5QI11KI9Q|          4|          192|        200|   Y|                N|
|R2G027YBMVXV6Y|          5|           39|         48|   Y|                N|
|R1QGBAN7BMGWRR|          5|          121|        129|   Y|                N|
|R2NH2AU7XL9ZDZ|          3|           18|         20|   Y|                N|
|R2YVVJ9NOPNX50|          4|           36|         41|   Y|                N|
|R1XH1LK1FWX3OS|          4|          214|        238|   Y|                N|
|R38LSQ71G2IZGS|          5|           26|         29|   Y|                N|
|R2FLITQVKWXDF4|          3|           26|         34|   Y|                N|
|R25LMMZF3DJTWY|          2|           18|         21|   Y|     

In [9]:
# Reviews flagged vine == 'N' (not part of the Vine program)
is_not_vine_review = col("vine") == 'N'
not_vine_program_df = clean_helpful_votes.where(is_not_vine_review)
not_vine_program_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|                Y|
| RQQAI8YL3UCY2|          5|           23|         25|   N|     

In [10]:
# Five-star reviews among the Vine-program reviews.
# NOTE(review): the rating is compared against the string '5'; if star_rating was
# inferred as an integer column this relies on Spark's implicit cast — confirm.
is_five_star = col("star_rating") == '5'
paid_5_star_df = vine_program_df.where(is_five_star)
paid_5_star_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1Z71RW4J9IK93|          5|           20|         22|   Y|                N|
|R2G027YBMVXV6Y|          5|           39|         48|   Y|                N|
|R1QGBAN7BMGWRR|          5|          121|        129|   Y|                N|
|R38LSQ71G2IZGS|          5|           26|         29|   Y|                N|
|R18NC69T0KEC00|          5|           20|         23|   Y|                N|
| ROYYZ0D9LMYS3|          5|           25|         33|   Y|                N|
|R1THPHKHO1BPRZ|          5|           15|         20|   Y|                N|
|R11K4PQS62ROLS|          5|           72|         75|   Y|                N|
| RMBQSVTJWY2A2|          5|           26|         28|   Y|                N|
| RXLOQIB7S4OKN|          5|           31|         33|   Y|     

In [11]:
# Five-star reviews among the non-Vine reviews.
# NOTE(review): the rating is compared against the string '5'; if star_rating was
# inferred as an integer column this relies on Spark's implicit cast — confirm.
is_five_star = col("star_rating") == '5'
unpaid_5_star_df = not_vine_program_df.where(is_five_star)
unpaid_5_star_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
| RQQAI8YL3UCY2|          5|           23|         25|   N|                Y|
|R3SHJSHTY8AXKR|          5|           69|         72|   N|                Y|
|R16QL1GNQVT4G9|          5|          117|        119|   N|                Y|
|R1MV4YDD6O5UME|          5|           30|         32|   N|                N|
|R3A8EHXA0JX7WV|          5|           18|         24|   N|                N|
|R2M7TEVV9YSLTR|          5|           94|         96|   N|     

In [12]:
from pyspark.sql.functions import count

# Number of five-star reviews written under the Vine program.
# show() only prints and returns None, so it is called separately; chaining it
# onto the assignment would leave paid_five_star_count bound to None.
paid_five_star_count = paid_5_star_df.agg(
    count("star_rating").alias("five_star_rating_count (vine program)")
)
paid_five_star_count.show()

+-------------------------------------+
|five_star_rating_count (vine program)|
+-------------------------------------+
|                                  509|
+-------------------------------------+



In [13]:
# Number of five-star reviews written outside the Vine program.
# show() only prints and returns None, so it is called separately; chaining it
# onto the assignment would leave unpaid_five_star_count bound to None.
unpaid_five_star_count = unpaid_5_star_df.agg(
    count("star_rating").alias("five_star_rating_count (unpaid)")
)
unpaid_five_star_count.show()

+-------------------------------+
|five_star_rating_count (unpaid)|
+-------------------------------+
|                          45858|
+-------------------------------+



In [14]:
# Total number of Vine-program reviews that passed the vote filters.
# show() only prints and returns None, so it is called separately; chaining it
# onto the assignment would leave total_paid_count bound to None.
total_paid_count = vine_program_df.agg(
    count("review_id").alias("total_reviews (vine program)")
)
total_paid_count.show()


+----------------------------+
|total_reviews (vine program)|
+----------------------------+
|                        1207|
+----------------------------+



In [15]:
# Total number of non-Vine reviews that passed the vote filters.
# show() only prints and returns None, so it is called separately; chaining it
# onto the assignment would leave total_unpaid_count bound to None.
total_unpaid_count = not_vine_program_df.agg(
    count("review_id").alias("total_reviews (unpaid)")
)
total_unpaid_count.show()

+----------------------+
|total_reviews (unpaid)|
+----------------------+
|                 97839|
+----------------------+



In [16]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import count


def _count_non_null(frame, column_name):
    """Return how many rows of `frame` have a non-null value in `column_name`."""
    return frame.agg(count(column_name)).first()[0]


# Compute the totals from the filtered DataFrames instead of copying numbers
# from earlier cell output, so the summary stays correct if the dataset or
# the filters change. These are the same aggregates the count cells display.
review_data = [
    ("Paid",
     _count_non_null(vine_program_df, "review_id"),
     _count_non_null(paid_5_star_df, "star_rating")),
    ("Unpaid",
     _count_non_null(not_vine_program_df, "review_id"),
     _count_non_null(unpaid_5_star_df, "star_rating")),
]

schema = StructType([
    StructField("Vine type", StringType(), True),
    StructField("Total Reviews", IntegerType(), True),
    StructField("Five Star Reviews", IntegerType(), True)
])

review_analysis_df = spark.createDataFrame(data=review_data, schema=schema)
review_analysis_df.printSchema()
review_analysis_df.show(truncate=False)


root
 |-- Vine type: string (nullable = true)
 |-- Total Reviews: integer (nullable = true)
 |-- Five Star Reviews: integer (nullable = true)

+---------+-------------+-----------------+
|Vine type|Total Reviews|Five Star Reviews|
+---------+-------------+-----------------+
|Paid     |1207         |509              |
|Unpaid   |97839        |45858            |
+---------+-------------+-----------------+



In [17]:
# Add the share of five-star reviews, in percent, for each review type.
# show() only prints and returns None, so it must stay out of the assignment;
# otherwise review_analysis_df is rebound to None and re-running this cell fails.
five_star_percentage = review_analysis_df['Five Star Reviews'] / review_analysis_df['Total Reviews'] * 100
review_analysis_df = review_analysis_df.withColumn("Five Star Percentage", five_star_percentage)
review_analysis_df.show()

+---------+-------------+-----------------+--------------------+
|Vine type|Total Reviews|Five Star Reviews|Five Star Percentage|
+---------+-------------+-----------------+--------------------+
|     Paid|         1207|              509|   42.17067108533554|
|   Unpaid|        97839|            45858|   46.87087971054488|
+---------+-------------+-----------------+--------------------+

