# Data Engineering Lab
---

In this lab we are manipulating data from several sources (local files, API, PostgreSQL) to build an analytics database.
Some actions can be ad-hoc (like the initial ingestions in this notebook) and others are recurrent ETL (Extract-Transform-Load) processes, like the data pipelines we will also go through later.


# 0 - Libraries and utilities

In [1]:
import requests
import json
import os

import pandas as pd
import psycopg2 as pg

> ⚠️ In the code below we are declaring a password and other sensitive details as readable strings. **Definitely not a good practice!!** If you ever have to do this in a production system or somewhere where your code is exposed (e.g. Github), there are several libraries that help to hide your credetials. Here are [some examples](http://theautomatic.net/2020/04/28/how-to-hide-a-password-in-a-python-script/).

In [26]:
# DB config
DB_HOST='localhost'
DB_NAME='postgres'
DB_USER='postgres'
DB_PASSWORD='timileyin'
DB_PORT=5432 # Default is 5432

# File paths
#RAW_DATA_FOLDER_PATH = os.path.join('data', 'raw')
COUNTRY_REGION_FILE_PATH = os.path.join('country_regions.txt')

# Table names
TEST_PREFIX=''
MOVE_RANGE_TABLE_NAME = f'{TEST_PREFIX}rpl_move_range'
COVID_SURVEY_TABLE_NAME = f'{TEST_PREFIX}rpl_covid_survey'

# Time range (for COVID survey API)
FROM_DATE = '2021-01-01'
TO_DATE = '2021-06-30'

# Other parameters
SURVEY_INDICATORS = [
    'mask',
    'covid',
    'tested_positive_14d',
    'anosmia',
]

## 0.1 functions
The functions below are wrappers to interact with the database (create tables, query data, insert data from file...)

In [27]:
# Returns objects to interact with database
def connect_db():
    conn = pg.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        port=DB_PORT,
    )
    cur = conn.cursor()
    return conn, cur

