# Introduction
<hr style = "border:2px solid black" ></hr>


**What?** Simple ETL pipeline



# What is an ETL pipeline?
<hr style = "border:2px solid black" ></hr>


- An ETL (Extract, Transform, Load) pipeline is a set of processes that extracts data from one or more sources, transforms it, and loads it into a target.

- The data can come from one or many sources, such as an API call, CSV files, or records in a database.

- We take the data from these sources and transform it so that, once loaded into the target storage, another client, user, or developer can use it immediately.

- **ETL vs. data pipeline:** An ETL pipeline is one type of data pipeline. “Data pipeline” is the more general term: it does not necessarily involve transforming the data or even loading it into a destination database. The loading step in a data pipeline could, for instance, trigger another process or workflow.
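
As a rough illustration of the three stages described above, here is a minimal sketch that runs entirely in memory. The function names (`extract`, `transform`, `load`), the table name `toy_reports`, and the rows are all made up for this example; they are not part of the pipeline built later in this notebook.

```python
import sqlite3

import pandas as pd


def extract():
    # Stand-in for an API call or CSV read; these rows are made up
    return pd.DataFrame({
        "Country/Region": ["A", "B"],
        "Confirmed": ["10", "20"],  # numbers arrive as text
    })


def transform(raw):
    # Give columns consistent names and proper numeric types
    clean = raw.rename(columns={"Country/Region": "Country_Region"})
    clean["Confirmed"] = clean["Confirmed"].astype(int)
    return clean


def load(clean, conn, table="toy_reports"):
    # Write the cleaned rows to the target store
    clean.to_sql(table, con=conn, index=False, if_exists="replace")


conn = sqlite3.connect(":memory:")  # throwaway in-memory database
load(transform(extract()), conn)
result = pd.read_sql_query("SELECT * FROM toy_reports", conn)
conn.close()
print(result)
```

Keeping each stage in its own function makes it possible to swap the source or the target without touching the transformation.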



# Import modules
<hr style = "border:2px solid black" ></hr>

In [1]:
import requests
import pandas as pd
import numpy as np
import os
import sqlite3
from tqdm.auto import tqdm

# Extraction
<hr style = "border:2px solid black" ></hr>


- We'll collect raw data from Johns Hopkins University’s [GitHub repository](https://github.com/CSSEGISandData/COVID-19).
- Specifically, we will look at the daily reports.
- According to GitHub’s API documentation, we can send a GET request to the endpoint below and receive a JSON response listing the URL of each CSV file, so we can read the files directly.
- `OWNER` is the owner of the GitHub repository.
- `REPO` is the name of the repository.
- `PATH` is the path, relative to `REPO`, of the folder we want to examine.
- `URL` is the API endpoint to which we send the GET request.
    


In [2]:
OWNER = 'CSSEGISandData'
REPO = 'COVID-19'
PATH = 'csse_covid_19_data/csse_covid_19_daily_reports'
URL = f'https://api.github.com/repos/{OWNER}/{REPO}/contents/{PATH}'
print(f'Downloading paths from {URL}')

Downloading paths from https://api.github.com/repos/CSSEGISandData/COVID-19/contents/csse_covid_19_data/csse_covid_19_daily_reports


In [3]:
download_urls = []
response = requests.get(URL)
response.raise_for_status()  # fail early on an HTTP error status

In [4]:
response.json()

[{'name': '.gitignore',
  'path': 'csse_covid_19_data/csse_covid_19_daily_reports/.gitignore',
  'sha': '496ee2ca6a2f08396a4076fe43dedf3dc0da8b6d',
  'size': 9,
  'url': 'https://api.github.com/repos/CSSEGISandData/COVID-19/contents/csse_covid_19_data/csse_covid_19_daily_reports/.gitignore?ref=master',
  'html_url': 'https://github.com/CSSEGISandData/COVID-19/blob/master/csse_covid_19_data/csse_covid_19_daily_reports/.gitignore',
  'git_url': 'https://api.github.com/repos/CSSEGISandData/COVID-19/git/blobs/496ee2ca6a2f08396a4076fe43dedf3dc0da8b6d',
  'download_url': 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/.gitignore',
  'type': 'file',
  '_links': {'self': 'https://api.github.com/repos/CSSEGISandData/COVID-19/contents/csse_covid_19_data/csse_covid_19_daily_reports/.gitignore?ref=master',
   'git': 'https://api.github.com/repos/CSSEGISandData/COVID-19/git/blobs/496ee2ca6a2f08396a4076fe43dedf3dc0da8b6d',
   'html': '


- Using Python’s Requests library, we parse the JSON response and pick out the information we need.

- The response is an array of objects, one describing each file in the requested folder of the GitHub repo. We are interested in each object's `download_url`, which we will use to read the data directly with pandas and then transform it as we see fit.

- The script below collects every `download_url` that points to a CSV file into a Python list.



In [5]:
for data in tqdm(response.json()):
    if data['name'].endswith('.csv'):
        download_urls.append(data['download_url'])

  0%|          | 0/775 [00:00<?, ?it/s]
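
The same filter can be tried offline as a small function, which makes it easy to see which entries are kept. This is only a sketch: `csv_download_urls` is a hypothetical helper name, and the sample entries are made up, carrying only the two keys the filter reads.

```python
def csv_download_urls(entries):
    """Return the download_url of every CSV file in a contents listing."""
    return [
        entry["download_url"]
        for entry in entries
        if entry["name"].endswith(".csv")
    ]


# Made-up entries shaped like the response above (only the keys used here)
sample = [
    {"name": ".gitignore", "download_url": "https://example.com/.gitignore"},
    {"name": "report-a.csv", "download_url": "https://example.com/report-a.csv"},
    {"name": "README.md", "download_url": "https://example.com/README.md"},
]
urls = csv_download_urls(sample)
print(urls)
```

Only the `.csv` entry survives, which is why the list of download URLs ends up shorter than the listing returned by the API.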

In [8]:
pd.DataFrame(download_urls)

| | 0 |
|---|---|
| 0 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 1 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 2 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 3 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 4 | https://raw.githubusercontent.com/CSSEGISandDa... |
| ... | ... |
| 768 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 769 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 770 | https://raw.githubusercontent.com/CSSEGISandDa... |
| 771 | https://raw.githubusercontent.com/CSSEGISandDa... |


# Transform
<hr style = "border:2px solid black" ></hr>


- Looking at the data, we are only interested in specific columns. If one of these columns is missing from a DataFrame, we add it and fill it with empty values (`numpy.nan`).

- Some labels are inconsistent: they describe the same type of data but are named differently. For example, ‘Last Update’ is also called ‘Last_Update’. We relabel such columns for consistency.

- The code below takes each file, already read into a pandas DataFrame, and makes the DataFrames consistent by adding and relabelling columns.



In [11]:
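# Map inconsistent column labels to the names used in the final table
relabel = {
    # 'Last Update' is left unmapped (commented out), so files that use it
    # get their Last_Update from the filename in factor_dataframe
    # 'Last Update': 'Last_Update',
    'Country/Region': 'Country_Region',
    'Lat': 'Latitude',
    'Long_': 'Longitude',
    'Province/State': 'Province_State',
}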

In [12]:
def factor_dataframe(dat, filename):
    """Normalize a daily-report DataFrame so it can be loaded into a SQL table.

    `filename` is the file's base name without extension; it encodes the
    report date and is used when the file has no 'Last_Update' column.
    """
    # rename inconsistent labels; keys absent from the frame are ignored
    dat = dat.rename(columns=relabel)

    # columns to keep, in output order
    labels = ['Province_State', 'Country_Region',
              'Last_Update', 'Confirmed', 'Deaths', 'Recovered']

    # fall back to the date encoded in the filename
    if 'Last_Update' not in dat:
        dat['Last_Update'] = pd.to_datetime(filename)

    # add any column missing from the dataframe, filled with nan
    for label in labels:
        if label not in dat:
            dat[label] = np.nan

    return dat[labels]
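
To see what this transformation does without downloading anything, the sketch below applies the same steps to a tiny hand-made frame. The logic is restated under the hypothetical name `normalize` so that the block runs on its own; the sample values and the filename `01-22-2020` are assumptions chosen for illustration.

```python
import numpy as np
import pandas as pd

RELABEL = {
    "Country/Region": "Country_Region",
    "Province/State": "Province_State",
}
LABELS = ["Province_State", "Country_Region",
          "Last_Update", "Confirmed", "Deaths", "Recovered"]


def normalize(dat, filename):
    # Same steps as factor_dataframe, restated so this block runs alone
    dat = dat.rename(columns=RELABEL)
    if "Last_Update" not in dat:
        dat["Last_Update"] = pd.to_datetime(filename)
    for label in LABELS:
        if label not in dat:
            dat[label] = np.nan
    return dat[LABELS]


# Made-up report with slash-style labels and no 'Recovered' column
raw = pd.DataFrame({
    "Province/State": ["X"],
    "Country/Region": ["Y"],
    "Confirmed": [5],
    "Deaths": [1],
})
tidy = normalize(raw, "01-22-2020")
print(tidy)
```

The result always has the same six columns in the same order, which is what allows every file to be appended to a single SQL table.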

# Load
<hr style = "border:2px solid black" ></hr>


- We can load our pandas DataFrame directly into a SQL database using `pandas.DataFrame.to_sql`.

- If the table already exists in the database, the first file replaces it and the remaining files are appended to it. Dropping and rebuilding the table on every run is a very bad approach for production, but it suffices for the simplicity of this exercise.

- The code below connects to a SQLite database using `sqlite3` and writes each file to it.



In [13]:
def upload_to_sql(filenames, db_name, debug=False):
    """ Read each CSV in `filenames` and write it to the table `db_name`
    in the SQLite database file `<db_name>.db`
    """
    conn = sqlite3.connect(f"{db_name}.db")

    if debug:
        print("Uploading into database")
    try:
        for i, file_path in enumerate(tqdm(filenames)):

            dat = pd.read_csv(file_path)

            # the base name without extension encodes the report date
            filename = os.path.basename(file_path).split('.')[0]
            dat = factor_dataframe(dat, filename)

            # the first file replaces any existing table; later files append
            mode = 'replace' if i == 0 else 'append'
            dat.to_sql(db_name, con=conn, index=False, if_exists=mode)
    finally:
        # close the connection even if a download or parse fails
        conn.close()
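
The difference between `if_exists='replace'` and `if_exists='append'` can be checked on a throwaway in-memory database. This is a small sketch; the table name `demo` and its rows are made up for illustration.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
batch = pd.DataFrame({"Country_Region": ["Y"], "Confirmed": [5]})
count_query = "SELECT COUNT(*) AS n FROM demo"

# Pre-existing table holding three stale rows
pd.concat([batch] * 3).to_sql("demo", con=conn, index=False)

# 'replace' drops the old table and writes only the new rows
batch.to_sql("demo", con=conn, index=False, if_exists="replace")
after_replace = int(pd.read_sql_query(count_query, conn)["n"][0])

# 'append' keeps the existing rows and adds the new ones
batch.to_sql("demo", con=conn, index=False, if_exists="append")
after_append = int(pd.read_sql_query(count_query, conn)["n"][0])

conn.close()
print(after_replace, after_append)
```

This is why the upload function uses `'replace'` only for the first file: using it for every file would leave just the last report in the table.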

# Run ETL pipeline and clean up
<hr style = "border:2px solid black" ></hr>

In [14]:
# upload into sql database
upload_to_sql(download_urls, 'example', debug=True)

Uploading into database


  0%|          | 0/773 [00:00<?, ?it/s]

In [17]:
# Check if the database was saved locally
!ls

Simple ETL pipeline.ipynb example.db
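
Listing the directory only shows that the file exists. One possible way to check that rows were actually loaded is to query the table, as sketched below. `table_summary` is a hypothetical helper, and the demo runs against an in-memory table with made-up rows rather than the real `example.db`.

```python
import sqlite3

import pandas as pd


def table_summary(conn, table):
    """Return (row count, number of distinct countries) for `table`."""
    query = (
        "SELECT COUNT(*) AS n_rows, "
        "COUNT(DISTINCT Country_Region) AS n_countries "
        f'FROM "{table}"'
    )
    row = pd.read_sql_query(query, conn).iloc[0]
    return int(row["n_rows"]), int(row["n_countries"])


# Demo on an in-memory table with made-up rows
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "Country_Region": ["Y", "Y", "Z"],
    "Confirmed": [5, 7, 2],
}).to_sql("example", con=conn, index=False)
summary = table_summary(conn, "example")
conn.close()
print(summary)
```

For the database created above, the same function could be called with `sqlite3.connect("example.db")` and the table name `"example"`.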


In [None]:
# Delete database
! rm example.db
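
The `rm` command is only available in Unix-like shells. A portable alternative, sketched here with the standard library, is `pathlib.Path.unlink`; the `missing_ok` argument requires Python 3.8 or later. `remove_database` is a hypothetical helper name, and the demo deletes a throwaway file rather than the real `example.db`.

```python
import tempfile
from pathlib import Path


def remove_database(path):
    """Delete the database file at `path`; do nothing if it is absent."""
    Path(path).unlink(missing_ok=True)


# Demo on a throwaway file in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    db_file = Path(tmp) / "example.db"
    db_file.touch()
    remove_database(db_file)
    still_there = db_file.exists()
    remove_database(db_file)  # calling it again is a no-op
print(still_there)
```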

# References
<hr style = "border:2px solid black" ></hr>


- https://medium.com/analytics-vidhya/building-a-etl-pipeline-226656a22f6d
- [COVID-19 Data Repository by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University](https://github.com/CSSEGISandData/COVID-19)

