In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com] [Connecting to ppa0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Ign:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:6 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u

In [2]:
# Create (or reuse, if one is already running) the Spark session for this analysis
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("VineReview")
    .getOrCreate()
)

In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

# Register the remote CSV with Spark, then read it back through its SparkFiles path.
url = "https://sgalik-challenge.s3.us-west-1.amazonaws.com/vine_table.csv"
spark.sparkContext.addFile(url)

# No schema is given here, so every column comes back as a string.
vine_csv_path = SparkFiles.get("vine_table.csv")
vine_table_df = spark.read.csv(vine_csv_path, sep=",", header=True)
vine_table_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R135Q3VZ4DQN5N|          5|            0|          0|   N|                Y|
|R2N0QQ6R4T7YRY|          5|            0|          0|   N|                N|
|R3N5JE5Y4T6W5M|          5|            0|          0|   N|                Y|
|R2I150CX5IVY9Q|          5|            0|          0|   N|                Y|
|R1RM9ICOOA9MQ3|          5|            0|          0|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [4]:
# Inspect the schema of the CSV read without an explicit schema
# (every column, including the numeric ones, is typed as string).
vine_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [5]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# Declare column types explicitly so star_rating and the vote counts are read
# as integers instead of the all-string schema used by the first read.
schema = [
    StructField("review_id", StringType(), True),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
]
final = StructType(fields=schema)

# Re-read the same downloaded CSV, this time with the typed schema.
typed_csv_path = SparkFiles.get("vine_table.csv")
updatedvine_table_df = spark.read.csv(typed_csv_path, schema=final, sep=",", header=True)

In [6]:
# Confirm the explicit schema took effect: star_rating, helpful_votes and
# total_votes should now be integer columns.
updatedvine_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [7]:
# Keep only reviews whose total vote count exceeds TOTAL_VOTES_THRESHOLD.
# NOTE(review): the comparison is strict (> 20). If the intent is
# "at least 20 votes", switch the operator to >=.
TOTAL_VOTES_THRESHOLD = 20
filtered_df = updatedvine_table_df.filter(
    updatedvine_table_df["total_votes"] > TOTAL_VOTES_THRESHOLD
)
filtered_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R16YC6SMGKA8SR|          5|           23|         23|   N|                Y|
| R5O4WE9UM60B0|          1|           29|         29|   N|                Y|
|R2EKRVCRC7U0IY|          1|           22|         24|   N|                Y|
|R2OKV47GETH0L7|          5|           24|         24|   N|                Y|
| R36LII9IITE17|          1|           22|         24|   N|                Y|
|R35VKNE16PFY0H|          5|           22|         24|   N|                N|
|R3TG008LHHZZIN|          5|          123|        125|   N|                Y|
| R63PV336NI5X7|          1|           27|         28|   N|                Y|
|R1R42WPHB5ZSWI|          5|           51|         52|   N|                Y|
| RXSS0QZJE1TEO|          1|           35|         35|   N|     

In [8]:
# Keep reviews where helpful_votes make up at least HELPFUL_PCT_THRESHOLD
# percent of total_votes. filtered_df only holds rows whose total_votes
# passed a positive threshold, so the division has a non-zero denominator.
HELPFUL_PCT_THRESHOLD = 50
helpful_pct = (filtered_df["helpful_votes"] / filtered_df["total_votes"]) * 100
helpful_df = filtered_df.filter(helpful_pct >= HELPFUL_PCT_THRESHOLD)
helpful_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R16YC6SMGKA8SR|          5|           23|         23|   N|                Y|
| R5O4WE9UM60B0|          1|           29|         29|   N|                Y|
|R2EKRVCRC7U0IY|          1|           22|         24|   N|                Y|
|R2OKV47GETH0L7|          5|           24|         24|   N|                Y|
| R36LII9IITE17|          1|           22|         24|   N|                Y|
|R35VKNE16PFY0H|          5|           22|         24|   N|                N|
|R3TG008LHHZZIN|          5|          123|        125|   N|                Y|
| R63PV336NI5X7|          1|           27|         28|   N|                Y|
|R1R42WPHB5ZSWI|          5|           51|         52|   N|                Y|
| RXSS0QZJE1TEO|          1|           35|         35|   N|     

In [9]:
# Vine (paid) reviews within the helpful subset
paid_df = helpful_df.where(helpful_df.vine == "Y")
paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R15PXA9XU9ZKSU|          4|           29|         29|   Y|                N|
|R2LJ4QYQ4PYDEH|          5|           24|         26|   Y|                N|
|R2PJWWQZ37WCIC|          5|           95|         96|   Y|                N|
|R33G2VDKRN2USY|          5|           21|         21|   Y|                N|
|R13R4X37T2U50A|          4|           52|         57|   Y|                N|
|R1X1J2NAQ39DF6|          4|           29|         34|   Y|                N|
|R2UV5XHFNMPXOX|          5|           60|         61|   Y|                N|
|R31V08BCC9X3MF|          5|           18|         22|   Y|                N|
|R2G00QWTYW9N2G|          5|           86|         91|   Y|                N|
| RL8H3NKKP989J|          5|           30|         34|   Y|     

In [10]:
# Non-Vine (unpaid) reviews within the helpful subset
unpaid_df = helpful_df.where(helpful_df.vine == "N")
unpaid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R16YC6SMGKA8SR|          5|           23|         23|   N|                Y|
| R5O4WE9UM60B0|          1|           29|         29|   N|                Y|
|R2EKRVCRC7U0IY|          1|           22|         24|   N|                Y|
|R2OKV47GETH0L7|          5|           24|         24|   N|                Y|
| R36LII9IITE17|          1|           22|         24|   N|                Y|
|R35VKNE16PFY0H|          5|           22|         24|   N|                N|
|R3TG008LHHZZIN|          5|          123|        125|   N|                Y|
| R63PV336NI5X7|          1|           27|         28|   N|                Y|
|R1R42WPHB5ZSWI|          5|           51|         52|   N|                Y|
| RXSS0QZJE1TEO|          1|           35|         35|   N|     

In [24]:
from pyspark.sql.functions import count


In [31]:
# Number of helpful Vine (paid) reviews
vine_reviews = paid_df.count()
vine_reviews

20

In [32]:
# Number of helpful non-Vine (unpaid) reviews
non_vine_revs = unpaid_df.count()
non_vine_revs

7067

In [35]:
# How many of the helpful Vine reviews gave 5 stars
vine_five = paid_df.filter(paid_df["star_rating"] == 5).count()
vine_five

10

In [36]:
# How many of the helpful non-Vine reviews gave 5 stars
non_vine_five = unpaid_df.filter(unpaid_df["star_rating"] == 5).count()
non_vine_five

4076

In [38]:
# Percentage of Vine reviews with 5 stars.
# Guard the division: if the filters leave no Vine reviews, report NaN
# instead of raising ZeroDivisionError and halting Run All.
vine_five_pct = vine_five / vine_reviews * 100 if vine_reviews else float("nan")
vine_five_pct

50.0

In [39]:
# Percentage of non-Vine reviews with 5 stars.
# Guard the division: if the filters leave no non-Vine reviews, report NaN
# instead of raising ZeroDivisionError and halting Run All.
non_vine_five_pct = non_vine_five / non_vine_revs * 100 if non_vine_revs else float("nan")
non_vine_five_pct

57.6765246922315