# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [30]:
import pandas as pd

def explore_df(df):
    print(df.info())
    df.describe()
    print(df.head(3))

def check_time_series(df_check, date_col, group_by):
    # check that the time series column doesn't have missing gaps

    min_date = df_check[date_col].min()
    max_date = df_check[date_col].max()
    date_range = set(
        d.strftime('%Y-%m-%d') for d in
        pd.date_range(min_date, max_date, freq='D'))
    total_days = len(date_range)
    
    def _check_group(df_group):
        group_value = df_group.iloc[0][group_by]
        missing_dates = date_range - set(df_group[date_col].unique())

        if missing_dates:
            print(f'found missing time series for {group_value} '
                  f'- {len(missing_dates)} days missing (~{round(100*len(missing_dates)/total_days)}%)')
                
    df_check.groupby(group_by).apply(_check_group)

def describe_nulls(df, thr=0.05):
    df_nulls = df.isnull().mean().reset_index()
    return df_nulls[df_nulls[0] > thr]

df_activity = pd.read_csv('./data/activity.csv')
df_population = pd.read_csv('./data/population.csv')
df_vaccinations_global = pd.read_csv('./data/vaccinations_global.csv')
df_vaccinations_usa = pd.read_csv('./data/vaccinations_usa.csv')


  exec(code_obj, self.user_global_ns, self.user_ns)


# Explore COVID19 Activity table

* Total of 2157792 rows in the Activity Table.
* This is time series from 2020-01-29 to 2021-09-19 for 222 countries world wide. 
* This data comes from two sources:
    - New York Times
    - JHU CSSE Global Timeseries 

In [20]:
explore_df(df_activity)

print('Unique countries count:', len(df_activity.COUNTRY_SHORT_NAME.unique()))
print('Minimum date:', df_activity.REPORT_DATE.min())
print('Maxium date:', df_activity.REPORT_DATE.max())
print('Data sources:', df_activity.DATA_SOURCE_NAME.unique())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2157792 entries, 0 to 2157791
Data columns (total 13 columns):
 #   Column                           Dtype  
---  ------                           -----  
 0   PEOPLE_POSITIVE_CASES_COUNT      int64  
 1   COUNTY_NAME                      object 
 2   PROVINCE_STATE_NAME              object 
 3   REPORT_DATE                      object 
 4   CONTINENT_NAME                   object 
 5   DATA_SOURCE_NAME                 object 
 6   PEOPLE_DEATH_NEW_COUNT           int64  
 7   COUNTY_FIPS_NUMBER               float64
 8   COUNTRY_ALPHA_3_CODE             object 
 9   COUNTRY_SHORT_NAME               object 
 10  COUNTRY_ALPHA_2_CODE             object 
 11  PEOPLE_POSITIVE_NEW_CASES_COUNT  int64  
 12  PEOPLE_DEATH_COUNT               int64  
dtypes: float64(1), int64(4), object(8)
memory usage: 214.0+ MB
None
   PEOPLE_POSITIVE_CASES_COUNT COUNTY_NAME PROVINCE_STATE_NAME REPORT_DATE  \
0                         1263       Perry        

In [24]:
explore_df(df_population)

print('Unique countries count:', len(df_population.COUNTRY_SHORT_NAME.unique()))
print('Data sources:', df_population.DATA_SOURCE_NAME.unique())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3483 entries, 0 to 3482
Data columns (total 10 columns):
 #   Column                       Non-Null Count  Dtype  
---  ------                       --------------  -----  
 0   COUNTRY_SHORT_NAME           3483 non-null   object 
 1   COUNTRY_ALPHA_3_CODE         3483 non-null   object 
 2   COUNTRY_ALPHA_2_CODE         3482 non-null   object 
 3   PROVINCE_STATE_NAME          3272 non-null   object 
 4   COUNTY_NAME                  3218 non-null   object 
 5   COUNTY_FIPS_NUMBER           3218 non-null   float64
 6   GEO_LATITUDE                 3483 non-null   float64
 7   GEO_LONGITUDE                3483 non-null   float64
 8   GEO_REGION_POPULATION_COUNT  3483 non-null   int64  
 9   DATA_SOURCE_NAME             3483 non-null   object 
