<a href="https://colab.research.google.com/github/jnjorstad/Air-Quality-and-Lung-Cancer/blob/main/Furniture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os

# find the latest version of spark 3.0
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION'] = spark_version

# install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# start a sparksession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:2 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done


In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-08-12 03:51:56--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-08-12 03:51:58 (1.25 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.drive.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
# Read in data from S3 buckets
from pyspark import SparkFiles

url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Furniture_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

furniture_df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Furniture_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat = "mm/dd/yy")

# show DataFrame
furniture_df.show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24509695|R3VR960AHLFKDV|B004HB5E0E|     488241329|Shoal Creek Compu...|       Furniture|          4|            0|          0|   N|                Y|... desk is very ...|This desk is very...| 2015-08-31|
|         US|   34731776|R16LGVMFKIUT0G|B0042TNMMS|     205864445|Dorel Home Produc...|       Furniture|          5|    

In [5]:
# drop duplicates and incomplete rows

print(furniture_df.count())
furniture_df = furniture_df.dropna()
print(furniture_df.count())
furniture_df = furniture_df.dropDuplicates()
print(furniture_df.count())

792113
791971
791971


In [6]:
# examine the schema
furniture_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [7]:
# load in a sql function to use columns
from pyspark.sql.functions import col

In [8]:
# create DataFrame to match review_id_table
furniture_review_df = furniture_df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
furniture_review_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R102J678JN8OQL|   46476285|B008OS1WGQ|     991978353| 2013-07-05|
|R102WTX7GH5XZJ|   28589116|B00DGEPIPO|     241314063| 2013-12-12|
|R105DQG52MW5S9|   51878818|B004Y6G7QC|     216229139| 2012-10-08|
|R107GJAMGANOJA|   52387168|B003QVDHRS|     957908300| 2014-11-23|
|R10A1UPXAUZ8W3|   36143353|B000WWAR22|     938792177| 2011-08-29|
|R10BV8UMGA6599|    9740769|B000KK7Y3Q|     478396916| 2015-08-12|
|R10E2K84D6CKQ1|   43778765|B00EZ5TB4W|     848089547| 2015-01-11|
|R10EJFWP9F3C42|   11616909|B005CR11A4|     643469497| 2013-04-26|
|R10F4GS24G0HBE|    3269876|B008OTSIY4|     981686694| 2015-04-01|
|R10GED5OITNO3Z|   15345890|B005O9Z1RE|     399691007| 2013-08-17|
|R10HVEMZ8UJ5WJ|   16456606|B001E95R3Q|     560496074| 2013-07-19|
|R10JQGEMK8Q2SM|   45301622|B00EBW4NOM|     111879870| 2015-06

In [9]:
#create DataFrame to match products table
furniture_products_df = furniture_df.select(["product_id", "product_title"])
furniture_products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B008OS1WGQ|Prepac Series 9 D...|
|B00DGEPIPO|8.7' x 12' Flowin...|
|B004Y6G7QC|Coaster Furniture...|
|B003QVDHRS|Track Ceiling Pot...|
|B000WWAR22|Spider Web Key Ho...|
|B000KK7Y3Q|1 X Unique 72" Hi...|
|B00EZ5TB4W|Sleep Innovations...|
|B005CR11A4|Skid-resistant Ca...|
|B008OTSIY4|Flash Furniture H...|
|B005O9Z1RE|Designer Modern L...|
|B001E95R3Q|Winsome Wood Sadd...|
|B00EBW4NOM|Wellington Tray T...|
|B007J1V3OK|Safavieh American...|
|B00NTY83GM|RiverRidge Home E...|
|B004GZWPCQ|Crosley Furniture...|
|B002WGJHB8|Acme Furniture Na...|
|B000VAZ4AU|Rubbermaid 4200-8...|
|B001EBM2QY|Home Accent White...|
|B007XPD8OK|Achim Capri 3 Pie...|
|B00RLCEZKG|ECVISION Classic ...|
+----------+--------------------+
only showing top 20 rows



In [10]:
#create DataFrame to match customer table
furniture_customers_df = furniture_df.groupBy('customer_id').agg({"customer_id": "count"})
furniture_customers_df = furniture_customers_df.withColumnRenamed("count(customer_id)", "customer_count")
#furniture_customers_df = furniture_df.select(["customer_id", "customer_count"])
furniture_customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   50099399|             3|
|   14866125|             1|
|   19225480|             1|
|    5901466|             1|
|   39976700|             2|
|   37762729|             1|
|   50579579|             1|
|    5937990|             2|
|   45831934|             1|
|   10134614|             1|
|   31068424|             1|
|   27205779|             1|
|   19671756|             4|
|   14913459|             1|
|   46395822|             1|
|   12866376|             4|
|   23540829|             2|
|     693435|             1|
|   49430592|             2|
|   12949009|             4|
+-----------+--------------+
only showing top 20 rows



In [11]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://endpoint:5432/big_data_hw_2"
config = {"user":"postgres", 
          "password": "password", 
          "driver":"org.postgresql.Driver"}

In [13]:
 # Write DataFrame to review_id table in RDS

furniture_review_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties= config)

Py4JJavaError: ignored

In [14]:
 # Write dataframe to products table in RDS

furniture_products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

Py4JJavaError: ignored

In [15]:
 # Write dataframe to customers table in RDS

furniture_customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

Py4JJavaError: ignored