### *Capstone Project for Udacity Data Engineering Nanodegree*

# Book Reviews Data Lake

___

# Project Summary

This project goals to build a **Book Reviews Data Lake** containing data on books from across the world, along with Amazon reviews about them. For this purpose, we are going to use data sources provided by [Book Depository](https://www.bookdepository.com/) and [Amazon](https://www.amazon.com/).

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Installs and imports

In [None]:
!pip install pandas

In [1]:
import os
import configparser
import pandas as pd
import logging
from pyspark.sql import SparkSession
import pyspark.sql.types as t
import pyspark.sql.functions as f

pd.set_option('display.max_columns', 500)

## Configuration file

Open the configuration file `btl.cfg` and set your AWS credentials (`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`). The AWS User needs to have at least the **AmazonS3FullAccess** permission policy attached.

Then, set the INPUT parameters `AMAZON_REVIEWS` and `BOOK_DEPOSITORY` with the path from where those datasets will be loaded. 

*To ease the review of this Capstone Project, the files of the Book Depository Dataset were already downloaded from Kaggle and uploaded to a S3 Bucket, and the parameters `AMAZON_REVIEWS` and `BOOK_DEPOSITORY` are already set to the paths of the S3 buckets containing the datasets.*

At last, set the `OUTPUT_DATA` parameter with the path where the Data Lake will be hosted.

```bash
[AWS]
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

[INPUT]
AMAZON_REVIEWS=
BOOK_DEPOSITORY=

[OUTPUT]
DATA_LAKE_PATH=
```

Reading the configuration file `bdl.cfg`:

In [2]:
config = configparser.ConfigParser()
config.read('bdl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

amazon_path = config['INPUT']['AMAZON_REVIEWS'] 
book_depository_path = config['INPUT']['BOOK_DEPOSITORY']
    
data_lake_path = config['OUTPUT']['DATA_LAKE_PATH']

## SparkSession

Creating the SparkSession:

In [3]:
spark = SparkSession \
    .builder \
    .appName("Books Data Lake") \
    .config("fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID'])\
    .config("fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY'])\
    .config("spark.sql.csv.parser.columnPruning.enabled", False) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.1") \
    .getOrCreate() 

# Step 1: Scope the Project and Gather Data

## 1.1 Scope

[Book Depository](https://www.bookdepository.com/) is a leading international book retailer that has a wide book catalog and delivers worldwide. We are going to use a dataset of books metadata scraped from their site and made available on [Kaggle](https://www.kaggle.com/datasets). 

[Amazon](https://www.amazon.com/) has 20 years of online bookselling experience. Having always maintained high emphasis on customer satisfaction, the reliability of its Customer Reviews is one of the main strengths of the company. The [Amazon Customer Reviews Dataset](https://registry.opendata.aws/amazon-reviews/) is maintained by Amazon itself, and contains product reviews in multiple languages and product categories, including books. However, it focus exclusively on the reviews, and does not contain specific books metadata, like authors, edition and others. 

This project joins both datasets through the books International Standard Book Number (ISBN) producing a **Book Reviews Data Lake** hosted on S3, aimed to provide data for analytical purposes. We are going to use [Spark](https://spark.apache.org/) to build a data pipeline that consumes raw books metadata and Amazon product reviews from S3 buckets, processes the data into analytics tables using Spark in an EMR cluster, and writes those tables as parquet files into the S3 bucket that will host our data lake.

![Architecture Diagram](images/architecture.png)

## 1.2 Use Cases

Some intended use cases for this Data Lake:
* Sentiment analysis on book reviews;
* Understanding how people rate usefulness of a review of a book, and what factors influence helpfulness of a review;
* Feeding data for a book recommender system.

## 1.3 Data Sources

### *1.3.1. Book Depository Dataset*

This dataset is a large collection of books metadata, like title, description, authors, category and others. It was scraped from [Book Depository](https://www.bookdepository.com/), a leading international book retailer. 
 
 The following list describes each file in the dataset:
* `authors.csv` (**703,666 rows**): author names, matching the `authors` column from `dataset.csv`; a single book in the dataset may have more than one author.
* `categories.csv` (**2,767 rows**): category names, matching the `categories` column from `dataset.csv`; a single book in the dataset may have more than one category.
* `formats.csv` (**59 rows**): format names, matching the `format` column from `dataset.csv`.
* `places.csv` (**7,410 rows**): publication places (city and country), matching the `publication-place` column from `dataset.csv`.
* `dataset.csv` (**1,153,986 rows**): the actual dataset containing the catalog of books.

Source: https://www.kaggle.com/sp1thas/book-depository-dataset


### *1.3.2. Amazon Customer Reviews Dataset*

This dataset is managed by [Amazon](https://www.amazon.com/) and contains over 130+ million customer reviews written from 1995 until 2015. For this project, we are interested only in the reviews for products of the category "Books". 

This dataset is available in two file formats: TSV (tab separated value) and Parquet. Since Parquet is more efficient and is already partitioned by `product_category`, which makes it more convenient to use since we will be reading data specific to the "Books" category, we will be using only the Parquet files where `product_category = Books` fot this project. 

Number of rows in the category "Books" of this dataset: **20,726,160**

Source: https://registry.opendata.aws/amazon-reviews/

## 1.4. Reading the Data

In [4]:
authors_staging_df = spark.read.csv("{}/authors.csv".format(book_depository_path),
                                    quote='\"',
                                    escape='\"',
                                    sep=",",
                                    header=True, 
                                    inferSchema= True,
                                    multiLine=True)

In [10]:
categories_staging_df = spark.read.csv("{}/categories.csv".format(book_depository_path),
                                       quote='\"',
                                       escape='\"',
                                       sep=",",
                                       header=True, 
                                       inferSchema= True,
                                       multiLine=True)

In [16]:
formats_staging_df = spark.read.csv("{}/formats.csv".format(book_depository_path),
                                    quote='\"',
                                    escape='\"',
                                    sep=",",
                                    header=True, 
                                    inferSchema= True,
                                    multiLine=True)

In [22]:
places_staging_df = spark.read.csv("{}/places.csv".format(book_depository_path),
                                   quote='\"',
                                   escape='\"',
                                   sep=",",
                                   header=True, 
                                   inferSchema= True,
                                   multiLine=True)

In [28]:
books_staging_df = spark.read.csv("{}/dataset.csv".format(book_depository_path),
                                  quote='\"',
                                  escape='\"',
                                  sep=",",
                                  header=True, 
                                  inferSchema= True,
                                  multiLine=True)

In [34]:
amazon_staging_df = spark.read.parquet(amazon_path)

# Step 2: Explore and Assess the Data

In this step, we are going to explore the Spark DataFrames created in the previous section, in order to identify data quality issues and document steps necessary to clean the data.

### *2.1. Authors*

In [5]:
authors_staging_df.printSchema()

root
 |-- author_id: integer (nullable = true)
 |-- author_name: string (nullable = true)



In [6]:
authors_staging_df.limit(50).toPandas().head(50)

Unnamed: 0,author_id,author_name
0,4872,
1,374796,!Mediengruppe Bitnik
2,295624,!ralupop
3,172310,# House Press
4,59149,#1 Blank Comic Books
5,100888,#Value!
6,287590,#Worldlcass Media
7,552132,#emergingproud Press
8,679802,#shakeback Publishing
9,28924,#vettech Life


In [7]:
authors_staging_df.describe("author_id", "author_name").show()

+-------+-----------------+--------------------+
|summary|        author_id|         author_name|
+-------+-----------------+--------------------+
|  count|           703666|              703665|
|   mean|         351833.5|   8.151614933835E11|
| stddev|203131.0215973056|2.823802163249373E12|
|    min|                1|!Mediengruppe Bitnik|
|    max|           703666|              張洪聲|
+-------+-----------------+--------------------+



Checking the number of authors in the dataset:

In [8]:
authors_staging_df.count()

703666

Checking the number of authors in the dataset *removing duplicates*:

In [9]:
authors_staging_df.dropDuplicates().count()

703666

**Assessment:**

From the above assessment, we can see that the `author_id` column is clean, all values are numeric. 

Some `author_name` begin with non readable characters, but since it is not possible to identify which ones are valid authors an which are not, we are going to load them as they are, removing only the null values.

We can see also that there are no duplicated rows in the file.

### *2.2. Categories*

In [11]:
categories_staging_df.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)



In [12]:
categories_staging_df.limit(20).toPandas().head(20)

Unnamed: 0,category_id,category_name
0,1998,.Net Programming
1,176,20th Century & Contemporary Classical Music
2,3291,20th Century & Contemporary Classical Music
3,2659,20th Century History: C 1900 To C 2000
4,2661,21st Century History: From C 2000 -
5,1992,2D Graphics: Games Programming
6,1943,3D Graphics & Modelling
7,1993,3D Graphics: Games Programming
8,2472,ABC
9,768,Abnormal Psychology


In [13]:
categories_staging_df.describe("category_id", "category_name").show()

+-------+------------------+----------------+
|summary|       category_id|   category_name|
+-------+------------------+----------------+
|  count|              2767|            2767|
|   mean|1709.2696060715577|            null|
| stddev|1004.8571823557746|            null|
|    min|                 2|.Net Programming|
|    max|              3390|  Zoroastrianism|
+-------+------------------+----------------+



Checking the number of book categories in the dataset:

In [14]:
categories_staging_df.count()

2767

Checking the number of book categories in the dataset *removing duplicates*:

In [15]:
categories_staging_df.dropDuplicates().count()

2767

**Assessment:**

The categories data looks clean. We are going to load them as they are. We can see also that there are no duplicated rows in the file.

### *2.3. Formats*

In [17]:
formats_staging_df.printSchema()

root
 |-- format_id: integer (nullable = true)
 |-- format_name: string (nullable = true)



In [18]:
formats_staging_df.limit(20).toPandas().head(20)

Unnamed: 0,format_id,format_name
0,17,Address
1,12,Audio
2,50,Bath
3,53,Betamax
4,51,Big
5,26,Board
6,13,Book
7,41,Boxed
8,7,CD
9,4,Calendar


In [19]:
formats_staging_df.describe("format_id", "format_name").show()

+-------+------------------+-----------+
|summary|         format_id|format_name|
+-------+------------------+-----------+
|  count|                59|         59|
|   mean|              30.0|       null|
| stddev|17.175564037317667|       null|
|    min|                 1|    Address|
|    max|                59|  Wallchart|
+-------+------------------+-----------+



Checking the number of book formats in the dataset:

In [20]:
formats_staging_df.count()

59

Checking the number of book formats in the dataset *removing duplicates*:

In [21]:
formats_staging_df.dropDuplicates().count()

59

**Assessment:**

The formats data also looks clean, and will be loaded without changes. We can see also that there are no duplicated rows in the file.

### *2.4. Publication Places*

In [23]:
places_staging_df.printSchema()

root
 |-- place_id: integer (nullable = true)
 |-- place_name: string (nullable = true)



In [24]:
places_staging_df.limit(50).toPandas().head(50)

Unnamed: 0,place_id,place_name
0,5996,"(56 Bourne Way, Hayes, Kent), United Kingdom"
1,619,"), United Kingdom"
2,4307,"0, United States"
3,3488,"189, United States"
4,7326,192
5,4601,"204, United States"
6,4602,"205, United States"
7,5234,206
8,4600,"206, United States"
9,5233,"216, United States"


In [25]:
places_staging_df.describe("place_id", "place_name").show()

+-------+------------------+--------------------+
|summary|          place_id|          place_name|
+-------+------------------+--------------------+
|  count|              7410|                7410|
|   mean|            3705.5|  218.66666666666666|
| stddev|2139.2270800455003|    34.7754702819866|
|    min|                 1|(56 Bourne Way, H...|
|    max|              7410|Москва, United St...|
+-------+------------------+--------------------+



Checking the number of publication places in the dataset:

In [26]:
places_staging_df.count()

7410

Checking the number of publication places in the dataset *removing duplicates*:

In [27]:
places_staging_df.dropDuplicates().count()

7410

**Assessment:**

The `place_name` column has some issues, like numeric characters at the begining of the string. To clean it, we will make some replacements during the ETL.

We can see that there are no duplicated rows in the file.

### *2.5. Books*

In [29]:
books_staging_df.printSchema()

root
 |-- authors: string (nullable = true)
 |-- bestsellers-rank: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- description: string (nullable = true)
 |-- dimension-x: double (nullable = true)
 |-- dimension-y: double (nullable = true)
 |-- dimension-z: double (nullable = true)
 |-- edition: string (nullable = true)
 |-- edition-statement: string (nullable = true)
 |-- for-ages: string (nullable = true)
 |-- format: string (nullable = true)
 |-- id: long (nullable = true)
 |-- illustrations-note: string (nullable = true)
 |-- imprint: string (nullable = true)
 |-- index-date: timestamp (nullable = true)
 |-- isbn10: string (nullable = true)
 |-- isbn13: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- publication-date: timestamp (nullable = true)
 |-- publication-place: integer (nullable = true)
 |-- rating-avg: double (nullable = true)
 |-- rating-count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable 

In [30]:
books_staging_df.limit(50).toPandas().head(50)

Unnamed: 0,authors,bestsellers-rank,categories,description,dimension-x,dimension-y,dimension-z,edition,edition-statement,for-ages,format,id,illustrations-note,imprint,index-date,isbn10,isbn13,lang,publication-date,publication-place,rating-avg,rating-count,title,url,weight
0,"[1, 2]",2896.0,"[235, 3386, 736, 763]",________________________________,129.0,198.0,23.0,,,,1,9781787465145,,ARROW BOOKS LTD,2020-01-15 10:00:55,1787465144,9781787000000.0,en,2019-10-17,1.0,4.04,3784.0,Journey Into Darkness,/Journey-Into-Darkness-John-Douglas/9781787465145,266.0
1,[3],51517.0,"[360, 2632]",A fake marriage is the last thing he wants . ....,108.0,178.0,13.0,,,,1,9781946510402,,,2020-01-15 10:01:06,1946510408,9781947000000.0,en,2019-09-17,,4.49,211.0,The Mercenary : Order of the Broken Blade,/Mercenary-Cecelia-Mecca/9781946510402,182.0
2,[4],162985.0,"[1703, 2771, 2818, 3097]","Once the dust has settled, you'll need to know...",127.0,178.0,17.78,,,,1,9781612124568,,,2020-01-15 10:01:15,1612124569,9781612000000.0,en,2015-05-05,2.0,2.96,139.0,100 Skills You'll Need for the End of the Worl...,/100-Skills-Youll-Need-for-End-World-We-Know-I...,226.8
3,[5],41520.0,"[819, 3364, 1853, 2977]",The Daily Mail and the Spectator Book of the Y...,140.0,201.0,14.0,,,,2,9781786487155,,,2020-01-15 10:01:26,1786487152,9781786000000.0,en,2017-09-21,1.0,4.21,158.0,How to Land a Plane,/How-Land-Plane-Mark-Vanhoenacker/9781786487155,206.0
4,[6],3885.0,"[1694, 1703, 2818]","Easy, do-able, down to earth ideas and suggest...",153.0,234.0,35.0,,,,1,9781472969125,,Green Tree,2020-01-15 10:01:36,147296912X,9781473000000.0,en,2020-03-10,1.0,,,The Sustainable(ish) Living Guide : Everything...,/Sustainable-ish-Living-Guide-Jen-Gale/9781472...,458.0
5,[7],75224.0,"[1843, 2967, 2969]",Mini celebrates 60 amazing years of this iconi...,216.0,254.0,25.4,,,,2,9780760363997,200 color & b-w photos,,2020-01-15 10:01:48,760363994,9780760000000.0,en,2019-05-09,3.0,4.75,4.0,Mini : 60 Years,/Mini-Giles-Chapman/9780760363997,975.22
6,"[8, 9]",368836.0,"[1520, 2771, 3019, 3074, 3078]","Dynamics, motion, and sensation are karate's c...",153.0,229.0,15.0,,,,1,9781594394591,black & white illustrations,,2020-01-15 10:01:57,1594394598,9781594000000.0,en,2017-05-01,4.0,4.48,27.0,Karate Science : Dynamic Movement,/Karate-Science-J-D-Swanson/9781594394591,408.0
7,[10],108369.0,"[1853, 2977, 3023]",Brian Cosgrove's classic introduction to the w...,189.0,246.0,10.16,Revised,8th edition,,1,9781847975096,Colour illustrations,,2020-01-15 10:02:08,1847975097,9781848000000.0,en,2013-11-01,5.0,4.38,26.0,Microlight Pilot's Handbook,/Microlight-Pilots-Handbook-Brian-Cosgrove/978...,453.59
8,"[11, 12]",49553.0,"[649, 2771, 3019, 3074]",BECAUSE NOT ALL KRAV MAGA IS THE SAME(R) This ...,184.0,238.0,22.86,,,,1,9781594396816,"727 Halftones, black and white",,2020-01-15 10:02:19,1594396817,9781594000000.0,en,2019-08-07,4.0,4.0,1.0,Krav Maga Combatives : Maximum Effect,/Krav-Maga-Combatives-David-Kahn/9781594396816,703.07
9,"[13, 14]",128070.0,"[1725, 1844, 2969, 2973]","Haynes offers the best coverage for cars, truc...",212.0,272.0,18.0,Revised,3rd Revised edition,,1,9781563924699,black & white illustrations,HAYNES MANUALS INC,2020-01-15 10:02:29,9781563924699,,en,2002-12-02,6.0,3.92,13.0,"Dodge Caravan, Plymouth Voyager & Chrysler Tow...",/Dodge-Caravan-Plymouth-Voyager-Chrysler-Town-...,497.0


In [31]:
books_staging_df.describe().toPandas().head()

Unnamed: 0,summary,authors,bestsellers-rank,categories,description,dimension-x,dimension-y,dimension-z,edition,edition-statement,for-ages,format,id,illustrations-note,imprint,isbn10,isbn13,lang,publication-place,rating-avg,rating-count,title,url,weight
0,count,1153986,461288.0,1153986,1042082,1101652.0,1051356.0,1101652.0,123180,306720,55344,1147715,1153986.0,394403,228849,1153986,1138650.0,1078206,737199.0,486376.0,486376.0,1153986,1153986,1059685.0
1,mean,,1073662.3132837743,,1014.1333333333333,163.5892014538665,232.09219199780216,25.5515790013423,,1648.2755432269996,,2.0372931410631616,9781778054683.115,208.5388974983396,9543261.0,1.4448580669936057E11,9781784246509.844,,337.1121868043772,3.951495016201626,5177.365287349705,Infinity,,475.52386219496674
2,stddev,,842385.808938867,,975.7049368582306,36.65227604035362,39.53291989305113,50.42905419090038,,2366.4170096067755,,3.315326294695639,1770055928.9798207,594.1026705181978,,1.1726851144641418E12,1773877696.0178776,,796.9126703460335,0.622674740999877,76965.97854149972,,,716.884990113177
3,min,"[1, 2]",1000130.0,"[10, 103, 2902, 2954]",! THIS BOOK IS INCLUDED IN ITALIAN CUISINE COO...,0.25,1.0,0.1,Abridged,#5 of Series ed.,0-10,1,9780000005199.0,"""",12...,000017758X,9780000005199.0,ab,1.0,1.0,1.0,! : New Soups dieting,/-1044-1072-1088-1100-1103-1053-1077-1089-1090...,15.0
4,max,[],999997.0,[],����★☆ What Is Stopping You From Making Money ...,3962.0,2000.0,2000.0,Variorum,überarbeitete Neuauflage,9-12,Wallchart\n |,9798389450189.0,über 80 Abb.,united p. c.,9999991319,9798389450189.0,zxx,7410.0,5.0,6473716.0,ï¿½tudes Et Lectures Sur l'Astronomie. Tome 4,/zzSongs-Verse-Dahl-Roald/9780141356136,82522.0


Checking the number of books in the dataset:

In [32]:
books_staging_df.count()

1153986

Checking the number of books in the dataset *removing duplicates*:

In [33]:
books_staging_df.dropDuplicates().count()

1153986

**Assessment:**

The Books table has a many-to-many relationship with the entities `authors` and `categories`. In the dataset, these relationships is represented through lists of IDs (e.g. "[235, 3386, 736, 763]"). Because of that, we will create two aditional tables to serve as many-to-many bridges between Books-Author and Books-Category.

We can see also that there are no duplicated rows in the Books file.

### *2.6. Amazon Reviews*

In [35]:
amazon_staging_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)



In [36]:
amazon_staging_df.limit(20).toPandas().head(20)

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,year
0,US,15444933,R1WWG70WK9VUCH,1848192576,835940987,Standing Qigong for Health and Martial Arts - ...,5,9,10,N,Y,Informative AND interesting!,"After attending a few Qigong classes, I wanted...",2015-05-02,2015
1,US,20595117,R1EQ3POS0RIOD5,145162445X,574044348,A Universe from Nothing: Why There Is Somethin...,4,4,7,N,N,Between 'Nothing' and 'Nothing' somehow we are...,Krauss traces the remarkable transformation in...,2012-06-29,2012
2,US,52925878,R10SRJA4VVGUBD,055341805X,846590203,Hyacinth Girls: A Novel,4,0,0,Y,N,Mysteries upon mysteries,"Rebecca, a dental hygienist, receives a call a...",2015-05-02,2015
3,US,40062567,RD3268X41GM7U,0425263908,119148606,Bared to You,5,1,1,N,N,"""RAW, STEAMY, HYPNOTIC!""","\\""BARED TO YOU\\"" is a sizzling, red-hot pass...",2012-06-29,2012
4,US,47221489,R3KGQL5X5BSJE1,1416556141,987400385,Healer: A Novel,5,0,0,N,Y,Well written story,Good characters and plot line. I spent a pleas...,2015-05-02,2015
5,US,20709973,R3IY9A2Z7N2R49,1455523003,181218559,The Missionary Position: Mother Teresa in Theo...,4,5,7,N,Y,"Eye opening, yet somehow not the whole story...",This book was fascinating. It started with the...,2012-06-29,2012
6,US,18052694,R12PGYPH9B9UVJ,0990388697,840404589,"I'm Tired of Being Ordinary, Are You?",4,1,1,N,Y,JOY,A personal world view by an original and creat...,2015-05-02,2015
7,US,48279565,R2CA9LLJBEORNZ,0345803485,600633062,Fifty Shades of Grey: Book One of the Fifty Sh...,1,7,7,N,N,This book is Twilight with worse characters,I bought this book because everyone was talkin...,2012-06-29,2012
8,US,14297627,R2C8BZMS1JRZ04,1469245167,466321651,The Thrill of Victory,4,0,0,N,Y,Four Stars,Sandra Brown is always a great read.,2015-05-02,2015
9,US,37340439,R2F8UI9NNP6S33,0345803485,600633062,Fifty Shades of Grey: Book One of the Fifty Sh...,5,0,9,N,N,Fifty shades of grey,Loved this book! People kept telling me to rea...,2012-06-29,2012


Checking the number of reviews in the dataset:

In [37]:
amazon_staging_df.count()

20726160

Checking the number of reviews in the dataset *removing duplicates*:

In [38]:
amazon_staging_df.dropDuplicates().count()

19887359

Checking for reviews in invalid (future) dates:

In [39]:
invalid_reviews = amazon_staging_df.filter(f.col("review_date") > f.unix_timestamp(timestamp=None,format='yyyy-MM-dd HH:mm:ss').cast('timestamp'))

invalid_reviews.count()

0

**Assessment:**

* This dataset **has duplicated rows** that will have to be eliminated in the ETL. 

* There are no rows with `review_date` greater than the current timestamp.

# Step 3: Define the Data Model

The Data Lake built by this project is meant to be used mainly for analytic queries (see section *1.2 Use Cases*). Therefore, the schema chosen for the data model was the **snowflake schema**. 

The fact table was defined to be the table containing the Amazon reviews. The dimension tables defined for the schema are the **dates** when the reviews were written and the **books** metadata, which in turn has its own related tables: **authors**, **categories**, **formats** and **publication places**.

Also, since the books table has a many-to-many relationship with the entities authors and categories, two aditional tables were defined to serve as bridges between them: **books-authors** and **books-categories**.

After exploring the datasets, the resulting Conceptual Data Model created was this:

![Data Model](images/data_model.png)

# Step 4: Run Pipelines to Model the Data 

## 4.1 Create the data model

In this step, the data will be cleaned and loaded into the tables, creating the data model defined in the previous section.

### *4.1.1. Authors Table*

In [40]:
# Registering the DataFrame as a SQL temporary view
authors_staging_df.createOrReplaceTempView("authors_staging_table")

# Loading the table
authors_table = spark.sql(
        """
        SELECT CAST(author_id AS int) AS author_id,
               author_name
        FROM authors_staging_table
        WHERE author_name IS NOT NULL
        """)

# Checking results
authors_table.sort("author_id").limit(10).toPandas().head(10)

Unnamed: 0,author_id,author_name
0,1,John Douglas
1,2,Mark Olshaker
2,3,Cecelia Mecca
3,4,Ana Maria Spagna
4,5,Mark Vanhoenacker
5,6,Jen Gale
6,7,Giles Chapman
7,8,Sam Nigro
8,9,J. D. Swanson
9,10,Brian Cosgrove


### *4.1.2. Categories Table*

In [41]:
# Registering the DataFrame as a SQL temporary view
categories_staging_df.createOrReplaceTempView("categories_staging_table")

# Loading the table
categories_table = categories_staging_df

# Checking results
categories_table.sort("category_id").limit(10).toPandas().head(10)

Unnamed: 0,category_id,category_name
0,2,Art & Photography
1,3,Art Books
2,4,Art Theory
3,5,Art Resoration
4,6,Forgery
5,7,Art Finance
6,8,Art History
7,9,Styles
8,10,Indigenous Art
9,11,Naive Art


### *4.1.3. Formats Table*

In [42]:
# Register the DataFrame as a SQL temporary view
formats_staging_df.createOrReplaceTempView("formats_staging_table")

# Loading the table
formats_table = formats_staging_df

# Checking results
formats_table.sort("format_id").limit(10).toPandas().head(10)

Unnamed: 0,format_id,format_name
0,1,Paperback
1,2,Hardback
2,3,Spiral
3,4,Calendar
4,5,Pamphlet
5,6,Game
6,7,CD
7,8,Notebook
8,9,Downloadable
9,10,Cards


### *4.1.4. Publication Places Table*

In [43]:
# Registering the DataFrame as a SQL temporary view
places_staging_df.createOrReplaceTempView("places_staging_table")

# Loading the table
places_table = spark.sql(
        """
        SELECT place_id,
               CASE WHEN place_name rlike '^[0-9]*$' THEN null
                    WHEN place_name rlike '^[0-9]+ Paris, France$' THEN 'Paris, France'
                    WHEN place_name rlike '^[0-9]+[,] United States$' THEN 'United States'
                    WHEN place_name rlike '[\)], United Kingdom' THEN 'United Kingdom'                                       
                    ELSE place_name 
                    END AS place_name
        FROM places_staging_table
        WHERE place_name is not null AND not place_name rlike '^[0-9]*$'
        """)

# Checking results
places_table.sort("place_id").limit(10).toPandas().head(10)

Unnamed: 0,place_id,place_name
0,1,"London, United Kingdom"
1,2,"North Adams, United States"
2,3,"Wisconsin, United States"
3,4,"Rolindale, MA, United States"
4,5,"Ramsbury, United Kingdom"
5,6,"Somerset, United Kingdom"
6,7,"New York, NY, United States"
7,8,"Rockport, ME, United States"
8,9,"New York, United States"
9,10,United States


### *4.1.5. Books Table*

In [44]:
# Registering the DataFrame as a SQL temporary view
books_staging_df.createOrReplaceTempView("books_staging_table")

# Loading the table
books_table = spark.sql(
        """
        SELECT id as book_id,
               isbn10,
               isbn13,
               title,
               authors as authors_list,
               categories as categories_list,               
               edition,
               lang,
               format as format_id,
               `publication-place` as publication_place_id,
               `publication-date` as publication_date,
               description
        FROM books_staging_table
        """)

# Checking results
books_table.limit(50).toPandas().head(50)

Unnamed: 0,book_id,isbn10,isbn13,title,authors_list,categories_list,edition,lang,format_id,publication_place_id,publication_date,description
0,9781787465145,1787465144,9781787000000.0,Journey Into Darkness,"[1, 2]","[235, 3386, 736, 763]",,en,1,1.0,2019-10-17,________________________________
1,9781946510402,1946510408,9781947000000.0,The Mercenary : Order of the Broken Blade,[3],"[360, 2632]",,en,1,,2019-09-17,A fake marriage is the last thing he wants . ....
2,9781612124568,1612124569,9781612000000.0,100 Skills You'll Need for the End of the Worl...,[4],"[1703, 2771, 2818, 3097]",,en,1,2.0,2015-05-05,"Once the dust has settled, you'll need to know..."
3,9781786487155,1786487152,9781786000000.0,How to Land a Plane,[5],"[819, 3364, 1853, 2977]",,en,2,1.0,2017-09-21,The Daily Mail and the Spectator Book of the Y...
4,9781472969125,147296912X,9781473000000.0,The Sustainable(ish) Living Guide : Everything...,[6],"[1694, 1703, 2818]",,en,1,1.0,2020-03-10,"Easy, do-able, down to earth ideas and suggest..."
5,9780760363997,760363994,9780760000000.0,Mini : 60 Years,[7],"[1843, 2967, 2969]",,en,2,3.0,2019-05-09,Mini celebrates 60 amazing years of this iconi...
6,9781594394591,1594394598,9781594000000.0,Karate Science : Dynamic Movement,"[8, 9]","[1520, 2771, 3019, 3074, 3078]",,en,1,4.0,2017-05-01,"Dynamics, motion, and sensation are karate's c..."
7,9781847975096,1847975097,9781848000000.0,Microlight Pilot's Handbook,[10],"[1853, 2977, 3023]",Revised,en,1,5.0,2013-11-01,Brian Cosgrove's classic introduction to the w...
8,9781594396816,1594396817,9781594000000.0,Krav Maga Combatives : Maximum Effect,"[11, 12]","[649, 2771, 3019, 3074]",,en,1,4.0,2019-08-07,BECAUSE NOT ALL KRAV MAGA IS THE SAME(R) This ...
9,9781563924699,9781563924699,,"Dodge Caravan, Plymouth Voyager & Chrysler Tow...","[13, 14]","[1725, 1844, 2969, 2973]",Revised,en,1,6.0,2002-12-02,"Haynes offers the best coverage for cars, truc..."


### *4.1.6. Book-Categories Relationship Table*

The Books table has a many-to-many relationship with the Categories Table. In the original dataset, these relationships is represented through lists of IDs. Because of that, we will we will parse these lists in order to create an aditional table to serve as a many-to-many bridge between Books and Categories.

In [45]:
# Defining the parsing function
def convert_string_to_list(s):
    return s.strip('][').split(', ')

convert_string_to_list_udf = f.udf(lambda s: convert_string_to_list(s), t.ArrayType(t.StringType()))

# Loading the table
books_categories_table = books_staging_df.select(
    f.col("id").alias("book_id"), 
    f.explode(convert_string_to_list_udf("categories")).alias("category_id")
)

# Checking results
books_categories_table.limit(20).toPandas().head(20)

Unnamed: 0,book_id,category_id
0,9781787465145,235
1,9781787465145,3386
2,9781787465145,736
3,9781787465145,763
4,9781946510402,360
5,9781946510402,2632
6,9781612124568,1703
7,9781612124568,2771
8,9781612124568,2818
9,9781612124568,3097


### *4.1.7. Book-Authors Relationship Table*

The Books table has a many-to-many relationship with the Authors Table. In the original dataset, these relationships is represented through lists of IDs. Because of that, we will parse these lists in order to create an aditional table to serve as a many-to-many bridge between Books and Authors.

In [46]:
# Loading the table
books_authors_table = books_staging_df.select(
    f.col("id").alias("book_id"), 
    f.explode(convert_string_to_list_udf("authors")).alias("author_id")
)

# Checking results
books_authors_table.limit(20).toPandas().head(20)

Unnamed: 0,book_id,author_id
0,9781787465145,1
1,9781787465145,2
2,9781946510402,3
3,9781612124568,4
4,9781786487155,5
5,9781472969125,6
6,9780760363997,7
7,9781594394591,8
8,9781594394591,9
9,9781847975096,10


### *4.1.8. Amazon Reviews Table*

For books in Amazon, the `product_id` corresponds to the books's ISBN-10. Therefore, we are joining the Amazon Reviews data and the Books Depository metadata through the ISBN-10.

We are going to use `INNER JOIN` to match books and reviews. Therefore, **reviews that are NOT associated with any row in the Books Table will not be loaded into the Data Lake.**

In [47]:
# Registering the Amazon Reviews Staging DataFrame as a SQL temporary view
amazon_staging_df.createOrReplaceTempView("amazon_reviews_staging_table")

# Registering the Books Table as a SQL temporary view
books_table.createOrReplaceTempView("books_table")

# Removing the duplicated rows
amazon_staging_df = amazon_staging_df.dropDuplicates()

# Loading Amazon Reviews Table, by joining the Amazon Reviews Staging DataFrame and the Books Table.
# For books, the product_id is the books's ISBN
amazon_reviews_table = spark.sql(
    """
    SELECT a.review_id, 
           b.book_id, 
           a.star_rating, 
           a.helpful_votes, 
           a.total_votes, 
           a.verified_purchase, 
           a.customer_id,
           a.review_date, 
           year(a.review_date) as review_year, 
           a.review_headline, 
           a.review_body
    FROM amazon_reviews_staging_table a
         INNER JOIN books_table b ON a.product_id = b.isbn10
    """)

# Checking results
amazon_reviews_table.limit(20).toPandas().head(20)

Unnamed: 0,review_id,book_id,star_rating,helpful_votes,total_votes,verified_purchase,customer_id,review_date,review_year,review_headline,review_body
0,R1BFKJAX7PLZY2,9780060878078,4,6,9,N,14810040,2008-05-29,2008,Informative but sadly negative,This book has many useful facts and anecdotes ...
1,R24LW7WLO6UF6W,9780060878078,5,22,24,N,40072440,2007-11-23,2007,Interesting & fun to read,Very well-written and easy to read. The autho...
2,RHDONITYOSLD7,9780060878078,1,11,18,N,17492715,2008-05-20,2008,Popes through the eyes of dissent,Fr.Mcbrien is a celebrated academic. His Theo...
3,R2POBJH7MJ3KHL,9780060878078,3,7,10,N,27331930,2009-04-08,2009,Solid Sketches of the Popes But Know What You ...,Richard McBrien offers thumbnail sketches of t...
4,RKAKST5M35YVG,9780060878078,5,5,5,N,34247947,2013-02-28,2013,A great overview of a fascinating institution,"I want to begin this review, which I am writin..."
5,R2W8J5L0K2BYXJ,9780060878078,1,13,27,Y,18058313,2007-11-15,2007,Expected History and Got A Movie Review,I would give this book no stars if possible. ...
6,R202WY3U0RCYAN,9780060878078,4,0,0,Y,2762668,2015-03-01,2015,Four Stars,nulla problema
7,R1FG7OWVG4PJ9A,9780060878078,1,0,0,N,52456601,2013-12-28,2013,"""The number of fools is infinite""","&#34;The perverse are hard to be corrected, an..."
8,R2R2A3TPJCBSEM,9780060878078,5,2,2,Y,15813941,2012-12-02,2012,An incredible history of an ancient institution,The papacy has been around since before the Ro...
9,R8PICYUMJUY10,9780060878078,5,3,13,N,11229368,2006-02-20,2006,Pretty Good,This is a fantastic book. McBrien does an exc...


### *4.1.9. Date Table*

In [48]:
# Selecting the review dates from the Amazon Reviews Table, dropping the duplicated values
date_df = amazon_reviews_table.select("review_date").dropDuplicates()

# Loading the table
date_table = date_df.select(
    "review_date",
    f.year("review_date").alias('year'), 
    f.month("review_date").alias('month'), 
    f.dayofmonth("review_date").alias('day'),
    f.weekofyear("review_date").alias('week'),
    f.dayofweek("review_date").alias('weekday')
)

# Checking results
date_table.limit(10).toPandas().head(10)

Unnamed: 0,review_date,year,month,day,week,weekday
0,2007-11-23,2007,11,23,47,6
1,2007-11-15,2007,11,15,46,5
2,2013-03-26,2013,3,26,13,3
3,2015-03-09,2015,3,9,11,2
4,2015-05-19,2015,5,19,21,3
5,2012-04-17,2012,4,17,16,3
6,2002-12-25,2002,12,25,52,4
7,2010-08-11,2010,8,11,32,4
8,2014-11-12,2014,11,12,46,4
9,2014-09-26,2014,9,26,39,6


## 4.2. Writing the tables into the Data Lake as parquet files

In [67]:
authors_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "authors.parquet"))  

In [68]:
categories_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "categories.parquet"))  

In [69]:
formats_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "formats.parquet"))  

In [70]:
places_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "places.parquet"))  

