# Project Title
### Data Engineering Capstone Project

#### Project Summary
An online book store company, Valdivian, wants to analyse the data it has been collecting from its app and website on books and on users' ratings of those books. The analytics team is particularly interested in understanding which books users rate the most, and the data scientists want to use machine learning algorithms to develop a book recommendation system. Currently, there is no easy way to query the data, which resides in directories containing a CSV file of user data, JSON logs of user ratings, and a CSV file of book metadata.

They want to implement a data warehouse designed to optimise queries for user rating analysis. Additionally, the ETL pipelines are to be high-grade data pipelines that:
* Are automated and easily monitored. 
* Are dynamic and built from reusable tasks.
* Have automated data quality checks that catch any discrepancies in the datasets.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from time import time
import psycopg2
import configparser
import matplotlib.pyplot as plt
import pandas as pd
import pandas.io.sql as sqlio
import numpy as np

### Step 1: Scope the Project and Gather Data

#### Scope 
The source data resides on AWS S3 and needs to be processed in Valdivian's data warehouse, which resides on Amazon Redshift. The source datasets consist of JSON logs of users' **ratings** of books on the store, a CSV file holding **users** information, and a CSV file of metadata about the **books** that are available on the store.

#### Describe and Gather Data 
The original datasets (BX-Book-Ratings, BX-Books, BX-Users) are from https://www.kaggle.com/ruchi798/bookcrossing-dataset. BX-Book-Ratings will be converted to JSON format and renamed to **ratings**, while the other two datasets will be renamed to **books** and **users** respectively. They will then be saved to my S3 bucket. For details of the bucket, the folder structure and row 0 of each dataset, please refer to the following:

> s3://udacity-capstone-project-828/ratings: data about book ratings, example of row '0' of the data as follows: {'Book-Rating': 0, 'ISBN': '034545104X', 'User-ID': 276725}

> s3://udacity-capstone-project-828/users: data about users, example of row '0' of the data as follows: {'User-ID': 1, 'Location': 'nyc, new york, usa', 'Age': nan}

> s3://udacity-capstone-project-828/books: data about the books available from this store, example of row '0' of the data as follows: {'ISBN': '0195153448', 'book_title': 'Classical Mythology', 'book_author': 'Mark P. O. Morford', 'year_of_publication': '2002', 'publisher': 'Oxford University Press', 'img_s': 'http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg', 'img_m': 'http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg', 'img_l': 'http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg'}
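
BX-Book-Ratings is described above as being converted to JSON before upload, but the conversion itself is not shown. A minimal stdlib sketch of one way to do it follows, assuming the source CSV is semicolon-delimited with quoted fields and a header row of `User-ID`, `ISBN` and `Book-Rating` (the delimiter is an assumption; adjust it to match the downloaded file). It emits one JSON object per line, a layout that Redshift's `COPY ... FORMAT AS JSON` can load. The function name `ratings_csv_to_json_lines` is hypothetical.

```python
import csv
import io
import json


def ratings_csv_to_json_lines(csv_text, delimiter=';'):
    """Convert BX-Book-Ratings style CSV text into newline-delimited JSON.

    Assumes a header row of User-ID, ISBN, Book-Rating; the delimiter is an assumption.
    """
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter)
    lines = []
    for row in reader:
        # Keep the ISBN as text (it may end in 'X'); the IDs and ratings are integers
        record = {
            'User-ID': int(row['User-ID']),
            'ISBN': row['ISBN'],
            'Book-Rating': int(row['Book-Rating']),
        }
        lines.append(json.dumps(record))
    return '\n'.join(lines)


sample = '"User-ID";"ISBN";"Book-Rating"\n"276725";"034545104X";"0"\n'
print(ratings_csv_to_json_lines(sample))
# {"User-ID": 276725, "ISBN": "034545104X", "Book-Rating": 0}
```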

In [2]:
# Read in the data here
df_users = pd.read_csv('BX-Users.csv')
df_users.iloc[0].to_dict()

{'User-ID': 1, 'Location': 'nyc, new york, usa', 'Age': nan}

In [3]:
df_ratings = pd.read_json("ratings.json")
df_ratings.iloc[0].to_dict()

{'Book-Rating': 0, 'ISBN': '034545104X', 'User-ID': 276725}

In [4]:
df_books = pd.read_csv("BX-Books.csv", encoding='latin-1',low_memory=False)
df_books.iloc[0].to_dict()

{'ISBN': '0195153448',
 'book_title': 'Classical Mythology',
 'book_author': 'Mark P. O. Morford',
 'year_of_publication': '2002',
 'publisher': 'Oxford University Press',
 'img_s': 'http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg',
 'img_m': 'http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg',
 'img_l': 'http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg'}

### Step 2: Explore and Assess the Data
#### Explore the Data 
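The cleaning cell below addresses the following data quality issues in the source data:
* Missing values, such as the missing `Age` in row 0 of the users data and empty fields in the books metadata.
* HTML-escaped ampersands (`&amp;`) in book titles, authors and publishers.
* Non-numeric `User-ID` and `year_of_publication` values, and non-alphanumeric ISBNs in the ratings.

#### Cleaning Steps
* Fill missing ages with `-1` and any other missing book fields with `"No Data"`.
* Replace `&amp;` with `and` in the book author, title and publisher columns.
* Drop users and books whose `User-ID` or `year_of_publication` is not numeric, and ratings whose ISBN is not alphanumeric.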
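
Before cleaning, it helps to quantify the problems. The stdlib sketch below counts empty values per column and repeated values of a key column in CSV text; the function name `profile_csv` and the sample rows are hypothetical. In pandas the rough equivalents would be `df.isna().sum()` and `df.duplicated(subset=[key])`.

```python
import csv
import io
from collections import Counter


def profile_csv(csv_text, key, delimiter=','):
    """Return (missing values per column, duplicated key values) for CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter)
    missing = Counter()
    key_counts = Counter()
    for row in reader:
        for column in reader.fieldnames:
            value = row.get(column)
            # Short rows yield None; empty or whitespace-only strings also count as missing
            if value is None or not value.strip():
                missing[column] += 1
        key_counts[row[key]] += 1
    duplicates = {k: n for k, n in key_counts.items() if n > 1}
    return dict(missing), duplicates


# Hypothetical sample: one missing Age and one repeated User-ID
sample = ('User-ID,Location,Age\n'
          '1,"nyc, new york, usa",\n'
          '2,"stockton, california, usa",18\n'
          '2,"stockton, california, usa",18\n')
missing, duplicates = profile_csv(sample, key='User-ID')
print(missing)     # {'Age': 1}
print(duplicates)  # {'2': 2}
```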

In [5]:
# Fill in missing values
df_users['Age'] = df_users['Age'].fillna(-1)
df_books = df_books.fillna("No Data")

# Replace the HTML entity '&amp;' with "and"
for column in ["book_author", "book_title", "publisher"]:
    df_books[column] = df_books[column].str.replace('&amp;', 'and', regex=False)

# Remove rows whose User-ID or year of publication is not numeric
df_users = df_users.loc[df_users['User-ID'].astype(str).str.isnumeric()]
df_books = df_books.loc[df_books['year_of_publication'].astype(str).str.isnumeric()]

# Remove ratings whose ISBN is not alphanumeric
df_ratings = df_ratings.loc[df_ratings["ISBN"].astype(str).str.isalnum()]
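
The `isalnum` filter above keeps any alphanumeric string, including ones that cannot be valid ISBNs. A stricter check, not part of the original pipeline, is the ISBN-10 check digit: weighting the ten characters from 10 down to 1 (with a final `X` standing for 10) must give a multiple of 11. The sketch below assumes ISBN-10 values, as in the sample rows shown earlier; ISBN-13 values would be rejected and would need a separate rule. `is_valid_isbn10` is a hypothetical helper name.

```python
def is_valid_isbn10(isbn):
    """Return True if isbn is a well-formed ISBN-10 with a correct check digit."""
    isbn = isbn.strip().upper()
    if len(isbn) != 10 or not isbn[:9].isdigit():
        return False
    if not (isbn[9].isdigit() or isbn[9] == 'X'):
        return False
    # The check character 'X' represents the value 10
    digits = [int(c) for c in isbn[:9]] + [10 if isbn[9] == 'X' else int(isbn[9])]
    # Weights run 10, 9, ..., 1; a valid ISBN-10 sums to a multiple of 11
    total = sum(weight * digit for weight, digit in zip(range(10, 0, -1), digits))
    return total % 11 == 0


print(is_valid_isbn10('0195153448'))  # True
print(is_valid_isbn10('034545104X'))  # True
print(is_valid_isbn10('0195153449'))  # False
```

It could be applied to the ratings with `df_ratings[df_ratings['ISBN'].astype(str).map(is_valid_isbn10)]`.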

#### Writing Files For Project
Write files here to be uploaded onto S3 for this project

In [6]:
# Split the ratings into 20 contiguous batches. Each batch ends where the next
# one starts, so every row is written exactly once.
num_batches = 20
edges = [i * len(df_ratings) // num_batches for i in range(num_batches + 1)]
for i in range(num_batches):
    batch = df_ratings.iloc[edges[i]:edges[i + 1]]
    # Newline-delimited JSON (one object per line), which Redshift COPY loads as one row per object
    batch_json = batch.to_json(orient='records', lines=True)

    with open('data/ratingsbatch{}.json'.format(i + 1), 'w') as file:
        file.write(batch_json)
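
When the ratings are split into batch files, consecutive slices must share their boundary (the stop of one batch is the start of the next); otherwise a row is skipped or duplicated at each boundary. The stdlib sketch below, with the hypothetical helper `batch_bounds`, computes integer boundaries that partition the rows and checks that every row is covered exactly once.

```python
def batch_bounds(n_rows, n_batches):
    """Return (start, stop) pairs that split range(n_rows) into contiguous batches.

    Each stop equals the next start, so slicing rows[start:stop] for every pair
    visits every row exactly once.
    """
    edges = [i * n_rows // n_batches for i in range(n_batches + 1)]
    return list(zip(edges[:-1], edges[1:]))


bounds = batch_bounds(103, 20)
print(bounds[:3])  # [(0, 5), (5, 10), (10, 15)]

# Every row index appears once, in order
covered = [row for start, stop in bounds for row in range(start, stop)]
assert covered == list(range(103))
```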

In [7]:
# Write to .csv
df_users.to_csv("users.csv", encoding='utf-8-sig', sep=';', index=False)
df_books.to_csv("books.csv", encoding='utf-8-sig', sep=';', index=False)
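
Because `;` is used as the delimiter, a book title or publisher that itself contains a semicolon must be quoted so it is not split into two fields. pandas `to_csv` quotes minimally by default, and Redshift `COPY ... FORMAT AS CSV` uses `"` as its default quote character, so both follow standard CSV quoting. The stdlib round trip below illustrates this; the sample row is hypothetical.

```python
import csv
import io

# Hypothetical book row whose title contains the ';' delimiter
rows = [{'isbn': '0000000000', 'booktitle': 'Title; With a Semicolon', 'publisher': 'Example Press'}]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=['isbn', 'booktitle', 'publisher'], delimiter=';')
writer.writeheader()
writer.writerows(rows)
text = buffer.getvalue()
print(text)  # the title is written as "Title; With a Semicolon" (quoted)

# Reading it back with the same delimiter recovers the original fields intact
parsed = list(csv.DictReader(io.StringIO(text), delimiter=';'))
assert parsed == rows
```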

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Refer to Section 1 of the README on https://github.com/manchhui/Udacity-DENG-Capstone

#### 3.2 Mapping Out Data Pipelines
Refer to Section 1 of the README on https://github.com/manchhui/Udacity-DENG-Capstone

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
## STEP 1: Get the params and create staging tables

In [8]:
config = configparser.ConfigParser()
with open('dwh_crshc.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY = config.get('AWS', 'key')
SECRET = config.get('AWS', 'secret')

DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")

DWH_ENDPOINT = config.get("DWH", "DWH_ENDPOINT")
DWH_ROLE_ARN = config.get("IAM_ROLE", "ARN")
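
The cell above expects a `dwh_crshc.cfg` file, which is not shown. A plausible layout is sketched below: the section and option names are taken from the `config.get` calls, the database name, user and port match the connection string printed further down, and every other value is a placeholder. The sketch parses it into a separate `sample_config` object so it does not overwrite the real `config`.

```python
import configparser

# Hypothetical layout for dwh_crshc.cfg; section and option names match the
# config.get calls above, and every value other than dwh/dwhuser/5439 is a placeholder.
SAMPLE_CFG = """
[AWS]
key = YOUR_ACCESS_KEY_ID
secret = YOUR_SECRET_ACCESS_KEY

[DWH]
DWH_DB = dwh
DWH_DB_USER = dwhuser
DWH_DB_PASSWORD = <password>
DWH_PORT = 5439
DWH_ENDPOINT = <cluster-endpoint>.redshift.amazonaws.com

[IAM_ROLE]
ARN = arn:aws:iam::<account-id>:role/<role-name>
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)
# Option names are case-insensitive, so 'DWH_PORT' and 'dwh_port' both work
print(sample_config.get('DWH', 'DWH_PORT'))  # 5439
```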

In [9]:
%%time
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(DWH_ENDPOINT, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT))
cur = conn.cursor()
print(conn)

<connection object at 0x7f84c4881898; dsn: 'user=dwhuser password=xxx dbname=dwh host=dwhcluster.ckoszxyq4rgx.us-west-2.redshift.amazonaws.com port=5439', closed: 0>
CPU times: user 7.08 ms, sys: 223 µs, total: 7.31 ms
Wall time: 525 ms


In [10]:
%%time
cur.execute("""
DROP TABLE IF EXISTS "staging_users"; 
DROP TABLE IF EXISTS "staging_books"; 
DROP TABLE IF EXISTS "staging_ratings"; 
CREATE TABLE "staging_users" ("iduser" int,
                            "location" varchar(256),
                            "age" real);

CREATE TABLE "staging_books" ("isbn" varchar,
                            "booktitle" varchar(512),
                            "bookauthor" varchar(256),
                            "yearofpub" int,
                            "publisher" varchar,
                            "img_s" varchar(512),
                            "img_m" varchar(512),
                            "img_l" varchar(512));

CREATE TABLE "staging_ratings" ("bookrating" varchar,
                                "isbn" varchar,
                                "iduser" varchar);
""");
conn.commit()

CPU times: user 1.05 ms, sys: 76 µs, total: 1.13 ms
Wall time: 289 ms


## STEP 2: Copy S3 Data To Staging

In [11]:
%%time
cur.execute("""
copy staging_users 
from 's3://udacity-capstone-project-828/users' 
credentials 'aws_iam_role={}'
format as csv
delimiter ';'
compupdate off
emptyasnull
blanksasnull
IGNOREHEADER 1
region 'us-west-2';
""".format(DWH_ROLE_ARN))
conn.commit()

CPU times: user 1.33 ms, sys: 0 ns, total: 1.33 ms
Wall time: 2.44 s


In [12]:
%%time
cur.execute("""
copy staging_books 
from 's3://udacity-capstone-project-828/books' 
credentials 'aws_iam_role={}'
format as csv
delimiter ';'
compupdate off
emptyasnull
blanksasnull
IGNOREHEADER 1
region 'us-west-2';
""".format(DWH_ROLE_ARN))
conn.commit()

CPU times: user 1.84 ms, sys: 0 ns, total: 1.84 ms
Wall time: 3.88 s
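
The users and books COPY statements differ only in the table name and S3 prefix, which is the kind of duplication the "dynamic and built from reusable tasks" requirement in the project summary targets. A minimal sketch of a hypothetical helper, `build_copy_sql`, that renders either the CSV or the JSON variant with the same options and option order as the cells in this step is below. It only builds the SQL string, which would then be passed to `cur.execute`; the ARN in the usage example is a placeholder.

```python
def build_copy_sql(table, s3_path, iam_role, region='us-west-2',
                   csv_delimiter=None, jsonpaths=None):
    """Build a Redshift COPY statement for a CSV or JSON source.

    Exactly one of csv_delimiter or jsonpaths must be given.
    """
    if (csv_delimiter is None) == (jsonpaths is None):
        raise ValueError("pass exactly one of csv_delimiter or jsonpaths")
    lines = [
        "COPY {}".format(table),
        "FROM '{}'".format(s3_path),
        "CREDENTIALS 'aws_iam_role={}'".format(iam_role),
    ]
    if csv_delimiter is not None:
        lines += ["FORMAT AS CSV", "DELIMITER '{}'".format(csv_delimiter)]
    else:
        lines.append("FORMAT AS JSON '{}'".format(jsonpaths))
    lines += ["COMPUPDATE OFF", "EMPTYASNULL", "BLANKSASNULL"]
    if csv_delimiter is not None:
        lines.append("IGNOREHEADER 1")  # skip the header row written by to_csv
    lines.append("REGION '{}';".format(region))
    return "\n".join(lines)


books_sql = build_copy_sql('staging_books', 's3://udacity-capstone-project-828/books',
                           'arn:aws:iam::<account-id>:role/<role-name>', csv_delimiter=';')
print(books_sql)
# In the notebook this would run as cur.execute(build_copy_sql(..., DWH_ROLE_ARN, ...))
```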


In [13]:
%%time
cur.execute("""
copy staging_ratings 
from 's3://udacity-capstone-project-828/ratings' 
credentials 'aws_iam_role={}'
format as json 's3://udacity-capstone-project-828/jsonpaths.json'
compupdate off
emptyasnull
blanksasnull
region 'us-west-2';
""".format(DWH_ROLE_ARN))
conn.commit()

CPU times: user 1.4 ms, sys: 0 ns, total: 1.4 ms
Wall time: 2.57 s
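
The ratings COPY reads its column mapping from `jsonpaths.json`, whose contents are not shown. Redshift matches the JSONPath expressions to the target table's columns by position, so for `staging_ratings` (`bookrating`, `isbn`, `iduser`) a file along the lines sketched below would be expected; treat it as an assumption rather than the file actually in the bucket. Bracket notation, which Redshift JSONPaths supports, is used because the key names contain hyphens.

```python
import json

# Hypothetical jsonpaths.json: one expression per staging_ratings column, in column order
jsonpaths = {
    "jsonpaths": [
        "$['Book-Rating']",  # -> bookrating
        "$['ISBN']",         # -> isbn
        "$['User-ID']",      # -> iduser
    ]
}
jsonpaths_text = json.dumps(jsonpaths, indent=2)
print(jsonpaths_text)

# Resolve the paths by hand against the sample record from Step 1 to check the mapping
record = {'Book-Rating': 0, 'ISBN': '034545104X', 'User-ID': 276725}
keys = [path[3:-2] for path in jsonpaths["jsonpaths"]]  # strip the leading $[' and trailing ']
print([record[key] for key in keys])  # [0, '034545104X', 276725]
```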


In [14]:
cur.execute("""DELETE FROM staging_users WHERE iduser IS NULL;""")
cur.execute("""DELETE FROM staging_books WHERE isbn IS NULL;""")
cur.execute("""DELETE FROM staging_ratings WHERE bookrating IS NULL;""")
cur.execute("""DELETE FROM staging_ratings WHERE isbn IS NULL;""")
cur.execute("""DELETE FROM staging_ratings WHERE iduser IS NULL;""")
conn.commit()

In [15]:
conn.commit()

## STEP 3: Create Tables for Star Schema DB

In [16]:
%%time
cur.execute("""
-- -----------------------------------------------------
-- Star schema tables
-- Redshift does not enforce PRIMARY KEY or FOREIGN KEY
-- constraints, but the query planner assumes they hold,
-- so each declared key must really be unique.
-- -----------------------------------------------------
DROP TABLE IF EXISTS books_publisher ;
DROP TABLE IF EXISTS books_author ;
DROP TABLE IF EXISTS books_title ;
DROP TABLE IF EXISTS publishers ;
DROP TABLE IF EXISTS authors ;
DROP TABLE IF EXISTS titles ;
DROP TABLE IF EXISTS yearofpub ;
DROP TABLE IF EXISTS ratings ;
DROP TABLE IF EXISTS users ;

CREATE TABLE IF NOT EXISTS users (
  iduser INT NOT NULL,
  Location VARCHAR(256) NOT NULL,
  Age INT NOT NULL,
  PRIMARY KEY (iduser));
  
CREATE TABLE IF NOT EXISTS ratings (
  idbookratings VARCHAR(256) NOT NULL,
  ISBN VARCHAR(256) NOT NULL,
  iduser INT NOT NULL,
  rating INT NOT NULL,
  PRIMARY KEY (idbookratings),
  FOREIGN KEY (iduser) REFERENCES users (iduser));

-- The book tables below hold one row per ISBN, so ISBN is their key.
-- ratings.idbookratings is an md5 hash, so an ISBN cannot reference it.
CREATE TABLE IF NOT EXISTS yearofpub (
  yearofpub INT NOT NULL,
  ISBN VARCHAR(256) NOT NULL,
  PRIMARY KEY (ISBN));
  
CREATE TABLE IF NOT EXISTS authors (
  idauthor VARCHAR(256) NOT NULL,
  authorname VARCHAR(256) NOT NULL,
  PRIMARY KEY (idauthor));
  
CREATE TABLE IF NOT EXISTS books_author (
  ISBN VARCHAR(256) NOT NULL,
  idauthor VARCHAR(256) NOT NULL,
  PRIMARY KEY (ISBN),
  FOREIGN KEY (idauthor) REFERENCES authors (idauthor));
  
CREATE TABLE IF NOT EXISTS titles (
  idTitles VARCHAR(256) NOT NULL,
  title VARCHAR(512) NOT NULL,
  PRIMARY KEY (idTitles));
  
CREATE TABLE IF NOT EXISTS books_title (
  idTitles VARCHAR(256) NOT NULL,
  ISBN VARCHAR(256) NOT NULL,
  PRIMARY KEY (ISBN),
  FOREIGN KEY (idTitles) REFERENCES titles (idTitles));
  
CREATE TABLE IF NOT EXISTS publishers (
  idPublishers VARCHAR(256) NOT NULL,
  publisher VARCHAR(256) NOT NULL,
  PRIMARY KEY (idPublishers));
  
CREATE TABLE IF NOT EXISTS books_publisher (
  idPublishers VARCHAR(256) NOT NULL,
  ISBN VARCHAR(256) NOT NULL,
  PRIMARY KEY (ISBN),
  FOREIGN KEY (idPublishers) REFERENCES publishers (idPublishers));
""")
conn.commit()

CPU times: user 934 µs, sys: 0 ns, total: 934 µs
Wall time: 324 ms
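
As an illustration of how the analytics team's question (which books users rate the most) maps onto these tables, the sketch below builds a tiny in-memory SQLite copy of `ratings`, `books_title` and `titles` with hypothetical rows and runs the join. The same query shape should carry over to Redshift; the `CAST(... AS FLOAT)` is there because Redshift's `AVG` over an integer column returns an integer.

```python
import sqlite3

conn_demo = sqlite3.connect(':memory:')
cur_demo = conn_demo.cursor()
# Hypothetical rows: 'Title A' has two ratings, 'Title B' has one
cur_demo.executescript("""
CREATE TABLE ratings (idbookratings TEXT, isbn TEXT, iduser INT, rating INT);
CREATE TABLE books_title (idtitles TEXT, isbn TEXT);
CREATE TABLE titles (idtitles TEXT, title TEXT);

INSERT INTO titles VALUES ('t1', 'Title A'), ('t2', 'Title B');
INSERT INTO books_title VALUES ('t1', '0000000001'), ('t2', '0000000002');
INSERT INTO ratings VALUES
  ('r1', '0000000001', 1, 8), ('r2', '0000000001', 2, 0), ('r3', '0000000002', 1, 5);
""")

# Books ordered by how many ratings they received, with their mean rating
cur_demo.execute("""
SELECT t.title, COUNT(*) AS num_ratings, AVG(CAST(r.rating AS FLOAT)) AS avg_rating
FROM ratings r
JOIN books_title bt ON bt.isbn = r.isbn
JOIN titles t ON t.idtitles = bt.idtitles
GROUP BY t.title
ORDER BY num_ratings DESC
""")
top = cur_demo.fetchall()
print(top)  # [('Title A', 2, 4.0), ('Title B', 1, 5.0)]
```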


In [17]:
%%time
cur.execute("""
-- ---------
-- INSERT
-- ---------
INSERT INTO users
SELECT distinct iduser, location, age
FROM staging_users;

CREATE TEMP TABLE ratings_staging (LIKE ratings);

-- Build the md5 surrogate key here so it can be compared with ratings.idbookratings
INSERT INTO ratings_staging
SELECT distinct
    md5(isbn || CAST(iduser AS INT) || CAST(bookrating AS INT)) AS idbookratings,
    isbn AS ISBN, 
    CAST(iduser AS INT), 
    CAST(bookrating AS INT) AS rating
FROM staging_ratings ;

-- Insert only the ratings whose key is not already present
INSERT INTO ratings 
SELECT rs.idbookratings, rs.ISBN, rs.iduser, rs.rating 
FROM ratings_staging rs
LEFT JOIN ratings r
    ON rs.idbookratings = r.idbookratings
WHERE r.idbookratings IS NULL;
    
DROP TABLE IF EXISTS ratings_staging;

INSERT INTO yearofpub
SELECT distinct yearofpub, ISBN
FROM staging_books;

INSERT INTO authors
SELECT distinct md5(bookauthor) AS idauthor, bookauthor AS authorname
FROM staging_books;

INSERT INTO books_author
SELECT distinct ISBN, md5(bookauthor) AS idauthor
FROM staging_books;

INSERT INTO titles
SELECT distinct md5(booktitle) AS idTitles, booktitle AS title
FROM staging_books;

INSERT INTO books_title
SELECT distinct md5(booktitle) AS idTitles, ISBN
FROM staging_books;

INSERT INTO publishers
SELECT distinct md5(publisher) AS idPublishers, publisher
FROM staging_books;

INSERT INTO books_publisher
SELECT distinct md5(publisher) AS idPublishers, ISBN
FROM staging_books;
""")
conn.commit()

CPU times: user 715 µs, sys: 0 ns, total: 715 µs
Wall time: 4.96 s
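
The ratings load relies on an anti-join (`LEFT JOIN ... WHERE r.idbookratings IS NULL`) against an md5 surrogate key so that re-running the insert does not add duplicates. The sketch below reproduces that pattern in an in-memory SQLite database, where an `md5` function has to be registered from Python because SQLite has none, and loads the same two hypothetical staging rows twice. The key must be computed before the join: comparing a raw ISBN with a hash would never match, so every row would be treated as new.

```python
import hashlib
import sqlite3

demo = sqlite3.connect(':memory:')
# SQLite has no md5(); register one so the SQL mirrors the Redshift statement
demo.create_function('md5', 1, lambda s: hashlib.md5(s.encode('utf-8')).hexdigest())
demo.executescript("""
CREATE TABLE staging_ratings (bookrating TEXT, isbn TEXT, iduser TEXT);
CREATE TABLE ratings (idbookratings TEXT PRIMARY KEY, isbn TEXT, iduser INT, rating INT);
INSERT INTO staging_ratings VALUES ('0', '034545104X', '276725'), ('5', '0155061224', '276726');
""")

LOAD_SQL = """
INSERT INTO ratings
SELECT DISTINCT s.idbookratings, s.isbn, s.iduser, s.rating
FROM (SELECT md5(isbn || iduser || bookrating) AS idbookratings,
             isbn, CAST(iduser AS INT) AS iduser, CAST(bookrating AS INT) AS rating
      FROM staging_ratings) s
LEFT JOIN ratings r ON s.idbookratings = r.idbookratings
WHERE r.idbookratings IS NULL
"""

for _ in range(2):  # load twice; the anti-join skips rows that are already present
    demo.execute(LOAD_SQL)
demo.commit()
count = demo.execute('SELECT COUNT(*) FROM ratings').fetchone()[0]
print(count)  # 2
```

Without the anti-join, the second load would violate the primary key, which SQLite enforces; Redshift does not enforce it and would silently store duplicates instead.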


#### 4.2 Data Quality Checks
The following checks help ensure the pipeline ran as expected:
 * Integrity constraints: rows with NULL keys are deleted from the staging tables before loading, and every column of the star schema tables is declared `NOT NULL`.
 * Source/count checks: the cell below confirms that each table in the star schema contains records.
 
Run Quality Checks

In [18]:
%%time
tables = ["books_publisher", "books_author", "books_title", "publishers", "authors",
          "titles", "yearofpub", "ratings", "users"]

try:
    for table in tables:
        cur.execute("SELECT COUNT(*) FROM {}".format(table))
        num_records = cur.fetchone()[0]
        if num_records > 0:
            print("Data quality on table '{}' check passed with '{}' records".format(table, num_records))
        else:
            print("No records present in destination table '{}'".format(table))
except Exception as e:
    conn.rollback()  # a failed statement aborts the transaction; reset it before reusing the connection
    print(e)

Data quality on table 'books_publisher' check passed with '271357' records
Data quality on table 'books_author' check passed with '271357' records
Data quality on table 'books_title' check passed with '271357' records
Data quality on table 'publishers' check passed with '16760' records
Data quality on table 'authors' check passed with '102017' records
Data quality on table 'titles' check passed with '242012' records
Data quality on table 'yearofpub' check passed with '271357' records
Data quality on table 'ratings' check passed with '1148594' records
Data quality on table 'users' check passed with '278858' records
CPU times: user 7.16 ms, sys: 0 ns, total: 7.16 ms
Wall time: 820 ms
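
The check above prints a message when a table is empty but lets the run continue, so a failed check could go unnoticed. A reusable variant that raises instead, which is what lets a scheduler such as Airflow mark a task as failed, is sketched below together with a NULL check on key columns. `check_table` is a hypothetical name, and the demonstration uses an in-memory SQLite database in place of Redshift; against Redshift it would be called as `check_table(cur, 'ratings', key_columns=['idbookratings', 'isbn', 'iduser'])`.

```python
import sqlite3


def check_table(cursor, table, key_columns=()):
    """Raise ValueError if `table` is empty or any key column contains NULLs.

    Table and column names are interpolated directly, so they must come from
    trusted code, never from user input. Returns the row count on success.
    """
    cursor.execute("SELECT COUNT(*) FROM {}".format(table))
    num_records = cursor.fetchone()[0]
    if num_records == 0:
        raise ValueError("Data quality check failed: table '{}' is empty".format(table))
    for column in key_columns:
        cursor.execute("SELECT COUNT(*) FROM {} WHERE {} IS NULL".format(table, column))
        num_nulls = cursor.fetchone()[0]
        if num_nulls > 0:
            raise ValueError("Data quality check failed: {} NULL values in {}.{}".format(
                num_nulls, table, column))
    return num_records


# Demonstration against an in-memory SQLite database
demo_cur = sqlite3.connect(':memory:').cursor()
demo_cur.execute("CREATE TABLE users (iduser INT, location TEXT, age INT)")
demo_cur.execute("INSERT INTO users VALUES (384, 'bucuresti, n/a, romania', 23)")
print(check_table(demo_cur, 'users', key_columns=['iduser']))  # 1
```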


#### 4.3 Data dictionary 
The cell below prints one row from each table of the data model to illustrate the fields it contains.

In [19]:
%%time
tables =["books_publisher", "books_author", "books_title", "publishers", "authors", 
         "titles", "yearofpub", "ratings", "users"]

try:
    for table in tables:
        sql = "select * from {} limit 1".format(table)
        dat = sqlio.read_sql_query(sql, conn)
        row=dat.iloc[0].to_dict()
        print("Row 0 '{}' of Table '{}'".format(row, table))
except Exception as e:
    print(e)

Row 0 '{'idpublishers': '568d044227b9a875d6534dc9700207be', 'isbn': '3423241489'}' of Table 'books_publisher'
Row 0 '{'isbn': '0345369068', 'idauthor': 'e009561f055e0ad994611c279fbd63c1'}' of Table 'books_author'
Row 0 '{'idtitles': '6dd1bc9445f9c3c3fff565985b15b57c', 'isbn': '0312148267'}' of Table 'books_title'
Row 0 '{'idpublishers': 'cf0da27dc129b2a35fcc046924a8e372', 'publisher': 'Laia'}' of Table 'publishers'
Row 0 '{'idauthor': '1621320cf5f525d3d80d47a83ca58cda', 'authorname': 'David Adams Richards'}' of Table 'authors'
Row 0 '{'idtitles': 'a9bfb64da9505aa56035a88752534753', 'title': 'Clara Callan'}' of Table 'titles'
Row 0 '{'yearofpub': 2001, 'isbn': '0002005018'}' of Table 'yearofpub'
Row 0 '{'idbookratings': '34f92fddc04e5d7800d21bf62c9e8d9f', 'isbn': '0373165897', 'iduser': 153662, 'rating': 0}' of Table 'ratings'
Row 0 '{'iduser': 384, 'location': 'bucuresti, n/a, romania', 'age': 23}' of Table 'users'
CPU times: user 151 ms, sys: 419 µs, total: 151 ms
Wall time: 1.29 s


Visit Section 1.2 of the README at https://github.com/manchhui/Udacity-DENG-Capstone for the full data dictionary.

### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### Project Technologies & Rationale
A brief description of each of the three core technologies used in this project, and the rationale behind each choice:
> **Description**: Amazon Simple Storage Service is storage for the Internet. It is designed to make web-scale computing easier for developers. **Amazon S3** has a simple web services interface that you can use to store and retrieve any amount of data, at any time, from anywhere on the web. [aws.amazon.com](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html). 

> **Description**: **Amazon Redshift** is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. This enables you to use your data to acquire new insights for your business and customers. [aws.amazon.com](https://docs.aws.amazon.com/redshift/latest/mgmt/welcome.html). 

> **Description**: **Apache Airflow** is an open-source workflow management platform. It started at Airbnb in October 2014 as a solution to manage the company's increasingly complex workflows. Creating Airflow allowed Airbnb to programmatically author and schedule their workflows and monitor them via the built-in Airflow user interface. From the beginning, the project was made open source, becoming an Apache Incubator project in March 2016 and a Top-Level Apache Software Foundation project in January 2019. [Wikipedia](https://en.wikipedia.org/wiki/Apache_Airflow).

> **Rationale**: The data may grow as much as 100x from its current size of approximately one million rows (a few hundred megabytes), and the number of concurrent users querying it may grow from tens to hundreds. Amazon Redshift, coupled with S3 storage and Airflow, is therefore the preferred solution going forward. These technologies scale easily and remove the need for Valdivian to manage on-site infrastructure, so computational resources can be scaled up or down quickly and robust ETL pipelines can be built and maintained with ease.

#### Proposed Data Refresh Rate
The Airflow schedule is currently set to refresh the data every morning at 7am, which is considered an acceptable schedule given the need for up-to-date data.