dtypes: float64(3), int64(1), object(6)
memory usage: 272.2+ KB
None
  COUNTRY_SHORT_NAME COUNTRY_ALPHA_3_CODE COUNTRY_ALPHA_2_CODE  \
0        Afghanistan                  AFG                   AF

In [27]:
explore_df(df_vaccinations_global)

print('Unique countries count:', len(df_vaccinations_global.Country_Region.unique()))
print('Minimum date:', df_vaccinations_global.Date.min())
print('Maxium date:', df_vaccinations_global.Date.max())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 74658 entries, 0 to 74657
Data columns (total 8 columns):
 #   Column                       Non-Null Count  Dtype  
---  ------                       --------------  -----  
 0   Country_Region               74658 non-null  object 
 1   Date                         74658 non-null  object 
 2   Doses_admin                  74658 non-null  int64  
 3   People_partially_vaccinated  35165 non-null  float64
 4   People_fully_vaccinated      35165 non-null  float64
 5   Report_Date_String           74658 non-null  object 
 6   UID                          74328 non-null  float64
 7   Province_State               38999 non-null  object 
dtypes: float64(3), int64(1), object(4)
memory usage: 4.6+ MB
None
  Country_Region        Date  Doses_admin  People_partially_vaccinated  \
0    Afghanistan  2021-02-22            0                          0.0   
1    Afghanistan  2021-02-23            0                          0.0   
2    Afghanistan  2021-

In [71]:
explore_df(df_vaccinations_usa)

