In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['SPARK_HOME'] = f'/content/{spark_version}-bin-hadoop3'

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [2 InRelease 14.2 kB/88.7 kB 16%] [Connecting to security.ubuntu.com (185.120% [1 InRelease gpgv 242 kB] [2 InRelease 15.6 kB/88.7 kB 18%] [Connecting to s                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 242 kB] [2 InRelease 21.4 kB/88.7 kB 24%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Waiting for headers] [Waiti                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
0% [1 InRelease gpgv 242 kB] [4 InRelease 8,396

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('M17-Amazon-Challenge-Analysis').getOrCreate()

In [3]:
from pyspark import SparkFiles
url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Kitchen_v1_00.tsv.gz'
spark.sparkContext.addFile(url)
reviews_df = spark.read.option('encoding', 'UTF-8').csv(SparkFiles.get(''), sep='\t',
                                                        header=True, inferSchema=True)

reviews_df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   37000337|R3DT59XH7HXR9K|B00303FI0G|     529320574|Arthur Court Pape...|         Kitchen|          5|            0|          0|   N|                Y|Beautiful. Looks ...|Beautiful.  Looks...|2015-08-31 00:00:00|
|         US|   15272914|R1LFS11BNASSU8|B00JCZKZN6|     274237558|Olde Thompson Bav...| 

In [4]:
# Check datatypes
reviews_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [5]:
# Make DF of vine_table (review_id, star_rating, helpful_votes, total_votes, vine, verified_purchase)
vine_table = reviews_df.select(['review_id', 'star_rating', 'helpful_votes',
                                  'total_votes', 'vine', 'verified_purchase'])

vine_table.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3DT59XH7HXR9K|          5|            0|          0|   N|                Y|
|R1LFS11BNASSU8|          5|            0|          1|   N|                Y|
|R296RT05AG0AF6|          5|            0|          0|   N|                Y|
|R3V37XDZ7ZCI3L|          5|            0|          1|   N|                Y|
|R14GU232NQFYX2|          5|            0|          0|   N|                Y|
| RZQH4V7L2O1PL|          1|            1|          1|   N|                Y|
|R1F8JMOSPJ3KO7|          5|            0|          0|   N|                Y|
|R1ZISGY2BWW4Z5|          5|            0|          0|   N|                Y|
|R17PW4I3AE5WZW|          5|            0|          0|   N|                Y|
|R3D93G1KTP6A8P|          3|            0|          0|   N|     

In [6]:
# Create DF where total_votes is equal or greater than 20
voted_reviews = vine_table.filter('total_votes >= 20')

voted_reviews.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R3GZ7CK2O1PPG0|          2|           10|         30|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|     

In [7]:
# Get the rows where helpful_votes / total_votes is equal to or greater than 50% from voted_reviews
helpful_reviews = voted_reviews.filter((voted_reviews['helpful_votes'] / voted_reviews['total_votes']) \
                                        >= 0.5)

helpful_reviews.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|                Y|
| RQQAI8YL3UCY2|          5|           23|         25|   N|     

In [8]:
# Get rows that are a part of the Vine program from helpful_reviews
vine_reviews = helpful_reviews.filter('vine=="Y"')

vine_reviews.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1Z71RW4J9IK93|          5|           20|         22|   Y|                N|
|R3FVB5QI11KI9Q|          4|          192|        200|   Y|                N|
|R2G027YBMVXV6Y|          5|           39|         48|   Y|                N|
|R1QGBAN7BMGWRR|          5|          121|        129|   Y|                N|
|R2NH2AU7XL9ZDZ|          3|           18|         20|   Y|                N|
|R2YVVJ9NOPNX50|          4|           36|         41|   Y|                N|
|R1XH1LK1FWX3OS|          4|          214|        238|   Y|                N|
|R38LSQ71G2IZGS|          5|           26|         29|   Y|                N|
|R2FLITQVKWXDF4|          3|           26|         34|   Y|                N|
|R25LMMZF3DJTWY|          2|           18|         21|   Y|     

In [9]:
# Get rows not a part of the Vine program from helpful_reviews
not_vine_reviews = helpful_reviews.filter('vine=="N"')

not_vine_reviews.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|                Y|
| RQQAI8YL3UCY2|          5|           23|         25|   N|     

In [10]:
# Make function to get total reviews, total 5 star reviews and percentage of 5 star reviews
from pyspark.sql.functions import count, round

def get_review_stats(df):

  total_reviews = df.count()
  five_star = df.filter('star_rating==5').count()
  review_percentage = five_star / total_reviews * 100

  columns = ['total_reviews', 'five_star_reviews', 'percent_five_star']
  data = [(total_reviews, five_star, review_percentage)]
  stats_df = spark.createDataFrame(data=data, schema=columns)
  stats_df.withColumn('percent_five_star', round('percent_five_star', 1)).show()

In [11]:
# Get review statistics for vine_reviews
get_review_stats(vine_reviews)

+-------------+-----------------+-----------------+
|total_reviews|five_star_reviews|percent_five_star|
+-------------+-----------------+-----------------+
|         1207|              509|             42.2|
+-------------+-----------------+-----------------+



In [12]:
# Get review statistics for not_vine_reviews
get_review_stats(not_vine_reviews)

+-------------+-----------------+-----------------+
|total_reviews|five_star_reviews|percent_five_star|
+-------------+-----------------+-----------------+
|        97839|            45858|             46.9|
+-------------+-----------------+-----------------+

