# Data Engineering Capstone Project

## Environment setup

In [None]:
# Import necessary libraries
import pandas as pd
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id as mono_id
import configparser

In [None]:
# Read config file
config = configparser.ConfigParser()
# Open dl.cfg in a context manager so the handle is closed as soon as parsing
# finishes; open() still raises if the file is missing.
with open('dl.cfg') as config_file:
    config.read_file(config_file)

# Locations and settings taken from the [LOCAL] and [COMMON] sections
INPUT_DATA = config['LOCAL']['INPUT_DATA']
INPUT_DATA_VACCINES = config['LOCAL']['INPUT_DATA_VACCINES']
OUTPUT_DATA = config['LOCAL']['OUTPUT_DATA']
DATA_COLUMNS = config['COMMON']['DATA_COLUMNS']

In [None]:
def _null_id_to_uuid(raw_id):
    """Return ``raw_id`` unchanged, or a random hex UUID when it is missing.

    An id counts as missing when it is ``None`` or the literal string "null".
    """
    if raw_id is None or raw_id == "null":
        return uuid.uuid4().hex
    return raw_id


# uuid4() produces a different value on every call, so the UDF is flagged as
# non-deterministic: Spark assumes UDFs are deterministic by default and may
# otherwise eliminate or repeat invocations during optimization.
null_id_to_uuid = udf(_null_id_to_uuid, StringType()).asNondeterministic()

In [None]:
def write_parquet(df, parquet_name):
    """Save ``df`` to ``<OUTPUT_DATA><parquet_name>.parquet`` in overwrite mode.

    Prints a confirmation line once the write call returns.
    """
    target = f'{OUTPUT_DATA}{parquet_name}.parquet'
    writer = df.write.mode("overwrite")
    writer.parquet(target)
    print(f'Writing {parquet_name} Table DONE.')


In [None]:
def read_parquet(parquet_name):
    """Read ``<OUTPUT_DATA><parquet_name>.parquet`` through the Spark session."""
    source = f'{OUTPUT_DATA}{parquet_name}.parquet'
    reader = spark.read
    return reader.parquet(source)

In [None]:
def check_nulls(df, columns_list, expected_value):
    """Tell whether the count of rows with a NULL in any listed column equals ``expected_value``.

    ``df`` is registered as the temp view "viewcheck" and queried with SQL.
    """
    df.createOrReplaceTempView("viewcheck")

    # "1 <> 1" is an always-false seed, so each column can simply be appended
    # as " OR <column> IS NULL ".
    null_clauses = ''.join(' OR ' + name + ' IS NULL ' for name in columns_list)
    query = "SELECT COUNT(*) FROM viewcheck WHERE 1 <> 1 " + null_clauses

    first_row = spark.sql(query).collect()[0]
    return first_row[0] == expected_value

In [None]:
def check_has_content(df):
    """Return True when ``df.count()`` reports at least one row."""
    row_total = df.count()
    return row_total > 0

In [None]:
# Spark session (reuses an existing session if one is already active)
spark = (
    SparkSession.builder
    .appName("Brazilian COVID-19 Immunization")
    .getOrCreate()
)

## Step 1: Scope the Project and Gather Data
In this step, we’ll:

* Identify and gather the data we'll be using for our project (at least two sources and more than 1 million rows).
* Explain what end use cases we'd like to prepare the data for (e.g., analytics table, app back-end, source-of-truth database, etc.)

We choose the following datasets:
* Brazilian Government's dataset [COVID-19 population immunization program](https://dados.gov.br/dataset/covid-19-vacinacao/resource/ef3bd0b8-b605-474b-9ae5-c97390c197a8?inner_span=True)
    * Complete data: "Dados completos"
    * Data per Brazilian States: "Dados XX", where XX is the State code

For development we include just a subset of data in /data/vaccines.csv.

When running the pipeline in production, download the complete dataset and use it to replace the file vaccines.csv.

**The dataset will be prepared for a data warehouse analysis table, available for public use (citizens, press, health institutions), for monitoring population immunization rates.**

