# Amazon Review ETL

This is an extract-transform-load (ETL) pipeline for Amazon pet product review data. Each step of the pipeline is described at the start of its section.

### Dependencies

In [1]:
# Download a Postgres driver to allow Spark to interact with Postgres
!curl -O https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  979k  100  979k    0     0  1059k      0 --:--:-- --:--:-- --:--:-- 1058k


In [2]:
# Locate Spark
import findspark
findspark.init()

# Dependencies
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from config import rds, db_props

# Build (or reuse) the Spark session, with the Postgres JDBC driver jar
# placed on the driver's classpath
spark = (
    SparkSession.builder
    .appName('amz')
    .config('spark.driver.extraClassPath', 'postgresql-42.2.16.jar')
    .getOrCreate()
)
spark

### Extract

1. Read in the data using the S3 URL for pet products (from the data source below)
2. Sample 4% of the data
    - We will be working with a subsample of the data to shorten the load time

Source: https://s3.amazonaws.com/amazon-reviews-pds/tsv/index.txt

In [3]:
# Hand the remote gzipped TSV to Spark via addFile
data_bucket = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/'
data_file = 'amazon_reviews_us_Pet_Products_v1_00.tsv.gz'
data_url = data_bucket + data_file
spark.sparkContext.addFile(data_url)

