# Data Engineering Lab
---

In this lab we are manipulating data from several sources (local files, API, PostgreSQL) to build an analytics database.
Some actions can be ad-hoc (like the initial ingestions in this notebook) and others are recurrent ETL (Extract-Transform-Load) processes, like the data pipelines we will also go through later.


# 0 - Libraries and utilities

In [1]:
import requests
import json
import os

import pandas as pd
import psycopg2 as pg

In [2]:
# DB config
DB_HOST='localhost'
DB_NAME='postgres'
DB_USER='postgres'
DB_PASSWORD='postgres'
DB_PORT=5433 # Default is 5432

# File paths
RAW_DATA_FOLDER_PATH = os.path.join('data', 'raw')
MOVE_RANGE_FILE_PATH = os.path.join(RAW_DATA_FOLDER_PATH ,'movement-range-2021-09-09.txt')
COUNTRY_REGION_FILE_PATH = os.path.join(RAW_DATA_FOLDER_PATH , 'country_regions.txt')

# Table names
TEST_PREFIX=''
MOVE_RANGE_TABLE_NAME = f'{TEST_PREFIX}rpl_move_range'
COVID_SURVEY_TABLE_NAME = f'{TEST_PREFIX}rpl_covid_survey'

# Time range (for COVID survey API)
FROM_DATE = '2021-01-01'
TO_DATE = '2021-06-30'

# Other parameters
SURVEY_INDICATORS = [
    'mask',
    'covid',
    'tested_positive_14d',
    'anosmia',
]

## 0.1 functions
The functions below are wrappers to interact with the database (create tables, query data, insert data from file...)

In [3]:
# Returns objects to interact with database
def connect_db():
    conn = pg.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        port=DB_PORT,
    )
    cur = conn.cursor()
    return conn, cur