In [71]:
books_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "books.parquet"))  

In [72]:
books_categories_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "books_categories.parquet"))  

In [75]:
books_authors_table \
    .write \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "books_authors.parquet"))  

In [77]:
amazon_reviews_table \
    .repartition("review_year") \
    .write \
    .partitionBy("review_year") \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "amazon_reviews.parquet"))  

In [80]:
date_table \
    .repartition("year") \
    .write \
    .partitionBy("year") \
    .mode("overwrite") \
    .parquet(os.path.join(data_lake_path, "dates.parquet"))  

## 4.3 Data Quality Checks

### *4.3.1. Checking that none of the tables is empty*

In [49]:
authors_table.count()

703665

In [50]:
categories_table.count()

2767

In [51]:
formats_table.count()

59

In [52]:
places_table.count()

7407

In [53]:
books_table.count()

1153986

In [54]:
books_categories_table.count()

4074354

In [55]:
books_authors_table.count()

1600548

In [56]:
date_table.count()

6710

In [57]:
amazon_reviews_table.count()

1077713

### *4.3.2. Checking for unwanted NULL columns*

In [58]:
authors_table.filter("author_id is null OR author_name is null").count()

0

In [59]:
categories_table.filter("category_id is null or category_name is null").count()

0

In [60]:
formats_table.filter("format_id is null or format_name is null").count()

