In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com] [Connected to cloud.r-project.org (108.150% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com] [Connected to                                                                                Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://ppa.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Vine_Analysis").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Kitchen_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Kitchen_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   37000337|R3DT59XH7HXR9K|B00303FI0G|     529320574|Arthur Court Pape...|         Kitchen|          5|            0|          0|   N|                Y|Beautiful. Looks ...|Beautiful.  Looks...| 2015-08-31|
|         US|   15272914|R1LFS11BNASSU8|B00JCZKZN6|     274237558|Olde Thompson Bav...|         Kitchen|          5|    

In [4]:
from pyspark.sql.functions import col, avg

# Filter voters.
filtered_kitchen_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
filtered_kitchen_df.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3DT59XH7HXR9K|          5|            0|          0|   N|                Y|
|R1LFS11BNASSU8|          5|            0|          1|   N|                Y|
|R296RT05AG0AF6|          5|            0|          0|   N|                Y|
|R3V37XDZ7ZCI3L|          5|            0|          1|   N|                Y|
|R14GU232NQFYX2|          5|            0|          0|   N|                Y|
| RZQH4V7L2O1PL|          1|            1|          1|   N|                Y|
|R1F8JMOSPJ3KO7|          5|            0|          0|   N|                Y|
|R1ZISGY2BWW4Z5|          5|            0|          0|   N|                Y|
|R17PW4I3AE5WZW|          5|            0|          0|   N|                Y|
|R3D93G1KTP6A8P|          3|            0|          0|   N|     

In [5]:
# 20 plus total votes
twenty_df = filtered_kitchen_df.filter(filtered_kitchen_df['total_votes'] >= 20)
twenty_df.show(10)


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R3GZ7CK2O1PPG0|          2|           10|         30|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|     

In [6]:
# Helpful votes/Total votes >= 50%
fifty_df = twenty_df.filter(twenty_df["helpful_votes"]/twenty_df["total_votes"] >= 0.5)
fifty_df.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|                Y|
| RQQAI8YL3UCY2|          5|           23|         25|   N|     

In [15]:
# Vine Reviews
vine_review_df = fifty_df.filter(fifty_df['vine']== 'Y')
vine_review_df.show(10)
vine_review_df.count()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1Z71RW4J9IK93|          5|           20|         22|   Y|                N|
|R3FVB5QI11KI9Q|          4|          192|        200|   Y|                N|
|R2G027YBMVXV6Y|          5|           39|         48|   Y|                N|
|R1QGBAN7BMGWRR|          5|          121|        129|   Y|                N|
|R2NH2AU7XL9ZDZ|          3|           18|         20|   Y|                N|
|R2YVVJ9NOPNX50|          4|           36|         41|   Y|                N|
|R1XH1LK1FWX3OS|          4|          214|        238|   Y|                N|
|R38LSQ71G2IZGS|          5|           26|         29|   Y|                N|
|R2FLITQVKWXDF4|          3|           26|         34|   Y|                N|
|R25LMMZF3DJTWY|          2|           18|         21|   Y|     

1207

In [8]:
vine_count=vine_review_df.count()
vine_count

1207

In [17]:
# Not Vine Reviews
not_vine_review_df = fifty_df.filter(fifty_df['vine']== 'N')
not_vine_review_df.show(10)
not_vine_review_df.count()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|                Y|
| RQQAI8YL3UCY2|          5|           23|         25|   N|     

97839

In [10]:
not_vine_count=not_vine_review_df.count()  
not_vine_count

97839

In [11]:
# Total Reviews
total_reviews=fifty_df.count()
total_reviews

99046

In [12]:
# Total 5 star reviews
five_stars=fifty_df.filter(fifty_df.star_rating==5).count()
five_stars

46367

In [19]:
# 5 star reviews for vines and percent
vine_five=vine_review_df.filter(vine_review_df.star_rating==5).count()
vine_five_percent=(vine_five/vine_count)*100
print(vine_five)
print(vine_five_percent)

509
42.17067108533554


In [20]:
# 5 star reviews for non-vines and percent
not_vine_five=not_vine_review_df.filter(not_vine_review_df.star_rating==5).count()
not_vine_five_percent=(not_vine_five/not_vine_count)*100
print(not_vine_five)
print(not_vine_five_percent)

45858
46.87087971054488
