# Using Amazon.com user reviews to generate recommendation/analytics databases
### Data Engineering Capstone Project

#### Project Summary

The objective of this project is to develop a database for recommendation and analytics based on data from product reviews on the Amazon.com website, in addition to the Amazon.com product metadata.

The views intended for recommendation can be placed on an index and serve as a recommendation system for when a user accesses a particular product.

The views intended for analytics, on the other hand, can show products that are poorly reviewed and/or under-selling, and so support decision making.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Running queries on the built database
* Step 6: Complete Project Write Up

In [1]:
import pandas as pd

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

from helpers.metadata import *
from helpers.ratings import *
from helpers.utils import *

from sql_queries.analytics_queries import *
from sql_queries.etl_queries import *

### Step 1: Scope the Project and Gather Data

#### Scope

The data used in this project were made available by [Julian McAuley](https://cseweb.ucsd.edu/~jmcauley/), from the [University of California San Diego (UCSD)](https://ucsd.edu/). They were originally part of the [Stanford Network Analysis Project (SNAP) database](https://snap.stanford.edu/data/), available through the [following link](https://snap.stanford.edu/data/web-Amazon.html), which contains a dataset from March 2013.

**However, the dataset I used is slightly more recent, from July 2014. More recent datasets exist, but they are much larger.**

#### Describe and Gather Data 

All necessary files are available at this [link](http://jmcauley.ucsd.edu/data/amazon/links.html).

The metadata file can be downloaded through this [direct link](http://snap.stanford.edu/data/amazon/productGraph/metadata.json.gz).

The review files can be downloaded according to your choice, from the **"Small" subsets for experimentation > ratings only** section.

!["Small" subsets for experimentation > ratings only](images/ratings_subset.png)

In my case, I downloaded all the rating files and manually cut the datasets to reduce their size, since the original datasets contain more than 80 million reviews. After the cut, my dataset had 2.5 million reviews from all categories.

I also cut the metadata dataset from ~9.4 million products to 2 million products.

I used the following commands to cut the datasets:

- 'Cutting' the metadata dataset:

```shell
~ head -n 2000000 sample_data/metadata.json > sample_data/metadata_cut.json && mv sample_data/metadata_cut.json sample_data/metadata.json
```

- 'Cutting' the ratings dataset:

```shell
~ ls sample_data/ratings_*.csv | xargs -I{} head -n 105000 {} > sample_data/ratings.csv
```
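The same cut can also be sketched in Python, which may be convenient where GNU `head` and `xargs` are not available. The function name `cut_file` is illustrative and not part of the project.

```python
from itertools import islice


def cut_file(src_path, dst_path, max_lines):
    """Copy at most `max_lines` leading lines of src_path to dst_path.

    Returns the number of lines actually written.
    """
    written = 0
    with open(src_path, encoding='utf-8') as src, \
            open(dst_path, 'w', encoding='utf-8') as dst:
        # islice stops after max_lines, so the rest of the file is never read
        for line in islice(src, max_lines):
            dst.write(line)
            written += 1
    return written
```

For example, `cut_file('sample_data/metadata.json', 'sample_data/metadata_cut.json', 2000000)` would mirror the metadata command above.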

Sample metadata:

```json
{
  "asin": "0000031852",
  "title": "Girls Ballet Tutu Zebra Hot Pink",
  "price": 3.17,
  "imUrl": "http://ecx.images-amazon.com/images/I/51fAmVkTbyL._SY300_.jpg",
  "related":
  {
    "also_bought": ["B00JHONN1S", "B002BZX8Z6", "B00D2K1M3O", "0000031909", "B00613WDTQ", "B00D0WDS9A", "B00D0GCI8S", "0000031895", "B003AVKOP2", "B003AVEU6G", "B003IEDM9Q", "B002R0FA24", "B00D23MC6W", "B00D2K0PA0", "B00538F5OK", "B00CEV86I6", "B002R0FABA", "B00D10CLVW", "B003AVNY6I", "B002GZGI4E", "B001T9NUFS", "B002R0F7FE", "B00E1YRI4C", "B008UBQZKU", "B00D103F8U", "B007R2RM8W"],
    "also_viewed": ["B002BZX8Z6", "B00JHONN1S", "B008F0SU0Y", "B00D23MC6W", "B00AFDOPDA", "B00E1YRI4C", "B002GZGI4E", "B003AVKOP2", "B00D9C1WBM", "B00CEV8366", "B00CEUX0D8", "B0079ME3KU", "B00CEUWY8K", "B004FOEEHC", "0000031895", "B00BC4GY9Y", "B003XRKA7A", "B00K18LKX2", "B00EM7KAG6", "B00AMQ17JA", "B00D9C32NI", "B002C3Y6WG", "B00JLL4L5Y", "B003AVNY6I", "B008UBQZKU", "B00D0WDS9A", "B00613WDTQ", "B00538F5OK", "B005C4Y4F6", "B004LHZ1NY", "B00CPHX76U", "B00CEUWUZC", "B00IJVASUE", "B00GOR07RE", "B00J2GTM0W", "B00JHNSNSM", "B003IEDM9Q", "B00CYBU84G", "B008VV8NSQ", "B00CYBULSO", "B00I2UHSZA", "B005F50FXC", "B007LCQI3S", "B00DP68AVW", "B009RXWNSI", "B003AVEU6G", "B00HSOJB9M", "B00EHAGZNA", "B0046W9T8C", "B00E79VW6Q", "B00D10CLVW", "B00B0AVO54", "B00E95LC8Q", "B00GOR92SO", "B007ZN5Y56", "B00AL2569W", "B00B608000", "B008F0SMUC", "B00BFXLZ8M"],
    "bought_together": ["B002BZX8Z6"]
  },
  "salesRank": {"Toys & Games": 211836},
  "brand": "Coxlures",
  "categories": [["Sports & Outdoors", "Other Sports", "Dance"]]
}
```

where

- asin: [ASIN](https://en.wikipedia.org/wiki/Amazon_Standard_Identification_Number) of the product, e.g. 0000031852
- title: name of the product
- price: price in US dollars (at time of crawl)
- imUrl: url of the product image
- related: related products (also bought, also viewed, bought together, buy after viewing)
- salesRank: sales rank information
- brand: brand name
- categories: list of categories the product belongs to

Sample ratings:

```csv
A1EE2E3N7PW666,B000GFDAUG,5.0,1202256000
AGZ8SM1BGK3CK,B000GFDAUG,5.0,1198195200
A2VHZ21245KBT7,B000GIOPK2,4.0,1215388800
ACX8YW2D5EGP6,B000GIOPK2,4.0,1185840000
A9RNMO9MUSMTJ,B000GIOPK2,2.0,1281052800
A3STFVPM8NHJ7B,B000GIOPK2,5.0,1203897600
A2582KMXLK2P06,B000GIOPK2,5.0,1205884800
A1TZCLCW9QGGBH,B000GIOPK2,4.0,1209427200
A2E2I6B878CRMA,B000GIOPK2,5.0,1378684800
AD5MZA8SOVMPJ,B000GIOPK2,5.0,1218240000

```

Each line represents a tuple (user, item, rating, timestamp), where:

- user: ID of a user
- item: [ASIN](https://en.wikipedia.org/wiki/Amazon_Standard_Identification_Number) of the product
- rating: The rating the user gave
- timestamp: Timestamp of the rating
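To make the tuple layout concrete, here is a minimal sketch of turning one ratings line into a typed record. The function name `parse_rating_line` is hypothetical; the project's own helpers in `helpers/ratings.py` are not shown here and may work differently. Reading the timestamp as UTC is also an assumption.

```python
from datetime import datetime, timezone


def parse_rating_line(line):
    """Split 'user,item,rating,timestamp' into a dict of typed fields."""
    user, asin, rating, timestamp = line.strip().split(',')
    return {
        'user': user,
        'asin': asin,
        # Ratings arrive as text like '5.0'; go through float before int
        'rating': int(float(rating)),
        # Timestamps are Unix seconds; interpreted as UTC here (assumption)
        'review_date': datetime.fromtimestamp(int(timestamp), tz=timezone.utc),
    }


record = parse_rating_line('A1EE2E3N7PW666,B000GFDAUG,5.0,1202256000')
print(record['rating'], record['review_date'].date())  # 5 2008-02-06
```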

In [2]:
# Defines directory to save '.parquet' files
OUTPUT_DIR = 'output'

# Configures and initializes the Spark context
conf = SparkConf() \
        .setMaster('local')\
        .setAppName('dend-capstone')

sc = SparkContext.getOrCreate(conf=conf)

# Initializes Spark Session
session = SparkSession(sc)

In [3]:
# Initializes amazon ratings text files
# Also, apply some transformations/cleaning
product_review_record_staging = session.sparkContext \
    .textFile('sample_data/ratings.csv') \
    .map(parse_ratings) \
    .map(convert_ts_to_date) \
    .map(cast_rating_to_int) \
    .toDF()

# Initializes metadata file, with product information
# Also, apply some transformations/cleaning
metadata_staging = session.sparkContext \
    .textFile('sample_data/metadata.json') \
    .map(parse_metadata) \
    .filter(only_with_all_must_exist_keys) \
    .filter(only_products_with_brand) \
    .map(remove_unnecessary_keys) \
    .toDF()
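The metadata helpers used above (`parse_metadata`, `only_with_all_must_exist_keys`, `only_products_with_brand`) live in `helpers/metadata.py` and are not reproduced in this notebook. The sketch below shows one plausible shape for such helpers, using different, hypothetical names. The fallback to `ast.literal_eval` assumes that some lines may be Python-style dict literals rather than strict JSON, and treating an empty string as "no brand" is likewise an assumption.

```python
import ast
import json

# Keys a product must have to be kept: asin, title, price, related products,
# sales rank, brand and categories (see the pipeline description in 3.2)
MUST_EXIST_KEYS = ('asin', 'title', 'price', 'related',
                   'salesRank', 'brand', 'categories')


def parse_metadata_line(line):
    """Parse one line of the metadata file into a dict."""
    try:
        return json.loads(line)
    except ValueError:
        # Assumption: the line is a Python dict literal (single quotes)
        return ast.literal_eval(line)


def has_all_required_keys(product):
    """True when every required key is present in the product dict."""
    return all(key in product for key in MUST_EXIST_KEYS)


def has_brand(product):
    """True when the product carries a non-empty brand string."""
    brand = product.get('brand')
    return isinstance(brand, str) and brand.strip() != ''
```

In an RDD pipeline, functions like these would be passed to `.map(...)` and `.filter(...)` in the same way as the real helpers.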



In [4]:
product_review_record_staging.show(n=5)

+----------+------+-------------------+--------------+
|      asin|rating|        review_date|          user|
+----------+------+-------------------+--------------+
|B000GFDAUG|     5|2008-02-06 00:00:00|A1EE2E3N7PW666|
|B000GFDAUG|     5|2007-12-21 00:00:00| AGZ8SM1BGK3CK|
|B000GIOPK2|     4|2008-07-07 00:00:00|A2VHZ21245KBT7|
|B000GIOPK2|     4|2007-07-31 00:00:00| ACX8YW2D5EGP6|
|B000GIOPK2|     2|2010-08-06 00:00:00| A9RNMO9MUSMTJ|
+----------+------+-------------------+--------------+
only showing top 5 rows



In [5]:
metadata_staging.show(n=5)

+----------+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|      asin|         brand|          categories|price|             related|           salesRank|               title|
+----------+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|0000037214|    Big Dreams|[[Clothing, Shoes...| 6.99|[also_viewed -> [...|[Clothing -> 1233...|Purple Sequin Tin...|
|0000031909|       Unknown|[[Sports & Outdoo...|  7.0|[bought_together ...|[Toys & Games -> ...|Girls Ballet Tutu...|
|0000031852|      Coxlures|[[Sports & Outdoo...| 3.17|[bought_together ...|[Toys & Games -> ...|Girls Ballet Tutu...|
|0000031887|Boutique Cutie|[[Clothing, Shoes...| 6.79|[bought_together ...|[Sports &amp; Out...|Ballet Dress-Up F...|
|0000031895|      BubuBibi|[[Sports & Outdoo...| 2.99|[bought_together ...|[Toys & Games -> ...|Girls Ballet Tutu...|
+----------+--------------+--------------------+-----+--------------------+--------------------+--------------------+

In [6]:
# Write 'metadata_staging' table to parquet
metadata_staging.write \
    .parquet(
        '{}/metadata_staging.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [7]:
# Write 'product_review_record_staging' table to parquet

product_review_record_staging.write \
    .parquet(
        '{}/product_review_record_staging.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

### Step 2: Explore and Assess the Data
#### Explore the Data
Some products are missing part of the expected information, so they were removed from the dataset. Products without brand information were also removed.

Products without a corresponding review are also removed from the database later.

#### Cleaning Steps
Some cleaning steps were already carried out above, such as keeping only products that have a brand and all the necessary keys.

The other cleaning processes will be done during the pipeline to create the fact/dimension tables.

In [8]:
# Read parquet files
metadata_staging = session.read.parquet(
    '{}/metadata_staging.parquet'.format(OUTPUT_DIR)
)

product_review_record_staging = session.read.parquet(
    '{}/product_review_record_staging.parquet'.format(OUTPUT_DIR)
)

In [9]:
# RDD with related product information
product_related_product_staging = metadata_staging.rdd \
    .map(map_product_related_product) \
    .flatMapValues(dict.items) \
    .map(lambda row: ((row[0], row[1][0]), row[1][1])) \
    .flatMapValues(lambda asin: asin) \
    .map(apply_product_related_product_staging_schema) \
    .toDF()

# RDD with sales rank information. Determines which
# products are best-selling (the lower the better)
product_sales_rank_staging = metadata_staging.rdd \
    .map(map_sales_rank) \
    .flatMapValues(dict.items) \
    .map(apply_product_sales_rank_staging_schema) \
    .toDF()

# RDD with information about product categories
product_category_staging = metadata_staging.rdd \
    .map(map_categories) \
    .flatMapValues(lambda categories: categories) \
    .flatMapValues(lambda category: category) \
    .map(apply_product_category_staging_schema) \
    .toDF()
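The chained `flatMapValues` calls above turn nested structures into one row per pair. The plain-Python sketch below shows the same reshaping without Spark, for the related-products and categories cases. The function names are hypothetical, and the column order produced by the project's `apply_*_schema` helpers may differ.

```python
def flatten_related(product):
    """Yield one (asin, relation_type, related_asin) row per related product."""
    for relation_type, related_asins in product['related'].items():
        for related_asin in related_asins:
            yield (product['asin'], relation_type, related_asin)


def flatten_categories(product):
    """Yield one (asin, category) row per category in the nested lists."""
    for category_path in product['categories']:
        for category in category_path:
            yield (product['asin'], category)


# Trimmed version of the sample metadata record shown in Step 1
product = {
    'asin': '0000031852',
    'related': {
        'bought_together': ['B002BZX8Z6'],
        'also_bought': ['B00JHONN1S', 'B002BZX8Z6'],
    },
    'categories': [['Sports & Outdoors', 'Other Sports', 'Dance']],
}
related_rows = list(flatten_related(product))
category_rows = list(flatten_categories(product))
print(len(related_rows), len(category_rows))  # 3 3
```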

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![Conceptual Model](images/conceptual_model.png)

The conceptual model consists of eight tables, which are:

- `product_review_record_fact`: Stores all details of reviews made by users on Amazon.com;

- `product_review_dimension`: Stores the consolidation of reviews by product, for example, how many reviews a given product received, what the average rating of the reviews was, what was the worst rating a product received, the best rating a product received, among other details;

- `product_dimension`: Stores details about Amazon.com products, such as their description, brand, price, among other details;

- `product_category_dimension`: Stores the categories to which a given product belongs;

- `product_sales_rank_by_category_dimension`: Stores the sales rank of a given product for each category with which it is associated. The sales rank indicates how well a product sells. Ranks are ascending, that is, the lower the rank of a product, the better it sells;

- `product_sales_rank_dimension`: Stores the overall sales rank of a product, that is, the best (lowest) sales rank across all categories in which the product is ranked;

- `product_related_product_dimension`: Stores which products are related to a given product, and the type of that relationship (also bought, also viewed, bought together, buy after viewing);

- `user_review_dimension`: Stores consolidated information about the reviews made by a user, such as how many reviews the user made and the average rating the user gave, among other details.
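As a small illustration of how the two sales-rank tables relate, the sketch below consolidates per-category ranks into one best (lowest) rank per product. The function name and the rows are made up for illustration; the project performs this step with a SQL query that is not shown here.

```python
def consolidate_sales_rank(rows):
    """Map each asin to its lowest sales rank across categories."""
    best = {}
    for asin, sales_rank, _category in rows:
        if asin not in best or sales_rank < best[asin]:
            best[asin] = sales_rank
    return best


# (asin, sales_rank, product_category); toy values, not real data
by_category_rows = [
    ('P1', 211836, 'Toys & Games'),
    ('P1', 1500, 'Sports & Outdoors'),
    ('P2', 42, 'Books'),
]
overall = consolidate_sales_rank(by_category_rows)
print(overall)  # {'P1': 1500, 'P2': 42}
```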

The data in the `product_review_record_fact` table comes entirely from the `product_review_record_staging` table, which in turn comes entirely from the `.csv` file with rating data from Amazon.com users.

Data in the `product_dimension` table comes entirely from the `metadata_staging` table, which in turn comes entirely from the Amazon.com product metadata file (`metadata.json`).

The other tables are generated through transformations and aggregations between the staging tables and the `product_review_record_fact` and `product_dimension` tables.
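As an example of such an aggregation, the sketch below consolidates reviews per product into the columns of `product_review_dimension`. The real `product_review_dimension_query` lives in `sql_queries/etl_queries.py` and is not shown, so the SQL here is only a guess that matches the column names in the data dictionary. It runs against an in-memory SQLite database instead of Spark purely so that it is self-contained, and dates are stored as ISO strings for the same reason.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE product_review_record_fact '
    '("user" TEXT, asin TEXT, rating INTEGER, review_date TEXT)'
)
conn.executemany(
    'INSERT INTO product_review_record_fact VALUES (?, ?, ?, ?)',
    [
        # Three of the sample reviews shown in Step 1
        ('A2VHZ21245KBT7', 'B000GIOPK2', 4, '2008-07-07'),
        ('ACX8YW2D5EGP6', 'B000GIOPK2', 4, '2007-07-31'),
        ('A9RNMO9MUSMTJ', 'B000GIOPK2', 2, '2010-08-06'),
    ],
)

# Hypothetical shape of the consolidation query
PRODUCT_REVIEW_DIMENSION_SKETCH = '''
    SELECT asin,
           COUNT(*)         AS review_count,
           AVG(rating)      AS avg_rating,
           MIN(rating)      AS min_rating,
           MAX(rating)      AS max_rating,
           MIN(review_date) AS first_review_date,
           MAX(review_date) AS last_review_date
      FROM product_review_record_fact
     GROUP BY asin
'''
rows = conn.execute(PRODUCT_REVIEW_DIMENSION_SKETCH).fetchall()
```

Grouping by `user` instead of `asin` would give the analogous consolidation for `user_review_dimension`.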

#### 3.2 Mapping Out Data Pipelines

The pipeline steps are as follows:

- Load the datasets;
- Parse each line of the ratings `.csv` file into the `product_review_record_staging` table;
- Parse each line of the metadata `.json` file into the `metadata_staging` table;
- In the `metadata_staging` table, filter out products that do not have all the necessary information (only products that have information about asin, title, price, related products, sales rank, brand and categories are kept);
- Write the two tables to their respective `.parquet` files;
- Based on the `metadata_staging` table, generate the `product_related_product_staging`, `product_sales_rank_staging` and `product_category_staging` tables, with raw information about the relationships between products, the sales rank of products and the categories of products;
- Create the `product_review_record_fact` table by cleaning the data from the `product_review_record_staging` table, keeping only reviews of products that have corresponding metadata;
- Create the `product_review_dimension` table by consolidating review data from the `product_review_record_fact` table;
- Create the `product_dimension` table by cleaning the data from the `metadata_staging` table, keeping only products that have corresponding reviews;
- Create the `product_related_product_dimension` table by cleaning the data from the `product_related_product_staging` table, keeping only products that are also in the `product_dimension` table;
- Create the `product_sales_rank_by_category_dimension` table by cleaning the data from the `product_sales_rank_staging` table, keeping only products that are also in the `product_dimension` table;
- Create the `product_sales_rank_dimension` table by consolidating the data from the `product_sales_rank_by_category_dimension` table;
- Create the `user_review_dimension` table by consolidating data about user reviews found in the `product_review_record_fact` table;
- Write a `.parquet` file for the fact table and for each of the dimension tables.
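The two "keep only matching rows" steps for the fact table and `product_dimension` can be sketched in plain Python as follows. The function name and the toy rows are made up for illustration; the project performs these steps with SQL queries that are not shown here.

```python
def keep_matching(reviews, products):
    """Keep reviews whose product has metadata, then products with reviews."""
    product_asins = {product['asin'] for product in products}
    fact = [review for review in reviews if review['asin'] in product_asins]
    reviewed_asins = {review['asin'] for review in fact}
    dimension = [
        product for product in products if product['asin'] in reviewed_asins
    ]
    return fact, dimension


# Toy rows, not real data
reviews = [
    {'user': 'U1', 'asin': 'P1', 'rating': 5},
    {'user': 'U2', 'asin': 'P1', 'rating': 3},
    {'user': 'U3', 'asin': 'P9', 'rating': 4},  # no metadata for P9
]
products = [
    {'asin': 'P1', 'brand': 'Brand A'},
    {'asin': 'P2', 'brand': 'Brand B'},  # no reviews for P2
]
fact, dimension = keep_matching(reviews, products)
print(len(fact), [product['asin'] for product in dimension])  # 2 ['P1']
```

After this mutual filtering, every product in the dimension has at least one review and every review points at a known product, which is consistent with `product_review_dimension` and `product_dimension` reporting the same row count in the quality checks.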

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [10]:
# Create a temporary view for every staging DataFrame,
# to enable the use of SQL queries
metadata_staging.createOrReplaceTempView('metadata_staging')
product_review_record_staging.createOrReplaceTempView('product_review_record_staging')
product_related_product_staging.createOrReplaceTempView('product_related_product_staging')
product_sales_rank_staging.createOrReplaceTempView('product_sales_rank_staging')
product_category_staging.createOrReplaceTempView('product_category_staging')

In [11]:
# Creates the 'product_review_record_fact' table
product_review_record_fact = session.sql(product_review_record_fact_query)

# Persists 'product_review_record_fact' table, as it 
# will be used in more than one of the next 
# transformations
product_review_record_fact.persist()

# Create view for 'product_review_record_fact' table,
# to enable the use of SQL queries
product_review_record_fact.createOrReplaceTempView('product_review_record_fact')

In [12]:
# Creates the 'product_review_dimension' table
product_review_dimension = session.sql(product_review_dimension_query)

In [13]:
# Creates the 'product_dimension' table
product_dimension = session.sql(product_dimension_query)

# Persists 'product_dimension' table, as it will be
# used in more than one of the next transformations
product_dimension.persist()

# Create view for 'product_dimension' table, to
# enable the use of SQL queries
product_dimension.createOrReplaceTempView('product_dimension')

In [14]:
# Creates the 'product_related_product_dimension' table
product_related_product_dimension = session.sql(product_related_product_dimension_query)

In [15]:
# Creates the 'product_sales_rank_by_category_dimension' table
product_sales_rank_by_category_dimension = session.sql(product_sales_rank_by_category_dimension_query)

# Create view for 'product_sales_rank_by_category_dimension'
# table, to enable the use of SQL queries
product_sales_rank_by_category_dimension.createOrReplaceTempView('product_sales_rank_by_category_dimension')

In [16]:
# Creates the 'product_sales_rank_dimension' table

product_sales_rank_dimension = session.sql(product_sales_rank_dimension_query)

In [17]:
# Creates the 'product_category_dimension' table

product_category_dimension = session.sql(product_category_dimension_query)

In [18]:
# Creates the 'user_review_dimension' table 

user_review_dimension = session.sql(user_review_dimension_query)

Write each table to '.parquet'

In [26]:
# Some files could have been better partitioned, but they were not, 
# as they would generate many folders, which would hit the EXT4 FS 
# subfolder limit

In [19]:
product_review_record_fact.write \
    .partitionBy(
        'review_year',
        'review_month',
        'review_day'
    ).parquet(
        '{}/product_review_record_fact.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )
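`partitionBy` writes one directory per distinct combination of partition values, named in `column=value` form. The sketch below, with a hypothetical function name, shows which relative directory a review would land in and how the number of directories grows with the number of distinct dates.

```python
from datetime import datetime


def partition_dir(review_date):
    """Relative directory for a review partitioned by year, month and day."""
    return 'review_year={}/review_month={}/review_day={}'.format(
        review_date.year, review_date.month, review_date.day
    )


# Two reviews share a date, so only two directories are needed
dates = [datetime(2008, 2, 6), datetime(2007, 12, 21), datetime(2008, 2, 6)]
dirs = sorted({partition_dir(date) for date in dates})
print(len(dirs))  # 2
```

Partitioning by a column with many distinct values creates one directory per value, which is the situation the comment above about the subfolder limit refers to.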

In [20]:
product_review_dimension.write \
    .parquet(
        '{}/product_review_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [21]:
product_dimension.write \
    .parquet(
        '{}/product_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [22]:
product_related_product_dimension.write \
    .parquet(
        '{}/product_related_product_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [23]:
product_sales_rank_dimension.write \
    .parquet(
        '{}/product_sales_rank_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [24]:
product_sales_rank_by_category_dimension.write \
    .partitionBy('product_category') \
    .parquet(
        '{}/product_sales_rank_by_category_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [25]:
product_category_dimension.write \
    .partitionBy('product_category') \
    .parquet(
        '{}/product_category_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

In [26]:
user_review_dimension.write \
    .parquet(
        '{}/user_review_dimension.parquet'.format(OUTPUT_DIR),
        'overwrite',
    )

#### 4.2 Data Quality Checks
The data quality checks confirm that the pipeline ran as expected: every table is read back from its `.parquet` file and passed to `data_quality_check`, which reports the table's row count.
 


Read '.parquet' files

In [6]:
product_review_record_fact = session.read.parquet(
    '{}/product_review_record_fact.parquet'.format(OUTPUT_DIR)
)

product_review_dimension = session.read.parquet(
    '{}/product_review_dimension.parquet'.format(OUTPUT_DIR)
)

product_dimension = session.read.parquet(
    '{}/product_dimension.parquet'.format(OUTPUT_DIR)
)

product_related_product_dimension = session.read.parquet(
    '{}/product_related_product_dimension.parquet'.format(OUTPUT_DIR)
)

product_sales_rank_dimension = session.read.parquet(
    '{}/product_sales_rank_dimension.parquet'.format(OUTPUT_DIR)
)

product_sales_rank_by_category_dimension = session.read.parquet(
    '{}/product_sales_rank_by_category_dimension.parquet'.format(OUTPUT_DIR)
)

product_category_dimension = session.read.parquet(
    '{}/product_category_dimension.parquet'.format(OUTPUT_DIR)
)

user_review_dimension = session.read.parquet(
    '{}/user_review_dimension.parquet'.format(OUTPUT_DIR)
)

Run Quality Checks

In [28]:
dfs_to_check = {
    'product_review_record_fact': product_review_record_fact,
    'product_review_dimension': product_review_dimension,
    'product_dimension': product_dimension,
    'product_related_product_dimension': product_related_product_dimension,
    'product_sales_rank_dimension': product_sales_rank_dimension,
    'product_sales_rank_by_category_dimension': product_sales_rank_by_category_dimension,
    'product_category_dimension': product_category_dimension,
    'user_review_dimension': user_review_dimension,
}

for table, df in dfs_to_check.items():
    data_quality_check(table, df)
    
print('\nAll checks succeeded! :)')

Data quality check: 'product_review_record_fact' table: succeed (194817 rows)
Data quality check: 'product_review_dimension' table: succeed (5588 rows)
Data quality check: 'product_dimension' table: succeed (5588 rows)
Data quality check: 'product_related_product_dimension' table: succeed (438827 rows)
Data quality check: 'product_sales_rank_dimension' table: succeed (5463 rows)
Data quality check: 'product_sales_rank_by_category_dimension' table: succeed (5463 rows)
Data quality check: 'product_category_dimension' table: succeed (26733 rows)
Data quality check: 'user_review_dimension' table: succeed (169762 rows)

All checks succeeded! :)
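`data_quality_check` is imported from the project's helpers and is not reproduced in this notebook. Judging only from the output above, it reports a row count per table. The sketch below is a guess at a minimal check of that kind, under the assumption that an empty table counts as a failure. The names `data_quality_check_sketch` and `FakeTable` are hypothetical.

```python
def data_quality_check_sketch(table_name, df):
    """Fail if the table is empty; otherwise report its row count.

    `df` is anything with a `count()` method, such as a Spark DataFrame.
    """
    row_count = df.count()
    if row_count == 0:
        raise ValueError(
            "Data quality check: '{}' table: failed (0 rows)".format(table_name)
        )
    message = "Data quality check: '{}' table: succeed ({} rows)".format(
        table_name, row_count
    )
    print(message)
    return message


class FakeTable:
    """Stand-in for a DataFrame so the sketch runs without Spark."""

    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return len(self._rows)
```

Raising an exception on failure stops the loop at the first bad table, so the final success message is only printed when every table passes.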


Show schema for every table:

In [29]:
product_review_record_fact.printSchema()

root
 |-- user: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- review_year: integer (nullable = true)
 |-- review_month: integer (nullable = true)
 |-- review_day: integer (nullable = true)



In [30]:
product_review_dimension.printSchema()

root
 |-- asin: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- min_rating: long (nullable = true)
 |-- max_rating: long (nullable = true)
 |-- first_review_date: timestamp (nullable = true)
 |-- last_review_date: timestamp (nullable = true)



In [31]:
product_dimension.printSchema()

root
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- title: string (nullable = true)
 |-- price: double (nullable = true)



In [32]:
product_related_product_dimension.printSchema()

root
 |-- asin: string (nullable = true)
 |-- related_asin: string (nullable = true)
 |-- relation_type: string (nullable = true)



In [33]:
product_sales_rank_dimension.printSchema()

root
 |-- asin: string (nullable = true)
 |-- sales_rank: long (nullable = true)



In [34]:
product_sales_rank_by_category_dimension.printSchema()

root
 |-- asin: string (nullable = true)
 |-- sales_rank: long (nullable = true)
 |-- product_category: string (nullable = true)



In [35]:
product_category_dimension.printSchema()

root
 |-- asin: string (nullable = true)
 |-- product_category: string (nullable = true)



In [36]:
user_review_dimension.printSchema()

root
 |-- user: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- min_rating: long (nullable = true)
 |-- max_rating: long (nullable = true)
 |-- first_review_date: timestamp (nullable = true)
 |-- last_review_date: timestamp (nullable = true)



#### 4.3 Data dictionary

Table `product_review_record_fact`:

| column | type | description |
| --- | --- | --- |
| user | string | ID of the user who wrote the review |
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| rating | long | The rating, from 1 to 5, that the user gave the product (higher is better) |
| review_date | timestamp | The date the review was given |
| review_year | integer | The year the review was given  |
| review_month | integer | The month the review was given |
| review_day | integer | The day the review was given |

Table `product_review_dimension`:

| column | type | description |
| --- | --- | --- |
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| review_count | long | The total number of reviews a product received |
| avg_rating | double | The average of all ratings a product received |
| min_rating | long | The minimum of all ratings a product received |
| max_rating | long | The maximum of all ratings a product received |
| first_review_date | timestamp | The first date a product received a review |
| last_review_date | timestamp | The last date a product received a review |

Table `product_dimension`:

| column | type | description |
| --- | --- | --- |
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| brand | string | The brand of the product |
| title | string | The title of the product |
| price | double | The price of the product |

Table `product_related_product_dimension`:

| column | type | description |
| --- | --- | --- |
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| related_asin | string | The ASIN of a related product |
| relation_type | string | The type of the relation (also bought, also viewed, bought together, buy after viewing) |

Table `product_sales_rank_dimension`:

| column | type | description |
| --- | --- | --- |
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| sales_rank | long | The best sales rank that the product has, considering all the categories in which it is categorized (lower is better) |

Table `product_sales_rank_by_category_dimension`:

| column | type | description |
| --- | --- | --- | 
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| sales_rank | long | Product's sales rank (lower is better) |
| product_category | string | Category to which the sales rank refers |

Table `product_category_dimension`:

| column | type | description |
| --- | --- | --- |
| asin | string | **Amazon Standard Identification Number** (ASIN), a 10-character alphanumeric unique identifier assigned by [Amazon.com](https://www.amazon.com) and its partners for product identification within the Amazon organization |
| product_category | string | The product category |

Table `user_review_dimension`:

| column | type | description |
| --- | --- | --- |
| user | string | ID of the user who wrote the review |
| review_count | long | The total number of reviews a user wrote |
| avg_rating | double | The average of all ratings a user gave |
| min_rating | long | The minimum of all ratings a user gave |
| max_rating | long | The maximum of all ratings a user gave |
| first_review_date | timestamp | The first date a user wrote a review |
| last_review_date | timestamp | The last date a user wrote a review |

### Step 5: Running queries on the built database

In this step, we use the database built previously to run queries that could serve a (hypothetical) recommendation system, as well as an (also hypothetical) analytics system.

In [10]:
# Creating views to execute SQL queries on the built database

product_review_record_fact.createOrReplaceTempView('product_review_record_fact')
product_review_dimension.createOrReplaceTempView('product_review_dimension')
product_dimension.createOrReplaceTempView('product_dimension')
product_related_product_dimension.createOrReplaceTempView('product_related_product_dimension')
product_sales_rank_dimension.createOrReplaceTempView('product_sales_rank_dimension')
product_sales_rank_by_category_dimension.createOrReplaceTempView('product_sales_rank_by_category_dimension')
product_category_dimension.createOrReplaceTempView('product_category_dimension')
user_review_dimension.createOrReplaceTempView('user_review_dimension')

##### 5.1 Recommendation system queries:

5.1.1 Best selling related products:

Shows the best-selling related products for a given product.

In [11]:
# Define here the ASIN of the product that will be used as the basis for the recommendation in the query.
product_asin='0739045067'

session.sql(
    best_selling_product_related_product_query.format(product_asin)
).show()

+-------------+----------+----------+------+--------------------+-----+
|relation_type|sales_rank|      asin| brand|               title|price|
+-------------+----------+----------+------+--------------------+-----+
|  also_bought|     18688|0739057561|Alfred|Don Mock's Master...|15.95|
+-------------+----------+----------+------+--------------------+-----+



5.1.2 Best selling products in the same category:

Shows the best-selling products in the same category as a given product.

In [12]:
# Define here the ASIN of the product that will be used as the basis for the recommendation in the query.
product_asin='0739045067'

session.sql(
    best_selling_product_same_category_query.format(product_asin)
).show()

+----------------+----------+----------+--------------------+--------------------+-----+
|product_category|sales_rank|      asin|               brand|               title|price|
+----------------+----------+----------+--------------------+--------------------+-----+
|     Movies & TV|      1368|0767803434|Columbia/Tristar ...|       Air Force One| 6.76|
|     Movies & TV|      1858|0783219806|UNIVERSAL STUDIOS...|  The Little Rascals| 9.55|
|     Movies & TV|      2740|0767836324|                Sony|                Hook|  9.0|
|     Movies & TV|      3517|0767836359|                Sony|A River Runs Thro...| 7.44|
|     Movies & TV|      4249|0767851013|                Sony|Little Women (Col...|14.97|
|     Movies & TV|      4545|0767827813|         FIELD,SALLY|    Murphy's Romance|58.89|
|     Movies & TV|      4902|0767802519|Columbia/Tristar ...|    The Professional| 7.39|
|     Movies & TV|      5080|0767839277|                Sony|The Adventures of...| 6.03|
|     Movies & TV|   

5.1.3 Best rated products in the same category:

Shows the best-rated products in the same category as a given product.

In [13]:
# Define here the ASIN of the product that will be used as the basis for the recommendation in the query.
product_asin='0739045067'

session.sql(
    best_rated_product_same_category_query.format(product_asin)
).show()

+----------+------------+----------+--------------------+--------------------+-----+
|avg_rating|review_count|      asin|               brand|               title|price|
+----------+------------+----------+--------------------+--------------------+-----+
|       5.0|          20|1573301310|                Sony|Herbert von Karaj...|19.08|
|       5.0|          19|B000000DE1|   Wea/Atlantic/Curb|          All I Want| 4.99|
|       5.0|          14|B0000008E9|    PARAGONS/JESTERS|Paragons Meet the...| 9.96|
|       5.0|          14|B00000I3YS|Educational Activ...|   Rhythms on Parade|12.21|
|       5.0|          13|B0000006VP|       AUGER/BALDWIN|Arleen Auger: Lov...|16.49|
|       5.0|          10|B000006SCQ|                 EMI|          This Child| 16.4|
|       5.0|           8|B0000002VF|        BOOKER,JAMES|New Orleans Piano...|15.57|
|       5.0|           8|B0000002ZU|        BOOKER,JAMES| Spiders on the Keys|15.78|
|       5.0|           6|B000000FV6|   Cd Baby.Com/Indys|Don't Le

5.1.4 Cheaper related products:

Shows related products that are cheaper than a given product.

In [14]:
# ASIN of the product used as the basis for the recommendation.
product_asin = '0739045067'

session.sql(
    cheaper_related_product_query.format(product_asin)
).show()

+-------------+----------+--------+--------------------+-----+
|relation_type|      asin|   brand|               title|price|
+-------------+----------+--------+--------------------+-----+
|  also_bought|0739057561|  Alfred|Don Mock's Master...|15.95|
|  also_bought|0739044265|Creative|Pat Martino- Crea...|21.14|
+-------------+----------+--------+--------------------+-----+
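
The queries in this section insert `product_asin` into the SQL text with `str.format`, so a malformed value would produce a broken or unintended query. A minimal sketch of a guard is shown below. It assumes an ASIN is exactly 10 characters from `0-9` and `A-Z`, which matches the ASINs printed in this notebook but is not taken from an official specification. `render_asin_query` and `example_template` are hypothetical names, not part of the project's helpers.

```python
import re

# Assumption: an ASIN is exactly 10 characters from 0-9 and A-Z,
# which matches the ASINs shown in this notebook's outputs.
ASIN_PATTERN = re.compile(r"[0-9A-Z]{10}")


def render_asin_query(query_template: str, asin: str) -> str:
    """Return query_template with asin interpolated, after validating asin."""
    if not ASIN_PATTERN.fullmatch(asin):
        raise ValueError(f"not a valid ASIN: {asin!r}")
    return query_template.format(asin)


# Hypothetical template standing in for the ones in sql_queries/.
example_template = "SELECT * FROM products WHERE asin = '{}'"
print(render_asin_query(example_template, "0739045067"))
```

The rendered string could then be passed to `session.sql(...)` exactly as in the cells above.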



5.1.5 Cheaper products in the same category:

Shows cheaper products in the same category as a given product.

In [15]:
# ASIN of the product used as the basis for the recommendation.
product_asin = '0739045067'

session.sql(
    cheaper_product_same_category_query.format(product_asin)
).show()

+----------+--------------------+--------------------+-----+
|      asin|               brand|               title|price|
+----------+--------------------+--------------------+-----+
|6302353998|            VHS Tape|Passover at Bubbe...|  2.3|
|6304400225|              Disney|Fun and Fancy Fre...| 2.48|
|6304030576|DK - Dorling Kind...|See How They Grow...|  2.8|
|B000000GT3|         Tvt Records|At The Speed Of L...| 2.98|
|B00000DFRS|                Sony|My Love Is Your Love| 2.99|
|B00000063H|           CUBANISMO|             Malembe| 3.05|
|B00000IFNS|  Interscope Records|Ryde Or Die Compi...| 3.28|
|6303224199|CMV COLUMBIA MUSI...|Barbra - The Conc...| 3.39|
|B0000009P4|            RYKODISC|File Under: Easy ...| 3.69|
|B000000GS1|         Tvt Records|  Bliss by Birdbrain| 3.69|
|0849707692|                KJOS|KJOS Standard Of ...| 3.77|
|0307142493|Santa Claus Is Co...|Santa Claus Is Co...| 3.86|
|0849734339|                KJOS|KJOS Bach And Bef...| 3.95|
|B0000005GG|           H

##### 5.2 Analytics system queries:

5.2.1 Best rated products:

Shows the best rated products

In [16]:
session.sql(best_rated_product_query).show()

+----------+--------------------+--------------------+-----+----------+------------+----------+----------+----------+-------------------+-------------------+
|      asin|               brand|               title|price|      asin|review_count|avg_rating|min_rating|max_rating|  first_review_date|   last_review_date|
+----------+--------------------+--------------------+-----+----------+------------+----------+----------+----------+-------------------+-------------------+
|B00004OCMZ|                 OXO|OXO Good Grips 1....| 7.19|B00004OCMZ|          20|       5.0|         5|         5|2000-06-19 00:00:00|2004-08-25 00:00:00|
|1573301310|                Sony|Herbert von Karaj...|19.08|1573301310|          20|       5.0|         5|         5|1998-12-28 00:00:00|2014-03-28 00:00:00|
|B000000DE1|   Wea/Atlantic/Curb|          All I Want| 4.99|B000000DE1|          19|       5.0|         5|         5|1999-01-31 00:00:00|2014-03-17 00:00:00|
|B00004OCOR|                 OXO|OXO Good Grips St..
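
The `asin` column appears twice in the output above, which suggests the query selects every column from both sides of a join. One way to get a single `asin` column is to join with `USING (asin)`, which Spark SQL also supports, or to list the columns explicitly. The sketch below shows the effect with Python's built-in `sqlite3` as a stand-in engine. The `products` and `review_stats` tables and their rows are made up for illustration and are not the project's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (asin TEXT, title TEXT);
    CREATE TABLE review_stats (asin TEXT, review_count INTEGER, avg_rating REAL);
    INSERT INTO products VALUES ('A000000001', 'Example product');
    INSERT INTO review_stats VALUES ('A000000001', 20, 5.0);
""")


def column_names(sql):
    """Run sql and return the names of the result columns."""
    return [col[0] for col in conn.execute(sql).description]


# Joining with ON and selecting * keeps the key column from both tables.
with_on = column_names(
    "SELECT * FROM products p JOIN review_stats r ON p.asin = r.asin"
)
# Joining with USING merges the key into a single column.
with_using = column_names(
    "SELECT * FROM products JOIN review_stats USING (asin)"
)

print(with_on)
print(with_using)
```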

5.2.2 Worst rated products:

Shows the worst rated products

In [17]:
session.sql(worst_rated_product_query).show()

+----------+--------------------+--------------------+-----+----------+------------+----------+----------+----------+-------------------+-------------------+
|      asin|               brand|               title|price|      asin|review_count|avg_rating|min_rating|max_rating|  first_review_date|   last_review_date|
+----------+--------------------+--------------------+-----+----------+------------+----------+----------+----------+-------------------+-------------------+
|B00002N8M7|            Raindrip|Rain Drip R682CP ...|51.99|B00002N8M7|          13|       1.0|         1|         1|2007-08-04 00:00:00|2013-11-18 00:00:00|
|6305898871|MUSIC VIDEO DISTR...|I Want My DVD - V...| 8.82|6305898871|           4|       1.0|         1|         1|2000-08-03 00:00:00|2001-05-07 00:00:00|
|1412719410|  EDUCATORS RESOURCE|My First Story Re...|32.41|1412719410|           4|       1.0|         1|         1|2012-07-30 00:00:00|2013-09-13 00:00:00|
|1453020004|              Disney|Pocket Size Disne..

5.2.3 Most rated products:

Shows the most rated products (products that received the highest number of reviews)

In [18]:
session.sql(most_rated_product_query).show()

+----------+--------------------+--------------------+------+----------+------------+----------+----------+----------+-------------------+-------------------+
|      asin|               brand|               title| price|      asin|review_count|avg_rating|min_rating|max_rating|  first_review_date|   last_review_date|
+----------+--------------------+--------------------+------+----------+------------+----------+----------+----------+-------------------+-------------------+
|8499000606|                Syma|Syma S107/S107G R...|  17.1|8499000606|        2938|      4.18|         1|         5|2010-03-02 00:00:00|2014-07-23 00:00:00|
|B00004R9VV|            Flowtron|Flowtron BK-80D 8...|  76.0|B00004R9VV|        2091|      4.13|         1|         5|2000-06-13 00:00:00|2014-07-16 00:00:00|
|7245456313|      Black Mountain|Black Mountain Pr...| 32.99|7245456313|        2033|      4.63|         1|         5|2010-09-02 00:00:00|2014-07-20 00:00:00|
|B00002X29G|             Stanley|Stanley 66-03

5.2.4 Best rated products by category:

Shows the best rated products for each category

In [19]:
session.sql(best_rated_product_by_category_query).show()

+--------------------+----------+--------------------+--------------------+-----+------------+----------+----------+----------+-------------------+-------------------+
|    product_category|      asin|               brand|               title|price|review_count|avg_rating|min_rating|max_rating|  first_review_date|   last_review_date|
+--------------------+----------+--------------------+--------------------+-----+------------+----------+----------+----------+-------------------+-------------------+
|         3-D Puzzles|B00000GC6B|             Wrebbit|3D Grandfather Cl...|99.95|           6|       5.0|         5|         5|2009-11-04 00:00:00|2013-10-07 00:00:00|
|                  9V|B00003IE4E|           Energizer|Energizer 9V Alka...| 6.98|          21|      4.19|         1|         5|2000-07-02 00:00:00|2014-07-10 00:00:00|
|                  AA|B00003IEME|           Energizer|Energizer AA Lith...| 6.09|         836|      4.45|         1|         5|2003-11-22 00:00:00|2014-07-22 00

5.2.5 Best selling products:

Shows the best selling products

In [20]:
session.sql(best_selling_product_query).show()

+----------+----------+------------------+--------------------+-----+
|sales_rank|      asin|             brand|               title|price|
+----------+----------+------------------+--------------------+-----+
|         4|B00004R9TL|Black &amp; Decker|Black &amp; Decke...|18.72|
|         8|B00004R9VZ|          Flowtron|Flowtron BK-15D E...|36.38|
|        10|B00000JIVS|              LEGO|LEGO Green Buildi...| 5.99|
|        15|7245456313|    Black Mountain|Black Mountain Pr...|32.99|
|        23|B00000IWIT|          Play-Doh|Play-Doh: Case of...| 5.64|
|        24|B00004R9VW|          Flowtron|Flowtron BK-40D E...|47.99|
|        25|B00000K3BR|      Stomp Rocket|Stomp Rocket Jr. ...| 12.0|
|        32|B00004RAGL|              Apex|Apex REM 15 15-Fo...| 6.49|
|        49|B00002N66H|           Fiskars|Fiskars Tradition...| 9.22|
|        63|0975277324|    Days of Wonder|      Ticket To Ride|29.95|
|        71|B00004RBDU|            Victor|Victor M231 Ultim...| 6.99|
|        83|B00004RB

5.2.6 Worst selling products:

Shows the worst selling products

In [21]:
session.sql(worst_selling_product_query).show()

+----------+----------+-------------------+--------------------+------+
|sales_rank|      asin|              brand|               title| price|
+----------+----------+-------------------+--------------------+------+
|   2745586|1574892193|Cavallini &amp; Co.|Cavallini Gigante...| 49.95|
|   2660606|7168258958|      Elite Mailers|Elite Mailers S25...| 31.49|
|   2492608|B00000JN49|           Hoberman|Hoberman Large Ex...| 34.99|
|   2177560|9178900484|               Ikea|IKEA - INGOLF Jun...|111.98|
|   2151535|9539752574|            Flag It|Air Force Heavy D...|  2.95|
|   2083565|9178907764|               IKEA|IKEA - GRUNDTAL T...| 19.95|
|   2027880|9178896703|               Ikea|IKEA - RATIONELL ...| 19.79|
|   1861711|9178882400|               Ikea|Sm&aring;land B&o...| 59.99|
|   1767529|8963293599|        Hello Kitty|Hello kitty Lovel...| 22.99|
|   1707155|0587168978|         Buyenlarge|&quot;Daybreak&qu...| 26.72|
|   1685419|9875972428|            SanDisk|Professional Ultr...|
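
Several `brand` and `title` values above still contain HTML entities (`&amp;`, `&aring;`, `&quot;`) carried over from the source metadata. A small sketch of how they could be decoded with Python's standard `html.unescape` follows. `unescape_text` is a hypothetical helper, and where in the ETL it would run (for example, wrapped as a Spark UDF) is left open.

```python
import html


def unescape_text(value):
    """Decode HTML entities in value; None is passed through unchanged."""
    if value is None:
        return None
    return html.unescape(value)


for raw in ["Black &amp; Decker", "Cavallini &amp; Co.", None]:
    print(repr(unescape_text(raw)))
```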

5.2.7 Best selling products by category:

Shows the best selling products for each category

In [22]:
session.sql(best_selling_product_by_category_query).show()

+--------------------+----------+----------+--------------------+--------------------+-----+
|    product_category|sales_rank|      asin|               brand|               title|price|
+--------------------+----------+----------+--------------------+--------------------+-----+
|         3-D Puzzles|     22939|0911121056|      Creative Whack|Creative Whacks 6...|23.81|
|                  AA|      3329|B00003IEME|           Energizer|Energizer AA Lith...| 6.09|
|                 AAA|     13433|B00000JHQ2|            Duracell|Duracell Batterie...| 4.99|
|         Accessories|        15|7245456313|      Black Mountain|Black Mountain Pr...|32.99|
|Accessories & Sup...|      1152|B00000JBHP|              Yamaha|Yamaha RH1C Porta...| 9.86|
|      Accessory Kits|    146203|986229874X|                Zizo|Samsung Galaxy Ri...| 1.55|
|           Acid Jazz|    381966|B000000GR4|         Tvt Records|               Babel|11.01|
|Acoustic & Classi...|      4093|B00002MZ8R|           First Act|First

5.2.8 User review count:

Shows users according to the number of reviews they have made.

In [23]:
session.sql(user_review_count_query).show()

+--------------+------------+----------+----------+----------+-------------------+-------------------+
|          user|review_count|avg_rating|min_rating|max_rating|  first_review_date|   last_review_date|
+--------------+------------+----------+----------+----------+-------------------+-------------------+
| A9Q28YTLYREO7|          77|      3.34|         1|         5|1999-11-25 00:00:00|2013-10-02 00:00:00|
| ANCOMAI0I7LVG|          46|      3.65|         1|         5|2008-01-31 00:00:00|2014-06-12 00:00:00|
|A1J5KCZC8CMW9I|          46|      3.91|         2|         5|2002-07-14 00:00:00|2011-02-14 00:00:00|
|A3LZGLA88K0LA0|          44|      3.91|         2|         5|2000-02-19 00:00:00|2013-01-10 00:00:00|
| AJGU56YG8G1DQ|          43|      4.98|         4|         5|2011-03-31 00:00:00|2012-01-05 00:00:00|
|A1GN8UJIZLCA59|          41|      4.56|         2|         5|2000-07-19 00:00:00|2012-04-26 00:00:00|
|A3HU0B9XUEVHIM|          38|       4.5|         3|         5|2006-01-16 

5.2.9 Best rated product in interval:

Shows the best-rated products, ranked by average rating and then by number of reviews, from the specified year and month up to the "present" day.

\*Because the dataset only has data up to 2014, that year is treated as the "present" day.

In [24]:
year = 2014
month = 4

session.sql(
    best_rated_product_in_interval_query.format(year, month)
).show()

+----------+------------+----------+----------+--------------------+--------------------+-----+
|      asin|review_count|avg_rating|      asin|               brand|               title|price|
+----------+------------+----------+----------+--------------------+--------------------+-----+
|B00000IV34|          20|       5.0|B00000IV34|     SET Enterprises|Set The Family Ga...| 9.45|
|B00000ADG2|          18|       5.0|B00000ADG2|         HILL,LAURYN|The Miseducation ...| 6.72|
|B0000003IO|          17|       5.0|B0000003IO|               RAFFI|Raffi The Singabl...|20.61|
|0767812182|          12|       5.0|0767812182|                Sony|        Thunderheart|16.99|
|B000002KD5|          11|       5.0|B000002KD5|     Warner Brothers|       Fleetwood Mac| 7.99|
|6305781427|          11|       5.0|6305781427|               YANNI|Yanni - Live at t...|12.11|
|158994030X|          11|       5.0|158994030X|Fantasy Flight Games|            Citadels|19.38|
|6302598834|          10|       5.0|6302

#### Step 6: Complete Project Write Up

- Clearly state the rationale for the choice of tools and technologies for the project:

  - Apache Spark was used because it:
  
      - Is widely adopted;
      - Is well documented and integrates easily with Python through PySpark;
      - Has strong community support;
      - Can handle very large files in different formats;
      - Offers an easy-to-use API that is also compatible with SQL;
      - Enables fast, distributed processing for Big Data.


- Propose how often the data should be updated and why:

    - In theory, if this pipeline were running inside Amazon, for example, new data would be generated constantly, such as whenever a user posted a review. So the closer to real time the data is updated, the better (near-real-time).


- Write a description of how you would approach the problem differently under the following scenarios:

 - The data was increased by 100x:
     - Spark is designed to scale out, so it can handle this scenario as long as the cluster has enough resources;
     - If this project were taken into production, it should run on a cluster rather than on a single machine, as it does here;
     - At first, we can simply add more nodes to the cluster to increase processing capacity;
     - Afterwards, we can tune the pipeline for performance, for example by partitioning the data according to the queries we need to run;
     - We can also pre-process more of our tables/views by pre-aggregating data, trading extra storage for faster queries.


 - The data populates a dashboard that must be updated on a daily basis by 7am every day:
 
     - We can use Apache Airflow to schedule this pipeline to run daily, starting early enough that it finishes before 7am.


 - The database needed to be accessed by 100+ people:
 
     - In this case, we can move our tables/views to a managed data warehouse such as Amazon Redshift or Google BigQuery, which are built to serve many concurrent users.
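
The pre-aggregation idea from the 100x scenario can be sketched as follows: compute the per-product review statistics once, store them in a table, and let the analytics queries read that table instead of scanning every review. The example uses Python's built-in `sqlite3` as a stand-in engine so it runs anywhere. The `reviews` table, its rows, and the `product_review_stats` name are assumptions for illustration, with column names borrowed from the query outputs above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical raw reviews table with a few made-up rows.
    CREATE TABLE reviews (asin TEXT, reviewer_id TEXT, rating INTEGER);
    INSERT INTO reviews VALUES
        ('A000000001', 'u1', 5),
        ('A000000001', 'u2', 4),
        ('A000000002', 'u1', 1);

    -- Pre-aggregate once; analytics queries then read this small table.
    CREATE TABLE product_review_stats AS
    SELECT asin,
           COUNT(*)              AS review_count,
           ROUND(AVG(rating), 2) AS avg_rating,
           MIN(rating)           AS min_rating,
           MAX(rating)           AS max_rating
    FROM reviews
    GROUP BY asin;
""")

# A "best rated" query now reads the aggregated table only.
best_rated = conn.execute(
    "SELECT asin, review_count, avg_rating "
    "FROM product_review_stats "
    "ORDER BY avg_rating DESC, review_count DESC"
).fetchall()
print(best_rated)
```

In Spark, the same `GROUP BY` could be materialized as a saved table or as Parquet files and refreshed on each pipeline run. That costs extra storage but keeps the aggregation work out of the analytics queries.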