0

In [61]:
places_table.filter("place_id is null or place_name is null").count()

0

In [62]:
books_table.filter("book_id is null or isbn10 is null or title is null").count()

0

In [63]:
books_categories_table.filter("book_id is null or category_id is null").count()

0

In [64]:
books_authors_table.filter("book_id is null or author_id is null").count()

0

In [65]:
amazon_reviews_table.filter("review_id is null or book_id is null or review_date is null").count()

0

In [66]:
date_table.filter("review_date is null").count()

0

Closing Spark Session:

In [81]:
spark.stop()

## 4.4 Data dictionary 

In [4]:
%%html
<style>
table {float:left}
</style>

Table: **AUTHORS** \
Description: book authors
 
| Column | Type | Description |
| :----- | :--: | :----------- |
| author_id | int | Author's unique ID |
| author_name | string | Author's name |

Table: **CATEGORIES** \
Description: book categories

| Column | Type | Description |
| :----- | :--: | :---------- |
| category_id | int | Category unique ID |
| category_name | string | Category name |

Table: **FORMATS** \
Description: book formats

| Column | Type | Description |
| :----- | :--- | :---------- |
| format_id | int | Format unique ID |
| format_name | string | Format name |

Table: **PLACES** \
Description: book publication places

| Column | Type | Description |
| :----- | :--- | :---------- |
| place_id | int | Publication place unique ID |
| place_name | string | Publication place (city, country) |

