# Project Title
### Data Engineering Capstone Project

#### Project Summary
India's National Stock Exchange (NSE) historical data spanning close to 20 years has been sourced in CSV format. Each stock has a file of its own and there are in excess of 1300 stocks. Additionally, a static file has been provided which maps the stock symbol to the Company. This data is to be processed and populated into Amazon Redshift for analytics. 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import datetime
import configparser
import pandas as pd
from time import time
import boto3

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import when

In [3]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession \
.builder \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
.getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Source data is in CSV format and is in the form of multiple files - each stock file has records for a single stock from 2000 onwards. There are 1386 files in all. Additionally, there is one static file mapping the stock symbol to the company. 
1. Transform grouping of source data from symbol to date based (to support sourcing additional data from the future)
2. Transform data from Step #1 and populate a data late (hosted on S3)
3. Transform data from Step #1 and populate a data warehouse (hosted on Redshift)
4. Run data quality checks to validate the load. 
5. Data would be leveraged for data analytics. 

Tool Usage:
1. Apache Spark (PySpark) for data exploration
2. AWS S3 for staging of source/intermediate files
3. AWS Redshift for hosting warehouse

#### Describe and Gather Data 
Dataset contains two categories of data:
1. Stock Data: 1386 files in CSV format which contains stock prices for 1364 NSE stocks from 2000 onwards. One record/stock for each trading day. In all, there are 3,758,123 records. Columns include Date, Open, Close, High, Low and Volume. 

2. Companies Data: 1 file in CSV format which maps the Stock Symbol to the Company. Contains 1384 records. 

