# COVID-19 Daily Tracking Analysis
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse that tracks global COVID-19 daily cases (confirmed, deaths, recovered) alongside global temperature and population data at the country and state level. It is meant to help researchers analyze COVID-19 trends in a given location and the impact of temperature and population on COVID-19.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up
* Step 6: Project Alternative solution

### Step 1: Scope the Project and Gather Data

#### Scope 

The project uses data from the COVID-19 Data Repository by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University, global temperature and population data from Kaggle, and US population estimates from the U.S. Census Bureau. The raw data is extracted, cleansed, and saved in an AWS S3 bucket. The ELT pipeline then extracts the data from S3, stages it in Redshift, and transforms it into a set of dimensional tables and one fact table that researchers can use to develop insights into COVID-19 trends in various locations.

The end solution includes raw data saved in S3 and analytical data saved in Redshift for analysis purposes.

Tools: AWS, Redshift, S3, Airflow (for future data pipeline task management)

#### Describe and Gather Data 

Data Source:

Global temperatures by state:
    https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data?select=GlobalLandTemperaturesByState.csv \
    includes the average temperature by state & country from 1750 to 2013
    
Global covid 19 daily report:
    https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports \
    includes the daily covid-19 cases (confirmed, deaths, recovered) by state & country from 2020-01-22 up to the date of download 
    
Indian population state & district data:
    https://www.kaggle.com/sirpunch/indian-census-data-with-geospatial-indexing?select=district+wise+population+for+year+2001+and+2011.csv \
    includes state, and district wise population for year 2001 and 2011

China population by province data:
    https://www.kaggle.com/quanncore/china-provinces-population \
    includes province level population data; Data has been collected from: http://population.city/ manually.

US population by state data:
    https://www.census.gov/programs-surveys/popest/technical-documentation/research/evaluation-estimates/2020-evaluation-estimates/2010s-state-detail.html \
    includes estimates of the Total Resident Population and Resident Population Age 18 Years and Older in US by state
 
Data Caveats and assumptions:
* The timestamp aspect of temperature & population data is not considered in this project
* Population data covers only three major countries (US, India, China) for demonstration purposes
* Since there is no API for the temperature & population data, all four csv files are manually downloaded and uploaded to the project workspace.

In [2]:
%load_ext sql

In [1]:
# Do all imports and installs here
import configparser
import os
import re

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup

In [None]:
#Set up all AWS configurations to connect to the redshift cluster in the next step
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))
KEY=config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET= config.get('AWS','AWS_SECRET_ACCESS_KEY')

DWH_DB= config.get("DWH","DB_NAME")
DWH_DB_USER= config.get("DWH","DB_USER")
DWH_DB_PASSWORD= config.get("DWH","DB_PASSWORD")
DWH_PORT = config.get("DWH","DB_PORT")

DWH_ENDPOINT=config.get("DWH","HOST")
DWH_ROLE_ARN=config.get("IAM_ROLE","ARN")
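
The cells above expect a `dwh.cfg` file that is not shown in this notebook. As a rough sketch, the layout below is inferred only from the `config.get(...)` calls used in this notebook; every value is a placeholder, and `SAMPLE_CFG` and `sample_config` are hypothetical names. The S3 paths are assumed to carry their own quotes because the COPY cells insert them unquoted.

```python
import configparser

# Hypothetical dwh.cfg contents; section and key names mirror the config.get()
# calls in this notebook, every value is a placeholder.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[DWH]
DB_NAME = dev
DB_USER = awsuser
DB_PASSWORD = <your-password>
DB_PORT = 5439
HOST = <your-cluster-endpoint>

[IAM_ROLE]
ARN = <your-role-arn>

[S3]
COVID_DATA = 's3://<your-bucket>/DailyCovid'
TEMP_DATA = 's3://<your-bucket>/AvgTemperatureByState'
POP_DATA = 's3://<your-bucket>/PopulationByState'

[REGION]
REGION = us-west-2
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Fail early if a section the notebook relies on is missing.
required_sections = ["AWS", "DWH", "IAM_ROLE", "S3", "REGION"]
missing_sections = [s for s in required_sections if not sample_config.has_section(s)]
```

Checking `missing_sections` before connecting gives a clearer error than a `NoSectionError` raised halfway through the notebook.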

In [None]:
#Connect to Redshift Cluster
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
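print("postgresql://{}:***@{}:{}/{}".format(DWH_DB_USER, DWH_ENDPOINT, DWH_PORT, DWH_DB))  # mask the password in notebook output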
%sql $conn_string

In [2]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"


In [4]:
# Read in the covid-19 daily data here--scrape the data from the source link

# Store the url as a string scalar: url => str
url = "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports"

# Issue request: r => requests.models.Response
r = requests.get(url)

# Extract text: html_doc => str
html_doc = r.text

# Parse the HTML: soup => bs4.BeautifulSoup
soup = BeautifulSoup(html_doc, "lxml")

# Find all 'a' tags (which define hyperlinks): a_tags => bs4.element.ResultSet
a_tags = soup.find_all('a')

# Store a list of raw-file urls for every link containing .csv: urls => list
# (links without an href are skipped so the membership test cannot fail on None)
urls = ['https://raw.githubusercontent.com'+re.sub('/blob', '', link.get('href')) 
        for link in a_tags  if link.get('href') and '.csv' in link.get('href')]

# Read every csv file into its own dataframe: df_list => list
df_list = [pd.read_csv(url, sep = ',') for url in urls]
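
Scraping the GitHub HTML page depends on the page markup staying stable. As an alternative sketch, the daily report URLs can be built directly from dates, assuming the repository keeps naming its files `MM-DD-YYYY.csv` under the same folder. `RAW_BASE` and `daily_report_urls` are hypothetical names introduced here, not part of the original notebook.

```python
from datetime import date, timedelta

# Assumed raw-file location of the daily reports folder.
RAW_BASE = ("https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
            "csse_covid_19_data/csse_covid_19_daily_reports")

def daily_report_urls(start, end):
    """Return one raw csv url per day from start to end, both inclusive."""
    n_days = (end - start).days + 1
    return ["{}/{}.csv".format(RAW_BASE, (start + timedelta(days=i)).strftime("%m-%d-%Y"))
            for i in range(n_days)]

example_urls = daily_report_urls(date(2020, 1, 22), date(2020, 1, 24))
```

The resulting list could be passed to `pd.read_csv` in the same way as the scraped `urls` list.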

In [5]:
# load all COVID-19 csv files into one dataframe
df= pd.concat(df_list, axis=0, ignore_index=True, sort=False)

In [6]:
#Read in Global Land Temperature by State csv as dataframe
df_temp=pd.read_csv("GlobalLandTemperaturesByState.csv")

In [7]:
#Read in all population data as dataframe
df_china_base=pd.read_csv("china_provinces_population.csv")
df_us_base=pd.read_csv("sc-est2020-18+pop-res.csv")
df_india_base=pd.read_csv('district wise population for year 2001 and 2011.csv')

In [2]:


from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
spark

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [9]:
# Clean up Covid-19 Daily Report Data--handle data inconsistency & duplicates & missing values etc.

#one type of column names
col_A=["FIPS","Admin2","Province_State","Country_Region","Last_Update","Lat","Long_","Confirmed","Deaths","Recovered"]

#the other type of column names
col_B=["FIPS","Admin2","Province/State","Country/Region","Last Update","Latitude", "Longitude","Confirmed","Deaths","Recovered"]


#Process the rows that use the col_A column names: drop the duplicates, rename the columns, convert the timestamp field to a proper datetime
df_A=df[df['Country/Region'].isna()][col_A].copy()
df_A.drop_duplicates(inplace=True)
df_A.rename(columns = {'Lat':'Latitude', 'Long_':'Longitude', 'Province_State':'State','Country_Region':'Country'},inplace=True)
df_A["Last_Update"]=pd.to_datetime(df_A.Last_Update)
df_A["Date"]=df_A["Last_Update"].dt.date

#Process the rows that use the col_B column names: drop the duplicates, rename the columns, convert the timestamp field to a proper datetime
df_B=df[~df['Country/Region'].isna()][col_B].copy()
df_B.drop_duplicates(inplace=True)
df_B.rename(columns={'Province/State':'State', 'Country/Region':'Country','Last Update':'Last_Update'}, inplace=True)
df_B["Last_Update"]=pd.to_datetime(df_B.Last_Update)
df_B["Date"]=df_B["Last_Update"].dt.date

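#Combine both layouts; pd.concat replaces DataFrame.append, which newer pandas versions no longer provide
df_covid=pd.concat([df_A, df_B])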
#Clean up the column formats
df_covid['FIPS']=df_covid['FIPS'].astype('float64')
df_covid['Latitude']=df_covid['Latitude'].astype('float64')
df_covid['Longitude']=df_covid['Longitude'].astype('float64')
df_covid['Confirmed']=df_covid['Confirmed'].astype('float64')
df_covid['Deaths']=df_covid['Deaths'].astype('float64')
df_covid['Recovered']=df_covid['Recovered'].astype('float64')
df_covid['Year']=df_covid["Last_Update"].dt.year
df_covid['Month']=df_covid["Last_Update"].dt.month

#Fill missing text values so that creating the Spark dataframe later does not hit a field type conflict
df_covid['Admin2']=df_covid['Admin2'].fillna("None")
df_covid['State']=df_covid['State'].fillna("None")

df_covid.drop_duplicates(inplace=True)
df_covid.sort_values(by=["Year","Month","Country","State"], inplace=True)
df_covid.reset_index(drop=True, inplace=True)

In [10]:
df_covid.head(5)

| | FIPS | Admin2 | State | Country | Last_Update | Latitude | Longitude | Confirmed | Deaths | Recovered | Date | Year | Month |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | | New South Wales | Australia | 2020-01-27 23:59:00 | | | 4.0 | | | 2020-01-27 | 2020 | 1 |
| 1 | | | New South Wales | Australia | 2020-01-28 23:00:00 | | | 4.0 | | | 2020-01-28 | 2020 | 1 |
| 2 | | | New South Wales | Australia | 2020-01-29 19:30:00 | | | 4.0 | | | 2020-01-29 | 2020 | 1 |
| 3 | | | New South Wales | Australia | 2020-01-30 16:00:00 | | | 4.0 | | 2.0 | 2020-01-30 | 2020 | 1 |
| 4 | | | New South Wales | Australia | 2020-01-31 23:59:00 | | | 4.0 | | 2.0 | 2020-01-31 | 2020 | 1 |


In [11]:
#Clean up the temperature data

# map the state to align with Covid-19 data
# Cleaned up majority of the countries except Russia
def map_state(x):
    #US state update
    if x=="Georgia (State)":
        return "Georgia"
    #China state update
    elif  x=="Ningxia Hui":
        return "Ningxia"
    elif x=="Xinjiang Uygur":
        return "Xinjiang"
    elif x=="Xizang":
        return "Tibet" 
    elif x=="Nei Mongol":
        return "Inner Mongolia"
    #India state update
    elif x=="Andaman And Nicobar":
        return "Andaman and Nicobar Islands"
    elif x=="Dadra And Nagar Haveli":
        return "Dadra and Nagar Haveli and Daman and Diu"
    elif x=="Daman And Diu":
        return "Dadra and Nagar Haveli and Daman and Diu"
    elif x=="Orissa":
        return "Odisha"
    #Russia state update
    elif x=="Adygey":
        return "Adygea Republic"
    elif x=="Amur":
        return "Amur Oblast"
    elif x=="Altay":
        return "Altai Republic"
    elif x=="Arkhangel'Sk":
        return "Arkhangelsk Oblast"
    elif x=="Astrakhan'":
        return "Astrakhan Oblast"
    elif x=="Bashkortostan":
        return "Bashkortostan Republic"
    elif x=="Belgorod":
        return "Belgorod Oblast"
    elif x=="Bryansk":
        return "Bryansk Oblast"
    elif x=="Buryat":
        return "Buryatia Republic"
    elif x=="Chechnya":
        return "Chechen Republic"
    elif x=="Chelyabinsk":
        return "Chelyabinsk Oblast"
    else:
        return x
    
df_temp['Country']=df_temp['Country'].apply(lambda x: 'US' if x == 'United States' else x)

df_temp['State']=df_temp['State'].apply(lambda x: map_state(x))
df_temp.head(5)

| | dt | AverageTemperature | AverageTemperatureUncertainty | State | Country |
|---|---|---|---|---|---|
| 0 | 1855-05-01 | 25.544 | 1.171 | Acre | Brazil |
| 1 | 1855-06-01 | 24.228 | 1.103 | Acre | Brazil |
| 2 | 1855-07-01 | 24.371 | 1.044 | Acre | Brazil |
| 3 | 1855-08-01 | 25.427 | 1.073 | Acre | Brazil |
| 4 | 1855-09-01 | 25.675 | 1.014 | Acre | Brazil |
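
A long `if`/`elif` chain like `map_state` can also be expressed as a lookup table. The sketch below is one possible refactor that covers only a few of the renames above; `STATE_ALIASES` and `map_state_from_table` are hypothetical names, not part of the original notebook.

```python
# A few of the renames from map_state, expressed as a dictionary.
STATE_ALIASES = {
    "Georgia (State)": "Georgia",
    "Ningxia Hui": "Ningxia",
    "Xinjiang Uygur": "Xinjiang",
    "Xizang": "Tibet",
    "Nei Mongol": "Inner Mongolia",
    "Orissa": "Odisha",
}

def map_state_from_table(name):
    """Return the Covid-19 spelling of a state name, or the name unchanged."""
    return STATE_ALIASES.get(name, name)

mapped = [map_state_from_table(s) for s in ["Xizang", "Acre"]]
```

A dictionary keeps the renames in one place, so adding the remaining states means adding entries rather than more branches.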


In [12]:
#Get the average temperature for each country & state as a reference table
df_temp_grp=df_temp.groupby(['Country', 'State'])['AverageTemperature'].mean().to_frame().reset_index()

df_temp_grp.head(5)

| | Country | State | AverageTemperature |
|---|---|---|---|
| 0 | Australia | Australian Capital Territory | 11.581977 |
| 1 | Australia | New South Wales | 17.231321 |
| 2 | Australia | Northern Territory | 24.788343 |
| 3 | Australia | Queensland | 23.017097 |
| 4 | Australia | South Australia | 19.143123 |


In [13]:
#Clean up China population data
df_china=df_china_base.copy()
df_china['Country']='China'
df_china.rename(columns={'PROVINCE NAME':'State', 'POPULATION':'Population'}, inplace=True)

#Clean up US population data
df_us=df_us_base[df_us_base['NAME'] !='United States'][['NAME', 'POPESTIMATE2020']]
df_us['Country']='US'
df_us.rename(columns={'NAME':'State', 'POPESTIMATE2020':'Population'}, inplace=True)

#Clean up Indian population data
def map_india_state(x):   
    if x=="Andaman & Nicobar Islands":
        return "Andaman and Nicobar Islands"
    elif x=="Dadra & Nagar Haveli":
        return "Dadra and Nagar Haveli and Daman and Diu"
    elif x=="Daman & Diu":
        return "Dadra and Nagar Haveli and Daman and Diu"
    elif x=="Jammu & Kashmir":
        return "Jammu and Kashmir"
    elif x=="Odisha (Orissa)":
        return "Odisha"
    elif x=="Puducherry (Pondicherry)":
        return "Puducherry"
    else:
        return x
    
df_india_base['State']=df_india_base['State'].apply(lambda x: map_india_state(x))
df_india=df_india_base[['State','Population in 2011']].groupby(['State'])['Population in 2011'].sum().to_frame().reset_index()
df_india['Country']='India'
df_india.rename(columns={'Population in 2011':'Population'}, inplace=True)


#Combine population data from all three countries 
df_population= pd.concat([df_us,df_india, df_china], axis=0, ignore_index=True, sort=False)
df_population.head(5)

| | State | Population | Country |
|---|---|---|---|
| 0 | Alabama | 4921532 | US |
| 1 | Alaska | 731158 | US |
| 2 | Arizona | 7421401 | US |
| 3 | Arkansas | 3030522 | US |
| 4 | California | 39368078 | US |


#### Save the raw data to S3
Save the daily Covid-19 data in an S3 bucket, partitioned by Year & Month. Save the temperature and population data directly to the S3 bucket, since the data is already grouped and relatively small.

In [14]:
# Save the temperature data
sdf_temp = spark.createDataFrame(df_temp_grp)

#Save to local workspace folder
#sdf_temp.write.mode("overwrite").parquet("AvgTemperatureByState")

#Save to S3
sdf_temp.write.mode("overwrite").parquet("s3a://udacity-capston/AvgTemperatureByState")

In [15]:
# Save the population data
sdf_pop = spark.createDataFrame(df_population)
# sdf_pop.printSchema()

#Save to local workspace folder
#sdf_pop.write.mode("overwrite").parquet("PopulationByState")

#Save to S3 (write sdf_pop here, not sdf_temp, so the population data lands in this path)
sdf_pop.write.mode("overwrite").parquet("s3a://udacity-capston/PopulationByState")

In [16]:
#Save the Daily Covid data
sdf_covid=spark.createDataFrame(df_covid)

#Save to local workspace folder
#sdf_covid.write.partitionBy("year","month").mode("overwrite").parquet("DailyCovid")

#Save to S3 with year & month as partition
sdf_covid.write.partitionBy("year","month").mode("overwrite").parquet("s3a://udacity-capston/DailyCovid")

In [4]:
sdf_covid.printSchema()

root
 |-- FIPS: double (nullable = true)
 |-- Admin2: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Last_Update: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Confirmed: double (nullable = true)
 |-- Deaths: double (nullable = true)
 |-- Recovered: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
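
`partitionBy("year","month")` writes one sub-folder per partition value, using Hive-style `column=value` folder names. The helper below only illustrates the resulting key layout, assuming lowercase column names as in the printed schema; `partition_prefix` is a hypothetical name, and the part-file names inside each folder are generated by Spark.

```python
def partition_prefix(base_path, year, month):
    """Build the folder prefix Spark uses for one (year, month) partition."""
    return "{}/year={}/month={}".format(base_path.rstrip("/"), year, month)

prefix = partition_prefix("s3a://udacity-capston/DailyCovid", 2021, 7)
```

Knowing the prefix makes it possible to read or reload a single month without touching the rest of the data set.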



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Below is the Entity Relationship Diagram:
![ERD](ERD.JPG)
fact_covid is the fact table that keeps track of measurements such as the number of confirmed cases, deaths, and recoveries.
dim_time and dim_location serve as dimension tables that enrich the analysis around time & location. The temperature & population data are aggregated to the State and Country level and stored as additional attributes in the dim_location table.

A star schema is used for this project because of its simple query design and fast aggregation for analysis. The purpose of the project is quite simple: to analyze the covid trend together with the impact of timing, temperature, and population. Keeping all covid measurement data in one fact table and all other critical attributes in dimension tables therefore makes the most sense for this project.
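
To illustrate the kind of query this star schema is meant to support, here is a minimal sketch that joins the fact table to `dim_location`. SQLite stands in for Redshift, the tables are cut down to a few columns, and all rows are toy values invented for illustration.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
demo_conn.executescript("""
    CREATE TABLE dim_location (location_id INTEGER PRIMARY KEY, State TEXT,
                               Country TEXT, Population REAL);
    CREATE TABLE fact_covid (location_id INTEGER, Date TEXT, Confirmed REAL);
    -- Toy rows, invented for illustration only
    INSERT INTO dim_location VALUES (1, 'StateA', 'CountryX', 1000),
                                    (2, 'StateB', 'CountryX', 4000);
    INSERT INTO fact_covid VALUES (1, '2021-07-01', 10), (1, '2021-07-02', 30),
                                  (2, '2021-07-01', 20), (2, '2021-07-02', 40);
""")

# Peak confirmed count per state, scaled to cases per 1000 residents.
rows = demo_conn.execute("""
    SELECT l.State, MAX(f.Confirmed) * 1000.0 / l.Population AS confirmed_per_1000
    FROM fact_covid f
    JOIN dim_location l ON f.location_id = l.location_id
    GROUP BY l.State, l.Population
    ORDER BY l.State
""").fetchall()
demo_conn.close()
```

The measurements stay in the fact table while population comes from the dimension, so the per-capita figure needs only a single join.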

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
1. Create all the analytical tables and staging tables
2. Copy the data from S3 to staging tables on Redshift
3. Transform the staging tables and load the data into each analytical table
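
Step 2 above can be sketched as one helper that pairs each S3 prefix with its own staging table, which guards against loading every file into the same table. `build_copy_sql`, `STAGING_SOURCES`, and the bucket and role values are placeholders introduced here, and the sketch assumes the bucket is in the same region as the cluster, so no `REGION` clause is included.

```python
def build_copy_sql(table, s3_path, iam_role_arn):
    """Return a Redshift COPY statement that loads Parquet files into table."""
    return ("COPY {table} FROM '{path}' "
            "IAM_ROLE '{role}' "
            "FORMAT AS PARQUET;").format(table=table, path=s3_path, role=iam_role_arn)

# Placeholder locations; one S3 prefix per staging table.
STAGING_SOURCES = {
    "staging_covid": "s3://<your-bucket>/DailyCovid",
    "staging_temp": "s3://<your-bucket>/AvgTemperatureByState",
    "staging_pop": "s3://<your-bucket>/PopulationByState",
}

copy_statements = [build_copy_sql(table, path, "<your-role-arn>")
                   for table, path in STAGING_SOURCES.items()]
```

Each generated statement could then be run with the `%sql` magic or any Redshift client.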


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

* Create Tables

In [None]:
%%sql 
DROP TABLE IF EXISTS "staging_covid";
CREATE TABLE IF NOT EXISTS "staging_covid" (
    "FIPS" varchar ,
    "Admin2" varchar,
    "State" varchar,
    "Country" varchar,
    "Last_update" TIMESTAMP,
    "Latitude" numeric,
    "Longitude" numeric,
    "Confirmed" numeric,
    "Deaths" numeric,
    "Recovered" numeric,
    "Date" DATE, 
    "Year" int4,
    "Month" integer
);

DROP TABLE IF EXISTS "staging_temp";
CREATE TABLE IF NOT EXISTS  "staging_temp" (
    "Country" varchar,
    "State" varchar,
    "AverageTemperature" numeric

);

DROP TABLE IF EXISTS "staging_pop";
CREATE TABLE IF NOT EXISTS "staging_pop" (
    "Country" varchar,
    "State" varchar,
    "Population" numeric
);

DROP TABLE IF EXISTS "fact_covid";
CREATE TABLE IF NOT EXISTS "fact_covid" (
    "covid_id" INTEGER IDENTITY (1, 1) primary key distkey,
    "Last_update" timestamp NOT NULL sortkey ,
    "location_id" integer,  
    "Confirmed" numeric,
    "Deaths" numeric,
    "Recovered" numeric,
    "Date" DATE NOT NULL, 
    "Year" integer NOT NULL,
    "Month" integer NOT NULL
); 

DROP TABLE IF EXISTS "dim_location";
CREATE TABLE IF NOT EXISTS "dim_location" (
    "location_id" INTEGER IDENTITY (1, 1) primary key sortkey,
    "State" varchar,
    "Country" varchar NOT NULL,
    "AverageTemperature" numeric,
    "Population" numeric

)
diststyle all;

DROP TABLE IF EXISTS "dim_time";
CREATE TABLE IF NOT EXISTS "dim_time" (
    "Last_update" timestamp NOT NULL primary key sortkey,
    "day" int NOT NULL,
    "week" int NOT NULL,
    "month" int NOT NULL,
    "year" int NOT NULL, 
    "weekday" int

)
diststyle all;

* Load values to tables

In [None]:
#copy covid data from S3 to staging table
qry_covid_stage = """
    COPY staging_covid FROM {}
        CREDENTIALS 'aws_iam_role={}'
        REGION '{}'
        FORMAT AS PARQUET
""".format(config.get('S3','COVID_DATA'), config.get('IAM_ROLE','ARN') ,config.get('REGION','REGION'))
%sql $qry_covid_stage

In [None]:
#copy temperature data from S3 to staging table
qry_temp_stage = """
    COPY staging_temp FROM {}
        CREDENTIALS 'aws_iam_role={}'
        REGION '{}'
        FORMAT AS PARQUET
""".format(config.get('S3','TEMP_DATA'), config.get('IAM_ROLE','ARN') ,config.get('REGION','REGION'))
%sql $qry_temp_stage

In [None]:
#copy population data from S3 to staging table
qry_pop_stage = """
    COPY staging_pop FROM {}
        CREDENTIALS 'aws_iam_role={}'
        REGION '{}'
        FORMAT AS PARQUET
""".format(config.get('S3','POP_DATA'), config.get('IAM_ROLE','ARN') ,config.get('REGION','REGION'))
%sql $qry_pop_stage

In [None]:
%%sql
insert into dim_location (
    State,
    Country,
    AverageTemperature,
    Population

) select distinct 
    t1.State,
    t1.Country,
    t2.AverageTemperature,
    t3.Population
    from staging_covid t1 left join staging_temp t2 on t1.State=t2.State and t1.Country=t2.Country
        left join staging_pop t3 on t1.State=t3.State and t1.Country=t3.Country
    where t1.Country is not null and t1.Country <>''
    order by t1.Country, t1.State
;

In [None]:
%%sql
insert into fact_covid (   
    Last_update,
    location_id,    
    Confirmed,
    Deaths,
    Recovered,
    Date, 
    Year,
    Month ) 
    
    select distinct
        c.Last_update,
        l.location_id,
        sum(c.Confirmed) as Confirmed,
        sum(c.Deaths) as Deaths,
        sum(c.Recovered) as Recovered,
        c.Date, 
        c.Year,
        c.Month     

    from staging_covid c 
    
    left join dim_location l on c.State=l.State and c.Country=l.Country
    group by c.Last_update, l.location_id, c.Date, c.Year, c.Month 
    order by c.Last_update, l.location_id
;

In [None]:
%%sql
insert into dim_time ( 
    Last_update,
    day,
    week,
    month,
    year, 
    weekday) 

    select distinct 
        c.Last_update,
        EXTRACT (DAY FROM c.Last_update) as day,
        EXTRACT (WEEK FROM c.Last_update) as week, 
        EXTRACT (MONTH FROM c.Last_update) as month,
        EXTRACT (YEAR FROM c.Last_update) as year,
        EXTRACT (WEEKDAY FROM c.Last_update) as weekday
    from staging_covid c
    order by c.Last_update
    ;
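
The same time attributes can be derived in plain Python, which is handy for spot-checking individual rows. This is only a sketch: `time_fields` is a hypothetical helper, and the weekday convention (1 = Sunday) is an assumption chosen to match the `dim_time` sample output in this notebook; Redshift's own weekday numbering may differ.

```python
from datetime import datetime

def time_fields(ts):
    """Split a timestamp into the dim_time attributes."""
    return {
        "day": ts.day,
        "week": ts.isocalendar()[1],          # ISO week number
        "month": ts.month,
        "year": ts.year,
        "weekday": ts.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday
    }

fields = time_fields(datetime(2021, 2, 21, 5, 24, 33))
```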

In [43]:
#Show the final results of the data warehouse table -- dim_location saved in the local workspace 
dim_location=spark.read.parquet("spark-warehouse/dim_location")
dim_location.show(20)

+-----------+--------------------+-------------------+------------------+----------+
|location_id|               State|            Country|AverageTemperature|Population|
+-----------+--------------------+-------------------+------------------+----------+
|          1|                None|         Azerbaijan|              null|      null|
|          2|                None|        Afghanistan|              null|      null|
|          3|                None|            Albania|              null|      null|
|          4|                None|            Algeria|              null|      null|
|          5|                None|            Andorra|              null|      null|
|          6|                None|             Angola|              null|      null|
|          7|                None|Antigua and Barbuda|              null|      null|
|          8|                None|          Argentina|              null|      null|
|          9|                None|            Armenia|           

In [44]:
#Show the final results of the data warehouse table -- dim_time saved in the local workspace 
dim_time=spark.read.parquet("spark-warehouse/dim_time")
dim_time.show(20)

+-------------------+---+----+-----+----+-------+
|        Last_update|day|week|month|year|weekday|
+-------------------+---+----+-----+----+-------+
|2021-02-21 05:24:33| 21|   7|    2|2021|      1|
|2021-02-22 05:24:21| 22|   8|    2|2021|      2|
|2021-02-23 05:23:41| 23|   8|    2|2021|      3|
|2021-02-24 05:29:16| 24|   8|    2|2021|      4|
|2021-02-25 05:24:57| 25|   8|    2|2021|      5|
|2021-02-26 05:22:40| 26|   8|    2|2021|      6|
|2021-02-27 05:22:28| 27|   8|    2|2021|      7|
|2021-02-28 05:22:20| 28|   8|    2|2021|      1|
|2021-03-01 05:23:01|  1|   9|    3|2021|      2|
|2021-03-02 05:23:30|  2|   9|    3|2021|      3|
|2021-03-03 05:23:28|  3|   9|    3|2021|      4|
|2021-03-04 05:24:24|  4|   9|    3|2021|      5|
|2021-03-05 05:26:29|  5|   9|    3|2021|      6|
|2021-03-06 04:23:41|  6|   9|    3|2021|      7|
|2021-03-07 05:21:54|  7|   9|    3|2021|      1|
|2020-10-31 04:24:44| 31|  44|   10|2020|      7|
|2020-11-01 04:36:19|  1|  44|   11|2020|      1|


In [45]:
#Show the final results of the data warehouse table -- fact_covid saved in the local workspace 
fact_covid=spark.read.parquet("spark-warehouse/fact_covid")
fact_covid.show(20)

+--------+-------------------+-----------+---------+-------+---------+----------+----+-----+
|covid_id|        Last_update|location_id|Confirmed| Deaths|Recovered|      Date|year|month|
+--------+-------------------+-----------+---------+-------+---------+----------+----+-----+
|  327322|2021-07-01 04:21:19|         64| 552937.0|15469.0| 513108.0|2021-07-01|2021|    7|
|  328084|2021-07-02 04:21:47|         64| 554681.0|15491.0| 517245.0|2021-07-02|2021|    7|
|  328846|2021-07-03 04:21:37|         64| 555831.0|15541.0| 519084.0|2021-07-03|2021|    7|
|  329609|2021-07-04 04:21:38|         64| 556637.0|15576.0| 519084.0|2021-07-04|2021|    7|
|  330372|2021-07-05 04:21:28|         64| 556667.0|15579.0| 519084.0|2021-07-05|2021|    7|
|  331135|2021-07-06 04:21:32|         64| 556694.0|15587.0| 520959.0|2021-07-06|2021|    7|
|  331899|2021-07-07 04:21:20|         64| 557708.0|15624.0| 520959.0|2021-07-07|2021|    7|
|  327323|2021-07-01 04:21:19|         65| 396442.0| 8606.0| 249641.0|

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

* check if the result table is empty or not; expected result: count >0

In [3]:
#read the final results saved in local workspace
fact_covid=spark.read.parquet("spark-warehouse/fact_covid")
fact_covid.createOrReplaceTempView("fact_covid")
spark.sql("select count(*) as count from fact_covid").show()

+------+
| count|
+------+
|332606|
+------+



In [4]:
#read the final results saved in local workspace
dim_location=spark.read.parquet("spark-warehouse/dim_location")
dim_location.createOrReplaceTempView("dim_location")
spark.sql("select count(*) as count from dim_location").show()

+-----+
|count|
+-----+
| 1028|
+-----+



In [5]:
#read the final results saved in local workspace
dim_time=spark.read.parquet("spark-warehouse/dim_time")
dim_time.createOrReplaceTempView("dim_time")
spark.sql("select count(*) as count from dim_time").show()

+-----+
|count|
+-----+
| 3006|
+-----+



* check unique key in dim_location table; 
* the count of primary key (location_id); the distinct count of primary key (location_id); the distinct count of (country & state) combination
* expected result: all three count fields should return the same value as 1028

In [6]:
# check the location count--should be 1028 unique records
spark.sql("""select count(location_id) as location_id_count, 
            count(distinct location_id) as distinct_location_id_count,
          count(distinct (Country,State)) as Country_State_count from dim_location""").show()


+-----------------+--------------------------+-------------------+
|location_id_count|distinct_location_id_count|Country_State_count|
+-----------------+--------------------------+-------------------+
|             1028|                      1028|               1028|
+-----------------+--------------------------+-------------------+



* Source/Count checks to ensure completeness
* Check the dim_time table contains values in proper time range;
* Expected result: min timestamp: 2020-01-22 & max timestamp: 2021-07-07

In [7]:

# read the final results saved in local workspace
spark.sql("select min(Last_update) as min_ts, max(Last_update) as max_ts from dim_time").show()


+-------------------+-------------------+
|             min_ts|             max_ts|
+-------------------+-------------------+
|2020-01-22 17:00:00|2021-07-07 04:21:20|
+-------------------+-------------------+
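
The checks above can be collected into small helpers that raise an error instead of relying on someone reading the printed counts. This is a sketch under stated assumptions: `check_not_empty` and `check_unique_key` are hypothetical names, SQLite stands in for the warehouse, and the table rows are toy values.

```python
import sqlite3

def check_not_empty(conn, table):
    """Raise if the table has no rows; return the row count otherwise."""
    # Table names are interpolated directly, so pass trusted names only.
    count = conn.execute("SELECT COUNT(*) FROM {}".format(table)).fetchone()[0]
    if count == 0:
        raise ValueError("{} is empty".format(table))
    return count

def check_unique_key(conn, table, key):
    """Raise if the key column contains NULLs or duplicates."""
    total, distinct = conn.execute(
        "SELECT COUNT(*), COUNT(DISTINCT {k}) FROM {t}".format(k=key, t=table)
    ).fetchone()
    if total != distinct:
        raise ValueError("{}.{} is not unique".format(table, key))
    return distinct

# Toy table, invented for illustration; SQLite stands in for the warehouse.
qc_conn = sqlite3.connect(":memory:")
qc_conn.execute("CREATE TABLE dim_location (location_id INTEGER, Country TEXT)")
qc_conn.executemany("INSERT INTO dim_location VALUES (?, ?)",
                    [(1, "CountryX"), (2, "CountryY")])
row_count = check_not_empty(qc_conn, "dim_location")
key_count = check_unique_key(qc_conn, "dim_location", "location_id")
qc_conn.close()
```

Raising on failure lets a scheduler mark the run as failed, which a printed count cannot do.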



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Please see Data Dictionary.xlsx file

### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project. \
This project is built entirely on AWS, using S3 for raw data storage and Redshift as the data warehouse platform. In terms of storage pricing, AWS is relatively cheap compared with physical servers. Loading data from S3 into Redshift with the COPY command makes the ETL process simpler and more efficient. Redshift is also easy to scale up and out by resizing or adding compute nodes, which helps as the database grows. In addition, the AWS platform offers managed tools such as SageMaker for building machine learning models, QuickSight for data visualization, and a managed Apache Airflow service for scheduling data pipelines.


* Propose how often the data should be updated and why. \
Currently, the Covid-19 daily report source is updated daily: a new csv file with around 4000 rows is created from the prior day's data. To keep the analytics as current as the source, the warehouse should be updated daily. Since this is an incremental load of one csv file per day, the scheduled Airflow job will be relatively quick and low maintenance.

* The data was increased by 100x. \
If the data was increased by 100x, S3 storage can be combined with a lifecycle policy: older versions of the raw data can be transitioned to S3 Standard-IA or Glacier for archiving. For the data warehouse in Redshift, either scaling up the compute nodes or adding more nodes can absorb the large increase in data size.

* The data populates a dashboard that must be updated on a daily basis by 7am every day. \
This can be done by scheduling the data pipeline with Apache Airflow. Airflow lets users write DAGs in Python and run the data pipeline, together with the data validation, on a schedule. It also provides a UI for tracking pipeline runs, which helps users see whether any step in the ETL process failed. The job can also be triggered manually after an error has been fixed.

* The database needed to be accessed by 100+ people. \
Depending on the needs of the larger audience, there are a couple of different options. For example, if a research group is specifically looking for a Covid-19 tracker for the U.S., a smaller data warehouse cube can be created with U.S.-only data from the existing database, which helps query performance and resource utilization. If a group runs the same queries repeatedly for a dashboard view, the scheduled pipeline from the prior answer can be extended with materialized views refreshed by scheduled SQL runs. The dashboard then becomes the place where this group checks the analysis results, instead of being given read access to the database directly. In addition, Redshift provides concurrency scaling for an increase in concurrent users, which can boost throughput by up to 35 times.
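
The daily incremental load proposed above needs to know which report dates have not been loaded yet. A minimal sketch, assuming the source publishes one file per day for the prior day; `pending_report_dates` is a hypothetical helper name.

```python
from datetime import date, timedelta

def pending_report_dates(last_loaded, today):
    """Return the report dates after last_loaded up to yesterday, oldest first."""
    yesterday = today - timedelta(days=1)
    n_pending = (yesterday - last_loaded).days
    return [last_loaded + timedelta(days=i) for i in range(1, n_pending + 1)]

pending = pending_report_dates(date(2021, 7, 4), date(2021, 7, 8))
```

If a scheduled run is skipped, the next run picks up every missed date instead of only the latest one.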

### Step 6: Project Alternative solution

Run the following scripts from the command line, in order:
1. prep_data.py
2. create_tables.py
3. etl.py