In [None]:
# Load the raw immunization CSV: semicolon-separated, first row holds the column names
vaccines_df = spark.read.csv(INPUT_DATA_VACCINES, header=True, sep=';')

# Show the columns Spark picked up from the header
vaccines_df.printSchema()

## Step 2: Explore and Assess the Data
In this step we need to:
* Explore the data to identify data quality issues, like missing values, duplicate data, etc.
* Document steps necessary to clean the data

In [None]:
# Read the data dictionary from JSON and extract the valid columns
# DATA_COLUMNS comes from the [COMMON] section of the config file; the file is
# loaded as a pandas Series and its index is used as the list of valid columns.
# NOTE(review): presumably the JSON maps column name -> description — confirm.
col_names = pd.read_json(DATA_COLUMNS, typ='series')
valid_columns = col_names.index
# Bare expression: displays the index as the notebook cell output
valid_columns

In [None]:
# Columns present in the dataframe but not listed among the valid columns
columns_todrop = list(set(vaccines_df.columns).difference(valid_columns))

# Display the columns that are about to be removed
columns_todrop

In [None]:
# Remove unused columns from dataframe
# columns_todrop is unpacked because DataFrame.drop takes column names as
# separate positional arguments.
vaccines_df = vaccines_df.drop(*columns_todrop)
# Confirm that only the valid columns remain
vaccines_df.printSchema()

In [None]:
# Replace the null values
# Patient ids that are missing get a generated UUID. The column is rebuilt
# under a temporary name and renamed back afterwards.
vaccines_df = (
    vaccines_df
    .withColumn('paciente_id_null', null_id_to_uuid(vaccines_df.paciente_id))
    .drop('paciente_id')
    .withColumnRenamed('paciente_id_null', 'paciente_id')
)

# Default values for the remaining nullable descriptive columns
vaccines_df = vaccines_df.fillna({
    'vacina_categoria_codigo': 0,
    'vacina_categoria_nome': 'N/A',
    'vacina_grupoatendimento_nome': 'N/A',
    'paciente_enumsexobiologico': 'N/A',
    'paciente_endereco_nmmunicipio': 'N/A',
    'paciente_endereco_nmpais': 'N/A',
    'paciente_endereco_uf': 'N/A',
    'estalecimento_nofantasia': 'N/A',
})

In [None]:
# Show the schema after the null replacement
vaccines_df.printSchema()

In [None]:
# Persist the cleaned data as the staging table
write_parquet(vaccines_df, 'staging_immunization')

# Continue from the stored copy instead of the in-memory dataframe.
# NOTE(review): presumably this also freezes the generated patient UUIDs so
# later steps all see the same values — confirm.
vaccines_df = read_parquet('staging_immunization')

## Step 3: Define the Data Model
_Map out the conceptual data model and explain why you chose that model_

The data model is a star schema consisting of 5 dimension tables and 1 fact table:
  * Dimension tables:
      * vaccines table: Vaccines and suppliers
      * health_institution table: Hospitals, nursing homes, clinics
      * category table: Priority groups
      * population_group table: Demographic group (professions, age group, ethnicity)
      * patient table: Demographic data (age, city, gender)
  * Fact table:
      * immunization table: Dimension keys, dose (first | second), date

![ER Data Model - Star Schema](./docs/er-model-star.jpg)