Dataset has been obtained from Kaggle (https://www.kaggle.com/abhishekyana/nse-listed-1384-companies-data). 

In [5]:
# Read in the data here
stock_data = "s3a://nse-stock-data/stocks/*/*.csv"
companies_data = "s3a://nse-stock-data/static/Companies_list.csv"

In [6]:
df_companies = spark.read.csv(companies_data, header=True, inferSchema=True)
df_stocks = spark.read.csv(stock_data, header=True, inferSchema=True)

In [7]:
df_companies.count(), df_stocks.count()

(1384, 3756739)

In [8]:
df_companies.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- Name: string (nullable = true)



In [9]:
pd.set_option('max_colwidth', 300)
df_companies.limit(5).toPandas()

Unnamed: 0,_c0,Symbol,Name
0,0,20MICRONS,20_Microns_Limited
1,1,3IINFOTECH,3i_Infotech_Limited
2,2,3MINDIA,3M_India_Limited
3,3,A2ZMES,A2Z_Maintenance_&_Engineering_Services_Limited
4,4,AANJANEYA,Aanjaneya_Lifecare_Limited


In [10]:
df_stocks.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- adj_close: string (nullable = true)
 |-- volume: string (nullable = true)



In [11]:
pd.set_option('max_colwidth', 300)
df_stocks.limit(5).toPandas()

Unnamed: 0,symbol,Date,open,high,low,close,adj_close,volume
0,20MICRONS,2015-07-01,33.0,33.349998,31.799999,31.950001,31.269989,29363
1,20MICRONS,2015-07-02,32.700001,32.75,31.85,31.9,31.221052,35751
2,20MICRONS,2015-07-03,32.349998,32.400002,31.75,32.150002,31.465734,13035
3,20MICRONS,2015-07-06,31.65,32.900002,31.65,31.85,31.172117,25504
4,20MICRONS,2015-07-07,32.0,32.900002,31.75,32.0,31.318926,45904


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [13]:
df_stocks.summary().show()

+-------+---------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|   symbol|              open|              high|               low|             close|         adj_close|            volume|
+-------+---------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|  3756739|           3756739|           3756739|           3756739|           3756739|           3756739|           3756739|
|   mean|     null|259.18565048165317| 263.8717590721991|254.43260375896116|258.72030193729694|243.76858077967336|1042479.7592680976|
| stddev|     null|1351.0480473503249|1367.8412675832865|1332.6522692561446|1349.1390782857247|1338.9817958504302| 8203915.916546446|
|    min|20MICRONS|          0.000000|              0.05|              0.05|              0.05|         -0.000028|                 0|
|    25%|     null|         24.049999|             24.77|     

##### From the summary above:
1. It seems like the numeric columns open, high, low, close, adj_close and volume have non-numeric data. 
2. adj_close has negative values

In [14]:
df_stocks.filter(df_stocks['open'] == 'null').show()

+----------+-------------------+----+----+----+-----+---------+------+
|    symbol|               Date|open|high| low|close|adj_close|volume|
+----------+-------------------+----+----+----+-----+---------+------+
|JAYSREETEA|2015-07-01 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-06 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-07 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-08 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-09 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-14 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-15 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-16 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-17 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-22 00:00:00|null|null|null| null|     null|  null|
|JAYSREETEA|2015-07-23 00:00:00|null|null|null| null|     null|  null|
|JAYSR

In [15]:
df_stocks.filter(df_stocks['open'] != 'null').count()

3689727

##### As per the count obtained above, there are 67012 records with numeric columns having value 'null'

In [None]:
schema_stocks = StructType([
    StructField("symbol", StringType()),
    StructField("date", DateType()),
    StructField("open", DoubleType()),
    StructField("high", DoubleType()),
    StructField("low", DoubleType()),
    StructField("close", DoubleType()),
    StructField("adj_close", DoubleType()),
    StructField("volume", IntegerType())
])

In [19]:
df_stocks = spark.read.csv(stock_data, header=True, schema=schema_stocks)

In [21]:
df_stocks.filter(df_stocks['adj_close'] < 0).show()

+----------+----------+-----+-----+-----+-----+---------+------+
|    symbol|      date| open| high|  low|close|adj_close|volume|
+----------+----------+-----+-----+-----+-----+---------+------+
|BANCOINDIA|2007-03-01| 2.89|2.955| 2.74|2.939|-1.694174| 14360|
|BANCOINDIA|2007-03-02|2.851|2.851|2.773|2.773|-1.598485|  5460|
|BANCOINDIA|2007-03-05| 2.74|2.906| 2.67| 2.67| -1.53911|  5150|
|BANCOINDIA|2007-03-06| 2.69|2.715| 2.64|2.715| -1.56505|  3160|
|BANCOINDIA|2007-03-07| 2.65| 2.65|2.628|2.637|-1.520088|  1650|
|BANCOINDIA|2007-03-08|2.901|2.901| 2.64| 2.64|-1.521817|    20|
|BANCOINDIA|2007-03-09| 2.75| 2.75|2.646|2.737|-1.577732|  4820|
|BANCOINDIA|2007-03-12| 2.76| 2.76|2.623|2.648|-1.526429|  9230|
|BANCOINDIA|2007-03-13| 2.68| 2.73| 2.66|2.705|-1.559286|  5680|
|BANCOINDIA|2007-03-14| 2.79| 2.79| 2.72| 2.72|-1.567933|  1390|
|BANCOINDIA|2007-03-15| 2.72|2.749| 2.71|2.735| -1.57658|  2570|
|BANCOINDIA|2007-03-16|  2.7| 2.75| 2.69| 2.69| -1.55064|  3120|
|BANCOINDIA|2007-03-19| 2

In [22]:
df_stocks.filter(df_stocks['adj_close'] < 0).count()

8955

##### As per the count obtained above, there are 8955 records with negative valued adj_close

df_stocks_cleaned = df_stocks.withColumn('adj_close', when(df_stocks['adj_close'] < 0, df_stocks['close']).otherwise(df_stocks['adj_close']))

df_stocks_cleaned.filter(df_stocks_cleaned['adj_close'] < 0).count()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Source data would be mapped into 2 dimensions and 1 fact table:

1. Stocks 
    - Table Type: Fact
    - Would host transactional data (stock prices) and have columns such as Date, Symbol, Stock Price (Open, Close, High, Low, Adjusted Close) and Volume. 
    - Table would be partitioned on Symbol (as most analytic queries would be limited to a particular stock (and not across stocks) to identify trends across a time period)
    
2. Companies
    - Table Type: Dimension
    - Would host static data (company names to symbol mapping) and have columns such as Stock Symbol and Company Name. 
    
3. Date
    - Table Type: Dimension
    - Would host static data (date information) and have columns such as Date, Year, Quarter, Month, Week & Day. 

This model has been arrived at to facilitate analysis of stock prices for a particular stock/company across a time period (Year, Quarter, Month, Week etc).

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

- Populate Data Warehouse (AWS Redshift)
    - Generate data for date dimension as a CSV file and load into the the dimension table
    - Transform source data (static company details) and populate the compabies dimension table.
    - Transform source data (stock prices) and populate the stage stocks table
    - Transform the stage stocks data and populate the stocks fact table partitioned by symbol. 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### STEP #1: Generate date dimension data

In [26]:
static_data = "s3a://nse-stock-data/static/"

In [27]:
df_date = pd.DataFrame({"Date": pd.date_range(start='2000-01-01', end='2020-12-31')})
df_date["Year"] = df_date.Date.dt.year
df_date["Quarter"] = df_date.Date.dt.quarter
df_date["Month"] = df_date.Date.dt.month
df_date["Week"] = df_date.Date.dt.weekofyear
df_date["Day"] = df_date.Date.dt.weekday_name

In [28]:
df_date["Date"] = df_date.Date.dt.date

In [29]:
date = spark.createDataFrame(df_date)

In [30]:
date.write.csv(static_data + "date.csv", mode='overwrite', header=True)

##### STEP #2: Connect to AWS Redshift

In [31]:
%load_ext sql

In [32]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))
KEY=config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET= config.get('AWS','AWS_SECRET_ACCESS_KEY')

