In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
jdk_version = '11'
hadoop_version = 'hadoop2.7'

# Set Environment Variables
os.environ['SPARK_VERSION'] = spark_version
os.environ['JDK_VERSION'] = jdk_version
os.environ['HADOOP_VERSION'] = hadoop_version
os.environ['JAVA_HOME'] = f'/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['SPARK_HOME'] = f'/content/{spark_version}-bin-{hadoop_version}'

# Update modules and install Spark and Java
!apt-get update
!apt-get install openjdk-$JDK_VERSION-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-$HADOOP_VERSION.tgz
!tar xf $SPARK_VERSION-bin-$HADOOP_VERSION.tgz
!pip install -q findspark

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:13 htt

In [None]:
postgresql_java_version = '42.2.9'
os.environ['POSTGRESQL_JAVA_VERSION'] = postgresql_java_version
!wget https://jdbc.postgresql.org/download/postgresql-$POSTGRESQL_JAVA_VERSION.jar

--2021-11-23 01:13:39--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2021-11-23 01:13:40 (5.90 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the session with the PostgreSQL JDBC jar on the driver classpath.
# The jar name is built from postgresql_java_version so it always matches the file
# downloaded by the wget cell above instead of a separately hard-coded version.
spark = (
    SparkSession.builder
    .appName("Level-One-Starter-Code")
    .config(
        "spark.driver.extraClassPath",
        f"/content/postgresql-{postgresql_java_version}.jar",
    )
    .getOrCreate()
)

# Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Sports_v1_00.tsv.gz"
# Register the remote archive with the SparkContext; SparkFiles.get() resolves its local path
spark.sparkContext.addFile(url)
# Read the tab-separated file, taking column names from the header row and letting Spark
# infer the column types.
# The previous timestampFormat="mm/dd/yy" was wrong twice over: in Spark datetime patterns
# `mm` is minute-of-hour (months are `MM`), and the review dates in this file are written
# as yyyy-MM-dd. Declare the real date layout instead.
sports_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Sports_v1_00.tsv.gz"),
    sep="\t",
    header=True,
    inferSchema=True,
    dateFormat="yyyy-MM-dd",
)

# Show DataFrame
sports_df.show(truncate=False)

+-----------+-----------+--------------+----------+--------------+--------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|marketplace|customer_id|review_id     |product_

In [None]:
# How many rows were loaded? Counting the single projected column gives the row count.
review_row_count = sports_df.select("customer_id").count()
review_row_count

4850360

In [None]:
# Print each column of sports_df with its type and nullability
sports_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



# Cleaned up DataFrames to match tables

In [109]:
# Import only the functions used here: a star import from pyspark.sql.functions
# shadows Python builtins such as sum, min, max, round and abs.
from pyspark.sql.functions import col, to_date

# Review DataFrame: make review_date a date and star_rating an integer so the
# column types match the target tables.
sports_df = (
    sports_df
    .withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))
    .withColumn("star_rating", col("star_rating").cast("int"))
)
# Cache once, on the final DataFrame. Caching the intermediate result as well
# marks a second copy of the same rows for caching.
sports_df.cache()
sports_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   48945260|R1WBPB8MDCCN8F|B012P7UPSM|     409940130|Chicago Blackhawk...|          Sports|          5|            0|          0|   N|                N|   LOVE IT. 6 stars!|Bought this last ...| 2015-08-31|
|         US|    5782091|R32M0YEWV77XG8|B001GQ3VHG|     657746679|Copag Poker Size ...|          Sports|          5|    

In [None]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

# (column name, Spark type) pairs for the review columns; every field is declared nullable.
_review_columns = [
    ("marketplace", StringType()),
    ("customer_id", IntegerType()),
    ("review_id", StringType()),
    ("product_id", StringType()),
    ("product_parent", IntegerType()),
    ("product_title", StringType()),
    ("product_category", StringType()),
    ("star_rating", IntegerType()),
    ("helpful_votes", IntegerType()),
    ("total_votes", IntegerType()),
    ("vine", StringType()),
    ("verified_purchase", StringType()),
    ("review_headline", StringType()),
    ("review_body", StringType()),
    ("review_date", DateType()),
]
schema = [StructField(column, dtype, True) for column, dtype in _review_columns]

# NOTE(review): `final` is not referenced by any other cell visible in this notebook.
final = StructType(fields=schema)

In [None]:
# Columns that make up the review_id_table, in table order
review_id_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_table_df = sports_df.select(*review_id_columns)
review_id_table_df.show(truncate=False)

+--------------+-----------+----------+--------------+-----------+
|review_id     |customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1WBPB8MDCCN8F|48945260   |B012P7UPSM|409940130     |2015-08-31 |
|R32M0YEWV77XG8|5782091    |B001GQ3VHG|657746679     |2015-08-31 |
|RR8V7WR27NXJ5 |45813853   |B008VS8M58|962442336     |2015-08-31 |
|R1MHO5V9Z932AY|1593730    |B005F06F4U|74305227      |2015-08-31 |
|R16PD71086BD2V|29605511   |B010T4IE2C|787185588     |2015-08-31 |
|R1Z8IFGWTRWXT6|11112959   |B004RKJGLS|94127483      |2015-08-31 |
|R3AUMSHAW73HWN|108031     |B005V3DCBU|526977496     |2015-08-31 |
|R2KWDWFOHGX6FL|13981540   |B00MHT9WN8|26521265      |2015-08-31 |
|R3H9543FWBWFBU|37993909   |B001CSIRQ8|652431165     |2015-08-31 |
|RUANXOQ9W3OU5 |26040213   |B001KZ3NOO|635861713     |2015-08-31 |
|R31673RTGEZSW7|34657602   |B00005RCQS|72099763      |2015-08-31 |
|R22OQLFSH42RCM|14346192   |B00FA7RWVI|757354022     |2015-08-