_List the steps necessary to pipeline the data into the chosen data model_
* The ETL starts with the environment setup: imports, reading the config file, defining functions, and creating the Spark session
* The ETL script takes the source data (Brazilian Government's dataset of the COVID-19 population immunization program)
* Raw data is read into a dataframe and cleaned (remove unused columns, fill nulls)
* For each dimension and fact table:
    * Create a temporary view table
    * Read data into a new dataframe
    * Check data quality: key columns don't have nulls, each table has content
    * Create ids/indexes (if necessary)
    * Write parquet files

## Step 4: Run ETL to Model the Data
* Create the data pipelines and the data model
* Include a data dictionary
* Run data quality checks to ensure the pipeline ran as expected
	* Integrity constraints on the relational database (e.g., unique key, data type, etc.)
	* Unit tests for the scripts to ensure they are doing the right thing
	* Source/count checks to ensure completeness

In [None]:
# Create vaccines table
# The staged dataframe is exposed to SQL as the view "vaccines_table_DF"; the
# dimension keeps one row per distinct (vacina_codigo, vacina_nome,
# vacina_fabricante_nome) combination, sorted by supplier.
vaccines_df.createOrReplaceTempView("vaccines_table_DF")
vaccines_table_DF = spark.sql("""
    SELECT  DISTINCT vacina_codigo AS id, 
                     vacina_nome AS name, 
                     vacina_fabricante_nome AS supplier
    FROM vaccines_table_DF
    ORDER BY supplier
""")

# Inspect the resulting schema and the first rows
vaccines_table_DF.printSchema()
vaccines_table_DF.show()

In [None]:
# Check data quality: no nulls in any column, and at least one row
if not check_nulls(vaccines_table_DF, ['id', 'name', 'supplier'], 0):
    raise Exception('Null: Vaccines tables')
if not check_has_content(vaccines_table_DF):
    raise Exception('No content: Vaccines table')

In [None]:
# Write the vaccines dimension to its parquet file
write_parquet(vaccines_table_DF, 'vaccines')

In [None]:
# Create Health Institution table
# The staged dataframe is exposed to SQL as the view
# "health_institution_table_DF"; the dimension keeps one row per distinct
# (name, organization, state, city) combination, sorted by name.
vaccines_df.createOrReplaceTempView("health_institution_table_DF")
health_institution_table_DF = spark.sql("""
    SELECT DISTINCT estalecimento_nofantasia AS name,
                    estabelecimento_razaosocial AS organization,
                    estabelecimento_uf AS state,
                    estabelecimento_municipio_nome AS city
    FROM health_institution_table_DF
    ORDER BY name
""")

# Prepend a generated surrogate key. monotonically_increasing_id yields unique,
# increasing values that are not guaranteed to be consecutive.
health_institution_table_DF = health_institution_table_DF.select(mono_id().alias('id'), '*')
health_institution_table_DF.printSchema()
health_institution_table_DF.show()

In [None]:
# Check data quality: no nulls in any column, and at least one row
if not check_nulls(
    health_institution_table_DF,
    ['id', 'name', 'organization', 'state', 'city'],
    0,
):
    raise Exception('Null: Health institution table')
if not check_has_content(health_institution_table_DF):
    raise Exception('No content: Health institution table')

In [None]:
# Write the health institution dimension to its parquet file
write_parquet(health_institution_table_DF, 'health_institution')

In [None]:
# Create Category table
# The staged dataframe is exposed to SQL as the view "category_table_DF"; the
# dimension keeps one row per distinct (vacina_categoria_codigo,
# vacina_categoria_nome) pair, sorted by name.
vaccines_df.createOrReplaceTempView("category_table_DF")
category_table_DF = spark.sql("""
    SELECT DISTINCT vacina_categoria_codigo AS id,
                    vacina_categoria_nome AS name
            FROM category_table_DF
            ORDER BY name
""")

# Inspect the resulting schema and the first rows
category_table_DF.printSchema()
category_table_DF.show()

In [None]:
# Check data quality: no nulls in any column, and at least one row
if not check_nulls(category_table_DF, ['id', 'name'], 0):
    raise Exception('Null: Category table')
if not check_has_content(category_table_DF):
    raise Exception('No content: Category table')

In [None]:
# Write the category dimension to its parquet file
write_parquet(category_table_DF, 'category')

In [None]:
# Create Population Groups table
# The staged dataframe is exposed to SQL as the view
# "population_group_table_DF"; the dimension keeps one row per distinct
# (vacina_grupoatendimento_codigo, vacina_grupoatendimento_nome) pair,
# sorted by name.
vaccines_df.createOrReplaceTempView("population_group_table_DF")
population_group_table_DF = spark.sql("""
    SELECT DISTINCT vacina_grupoatendimento_codigo AS id,
                    vacina_grupoatendimento_nome AS name
            FROM population_group_table_DF
        ORDER BY name
""")

# Inspect the resulting schema and the first rows
population_group_table_DF.printSchema()
population_group_table_DF.show()

In [None]:
# Check data quality: no nulls in any column, and at least one row
if not check_nulls(population_group_table_DF, ['id', 'name'], 0):
    raise Exception('Null: Population group table')
if not check_has_content(population_group_table_DF):
    raise Exception('No content: Population group table')

In [None]:
# Write the population group dimension to its parquet file
write_parquet(population_group_table_DF, 'population_group')

In [None]:
# Create Patient table
# The staged dataframe is exposed to SQL as the view "patient_table_DF"; the
# dimension keeps one row per distinct combination of the selected patient
# columns, sorted by id.
vaccines_df.createOrReplaceTempView("patient_table_DF")
patient_table_DF = spark.sql("""
    SELECT DISTINCT paciente_id AS id,
                    paciente_idade AS age,
                    paciente_datanascimento AS birthdate,
                    paciente_enumsexobiologico AS gender,
                    paciente_endereco_nmpais AS country,
                    paciente_endereco_uf AS state,
                    paciente_endereco_nmmunicipio AS city
            FROM patient_table_DF
            ORDER BY id
""")

# Inspect the resulting schema and the first rows
patient_table_DF.printSchema()
patient_table_DF.show()

In [None]:
# Sanity probe: list rows that still have a null patient id. The SQL view
# "patient_table_DF" is the staged data (not the patient dimension dataframe),
# which is why the original column name paciente_id is used.
df1 = spark.sql("select * from patient_table_DF WHERE paciente_id is null")

# Render the result as a pandas table in the notebook output
df1.toPandas().style

In [None]:
# Check data quality: no nulls in any column, and at least one row
if not check_nulls(
    patient_table_DF,
    ['id', 'age', 'birthdate', 'gender', 'country', 'state', 'city'],
    0,
):
    raise Exception('Null: Patient table')
if not check_has_content(patient_table_DF):
    raise Exception('No content: Patient table')

In [None]:
# Write the patient dimension to its parquet file (it is not read back here)
write_parquet(patient_table_DF, 'patient')

In [None]:
# Get back Health Institution table from parquet
# NOTE(review): presumably reloaded so the fact table joins against the ids as
# stored on disk rather than a recomputed dataframe — confirm.
health_institution_table_DF = read_parquet('health_institution')

In [None]:
# Create the Immunization fact table
vaccines_df.createOrReplaceTempView("vdf")
health_institution_table_DF.createOrReplaceTempView("hidf")
# The join matches on all four attributes that define a health institution row
# (name, organization, state, city). Matching on name and organization alone
# would pair a vaccination with every same-named institution located in another
# state or city, duplicating facts and attaching the wrong institution ids.
immunization_table_DF = spark.sql("""
    SELECT DISTINCT vdf.paciente_id AS patient_id,
            hidf.id AS health_institution_id,
            vdf.vacina_categoria_codigo AS category_id,
            vdf.vacina_grupoatendimento_codigo AS population_group_id,
            vdf.vacina_codigo AS vaccines_id,
            vdf.vacina_descricao_dose AS vaccines_dose,
            vdf.vacina_dataaplicacao AS jab_date
        FROM vdf INNER JOIN hidf
        ON vdf.estalecimento_nofantasia == hidf.name
        AND vdf.estabelecimento_razaosocial == hidf.organization
        AND vdf.estabelecimento_uf == hidf.state
        AND vdf.estabelecimento_municipio_nome == hidf.city
        ORDER BY jab_date
""")

# Inspect the resulting schema
immunization_table_DF.printSchema()

In [None]:
# Check data quality: no nulls in any column, and at least one row
if not check_nulls(
    immunization_table_DF,
    [
        'patient_id',
        'health_institution_id',
        'category_id',
        'population_group_id',
        'vaccines_id',
        'vaccines_dose',
        'jab_date',
    ],
    0,
):
    raise Exception('Null: Immunization table')
if not check_has_content(immunization_table_DF):
    raise Exception('No content: Immunization table')

In [None]:
# Write the immunization fact table to parquet, then reload it so the
# following cells work from the stored copy
write_parquet(immunization_table_DF, 'immunization')
immunization_table_DF = read_parquet('immunization')

In [None]:
# Preview the first rows of the reloaded fact table
immunization_table_DF.show()

## Step 5: Complete Project Write Up
Tools:
* Python
* Pandas
* Spark

These tools/technologies are appropriate for manipulating large datasets and processing them in parallel on clusters.

**The ETL script should be run on a weekly basis**, or whenever .gov.br updates the datasets.

How the script would approach the problem differently under the following scenarios:
* If the data was increased by 100x: _Use a Spark cluster to parallelize the data load_
* If the pipelines were run on a daily basis by 7am: _Refactor the script to process only new information. The [.gov.br API](https://dados.gov.br/dataset/covid-19-vacinacao/resource/97a8fbcf-941f-4d2e-91ba-dd467d5bdeac?inner_span=True) could be used to request the delta information_
* If the database needed to be accessed by 100+ people: _Store the parquet files on a cloud data lake_

### Sample queries

In [None]:
# Reload the vaccines dimension from its parquet file
vaccines_table_DF = read_parquet('vaccines')

# Get health institutions by vaccine application
# Register the three tables under short aliases for the queries below. Note
# that "vdf" now points at the vaccines dimension.
vaccines_table_DF.createOrReplaceTempView("vdf")
health_institution_table_DF.createOrReplaceTempView("hidf")
immunization_table_DF.createOrReplaceTempView("idf")

# CUBE produces counts per (institution, vaccine) pair plus subtotals and a
# grand total; GROUPING(col) = 1 marks the rows where that column has been
# aggregated away, which are relabelled with a '# Total ...' caption.
health_institution_rank = spark.sql("""
    SELECT 
        CASE
            WHEN (GROUPING(hidf.name) = 1) THEN '# Total Institution'
            ELSE hidf.name
        END AS Institution,
        CASE
            WHEN (GROUPING(vdf.name) = 1) THEN '# Total Vaccines'
            ELSE vdf.name
        END AS Vaccine,
        COUNT(*) AS TOTAL
    FROM idf 
    INNER JOIN hidf ON idf.health_institution_id = hidf.id
    INNER JOIN vdf ON idf.vaccines_id = vdf.id
    GROUP BY CUBE(hidf.name, vdf.name)
    ORDER BY Institution, TOTAL DESC
""").toPandas()

# Render the ranking as a pandas table in the notebook output
health_institution_rank.style

In [None]:
# Get vaccine applications by supplier
# Uses the temp views "idf" and "vdf", which must already be registered in the
# Spark session. CUBE adds subtotal and grand-total rows; GROUPING(col) = 1
# marks them so they can be relabelled. ORDER BY 1 sorts by the first selected
# column (Supplier).
vaccines_rank = spark.sql("""
    SELECT 
        CASE
            WHEN (GROUPING(vdf.supplier) = 1) THEN '# Total Supplier'
            ELSE vdf.supplier
        END AS Supplier,
        CASE
            WHEN (GROUPING(vdf.name) = 1) THEN '# Total Vaccines'
            ELSE vdf.name
        END AS Vaccine,
        COUNT(*) AS TOTAL
    FROM idf 
    INNER JOIN vdf ON idf.vaccines_id = vdf.id
    GROUP BY CUBE(vdf.supplier, vdf.name)
    ORDER BY 1, TOTAL DESC
""").toPandas()

# Render the ranking as a pandas table in the notebook output
vaccines_rank.style