In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Fetched 252 kB in 3s (89.7 kB/s)
Reading package l

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-06-08 23:20:43--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-06-08 23:20:44 (1.11 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url="https://my-s3-bucket-sp.s3.amazonaws.com/vine_table.csv"
spark.sparkContext.addFile(url)
vine_table_df = spark.read.csv(SparkFiles.get("vine_table.csv"), header=True, inferSchema=True)

In [None]:
# Read in the vine_table.csv file as a DataFrame
vine_table_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| REAKC26P07MDN|          5|            0|          0|   N|                Y|
|R3NU7OMZ4HQIEG|          2|            0|          1|   N|                Y|
|R14QJW3XF8QO1P|          5|            0|          0|   N|                Y|
|R2HB7AX0394ZGY|          5|            0|          0|   N|                Y|
| RGKMPDQGSAHR3|          5|            0|          0|   N|                Y|
|R1DJCVPQGCV66E|          5|            0|          0|   N|                Y|
|R3V52EAWLPBFQG|          3|            0|          0|   N|                Y|
|R3DKO8J1J28QBI|          2|            0|          0|   N|                Y|
| R764DBXGRNECG|          5|            1|          1|   N|                N|
| RW1853GAT0Z9F|          5|            0|          0|   N|     

In [None]:
# 1. Filter the data and create a new DataFrame to retrieve all the rows where
# the total_votes count is equal to or greater than 20
clean_vine_df = vine_table_df.filter("total_votes >= 20")
clean_vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R21KC552Y6HL8X|1          |27           |31         |N   |Y                |
|RX9WC9FTIR1XR |5          |25           |25         |N   |Y                |
|RGDCOU1KBHMNG |3          |29           |31         |N   |Y                |
|RVTYWID2TPMMY |2          |35           |42         |N   |Y                |
|R2CMPZ5VESGRLY|4          |27           |28         |N   |Y                |
|R3VQPJZ54B55BA|5          |62           |64         |N   |N                |
|R24QM6D7FEDZ5M|2          |36           |43         |N   |Y                |
|R3A1VQ3SQDXEJ3|5          |20           |20         |N   |Y                |
|R39GSNW76GYF  |1          |20           |23         |N   |Y                |
|RPJLR6MFDPXXE |5          |35           |36         |N   |Y    

In [None]:
# 2. Filter the new DataFrame created in Step 1 and create a new DataFrame
# to retrieve all the rows where the number of helpful_votes divided by 
# total_votes is equal to or greater than 50%.
clean_vine_df = vine_table_df.filter("(helpful_votes/total_votes) >= 0.5")
clean_vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R764DBXGRNECG |5          |1            |1          |N   |N                |
|R1H7AVM81TAYRV|1          |2            |2          |N   |Y                |
|RMB8N0DBRH34O |5          |1            |1          |N   |Y                |
|RLR047V0C09SG |5          |1            |1          |N   |Y                |
|R2D06CYY6KZSJ0|2          |16           |19         |N   |Y                |
|R2P2G1XUTS2UI1|5          |8            |8          |N   |Y                |
|R2E9I9L1DXK7U8|5          |10           |10         |N   |Y                |
|ROPI31U4NHTB  |2          |1            |1          |N   |Y                |
|R2EDA9TQWM5LOW|5          |1            |1          |N   |Y                |
|RBJ69AC0G5A5R |5          |1            |1          |N   |Y    

In [None]:
# 3. Filter the DataFrame created in Step 2, 
# and create a new DataFrame that retrieves all the rows 
# where a review was written as part of the Vine program (paid), vine == 'Y'.
vine_reviews_df = vine_table_df.filter('vine == "Y"')
vine_reviews_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1CBOJMJAYL75C|5          |0            |0          |Y   |N                |
|R37IHP001XZVR |5          |0            |1          |Y   |N                |
|R175KT8QHRRK2G|4          |0            |1          |Y   |N                |
|RNWWD2B3X0CU2 |5          |0            |0          |Y   |N                |
|R2FDITZFABSUN8|3          |0            |1          |Y   |N                |
|R9WD02GLN4A35 |5          |7            |8          |Y   |N                |
|RMGB3SAPBORMY |5          |0            |0          |Y   |N                |
|R3V4XNC741N9XW|4          |0            |0          |Y   |N                |
|RS4W1GEFFRXTX |5          |0            |0          |Y   |N                |
|RS3X4WLXNYT6B |5          |0            |0          |Y   |N    

In [None]:
# 4. Retrieve all the rows where the review was not part of the Vine 
# program (unpaid), vine == 'N'.
non_vine_df = vine_table_df.filter('vine == "N"')
non_vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|REAKC26P07MDN |5          |0            |0          |N   |Y                |
|R3NU7OMZ4HQIEG|2          |0            |1          |N   |Y                |
|R14QJW3XF8QO1P|5          |0            |0          |N   |Y                |
|R2HB7AX0394ZGY|5          |0            |0          |N   |Y                |
|RGKMPDQGSAHR3 |5          |0            |0          |N   |Y                |
|R1DJCVPQGCV66E|5          |0            |0          |N   |Y                |
|R3V52EAWLPBFQG|3          |0            |0          |N   |Y                |
|R3DKO8J1J28QBI|2          |0            |0          |N   |Y                |
|R764DBXGRNECG |5          |1            |1          |N   |N                |
|RW1853GAT0Z9F |5          |0            |0          |N   |Y    

In [None]:
# 5a. Determine the total number of reviews for paid vs. unpaid
v_count = vine_reviews_df.count()
v_count

10215

In [None]:
nv_count = non_vine_df.count()
nv_count

2633399

In [None]:
# 5b. Determine the number of 5-star reviews for paid vs. unpaid
best_vine_df = vine_reviews_df.filter(vine_reviews_df.star_rating == 5)
best_vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1CBOJMJAYL75C|          5|            0|          0|   Y|                N|
| R37IHP001XZVR|          5|            0|          1|   Y|                N|
| RNWWD2B3X0CU2|          5|            0|          0|   Y|                N|
| R9WD02GLN4A35|          5|            7|          8|   Y|                N|
| RMGB3SAPBORMY|          5|            0|          0|   Y|                N|
| RS4W1GEFFRXTX|          5|            0|          0|   Y|                N|
| RS3X4WLXNYT6B|          5|            0|          0|   Y|                N|
|R2VT6HPTNONELB|          5|            0|          0|   Y|                N|
|R3RO9RPS35AE1Z|          5|            0|          1|   Y|                N|
| RMGKA8XODWN6D|          5|            0|          0|   Y|     

In [None]:
best_v = best_vine_df.count()
best_v

4343

In [None]:
best_non_vine_df = non_vine_df.filter(non_vine_df.star_rating == 5)
best_non_vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| REAKC26P07MDN|          5|            0|          0|   N|                Y|
|R14QJW3XF8QO1P|          5|            0|          0|   N|                Y|
|R2HB7AX0394ZGY|          5|            0|          0|   N|                Y|
| RGKMPDQGSAHR3|          5|            0|          0|   N|                Y|
|R1DJCVPQGCV66E|          5|            0|          0|   N|                Y|
| R764DBXGRNECG|          5|            1|          1|   N|                N|
| RW1853GAT0Z9F|          5|            0|          0|   N|                Y|
|R2ZOYAQZNNZZWV|          5|            0|          0|   N|                Y|
| RJB41Q575XNG4|          5|            0|          3|   N|                Y|
|R28W8BM1587CPF|          5|            0|          0|   N|     

In [None]:
best_n = best_non_vine_df.count()
best_n 

1641210

In [None]:
# Total 5-star reviews
total_5star = v + n 

In [None]:
# 5c. Determine the percentage of 5-star reviews for the two types of review (paid vs unpaid)
percent_vine = (v/total_5star) * 100
percent_vine 

0.2639234348574613

In [None]:
percent_non_vine = (n/total_5star) * 100
percent_non_vine 

99.73607656514254

In [None]:
v_5star_percentage = (best_v/v_count) * 100 
v_5star_percentage

42.51590797846305

In [None]:
nv_5star_percentage = (best_n/nv_count) * 100 
nv_5star_percentage

62.32287625232637