In [1]:
import re
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup

# ETL Process

- **Extract**: The process of <u>gathering</u>, **<u>reading</u>**, <u>requesting</u>, <u>collecting</u> data from any number of sources.
- **Transform**: The process of converting data into meaningful information. Some examples of transformations are the <u>removal</u> of columns/rows/duplicates, <u>cleaning</u> of columns, <u>creating</u> new information (columns), <u>combining</u> information, <u>using rules</u> to create new features.
- **Load**: The process of **writing** data (usually) into a database.

# Example Case: COVID-19 data extraction.

## Get data from JOHN HOPKINS UNIVERSITY

In [None]:
# url for COVID-19, John Hopkins University
url = 'https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports'

In [5]:
type(csv_files)

list

In [6]:
response = requests.get(url)

In [7]:
html = response.content

In [8]:
soup = BeautifulSoup(html)

In [9]:
# get raw csv link from url
csv_files = ['https://github.com' + tag['href'] for tag in soup.find_all('a') if tag['href'].endswith('.csv')]

In [11]:
csv_files

list

## Get data from github

In [None]:
### Get date of the github files
# [re.findall('\d{2}-\d{2}-\d{4}', file)[0] for file in  csv_files]
"""
# extract dates from github
dates_in_github = [re.findall('\d{2}-\d{2}-\d{4}', file)[0] for file in  csv_files]

# get dates for tables in our database
dates_in_db = [text[0].replace('_','-') 
                    for text in [re.findall('\d{2}_\d{2}_\d{4}', name) 
                        for name in engine.table_names()] if len(text) > 0]

missing_months = set(dates_in_github) - set(dates_in_db)

files_to_download = []

for month in missing_months:
    for file in csv_files:
        if month in file:
            files_to_download.append(file)
"""

In [2]:
url = 'https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports'
def get_data(url):
    response = requests.get(url)
    html = response.content
    soup = BeautifulSoup(html)
    csv_files = ['https://github.com' + tag['href'] for tag in soup.find_all('a') if tag['href'].endswith('.csv')]
    return csv_files

csv_files = get_data(url)


In [21]:
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())

def gitdata(file):
        response = requests.get(file)
        html = response.content
        soup = BeautifulSoup(html)
        soup.find('div', attrs={'class':'BtnGroup'}).find('a')['href']
        csv_url = 'https://github.com' + soup.find_all('div', attrs={'class':'BtnGroup'})[-1].find_all('a')[0]['href']
        date = re.findall('\d{2}-\d{2}-\d{4}', csv_url)[0].replace('-','_')
        filename = 'corona_' + date + '.csv'
        df = pd.read_csv(csv_url)
        
        return df


var = list(map(gitdata,tqdm(csv_files)))
var

100%|████████████████████████████████████████████████████████████████████████████████| 744/744 [27:00<00:00,  2.18s/it]


