In [1]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.functions import to_date
import pyspark.sql.functions as f
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, inspect
import pandas as pd
from config import my_db

In [2]:
# Start a Spark session for this notebook (getOrCreate reuses one already running in the kernel)
spark = (
    SparkSession.builder
    .appName("sc")
    .getOrCreate()
)

In [3]:
# Download the Amazon reviews sample TSV to every Spark node, then load it as a
# DataFrame with a header row and Spark-inferred column types.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/sample_us.tsv"
spark.sparkContext.addFile(url)
tsv_name = "sample_us.tsv"  # SparkFiles looks the downloaded copy up by its file name
df = spark.read.csv(
    SparkFiles.get(tsv_name),
    sep="\t",
    header=True,
    inferSchema=True,
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!|2015-08-31 00:00:00|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...| 

In [4]:
# Normalize review_date to a DateType column (any time-of-day component is dropped).
# NOTE: Spark datetime patterns are case-sensitive -- 'mm' means *minutes*, not months --
# and the source dates use '-' separators, so a 'yyyy/mm/dd' pattern is wrong for this data.
# It only appeared to work because inferSchema had already produced a timestamp
# (the raw show() prints "2015-08-31 00:00:00"), and Spark ignores the pattern for timestamps.
# Omitting the pattern lets to_date convert both ISO date strings ("2015-08-31") and timestamps.
df = df.withColumn("review_date", to_date(df["review_date"]))
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

In [5]:
# Print df schema: column names, Spark types and nullability, to confirm what inferSchema chose
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [6]:
# Print number of rows (reviews) loaded from the sample file
df.count()

49

In [7]:
# review_id table: one row per distinct (review, customer, product, parent, date) combination
review_cols = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id = df.select(*review_cols).distinct()
review_id.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R36ED1U38IELG8|   24769659|B00D7JFOPC|     952062646| 2015-08-31|
|R1JS8G26X4RM2G|   12612039|B00D4NJSJE|     408940178| 2015-08-31|
| R47XBGQFP039N|   21448082|B00FZX71BI|     480992295| 2015-08-31|
|R300I65NW30Y19|   32866903|B000TFLAZA|     149264874| 2015-08-31|
|R1YS3DS218NNMD|    1662075|B00XPWXYDK|     210135375| 2015-08-31|
| R2ATXF4QQ30YW|   43880421|B00000JS5S|     341842639| 2015-08-31|
| R44NP0QG6E98W|   26599182|B00JLKI69W|     375626298| 2015-08-31|
| RKLAK7EPEG5S6|   12191231|B00BMKL5WY|     906199996| 2015-08-31|
|R1WQ3ME3JAG2O1|   34874898|B00WAKEQLW|     824555589| 2015-08-31|
| R2D90RQQ3V8LH|   52006292|B00519PJTW|     493486387| 2015-08-31|
|R3AHZWWOL0IAV0|   47781982|B00GNDY40U|     438056479| 2015-08-31|
|R1H1HOVB44808I|    6762003|B00PXWS1CY|     996611871| 2015-08

In [8]:
# Create products table with one row per product_id.
# distinct() over (product_id, product_title) would keep several rows for a product whose
# title differs between reviews, violating the "unique product_id" intent; dropDuplicates
# on product_id keeps exactly one row per product.
# NOTE(review): which title survives for such a product is arbitrary.
products = df.select(["product_id", "product_title"]).dropDuplicates(["product_id"])
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00CAEEDC0|Seat Pets Car Sea...|
|B00V5DM3RE|Yoga Joes - Green...|
|B008W1BPWQ|Peppa Pig 7 Wood ...|
|1591749352|Klutz Sticker Des...|
|B002LHA74O|Super Jumbo Playi...|
|B00UZOPOFW|Emazing Lights eL...|
|B004C04I4I|Disney Baby: Eeyo...|
|B00JLKI69W|WOW Toys Town Adv...|
|B00GNDY40U|Pokemon - Gyarado...|
|B00WJ1OPMW|LeapFrog LeapTV L...|
|B00JVY9J1M|Winston Zeddmore ...|
|B0013OY0S0|Claw Climber Goli...|
|B00NWGEKBY|Team Losi 8IGHT-E...|
|B004S8F7QM|Cards Against Hum...|
|B00FGPU7U2|Fisher-Price Octo...|
|B009T8BSQY|Alien Frontiers: ...|
|B00519PJTW|100 Foot Multicol...|
|B000PEOMC8|Intex River Run I...|
|B00WAKEQLW|Whiffer Sniffers ...|
|B0101EHRSM|Big Bang Cosmic P...|
+----------+--------------------+
only showing top 20 rows



In [9]:
# Create customers table: one row per customer_id with the number of reviews they wrote.
# groupBy already yields unique customer_ids, so no extra distinct() pass is needed.
customers = (
    df.groupBy("customer_id")
    .agg(f.count("*").alias("customer_count"))
)
customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   18461411|             1|
|   44331596|             1|
|   20962528|             1|
|   34874898|             1|
|    6762003|             1|
|    1297934|             1|
|   13328687|             1|
|    2749569|             1|
|   12612039|             1|
|   11613707|             1|
|    5543658|             1|
|    7360347|             1|
|   45601416|             1|
|   27225859|             1|
|   32910511|             1|
|   47546726|             1|
|   13545982|             1|
|   12191231|             1|
|     433677|             1|
|   13394189|             1|
+-----------+--------------+
only showing top 20 rows



In [10]:
# vine table: per-review rating and vote counts plus the Vine flag, de-duplicated
vine_cols = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine = df.select(*vine_cols).distinct()
vine.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1ORWPFQ9EDYA0|          5|            0|          0|   N|
|R2RDOJQ0WBZCF6|          5|            0|          0|   N|
|R1H1HOVB44808I|          5|            1|          1|   N|
|R2SDXLTLF92O0H|          5|            0|          0|   N|
| R44NP0QG6E98W|          3|            1|          1|   N|
|R2MW3TEBPWKENS|          5|            0|          0|   N|
|R23URALWA7IHWL|          5|            0|          0|   N|
|R2B8VBEPB4YEZ7|          5|            0|          0|   N|
| R1UE3RPRGCOLD|          2|            1|          1|   N|
|R1Y4ZOUGFMJ327|          5|            0|          0|   N|
|R2BUV9QJI2A00X|          5|            0|          1|   N|
|R1HOJ5GOA2JWM0|          2|            1|          1|   N|
|  RNX4EXOBBPN5|          1|            1|          1|   N|
|R36ED1U38IELG8|          5|            

In [11]:
# Connect to local database via a SQLAlchemy engine built from the connection string in config.py.
# NOTE(review): my_db presumably embeds credentials -- keep config.py out of version control.
engine = create_engine(my_db)

In [12]:
# Convert each Spark table to pandas and write it to the database, replacing any existing
# table of the same name and omitting the pandas index column.
# NOTE: toPandas() collects the whole DataFrame onto the driver -- fine for this 49-row
# sample, but it will not scale to the full reviews dataset.
tables_to_load = {
    "review_id": review_id,
    "products": products,
    "customers": customers,
    "vine": vine,
}
for table_name, spark_df in tables_to_load.items():
    spark_df.toPandas().to_sql(table_name, con=engine, if_exists="replace", index=False)

In [13]:
# Round-trip check: read the products table back from the database and preview it
products_query = 'SELECT * FROM products'
products_from_sql = pd.read_sql_query(products_query, con=engine)
products_from_sql.head()

Unnamed: 0,product_id,product_title
0,B00CAEEDC0,Seat Pets Car Seat Toy
1,B00V5DM3RE,Yoga Joes - Green Army Men Toys
2,B008W1BPWQ,Peppa Pig 7 Wood Puzzles In Wooden Storage Box...
3,1591749352,Klutz Sticker Design Studio: Create Your Own C...
4,B002LHA74O,Super Jumbo Playing Cards by S&S Worldwide


In [14]:
# Round-trip check: read the review_id table back from the database and preview it
review_id_query = 'SELECT * FROM review_id'
review_id_from_sql = pd.read_sql_query(review_id_query, con=engine)
review_id_from_sql.head()

Unnamed: 0,review_id,customer_id,product_id,product_parent,review_date
0,R36ED1U38IELG8,24769659,B00D7JFOPC,952062646,2015-08-31
1,R1JS8G26X4RM2G,12612039,B00D4NJSJE,408940178,2015-08-31
2,R47XBGQFP039N,21448082,B00FZX71BI,480992295,2015-08-31
3,R300I65NW30Y19,32866903,B000TFLAZA,149264874,2015-08-31
4,R1YS3DS218NNMD,1662075,B00XPWXYDK,210135375,2015-08-31


In [15]:
# Round-trip check: read the customers table back from the database and preview it
customers_query = 'SELECT * FROM customers'
customers_from_sql = pd.read_sql_query(customers_query, con=engine)
customers_from_sql.head()

Unnamed: 0,customer_id,customer_count
0,18461411,1
1,44331596,1
2,20962528,1
3,34874898,1
4,6762003,1


In [16]:
# Round-trip check: read the vine table back from the database and preview it
vine_query = 'SELECT * FROM vine'
vine_from_sql = pd.read_sql_query(vine_query, con=engine)
vine_from_sql.head()

Unnamed: 0,review_id,star_rating,helpful_votes,total_votes,vine
0,R1ORWPFQ9EDYA0,5,0,0,N
1,R2RDOJQ0WBZCF6,5,0,0,N
2,R1H1HOVB44808I,5,1,1,N
3,R2SDXLTLF92O0H,5,0,0,N
4,R44NP0QG6E98W,3,1,1,N
