In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-09-17 21:29:46--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-09-17 21:29:48 (1.31 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"), sep=",", header=True, inferSchema=True)

# Show DataFrame
user_data_df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|marketplace	customer_id	review_id	product_id	product_parent	product_title	product_category	star_rating	helpful_votes	total_votes	vine	verified_purchase	review_headline	review_body	review_date                                                                                                                                                                                                                             |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
user_data_df.printSchema

<bound method DataFrame.printSchema of DataFrame[marketplace	customer_id	review_id	product_id	product_parent	product_title	product_category	star_rating	helpful_votes	total_votes	vine	verified_purchase	review_headline	review_body	review_date: string]>

In [6]:
dropna_df = user_data_df.na.drop(how="any")
dropna_df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|marketplace	customer_id	review_id	product_id	product_parent	product_title	product_category	star_rating	helpful_votes	total_votes	vine	verified_purchase	review_headline	review_body	review_date                                                                                                                                                                                                                             |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
user_data_df.columns

['marketplace\tcustomer_id\treview_id\tproduct_id\tproduct_parent\tproduct_title\tproduct_category\tstar_rating\thelpful_votes\ttotal_votes\tvine\tverified_purchase\treview_headline\treview_body\treview_date']

In [8]:
user_data_df.describe

<bound method DataFrame.describe of DataFrame[marketplace	customer_id	review_id	product_id	product_parent	product_title	product_category	star_rating	helpful_votes	total_votes	vine	verified_purchase	review_headline	review_body	review_date: string]>

In [9]:
# Cast Data to new data types
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [16]:
# Next we need to create the list of struct fields
schema = [StructField("review_id", StringType(), True), StructField("customer_id", IntegerType(), True), StructField("product_id", StringType(), True), StructField("product_parent", IntegerType(), True)]

In [18]:
# Pass in our fields
final = StructType(fields=schema)
final

StructType(List(StructField(review_id,StringType,true),StructField(customer_id,IntegerType,true),StructField(product_id,StringType,true),StructField(product_parent,IntegerType,true)))

In [19]:
# Read our data with our new schema
dataframe = spark.read.csv(SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"), schema=final, sep=",", header=True)
dataframe.show()


+--------------------+-----------+--------------------+--------------+
|           review_id|customer_id|          product_id|product_parent|
+--------------------+-----------+--------------------+--------------+
|US	16414143	R3W4P...|       null|  my battery drai...|          null|
|US	50800750	R15V5...|       null| they’re growing ...|          null|
|US	15184378	RY8I4...|       null|                null|          null|
|US	10203548	R18TL...|       null| Travel Charger f...|          null|
|US	488280	R1NK26S...|       null|                null|          null|
|US	13334021	R11LO...|       null| better customer ...|          null|
|US	27520697	R3ALQ...|       null|                null|          null|
|US	48086021	R3MWL...|       null|                null|          null|
|US	12738196	R2L15...|       null|                null|          null|
|US	15867807	R1DJ8...|       null| you hardly need ...|          null|
|US	1972249	R3MRWN...|       null|         lightweight|          null|
|US	10

In [20]:
dataframe.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)



In [21]:
dataframe.select('review_id')

DataFrame[review_id: string]

In [22]:
type(dataframe.select('review_id'))

pyspark.sql.dataframe.DataFrame

In [23]:
dataframe.select('review_id').show()

+--------------------+
|           review_id|
+--------------------+
|US	16414143	R3W4P...|
|US	50800750	R15V5...|
|US	15184378	RY8I4...|
|US	10203548	R18TL...|
|US	488280	R1NK26S...|
|US	13334021	R11LO...|
|US	27520697	R3ALQ...|
|US	48086021	R3MWL...|
|US	12738196	R2L15...|
|US	15867807	R1DJ8...|
|US	1972249	R3MRWN...|
|US	10956619	R1DS6...|
|US	14805911	RWJM5...|
|US	15611116	R1XTJ...|
|US	39298603	R2UZL...|
|US	17552454	R2EZX...|
|US	12218556	R26VY...|
|US	21872923	R2SSA...|
|US	16264332	R1G63...|
|US	6042304	R2DRG0...|
+--------------------+
only showing top 20 rows



In [24]:
# Remove leading and trailing space of the column in pyspark

from pyspark.sql.functions import *

dataframe = dataframe.withColumn('customer_id', trim(dataframe.customer_id))

dataframe.show(truncate =False)


+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|review_id                                                                                                                                                                                                                                           