print('Minimum date:', df_vaccinations_usa.Date.min())
print('Maxium date:', df_vaccinations_usa.Date.max())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 91360 entries, 0 to 91359
Data columns (total 13 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Province_State   91360 non-null  object 
 1   Date             91360 non-null  object 
 2   Vaccine_Type     91360 non-null  object 
 3   FIPS             83248 non-null  float64
 4   Country_Region   91360 non-null  object 
 5   Lat              83248 non-null  float64
 6   Long_            83248 non-null  float64
 7   Doses_alloc      15045 non-null  float64
 8   Doses_shipped    70489 non-null  float64
 9   Doses_admin      74282 non-null  float64
 10  Stage_One_Doses  43333 non-null  float64
 11  Stage_Two_Doses  48665 non-null  float64
 12  Combined_Key     91360 non-null  object 
dtypes: float64(8), object(5)
memory usage: 9.1+ MB
None
  Province_State        Date Vaccine_Type  FIPS Country_Region      Lat  \
0        Alabama  2020-12-10          All   1.0             US  32.31

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

# Check COVID-19 activity table
No missing time series data per reported countries

In [4]:
# check for date gaps in the data
check_time_series(df_activity, 'REPORT_DATE', 'COUNTRY_SHORT_NAME')

In [45]:
# check for null values spread
describe_nulls(df_activity)

Unnamed: 0,index,0
1,COUNTY_NAME,0.077487
2,PROVINCE_STATE_NAME,0.061426
7,COUNTY_FIPS_NUMBER,0.093266


In [59]:
# describe duplicates
def get_duplicates(df, columns):
    return df[df.duplicated(subset=columns)]

def describe_duplicates(df, columns):
    df_dups = get_duplicates(df, columns)
    print('Duplicate ratio:', round(100*len(df_dups)/len(df)), '%')
    return df_dups

describe_duplicates(df_activity, ('COUNTRY_SHORT_NAME', 'REPORT_DATE', 'PROVINCE_STATE_NAME', 'COUNTY_NAME'))

Duplicate ratio: 0 %


Unnamed: 0,PEOPLE_POSITIVE_CASES_COUNT,COUNTY_NAME,PROVINCE_STATE_NAME,REPORT_DATE,CONTINENT_NAME,DATA_SOURCE_NAME,PEOPLE_DEATH_NEW_COUNT,COUNTY_FIPS_NUMBER,COUNTRY_ALPHA_3_CODE,COUNTRY_SHORT_NAME,COUNTRY_ALPHA_2_CODE,PEOPLE_POSITIVE_NEW_CASES_COUNT,PEOPLE_DEATH_COUNT


# Check COVID-19 Vaccinations tables - global and usa
There are missing timeseries information

In [74]:
check_time_series(df_vaccinations_global, 'Date', 'Country_Region')

found missing time series for Afghanistan - 88 days missing (~31%)
found missing time series for Albania - 29 days missing (~10%)
found missing time series for Algeria - 126 days missing (~45%)
found missing time series for Andorra - 49 days missing (~18%)
found missing time series for Angola - 94 days missing (~34%)
found missing time series for Antigua and Barbuda - 82 days missing (~29%)
found missing time series for Argentina - 16 days missing (~6%)
found missing time series for Armenia - 147 days missing (~52%)
found missing time series for Australia - 63 days missing (~22%)
found missing time series for Austria - 14 days missing (~5%)
found missing time series for Azerbaijan - 34 days missing (~12%)
found missing time series for Bahamas - 102 days missing (~36%)
found missing time series for Bahrain - 9 days missing (~3%)
found missing time series for Bangladesh - 44 days missing (~16%)
found missing time series for Barbados - 65 days missing (~23%)
found missing time series for 

In [25]:
describe_nulls(df_vaccinations_global)

Unnamed: 0,index,0
3,People_partially_vaccinated,0.528986
4,People_fully_vaccinated,0.528986
7,Province_State,0.477631


In [65]:
describe_duplicates(df_vaccinations_global, ('Date', 'Country_Region'))

Duplicate ratio: 52 %


Unnamed: 0,Country_Region,Date,Doses_admin,People_partially_vaccinated,People_fully_vaccinated,Report_Date_String,UID,Province_State
1470,Bangladesh,2021-05-28,427518,,,2021-05-29,5001.0,Barisal
1471,Bangladesh,2021-05-29,427518,,,2021-05-30,5001.0,Barisal
1472,Bangladesh,2021-05-30,427518,,,2021-05-31,5001.0,Barisal
1473,Bangladesh,2021-05-31,427518,,,2021-06-01,5001.0,Barisal
1474,Bangladesh,2021-05-28,2001467,,,2021-05-29,5002.0,Chittagong
...,...,...,...,...,...,...,...,...
74646,United Kingdom,2021-09-19,2840,,,2021-09-20,500.0,Montserrat
74647,United Kingdom,2021-09-19,2505605,,,2021-09-20,82602.0,Northern Ireland
74648,United Kingdom,2021-09-19,7892,,,2021-09-20,654.0,"Saint Helena, Ascension and Tristan da Cunha"
74649,United Kingdom,2021-09-19,7962028,,,2021-09-20,82603.0,Scotland


The duplicates ratio is high, let's investigate more

In [64]:
df_vaccinations_global[
    (df_vaccinations_global.Country_Region=='Bangladesh') & 
    (df_vaccinations_global.Date == '2021-05-28')]

Unnamed: 0,Country_Region,Date,Doses_admin,People_partially_vaccinated,People_fully_vaccinated,Report_Date_String,UID,Province_State
1466,Bangladesh,2021-05-28,9901717,5823245.0,4115773.0,2021-05-29,50.0,
1470,Bangladesh,2021-05-28,427518,,,2021-05-29,5001.0,Barisal
1474,Bangladesh,2021-05-28,2001467,,,2021-05-29,5002.0,Chittagong
1478,Bangladesh,2021-05-28,3221588,,,2021-05-29,5003.0,Dhaka
1482,Bangladesh,2021-05-28,1217243,,,2021-05-29,5004.0,Khulna
1486,Bangladesh,2021-05-28,472413,,,2021-05-29,5005.0,Mymensingh
1490,Bangladesh,2021-05-28,1070678,,,2021-05-29,5006.0,Rajshahi
1494,Bangladesh,2021-05-28,974035,,,2021-05-29,5007.0,Rangpur
1498,Bangladesh,2021-05-28,516775,,,2021-05-29,5008.0,Sylhet
1502,Bangladesh,2021-05-28,0,,,2021-05-29,5009.0,Unassigned


For `Bangaladesh`, clearly I missed column `Province_State` as primary key here. 
- `Province_State` must be taken into account
- need to drop when Province_State is "Unassigned"

In [68]:
describe_duplicates(df_vaccinations_global, ('Date', 'Country_Region', 'Province_State'))

Duplicate ratio: 0 %


Unnamed: 0,Country_Region,Date,Doses_admin,People_partially_vaccinated,People_fully_vaccinated,Report_Date_String,UID,Province_State


Cool this is the key we need to look at
Date, Country_Region and Province_State

Another observation from this table is that values for People_partially_vaccinated and People_fully_vaccinated is relevant only at the 'Global Country' row where Province_State is NaN. This is very important to us when we fill the appropriate Fact table for people vaccinated. 

In [75]:
check_time_series(df_vaccinations_usa, 'Date', 'Province_State')

found missing time series for Bureau of Prisons - 171 days missing (~60%)
found missing time series for Federal Bureau of Prisons - 81 days missing (~29%)
found missing time series for Federated States of Micronesia - 173 days missing (~61%)
found missing time series for Marshall Islands - 173 days missing (~61%)
found missing time series for Republic of Palau - 173 days missing (~61%)


In [26]:
describe_nulls(df_vaccinations_usa)

Unnamed: 0,index,0
3,FIPS,0.088792
5,Lat,0.088792
6,Long_,0.088792
7,Doses_alloc,0.835322
8,Doses_shipped,0.228448
9,Doses_admin,0.186931
10,Stage_One_Doses,0.52569
11,Stage_Two_Doses,0.467327


In [69]:
describe_duplicates(df_vaccinations_usa, ('Date', 'Country_Region', 'Province_State'))

Duplicate ratio: 81 %


Unnamed: 0,Province_State,Date,Vaccine_Type,FIPS,Country_Region,Lat,Long_,Doses_alloc,Doses_shipped,Doses_admin,Stage_One_Doses,Stage_Two_Doses,Combined_Key
1,Alabama,2020-12-10,Moderna,1.0,US,32.3182,-86.9023,,,,,,"Alabama, US"
2,Alabama,2020-12-10,Pfizer,1.0,US,32.3182,-86.9023,,,,,,"Alabama, US"
4,Alabama,2020-12-11,Moderna,1.0,US,32.3182,-86.9023,,,,,,"Alabama, US"
5,Alabama,2020-12-11,Pfizer,1.0,US,32.3182,-86.9023,,,,,,"Alabama, US"
7,Alabama,2020-12-12,Moderna,1.0,US,32.3182,-86.9023,,,,,,"Alabama, US"
...,...,...,...,...,...,...,...,...,...,...,...,...,...
91355,Wyoming,2021-09-19,Janssen,56.0,US,42.7560,-107.3025,,35600.0,23034.0,21417.0,,"Wyoming, US"
91356,Wyoming,2021-09-19,Pfizer,56.0,US,42.7560,-107.3025,,314175.0,253444.0,,112520.0,"Wyoming, US"
91357,Wyoming,2021-09-19,Moderna,56.0,US,42.7560,-107.3025,,294860.0,230822.0,,101149.0,"Wyoming, US"
91358,Wyoming,2021-09-19,All,56.0,US,42.7560,-107.3025,,644635.0,507670.0,275154.0,213815.0,"Wyoming, US"


There is a very high rate of duplicates. By looking at the data I can tell the cause is that there are separate values for each Vaccination Type (Moderna, Pfizer, Janssen etc)

In [71]:
df_vaccinations_usa[
    (df_vaccinations_usa.Country_Region=='US') & 
    (df_vaccinations_usa.Province_State == 'Wyoming') &
    (df_vaccinations_usa.Date == '2021-09-19')]

Unnamed: 0,Province_State,Date,Vaccine_Type,FIPS,Country_Region,Lat,Long_,Doses_alloc,Doses_shipped,Doses_admin,Stage_One_Doses,Stage_Two_Doses,Combined_Key
91354,Wyoming,2021-09-19,Unknown,56.0,US,42.756,-107.3025,,0.0,370.0,,,"Wyoming, US"
91355,Wyoming,2021-09-19,Janssen,56.0,US,42.756,-107.3025,,35600.0,23034.0,21417.0,,"Wyoming, US"
91356,Wyoming,2021-09-19,Pfizer,56.0,US,42.756,-107.3025,,314175.0,253444.0,,112520.0,"Wyoming, US"
91357,Wyoming,2021-09-19,Moderna,56.0,US,42.756,-107.3025,,294860.0,230822.0,,101149.0,"Wyoming, US"
91358,Wyoming,2021-09-19,All,56.0,US,42.756,-107.3025,,644635.0,507670.0,275154.0,213815.0,"Wyoming, US"
91359,Wyoming,2021-09-19,Unassigned,56.0,US,42.756,-107.3025,,0.0,0.0,253737.0,146.0,"Wyoming, US"


# Check Population table

In [28]:
describe_nulls(df_population)

Unnamed: 0,index,0
3,PROVINCE_STATE_NAME,0.06058
4,COUNTY_NAME,0.076084
5,COUNTY_FIPS_NUMBER,0.076084


# Compatibility between Data sources

The join between tables is going to be on **Country name** and **Date**

The date was checked above and is ok to use on joins.

The country name is more difficult to join on, there is a normalization 
challange of the country name.
it can be called in different ways and 
it's our mission to join the gap between the data sources.

In [44]:
# check countries on all tables, what is the overlap ratio
df_activity_countries = set(df_activity.COUNTRY_SHORT_NAME.unique())
df_population_countries = set(df_population.COUNTRY_SHORT_NAME.unique())
df_vacc_global_countries = set(df_vaccinations_global.Country_Region.unique())

# how much conutry columns overlap in join?
print(
    'Activity and Populations countries compatibility rate: ',
    round(
        len(df_activity_countries & df_population_countries) / 
            max(len(df_activity_countries), len(df_population_countries)), 2)
)

# how much conutry columns overlap in join?
print(
    'Activity and Global Vaccinations countries compatibility rate: ',
    round(
        len(df_activity_countries & df_vacc_global_countries) / 
            max(len(df_activity_countries), len(df_vacc_global_countries)), 2)
)

print('What countries are missing:', ', '.join(
    (df_activity_countries | df_vacc_global_countries) - (df_activity_countries & df_vacc_global_countries)
))

Activity and Populations countries compatibility rate:  0.97
Activity and Global Vaccinations countries compatibility rate:  0.73
What countries are missing: Isle of Man, US, Tanzania, Saint Helena, Ascension and Tristan da Cunha, French Guiana, Cayman Islands, Congo (Kinshasa), Kiribati, Benin, Yemen, Vanuatu, Djibouti, Bermuda, New Caledonia, Cuba, Lesotho, Falkland Islands (Malvinas), Anguilla, Martinique, Burundi, Niger, Timor-Leste, Congo (Brazzaville), Greenland, Guinea-Bissau, Eritrea, British Virgin Islands, Liberia, Tajikistan, US (Aggregate), St Martin, West Bank and Gaza, Reunion, Channel Islands, United States, French Polynesia, Chad, Sint Maarten, World, Saint Barthelemy, Haiti, Montserrat, Saint Pierre and Miquelon, Samoa, Aruba, Gibraltar, Faroe Islands, Comoros, Curacao, Wallis and Futuna, Bonaire, Sint Eustatius and Saba, Holy See, Somalia, Burkina Faso, Mayotte, Turks and Caicos Islands, Summer Olympics 2020, Cook Islands, Madagascar, Libya, Guadeloupe, Cabo Verde, Ce

## Country compatibility results

 - Between Activity table and Population table there is a very high rate of compatibility (as expected, the tables came from the same source and creator).

 - Between Activity table and Vaccinations table there is some mismatches (73% compatibility between countries names). Maybe can use other columns to join. In the following block let's take more columns (ISO names) to consideration and see how it imacts our join success ratio.


0.73

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [29]:
import findspark  
from pyspark.sql import SparkSession 


def create_spark_session():
    findspark.init() 
    spark = SparkSession \
        .builder \
        .appName('lab') \
        .getOrCreate()
    return spark

spark = create_spark_session()

def read_sources(spark):
    return (
        spark.read.csv('./data/activity.csv'),
        spark.read.csv('./data/population.csv'),
        spark.read.csv('./data/vaccinations_global.csv'),
        spark.read.csv('./data/vaccinations_usa.csv'),
    )

df_activity, df_population, df_vacc_global, df_vacc_usa = read_sources(spark)
 

In [None]:
from pyspark.sql.functions import lit



#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.