[      FIPS Admin2     Province_State Country_Region          Last_Update  \
 0      NaN    NaN                NaN    Afghanistan  2021-01-02 05:22:33   
 1      NaN    NaN                NaN        Albania  2021-01-02 05:22:33   
 2      NaN    NaN                NaN        Algeria  2021-01-02 05:22:33   
 3      NaN    NaN                NaN        Andorra  2021-01-02 05:22:33   
 4      NaN    NaN                NaN         Angola  2021-01-02 05:22:33   
 ...    ...    ...                ...            ...                  ...   
 3996   NaN    NaN  W.P. Kuala Lumpur       Malaysia  2021-01-02 05:22:33   
 3997   NaN    NaN        W.P. Labuan       Malaysia  2021-01-02 05:22:33   
 3998   NaN    NaN     W.P. Putrajaya       Malaysia  2021-01-02 05:22:33   
 3999   NaN    NaN            Unknown       Malaysia  2021-01-02 05:22:33   
 4000   NaN    NaN                NaN          Tonga  2021-01-02 05:22:33   
 
            Lat       Long_  Confirmed  Deaths  Recovered  Active  \
 0   

In [25]:
uniao = pd.concat(var)

In [27]:
uniao.to_csv("todocovid.csv")

In [6]:
response = requests.get(file)
html = response.content
soup = BeautifulSoup(html)

In [25]:
soup.find('div', attrs={'class':'BtnGroup'}).find('a')['href']

'/CSSEGISandData/COVID-19/raw/master/csse_covid_19_data/csse_covid_19_daily_reports/01-01-2021.csv'

In [26]:
# append it with github.com
csv_url = 'https://github.com' + soup.find_all('div', attrs={'class':'BtnGroup'})[-1].find_all('a')[0]['href']
csv_url

'https://github.com/CSSEGISandData/COVID-19/raw/master/csse_covid_19_data/csse_covid_19_daily_reports/01-01-2021.csv'

In [27]:
# extract date 
date = re.findall('\d{2}-\d{2}-\d{4}', csv_url)[0].replace('-','_')
date

'01_01_2021'

In [28]:
filename = 'corona_' + date + '.csv'
filename

'corona_01_01_2021.csv'

In [29]:
df = pd.read_csv(csv_url)
df.head()

Unnamed: 0,FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key,Incident_Rate,Case_Fatality_Ratio
0,,,,Afghanistan,2021-01-02 05:22:33,33.93911,67.709953,52513,2201,41727,0.0,Afghanistan,0.0,4.252222
1,,,,Albania,2021-01-02 05:22:33,41.1533,20.1683,58316,1181,33634,23501.0,Albania,2026.409062,2.025173
2,,,,Algeria,2021-01-02 05:22:33,28.0339,1.6596,99897,2762,67395,29740.0,Algeria,227.809861,2.764848
3,,,,Andorra,2021-01-02 05:22:33,42.5063,1.5218,8117,84,7463,570.0,Andorra,10505.403482,1.034865
4,,,,Angola,2021-01-02 05:22:33,-11.2027,17.8739,17568,405,11146,6017.0,Angola,53.452981,2.305328


## Some simple data cleaning (TRANSFORM)

In [30]:
colnames = df.rename({'Province_State':'province', 
                      'Country_Region':'country',
                      'Admin2':'Admin'}, 
                     axis=1).columns

df.columns = [col.lower() for col in colnames]
df.head(2)

Unnamed: 0,fips,admin,province,country,last_update,lat,long_,confirmed,deaths,recovered,active,combined_key,incident_rate,case_fatality_ratio
0,,,,Afghanistan,2021-01-02 05:22:33,33.93911,67.709953,52513,2201,41727,0.0,Afghanistan,0.0,4.252222
1,,,,Albania,2021-01-02 05:22:33,41.1533,20.1683,58316,1181,33634,23501.0,Albania,2026.409062,2.025173


In [31]:
# normalize date format
df['last_update'] = pd.to_datetime(df['last_update'])
df.head()

Unnamed: 0,fips,admin,province,country,last_update,lat,long_,confirmed,deaths,recovered,active,combined_key,incident_rate,case_fatality_ratio
0,,,,Afghanistan,2021-01-02 05:22:33,33.93911,67.709953,52513,2201,41727,0.0,Afghanistan,0.0,4.252222
1,,,,Albania,2021-01-02 05:22:33,41.1533,20.1683,58316,1181,33634,23501.0,Albania,2026.409062,2.025173
2,,,,Algeria,2021-01-02 05:22:33,28.0339,1.6596,99897,2762,67395,29740.0,Algeria,227.809861,2.764848
3,,,,Andorra,2021-01-02 05:22:33,42.5063,1.5218,8117,84,7463,570.0,Andorra,10505.403482,1.034865
4,,,,Angola,2021-01-02 05:22:33,-11.2027,17.8739,17568,405,11146,6017.0,Angola,53.452981,2.305328


In [34]:
# normalize country names
df.country = df.country.str.replace('*','',regex=False)

In [64]:
'9'.zfill(2)

'09'

In [35]:
# create column
df['anomesdia'] = df.last_update.apply(lambda x : f'{str(x.year)}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)}')
df.head()

Unnamed: 0,fips,admin,province,country,last_update,lat,long_,confirmed,deaths,recovered,active,combined_key,incident_rate,case_fatality_ratio,anomesdia
0,,,,Afghanistan,2021-01-02 05:22:33,33.93911,67.709953,52513,2201,41727,0.0,Afghanistan,0.0,4.252222,2021-01-02
1,,,,Albania,2021-01-02 05:22:33,41.1533,20.1683,58316,1181,33634,23501.0,Albania,2026.409062,2.025173,2021-01-02
2,,,,Algeria,2021-01-02 05:22:33,28.0339,1.6596,99897,2762,67395,29740.0,Algeria,227.809861,2.764848,2021-01-02
3,,,,Andorra,2021-01-02 05:22:33,42.5063,1.5218,8117,84,7463,570.0,Andorra,10505.403482,1.034865,2021-01-02
4,,,,Angola,2021-01-02 05:22:33,-11.2027,17.8739,17568,405,11146,6017.0,Angola,53.452981,2.305328,2021-01-02


## Store file

### In an ETL-process, logging is one of the most important things.

In [36]:
import logging
logging.basicConfig(level=logging.INFO)

In [37]:
logger = logging.getLogger('name')

In [38]:
logger.info('Testing simple log.')

INFO:name:Testing simple log.


### Logging with time is important

In [52]:
# reset config: logging.root.handlers = []
logging.root.handlers = []
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s.%(msecs)03d %(levelname)s - %(funcName)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

In [53]:
logger = logging.getLogger('test_log')

In [54]:
logger.info('Testing log. This log should show time information')

2021-11-15 16:20:16.825 INFO - <module>: Testing log. This log should show time information


### Logging to file is also important.

In [57]:
logging.root.handlers = []
logging.basicConfig(filename='test.log',
                    level=logging.INFO,
                    format='%(asctime)s.%(msecs)03d %(levelname)s - %(funcName)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

logger = logging.getLogger('test_log')

logger.info('Testing - this log should go to a file.')

## Connect to database

In [64]:
from sqlalchemy import create_engine
db_server='pymysql'
user='root'
db_port = '3306'
password = 'admin'
ip = 'localhost'
db_name = 'corona'
engine = create_engine(f'mysql+{db_server}://{user}:{password}@{ip}:{db_port}/{db_name}?charset=utf8')
conn = engine.connect()

In [65]:
f'mysql+{db_server}://{user}:{password}@{ip}:{db_port}/{db_name}?charset=utf8'

'mysql+pymysql://root:admin@localhost:3306/corona?charset=utf8'

In [61]:
table_name = filename.split('.')[0]
table_name

'corona_01_01_2021'

In [62]:
df.to_sql(table_name, conn, if_exists='replace', index=False)

In [63]:
engine.table_names()

['corona_01_01_2021']