# Creates empty table in database
def create_table(table_name, table_schema, drop_if_exists=False):
    conn, cur = connect_db()
    cols = ',\n'.join([f'{col_name} {col_type}' for col_name, col_type in table_schema])
    
    if drop_if_exists:
        sql = f'DROP TABLE IF EXISTS {table_name};'
        print(sql)
        print('\n\n')
        cur.execute(sql)

    sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name}(
            {cols}
        );
    """
    print(sql)
    print('\n\n')
    cur.execute(sql)
    conn.commit()
    conn.close()


# Loads data from file to table in database
def load_file_in_table(
    file_path,
    table_name,
    table_schema=None,
    sep='\t',
    skip_header=True,
    file_encoding='utf-8',
    run_create_table=False,
    overwrite_filter=None,
):
    conn, cur = connect_db()
    
    if run_create_table:
        create_table(table_name, table_schema)
    
    if overwrite_filter:
        sql = f'DELETE FROM {table_name} WHERE {overwrite_filter}'
        print(sql)
        cur.execute(sql)
        print('\n\n')

    print('-----------------------\nFile loading: STARTED\n-----------------------\n')
    with open(file_path, 'r', encoding=file_encoding) as f:
        if skip_header:
            next(f)
        cur.copy_from(f, table_name, sep=sep)
        conn.commit()
    conn.close()

    print('-----------------------\nFile loading: FINISHED\n-----------------------\n')


# Runs query on database
def query_db(query):
    conn, cur = connect_db()
    output = pd.io.sql.read_sql_query(query, conn)
    conn.close()
    return output

# Returns a dataframe listing all tables in database
def get_tables_list():
    return query_db('''
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_type='BASE TABLE'
        ORDER BY table_name;
    ''')


# #########################
# Other auxiliary functions
# #########################

def get_indicator_code(indicator):
    indicator_to_code = {
        'covid': 'cli',
        'flu': 'ili',
        'mask': 'mc',
        'contact': 'dc',
        'finance': 'hf',
        'anosmia': 'anos',
        'vaccine_acpt': 'vu',
    }
    return indicator_to_code.get(indicator, indicator)

# 1.1 - Load data: Move Range (plain text file)
Link to source file: [https://data.humdata.org/dataset/movement-range-maps](https://data.humdata.org/dataset/movement-range-maps)

Using one of the auxiliary functions defined above (```load_file_in_table()```) we load the file into a table in the PostgreSQL database.
As you see, for the table schema we need to pass the name of the table columns and their data types (here we only use *text* and *float*, but there are many more).

In [4]:
table_schema = (
    ('ds', 'text'),
    ('country', 'text'),
    ('polygon_source', 'text'),
    ('polygon_id', 'text'),
    ('polygon_name', 'text'),
    ('all_day_bing_tiles_visited_relative_change', 'float'),
    ('all_day_ratio_single_tile_users', 'float'),
    ('baseline_name', 'text'),
    ('baseline_type', 'text')
)

load_file_in_table(MOVE_RANGE_FILE_PATH, MOVE_RANGE_TABLE_NAME, table_schema=table_schema, run_create_table=True)


        CREATE TABLE IF NOT EXISTS rpl_move_range(
            ds text,
country text,
polygon_source text,
polygon_id text,
polygon_name text,
all_day_bing_tiles_visited_relative_change float,
all_day_ratio_single_tile_users float,
baseline_name text,
baseline_type text
        );
    



-----------------------
File loading: STARTED
-----------------------

-----------------------
File loading: FINISHED
-----------------------



Now we can query the database to confirm the data is there.
The auxiliary function ```query_db()``` gets a SQL query and returns a *pandas.DataFrame* with the output.

In [5]:
query_db(f"SELECT * FROM {MOVE_RANGE_TABLE_NAME} LIMIT 10;")

Unnamed: 0,ds,country,polygon_source,polygon_id,polygon_name,all_day_bing_tiles_visited_relative_change,all_day_ratio_single_tile_users,baseline_name,baseline_type
0,2021-01-01,AGO,GADM,AGO.10.10_1,Lubango,-0.35291,0.25398,full_february,DAY_OF_WEEK
1,2021-01-02,AGO,GADM,AGO.10.10_1,Lubango,-0.06131,0.1733,full_february,DAY_OF_WEEK
2,2021-01-03,AGO,GADM,AGO.10.10_1,Lubango,-0.00392,0.21932,full_february,DAY_OF_WEEK
3,2021-01-04,AGO,GADM,AGO.10.10_1,Lubango,0.15114,0.11662,full_february,DAY_OF_WEEK
4,2021-01-05,AGO,GADM,AGO.10.10_1,Lubango,0.12696,0.10832,full_february,DAY_OF_WEEK
5,2021-01-06,AGO,GADM,AGO.10.10_1,Lubango,0.08147,0.12958,full_february,DAY_OF_WEEK
6,2021-01-07,AGO,GADM,AGO.10.10_1,Lubango,0.11349,0.13038,full_february,DAY_OF_WEEK
7,2021-01-08,AGO,GADM,AGO.10.10_1,Lubango,0.07934,0.1285,full_february,DAY_OF_WEEK
8,2021-01-09,AGO,GADM,AGO.10.10_1,Lubango,0.1352,0.14456,full_february,DAY_OF_WEEK
9,2021-01-10,AGO,GADM,AGO.10.10_1,Lubango,0.04224,0.20155,full_february,DAY_OF_WEEK


# 1.2 - Load data: COVID survey (API)
API documentation: [https://gisumd.github.io/COVID-19-API-Documentation](https://gisumd.github.io/COVID-19-API-Documentation)

Now instead of loading from a file already downloaded, we are going to connect to an API, save the output locally and then load that API response into the database.

This API can return surveys with several indicators (e.g. % people using mask, % of people recently diagnosed with COVID-19, etc) for different countries.

First we will ask the API for a list of all countries, and then retrieve several reports.


## 1.2.1 - Load country & regions

In [6]:
# request data from api
response = requests.get("https://covidmap.umd.edu/api/region").text

# convert json data to dictionary
response_dict = json.loads(response)

# convert to pandas dataframe
df = pd.DataFrame.from_dict(response_dict['data'])

# Save dataframe in file
df.to_csv(COUNTRY_REGION_FILE_PATH, sep='\t', index=False, encoding='utf-8')

# Load file in database
table_schema = (
    ('country', 'text'),
    ('region', 'text')
)
load_file_in_table(COUNTRY_REGION_FILE_PATH, 'country_regions', table_schema=table_schema, run_create_table=True)


        CREATE TABLE IF NOT EXISTS country_regions(
            country text,
region text
        );
    



-----------------------
File loading: STARTED
-----------------------

-----------------------
File loading: FINISHED
-----------------------



In [7]:
query_db("SELECT * FROM country_regions")

Unnamed: 0,country,region
0,Afghanistan,Badakhshan
1,Afghanistan,Balkh
2,Afghanistan,Kabul
3,Albania,Tiranë
4,Algeria,Alger
...,...,...
3781,Vietnam,Tuyên Quang
3782,Vietnam,Vĩnh Phúc
3783,Vietnam,Yên Bái
3784,Yemen,Aden


## 1.2.2 - Load survey data

### 1.2.2.0 API call example.

#### 1st sample: % of mask usage in Finland

In [8]:
# request data from api
response = requests.get("https://covidmap.umd.edu/api/resources?indicator=mask&type=daily&country=Finland&daterange=20201201-20201204").text

#convert json data to dic data for use!
response_dict = json.loads(response)

# convert to pandas dataframe
df = pd.DataFrame.from_dict(response_dict['data'])

In [9]:
df

Unnamed: 0,percent_mc,mc_se,percent_mc_unw,mc_se_unw,sample_size,country,iso_code,gid_0,survey_date
0,0.728795,0.027808,0.725173,0.021454,433.0,Finland,FIN,FIN,20201201
1,0.7113,0.030391,0.726636,0.021543,428.0,Finland,FIN,FIN,20201202
2,0.718504,0.031471,0.731707,0.021882,410.0,Finland,FIN,FIN,20201203
3,0.76261,0.027996,0.750515,0.019649,485.0,Finland,FIN,FIN,20201204


#### 2nd sample: % of people avoiding contact in Finland

In [10]:
# request data from api
response = requests.get("https://covidmap.umd.edu/api/resources?indicator=avoid_contact&type=daily&country=Finland&daterange=20201201-20201204").text

#convert json data to dic data for use!
response_dict = json.loads(response)

# convert to pandas dataframe
df = pd.DataFrame.from_dict(response_dict['data'])
df

Unnamed: 0,pct_avoid_contact,avoid_contact_se,pct_avoid_contact_unw,avoid_contact_se_unw,sample_size,country,iso_code,gid_0,survey_date
0,0.717799,0.027852,0.706977,0.021949,430.0,Finland,FIN,FIN,20201201
1,0.697544,0.03189,0.695238,0.022461,420.0,Finland,FIN,FIN,20201202
2,0.733249,0.030002,0.746341,0.021488,410.0,Finland,FIN,FIN,20201203
3,0.680975,0.03155,0.711618,0.020634,482.0,Finland,FIN,FIN,20201204


### 1.2.2.1 Get survey data for several indicators and countries

In [11]:
for indicator in SURVEY_INDICATORS:
    print('--------------------------\n')
    print(f'-- INDICATOR: {indicator}')
    print('\n--------------------------\n')
    
    # ############
    # Create table
    # ############
    indicator_code = get_indicator_code(indicator)
    indicator_table_name = f'{COVID_SURVEY_TABLE_NAME}_{indicator}'

    table_schema = (
        (f'percent_{indicator_code}', 'float'),
        (f'{indicator_code}_se', 'float'),
        (f'percent_{indicator_code}_unw', 'float'),
        (f'{indicator_code}_se_unw', 'float'),
        ('sample_size', 'NUMERIC'),
        ('country', 'text'),
        ('iso_code', 'text'),
        ('gid_0', 'text'),
        ('survey_date', 'NUMERIC'),
    )
    create_table(indicator_table_name, table_schema, drop_if_exists=True)

    # ####################
    # Load data into table
    # ####################
    from_date_no_dash = FROM_DATE.replace('-', '')
    to_date_no_dash = TO_DATE.replace('-', '')

    countries = [
        'Spain',
        'Germany',
        'Japan',
    ]

    for country in countries:
        file_path = f'{RAW_DATA_FOLDER_PATH}/covid_survey__{country}__{from_date_no_dash}__{to_date_no_dash}.txt'
        response = requests.get(f"https://covidmap.umd.edu/api/resources?indicator=mask&type=daily&country={country}&daterange={from_date_no_dash}-{to_date_no_dash}").text
        response_dict = json.loads(response)
        df = pd.DataFrame.from_dict(response_dict['data'])
        df.to_csv(file_path, sep='\t', index=False, encoding='utf-8')

        overwrite_filter_sql = f"country='{country}' AND survey_date BETWEEN {from_date_no_dash} AND {to_date_no_dash}"
        load_file_in_table(file_path, indicator_table_name, table_schema, overwrite_filter=overwrite_filter_sql)


--------------------------

-- INDICATOR: mask

--------------------------

DROP TABLE IF EXISTS rpl_covid_survey_mask;




        CREATE TABLE IF NOT EXISTS rpl_covid_survey_mask(
            percent_mc float,
mc_se float,
percent_mc_unw float,
mc_se_unw float,
sample_size NUMERIC,
country text,
iso_code text,
gid_0 text,
survey_date NUMERIC
        );
    



DELETE FROM rpl_covid_survey_mask WHERE country='Spain' AND survey_date BETWEEN 20210101 AND 20210630



-----------------------
File loading: STARTED
-----------------------

-----------------------
File loading: FINISHED
-----------------------

DELETE FROM rpl_covid_survey_mask WHERE country='Germany' AND survey_date BETWEEN 20210101 AND 20210630



-----------------------
File loading: STARTED
-----------------------

-----------------------
File loading: FINISHED
-----------------------

DELETE FROM rpl_covid_survey_mask WHERE country='Japan' AND survey_date BETWEEN 20210101 AND 20210630



-----------------------
File loa

Let's take a look at some of the data we have loaded into the db by running a very simple query:

In [13]:
query_db(f"SELECT * FROM {COVID_SURVEY_TABLE_NAME}_mask LIMIT 10")

Unnamed: 0,percent_mc,mc_se,percent_mc_unw,mc_se_unw,sample_size,country,iso_code,gid_0,survey_date
0,0.97687,0.003171,0.978026,0.002738,2867.0,Spain,ESP,ESP,20210101.0
1,0.971886,0.003932,0.975384,0.002771,3128.0,Spain,ESP,ESP,20210102.0
2,0.968848,0.004155,0.974099,0.002665,3552.0,Spain,ESP,ESP,20210103.0
3,0.962965,0.004469,0.969561,0.002968,3351.0,Spain,ESP,ESP,20210104.0
4,0.963617,0.007143,0.975008,0.002742,3241.0,Spain,ESP,ESP,20210105.0
5,0.96915,0.004062,0.974103,0.002806,3205.0,Spain,ESP,ESP,20210106.0
6,0.971114,0.003742,0.973653,0.002771,3340.0,Spain,ESP,ESP,20210107.0
7,0.969259,0.003668,0.971892,0.002814,3451.0,Spain,ESP,ESP,20210108.0
8,0.972043,0.003563,0.972437,0.002881,3229.0,Spain,ESP,ESP,20210109.0
9,0.966202,0.004061,0.970404,0.002859,3514.0,Spain,ESP,ESP,20210110.0


With the auxiliary function ```get_tables_list()``` we can take a look at all the tables in the database, created throuhout this lab.

In [12]:
get_tables_list()

Unnamed: 0,table_name
0,country_regions
1,rpl_covid_survey_anosmia
2,rpl_covid_survey_covid
3,rpl_covid_survey_finance
4,rpl_covid_survey_mask
5,rpl_covid_survey_tested_positive_14d
6,rpl_move_range
7,test_2_rpl_covid_survey_finance
8,test_2_rpl_covid_survey_mask
9,test_3_rpl_covid_survey_finance


# What next?
1. In the example above we are getting the reports from a few countries. What if we want to add some more. What if we want to add them all?
2. Take a look at the entire list of indicators for the COVID survey [survely](https://gisumd.github.io/COVID-19-API-Documentation/docs/indicators/indicators.html). Would you like to include any of this in your analysis?

# 2. Data Pipeline with Luigi
[Luigi](https://luigi.readthedocs.io/en/stable/index.html) is a Python framework to build to schedule batch jobs. This framework and other similar (e.g. Apache AirFlow) are very useful to build data pipelines.

In Luigi, pipeline consists in a collection of tasks, each of them can have several funcionalities/methods. The main ones are:
- **Task.requires()**: Other tasks which are dependencies, therefore need to run before.
- **Task.run()**: The action the specific task executes/handles.
- **Tasl.output()**: The expected outcome for this task. In Luigi jargon this outputs are called "targets" and they can take many formats (e.g. a text file, a data partition on Hive, etc). A task is not considered complete until the target exists. If at the time of scheduling the pipeline the target exists already, then the task will not run.


## 2.1 rpl_covid_survey.py


In the file **rpl_covid_survey.py** we'll show an example of a Luigi pipeline, where we download a daily report from the API and load it into PostgreSQL. It has 4 classes:
1. CreateTable: Creates table in database, if it doesn't exist already.
2. DownloadAPIReport: Downloads report from API.
3. LoadReportIntoDB: Loads report into the database
4. MasterTask: This class generates several instances of the previous tasks, to download and ingest data for several survey indicators and countries.


To run a luigi pipeline from the terminal the code is as follows:

> python -m luigi --module {pipeline_name} {task_name} --{parameter} {parameter_value} --local-scheduler

So for our specific example it can be as follows:

> python -m luigi --module pipelines.rpl_covid_survey MasterTask --date 2021-07-01 --test-prefix test_1_ --local-scheduler
(you can omit ```--test-prefix``` parameter to insert into the "production" table instead).

## 2.1 covid_survey_covid_mask
In this pipeline we cross the data from 2 reports from 2 different indicators (*covid* and *mask*).

> python -m luigi --module pipelines.covid_survey_covid_mask LoadTable --date 2021-07-01 --test-prefix test_1_ --local-scheduler

# What next?
Try writing a similar pipeline to covid_survey_covid_mask. Using SQL is what a data engineer would use in a "real scenario" querying big loads of data, but you can use Pandas if you prefer.