DWH_DB= config.get("CLUSTER","DB_NAME")
DWH_DB_USER= config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD= config.get("CLUSTER","DB_PASSWORD")
DWH_PORT = config.get("CLUSTER","DB_PORT")

DWH_ENDPOINT=config.get("CLUSTER","HOST")    
DWH_ROLE_ARN=config.get("IAM_ROLE","ARN")

In [33]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev


'Connected: dwhuser@dev'

##### STEP #3: Create the schema and tables in Redshift

In [34]:
%%sql

CREATE SCHEMA IF NOT EXISTS stocks;
SET search_path TO stocks;

DROP TABLE IF EXISTS stage_stocks;
DROP TABLE IF EXISTS fact_stocks;
DROP TABLE IF EXISTS dim_companies;
DROP TABLE IF EXISTS dim_time;

CREATE TABLE stage_stocks
(
  symbol          varchar(100),
  Date            varchar(20),
  "open"          varchar(20),
  "high"          varchar(20),
  "low"           varchar(20),
  "close"         varchar(20),
  adj_close       varchar(20),
  volume          varchar(20)
);

CREATE TABLE fact_stocks
(
  id              bigint IDENTITY(0,1) PRIMARY KEY,
  symbol          varchar(100) NOT NULL distkey,
  "date"          date NOT NULL sortkey,
  "open"          numeric(20,6),
  "high"          numeric(20,6),
  "low"           numeric(20,6),
  "close"         numeric(20,6),
  adjusted_close  numeric(20,6),
  volume          bigint
);

CREATE TABLE dim_companies
(
  id                 smallint NOT NULL,
  symbol             varchar(100),
  company_name       varchar(500)
);

