<a href="https://colab.research.google.com/github/tlenzmeier58/big_data_challenge/blob/main/working2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Gather dependencies
import os
import pandas as pd
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.3'

os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
                                                                               Hit:6 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (13.227.

In [2]:
# Install the PostgreSQL driver in our Colab environment
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-29 15:42:45--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-11-29 15:42:46 (1.67 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Import modules
from pyspark import SparkFiles

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
from posixpath import sep
#Load data
base_url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/"
file = "amazon_reviews_us_Outdoors_v1_00.tsv.gz"
url = base_url + file
spark.sparkContext.addFile(url)

# Initial DF
df = spark.read.option('header', 'true').csv(SparkFiles.get(file),
                                             inferSchema=True,
                                             sep='\t',
                                             timestampFormat = "yyyy-mm-dd")
df.show(truncate=True)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   18446823|R35T75OLUGHL5C|B000NV6H94|     110804376|Stearns Youth Boa...|        Outdoors|          4|            0|          0|   N|                Y|          Four Stars|          GOOD VALUE|2015-01-31 00:08:00|
|         US|   13724367|R2BV735O46BN33|B000IN0W3Y|     624096774|Primal Wear Men's...| 

In [5]:
# Cleaning time . . .
df = df.dropna()
df = df.dropDuplicates()

In [6]:
# Row count
df.count()

2302174

In [7]:
#Schema
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [None]:
 # IMPORTANT: Replace each of these parameters with your own values for your AWS RDS instance
my_aws_endpoint = 'big-data-challenge.ch9chhy09aqn.us-west-2.rds.amazonaws.com' # This is my value; please replace with your own
my_aws_port_number = '5432' # Your value is likely the same, but please double check
my_aws_database_name = 'big_data_etl' # This is my value; please replace with your own
my_aws_username = 'postgres' # Your value is likely the same, but please double check
my_aws_password = 'XXXXXXXX' # This is my value; please replace with your own

 # Define the connection string
jdbc_url=f'jdbc:postgresql://{my_aws_endpoint}:{my_aws_port_number}/{my_aws_database_name}'

In [None]:
 # Define the connection string
jdbc_url=f'jdbc:postgresql://{my_aws_endpoint}:{my_aws_port_number}/{my_aws_database_name}'

# Set up the configuration parameters
config = {"user": f'{my_aws_username}', 
          "password": f'{my_aws_password}', 
          "driver":"org.postgresql.Driver"}


mode = 'overwrite' 

In [8]:
# Outdoors df
outdoors = df.select(['product_id', 'product_parent', 'product_title'])
outdoors.show(truncate=True)



# Push to PostgreSQL
outdoors.write.jdbc(url = jdbc_url,
                   table = 'outdoors',
                  mode = mode,
                 properties = config)

+----------+--------------+--------------------+
|product_id|product_parent|       product_title|
+----------+--------------+--------------------+
|B000IZX0SG|     373520218|Fox River Four La...|
|B00NCJPXEO|      80630966|Saris Cycle Racks...|
|B001O5M1CA|     139791691|Coleman Road Trip...|
|B00A2C8BXO|     949028516|Hydro Flask Doubl...|
|B000NPCSU2|     435965836|gearup OakRak Ext...|
|B000IZGF42|     100719823|Pedro's Toothbrus...|
|B009R9FUI2|     232637943|Kelty Noah's Tarp...|
|B0000AUSFB|     145114599|Coghlan's Water F...|
|B00LA97I8M|     165160843|Survivor Filter -...|
|B000K00O1A|     267773760|Razor Electric Sc...|
|B004X55NEG|     150248498|Hydro Flask Insul...|
|B001GSONUI|     586388708|Lazer Helmet Pod,...|
|B002HMD8YY|     854049810|Oakley O-Frame MX...|
|B00MX90AFQ|     464839910|Giro Savant Road ...|
|B00EW0CG2Y|     785213515|Magnesium Fire St...|
|B004CG9RGQ|     366026861|Hobie - Leash Kit...|
|B00MI5U47E|     893560228|Cressi Scuba Divi...|
|B00180MO2S|     598

In [10]:
outdoors.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)



In [9]:
# Reviews df
outdoor_reviews = df = df.select(['review_id', 'customer_id', 'product_id', 'product_parent', 'review_date'])
outdoor_reviews.show(truncate=True)

outdoor_reviews.write.jdbc(url=jdbc_url,
                   table = 'outdoor_reviews',
                   mode = mode,
                   properties = config)

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R100AJS24RR6PQ|   44185913|B000IZX0SG|     373520218|2015-01-22 00:02:00|
|R100F6MBZKRAFD|   49266466|B00NCJPXEO|      80630966|2014-01-01 00:12:00|
|R100Y47MF2D3EF|   29788575|B001O5M1CA|     139791691|2013-01-27 00:05:00|
|R101EMNRQWQ4NA|   49971220|B00A2C8BXO|     949028516|2014-01-17 00:10:00|
|R101T5ACN6GE6Y|    1512429|B000NPCSU2|     435965836|2015-01-04 00:02:00|
|R102458UPCSCJF|   10518483|B000IZGF42|     100719823|2015-01-01 00:07:00|
|R102MVX3XAWLW8|   17049802|B009R9FUI2|     232637943|2013-01-28 00:05:00|
|R1030MR86HT0G5|   22406799|B0000AUSFB|     145114599|2013-01-02 00:02:00|
|R10358ROTFT3IB|     928913|B00LA97I8M|     165160843|2015-01-06 00:05:00|
|R103VYR9VYTJLB|   31778769|B000K00O1A|     267773760|2015-01-18 00:04:00|
|R10409ALR168X3|   320238

In [11]:
outdoor_reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [12]:
# Customers
outdoor_customers = df.select(['customer_id'])
outdoor_customers = outdoor_customers.groupBy('customer_id').count()
outdoor_customers = outdoor_customers.withColumnRenamed('count','customer_count')
outdoor_customers.show(truncate=True)

# Push to PostgreSQL
outdoor_customers.write.jdbc(url=jdbc_url,
                     table = 'outdoor_customers',
                     mode=mode,
                     properties = config)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   33543441|             1|
|   41549558|            12|
|   38247118|             1|
|   22365621|             3|
|   48081045|            11|
|   14645754|             5|
|   18026207|             1|
|   14241908|             3|
|    8100655|             1|
|   48441020|             1|
|   49459137|             2|
|   17952945|             1|
|   36179637|             2|
|   11834281|             1|
|   27980635|             1|
|   42815232|             1|
|   39445047|             1|
|   28629340|             3|
|   37785878|             1|
|    8460667|             1|
+-----------+--------------+
only showing top 20 rows



In [13]:
outdoor_customers.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)

