In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
Get:4 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Get:6 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:7 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Get:8 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:10 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease [24.3 kB]
Get:11 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 Packages [1,323 kB]
Get:12 http://security.ubuntu.com/ubuntu focal-security/universe amd64 Packages [1,027 kB]
Hit:13 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
Ge

In [2]:
# Start Spark session 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("VineReviews").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Beauty_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read in the Review dataset as a DataFrame
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)

In [39]:
# Create the vine_table. DataFrame

vine_df = df.selectExpr(["review_id", "INT(star_rating)", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3I2DHQBR577SS|          5|            0|          0|   N|                Y|
|R1QNE9NQFJC2Y4|          5|            0|          0|   N|                Y|
|R3LIDG2Q4LJBAO|          5|            0|          0|   N|                Y|
|R3KSZHPAEVPEAL|          5|            0|          0|   N|                Y|
| RAI2OIG50KZ43|          5|            0|          0|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [40]:
from pyspark.sql.functions import col

In [41]:
# Filter table by total votes greater than 20 for most relevant reviews 

vine_df_new = vine_df.where(vine_df.total_votes>='20')
vine_df_new.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2QRUE9REK8OUC|          5|           18|         23|   N|                Y|
|R2POXRW6PXHXZG|          5|           25|         30|   N|                Y|
| RZN43XRZ89IIJ|          3|           10|         27|   N|                N|
|R29Q748WSHZ3SN|          5|           23|         24|   N|                Y|
| R46UL5G5HEPRZ|          5|           18|         20|   N|                N|
|R2UQHSNWU6WTZX|          5|           74|         76|   N|                N|
|R2HSRI3D6E2M9Y|          5|           69|         71|   N|                Y|
|R17ARFSDV555EW|          1|           29|         29|   N|                Y|
|R3GL5156FFEDQA|          5|           20|         21|   N|                Y|
|R29KQJC9PIYWQS|          5|           25|         26|   N|     

In [43]:
# helpful votes percentage 50% or greater
vine_df_final = vine_df_new.withColumn("helpfulness", col("helpful_votes") / col("total_votes")*100)
vine_df_final.show()


+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      helpfulness|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R2QRUE9REK8OUC|          5|           18|         23|   N|                Y|78.26086956521739|
|R2POXRW6PXHXZG|          5|           25|         30|   N|                Y|83.33333333333334|
| RZN43XRZ89IIJ|          3|           10|         27|   N|                N|37.03703703703704|
|R29Q748WSHZ3SN|          5|           23|         24|   N|                Y|95.83333333333334|
| R46UL5G5HEPRZ|          5|           18|         20|   N|                N|             90.0|
|R2UQHSNWU6WTZX|          5|           74|         76|   N|                N|97.36842105263158|
|R2HSRI3D6E2M9Y|          5|           69|         71|   N|                Y| 97.1830985915493|
|R17ARFSDV555EW|          1|           2

In [44]:
vine_df_final = vine_df_final.where(vine_df_final.helpfulness >= '50')
vine_df_final.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      helpfulness|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R2QRUE9REK8OUC|          5|           18|         23|   N|                Y|78.26086956521739|
|R2POXRW6PXHXZG|          5|           25|         30|   N|                Y|83.33333333333334|
|R29Q748WSHZ3SN|          5|           23|         24|   N|                Y|95.83333333333334|
| R46UL5G5HEPRZ|          5|           18|         20|   N|                N|             90.0|
|R2UQHSNWU6WTZX|          5|           74|         76|   N|                N|97.36842105263158|
|R2HSRI3D6E2M9Y|          5|           69|         71|   N|                Y| 97.1830985915493|
|R17ARFSDV555EW|          1|           29|         29|   N|                Y|            100.0|
|R3GL5156FFEDQA|          5|           2

In [45]:
# Filter on paid reviews 
vine_df_paid = vine_df_final.where(vine_df_new.vine=='Y')
vine_df_paid.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      helpfulness|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R106V6GUNWRSSS|          5|          857|        889|   Y|                N|96.40044994375702|
|R37N8KZS48W36A|          5|           36|         37|   Y|                N| 97.2972972972973|
|R1LNEQ2MQE03PD|          5|           49|         66|   Y|                N|74.24242424242425|
|R1ECPWPUNK36ES|          5|           29|         31|   Y|                N|93.54838709677419|
|R16BUM7UQZLOM6|          3|          107|        112|   Y|                N|95.53571428571429|
| R7NWIOCA5RVPR|          5|           19|         26|   Y|                N|73.07692307692307|
|R3NTGOCUZB33JK|          5|           40|         47|   Y|                Y| 85.1063829787234|
| RQX3RHGMG7UBN|          5|          18

In [46]:
# Paid analysis - total nbr of reviews
count_paid = vine_df_paid.agg({"review_id": "count"}).show()

+----------------+
|count(review_id)|
+----------------+
|             647|
+----------------+



In [47]:
# Paid analysis - total nbr of 5 star reviews
paid_5stars_df = vine_df_paid.where(vine_df_paid.star_rating == '5')
paid_5stars_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      helpfulness|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R106V6GUNWRSSS|          5|          857|        889|   Y|                N|96.40044994375702|
|R37N8KZS48W36A|          5|           36|         37|   Y|                N| 97.2972972972973|
|R1LNEQ2MQE03PD|          5|           49|         66|   Y|                N|74.24242424242425|
|R1ECPWPUNK36ES|          5|           29|         31|   Y|                N|93.54838709677419|
| R7NWIOCA5RVPR|          5|           19|         26|   Y|                N|73.07692307692307|
|R3NTGOCUZB33JK|          5|           40|         47|   Y|                Y| 85.1063829787234|
| RQX3RHGMG7UBN|          5|          186|        200|   Y|                N|             93.0|
|R1AF6ANE5JFP4D|          5|           5

In [48]:
count_paid_5stars = paid_5stars_df.agg({"review_id": "count"})
count_paid_5stars.show()

+----------------+
|count(review_id)|
+----------------+
|             229|
+----------------+



In [49]:
percentage_paid = 229/647*100
percentage_paid

35.394126738794434

In [50]:
# Filter on unpaid reviews 
vine_df_unpaid = vine_df_final.where(vine_df_new.vine=='N')
vine_df_unpaid.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      helpfulness|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R2QRUE9REK8OUC|          5|           18|         23|   N|                Y|78.26086956521739|
|R2POXRW6PXHXZG|          5|           25|         30|   N|                Y|83.33333333333334|
|R29Q748WSHZ3SN|          5|           23|         24|   N|                Y|95.83333333333334|
| R46UL5G5HEPRZ|          5|           18|         20|   N|                N|             90.0|
|R2UQHSNWU6WTZX|          5|           74|         76|   N|                N|97.36842105263158|
|R2HSRI3D6E2M9Y|          5|           69|         71|   N|                Y| 97.1830985915493|
|R17ARFSDV555EW|          1|           29|         29|   N|                Y|            100.0|
|R3GL5156FFEDQA|          5|           2

In [51]:
# Unpaid analysis - total nbr of reviews
count_unpaid = vine_df_unpaid.agg({"review_id": "count"}).show()

+----------------+
|count(review_id)|
+----------------+
|           74113|
+----------------+



In [52]:
# Unpaid analysis - total nbr of 5 star reviews
unpaid_5stars_df = vine_df_unpaid.where(vine_df_unpaid.star_rating == '5')
unpaid_5stars_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      helpfulness|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R2QRUE9REK8OUC|          5|           18|         23|   N|                Y|78.26086956521739|
|R2POXRW6PXHXZG|          5|           25|         30|   N|                Y|83.33333333333334|
|R29Q748WSHZ3SN|          5|           23|         24|   N|                Y|95.83333333333334|
| R46UL5G5HEPRZ|          5|           18|         20|   N|                N|             90.0|
|R2UQHSNWU6WTZX|          5|           74|         76|   N|                N|97.36842105263158|
|R2HSRI3D6E2M9Y|          5|           69|         71|   N|                Y| 97.1830985915493|
|R3GL5156FFEDQA|          5|           20|         21|   N|                Y|95.23809523809523|
|R29KQJC9PIYWQS|          5|           2

In [53]:
count_unpaid_5stars = unpaid_5stars_df.agg({"review_id": "count"})
count_unpaid_5stars.show()

+----------------+
|count(review_id)|
+----------------+
|           43217|
+----------------+



In [54]:
percentage_unpaid = 43217/74113*100
percentage_unpaid

58.312306882733125