CREATE TABLE dim_date
(
  "date"       date NOT NULL sortkey,
  year         smallint NOT NULL,
  quarter      smallint NOT NULL,
  month        smallint NOT NULL,
  week         smallint NOT NULL,
  day          varchar(20) NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
(psycopg2.ProgrammingError) Relation "dim_date" already exists
 [SQL: 'CREATE TABLE dim_date\n(\n  "date"       date NOT NULL sortkey,\n  year         smallint NOT NULL,\n  quarter      smallint NOT NULL,\n  month        smallint NOT NULL,\n  week         smallint NOT NULL,\n  day          varchar(20) NOT NULL\n);']


##### STEP #4: Populate the dimensions (date and companies) and stage table

In [35]:
%%time
qry = """
    copy dim_date from 's3://nse-stock-data/static/date.csv' 
    iam_role '{}' 
    ignoreheader as 1
    delimiter ','
    region 'us-east-1';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 4.33 ms, sys: 510 µs, total: 4.84 ms
Wall time: 695 ms


In [36]:
%%time
qry = """
    copy dim_companies from 's3://nse-stock-data/static/Companies_list.csv' 
    iam_role '{}' 
    ignoreheader as 1
    delimiter ','
    region 'us-east-1';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 4.21 ms, sys: 0 ns, total: 4.21 ms
Wall time: 557 ms


In [37]:
%%time
qry = """
    copy stage_stocks from 's3://nse-stock-data/stocks' 
    iam_role '{}' 
    ignoreheader as 1
    delimiter ','
    emptyasnull
    blanksasnull    
    region 'us-east-1';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 5.96 ms, sys: 0 ns, total: 5.96 ms
Wall time: 14.9 s


##### STEP #5: Populate the fact table

In [38]:
%%time
qry = """
INSERT INTO fact_stocks (symbol, "date", "open", "high", "low", "close", adjusted_close, volume)
SELECT symbol AS symbol,
       CAST("date" as date) AS "date",
       CAST("open" as numeric(20,6)) AS "open",
       CAST("high" as numeric(20,6)) AS "high",
       CAST("low" as numeric(20,6)) AS "low",
       CAST("close" as numeric(20,6)) AS "close",
       CAST(adj_close as numeric(20,6)) AS adjusted_close,
       CAST(volume as bigint) AS volume
  FROM stocks.stage_stocks
 WHERE "open" != 'null';
"""

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
3689727 rows affected.
CPU times: user 5.91 ms, sys: 3.3 ms, total: 9.21 ms
Wall time: 5.12 s


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### Perform quality checks here

##### CHECK #1: Compare Source Vs Target records counts

In [49]:
cnt_dim_companies = %sql select count(*) from stocks.dim_companies;  
cnt_dim_date = %sql select count(*) from stocks.dim_date;  
cnt_stage_stocks = %sql select count(*) from stocks.stage_stocks;

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
1 rows affected.
 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
1 rows affected.
 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
1 rows affected.


In [42]:
# Dimension Companies
if df_companies.count() == cnt_dim_companies[0][0]:
    print('Dimension Load - Companies: PASS')
else:
    print('Dimension Load - Companies: FAIL')

# Dimension Date
if date.count() == cnt_dim_date[0][0]:
    print('Dimension Load - Date: PASS')
else:
    print('Dimension Load - Date: FAIL')

# Stage Stocks
if df_stocks.count() == cnt_stage_stocks[0][0]:
    print('Stage Load - Stocks: PASS')
else:
    print('Stage Load - Stocks: FAIL')

Dimension Load - Companies: PASS
Dimension Load - Date: FAIL
Stage Load - Stocks: PASS


In [43]:
cnt_stage_stocks = %sql select count(*) from stocks.stage_stocks where "open" != 'null';
cnt_fact_stocks = %sql select count(*) from stocks.fact_stocks;

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
1 rows affected.
 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
1 rows affected.


In [44]:
# Fact Stocks
if cnt_fact_stocks[0][0] == cnt_stage_stocks[0][0]:
    print('Fact Load - Stocks: PASS')
else:
    print('Fact Load - Stocks: FAIL')

Fact Load - Stocks: PASS


##### CHECK #2: Validate no NULL values

In [46]:
cnt_null_fact_stocks = %sql select count(*) from stocks.fact_stocks where "open" is null or "high" is null or "low" is null or "close" is null or volume is null;

 * postgresql://dwhuser:***@dwhcluster.ckj67osxtkpg.us-east-1.redshift.amazonaws.com:5439/dev
1 rows affected.


In [47]:
# Fact Stocks
if cnt_null_fact_stocks[0][0] == '0':
    print('Fact Load - Stocks: PASS')
else:
    print('Fact Load - Stocks: FAIL')

Fact Load - Stocks: FAIL


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
1. Fact Table: Stocks
    - Column #1: Symbol - Stock/Ticker Symbol [Data Type: String]
    - Column #2: Date - Price Date [Data Type: Date]
    - Column #3: Open - Open Price [Data Type: Double]
    - Column #4: Close - Close Price [Data Type: Double]
    - Column #5: High - Intra-day high Price [Data Type: Double]
    - Column #6: Low - Intra-day low Price [Data Type: Double]
    - Column #7: Adjusted Close - Adjusted Close Price [Data Type: Double]
    - Column #8: Volume - Traded Volume [Data Type: Integer]
    
    
2. Dimension Table: Companies
    - Column #1: Index - Running Number/Sequence [Data Type: Integer]
    - Column #2: Symbol - Stock/Ticker Symbol [Data Type: String]
    - Column #3: Company - Company Name [Data Type: String]


3. Dimension Table: Date
    - Column #1: Date - Date [Data Type: Date]
    - Column #2: Year - Year extracted from Date [Data Type: Integer]
    - Column #3: Quarter - Quarter extracted from Date [Data Type: Integer]
    - Column #4: Month - Month extracted from Date [Data Type: Integer]
    - Column #5: Week - Week extracted from Date [Data Type: Integer]
    - Column #6: Day - Day extracted from Date [Data Type: Integer]

#### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project.
   * AWS S3 was choosen for staging source/intermediate files as it an elastic cloud storage and integrates well with Apache Spark and AWS Redshift. 
   * Apache Spark was choosen as the tool for data exploration as we are dealing with a huge number of files. Spark has the ability to process multiple files contained within a directory and there is no need for building looping logic (as would have been required with Pandas). 
   * AWS Redshift was choosen as the data warehouse as again it is cloud based, scalable and can host dimensional data models for downstrea analytics. 



* Propose how often the data should be updated and why.
   * As we are dealing with stock data, the data could be sourced daily, weekly or even monthly. Daily would be ideal but there would be additional maintaince/support over head due to increased number of jobs. The downstream usage of the data would also be a major factor here. If the analytics is only done on a monthly basis, the load can be monthly. However, if there is a need for more frequent / more current data, then the load frequency could be increased. 



* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
   * Same approach, as the volume for a day currently is < 1500 records. So, even if there is a 100x increase, the number is still managable. Even if the load process is monthly, the existing ETL would be able to scale up. 
   
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * Directory structure on S3 should be modified to support daily files. Additional Apache Airflow could be considered for workflow management inorder to schedule the daily loads.  
    
 * The database needed to be accessed by 100+ people.
    * Same approach. AWS Redshift can scale up to accomodate this large user base. However, based on the usage/queries executed, there may be a need to re-distribute the data or create replicas to cater to varies usage. Additionally, if a number of users aggregate and consume data of similar kind, there may be a case to introduce and populate aggregate tables to optimize the usage of the system. 