In [1]:
import os
# Spark release to install, written as the directory name used under
# http://www.apache.org/dist/spark/ (for example 'spark-3.0.3').
# The same value is reused below for the download URL, the tarball name and SPARK_HOME.
# NOTE(review): the "-bin-hadoop2.7" build suffix is hard-coded in the lines below —
# confirm that build exists for whatever version is entered here.
spark_version = 'spark-3.0.3'
# Export the version as an environment variable; the shell lines below reference $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java, then download and unpack the Spark distribution, then install findspark.
# These are notebook shell escapes (`!`), so they run in the notebook host's shell, not in Python.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# NOTE(review): the archive is fetched over plain http and is not checksum-verified.
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
# NOTE(review): findspark is installed without a version pin.
!pip install -q findspark

# Point JAVA_HOME and SPARK_HOME at the installs made above.
# NOTE(review): SPARK_HOME assumes the tarball was extracted under /content
# (presumably the notebook's working directory, e.g. on Colab) — confirm.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark. No SparkSession is created in this cell.
# Presumably findspark uses SPARK_HOME to make pyspark importable — verify against findspark docs.
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InR

In [2]:
# Create the SparkSession for this analysis, or reuse one that is already running.
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("vinereview")
spark = session_builder.getOrCreate()

In [3]:
# Load the Amazon book-review dataset (gzipped, tab-separated) into a Spark DataFrame.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Books_v1_01.tsv.gz"
# Register the remote file with Spark, then resolve the local copy by its file name.
spark.sparkContext.addFile(url)
local_reviews_path = SparkFiles.get("amazon_reviews_us_Books_v1_01.tsv.gz")

# The first row is the header; column types are inferred from the data.
reviews_reader = spark.read.option("encoding", "UTF-8")
df = reviews_reader.csv(local_reviews_path, sep="\t", header=True, inferSchema=True)