# Creates empty table in database
def create_table(table_name, table_schema, drop_if_exists=False):
    conn, cur = connect_db()
    cols = ',\n'.join([f'{col_name} {col_type}' for col_name, col_type in table_schema])
    
    if drop_if_exists:
        sql = f'DROP TABLE IF EXISTS {table_name};'
        print(sql)
        print('\n\n')
        cur.execute(sql)

    sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name}(
            {cols}
        );
    """
    print(sql)
    print('\n\n')
    cur.execute(sql)
    conn.commit()
    conn.close()


# Loads data from file to table in database
def load_file_in_table(
    file_path,
    table_name,
    table_schema=None,
    sep='\t',
    skip_header=True,
    file_encoding='utf-8',
    run_create_table=False,
    overwrite_filter=None,
):
    conn, cur = connect_db()
    
    if run_create_table:
        create_table(table_name, table_schema)
    
    if overwrite_filter:
        sql = f'DELETE FROM {table_name} WHERE {overwrite_filter}'
        print(sql)
        cur.execute(sql)
        print('\n\n')

    print('-----------------------\nFile loading: STARTED\n-----------------------\n')
    with open(file_path, 'r', encoding=file_encoding) as f:
        if skip_header:
            next(f)
        cur.copy_from(f, table_name, sep=sep)
        conn.commit()
    conn.close()

    print('-----------------------\nFile loading: FINISHED\n-----------------------\n')


# Runs query on database
def query_db(query):
    conn, cur = connect_db()
    output = pd.io.sql.read_sql_query(query, conn)
    conn.close()
    return output

# Returns a dataframe listing all tables in database
def get_tables_list():
    return query_db('''
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_type='BASE TABLE'
        ORDER BY table_name;
    ''')


# #########################
# Other auxiliary functions
# #########################

def get_indicator_code(indicator):
    indicator_to_code = {
        'covid': 'cli',
        'flu': 'ili',
        'mask': 'mc',
        'contact': 'dc',
        'finance': 'hf',
        'anosmia': 'anos',
        'vaccine_acpt': 'vu',
    }
    return indicator_to_code.get(indicator, indicator)

Now we can query the database to confirm the data is there.
The auxiliary function ```query_db()``` gets a SQL query and returns a *pandas.DataFrame* with the output.

# 1 - Load data: COVID survey (API)
API documentation: [https://gisumd.github.io/COVID-19-API-Documentation](https://gisumd.github.io/COVID-19-API-Documentation)

Now instead of loading from a file already downloaded, we are going to connect to an API, save the output locally and then load that API response into the database.

This API can return surveys with several indicators (e.g. % people using mask, % of people recently diagnosed with COVID-19, etc) for different countries.

First we will ask the API for a list of all countries, and then retrieve several reports.


## 1.1 - Load country & regions

Using one of the auxiliary functions defined above (```load_file_in_table()```) we load the file into a table in the PostgreSQL database.

To create a table in the database we need to specify its schema (i.e. their columns and [data types](https://www.postgresql.org/docs/9.5/datatype.html)).

In [28]:
# request data from api
response = requests.get("https://covidmap.umd.edu/api/region").text

# convert json data to dictionary
response_dict = json.loads(response)

# convert to pandas dataframe
df = pd.DataFrame.from_dict(response_dict['data'])

# Save dataframe in file
df.to_csv(COUNTRY_REGION_FILE_PATH, sep='\t', index=False, encoding='utf-8')

# Load file in database
table_schema = (
    ('country', 'text'),
    ('region', 'text')
)
load_file_in_table(COUNTRY_REGION_FILE_PATH, 'country_regions', table_schema=table_schema, run_create_table=True)


        CREATE TABLE IF NOT EXISTS country_regions(
            country text,
region text
        );
    



-----------------------
File loading: STARTED
-----------------------

-----------------------
File loading: FINISHED
-----------------------



In [29]:
query_db("SELECT * FROM country_regions")

Unnamed: 0,country,region
0,Afghanistan,Badakhshan
1,Afghanistan,Balkh
2,Afghanistan,Kabul
3,Albania,Tiranë
4,Algeria,Alger
...,...,...
1257,Vietnam,Tuyên Quang
1258,Vietnam,Vĩnh Phúc
1259,Vietnam,Yên Bái
1260,Yemen,Aden


## 1.2 - Load survey data

### 1.2.0 API call example

#### 1st sample: % of mask usage in Nigeria

In [30]:
# request data from api
response = requests.get("https://covidmap.umd.edu/api/resources?indicator=mask&type=daily&country=Nigeria&daterange=20201201-20201204").text

#convert json data to dic data for use!
response_dict = json.loads(response)

# convert to pandas dataframe
df = pd.DataFrame.from_dict(response_dict['data'])

In [7]:
df.head()

Unnamed: 0,percent_mc,mc_se,percent_mc_unw,mc_se_unw,sample_size,country,iso_code,gid_0,survey_date
0,0.576779,0.035234,0.579634,0.025223,383.0,Nigeria,NGA,NGA,20201201
1,0.497795,0.032746,0.517157,0.024739,408.0,Nigeria,NGA,NGA,20201202
2,0.49177,0.033392,0.541885,0.025492,382.0,Nigeria,NGA,NGA,20201203
3,0.512306,0.032774,0.566434,0.023926,429.0,Nigeria,NGA,NGA,20201204


#### 2nd sample: % of people avoiding contact in Finland

In [31]:
# request data from api
response = requests.get("https://covidmap.umd.edu/api/resources?indicator=avoid_contact&type=daily&country=Finland&daterange=20201201-20201204").text

#convert json data to dic data for use!
response_dict = json.loads(response)

# convert to pandas dataframe
df = pd.DataFrame.from_dict(response_dict['data'])
df

Unnamed: 0,pct_avoid_contact,avoid_contact_se,pct_avoid_contact_unw,avoid_contact_se_unw,sample_size,country,iso_code,gid_0,survey_date
0,0.717799,0.027852,0.706977,0.021949,430.0,Finland,FIN,FIN,20201201
1,0.697544,0.03189,0.695238,0.022461,420.0,Finland,FIN,FIN,20201202
2,0.733249,0.030002,0.746341,0.021488,410.0,Finland,FIN,FIN,20201203
3,0.680975,0.03155,0.711618,0.020634,482.0,Finland,FIN,FIN,20201204


### 1.2.1 Get survey data for several indicators and countries

Using what we have seen so far, plus some loops and parameterization, we can download reports for different indicators and countries.

In [42]:
for indicator in SURVEY_INDICATORS:
    print('--------------------------\n')
    print(f'-- INDICATOR: {indicator}')
    print('\n--------------------------\n')
    
    # ############
    # Create table
    # ############
    indicator_code = get_indicator_code(indicator)
    indicator_table_name = f'{COVID_SURVEY_TABLE_NAME}_{indicator}'

    table_schema = (
        (f'percent_{indicator_code}', 'float'),
        (f'{indicator_code}_se', 'float'),
        (f'percent_{indicator_code}_unw', 'float'),
        (f'{indicator_code}_se_unw', 'float'),
        ('sample_size', 'NUMERIC'),
        ('country', 'text'),
        ('iso_code', 'text'),
        ('gid_0', 'text'),
        ('survey_date', 'NUMERIC'),
    )
    create_table(indicator_table_name, table_schema, drop_if_exists=True)

    # ####################
    # Load data into table
    # ####################
    from_date_no_dash = FROM_DATE.replace('-', '')
    to_date_no_dash = TO_DATE.replace('-', '')

    countries = [
        'Spain',
        'Germany',
        'Japan',
    ]

    for country in countries:
        file_path = f'covid_survey__{country}__{indicator_code}__{from_date_no_dash}__{to_date_no_dash}.txt'
        response = requests.get(f"https://covidmap.umd.edu/api/resources?indicator={indicator_code}&type=daily&country={country}&daterange={from_date_no_dash}-{to_date_no_dash}").text
        response_dict = json.loads(response)
        df = pd.DataFrame.from_dict(response_dict, orient="index")
        #df.to_csv(file_path, sep='\t', index=False, encoding='utf-8')


--------------------------

-- INDICATOR: mask

--------------------------

DROP TABLE IF EXISTS rpl_covid_survey_mask;




        CREATE TABLE IF NOT EXISTS rpl_covid_survey_mask(
            percent_mc float,
mc_se float,
percent_mc_unw float,
mc_se_unw float,
sample_size NUMERIC,
country text,
iso_code text,
gid_0 text,
survey_date NUMERIC
        );
    



--------------------------

-- INDICATOR: covid

--------------------------

DROP TABLE IF EXISTS rpl_covid_survey_covid;




        CREATE TABLE IF NOT EXISTS rpl_covid_survey_covid(
            percent_cli float,
cli_se float,
percent_cli_unw float,
cli_se_unw float,
sample_size NUMERIC,
country text,
iso_code text,
gid_0 text,
survey_date NUMERIC
        );
    



--------------------------

-- INDICATOR: tested_positive_14d

--------------------------

DROP TABLE IF EXISTS rpl_covid_survey_tested_positive_14d;




        CREATE TABLE IF NOT EXISTS rpl_covid_survey_tested_positive_14d(
            percent_tested_positive_

In [44]:
df

Unnamed: 0,0
error,need parameter:[indicator: 'covid' or 'flu' or...


In [45]:
query_db(f"SELECT * FROM {COVID_SURVEY_TABLE_NAME}_mask LIMIT 10")

Unnamed: 0,percent_mc,mc_se,percent_mc_unw,mc_se_unw,sample_size,country,iso_code,gid_0,survey_date


With the auxiliary function ```get_tables_list()``` we can take a look at all the tables in the database, created throuhout this lab.

In [46]:
get_tables_list()

Unnamed: 0,table_name
0,country_regions
1,rpl_covid_survey_anosmia
2,rpl_covid_survey_covid
3,rpl_covid_survey_mask
4,rpl_covid_survey_tested_positive_14d


# What next?
1. In the example above we are getting the reports from a few countries. What if we want to add some more. What if we want to add them all?
2. Take a look at the entire list of indicators for the COVID survey in the [API documentation](https://gisumd.github.io/COVID-19-API-Documentation/docs/indicators/indicators.html). Would you like to include any of this in your analysis?

# 2. Data Pipeline with Luigi
[Luigi](https://luigi.readthedocs.io/en/stable/index.html) is a Python framework to schedule and orquestrate batch jobs. This framework and other similar (e.g. Apache AirFlow) are very useful to build data pipelines.

In Luigi, pipeline consists in a collection of tasks, each of them can have several funcionalities/methods. The main ones are:
- **Task.requires()**: Other tasks which are dependencies, therefore need to run before.
- **Task.run()**: The action the specific task executes/handles.
- **Task.output()**: The expected outcome for this task. In Luigi jargon this outputs are called "targets" and they can take many formats (e.g. a text file, a data partition on Hive, etc). A task is not considered complete until the target exists. If at the time of scheduling the pipeline the target exists already, then the task will not run.


## 2.1 rpl_covid_survey.py


In the file **rpl_covid_survey.py** we'll show an example of a Luigi pipeline, where we download a daily report from the API and load it into PostgreSQL. It has 4 classes:
1. CreateTable: Creates table in database, if it doesn't exist already.
2. DownloadAPIReport: Downloads report from API.
3. LoadReportIntoDB: Loads report into the database
4. MasterTask: This class generates several instances of the previous tasks, to download and ingest data for several survey indicators and countries.


To run a luigi pipeline from the terminal the code is as follows:

> python -m luigi --module {pipeline_name} {task_name} --{parameter} {parameter_value} --local-scheduler

So for our specific example it can be as follows:

> python -m luigi --module pipelines.rpl_covid_survey MasterTask --date 2021-07-01 --test-prefix test_1_ --local-scheduler
(you can omit ```--test-prefix``` parameter to insert into the "production" table instead).

## 2.1 covid_survey_covid_mask
In this pipeline we cross the data from 2 reports from 2 different indicators (*covid* and *mask*).

> python -m luigi --module pipelines.covid_survey_covid_mask LoadTable --date 2021-07-01 --test-prefix test_1_ --local-scheduler

# 3. What next?
Try writing a similar pipeline to covid_survey_covid_mask. Using SQL is what a data engineer would use in a "real scenario" querying big loads of data, but you can use Pandas if you prefer.


# 4. Homework
The COVID survey provides with a collection of indicators wide enough to run interesting analysis. But this could be crossed with other data sources to run even more insightful assessments, such as: 

- [Movement Range Maps](https://data.humdata.org/dataset/movement-range-maps)
- [HRSL (High Resolution Settlement Layer)](https://research.fb.com/downloads/high-resolution-settlement-layer-hrsl/)