<a href="https://colab.research.google.com/github/mattdhill011/big-data-challenge/blob/main/watches_reviews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'

os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NLPTokens").getOrCreate()

In [3]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType


In [4]:
from pyspark import SparkFiles

watches = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(watches)

watches_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), inferSchema=True, sep='\t', header=True, timestampFormat="yyyy-mm-dd")

In [5]:
watches_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-01-31 00:08:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [6]:
count = watches_df.count()

print(f"There are {count} rows in the dataset")

There are 960872 rows in the dataset


In [32]:
review_id_table = watches_df.select(["review_id","customer_id","product_id","product_parent","review_date"])

review_id_table.printSchema()
review_id_table.show()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R3O9SGZBVQBV76|    3653882|B00FALQ1ZC|     937001370|2015-01-31 00:08:00|
| RKH8BNC3L5DLF|   14661224|B00D3RGO20|     484010722|2015-01-31 00:08:00|
|R2HLE8WKZSU3NL|   27324930|B00DKYC7TK|     361166390|2015-01-31 00:08:00|
|R31U3UH5AZ42LL|    7211452|B000EQS1JW|     958035625|2015-01-31 00:08:00|
|R2SV659OUJ945Y|   12733322|B00A6GFD7S|     765328221|2015-01-31 00:08:00|
| RA51CP8TR5A2L|    6576411|B00EYSOSE8|     230493695|2015-01-31 00:08:00|
| RB2Q7DLDN6TH6|   11811565|B00WM0QA3M|     549298279|2015-01-31 00:08:00|
|R2RHFJV0UYBK3Y|   49401

In [34]:
products = watches_df.select(["product_id", "product_title"]).distinct()
products.printSchema()
products.show()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00EVX7V1I|Game Time Women's...|
|B009S4DODY|XOXO Women's XO55...|
|B00LBKXQRW|Anne Klein Women'...|
|B0009P679Y|Invicta Men's 993...|
|B00DHF30RU|M&c Women's | Cla...|
|B00NIDA43Y|GuTe Classic Skel...|
|B008EQDDPQ|Nautica Men's N13...|
|B004VRBZ66|Timex Men's T2N63...|
|B009BEO81I|        Fossil Riley|
|B008B39MTI|XOXO Women's XO55...|
|B00TGPM8PU|Handmade Wooden W...|
|B00VNXQQQ0|Eterna 2520-41-64...|
|B00B1PV1C4|Nautica Men's N19...|
|B00N1Y8TQ4|Tissot Men's T095...|
|B00G6DBTY6|red line Men's RL...|
|B00HM04AYI|Columbia Men's Fi...|
|B00VI8HB96|GUESS I90176L1 Wo...|
|B00IT25WJU|LanTac DGN556B Dr...|
|B0106S12XE|Skmei S Shock Ana...|
|B00FPSJ63Y|Michael Kors Ladi...|
+----------+--------------------+
only showing top 20 rows



In [23]:
customers = watches_df.select(["customer_id", "customer_count"]).distinct()
customers.count()

719914

In [36]:
from pyspark.sql.functions import countDistinct
customers = watches_df.groupBy("customer_id").agg(countDistinct('review_id')).withColumnRenamed("count(review_id)", "customer_count")
customers.printSchema()
customers.show()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   12318815|             3|
|   13923687|             6|
|   45449174|             2|
|   26625961|             1|
|   44801986|             1|
|     608011|             1|
|   46704992|             1|
|   38713654|             1|
|   19282763|             1|
|   13717715|             2|
|   43304216|             1|
|   43235042|             2|
|    2727460|             1|
|   52908654|             1|
|   51202156|             1|
|   35293336|             1|
|   44267837|            14|
|    2630732|             1|
|   20495779|             6|
|   46839414|            11|
+-----------+--------------+
only showing top 20 rows



In [38]:
vine_table = watches_df.select(["review_id", "star_rating", "helpful_votes", "total_votes","vine"])
vine_table.printSchema()
vine_table.show()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3O9SGZBVQBV76|          5|            0|          0|   N|
| RKH8BNC3L5DLF|          5|            0|          0|   N|
|R2HLE8WKZSU3NL|          2|            1|          1|   N|
|R31U3UH5AZ42LL|          5|            0|          0|   N|
|R2SV659OUJ945Y|          4|            0|          0|   N|
| RA51CP8TR5A2L|          5|            0|          0|   N|
| RB2Q7DLDN6TH6|          5|            1|          1|   N|
|R2RHFJV0UYBK3Y|          1|            1|          5|   N|
|R2Z6JOQ94LFHEP|          5|            1|          2|   N|
| RX27XIIWY5JPB|          4|            0|          0|   N|
|R1