In [None]:
# Print each column of review_id_table_df with its type and nullability
review_id_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [None]:
from pyspark.sql import Row
# Keep one row per product_id, then project the two product columns
unique_products_df = sports_df.dropDuplicates(["product_id"])
products_table_df = unique_products_df.select("product_id", "product_title")

products_table_df.show(truncate=False)

+----------+--------------------------------------------------------------------------------------------+
|product_id|product_title                                                                               |
+----------+--------------------------------------------------------------------------------------------+
|1570340439|Compass & Map Navigator                                                                     |
|9879000633|Swordmaster - Universal Fit Sword Frog Belt Strap for Link's Master Zelda Sword BK Brand New|
|B00004RAN1|24x7x7 Pro Cage Trap                                                                        |
|B00005OU7P|Putt and Return Practice Green                                                              |
|B00005V3EN|Airzone Trampoline Cover                                                                    |
|B00008Z7M2|Koolatron 401619 Mini Bike                                                                  |
|B0000AAEMO|Seahawks Franklin Big Kids NFL Uni

In [None]:
# Print each column of products_table_df with its type and nullability
products_table_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [None]:
# Number of review rows per customer, with the generated "count" column renamed
reviews_per_customer_df = sports_df.select("customer_id").groupBy("customer_id").count()
customers_table_df = reviews_per_customer_df.withColumnRenamed("count", "customer_count")
customers_table_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   35669025|             1|
|   48198949|            30|
|   43355824|             6|
|   33014261|             6|
|   23493243|             1|
|   30717305|             4|
|   15714077|             2|
|    7854719|             1|
|   12761428|             2|
|   14127895|             1|
|   51451778|             4|
|    3452965|             1|
|   40430762|             2|
|   27314089|            15|
|   11405991|             1|
|   20368048|             2|
|   33967841|             2|
|   10418855|             1|
|   43783459|             1|
|   50843047|             4|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Print each column of customers_table_df with its type and nullability
customers_table_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



In [110]:
# Columns that make up the vine_table, in table order
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_table_df = sports_df.select(*vine_columns)
vine_table_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+
|review_id     |star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1WBPB8MDCCN8F|5          |0            |0          |N   |
|R32M0YEWV77XG8|5          |1            |1          |N   |
|RR8V7WR27NXJ5 |1          |0            |0          |N   |
|R1MHO5V9Z932AY|5          |0            |0          |N   |
|R16PD71086BD2V|5          |0            |1          |N   |
|R1Z8IFGWTRWXT6|3          |0            |0          |N   |
|R3AUMSHAW73HWN|4          |2            |3          |N   |
|R2KWDWFOHGX6FL|5          |0            |0          |N   |
|R3H9543FWBWFBU|5          |1            |1          |N   |
|RUANXOQ9W3OU5 |5          |0            |0          |N   |
|R31673RTGEZSW7|5          |2            |2          |N   |
|R22OQLFSH42RCM|5          |1            |1          |N   |
|R12LEL4F3TSZUJ|5          |2            |2          |N   |
|R2L9XWD03072NI|5          |1           

In [111]:
# Print each column of vine_table_df with its type and nullability
vine_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



# Push to AWS RDS instance

In [None]:
# Save mode passed to every write below: rows are appended to the existing tables
mode = "append"
jdbc_url = "jdbc:postgresql://big-data-1.cluyaga9jyra.us-east-2.rds.amazonaws.com:5432/big_data_1"
# Connection properties for DataFrameWriter.jdbc.
# Credentials are read from the RDS_USERNAME / RDS_PASSWORD environment variables so real
# values never have to be typed into, and saved with, the notebook. When a variable is not
# set, the original "{username}" / "{password}" placeholder is kept, so the cell behaves
# exactly as before.
config = {
    "user" : os.environ.get("RDS_USERNAME", "{username}"),
    "password" : os.environ.get("RDS_PASSWORD", "{password}"),
    "driver" : "org.postgresql.Driver"
}

In [None]:
# Send the rows of review_id_table_df to the review_id_table table in RDS
review_id_table_df.write.jdbc(
    url=jdbc_url,
    table='review_id_table',
    mode=mode,
    properties=config,
)

In [None]:
# Send the rows of products_table_df to the products table in RDS
products_table_df.write.jdbc(
    url=jdbc_url,
    table='products',
    mode=mode,
    properties=config,
)

In [None]:
# Send the rows of customers_table_df to the customers table in RDS
customers_table_df.write.jdbc(
    url=jdbc_url,
    table='customers',
    mode=mode,
    properties=config,
)

In [112]:
# Send the rows of vine_table_df to the vine_table table in RDS
vine_table_df.write.jdbc(
    url=jdbc_url,
    table='vine_table',
    mode=mode,
    properties=config,
)