Table: **BOOKS** \
Description: books

| Column | Type | Description |
| :----- | :--- | :---------- |
| book_id | long | Book's unique ID |
| isbn10 | string | Book's ISBN-10 |
| isbn13 | long | Book's ISBN-13 |
| title | string | Book's title |
| authors_list | string | List of the book's authors IDs, in the format [##, ##, ##, ...] |
| categories_list | string | List of the book's categories IDs, in the format [##, ##, ##, ...] |
| edition | string | Book edition |
| lang | string | Abbreviation of the book's language (e.g. "en" for English) |
| format_id | int | ID of the book's format |
| publication_place_id | int | ID of the book's publication place |
| publication_date | timestamp | Publication date |
| description | string | Book description |

Table: **BOOKS_AUTHORS** \
Description: many-to-many relationship between books and authors

| Column | Type | Description |
| :----- | :--- | :---------- |
| book_id | long | Book ID |
| author_id | int | Author ID |

Table: **BOOKS_CATEGORIES** \
Description: many-to-many relationship between books and categories

| Column | Type | Description |
| :----- | :--- | :---------- |
| book_id | long | Book ID |
| category_id | int | Category ID |

Table: **AMAZON_REVIEWS** \
Description: customer reviews for books on Amazon

| Column | Type | Description |
| :----- | :--- | :---------- |
| review_id | string | Review unique ID |
| book_id | long | Book ID |
| star_rating | int | The 1-5 star rating of the review |
| helpful_votes | int | Number of helpful votes the review received |
| total_votes | int | Number of total votes the review received |
| verified_purchase | string | Identifies if the review is on a verified purchase (Y or N) |
| customer_id | string | Customer ID in Amazon; can be used to aggregate reviews written by a single author |
| review_date | date | Date when the review was written |
| review_year | int | Year when the review was written |
| review_headline | string | Title of the review |
| review_body | string | Review text |