In [4]:
# Build vine_df: keep only the columns needed for the Vine analysis.
vine_columns = ['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase']
vine_df = df.select(*vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28HBXXO1UEVJT|          5|            0|          0|   N|                N|
| RZKRFS2UUMFFU|          5|           15|         15|   N|                Y|
|R2WAU9MD9K6JQA|          3|            6|          8|   N|                N|
|R36SCTKYTVPZPC|          5|           10|         11|   N|                Y|
|R10BM6JUOJX27Q|          3|            0|          0|   Y|                N|
| RCLZ5OKZNUSY4|          5|            0|          0|   N|                Y|
|R1S65DJYEI89G4|          4|            8|         17|   N|                N|
|R3KQYBQOLYDETV|          4|            2|          2|   N|                N|
|R3QV8K7CSU8K2W|          5|            0|          0|   N|                N|
|R3W5A1WUGO5VQ0|          4|            0|          1|   N|     

In [5]:
# Keep only reviews that received at least 20 votes in total.
has_enough_votes = vine_df['total_votes'] >= 20
greater_votes = vine_df.where(has_enough_votes)
greater_votes.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R18VIM840CEFRP|          1|           16|        105|   N|                N|
|R14PMAJTY0EAAT|          4|          135|        142|   N|                Y|
|R1363VA3TPNLVB|          5|          370|        388|   N|                Y|
| RBQZC5A3TSWT5|          5|           11|         22|   N|                Y|
| RW00TDPV9U93E|          1|           35|         76|   N|                N|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [6]:
# Print the schema of greater_votes to check each column's data type
# before starting the next filtering step.
greater_votes.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [7]:
# Keep reviews where helpful votes make up at least 50% of all votes cast.
helpful_share = greater_votes['helpful_votes'] / greater_votes['total_votes']
percent_df = greater_votes.where(helpful_share >= 0.5)

percent_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R14PMAJTY0EAAT|          4|          135|        142|   N|                Y|
|R1363VA3TPNLVB|          5|          370|        388|   N|                Y|
| RBQZC5A3TSWT5|          5|           11|         22|   N|                Y|
|R3OW0AIVLEDIQ7|          3|           99|        121|   N|                N|
|R3DTESO4FUAPKQ|          5|           90|        102|   N|                N|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [8]:
# Paid reviews: rows of percent_df written as part of the Vine program (vine == 'Y').
is_vine_review = percent_df['vine'] == 'Y'
paid_program = percent_df.where(is_vine_review)
paid_program.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2Z8083V8ZFQVZ|          3|           82|         87|   Y|                N|
| RK25TAO4GGS5G|          3|           22|         24|   Y|                N|
|R2SJQG3C6KY0M8|          3|           18|         20|   Y|                N|
|R12CHC0CB2WASU|          5|           43|         48|   Y|                N|
|R2RY328TIDXMTE|          2|           50|         61|   Y|                N|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [9]:
# Unpaid reviews: rows of percent_df written outside the Vine program (vine == 'N').
is_non_vine_review = percent_df['vine'] == 'N'
unpaid_program = percent_df.where(is_non_vine_review)
unpaid_program.show(5)


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R14PMAJTY0EAAT|          4|          135|        142|   N|                Y|
|R1363VA3TPNLVB|          5|          370|        388|   N|                Y|
| RBQZC5A3TSWT5|          5|           11|         22|   N|                Y|
|R3OW0AIVLEDIQ7|          3|           99|        121|   N|                N|
|R3DTESO4FUAPKQ|          5|           90|        102|   N|                N|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [10]:
# Count the paid (Vine) reviews. This is the number of reviews (rows with a
# review_id), not a sum of votes.
# DataFrame.show() only prints and returns None, so the aggregated DataFrame is
# stored first and displayed in a separate statement; total_reviews stays usable.
total_reviews = paid_program.agg({'review_id' : 'count'})
total_reviews.show()


+----------------+
|count(review_id)|
+----------------+
|            4781|
+----------------+



In [11]:
# Split the paid (Vine) reviews into 5-star (true) and non-5-star (false) and count each group.
# star_rating is an integer column, so it is compared with the integer 5 rather than the string '5'.
# DataFrame.show() returns None, so the grouped counts are stored in fivestar
# before being displayed.
fivestar = paid_program.groupBy(paid_program['star_rating'] == 5).count()
fivestar.show()

+-----------------+-----+
|(star_rating = 5)|count|
+-----------------+-----+
|             true| 1604|
|            false| 3177|
+-----------------+-----+



In [35]:
# Share of each star rating among the paid (Vine) reviews; the 5-star row is the
# figure of interest. Ratings that make up 10% or less of the reviews are not shown.
import pyspark.sql.functions as F

total = paid_program.count()

result = (
    paid_program
    .groupBy('star_rating')
    .count()
    .withColumn('total', F.lit(total))
    .withColumn('percentage', F.col('count') / F.col('total'))
    .filter(F.col('percentage') > 0.1)
)
result.show()

+-----------+-----+-----+-------------------+
|star_rating|count|total|         percentage|
+-----------+-----+-----+-------------------+
|          3| 1174| 4781|0.24555532315415185|
|          5| 1604| 4781|0.33549466638778497|
|          4|  918| 4781|0.19201003974064004|
|          2|  707| 4781| 0.1478770131771596|
+-----------+-----+-----+-------------------+



In [36]:
# Count the unpaid (non-Vine) reviews. This is the number of reviews (rows with a
# review_id), not a sum of votes.
# DataFrame.show() only prints and returns None, so the aggregated DataFrame is
# stored first and displayed in a separate statement; total_reviews stays usable.
total_reviews = unpaid_program.agg({'review_id' : 'count'})
total_reviews.show()


+----------------+
|count(review_id)|
+----------------+
|          332395|
+----------------+



In [37]:
# Split the unpaid (non-Vine) reviews into 5-star (true) and non-5-star (false) and count each group.
# The grouping column is taken from unpaid_program itself, not from paid_program.
# star_rating is an integer column, so it is compared with the integer 5 rather than the string '5'.
# DataFrame.show() returns None, so the grouped counts are stored in fivestar
# before being displayed.
fivestar = unpaid_program.groupBy(unpaid_program['star_rating'] == 5).count()
fivestar.show()

+-----------------+------+
|(star_rating = 5)| count|
+-----------------+------+
|             true|168800|
|            false|163595|
+-----------------+------+



In [38]:
# Share of each star rating among the unpaid (non-Vine) reviews; the 5-star row is the
# figure of interest. Ratings that make up 10% or less of the reviews are not shown.
# Uses F (pyspark.sql.functions), which must already be imported in the session.
total = unpaid_program.count()

result = (
    unpaid_program
    .groupBy('star_rating')
    .count()
    .withColumn('total', F.lit(total))
    .withColumn('percentage', F.col('count') / F.col('total'))
    .filter(F.col('percentage') > 0.1)
)
result.show()

+-----------+------+------+-------------------+
|star_rating| count| total|         percentage|
+-----------+------+------+-------------------+
|          1| 58017|332395|0.17454233667774785|
|          3| 34283|332395| 0.1031393372343146|
|          5|168800|332395| 0.5078295401555378|
|          4| 39668|332395| 0.1193399419365514|
+-----------+------+------+-------------------+

