# Project Title
### WHO-COVID-19-Data-Warehouse-Redshift

#### Project Summary
The main goal of the project is to build a COVID-19 data warehouse in AWS Redshift, using Spark to explore the data and Airflow to run the ETL pipeline that loads it from S3.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
import pandas as pd
import os
import configparser
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

In [3]:
spark = SparkSession.builder.appName("WHO-COVID-19-Data-Warehouse-Redshift").getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of the project is to build a data model for analyzing worldwide COVID-19 vaccination progress. For example, we can analyze where vaccination progress needs to speed up.

#### The following technologies were used: 
- Spark
- Airflow
- AWS Redshift, S3

#### Describe and Gather Data 

##### 1.The Data sources:

Daily and total COVID-19 vaccinations worldwide, provided by Kaggle:
- [country_vaccinations.csv](https://www.kaggle.com/gpreda/covid-world-vaccination-progress)

WHO Coronavirus Disease (COVID-19) global data, provided by WHO (1002510 rows):
- [WHO-COVID-19-global-data.csv](https://covid19.who.int/)

ISO country codes, provided by Kaggle:
- [country_code.csv](https://www.kaggle.com/koki25ando/country-code)

Useful per-country features (12 columns), provided by Kaggle:
- [Countries_usefulFeatures.csv](https://www.kaggle.com/ishivinal/covid19-useful-features-by-country)

Data extracted from Wikipedia's lists of countries by category, provided by Kaggle:
- [WORLD DATA by country (2020)](https://www.kaggle.com/daniboy370/world-data-by-country-2020?select=Life+expectancy.csv)

##### 2.Gather Data

In [4]:
spark.conf.set('spark.sql.session.timeZone', 'UTC') 

In [5]:
COVID19_vaccinations_spark = spark.read.csv("data/country_vaccinations.csv",sep=",", inferSchema=True, header=True)

##### The data contains the following information:

 - Country - this is the country for which the vaccination information is provided;
 - Country ISO Code - ISO code for the country;
 - Date - date for the data entry; for some of the dates we have only the daily vaccinations, for others, only the (cumulative) total;
 - Total number of vaccinations - this is the absolute number of total immunizations in the country;
 - Total number of people vaccinated - a person, depending on the immunization scheme, will receive one or more (typically 2) vaccines; at a certain moment, the number of vaccination might be larger than the number of people;
 - Total number of people fully vaccinated - this is the number of people that received the entire set of immunization according to the immunization scheme (typically 2); at a certain moment in time, there might be a certain number of people that received one vaccine and another number (smaller) of people that received all vaccines in the scheme;
 - Daily vaccinations (raw) - for a certain data entry, the number of vaccination for that date/country;
 - Daily vaccinations - for a certain data entry, the number of vaccination for that date/country;
 - Total vaccinations per hundred - ratio (in percent) between vaccination number and total population up to the date in the country;
 - Total number of people vaccinated per hundred - ratio (in percent) between population immunized and total population up to the date in the country;
 - Total number of people fully vaccinated per hundred - ratio (in percent) between population fully immunized and total population up to the date in the country;
 - Number of vaccinations per day - number of daily vaccination for that day and country;
 - Daily vaccinations per million - ratio (in ppm) between vaccination number and total population for the current date in the country;
 - Vaccines used in the country - total number of vaccines used in the country (up to date);
 - Source name - source of the information (national authority, international organization, local organization etc.);
 - Source website - website of the source of information;

In [6]:
COVID19_vaccinations_spark.printSchema()

root
 |-- country: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- total_vaccinations: double (nullable = true)
 |-- people_vaccinated: double (nullable = true)
 |-- people_fully_vaccinated: double (nullable = true)
 |-- daily_vaccinations_raw: double (nullable = true)
 |-- daily_vaccinations: double (nullable = true)
 |-- total_vaccinations_per_hundred: double (nullable = true)
 |-- people_vaccinated_per_hundred: double (nullable = true)
 |-- people_fully_vaccinated_per_hundred: double (nullable = true)
 |-- daily_vaccinations_per_million: double (nullable = true)
 |-- vaccines: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- _c14: string (nullable = true)



In [7]:
COVID19_vaccinations_spark.show(1)

+-------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+---------------+------------------+----+
|country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|       vaccines|       source_name|_c14|
+-------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+---------------+------------------+----+
|Albania|     ALB|2021-01-09 16:00:00|               0.0|             

In [8]:
COVID19_global_spark = spark.read.csv("data/WHO-COVID-19-global-data.csv",sep=",", inferSchema=True, header=True)

In [9]:
COVID19_global_spark.printSchema()

root
 |-- Date_reported: string (nullable = true)
 |-- Country_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- WHO_region: string (nullable = true)
 |-- New_cases: integer (nullable = true)
 |-- Cumulative_cases: integer (nullable = true)
 |-- New_deaths: integer (nullable = true)
 |-- Cumulative_deaths: integer (nullable = true)



In [10]:
print(f'There are {COVID19_global_spark.count()} rows')

There are 1002510 rows


In [11]:
COVID19_global_spark.show(5)

+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
|Date_reported|Country_code|    Country|WHO_region|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
|     2020/1/3|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/4|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/5|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/6|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|     2020/1/7|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
only showing top 5 rows



In [12]:
country_code_spark = spark.read.csv("data/country_code.csv",sep=",", inferSchema=True, header=True)

In [13]:
country_code_spark.printSchema()

root
 |-- Country_name: string (nullable = true)
 |-- code_2digit: string (nullable = true)
 |-- code_3digit: string (nullable = true)



In [13]:
country_code_spark.show(5)

+--------------+-----------+-----------+
|  Country_name|code_2digit|code_3digit|
+--------------+-----------+-----------+
|   Afghanistan|         AF|        AFG|
| Aland Islands|         AX|        ALA|
|       Albania|         AL|        ALB|
|       Algeria|         DZ|        DZA|
|American Samoa|         AS|        ASM|
+--------------+-----------+-----------+
only showing top 5 rows



In [14]:
COVID19_usefulFeatures_spark = spark.read.csv("data/Countries_usefulFeatures.csv",sep=",", inferSchema=True, header=True)

- Country_Region: Name of the country
- Population_Size: the population size 2018 stats
- Tourism: International tourism, number of arrivals 2018
- Date_FirstFatality: Date of the first Fatality of the COVID-19
- Date_FirstConfirmedCase: Date of the first confirmed case of the COVID-19
- Latitude
- Longitude
- Mean_Age: mean age of the population 2018 stats
- Lockdown_Date: date of the lockdown
- Lockdown_Type: type of the lockdown
- Country_Code: 3 digit country code

In [15]:
COVID19_usefulFeatures_spark.printSchema()
COVID19_usefulFeatures_spark.show(5)

root
 |-- Country_Region: string (nullable = true)
 |-- Population_Size: integer (nullable = true)
 |-- Tourism: integer (nullable = true)
 |-- Date_FirstFatality: timestamp (nullable = true)
 |-- Date_FirstConfirmedCase: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Mean_Age: double (nullable = true)
 |-- Lockdown_Date: timestamp (nullable = true)
 |-- Lockdown_Type: string (nullable = true)
 |-- Country_Code: string (nullable = true)

+--------------+---------------+-------+-------------------+-----------------------+----------+------------------+--------+-------------------+-------------+------------+
|Country_Region|Population_Size|Tourism| Date_FirstFatality|Date_FirstConfirmedCase|  Latitude|        Longtitude|Mean_Age|      Lockdown_Date|Lockdown_Type|Country_Code|
+--------------+---------------+-------+-------------------+-----------------------+----------+------------------+--------+-------------------+-----

In [16]:
country_GDP_spark = spark.read.csv("data/WORLD-DATA-by-country-2020/GDP_per_capita.csv",sep=",", inferSchema=True, header=True)

In [17]:
country_GDP_spark.printSchema()
country_GDP_spark.show(5)

root
 |-- Country: string (nullable = true)
 |-- GDP per capita: double (nullable = true)
 |-- ISO-code: string (nullable = true)

+-------------------+--------------+--------+
|            Country|GDP per capita|ISO-code|
+-------------------+--------------+--------+
|        Afghanistan|        2182.0|     AFG|
|            Albania|       14866.0|     ALB|
|            Algeria|       16091.0|     DZA|
|             Angola|        6763.0|     AGO|
|Antigua and Barbuda|       30593.0|     ATG|
+-------------------+--------------+--------+
only showing top 5 rows



In [18]:
country_life_expectancy_spark = spark.read.csv("data/WORLD-DATA-by-country-2020/Life_expectancy.csv",sep=",", inferSchema=True, header=True)

In [19]:
country_life_expectancy_spark.printSchema()
country_life_expectancy_spark.show(5)

root
 |-- country: string (nullable = true)
 |-- life_expectancy: double (nullable = true)
 |-- iso_code: string (nullable = true)

+-------------------+---------------+--------+
|            country|life_expectancy|iso_code|
+-------------------+---------------+--------+
|        Afghanistan|           64.5|     AFG|
|            Algeria|           76.7|     DZA|
|            Andorra|           81.8|     AND|
|             Angola|           60.8|     AGO|
|Antigua and Barbuda|           76.9|     ATG|
+-------------------+---------------+--------+
only showing top 5 rows



In [20]:
country_median_age_spark = spark.read.csv("data/WORLD-DATA-by-country-2020/Median_age.csv",sep=",", inferSchema=True, header=True)
country_median_age_spark.printSchema()
country_median_age_spark.show(5)

root
 |-- country: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- iso_code: string (nullable = true)

+--------------+----------+--------+
|       country|median_age|iso_code|
+--------------+----------+--------+
|   Afghanistan|      27.4|     AFG|
|       Albania|      32.9|     ALB|
|       Algeria|      28.1|     DZA|
|American Samoa|      25.5|     ASM|
|       Andorra|      44.3|     AND|
+--------------+----------+--------+
only showing top 5 rows



In [21]:
country_population_growth_spark = spark.read.csv("data/WORLD-DATA-by-country-2020/Population_growth.csv",sep=",", inferSchema=True, header=True)
country_population_growth_spark.printSchema()
country_population_growth_spark.show(5)

root
 |-- country: string (nullable = true)
 |-- population_growth: double (nullable = true)
 |-- iso_code: string (nullable = true)

+--------------+-----------------+--------+
|       country|population_growth|iso_code|
+--------------+-----------------+--------+
|   Afghanistan|             2.41|     AFG|
|       Albania|             0.26|     ALB|
|       Algeria|             1.89|     DZA|
|American Samoa|            -0.26|     ASM|
|       Andorra|             0.63|     AND|
+--------------+-----------------+--------+
only showing top 5 rows



In [22]:
country_urbanization_rate_spark = spark.read.csv("data/WORLD-DATA-by-country-2020/Urbanization_rate.csv",sep=",", inferSchema=True, header=True)
country_urbanization_rate_spark.printSchema()
country_urbanization_rate_spark.show(5)

root
 |-- country: string (nullable = true)
 |-- urbanization_rate: double (nullable = true)
 |-- iso_code: string (nullable = true)

+---------+-----------------+--------+
|  country|urbanization_rate|iso_code|
+---------+-----------------+--------+
|   Monaco|            100.0|     MCO|
|    Nauru|            100.0|     NRU|
|Singapore|            100.0|     SGP|
| Anguilla|            100.0|     AIA|
|  Bermuda|            100.0|     BMU|
+---------+-----------------+--------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

2.1 Explore and Clean the *COVID19_vaccinations_spark* DataFrame

In [23]:
# Explore the Null value columns
count = COVID19_vaccinations_spark.count()
{col: f'{ COVID19_vaccinations_spark.filter(COVID19_vaccinations_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in COVID19_vaccinations_spark.columns}

{'country': '0.00%',
 'iso_code': '6.85%',
 'date': '0.00%',
 'total_vaccinations': '34.25%',
 'people_vaccinated': '44.01%',
 'people_fully_vaccinated': '62.53%',
 'daily_vaccinations_raw': '44.37%',
 'daily_vaccinations': '3.47%',
 'total_vaccinations_per_hundred': '34.25%',
 'people_vaccinated_per_hundred': '44.01%',
 'people_fully_vaccinated_per_hundred': '62.53%',
 'daily_vaccinations_per_million': '3.47%',
 'vaccines': '0.00%',
 'source_name': '0.00%',
 '_c14': '100.00%'}

NULL values in `iso_code` are filled with the ISO country code. NULL values in the other numeric columns are filled with 0.

In [24]:
dict_null = COVID19_vaccinations_spark.select("country","iso_code")\
                          .filter(COVID19_vaccinations_spark["iso_code"].isNull()).groupBy("country").count()
dict_null.show()

+----------------+-----+
|         country|count|
+----------------+-----+
|           Wales|   76|
|         England|   76|
|        Scotland|   76|
|Northern Ireland|   76|
+----------------+-----+



In [25]:
vaccinations_cleaned = COVID19_vaccinations_spark.fillna({"iso_code":"GBR", "total_vaccinations":0.0, "people_vaccinated":0.0, "people_fully_vaccinated":0.0,\
                       "daily_vaccinations_raw":0.0, "daily_vaccinations":0.0, "total_vaccinations_per_hundred":0.0,\
                                    "people_vaccinated_per_hundred":0.0, "people_fully_vaccinated_per_hundred":0.0,\
                                    "daily_vaccinations_per_million":0.0
                                   })
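
The blanket fill above assigns `"GBR"` to every null `iso_code`, which is safe here only because the four countries with nulls are the UK nations. As a minimal sketch in plain Python (not Spark), a fill keyed on the country name avoids mislabeling an unexpected country in a future data refresh. The names `UK_NATIONS` and `fill_iso_code` are hypothetical, and `"Atlantis"` is a made-up row used only to show the fallback.

```python
from typing import Optional

# The four countries found with a null iso_code in the check above.
UK_NATIONS = {"England", "Scotland", "Wales", "Northern Ireland"}


def fill_iso_code(country: str, iso_code: Optional[str]) -> Optional[str]:
    """Return iso_code unchanged, filling 'GBR' only for the known UK nations."""
    if iso_code is not None:
        return iso_code
    if country in UK_NATIONS:
        return "GBR"
    # Leave unknown countries null so they surface in a later quality check.
    return None


# Illustrative rows: (country, iso_code). "Atlantis" is not real data.
rows = [("Wales", None), ("Albania", "ALB"), ("Atlantis", None)]
filled = [(country, fill_iso_code(country, code)) for country, code in rows]
print(filled)
```

The same rule could be expressed in Spark with a conditional column expression; the plain-Python form is used here only to keep the sketch self-contained.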

2.2 Explore and Clean *COVID19_global_spark*

In [26]:
# Explore the Null value columns
count = COVID19_global_spark.count()
{col: f'{ COVID19_global_spark.filter(COVID19_global_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in COVID19_global_spark.columns}

{'Date_reported': '0.00%',
 'Country_code': '0.00%',
 'Country': '0.00%',
 'WHO_region': '0.00%',
 'New_cases': '0.00%',
 'Cumulative_cases': '0.00%',
 'New_deaths': '0.00%',
 'Cumulative_deaths': '0.00%'}

In [27]:
global_data_clean = COVID19_global_spark
global_data_clean.select("country","Country_code").show(1)

+-----------+------------+
|    country|Country_code|
+-----------+------------+
|Afghanistan|          AF|
+-----------+------------+
only showing top 1 row



The country code in this dataset is a 2-letter code, whereas the vaccination data uses 3-letter ISO codes.
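
Because the WHO data and the vaccination data use different code lengths, the two need a lookup before they can be joined. The following is a small sketch of that lookup in plain Python, using the five rows from the `country_code.csv` preview shown earlier. The names `two_to_three` and `to_iso3` are hypothetical; the actual pipeline may do this as a join in Spark or SQL instead.

```python
from typing import Optional

# Rows copied from the country_code.csv preview: (Country_name, code_2digit, code_3digit).
country_codes = [
    ("Afghanistan", "AF", "AFG"),
    ("Aland Islands", "AX", "ALA"),
    ("Albania", "AL", "ALB"),
    ("Algeria", "DZ", "DZA"),
    ("American Samoa", "AS", "ASM"),
]

# Build a lookup from the 2-letter code to the 3-letter code.
two_to_three = {code2: code3 for _, code2, code3 in country_codes}


def to_iso3(code2: str) -> Optional[str]:
    """Return the 3-letter code for a 2-letter code, or None if it is unknown."""
    return two_to_three.get(code2)


print(to_iso3("AF"))  # AFG
```

Returning `None` for unknown codes keeps unmatched rows visible rather than silently dropping them.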

2.3 Explore and Clean *COVID19_usefulFeatures_spark*

In [28]:
# Explore the Null value columns
count = COVID19_usefulFeatures_spark.count()
{col: f'{ COVID19_usefulFeatures_spark.filter(COVID19_usefulFeatures_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in COVID19_usefulFeatures_spark.columns}

{'Country_Region': '0.00%',
 'Population_Size': '0.00%',
 'Tourism': '0.00%',
 'Date_FirstFatality': '15.22%',
 'Date_FirstConfirmedCase': '0.00%',
 'Latitude': '0.00%',
 'Longtitude': '0.00%',
 'Mean_Age': '0.00%',
 'Lockdown_Date': '17.93%',
 'Lockdown_Type': '17.93%',
 'Country_Code': '0.00%'}

The NULL values in the following columns are meaningful (for example, a country may have had no lockdown), so they are kept as-is:
- Date_FirstFatality: Date of the first Fatality of the COVID-19
- Lockdown_Date: date of the lockdown
- Lockdown_Type: type of the lockdown

In [29]:
usefulFeatures_cleaned = COVID19_usefulFeatures_spark

2.4 Explore and Clean *WORLD DATA by country (2020)*

In [30]:
count = country_GDP_spark.count()
{col: f'{ country_GDP_spark.filter(country_GDP_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_GDP_spark.columns}

{'Country': '0.00%', 'GDP per capita': '0.00%', 'ISO-code': '0.00%'}

In [31]:
country_GDP_cleaned = country_GDP_spark

In [32]:
count = country_life_expectancy_spark.count()
{col: f'{ country_life_expectancy_spark.filter(country_life_expectancy_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_life_expectancy_spark.columns}

{'country': '0.00%', 'life_expectancy': '0.00%', 'iso_code': '0.00%'}

In [33]:
country_life_expectancy_cleaned = country_life_expectancy_spark 

In [34]:
count = country_median_age_spark.count()
{col: f'{ country_median_age_spark.filter(country_median_age_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_median_age_spark.columns}

{'country': '0.00%', 'median_age': '0.00%', 'iso_code': '0.45%'}

In [35]:
dict_null = country_median_age_spark.select("Country","iso_code")\
                          .filter(country_median_age_spark["iso_code"].isNull()).groupBy("country").count()
dict_null.show(truncate=False)

+----------------------+-----+
|country               |count|
+----------------------+-----+
|British Virgin Islands|1    |
+----------------------+-----+



In [36]:
country_median_age_cleaned = country_median_age_spark.fillna({"iso_code":"VGB"})

In [37]:
count = country_population_growth_spark.count()
{col: f'{ country_population_growth_spark.filter(country_population_growth_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_population_growth_spark.columns}

{'country': '0.00%', 'population_growth': '0.00%', 'iso_code': '0.00%'}

In [38]:
country_population_growth_cleaned = country_population_growth_spark

In [39]:
count = country_urbanization_rate_spark.count()
{col: f'{ country_urbanization_rate_spark.filter(country_urbanization_rate_spark[col].isNull()).count()/count *100 :.2f}%'\
for col in country_urbanization_rate_spark.columns}

{'country': '0.00%', 'urbanization_rate': '0.00%', 'iso_code': '0.00%'}

In [40]:
country_urbanization_rate_cleaned = country_urbanization_rate_spark

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The model has one fact table, *vaccinations_fact*, and four dimension tables: *country_region_dim*, *time_dim*, *vaccines_dim*, and *source_dim*. The dimension tables are the entry points for querying the data, and each measurement event (one country's figures on one date) corresponds to exactly one row in the fact table.

![DataModel.jpg](image/DataModel.jpg)
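
To make the fact and dimension relationship concrete, here is a minimal sketch that uses SQLite (from the Python standard library) as a stand-in for Redshift. The table and column names follow the data dictionary in this document, but the column types are SQLite types, only a few columns are included, and the sample rows are illustrative. It is not the project's actual DDL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE country_region_dim (iso_code TEXT PRIMARY KEY, population INTEGER);
CREATE TABLE time_dim (date TEXT PRIMARY KEY, year INTEGER, month INTEGER, week_of_year INTEGER);
CREATE TABLE vaccines_dim (vaccines_id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE source_dim (source_id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE vaccinations_fact (
    id INTEGER PRIMARY KEY,
    iso_code TEXT REFERENCES country_region_dim (iso_code),
    date TEXT REFERENCES time_dim (date),
    vaccines_id INTEGER REFERENCES vaccines_dim (vaccines_id),
    source_id INTEGER REFERENCES source_dim (source_id),
    total_vaccinations INTEGER
);
-- Illustrative sample rows only.
INSERT INTO country_region_dim VALUES ('DZA', NULL);
INSERT INTO time_dim VALUES ('2020-01-03', 2020, 1, 1);
INSERT INTO vaccines_dim VALUES (2, 'Sputnik V');
INSERT INTO vaccinations_fact VALUES (1, 'DZA', '2020-01-03', 2, NULL, 75000);
""")

# Each fact row joins to its dimensions through the key columns.
row = conn.execute("""
    SELECT f.iso_code, t.year, v.name, f.total_vaccinations
    FROM vaccinations_fact AS f
    JOIN time_dim AS t ON t.date = f.date
    JOIN vaccines_dim AS v ON v.vaccines_id = f.vaccines_id
""").fetchone()
print(row)  # ('DZA', 2020, 'Sputnik V', 75000)
```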


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

![ETL.jpg](image/ETL.jpg)

### Step 4: Run Pipelines to Model the Data 

```
- LICENSE
- README.md
- data   # raw data files
- image # project relevant images
- docker-airflow #  DAG files 
- research.ipynb # The project jupyter notebook file
```

#### 4.1 Install Airflow Docker

##### 1. Download and install Docker
https://www.docker.com/products/docker-desktop

##### 2. Install docker-airflow
```
docker pull puckel/docker-airflow

```

##### 3. Build docker-airflow
```
cd <project path>/docker-airflow/

docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" -t puckel/docker-airflow .

docker build --rm --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .
```

##### 4. Run Airflow Docker

```
docker-compose -f docker-compose-LocalExecutor.yml up -d

docker ps

docker exec -it <container_id> /bin/bash
```

Then open the Airflow UI at http://localhost:8080/admin/

<img src="image/localhost.jpg" width="400" align="left">

#### 4.2 Configure Airflow Connections

##### 1. Configure aws_credentials

<img src="image/aws_credentials.jpg" width="400" align="left">

- Login: the AWS user's access key ID
- Password: the AWS user's secret access key

##### 2. Configure redshift

<img src="image/redshift.jpg" width="400" align="left">

##### 3. Configure variables

<img src="image/variables1.jpg" width="400" align="left">

<img src="image/variables2.jpg" width="400" align="left">

#### 4.3 Create the data model
Build the data pipelines to create the data model.

The project uses Airflow to run an ETL pipeline that COPYs data from S3 into Redshift staging tables and then transforms it into the fact and dimension tables.
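
As a rough sketch of the staging step, the helper below builds a Redshift `COPY` statement for a CSV file in S3. The function name `build_copy_sql`, the table name, the bucket path, and the role ARN are all hypothetical placeholders. Authenticating with `IAM_ROLE` is an assumption; the project's operator may pass the access keys from the `aws_credentials` connection instead.

```python
def build_copy_sql(table: str, s3_path: str, iam_role_arn: str) -> str:
    """Build a Redshift COPY statement that loads a CSV file with a header row."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "CSV\n"
        "IGNOREHEADER 1;"
    )


# Placeholder values; replace with the real staging table, bucket, and role.
sql = build_copy_sql(
    table="staging_vaccinations",
    s3_path="s3://example-bucket/country_vaccinations.csv",
    iam_role_arn="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(sql)
```

Keeping the statement in one helper makes it easy to reuse the same template for each staging table.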

<img src="image/dag.jpg" align="left">

Aginity is an effective tool for debugging failed loads by querying Redshift's `stl_load_errors` table.

<img src="image/debug.jpg" align="left">

#### 4.4 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Source/Count checks to ensure completeness
 
A custom operator, `DataQualityOperator`, checks data quality once the staging tables have been copied and the fact and dimension tables have been loaded.
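
The core of a count check can be sketched as follows. This is not the project's `DataQualityOperator`; it is a standalone function, with SQLite standing in for Redshift, that shows the kind of test such an operator might run. The name `check_tables_not_empty` is hypothetical.

```python
import sqlite3


def check_tables_not_empty(conn, tables):
    """Return row counts per table; raise ValueError if any table is empty."""
    counts = {}
    for table in tables:
        # Table names come from a trusted, hard-coded list, not user input.
        (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        if row_count == 0:
            raise ValueError(f"Data quality check failed: {table} is empty")
        counts[table] = row_count
    return counts


# Stand-in database with one populated table and one empty table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE time_dim (date TEXT)")
conn.execute("INSERT INTO time_dim VALUES ('2020-01-14')")
conn.execute("CREATE TABLE source_dim (source_id INTEGER)")

print(check_tables_not_empty(conn, ["time_dim"]))  # {'time_dim': 1}
```

Raising an exception is what lets the surrounding task fail loudly, so an empty load does not go unnoticed.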

#### 4.5 Data dictionary

- **vaccinations_fact**

| Column Feature | Data type | Description |
| ----------- | ----------- |----------- |
| id | INTEGER | Record ID
| date | DATE |  date for the data entry
| vaccines_id | INTEGER | The id of vaccines, reference *vaccines_dim* table
| source_id | INTEGER | The id of source, reference *source_dim* table
| new_cases | INTEGER | Newly reported cases
| cumulative_deaths | INTEGER | The cumulative deaths of the country
| total_vaccinations | INTEGER | this is the absolute number of total immunizations in the country
| people_vaccinated | INTEGER | a person, depending on the immunization scheme, will receive one or more (typically 2) vaccines; at a certain moment, the number of vaccination might be larger than the number of people
| people_fully_vaccinated | INTEGER | this is the number of people that received the entire set of immunization according to the immunization scheme (typically 2); at a certain moment in time, there might be a certain number of people that received one vaccine and another number (smaller) of people that received all vaccines in the scheme
| daily_vaccinations_raw | INTEGER |  for a certain data entry, the number of vaccination for that date/country
| daily_vaccinations | INTEGER |   for a certain data entry, the number of vaccination for that date/country
| total_vaccinations_per_hundred | FLOAT | ratio (in percent) between vaccination number and total population up to the date in the country
| people_vaccinated_per_hundred | FLOAT |  ratio (in percent) between population immunized and total population up to the date in the country
| people_fully_vaccinated_per_hundred | FLOAT |  ratio (in percent) between population fully immunized and total population up to the date in the country
| daily_vaccinations_per_million | FLOAT |  ratio (in ppm) between vaccination number and total population for the current date in the country

- **country_region_dim**

| Column Feature | Data type | Description |
| ----------- | ----------- |----------- |
| iso_code | VARCHAR |  ISO country code
| population | BIGINT |  the population size 2018 stats
| mean_age | BIGINT |  mean age of the population 2018 stats
| first_fatality | DATE |  Date of the first Fatality of the COVID-19
| first_confirmed_case | DATE |  Date of the first confirmed case of the COVID-19
| lockdown_Date | DATE | date of the lockdown
| lockdown_Type | VARCHAR | type of the lockdown
| latitude | FLOAT | latitude value for BI and data visualization
| longtitude | FLOAT | longitude value for BI and data visualization
| GDP | FLOAT | GDP per capita
| life_expectancy | FLOAT | List of countries by life expectancy from wiki
| population_growth| FLOAT | List of countries by population growth from wiki
| urbanization_rate| FLOAT | Urbanization by country from wiki

- **time_dim**

| Column Feature | Data type | Description |
| ----------- | ----------- |----------- |
| date | DATE |  date for the data entry
| year | INTEGER |  using aggregation by year
| month | INTEGER |  using aggregation by month
| week_of_year | INTEGER |  using aggregation by week
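
The `time_dim` columns can all be derived from the date itself. Below is a minimal sketch, assuming `week_of_year` is the ISO week number; that assumption agrees with the `time_dim` rows in the query results at the end of this document (for example, 2020-01-14 falls in week 3). The function name `time_dim_row` is hypothetical.

```python
from datetime import date


def time_dim_row(d: date):
    """Return (date, year, month, week_of_year) using the ISO week number."""
    _, iso_week, _ = d.isocalendar()
    return (d.isoformat(), d.year, d.month, iso_week)


print(time_dim_row(date(2020, 1, 14)))  # ('2020-01-14', 2020, 1, 3)
```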

- **vaccines_dim**

| Column Feature | Data type | Description |
| ----------- | ----------- |----------- |
| vaccines_id | INTEGER |  The id of vaccines
| name | VARCHAR |  The vaccines name


- **source_dim**

| Column Feature | Data type | Description |
| ----------- | ----------- |----------- |
| source_id | INTEGER |  The id of source where info comes from
| name | VARCHAR |  The source name

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  
  - I used Spark to explore the dataset. I did not build a data lake, because Redshift is better suited to this type of data.
  - I used Redshift to build the data warehouse.
  - I used Airflow to build an ETL pipeline that loads data from S3 into Redshift.

* Propose how often the data should be updated and why.

  - The data should be updated daily at the same time as WHO records.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
   - Use Spark to process the larger volume of data and store the results in S3, then have Redshift read from S3 into staging tables.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
   - Use Airflow to schedule a DAG that refreshes the data warehouse before 7am, so that Tableau queries up-to-date data.
 
 * The database needed to be accessed by 100+ people.
   - Provide the AWS Redshift role ARN and the database username and password to the people who need access.

In [141]:
!pip3 install pandas-redshift





#### Load Data from Redshift Warehouse API

In [42]:
import pandas_redshift as pd_r
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

pd_r.connect_to_redshift(
    dbname = config.get('CLUSTER', 'DB_NAME'),
    host = config.get('CLUSTER', 'HOST'),
    port = config.get('CLUSTER', 'DB_PORT'),
    user = config.get('CLUSTER', 'DB_USER'),
    password = config.get('CLUSTER', 'DB_PASSWORD')
)
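
The connection code above expects a `dwh.cfg` file with a `[CLUSTER]` section. The sketch below shows one plausible layout, parsed from a string so that it runs without a real file. Every value is a placeholder, not one of the project's settings; only the section and key names come from the code above.

```python
import configparser

# Placeholder values only; replace them with your own cluster settings.
SAMPLE_CFG = """
[CLUSTER]
HOST = example-cluster.abc123.us-west-2.redshift.amazonaws.com
DB_NAME = dev
DB_USER = awsuser
DB_PASSWORD = change-me
DB_PORT = 5439
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# getint converts the port to an integer instead of leaving it as a string.
sample_port = sample_config.getint("CLUSTER", "DB_PORT")
print(sample_config.get("CLUSTER", "DB_NAME"), sample_port)  # dev 5439
```

Keeping credentials in a config file that is excluded from version control avoids hard-coding them in the notebook.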

In [53]:
df_vaccinations_fact = pd_r.redshift_to_pandas("""
    SELECT * FROM vaccinations_fact LIMIT 10
""")

df_vaccinations_fact.head()

Unnamed: 0,id,iso_code,date,vaccines_id,source_id,new_cases,cumulative_cases,new_deaths,cumulative_deaths,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million
0,0,AFG,2020-01-03,,,0,0,0,0,,,,,,,,,
1,1677782,UGA,2020-01-03,,,0,0,0,0,,,,,,,,,
2,20641,DZA,2020-01-03,2.0,0.0,0,0,0,0,75000.0,,,,3748.0,0.17,,,85.0
3,1678212,UKR,2020-01-03,4.0,0.0,0,0,0,0,3051.0,3051.0,,1713.0,1446.0,0.01,0.01,,33.0
4,20643,DZA,2020-01-03,2.0,0.0,0,0,0,0,,,,,3748.0,,,,85.0


In [54]:
df_country_region = pd_r.redshift_to_pandas("""
    SELECT * FROM country_region_dim LIMIT 10
""")
df_country_region.head()

Unnamed: 0,iso_code,population,first_fatality,first_confirmed_case,lockdown_date,lockdown_type,latitude,longtitude,gdp,life_expectancy,population_growth,urbanization_rate
0,MLT,484630,2020-04-09,2020-03-08,2020-03-12,Full,35.937496,14.375416,49589,82.4,0.4,94.7
1,ARG,44494502,2020-03-09,2020-03-04,2020-03-20,Full,-38.416097,-63.616672,19971,76.5,0.88,92.1
2,JOR,9956011,2020-03-28,2020-03-04,2020-03-21,Full,30.585164,36.238414,9939,74.4,2.19,91.4
3,DNK,5793636,2020-03-15,2020-02-28,2020-03-11,Full,56.26392,9.501785,55675,80.8,0.36,88.1
4,BRA,209469333,2020-03-18,2020-02-27,2020-03-17,Partial,-14.235004,-51.92528,17106,75.7,0.87,87.1


In [55]:
df_time = pd_r.redshift_to_pandas("""
    SELECT * FROM time_dim
""")
df_time.head()

Unnamed: 0,date,year,month,week_of_year
0,2020-01-14,2020,1,3
1,2020-01-20,2020,1,4
2,2020-02-01,2020,2,5
3,2020-02-05,2020,2,6
4,2020-02-11,2020,2,7


In [56]:
df_vaccines = pd_r.redshift_to_pandas("""
    SELECT * FROM vaccines_dim
""")
df_vaccines.head()

Unnamed: 0,vaccines_id,name
0,2,Sputnik V
1,6,"Moderna, Oxford/AstraZeneca, Pfizer/BioNTech"
2,10,"Pfizer/BioNTech, Sinopharm/Beijing, Sputnik V"
3,14,Sinopharm/Beijing
4,18,"Pfizer/BioNTech, Sinovac"


In [57]:
df_source = pd_r.redshift_to_pandas("""
    SELECT * FROM source_dim
""")
df_source.head()

Unnamed: 0,source_id,name
0,2,Government of Andorra
1,6,Government of Azerbaijan
2,10,Sciensano
3,14,Regional governments via Coronavirus Brasil
4,18,Cayman Islands Government


With the warehouse loaded, I can query the Redshift DWH, analyze the data, and examine the vaccination situation in the relevant countries and regions.
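
As one example of such an analysis, the sketch below ranks countries by their most recent `people_vaccinated_per_hundred` value to find where vaccination is progressing slowest. The rows are hypothetical placeholders (the codes `AAA`, `BBB`, `CCC` and all figures are made up), and the function names are illustrative. In practice the rows would come from a query against `vaccinations_fact`.

```python
# Hypothetical rows: (iso_code, date, people_vaccinated_per_hundred). Not real figures.
rows = [
    ("AAA", "2021-02-01", 1.0),
    ("AAA", "2021-02-02", 1.5),
    ("BBB", "2021-02-02", 0.4),
    ("CCC", "2021-02-01", 3.2),
]


def latest_coverage(rows):
    """Keep the most recent value per country (ISO dates sort as strings)."""
    latest = {}
    for iso_code, day, pct in rows:
        if iso_code not in latest or day > latest[iso_code][0]:
            latest[iso_code] = (day, pct)
    return {code: pct for code, (_, pct) in latest.items()}


def lowest_coverage(rows, n=2):
    """Return the n countries with the lowest latest coverage."""
    coverage = latest_coverage(rows)
    return sorted(coverage.items(), key=lambda item: item[1])[:n]


print(lowest_coverage(rows))  # [('BBB', 0.4), ('AAA', 1.5)]
```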