# Project Title
### Data Engineering Capstone Project

#### Project Summary
The main goal of the project is to create a Data Lake in S3, using Airflow to trigger Spark jobs.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import os
import configparser
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

In [2]:
spark = SparkSession.builder.appName("DataEngineeringCapstoneProject").getOrCreate()

# spark = SparkSession.builder.\
# config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
# .enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of the project is to build a data model to analyze worldwide COVID-19 vaccination progress. For example, we can analyze where vaccination progress needs to speed up.

#### The following technologies were used: 
- Spark
- Airflow
- AWS (EMR and S3)

#### Describe and Gather Data 

##### 1. Data Sources

Daily and total COVID-19 vaccinations worldwide, provided by Kaggle:
- [country_vaccinations.csv](https://www.kaggle.com/gpreda/covid-world-vaccination-progress)

WHO Coronavirus Disease (COVID-19) global data, provided by the WHO:
- [WHO-COVID-19-global-data.csv](https://covid19.who.int/)

ISO country codes, provided by Kaggle:
- [country_code.csv](https://www.kaggle.com/koki25ando/country-code)

Useful COVID-19 features by country, provided by Kaggle:
- [Countries_usefulFeatures.csv](https://www.kaggle.com/ishivinal/covid19-useful-features-by-country)

Data extracted from Wikipedia's lists of countries by category, provided by Kaggle:
- [WORLD DATA by country (2020)](https://www.kaggle.com/daniboy370/world-data-by-country-2020?select=Life+expectancy.csv)

##### 2. Gather Data

In [3]:
spark.conf.set('spark.sql.session.timeZone', 'UTC') 

In [6]:
COVID19_vaccinations_spark = spark.read.csv("data/country_vaccinations.csv",sep=",", inferSchema=True, header=True)

##### The data contains the following information:

 - Country - the country for which the vaccination information is provided;
 - Country ISO Code - ISO code for the country;
 - Date - date of the data entry; for some dates only the daily vaccinations are available, for others only the (cumulative) total;
 - Total number of vaccinations - the absolute number of total immunizations in the country;
 - Total number of people vaccinated - depending on the immunization scheme, a person receives one or more (typically 2) vaccine doses, so at a given moment the number of vaccinations can be larger than the number of people;
 - Total number of people fully vaccinated - the number of people who received the entire set of immunizations according to the immunization scheme (typically 2); at a given moment, some people may have received one dose while a (smaller) number of people have received all doses in the scheme;
 - Daily vaccinations (raw) - for a given data entry, the number of vaccinations for that date/country;
 - Daily vaccinations - for a given data entry, the number of vaccinations for that date/country, smoothed by the source (which is why this column has far fewer nulls than the raw one);
 - Total vaccinations per hundred - ratio (in percent) between the number of vaccinations and the total population of the country, up to that date;
 - Total number of people vaccinated per hundred - ratio (in percent) between the number of people immunized and the total population of the country, up to that date;
 - Total number of people fully vaccinated per hundred - ratio (in percent) between the number of people fully immunized and the total population of the country, up to that date;
 - Number of vaccinations per day - number of daily vaccinations for that day and country;
 - Daily vaccinations per million - ratio (in ppm) between the number of vaccinations and the total population of the country for the current date;
 - Vaccines used in the country - the vaccines used in the country (up to date);
 - Source name - source of the information (national authority, international organization, local organization, etc.);
 - Source website - website of the source of information.
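The per-hundred and per-million columns are the raw counts normalized by the country's population. A minimal sketch of that arithmetic, assuming the population figure is known; the numbers in the example are hypothetical and not taken from the dataset:

```python
def per_hundred(count: float, population: float) -> float:
    """Return count per 100 people (a percentage of the population)."""
    # Multiply first so that exact inputs give exact results where possible.
    return 100 * count / population


def per_million(count: float, population: float) -> float:
    """Return count per 1,000,000 people (parts per million)."""
    return 1_000_000 * count / population


# Hypothetical country with 2,000,000 inhabitants.
population = 2_000_000
print(per_hundred(50_000, population))  # 2.5
print(per_million(5_000, population))   # 2500.0
```

Because these columns are derived from the counts, a null count and a null ratio usually appear together, which matches the identical null rates reported for each pair in Step 2.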

In [7]:
COVID19_vaccinations_spark.printSchema()

root
 |-- country: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_vaccinations: double (nullable = true)
 |-- people_vaccinated: double (nullable = true)
 |-- people_fully_vaccinated: double (nullable = true)
 |-- daily_vaccinations_raw: double (nullable = true)
 |-- daily_vaccinations: double (nullable = true)
 |-- total_vaccinations_per_hundred: double (nullable = true)
 |-- people_vaccinated_per_hundred: double (nullable = true)
 |-- people_fully_vaccinated_per_hundred: double (nullable = true)
 |-- daily_vaccinations_per_million: double (nullable = true)
 |-- vaccines: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_website: string (nullable = true)



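Add *year*, *month*, and *week* columns for aggregating the data (the cell below is commented out, so these columns are not created in the output that follows):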

In [8]:
# COVID19_vaccinations_spark = COVID19_vaccinations_spark.withColumn('year', func.year('date'))\
#                               .withColumn('month', func.month('date'))\
#                               .withColumn('week_of_year', func.weekofyear('date'))
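Because `date` was inferred as a string, converting it first with `func.to_date('date', 'yyyy-MM-dd')` is probably the safer option before deriving these columns. Note that Spark's `weekofyear` follows ISO 8601 (weeks start on Monday), while `year` returns the calendar year, so an early-January date can land in week 53 with the new year. A stdlib sketch that mirrors the three Spark functions with `datetime.date.isocalendar()`:

```python
from datetime import date, datetime


def calendar_fields(date_str: str) -> dict:
    """Derive year, month and ISO week number from a 'yyyy-MM-dd' string.

    Mirrors func.year, func.month and func.weekofyear: the year is the
    calendar year, the week number follows ISO 8601.
    """
    d: date = datetime.strptime(date_str, "%Y-%m-%d").date()
    return {
        "year": d.year,
        "month": d.month,
        "week_of_year": d.isocalendar()[1],
    }


print(calendar_fields("2021-01-10"))  # {'year': 2021, 'month': 1, 'week_of_year': 1}
print(calendar_fields("2021-01-01"))  # {'year': 2021, 'month': 1, 'week_of_year': 53}
```

Grouping on (`year`, `week_of_year`) would therefore split ISO week 53 of 2020 across two groups; whether that matters depends on how the weekly aggregates are used.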

In [9]:
COVID19_vaccinations_spark.show(1)

+-------+--------+----------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+---------------+------------------+--------------------+
|country|iso_code|      date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|       vaccines|       source_name|      source_website|
+-------+--------+----------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+---------------+------------------+--------------------+
|Albania|     ALB|2021-01-10|               0.0| 

In [10]:
COVID19_global_spark = spark.read.csv("data/WHO-COVID-19-global-data.csv",sep=",", inferSchema=True, header=True)

In [11]:
COVID19_global_spark.printSchema()

root
 |-- Date_reported: string (nullable = true)
 |-- Country_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- WHO_region: string (nullable = true)
 |-- New_cases: integer (nullable = true)
 |-- Cumulative_cases: integer (nullable = true)
 |-- New_deaths: integer (nullable = true)
 |-- Cumulative_deaths: integer (nullable = true)



In [12]:
COVID19_global_spark.show(5)

+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
|Date_reported|Country_code|    Country|WHO_region|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
|     2020/1/3|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/4|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/5|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/6|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/7|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
only showing top 5 rows
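`Date_reported` is read as a string in `yyyy/M/d` form (e.g. `2020/1/3`), while the vaccination `date` column uses `yyyy-MM-dd` (e.g. `2021-01-10`). To join the two datasets on date, both need a common format; in Spark this could be done with `func.to_date('Date_reported', 'yyyy/M/d')`. A stdlib sketch of the same normalization:

```python
from datetime import datetime


def normalize_who_date(raw: str) -> str:
    """Convert a WHO 'yyyy/M/d' date string (e.g. '2020/1/3') to ISO 'yyyy-MM-dd'."""
    # When parsing, %m and %d also accept values without a leading zero.
    return datetime.strptime(raw, "%Y/%m/%d").strftime("%Y-%m-%d")


print(normalize_who_date("2020/1/3"))    # 2020-01-03
print(normalize_who_date("2020/12/31"))  # 2020-12-31
```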



In [13]:
country_code_spark = spark.read.csv("data/country_code.csv",sep=",", inferSchema=True, header=True)

In [14]:
country_code_spark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Country_name: string (nullable = true)
 |-- code_2digit: string (nullable = true)
 |-- code_3digit: string (nullable = true)



In [15]:
country_code_spark.show(5)

+---+--------------+-----------+-----------+
|_c0|  Country_name|code_2digit|code_3digit|
+---+--------------+-----------+-----------+
|  2|   Afghanistan|         AF|        AFG|
|  3| Aland Islands|         AX|        ALA|
|  4|       Albania|         AL|        ALB|
|  5|       Algeria|         DZ|        DZA|
|  6|American Samoa|         AS|        ASM|
+---+--------------+-----------+-----------+
only showing top 5 rows



In [16]:
COVID19_usefulFeatures_spark = spark.read.csv("data/Countries_usefulFeatures.csv",sep=",", inferSchema=True, header=True)

- Country_Region: name of the country
- Population_Size: population size (2018 statistics)
- Tourism: international tourism, number of arrivals (2018)
- Date_FirstFatality: date of the first COVID-19 fatality
- Date_FirstConfirmedCase: date of the first confirmed COVID-19 case
- Latitude
- Longitude (the column is spelled *Longtitude* in the file)
- Mean_Age: mean age of the population (2018 statistics)
- Lockdown_Date: date of the lockdown
- Lockdown_Type: type of the lockdown
- Country_Code: 3-letter country code

In [17]:
COVID19_usefulFeatures_spark.printSchema()
COVID19_usefulFeatures_spark.show(5)

root
 |-- Country_Region: string (nullable = true)
 |-- Population_Size: integer (nullable = true)
 |-- Tourism: integer (nullable = true)
 |-- Date_FirstFatality: string (nullable = true)
 |-- Date_FirstConfirmedCase: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Mean_Age: double (nullable = true)
 |-- Lockdown_Date: string (nullable = true)
 |-- Lockdown_Type: string (nullable = true)
 |-- Country_Code: string (nullable = true)

+--------------+---------------+-------+------------------+-----------------------+----------+------------------+--------+-------------+-------------+------------+
|Country_Region|Population_Size|Tourism|Date_FirstFatality|Date_FirstConfirmedCase|  Latitude|        Longtitude|Mean_Age|Lockdown_Date|Lockdown_Type|Country_Code|
+--------------+---------------+-------+------------------+-----------------------+----------+------------------+--------+-------------+-------------+------------+
|   Afg

In [18]:
country_GDP_spark = spark.read.csv("data/WORLD DATA by country (2020)/GDP per capita.csv",sep=",", inferSchema=True, header=True)

In [19]:
country_GDP_spark.printSchema()
country_GDP_spark.show(5)

root
 |-- Country: string (nullable = true)
 |-- GDP per capita: double (nullable = true)
 |-- ISO-code: string (nullable = true)

+-------------------+--------------+--------+
|            Country|GDP per capita|ISO-code|
+-------------------+--------------+--------+
|        Afghanistan|        2182.0|     AFG|
|            Albania|       14866.0|     ALB|
|            Algeria|       16091.0|     DZA|
|             Angola|        6763.0|     AGO|
|Antigua and Barbuda|       30593.0|     ATG|
+-------------------+--------------+--------+
only showing top 5 rows



In [20]:
country_life_expectancy_spark = spark.read.csv("data/WORLD DATA by country (2020)/Life expectancy.csv",sep=",", inferSchema=True, header=True)

In [21]:
country_life_expectancy_spark.printSchema()
country_life_expectancy_spark.show(5)

root
 |-- Country: string (nullable = true)
 |-- Life expectancy: double (nullable = true)
 |-- ISO-code: string (nullable = true)

+-------------------+---------------+--------+
|            Country|Life expectancy|ISO-code|
+-------------------+---------------+--------+
|        Afghanistan|           64.5|     AFG|
|            Algeria|           76.7|     DZA|
|            Andorra|           81.8|     AND|
|             Angola|           60.8|     AGO|
|Antigua and Barbuda|           76.9|     ATG|
+-------------------+---------------+--------+
only showing top 5 rows



In [22]:
country_median_age_spark = spark.read.csv("data/WORLD DATA by country (2020)/Median age.csv",sep=",", inferSchema=True, header=True)
country_median_age_spark.printSchema()
country_median_age_spark.show(5)

root
 |-- Country: string (nullable = true)
 |-- Median age: double (nullable = true)
 |-- ISO-code: string (nullable = true)

+--------------+----------+--------+
|       Country|Median age|ISO-code|
+--------------+----------+--------+
|   Afghanistan|      27.4|     AFG|
|       Albania|      32.9|     ALB|
|       Algeria|      28.1|     DZA|
|American Samoa|      25.5|     ASM|
|       Andorra|      44.3|     AND|
+--------------+----------+--------+
only showing top 5 rows



In [23]:
country_population_growth_spark = spark.read.csv("data/WORLD DATA by country (2020)/Population growth.csv",sep=",", inferSchema=True, header=True)
country_population_growth_spark.printSchema()
country_population_growth_spark.show(5)

root
 |-- Country: string (nullable = true)
 |-- Population growth: double (nullable = true)
 |-- ISO-code: string (nullable = true)

+--------------+-----------------+--------+
|       Country|Population growth|ISO-code|
+--------------+-----------------+--------+
|   Afghanistan|             2.41|     AFG|
|       Albania|             0.26|     ALB|
|       Algeria|             1.89|     DZA|
|American Samoa|            -0.26|     ASM|
|       Andorra|             0.63|     AND|
+--------------+-----------------+--------+
only showing top 5 rows



In [24]:
country_urbanization_rate_spark = spark.read.csv("data/WORLD DATA by country (2020)/Urbanization rate.csv",sep=",", inferSchema=True, header=True)
country_urbanization_rate_spark.printSchema()
country_urbanization_rate_spark.show(5)

root
 |-- Country: string (nullable = true)
 |-- Urbanization rate: double (nullable = true)
 |-- ISO-code: string (nullable = true)

+---------+-----------------+--------+
|  Country|Urbanization rate|ISO-code|
+---------+-----------------+--------+
|   Monaco|            100.0|     MCO|
|    Nauru|            100.0|     NRU|
|Singapore|            100.0|     SGP|
| Anguilla|            100.0|     AIA|
|  Bermuda|            100.0|     BMU|
+---------+-----------------+--------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

2.1 Explore and clean the *COVID19_vaccinations_spark* DataFrame

In [25]:
# Explore the Null value columns
count = COVID19_vaccinations_spark.count()
{col: f'{ COVID19_vaccinations_spark.filter(COVID19_vaccinations_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in COVID19_vaccinations_spark.columns}

{'country': '0.00%',
 'iso_code': '6.85%',
 'date': '0.00%',
 'total_vaccinations': '34.25%',
 'people_vaccinated': '44.01%',
 'people_fully_vaccinated': '62.53%',
 'daily_vaccinations_raw': '44.37%',
 'daily_vaccinations': '3.47%',
 'total_vaccinations_per_hundred': '34.25%',
 'people_vaccinated_per_hundred': '44.01%',
 'people_fully_vaccinated_per_hundred': '62.53%',
 'daily_vaccinations_per_million': '3.47%',
 'vaccines': '0.00%',
 'source_name': '0.00%',
 'source_website': '0.00%'}
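The dictionary comprehension above runs one Spark job per column (one `filter(...).count()` each), and the same pattern is repeated for every DataFrame below. A single-pass alternative in PySpark would be `df.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df.columns])`. The same idea, sketched in plain Python over a list of row dictionaries (the rows are hypothetical):

```python
from typing import Dict, Iterable, List, Optional


def null_percentages(rows: List[Dict[str, Optional[object]]],
                     columns: Iterable[str]) -> Dict[str, str]:
    """Return the share of null values per column, formatted like the cell above.

    All columns are tallied in a single pass over the rows; a missing key
    counts as null.
    """
    columns = list(columns)
    nulls = {col: 0 for col in columns}
    total = 0
    for row in rows:
        total += 1
        for col in columns:
            if row.get(col) is None:
                nulls[col] += 1
    if total == 0:
        raise ValueError("cannot compute null percentages of an empty table")
    return {col: f"{nulls[col] / total * 100:.2f}%" for col in columns}


rows = [
    {"country": "Albania", "iso_code": "ALB", "total_vaccinations": 0.0},
    {"country": "Wales", "iso_code": None, "total_vaccinations": None},
    {"country": "Wales", "iso_code": None, "total_vaccinations": 1200.0},
    {"country": "Albania", "iso_code": "ALB", "total_vaccinations": 128.0},
]
print(null_percentages(rows, ["country", "iso_code", "total_vaccinations"]))
# {'country': '0.00%', 'iso_code': '50.00%', 'total_vaccinations': '25.00%'}
```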

Null 'iso_code' values are filled with the matching ISO country code (the rows below are all UK nations, so GBR); null values in the numeric columns are filled with 0.

In [26]:
dict_null = COVID19_vaccinations_spark.select("country","iso_code")\
                          .filter(COVID19_vaccinations_spark["iso_code"].isNull()).groupBy("country").count()
dict_null.show()

+----------------+-----+
|         country|count|
+----------------+-----+
|           Wales|   76|
|         England|   76|
|        Scotland|   76|
|Northern Ireland|   76|
+----------------+-----+



In [27]:
# All null iso_code rows belong to the UK nations (see the group-by above); missing figures become 0
vaccinations_cleaned = COVID19_vaccinations_spark.fillna({
    "iso_code": "GBR",
    "total_vaccinations": 0.0, "people_vaccinated": 0.0, "people_fully_vaccinated": 0.0,
    "daily_vaccinations_raw": 0.0, "daily_vaccinations": 0.0,
    "total_vaccinations_per_hundred": 0.0, "people_vaccinated_per_hundred": 0.0,
    "people_fully_vaccinated_per_hundred": 0.0, "daily_vaccinations_per_million": 0.0,
})
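Filling every null `iso_code` with GBR is only safe because the group-by above shows that all of them belong to the four UK nations. One consequence worth checking: if the dataset also contains "United Kingdom" rows under GBR, several countries now share GBR on the same date, and any aggregation keyed on `iso_code` would count UK vaccinations more than once. A minimal check, sketched over hypothetical (country, iso_code, date) tuples; in Spark the equivalent would be grouping by `iso_code` and `date` and counting distinct `country` values:

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def shared_keys(rows: Iterable[Tuple[str, str, str]]) -> Dict[Tuple[str, str], Set[str]]:
    """Return the (iso_code, date) keys that are used by more than one country."""
    countries_by_key: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    for country, iso_code, day in rows:
        countries_by_key[(iso_code, day)].add(country)
    return {key: names for key, names in countries_by_key.items() if len(names) > 1}


# Hypothetical rows after the GBR fill.
rows = [
    ("United Kingdom", "GBR", "2021-01-10"),
    ("Wales", "GBR", "2021-01-10"),
    ("Albania", "ALB", "2021-01-10"),
]
print({key: sorted(names) for key, names in shared_keys(rows).items()})
# {('GBR', '2021-01-10'): ['United Kingdom', 'Wales']}
```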

2.2 Explore and clean the *COVID19_global_spark* DataFrame

In [28]:
# Explore the Null value columns
count = COVID19_global_spark.count()
{col: f'{ COVID19_global_spark.filter(COVID19_global_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in COVID19_global_spark.columns}

{'Date_reported': '0.00%',
 'Country_code': '0.00%',
 'Country': '0.00%',
 'WHO_region': '0.00%',
 'New_cases': '0.00%',
 'Cumulative_cases': '0.00%',
 'New_deaths': '0.00%',
 'Cumulative_deaths': '0.00%'}

In [29]:
global_data_clean = COVID19_global_spark
global_data_clean.select("country","Country_code").show(1)

+-----------+------------+
|    country|Country_code|
+-----------+------------+
|Afghanistan|          AF|
+-----------+------------+
only showing top 1 row



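The WHO `Country_code` column holds 2-letter (ISO 3166-1 alpha-2) country codes, while the vaccination data uses 3-letter codes (`iso_code`), so the two datasets cannot be joined on the country code directly.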
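The *country_code* table can act as a bridge between the two code systems. In Spark this could be a left join such as `global_data_clean.join(country_code_spark, global_data_clean["Country_code"] == country_code_spark["code_2digit"], "left")`. A stdlib sketch of the same lookup, using the sample rows printed for `country_code.csv` above (`XX` is a hypothetical unknown code):

```python
from typing import Dict, List, Optional, Tuple

# Sample (code_2digit, code_3digit) pairs from the country_code.csv output above.
COUNTRY_CODES: List[Tuple[str, str]] = [
    ("AF", "AFG"),
    ("AX", "ALA"),
    ("AL", "ALB"),
    ("DZ", "DZA"),
    ("AS", "ASM"),
]

ALPHA2_TO_ALPHA3: Dict[str, str] = dict(COUNTRY_CODES)


def to_alpha3(code_2digit: str) -> Optional[str]:
    """Map a 2-letter code to its 3-letter code, or None if unknown (like a left join)."""
    return ALPHA2_TO_ALPHA3.get(code_2digit)


print(to_alpha3("AF"))  # AFG
print(to_alpha3("XX"))  # None
```

Unmatched codes come back as null, so counting nulls in `code_3digit` after the join is a cheap way to spot WHO codes that the lookup table may not cover.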

2.3 Explore and clean the *COVID19_usefulFeatures_spark* DataFrame

In [30]:
# Explore the Null value columns
count = COVID19_usefulFeatures_spark.count()
{col: f'{ COVID19_usefulFeatures_spark.filter(COVID19_usefulFeatures_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in COVID19_usefulFeatures_spark.columns}

{'Country_Region': '0.00%',
 'Population_Size': '0.00%',
 'Tourism': '0.00%',
 'Date_FirstFatality': '15.22%',
 'Date_FirstConfirmedCase': '0.00%',
 'Latitude': '0.00%',
 'Longtitude': '0.00%',
 'Mean_Age': '0.00%',
 'Lockdown_Date': '17.93%',
 'Lockdown_Type': '17.93%',
 'Country_Code': '0.00%'}

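The NULL values in the following columns are meaningful (for example, no fatality or no lockdown was recorded for that country), so they are kept as-is: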
- Date_FirstFatality: Date of the first Fatality of the COVID-19
- Lockdown_Date: date of the lockdown
- Lockdown_Type: type of the lockdown

In [31]:
usefulFeatures_cleaned = COVID19_usefulFeatures_spark

2.4 Explore and clean the *WORLD DATA by country (2020)* DataFrames

In [32]:
count = country_GDP_spark.count()
{col: f'{ country_GDP_spark.filter(country_GDP_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_GDP_spark.columns}

{'Country': '0.00%', 'GDP per capita': '0.00%', 'ISO-code': '0.00%'}

In [33]:
country_GDP_cleaned = country_GDP_spark

In [34]:
count = country_life_expectancy_spark.count()
{col: f'{ country_life_expectancy_spark.filter(country_life_expectancy_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_life_expectancy_spark.columns}

{'Country': '0.00%', 'Life expectancy': '0.00%', 'ISO-code': '0.00%'}

In [35]:
country_life_expectancy_cleaned = country_life_expectancy_spark 

In [36]:
count = country_median_age_spark.count()
{col: f'{ country_median_age_spark.filter(country_median_age_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_median_age_spark.columns}

{'Country': '0.00%', 'Median age': '0.00%', 'ISO-code': '0.45%'}

In [37]:
dict_null = country_median_age_spark.select("Country","ISO-code")\
                          .filter(country_median_age_spark["ISO-code"].isNull()).groupBy("country").count()
dict_null.show(truncate=False)

+----------------------+-----+
|country               |count|
+----------------------+-----+
|British Virgin Islands|1    |
+----------------------+-----+



In [38]:
country_median_age_cleaned = country_median_age_spark.fillna({"ISO-code":"VGB"})

In [39]:
count = country_population_growth_spark.count()
{col: f'{ country_population_growth_spark.filter(country_population_growth_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_population_growth_spark.columns}

{'Country': '0.00%', 'Population growth': '0.00%', 'ISO-code': '0.00%'}

In [40]:
country_population_growth_cleaned = country_population_growth_spark

In [41]:
count = country_urbanization_rate_spark.count()
{col: f'{ country_urbanization_rate_spark.filter(country_urbanization_rate_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_urbanization_rate_spark.columns}

{'Country': '0.00%', 'Urbanization rate': '0.00%', 'ISO-code': '0.00%'}

In [42]:
country_urbanization_rate_cleaned = country_urbanization_rate_spark

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is a star schema. The four dimension tables, *country_region_dim*, *time_dim*, *vaccines_dim*, and *source_dim*, are the entry points for querying the data, and there is one fact table, *vaccinations_fact*. Each measurement event (a country's vaccination figures on a given date) corresponds to exactly one row in the fact table.
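One way *vaccines_dim* could be populated is by splitting the `vaccines` string column into individual vaccine names and giving each distinct name a surrogate key. A minimal sketch, assuming the names are comma-separated; the input strings are illustrative and the key scheme is hypothetical, not taken from the data model diagram. In PySpark, the split step could use `func.explode(func.split("vaccines", r",\s*"))`.

```python
from typing import Dict, Iterable


def build_vaccines_dim(vaccine_strings: Iterable[str]) -> Dict[str, int]:
    """Map each distinct vaccine name to a surrogate key (1, 2, 3, ...).

    Names are sorted so that the keys are deterministic across runs.
    """
    names = {
        name.strip()
        for value in vaccine_strings
        for name in value.split(",")
        if name.strip()  # skip empty fragments such as trailing commas
    }
    return {name: key for key, name in enumerate(sorted(names), start=1)}


dim = build_vaccines_dim([
    "Pfizer/BioNTech",
    "Oxford/AstraZeneca, Pfizer/BioNTech",
    "Moderna, Pfizer/BioNTech",
])
print(dim)  # {'Moderna': 1, 'Oxford/AstraZeneca': 2, 'Pfizer/BioNTech': 3}
```

Since one country-date row can list several vaccines, linking fact rows to this dimension may need either a bridge table or one key per vaccine combination.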

![DataModel.jpg](image/DataModel.jpg)


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

![ETL.jpg](image/ETL.jpg)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [43]:
! ls -l

total 1816
-rw-r--r--   1 tom  staff    1060  3  2 07:58 LICENSE
-rw-r--r--   1 tom  staff      23  3  2 07:58 README.md
drwxr-xr-x@  7 tom  staff     224  3  2 07:58 data
-rw-r--r--@  1 tom  staff  895638  3  3 19:57 data.zip
drwxr-xr-x  15 tom  staff     480  3  3 20:50 docker-airflow
drwxr-xr-x   6 tom  staff     192  3  3 18:08 image
-rw-r--r--   1 tom  staff   24026  3  3 21:30 research.ipynb


```
- LICENSE
- README.md
- data   # raw data files
- image # project relevant images
- docker-airflow #  DAG files 
- research.ipynb # The project jupyter notebook file
```

#### 4.1.1 Install Airflow Docker

##### 1. Download and install Docker
https://www.docker.com/products/docker-desktop

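##### 2. Pull and build docker-airflow
```
docker pull puckel/docker-airflow

cd <project path>/docker-airflow/

# Optional: rebuild the image with extra Airflow and Python dependencies.
# Both build args go into one build; two separate builds with the same tag would overwrite each other.
docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .

```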

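##### 3. Run Airflow Docker

```

# Start Airflow with the LocalExecutor (mount requirements.txt as a volume in the compose file)
docker-compose -f docker-compose-LocalExecutor.yml up -d

# or run only the webserver; docker run options such as -p and -v must come before the image name
docker run -d -p 8080:8080 -v $(pwd)/plugins/:/usr/local/airflow/plugins -v $(pwd)/requirements.txt:/requirements.txt puckel/docker-airflow webserver

# Find the container ID, then open a shell inside the container
docker ps

docker exec -it <container_id> /bin/bash
```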

Open the Airflow web UI at http://localhost:8080/admin/

<img src="image/localhost.png" width="400" align="left">

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
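The checks listed above could start as simple assertions on the output tables. A minimal sketch in plain Python covering a completeness (row count) check and a uniqueness check on the fact table's grain, assuming that grain is one row per `iso_code` and `date`; the function names and the sample rows are hypothetical. Against Spark DataFrames, the same checks could use `df.count()` and `df.groupBy("iso_code", "date").count().filter("count > 1")`.

```python
from collections import Counter
from typing import Iterable, List, Sequence


def check_not_empty(table_name: str, rows: Sequence[dict]) -> None:
    """Completeness check: fail if a table ended up with no rows."""
    if len(rows) == 0:
        raise ValueError(f"Data quality check failed: {table_name} has no rows")


def check_unique(table_name: str, rows: Iterable[dict], key: List[str]) -> None:
    """Integrity check: fail if any key combination appears more than once."""
    counts = Counter(tuple(row[col] for col in key) for row in rows)
    duplicates = [k for k, n in counts.items() if n > 1]
    if duplicates:
        raise ValueError(f"Data quality check failed: {table_name} has duplicate keys {duplicates}")


# Hypothetical fact rows.
fact_rows = [
    {"iso_code": "ALB", "date": "2021-01-10", "daily_vaccinations": 0.0},
    {"iso_code": "ALB", "date": "2021-01-11", "daily_vaccinations": 64.0},
]
check_not_empty("vaccinations_fact", fact_rows)
check_unique("vaccinations_fact", fact_rows, ["iso_code", "date"])
print("All data quality checks passed")
```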

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
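A data dictionary can be kept in the notebook as a simple Markdown table. A small hypothetical helper that renders one from (field, type, description, source) entries; the two entries reuse columns and descriptions of the vaccination dataset from Step 1 and are illustrative rather than the final model's fields:

```python
from typing import List, Tuple

Entry = Tuple[str, str, str, str]  # (field, type, description, source)


def data_dictionary_markdown(entries: List[Entry]) -> str:
    """Render data dictionary entries as a Markdown table."""
    lines = [
        "| Field | Type | Description | Source |",
        "|---|---|---|---|",
    ]
    for field, dtype, description, source in entries:
        lines.append(f"| {field} | {dtype} | {description} | {source} |")
    return "\n".join(lines)


print(data_dictionary_markdown([
    ("iso_code", "string", "ISO code for the country", "country_vaccinations.csv"),
    ("daily_vaccinations", "double", "Number of vaccinations for that date/country", "country_vaccinations.csv"),
]))
```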

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.