In [1]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Fetched 252 kB in 2s (124 kB/s)
Reading package lists... Done


In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2020-11-10 03:38:49--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.4’


2020-11-10 03:38:51 (1.01 MB/s) - ‘postgresql-42.2.16.jar.4’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session, putting the downloaded Postgres JDBC jar
# on the driver's class path.
spark = (
    SparkSession.builder
    .appName("BigData-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [4]:
# 2.2 Load the data from Deliverable 1
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Baby_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
# Look the file up by its own name. SparkFiles.get("") would resolve to the SparkFiles root
# directory, and the reader would then ingest every file that happens to be stored there.
file_name = url.rsplit("/", 1)[-1]
# Tab-separated input with a header row; column types are inferred from the data.
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    9970739| R8EWA1OFT84NX|B00GSP5D94|     329991347|Summer Infant Swa...|            Baby|          5|            0|          0|   N|                Y|Great swaddled bl...|Loved these swadd...| 2015-08-31|
|         US|   23538442|R2JWY4YRQD4FOP|B00YYDDZGU|     646108902|Pacifier Clip Gir...|            Baby|          5|    

In [5]:
# 2.3 Recreate the vine_table DataFrame
# Keep only the review/vote columns and one row per review_id; duplicates are
# dropped to avoid errors when uploading to pgAdmin.
vine_table = (
    df.select(
        "review_id",
        "star_rating",
        "helpful_votes",
        "total_votes",
        "vine",
        "verified_purchase",
    )
    .dropDuplicates(["review_id"])
)

vine_table.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R100GNJU76M4Q9|          4|            0|          0|   N|                Y|
|R100RNZZU8P949|          5|            2|          2|   N|                Y|
|R102LHBUMFLRSZ|          3|            5|          6|   N|                N|
|R102NVF0W1Y2Q1|          5|            5|          8|   N|                N|
|R1030RGR05XVMJ|          5|            0|          0|   N|                N|
|R1032V7632ZYV8|          5|            0|          0|   N|                Y|
|R1035LB8SN4J8C|          5|            0|          0|   N|                Y|
|R10459MPUEC8XH|          5|            0|          0|   N|                Y|
|R104RESTVELARK|          5|            5|          5|   N|                N|
|R105Z2BW17EOMD|          5|            0|          0|   N|     

In [6]:
# Keep only reviews with at least 20 total votes: these are more likely to be
# helpful, and a non-zero vote count avoids division-by-zero errors later on.
votes20plus = vine_table.where(vine_table["total_votes"] >= 20)
votes20plus.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R11IME54UJ0TV1|          1|           34|         42|   N|                Y|
|R11LANLE6U169Z|          5|           27|         27|   N|                N|
|R120ZYE4Z0LHI3|          5|           15|         20|   N|                Y|
|R12OMTI3YQVBSS|          5|           82|         87|   N|                N|
|R13BE7COD06LXV|          5|           51|         61|   N|                Y|
|R13PUDGVD1HF8E|          5|           30|         33|   N|                Y|
|R14D1LAC7GWP1N|          1|           34|         38|   N|                Y|
|R14I0JIEVLCI36|          2|           19|         24|   N|                N|
|R15P9EP044L69J|          1|            5|         33|   N|                Y|
|R16AX8GBS7H14M|          5|           26|         26|   N|     

In [7]:
# Keep the rows where helpful_votes / total_votes is at least 50%.
votes20plus_weight = votes20plus.where(
    votes20plus["helpful_votes"] / votes20plus["total_votes"] >= 0.5
)
votes20plus_weight.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R11IME54UJ0TV1|          1|           34|         42|   N|                Y|
|R11LANLE6U169Z|          5|           27|         27|   N|                N|
|R120ZYE4Z0LHI3|          5|           15|         20|   N|                Y|
|R12OMTI3YQVBSS|          5|           82|         87|   N|                N|
|R13BE7COD06LXV|          5|           51|         61|   N|                Y|
|R13PUDGVD1HF8E|          5|           30|         33|   N|                Y|
|R14D1LAC7GWP1N|          1|           34|         38|   N|                Y|
|R14I0JIEVLCI36|          2|           19|         24|   N|                N|
|R16AX8GBS7H14M|          5|           26|         26|   N|                Y|
|R1782ADZF7YXT4|          5|           79|         84|   N|     

In [8]:
from pyspark.sql.functions import col, avg

# From the helpfulness-filtered reviews, keep those written as part of the
# Vine program (paid), i.e. vine == 'Y'.
paid_votes = votes20plus_weight.where(votes20plus_weight['vine'] == 'Y')
paid_votes.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R26Y8YAR5AEQNV|          4|           21|         25|   Y|                N|
|R2PH29E7EQECTF|          4|           23|         25|   Y|                N|
| R2XU8C5Q33TM6|          5|           24|         34|   Y|                N|
|R3EIBNKTZXD4GJ|          3|           24|         25|   Y|                N|
|R1T36KTG6O5GT7|          5|           26|         30|   Y|                N|
|R1KVGQ4WN1PBX5|          3|           17|         20|   Y|                N|
| RT5GP7MX076JI|          4|          284|        292|   Y|                N|
| RT8AV0IB2PGHH|          4|          108|        132|   Y|                N|
|R1I943LCR9S4N1|          5|           21|         31|   Y|                N|
|R2L6QXN07FYVD2|          4|          384|        390|   Y|     

In [9]:
# Summary statistics (count, mean, stddev, min, max) for the paid reviews above
paid_summary = paid_votes.describe()
paid_summary.show()

+-------+--------------+-----------------+-----------------+------------------+----+-----------------+
|summary|     review_id|      star_rating|    helpful_votes|       total_votes|vine|verified_purchase|
+-------+--------------+-----------------+-----------------+------------------+----+-----------------+
|  count|           463|              463|              463|               463| 463|              463|
|   mean|          null|4.058315334773218|65.91792656587474|  73.1792656587473|null|             null|
| stddev|          null|1.056238410588458|94.07596729061886|100.55045599693311|null|             null|
|    min|R1038HIHP60BGQ|                1|               11|                20|   Y|                N|
|    max| RZC721M0G7OAB|                5|              790|               937|   Y|                Y|
+-------+--------------+-----------------+-----------------+------------------+----+-----------------+



In [10]:
# From the helpfulness-filtered reviews, keep those that were not part of the
# Vine program (unpaid), i.e. vine == 'N'.
unpaid_votes = votes20plus_weight.where(votes20plus_weight['vine'] == 'N')
unpaid_votes.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R11IME54UJ0TV1|          1|           34|         42|   N|                Y|
|R11LANLE6U169Z|          5|           27|         27|   N|                N|
|R120ZYE4Z0LHI3|          5|           15|         20|   N|                Y|
|R12OMTI3YQVBSS|          5|           82|         87|   N|                N|
|R13BE7COD06LXV|          5|           51|         61|   N|                Y|
|R13PUDGVD1HF8E|          5|           30|         33|   N|                Y|
|R14D1LAC7GWP1N|          1|           34|         38|   N|                Y|
|R14I0JIEVLCI36|          2|           19|         24|   N|                N|
|R16AX8GBS7H14M|          5|           26|         26|   N|                Y|
|R1782ADZF7YXT4|          5|           79|         84|   N|     

In [11]:
# Summary statistics (count, mean, stddev, min, max) for the unpaid reviews above
unpaid_summary = unpaid_votes.describe()
unpaid_summary.show()

+-------+--------------+------------------+------------------+------------------+-----+-----------------+
|summary|     review_id|       star_rating|     helpful_votes|       total_votes| vine|verified_purchase|
+-------+--------------+------------------+------------------+------------------+-----+-----------------+
|  count|         25094|             25094|             25094|             25094|25094|            25094|
|   mean|          null|  3.68203554634574| 57.67215270582609| 63.70235912967243| null|             null|
| stddev|          null|1.5492545817482586|106.55902159228367|111.31034786057249| null|             null|
|    min|R1005Z9N2TD883|                 1|                10|                20|    N|                N|
|    max| RZZX866D23PU5|                 5|              5245|              5471|    N|                Y|
+-------+--------------+------------------+------------------+------------------+-----+-----------------+



In [12]:
# Number of paid (Vine) reviews that passed the vote filters
paid_count = paid_votes.count()
# Number of 5-star paid reviews
paid_five_star = paid_votes.filter(paid_votes['star_rating'] == 5).count()
# Share of five-star reviews among paid Vine reviews, stored as a fraction between 0 and 1.
# Falls back to 0.0 when there are no paid reviews, so the cell cannot divide by zero.
percentage_five_star_paid = paid_five_star / paid_count if paid_count else 0.0

# Results (the fraction is rendered as a percentage so the value matches its label)
print('The total number of paid Vine reviews is:', paid_count)
print('The total number of paid five star Vine reviews is:', paid_five_star)
print('Percentage of paid 5-star Vine reviews is:', f'{percentage_five_star_paid:.2%}')

The total number of paid Vine reviews is: 463
The total number of paid five star Vine reviews is: 202
Percentage of paid 5-star Vine reviews is: 0.43628509719222464


In [13]:
# Number of unpaid (non-Vine, vine == 'N') reviews that passed the vote filters
unpaid_count = unpaid_votes.count()
# Number of 5-star unpaid reviews
unpaid_five_star = unpaid_votes.filter(unpaid_votes['star_rating'] == 5).count()
# Share of five-star reviews among unpaid reviews, stored as a fraction between 0 and 1.
# Falls back to 0.0 when there are no unpaid reviews, so the cell cannot divide by zero.
percentage_five_star_unpaid = unpaid_five_star / unpaid_count if unpaid_count else 0.0

# Results. The labels say "non-Vine" because these rows are exactly the reviews written
# outside the Vine program; the fraction is rendered as a percentage to match its label.
print('The total number of unpaid (non-Vine) reviews is:', unpaid_count)
print('The total number of unpaid (non-Vine) five star reviews is:', unpaid_five_star)
print('Percentage of unpaid (non-Vine) 5-star reviews is:', f'{percentage_five_star_unpaid:.2%}')

The total number of unpaid Vine reviews are: 25094
The total number of unpaid five star Vine reviews are: 12033
Percentage of unpaid Vine 5-star reviews is: 0.4795170160197657