Table: **DATES** \
Description: metadata on review dates

| Column | Type | Description |
| :----- | :--- | :---------- |
| review_date | date | Review date (yyyy-MM-dd) |
| year | int | Year |
| month | int | Month (numeric) |
| day | int | Day of the month (numeric) |
| week | int | Week number of the year (numeric) |
| weekday | int | Day of the week (numeric) |

# Step 5: Complete Project Write Up

The choice to build a data lake in S3 instead of a data warehouse in Redshift is due mainly to reducing storage costs. Since this new dataset is being put together mainly for analytical purposes, there is no need for the cluster to be online 24/7. The analysis can be done by reading the data lake files. Also, the data being in S3, it becomes availabe to a wider range of users, who can rely on schema-on-read to easily read and wrangle the data.

As the datasets are too big to be wrangled locally by pandas, or even by Spark in standalone mode, it has been chosen to run the ETL pipeline in an EMR cluster containing both Spark and Hadoop applications. Also, since both datasets used by this project are static, the data pipeline was implemented in a Python Jupyter Notebook, in order to be run manually in a one-time execution.

Under other scenarios, the problem should be approached differently. For example:
* In a scenario where the data was increased by 100x, it would be recommended to configure an EMR cluster with more nodes, or choose instance types with higher processing capacity for each node.

* In a real case in which new data would become available in a daily basis, I would orchestrate the workflow setting up a DAG in [Apache AirFlow](https://airflow.apache.org/) splitting the logic included in the notebook `BookReviewsDataLake.ipynb` in Python operators and scheduling the DAG to run once a day.

* If the database had to be accessed by 100+ people, I would load the data into Amazon Redshift, since it is an analytical and distributed database optimized for aggregation and read-heavy workloads. By using a distributed database, it is possible to  improve the replications and partitioning of the tables to get faster query results for each user.