In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.1.2'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Wait                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/comp

In [2]:
 from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("level_2").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

## Connect to s3 data

In [3]:
url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
spark_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), sep="\t", header=True)
spark_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...| 2015-08-31|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...|         Watches|          5|    

In [4]:
spark_df.count()

960872

## Process and select data related to vine


product_parent    - Random identifier that can be used to aggregate reviews for the same product.<br>
star_rating       - The 1-5 star rating of the review.<br>
helpful_votes     - Number of helpful votes.<br>
total_votes       - Number of total votes the review received.<br>
vine              - Review was written as part of the Vine program.<br>
verified_purchase - The review is on a verified purchase.<br>

In [5]:
# Removed duplicate rows and rows with na values
raw_vine_df=spark_df.select(["customer_id","product_parent","star_rating", "helpful_votes", "total_votes","vine","verified_purchase"])
raw_vine_df=raw_vine_df.dropna(how="any")
raw_vine_df = raw_vine_df.dropDuplicates()
raw_vine_df.count()

960375

In [6]:
raw_vine_df.show()

+-----------+--------------+-----------+-------------+-----------+----+-----------------+
|customer_id|product_parent|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+--------------+-----------+-------------+-----------+----+-----------------+
|   18440567|     851729310|          5|            1|          1|   N|                N|
|   12750068|     969148256|          5|            0|          0|   N|                Y|
|    4766847|     329989343|          5|            2|          2|   N|                Y|
|   41739247|     740607550|          5|            0|          0|   N|                Y|
|   14358967|     176955326|          5|            0|          0|   N|                Y|
|     174882|     735941801|          4|            0|          2|   N|                N|
|   31427172|     289453173|          5|            0|          0|   N|                Y|
|   14980284|     572601317|          3|            0|          0|   N|                Y|
|   145306

In [7]:
filtered_vine_df=raw_vine_df.filter("total_votes>=10").filter(raw_vine_df["helpful_votes"]/raw_vine_df["total_votes"] >= (1-0.618))
filtered_vine_df.show()

+-----------+--------------+-----------+-------------+-----------+----+-----------------+
|customer_id|product_parent|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+--------------+-----------+-------------+-----------+----+-----------------+
|   20624774|     609917507|          4|           25|         26|   N|                Y|
|   31795932|     682366421|          4|            8|         10|   N|                Y|
|   31908965|      62445149|          1|           23|         23|   N|                Y|
|   37033278|      26401635|          5|           43|         46|   N|                Y|
|   46440092|     356783176|          5|            8|         11|   N|                Y|
|   22878858|     748259212|          3|           36|         39|   N|                Y|
|    7578878|     725214487|          5|            9|         14|   N|                N|
|   23754871|     981900156|          5|           14|         14|   N|                N|
|    28335

In [8]:
filtered_vine_df.count()

24479

In [9]:
from pyspark.sql.functions import col, avg
paid_df = filtered_vine_df.filter("vine='Y'")
unpaid_df = filtered_vine_df.filter("vine='N'")

In [10]:
paid_df.describe().show()

+-------+--------------------+--------------------+------------------+------------------+----------------+----+-----------------+
|summary|         customer_id|      product_parent|       star_rating|     helpful_votes|     total_votes|vine|verified_purchase|
+-------+--------------------+--------------------+------------------+------------------+----------------+----+-----------------+
|  count|                 125|                 125|               125|               125|             125| 125|              125|
|   mean|      4.1348735896E7|     4.50166770264E8|             4.016|            22.912|          26.216|null|             null|
| stddev|1.2126937225969555E7|2.6326763366797188E8|0.9753742044516438|35.194165572187956|37.6211128095948|null|             null|
|    min|            12288995|           102224312|                 1|                10|              10|   Y|                N|
|    max|            53096363|           984068460|                 5|                 9| 

In [11]:
unpaid_df.describe().show()

+-------+--------------------+--------------------+------------------+------------------+------------------+-----+-----------------+
|summary|         customer_id|      product_parent|       star_rating|     helpful_votes|       total_votes| vine|verified_purchase|
+-------+--------------------+--------------------+------------------+------------------+------------------+-----+-----------------+
|  count|               24354|               24354|             24354|             24354|             24354|24354|            24354|
|   mean|3.0853774295844626E7|5.1217918582191837E8| 3.747515808491418| 22.02512934220251|25.013057403301307| null|             null|
| stddev| 1.531187316725172E7|2.8922199903519046E8|1.5417441759081463|45.865329040939116| 49.73405471014589| null|             null|
|    min|            10001397|            10017951|                 1|                10|                10|    N|                N|
|    max|             9998293|            99999591|                 5

In [12]:
paid_arr_str=list(paid_df.select(['star_rating']).toPandas()['star_rating'])
paid_arr=[int(a) for a in paid_arr_str]
unpaid_arr_str=list(unpaid_df.select(['star_rating']).toPandas()['star_rating'])
unpaid_arr=[int(a) for a in unpaid_arr_str]

In [13]:
from scipy import stats
# Run paired t-test
stats.ttest_ind(paid_arr, unpaid_arr, equal_var = False)

Ttest_indResult(statistic=3.057985902095464, pvalue=0.0027171025020883794)

## Results

The t-test value is 3.06, which is higher than the t-test value (2.576) with 99% confidence that it rejects the null hypothesis. The vine reviews and unpaid reviews do **have statisically significant differences**.