In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or0% [1 InRelease gpgv 1,581 B] [Waiting for headers] [Waiting for headers] [Conn                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 1,581 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 1,581 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [1 InRelease gpgv 1,581 B] [Waiting for headers] [Waiting for header

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Extract the dataset used in Deliverable 1

In [3]:
from pyspark import SparkFiles
url = "https://bootcamp-lps-pyspark-elt.s3.amazonaws.com/amazon_reviews_us_Musical_Instruments_v1_00.tsv"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Musical_Instruments_v1_00.tsv"), sep="\t", header=True, inferSchema=True)
df.show()
df.count()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   45610553| RMDCHWD0Y5OZ9|B00HH62VB6|     618218723|AGPtek® 10 Isolat...|Musical Instruments|          3|            0|          1|   N|                N|         Three Stars|Works very good, ...| 2015-08-31|
|         US|   14640079| RZSL0BALIYUNU|B003LRN53I|     986692292|Sennheiser HD203 ...|Musical Instruments| 

904765

### 1) Retrieve all the rows where the total_votes count is equal to or greater than 20

In [4]:
df_20 = df.filter(df.total_votes >= 20)
df_20.show(10)
df_20.count()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   15365576|R2243Y3OD8U6KQ|B00W1RTVGO|     160618734|Supertech stage l...|Musical Instruments|          5|           47|         61|   N|                N|This fills a room...|I am always looki...| 2015-08-31|
|         US|   28770559|R2TGT0CDTCAAHW|B00INJ7HBK|     157027184|Singing Machine I...|Musical Instruments| 

16520

### 2) Retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%

In [5]:
df_50 = df_20.filter((df_20.helpful_votes / df_20.total_votes) >= 0.5)
df_50.show(10)
df_50.count()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   15365576|R2243Y3OD8U6KQ|B00W1RTVGO|     160618734|Supertech stage l...|Musical Instruments|          5|           47|         61|   N|                N|This fills a room...|I am always looki...| 2015-08-31|
|         US|   28770559|R2TGT0CDTCAAHW|B00INJ7HBK|     157027184|Singing Machine I...|Musical Instruments| 

14537

### 3) Retrieve all the rows where a review was written as part of the Vine program (paid)

In [6]:
paid_df = df_50.filter(df_50.vine == 'Y')
paid_df.show(10)
paid_df.count()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   42689422|R1R9RU7JW0MFR2|B0124Y99PQ|     618027384|Casio CGP-700BK C...|Musical Instruments|          4|           20|         23|   Y|                N|Be prepared to be...|First off PLEASE ...| 2015-08-27|
|         US|   29182364|R19EFYNN3W8Q07|B00ZU4G0ZK|     499223759|TC Electronic Pol...|Musical Instruments| 

60

### 4) Retrieve all the rows where the review was not part of the Vine program (unpaid)

In [7]:
unpaid_df = df_50.filter(df_50.vine == 'N')
unpaid_df.show(10)
unpaid_df.count()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   15365576|R2243Y3OD8U6KQ|B00W1RTVGO|     160618734|Supertech stage l...|Musical Instruments|          5|           47|         61|   N|                N|This fills a room...|I am always looki...| 2015-08-31|
|         US|   28770559|R2TGT0CDTCAAHW|B00INJ7HBK|     157027184|Singing Machine I...|Musical Instruments| 

14477

### 5) Subtotals and percentages in each category

In [8]:
paid = paid_df.count()
print("Total paid reviews:", paid)

Total paid reviews: 60


In [9]:
# Paid 5 star reviews
paid_5star = paid_df.filter(paid_df.star_rating == 5).count()
print("Paid 5-star reviews:", paid_5star)

Paid 5-star reviews: 34


In [10]:
# Paid 5-star reviews %
paid_5star_percent = round((paid_5star / paid * 100),2)
print("Paid 5-star reviews percentage:", paid_5star_percent, "%")

Paid 5-star reviews percentage: 56.67 %


In [11]:
unpaid = unpaid_df.count()
print("Total unpaid reviews:", unpaid)

Total unpaid reviews: 14477


In [12]:
# Unpaid 5 star reviews
unpaid_5star = unpaid_df.filter(unpaid_df.star_rating == 5).count()
unpaid_5star
print("Unpaid 5-star reviews", unpaid_5star)

Unpaid 5-star reviews 8212


In [13]:
# Unpaid 5-star reviews %
unpaid_5star_percent = round((unpaid_5star / unpaid * 100),2)
print("Unpaid 5-star reviews percentage:", unpaid_5star_percent, "%")

Unpaid 5-star reviews percentage: 56.72 %


### Summary

In [14]:
print("SUMMARY")
print("===========================================")
print("Total paid reviews:", paid)
print("Paid 5-star reviews:", paid_5star)
print("Paid 5-star reviews percentage:", paid_5star_percent, "%")
print("===========================================")
print("Total unpaid reviews:", unpaid)
print("Unpaid 5-star reviews", unpaid_5star)
print("Unpaid 5-star reviews percentage:", unpaid_5star_percent, "%")

SUMMARY
Total paid reviews: 60
Paid 5-star reviews: 34
Paid 5-star reviews percentage: 56.67 %
Total unpaid reviews: 14477
Unpaid 5-star reviews 8212
Unpaid 5-star reviews percentage: 56.72 %
