<a href="https://colab.research.google.com/github/jared-roussel/big-data-challenge/blob/main/level-1/amazon_music_reviews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.3'
#spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

!pyspark --packages org.postgresql:postgresql:42.2.10

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad.net/graph

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkDates").getOrCreate()

In [3]:
# Load in data
from pyspark import SparkFiles
music_url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
spark.sparkContext.addFile(music_url)
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Music_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True, timestampFormat="yyyy/MM/dd HH:mm:ss")
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   10140119|R3LI5TRP3YIDQL|B00TXH4OLC|     384427924|Whatever's for Us...|           Music|          5|            0|          0|   N|                Y|          Five Stars|Love this CD alon...| 2015-08-31|
|         US|   27664622|R3LGC3EKEG84PX|B00B6QXN6U|     831769051|Same Trailer Diff...|           Music|          5|    

In [4]:
# Show schema to confirm date type
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [5]:
#Remove duplicates and null values before importing into postgres
df = df.drop_duplicates()
df = df.dropna()
#df

In [9]:
#Make data fit schema
music_review_id = df.select(["review_id","customer_id","product_id","product_parent","review_date"])
music_review_id.show(10)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1005BINTGKCVE|   18789304|B000002J6H|     480387571| 2013-01-29|
|R10067SN6FSII1|   32818060|B00005BCY9|     926227231| 2003-01-19|
|R100NGDZ6CAL8E|   32318800|B0000AB13N|     711057433| 2003-08-04|
|R100OLMAO039IL|   50673975|B000002SZA|     632006013| 2009-09-10|
| R100OMDUXE5VZ|   52503259|B000056KSU|     155726790| 2003-02-08|
|R100RJOC7TCXN6|   47602845|B005G492CC|     819279192| 2014-06-03|
|R10182KW3O6486|   52776942|B000031WEY|     719850793| 2004-04-23|
|R101B8W76YQILD|   12264663|B000BM7YYW|     364079538| 2006-03-10|
|R101K97ELZLSDK|   51870288|B00006NSDJ|     319548298| 2003-07-28|
|R101MMM4IXONBO|   43769541|B00C4XYEFS|     319009128| 2014-03-27|
+--------------+-----------+----------+--------------+-----------+
only showing top 10 rows



In [7]:
music_products = df.select(["product_id","product_title"])
music_products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B000002J6H|      Stephen Stills|
|B00005BCY9|      Do You Love Me|
|B0000AB13N|        Project 1950|
|B000002SZA|My Christmas Favo...|
|B000056KSU|Os Sonhos Mas Lindos|
|B005G492CC|            The Help|
|B000031WEY|Bad Channels - So...|
|B000BM7YYW|       Walk the Line|
|B00006NSDJ|    Oh So Many Years|
|B00C4XYEFS|Bennett & Brubeck...|
|B000002WR7|If Every Day Was ...|
|B000002GNQ|          Wavelength|
|B000KEGH7Q|Bye-Bye Cigarette...|
|B0002VEU7Q|Diplomatic Immuni...|
|B00005M989|The Very Best of ...|
|B00F2HW20M|Genesis Revisited...|
|B00LH98PEG|      Ragged & Dirty|
|B000CQQHGI|The Definitive Co...|
|B00000AW34|         Full Circle|
|B001C0CJLU|Classic Album Col...|
+----------+--------------------+
only showing top 20 rows



In [10]:
music_customers = df.select(["product_id","product_title"])
music_customers.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B000002J6H|      Stephen Stills|
|B00005BCY9|      Do You Love Me|
|B0000AB13N|        Project 1950|
|B000002SZA|My Christmas Favo...|
|B000056KSU|Os Sonhos Mas Lindos|
|B005G492CC|            The Help|
|B000031WEY|Bad Channels - So...|
|B000BM7YYW|       Walk the Line|
|B00006NSDJ|    Oh So Many Years|
|B00C4XYEFS|Bennett & Brubeck...|
|B000002WR7|If Every Day Was ...|
|B000002GNQ|          Wavelength|
|B000KEGH7Q|Bye-Bye Cigarette...|
|B0002VEU7Q|Diplomatic Immuni...|
|B00005M989|The Very Best of ...|
|B00F2HW20M|Genesis Revisited...|
|B00LH98PEG|      Ragged & Dirty|
|B000CQQHGI|The Definitive Co...|
|B00000AW34|         Full Circle|
|B001C0CJLU|Classic Album Col...|
+----------+--------------------+
only showing top 20 rows



In [8]:
# Import date time functions
from pyspark.sql.functions import year

# Show the year for the date column
#df.select(year(df["date"])).show()

In [11]:
music_vine_table = df.select(["review_id","star_rating","helpful_votes", "total_votes", "vine" ])
music_vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1005BINTGKCVE|          5|            0|          0|   N|
|R10067SN6FSII1|          4|            0|          0|   N|
|R100NGDZ6CAL8E|          1|            2|          2|   N|
|R100OLMAO039IL|          3|            2|          2|   N|
| R100OMDUXE5VZ|          4|            0|          0|   N|
|R100RJOC7TCXN6|          5|            2|          2|   N|
|R10182KW3O6486|          4|            5|          6|   N|
|R101B8W76YQILD|          4|            5|          5|   N|
|R101K97ELZLSDK|          4|            7|          9|   N|
|R101MMM4IXONBO|          5|            0|          0|   N|
|R101SUFE1N98DO|          5|            3|          6|   N|
|R101Y7O5QCTMH4|          4|            5|          7|   N|
|R1020J9O3H1F52|          5|            0|          0|   N|
|R1024GZRHXWW2F|          1|            

In [12]:
#Create connection to postgres
mode="append"
jdbc_url = "jdbc:postgresql://<>:5432/<>"
config = {"user":"<>",
          "password": "<>",
          "driver":"org.postgresql.Driver"}

In [16]:
music_vine_table.write.jdbc(url = jdbc_url, table = 'music_vine_table', mode = mode, properties = config)