In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install pyspark
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [1 0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Co0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg 

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-01-27 00:50:13--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-01-27 00:50:14 (4.49 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AmazonReviewAnalysis").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
watch_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True, timestampFormat="yyyy-mm-dd")
# Show DataFrame
watch_data_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-01-31 00:08:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [5]:
#Count the number of records (rows) in the dataset.
print(watch_data_df.count())
watch_data_df=watch_data_df.dropDuplicates()
watch_data_df=watch_data_df.dropna()
print(watch_data_df.count())

960872
960679


In [12]:
# Load in a sql function to use columns
from pyspark.sql.functions import col,desc

**Vine Table**

In [7]:
vine_table_df=watch_data_df.select(['review_id','star_rating','helpful_votes','total_votes','vine'])
print(vine_table_df.count())
print(vine_table_df.distinct().count())
vine_table_df.show()

960679
960679
+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1004K426LTDKN|          2|            0|          1|   N|
|R1006QGKXQ3Q96|          5|            0|          0|   N|
|R1017P0ZGRXSR0|          5|            0|          0|   N|
|R103HXXDQD9W4Y|          5|            0|          0|   N|
|R10472PWW2S2AW|          4|            0|          0|   N|
|R104EYLG2ZXXJ1|          1|            0|          0|   N|
|R105KF16VGOILR|          4|            1|          1|   N|
|R106A2ETV5NZNE|          5|            0|          0|   N|
|R106UPPPFIIX3P|          5|            0|          0|   N|
|R107IKXXNUAZZC|          5|            0|          0|   N|
|R1081CHIZX3PZW|          4|            1|          1|   N|
|R1092B4NE0WNDD|          4|            0|          0|   N|
|R109WVVOK26661|          1|            0|          0|   N|
|R10AQ92VE5HTJ6|          

**Analysis**

In [13]:
vine_table_df.orderBy(desc('total_votes')).show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| R98ZYLB84KR5I|          5|         4004|       4249|   N|
|R2HXVIKJY27SHC|          5|         2591|       2887|   N|
|R3PA20VZXRWHTJ|          1|         1372|       1498|   N|
| RLS6K44P0V87V|          4|         1288|       1425|   N|
|R22O940WOHPL5H|          5|         1019|       1135|   N|
|R1CCAPD7CHQ6V7|          1|          753|        901|   N|
|R3I22M1QEOXAAZ|          5|          748|        895|   N|
|R1EW24Z16XE5BZ|          5|          759|        878|   N|
|R30LES7PE4O9LM|          5|          803|        820|   N|
|R3V539242QE3L3|          5|          738|        782|   N|
|R3OWSMRC33P0FM|          5|          635|        754|   N|
|R3K634QWBXXOAZ|          5|          706|        725|   N|
|R264317PFNOCXI|          5|          671|        700|   N|
|R1ZE2ANR28UKLR|          3|          56

In [59]:
vine_table_df.groupby('vine').count().show()

+----+------+
|vine| count|
+----+------+
|   Y|  1747|
|   N|958932|
+----+------+



**Set Total number of votes >=10**

In [51]:
vine_table_df.filter('total_votes >=10').count()

27722

In [53]:
VoteCount=vine_table_df.filter('total_votes >=10')

**Splitting Vine(paid) and Non Vine(unpaid reviews)**

**Total Number of paid reviews**

In [70]:
Vine_df=VoteCount.filter("vine =='Y'")
Total_vine_reviews=Vine_df.count()

**Total number of unpaid reviews**

In [71]:
NonVine_df=VoteCount.filter("vine =='N'")
Total_NonVine_reviews=NonVine_df.count()

**Number of 5 star Vine reviews and non-Vine reviews**

In [62]:
Vine_rating=Vine_df.filter("star_rating == 5").count()

In [63]:
NonVine_rating=NonVine_df.filter("star_rating == 5").count()

**Total number of 5 star reviews**

In [64]:
Total_rating=VoteCount.filter("star_rating == 5").count()

**Percentage calculation**

In [72]:
#Vine Reviews
Vine_perc=round((Vine_rating/Total_vine_reviews)*100,2)
Vine_perc

33.85

In [73]:
# Non-Vine Reviews
NonVine_perc=round((NonVine_rating/Total_NonVine_reviews)*100,2)
NonVine_perc

45.98

**Summary of Analysis**

Total Number of paid reviews is 1747 and unpaid reviews is 958932.Out of that only the data with total votes >=10 is being analyzed.So now the total Number of paid reviews is 130 and unpaid reviews is 27592.
Percentage of paid reviews with 5 star rating is 33.85 ,whereas for nonvine reviews is 45.98.As per this analysis it can be assumed that the Vine reviews are not biased.



# New Section