In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()


0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.39)] [Connecting to cloud.r-pr                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [2 InRelease 15.6 kB/88.7 kB 18%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 242 kB] [2 InRelease 15.6 kB/88.7 kB 18%] [Connecting to s                                                                               Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 242 kB] [2 InRelease 30.1 kB/88.7 kB 34%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to security.ubun                                                       

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Vine_Review_Analysis").getOrCreate()


In [5]:
from pyspark import SparkFiles

url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Mobile_Electronics_v1_00.tsv.gz'

spark.sparkContext.addFile(url)

df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)

df.show()


+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   20422322| R8MEA6IGAHO0B|B00MC4CED8|     217304173|BlackVue DR600GW-PMP|Mobile_Electronics|          5|            0|          0|   N|                Y|         Very Happy!|As advertised. Ev...|2015-08-31 00:00:00|
|         US|   40835037|R31LOQ8JGLPRLK|B00OQMFG1Q|     137313254|GENSSI GSM / G

In [6]:
# Create the vine review DataFrame to be filtered and analyzed below.
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R8MEA6IGAHO0B|          5|            0|          0|   N|                Y|
|R31LOQ8JGLPRLK|          5|            0|          1|   N|                Y|
|R2Y0MM9YE6OP3P|          5|            0|          0|   N|                Y|
| RRB9C05HDOD4O|          4|            0|          0|   N|                Y|
|R26I2RI1GFV8QG|          2|            0|          0|   N|                Y|
| RY8DDL22YG4R5|          3|            0|          1|   N|                Y|
|R2AT2426ZHFUHH|          3|            0|          1|   N|                Y|
|R3RRXU2R23NMQ9|          5|            0|          0|   N|                Y|
|R250PR8VJUZ62F|          4|            0|          2|   N|                Y|
| RBEMQ29WJBHYG|          5|          164|        168|   N|     

In [7]:
# Filter Step 1:
# Filter the data and create a new DataFrame to retrieve all the rows 
# where the total_votes count is equal to or greater than 20 to pick 
# reviews that are more likely to be helpful and to avoid having division 
# by zero errors later on.
vine_clean_df = vine_df.filter('total_votes > 0')
vine_clean_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R31LOQ8JGLPRLK|          5|            0|          1|   N|                Y|
| RY8DDL22YG4R5|          3|            0|          1|   N|                Y|
|R2AT2426ZHFUHH|          3|            0|          1|   N|                Y|
|R250PR8VJUZ62F|          4|            0|          2|   N|                Y|
| RBEMQ29WJBHYG|          5|          164|        168|   N|                Y|
|R19VVIUT4BZCMT|          5|            2|          2|   N|                Y|
|R1DT8JJUQHUKUL|          3|            0|          1|   N|                Y|
|R34EZZ68VYPHO0|          1|            1|          1|   N|                Y|
|R3TZNSOXS13SIG|          2|            1|          3|   N|                Y|
|R12UBZT87UX3AP|          5|            2|          2|   N|     

In [30]:
vine_clean_df.describe()


DataFrame[summary: string, review_id: string, star_rating: string, helpful_votes: string, total_votes: string, vine: string, verified_purchase: string]

In [9]:
# Filter Step 2:
# Filter the new DataFrame created in Step 1 and create a new DataFrame 
# to retrieve all the rows where the number of helpful_votes divided by total_votes 
# is equal to or greater than 50%.
vine_helpful_votes_df = vine_clean_df.filter('(helpful_votes / total_votes) > 0.5')
vine_helpful_votes_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RBEMQ29WJBHYG|          5|          164|        168|   N|                Y|
|R19VVIUT4BZCMT|          5|            2|          2|   N|                Y|
|R34EZZ68VYPHO0|          1|            1|          1|   N|                Y|
|R12UBZT87UX3AP|          5|            2|          2|   N|                Y|
| RT12RQFU0V2AX|          5|            1|          1|   N|                Y|
|R2K0YQPV5W2SJ1|          4|            2|          2|   N|                Y|
|R3IF1H9L7WSGQD|          5|            1|          1|   N|                N|
| R5I2YBT9J85FF|          5|            2|          2|   N|                Y|
|R2074W1A28UUU6|          5|            9|         11|   N|                N|
|R2RWA9DSTPVRCI|          5|            2|          2|   N|     

In [15]:
# Filter the DataFrame created in Step 2 and create a new DataFrame that 
# retrieves all the rows where a review was written as part of the 
# Vine program (paid), i.e. vine == 'Y'.
vine_reviews_df = vine_helpful_votes_df.filter('vine == "Y"')
vine_reviews_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3BOQTLUU3Y4L8|          4|           42|         55|   Y|                N|
| RWF03LXVXC22A|          3|          396|        445|   Y|                N|
|R2MUPC17C2FAA3|          4|           11|         14|   Y|                N|
| R19DZ4ZILR75G|          4|            5|          9|   Y|                N|
|R37RHDWFZYGHQS|          5|            6|         10|   Y|                N|
| REPU28WG1VZUE|          5|          242|        281|   Y|                N|
|R1D6REC9HPJVQY|          4|           31|         41|   Y|                N|
| R9US6D46U1A00|          5|            8|         11|   Y|                N|
| RH7LF0WVHLWIK|          2|            4|          7|   Y|                N|
+--------------+-----------+-------------+-----------+----+-----

In [31]:
# Filter the DataFrame created in Step 2 and create a new DataFrame that 
# retrieves all the rows where a review was NOT written as part of the
# Vine paid program, i.e. vine == 'N'.
nonvine_reviews_df = vine_helpful_votes_df.filter('vine == "N"')
nonvine_reviews_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RBEMQ29WJBHYG|          5|          164|        168|   N|                Y|
|R19VVIUT4BZCMT|          5|            2|          2|   N|                Y|
|R34EZZ68VYPHO0|          1|            1|          1|   N|                Y|
|R12UBZT87UX3AP|          5|            2|          2|   N|                Y|
| RT12RQFU0V2AX|          5|            1|          1|   N|                Y|
|R2K0YQPV5W2SJ1|          4|            2|          2|   N|                Y|
|R3IF1H9L7WSGQD|          5|            1|          1|   N|                N|
| R5I2YBT9J85FF|          5|            2|          2|   N|                Y|
|R2074W1A28UUU6|          5|            9|         11|   N|                N|
|R2RWA9DSTPVRCI|          5|            2|          2|   N|     

In [51]:
# Calculate the following values for the vine reviews:
# Total number of reviews 
# Total number of 5-star reviews
# Percentage of 5-star reviews
vine_review_total = vine_reviews_df.agg({'total_votes': 'sum'}). \
  withColumnRenamed('sum(total_votes)', 'sum_total_votes')

vine_review_5_stars = vine_reviews_df.filter('star_rating == 5'). \
  agg({'total_votes': 'sum'}). \
  withColumnRenamed('sum(total_votes)', 'five_star_votes')

vine_review_total.show()
vine_review_5_stars.show()

vine_percentage = round((vine_review_5_stars.collect()[0][0] / vine_review_total.collect()[0][0]) * 100)

print('The percentage of vine 5 star reviews is: ', vine_percentage)


+---------------+
|sum_total_votes|
+---------------+
|            873|
+---------------+

+---------------+
|five_star_votes|
+---------------+
|            302|
+---------------+

The percentage of vine 5 star reviews is:  35


In [52]:
# Calculate the following values for the non-vine reviews:
# Total number of reviews 
# Total number of 5-star reviews
# Percentage of 5-star reviews
nonvine_review_total = nonvine_reviews_df.agg({'total_votes': 'sum'}). \
  withColumnRenamed('sum(total_votes)', 'sum_total_votes')

nonvine_review_5_stars = nonvine_reviews_df.filter('star_rating == 5'). \
  agg({'total_votes': 'sum'}). \
  withColumnRenamed('sum(total_votes)', 'five_star_votes')

nonvine_review_total.show()
nonvine_review_5_stars.show()

nonvine_percentage = round((nonvine_review_5_stars.collect()[0][0] / nonvine_review_total.collect()[0][0]) * 100)

print('The percentage of non-vine 5 star reviews is: ', nonvine_percentage)


+---------------+
|sum_total_votes|
+---------------+
|         134751|
+---------------+

+---------------+
|five_star_votes|
+---------------+
|          64375|
+---------------+

The percentage of non-vine 5 star reviews is:  48