# Resolve the added file's path and parse it as tab-separated text,
# treating the first row as the header and inferring column types
local_path = SparkFiles.get(data_file)
df = spark.read.csv(local_path, sep='\t', header=True, inferSchema=True)
df.show(n=2, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 marketplace       | US                                                                                                                                                                                                                                                           
 customer_id       | 28794885                                                                                                                                                                                                                                                     
 review_id         | REAKC26P07MDN                                                                                                                                             

In [4]:
# Print the inferred column types, then display the total number of rows
df.printSchema()
total_rows = df.count()
total_rows

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



2643619

In [5]:
# Sample 4% of rows, with a fixed seed so the subsample is reproducible.
# The sample is cached because every table in the Transform section is derived
# from it; without caching, each count/show/write on those tables would re-read
# the compressed source file and redo the sampling.
df_sample = df.sample(fraction=0.04, seed=0).cache()
df_sample.count()

105644

### Transform

Create a dataframe for each of the 4 tables in the database, with the corresponding columns:
1. **`reviews`** - `review_id`, `customer_id`, `product_id`, `product_parent`, `review_date`
2. **`products`** - `product_id`, `product_title`
3. **`customers`** - `customer_id`, `review_count`
4. **`vine`** - `review_id`, `star_rating`, `helpful_votes`, `total_votes`, `vine`, `verified_purchase`

In [6]:
# Reviews table: identifying columns for each sampled row, with review_date
# cast from string to a date type, sorted by review_id
review_date = df_sample['review_date'].cast(DateType())
review_cols = ['review_id', 'customer_id', 'product_id', 'product_parent', review_date]
review_df = df_sample.select(*review_cols).orderBy('review_id')

# Row count followed by a small preview
print(review_df.count())
review_df.show(5)

105644
+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1000CIZTRNP23|   25423435|B00K1B6RCI|     308737701| 2015-03-06|
|R1004OFK0BIRO3|   27253416|B002MY58G8|     165494310| 2013-02-24|
|R10060FLI507JX|   12505859|B000L3XYZ4|     501118658| 2015-01-10|
|R1006BO4DMPE0K|   22756793|B00CD6LYD6|      19412506| 2013-09-22|
|R1007R17B2A71O|   27891829|B003QS6JL2|     936104171| 2013-06-23|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [7]:
# Products table: distinct (product_id, product_title) pairs, sorted by product_id
# NOTE(review): duplicates are dropped on the pair, so a product_id that appears
# with more than one title would still produce several rows — confirm this is intended.
product_cols = ['product_id', 'product_title']
product_df = df_sample.select(*product_cols).dropDuplicates().orderBy('product_id')

# Row count followed by a small preview
print(product_df.count())
product_df.show(5)

42329
+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|039480001X|  The Cat in the Hat|
|0876051468|The Stray Cat Han...|
|0975412817|The Notes Pocket ...|
|0983794812|Pathway Cat Journ...|
|1223000893|Cat Sitter DVD Tr...|
+----------+--------------------+
only showing top 5 rows



In [8]:
# Customers table: number of rows per customer_id within the sample
# (review_count therefore reflects the subsample, not the full dataset)
review_counts = df_sample.groupBy('customer_id').count()
customer_df = review_counts.withColumnRenamed('count', 'review_count').orderBy('customer_id')

# Row count followed by a small preview
print(customer_df.count())
customer_df.show(5)

98486
+-----------+------------+
|customer_id|review_count|
+-----------+------------+
|      10318|           1|
|      10368|           1|
|      10449|           1|
|      11125|           2|
|      11618|           1|
+-----------+------------+
only showing top 5 rows



In [9]:
# Vine table: rating, vote and purchase-flag columns for each sampled row,
# sorted by review_id
vine_cols = [
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
]
vine_df = df_sample.select(*vine_cols).orderBy('review_id')

# Row count followed by a small preview
print(vine_df.count())
vine_df.show(5)

105644
+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1000CIZTRNP23|          4|            3|          3|   N|                Y|
|R1004OFK0BIRO3|          5|          138|        141|   N|                N|
|R10060FLI507JX|          5|            0|          0|   N|                Y|
|R1006BO4DMPE0K|          5|            0|          0|   N|                Y|
|R1007R17B2A71O|          5|            1|          1|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



### Load

1. Create a file named `config.py` in the same directory as this notebook
2. Create a PostgreSQL database on AWS Relational Database Service (RDS) and save its endpoint and port in `config.py` with the following format:<br />
    rds = {
        "endpoint": "db-identifier.qwerty1234.us-region.rds.amazonaws.com",
        "port": "5432"
    }
3. Use the endpoint and port to create a new server on pgAdmin
4. Create a new database in the server and save its information in `config.py` with the following format:<br />
    db_props = {
        "name": "db-name",
        "user": "postgres",
        "password": "password",
        "driver": "org.postgresql.Driver",
        "mode": "overwrite"
    }
5. Run the code in `schema.sql` on pgAdmin to create the tables inside the database
6. Load the 4 dataframes created in the transform step into the database

In [10]:
# JDBC URL for the Postgres database on RDS, assembled from the values in config.py
jdbc_url = f"jdbc:postgresql://{rds['endpoint']}:{rds['port']}/{db_props['name']}"

# Save mode and connection properties, both taken from db_props
mode = db_props['mode']
properties = {key: db_props[key] for key in ('user', 'password', 'driver')}

In [11]:
# Write each dataframe to its database table (reviews, products, customers, vine)
# NOTE(review): when mode is 'overwrite', Spark's JDBC writer by default drops and
# recreates the target table, which would replace whatever schema.sql created —
# confirm; the JDBC `truncate` option may be what is wanted here.
table_frames = [
    ('reviews', review_df),
    ('products', product_df),
    ('customers', customer_df),
    ('vine', vine_df),
]
for table_name, table_df in table_frames:
    table_df.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=properties)

Py4JJavaError: An error occurred while calling o101.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:315)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:225)
	at org.postgresql.Driver.makeConnection(Driver.java:465)
	at org.postgresql.Driver.connect(Driver.java:264)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:791)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: amz-reviews.cqzw4eyr0cqo.us-west-1.rds.amazonaws.com
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
	at java.net.Socket.connect(Socket.java:606)
	at org.postgresql.core.PGStream.createSocket(PGStream.java:231)
	at org.postgresql.core.PGStream.<init>(PGStream.java:95)
	at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:98)
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:213)
	... 38 more


In [None]:
# Read the `reviews` table back over JDBC and preview a few rows to confirm the load
reviews_reader = (
    spark.read.format('jdbc')
    .option("url", jdbc_url)
    .option("dbtable", 'reviews')
    .option("user", properties['user'])
    .option("password", properties['password'])
    .option("driver", properties['driver'])
)
reviews_reader.load().show(5)