In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Ign:10 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:11 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu b

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-07-16 06:07:05--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-07-16 06:07:05 (11.3 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()


Load Amazon Data into Spark DataFrame

In [5]:
from pyspark import SparkFiles
url = "https://denverdataviz.s3.amazonaws.com/amazon_reviews_us_Automotive_v1_00.tsv"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   36075342| RAB23OVFNCXZQ|B00LPRXQ4Y|     339193102|17" 2003-2006 For...|      Automotive|          1|            0|          0|   N|                Y|     As it was used,|As it was used, t...| 2015-08-31|
|         US|   42462164|R3NORADVJO6IE6|B000C7S0TO|     907684644|Spectra Premium C...|      Automotive|          5|    

In [10]:
new_df = df.select(["product_id", "total_votes", "helpful_votes", "vine",  "review_id", "star_rating", "verified_purchase"])
new_df.show()

+----------+-----------+-------------+----+--------------+-----------+-----------------+
|product_id|total_votes|helpful_votes|vine|     review_id|star_rating|verified_purchase|
+----------+-----------+-------------+----+--------------+-----------+-----------------+
|B00LPRXQ4Y|          0|            0|   N| RAB23OVFNCXZQ|          1|                Y|
|B000C7S0TO|          0|            0|   N|R3NORADVJO6IE6|          5|                Y|
|B000CO9WE4|          1|            1|   N|R299F4SO98S5OO|          5|                Y|
|B000GKD5NI|          3|            2|   N|R2DA9DOT03UW6I|          5|                Y|
|B009SDA7TE|          0|            0|   N|R2OGCH681EQHU6|          5|                Y|
|B00KV15KRG|          2|            2|   N| R2JMKIC16MHD7|          5|                Y|
|B0002JMAKW|          0|            0|   N|R1DB5DA7CWWTI8|          5|                Y|
|B00XJKMM6S|          0|            0|   N|R1N8XWFDK4QACP|          5|                Y|
|B000C5CEKC|         

In [11]:
# 1.Creata new DF: total votes are greater than 20
total_votes_df = new_df.filter('total_votes>=20')
total_votes_df.show()

+----------+-----------+-------------+----+--------------+-----------+-----------------+
|product_id|total_votes|helpful_votes|vine|     review_id|star_rating|verified_purchase|
+----------+-----------+-------------+----+--------------+-----------+-----------------+
|B00V03D9KY|         31|            8|   N|R1T4FVSABO0IDP|          5|                Y|
|B00IS941D2|         21|           21|   N|R397VXR1GAK6C9|          5|                Y|
|B00VNBDWPK|         69|           59|   N|R2H6KB9RHS17GA|          3|                Y|
|B00UW4DLJ6|         28|            6|   N|R3HAK0MUN9F5IT|          1|                Y|
|B0101SLT5U|         21|           21|   N|R29UTZTOPUVRSV|          5|                Y|
|B009W85R4A|         32|           27|   N| RY9FLUD8VG6XD|          4|                Y|
|B000GQA8DC|         40|           34|   N|R3Q5NAY8BGAKHI|          5|                Y|
|B002G3OG6S|         30|           30|   N| RRVJ7TYDEK7EO|          5|                Y|
|B00LAJTEFW|         

In [13]:
# 2. Create new DF: helpful votes/total_votes > %50 
helpful_votes_df= new_df.withColumn("helpful_votes_percantage", new_df['helpful_votes']/new_df['total_votes']).filter('helpful_votes_percantage>=0.5')
helpful_votes_df.show()

+----------+-----------+-------------+----+--------------+-----------+-----------------+------------------------+
|product_id|total_votes|helpful_votes|vine|     review_id|star_rating|verified_purchase|helpful_votes_percantage|
+----------+-----------+-------------+----+--------------+-----------+-----------------+------------------------+
|B000CO9WE4|          1|            1|   N|R299F4SO98S5OO|          5|                Y|                     1.0|
|B000GKD5NI|          3|            2|   N|R2DA9DOT03UW6I|          5|                Y|      0.6666666666666666|
|B00KV15KRG|          2|            2|   N| R2JMKIC16MHD7|          5|                Y|                     1.0|
|B00GRV48TK|          2|            2|   N| R4WS0E0MSP9DH|          5|                Y|                     1.0|
|B00JH2DL4O|          3|            3|   N|R20S7DPGDWCM91|          4|                Y|                     1.0|
|B000BTO0CY|          2|            1|   N|R2PNF1QGR4SMTD|          5|                Y|

In [15]:
# 3. Create a new DF: vine =='Y'
y_df = helpful_votes_df.filter(helpful_votes_df['vine']=='Y')
y_df.show()

+----------+-----------+-------------+----+--------------+-----------+-----------------+------------------------+
|product_id|total_votes|helpful_votes|vine|     review_id|star_rating|verified_purchase|helpful_votes_percantage|
+----------+-----------+-------------+----+--------------+-----------+-----------------+------------------------+
|B00NBN1C78|          1|            1|   Y|R10NCRGJVMP7L7|          4|                N|                     1.0|
|B00KVVBEWQ|          3|            3|   Y|R23WHXDAFZ2Z8X|          2|                N|                     1.0|
|B00H36XX6C|          8|            8|   Y|R10TBBMVLW5KL3|          5|                N|                     1.0|
|B00KVVBEWQ|          7|            6|   Y|R3QZPRZM7AJSJO|          4|                N|      0.8571428571428571|
|B00DILTE1Y|          3|            2|   Y|R2M8DC3E0EI7D6|          5|                N|      0.6666666666666666|
|B00LML7GJY|          1|            1|   Y|R19B1CGHBEBUCJ|          5|                N|

In [16]:
# 4. Create a new DF: vine =='N'
n_df = helpful_votes_df.filter(helpful_votes_df['vine']=='N')
n_df.show()

+----------+-----------+-------------+----+--------------+-----------+-----------------+------------------------+
|product_id|total_votes|helpful_votes|vine|     review_id|star_rating|verified_purchase|helpful_votes_percantage|
+----------+-----------+-------------+----+--------------+-----------+-----------------+------------------------+
|B000CO9WE4|          1|            1|   N|R299F4SO98S5OO|          5|                Y|                     1.0|
|B000GKD5NI|          3|            2|   N|R2DA9DOT03UW6I|          5|                Y|      0.6666666666666666|
|B00KV15KRG|          2|            2|   N| R2JMKIC16MHD7|          5|                Y|                     1.0|
|B00GRV48TK|          2|            2|   N| R4WS0E0MSP9DH|          5|                Y|                     1.0|
|B00JH2DL4O|          3|            3|   N|R20S7DPGDWCM91|          4|                Y|                     1.0|
|B000BTO0CY|          2|            1|   N|R2PNF1QGR4SMTD|          5|                Y|

In [41]:
# 5. Counting for paid an unpaid
# 5.1 total reviews & number of 5 star reviews
total_reviews = y_df.count()
total_reviews

2088

In [46]:
five_star = y_df.filter(y_df.star_rating==5).count()
five_star

958

In [48]:
review_percantage = (five_star / total_reviews) * 100
review_percantage

45.88122605363984

In [49]:
# 5.2
total_n_reviews= n_df.count()
total_n_reviews

1011346

In [52]:
unpaid_five_star = n_df.filter(n_df.star_rating==5).count()
unpaid_five_star

605422

In [53]:
unpaid_percantage = (unpaid_five_star/total_n_reviews)*100
unpaid_percantage

59.862994464802355