# 01 ETL Pipelines

> “Without a systematic way to start and keep data clean, bad data will happen.” — Donato Diorio

![data_flow](https://cdn.dribbble.com/users/1752792/screenshots/5652276/media/12db9ebc672c30dcb4d0fd125f70fb41.png)

Source: [Mindaugas Sadūnas](https://dribbble.com/shots/5652276-User-flow/attachments/10982649?mode=media)

## Learning Outcomes

By the end of this lesson you will be able to:
1. Explain why we move data from one place to another and clean it along the way.
2. Combine data that comes from different sources.
3. Create your own data pipelines with Python.
4. Manipulate and reshape your data into the structure you need.
5. Visualize the pipelines you create to help you develop them.

## Table of Contents

1. What are ETL Pipelines and Why Should You Learn to Create Them?
2. Tools
3. Case Study
4. Data
5. Data Pipelines with 🐼's `pipe`
6. Extract
7. Transform
8. Load
9. Launch the Pipeline
10. Automate
11. Summary

## 1. What are ETL Pipelines and why should you learn how to create them?

![etl_pipe](https://databricks.com/wp-content/uploads/2021/05/ETL-Process.jpg)

**What are ETL Pipelines?**

The acronym ETL stands for Extract, Transform, and Load. This is the process that the data we consume as analysts, data scientists, researchers, etc. passes through before it reaches our hands.

**Why should you learn to create them?**

As data professionals, our task is to create value for our organizations, our clients, and our collaborators using all the data we have at our disposal. However, to get the most out of that data, we need:
1. Information about the process by which the data was generated. For example,
    - Point of sale
    - Clicks on an online marketplace like Amazon, Etsy, eBay, etc.
    - Epidemiological studies
    - ...
2. Information about the transformations that occurred during the cleaning and merging process. For instance,
    - Degrees Celsius were converted to Fahrenheit
    - Prices in Chilean pesos were converted to {insert your preferred 💸}
    - Missing non-numerical observations now contain "Not Available"
    - Missing numerical observations now contain the average value of their variable. For example, a variable with the salary of all employees of a company now contains 40,000 USD per year wherever the value was not available
    - ...
3. Information about how the data was stored and where. For instance,
    - Parquet format
    - NoSQL or SQL database
    - CSV
    - ...

Understanding how these three steps fit together tells us more about the data we are going to use, and one of the best ways to gain that understanding is to build data pipelines ourselves.
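As a rough illustration of the transformations listed above, here is a minimal pandas sketch. The column names and values are made up for this example and do not come from the workshop data.

```python
import pandas as pd

# Hypothetical raw data: column names and values are invented for illustration
raw = pd.DataFrame({
    "temp_celsius": [0.0, 25.0, 100.0],
    "department": ["Sales", None, "Finance"],
    "salary_usd": [30_000.0, None, 50_000.0],
})

clean = raw.copy()  # work on a copy so the original data stays intact

# Convert degrees Celsius to Fahrenheit
clean["temp_fahrenheit"] = clean["temp_celsius"] * 9 / 5 + 32

# Missing non-numerical values become "Not Available"
clean["department"] = clean["department"].fillna("Not Available")

# Missing numerical values become the average of the variable
clean["salary_usd"] = clean["salary_usd"].fillna(clean["salary_usd"].mean())
```

Because the changes are applied to a copy, `raw` still holds the untouched values and can act as the source of truth.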

**Which Data Professionals Use These Pipes?**

- Data scientists
- Data analysts
- Data engineers
- Machine learning engineers
- Programmers
- DevOps engineers
- Social sciences researchers

In essence, understanding how data flows in your organization will help you:
- Feed clean data into your analysis while leaving the original data, the source of truth, intact.
- Detect inconsistencies in the original data.
- Use the time you have to analyze and report on your findings more efficiently.

## 2. Tools

The tools that we will use in the workshop are the following.

- [pandas](https://pandas.pydata.org/) - "is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool, built on top of the Python programming language."
- [Prefect](https://docs.prefect.io/) - "is a new workflow management system, designed for a modern infrastructure and powered by the open source workflow engine, Prefect Core. Users organize tasks into `Tasks` and `Flows`, and Prefect takes care of the rest."
- [sqlite3](https://docs.python.org/3/library/sqlite3.html) - "SQLite is a library written in C that provides a lightweight disk-based database that does not require a separate server process and allows the database to be accessed using a non-standard variant of the SQL query language."

Before we continue, let's load the modules we'll need and walk through a Prefect example.

In [None]:
import pandas as pd
from prefect import task, Flow
import sqlite3
from os.path import join
from contextlib import closing
from prefect.tasks.database.sqlite import SQLiteScript

pd.options.display.max_rows = None
pd.options.display.max_columns = None

Imagine we have data on all wildfires between 1983-2020 in the United States.

You can find more information about the data [here](https://www.kaggle.com/kkhandekar/total-wildfires-acres-affected-1983-2020).

In [None]:
example_data_in = join("..", "data", "01_part", "example", "federal_firefighting_costs.csv")

In [None]:
pd.read_csv(example_data_in).head()

As you can see, most variables need a bit of fixing: values formatted like `$70,890` are read as text, not as numbers. Also, since we will need the new data every month, we will create an ETL pipeline so that we don't have to repeat the process again and again.

When you use Prefect you have two important APIs: one is `task` and the other is `Flow`. `task` is used as a decorator on top of a function and tells Prefect that the function will take part in your data pipeline via the `Flow` API.

For example, let's create 3 functions: one that extracts the data we need, another that cleans it, and another that loads it. Each function will have the `task` decorator on top of it.

In [None]:
@task
def extract(path):
    return pd.read_csv(path)

As you saw above, only the last 5 variables have commas (`,`) and dollar signs (`$`), so we will loop over them with a `for` loop and replace both characters with an empty string (`""`).

For the load step, we will save the data in the `parquet` format. This is one of the most popular formats for analytics because it stores data by column rather than by row.

![colvsrow](https://3.bp.blogspot.com/-3aUydn8zCsQ/VjslzWCu3pI/AAAAAAAAAI8/XOi77xQNmm0/s1600/Difference-between-Column-based-and-Row-based-Tables.png)

Source: [SAP HANA Central](http://www.hanaexam.com/p/row-store-vs-column-store.html)
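To make the row versus column distinction concrete, here is a small pure-Python sketch. It is only an analogy for how the two layouts organize values, not a description of how parquet files are written, and the records are invented.

```python
# Row-oriented layout: one record (dict) per observation
rows = [
    {"year": 2001, "fires": 10, "acres": 100},
    {"year": 2002, "fires": 20, "acres": 200},
    {"year": 2003, "fires": 30, "acres": 300},
]

# Column-oriented layout: one list per variable
columns = {key: [row[key] for row in rows] for key in rows[0]}

# Summing one variable in the row layout visits every record...
total_from_rows = sum(row["acres"] for row in rows)

# ...while the column layout only needs the single list for that variable
total_from_columns = sum(columns["acres"])
```

Analytical queries usually touch a few variables across many observations, which is the access pattern a columnar layout favors.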

In [None]:
@task
def transform(data):
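    for col in data.iloc[:, 1:].columns:
        # regex=False treats ',' and '$' as literal characters rather than regex patterns
        data[col] = data[col].str.replace(',', '', regex=False).str.replace('$', '', regex=False).astype(int)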
    return data

In [None]:
@task
def load(data, path):
    data.to_parquet(path, compression='snappy')

When all the steps are ready, we open a context manager in Python using the `Flow` API. We can give the flow a name, for example `"ETL Example"`, and assign the context manager to a variable named `flow`, in lowercase (although you can name it anything you'd like). Inside the context we call our 3 functions and link them together.

In [None]:
example_data_out = join("..", "data", "01_part", "example", "my_test.parquet")

In [None]:
with Flow("ETL Example") as flow:
    data = extract(example_data_in)
    data_clean = transform(data)
    load(data_clean, example_data_out)

You can inspect the steps of the pipeline with `flow.visualize()` and run it with `flow.run()`.

In [None]:
flow.visualize()

In [None]:
flow.run()

To make sure we have the correct data, let's create a visualization with pandas and hvplot, a package that adds interactivity to our pandas charts.

In [None]:
import hvplot.pandas

In [None]:
pd.read_parquet(join("..", "data", "01_part", "example", "my_test.parquet")).hvplot(x='Year', y="ForestService")

## 3. Case Study

Imagine you work for a data science consultancy called Beautiful Analytics. Your boss tells you that she has a project for you in which you will work for three governments using data on shared bikes in the cities of London (England, UK), Seoul (South Korea), and Washington (DC, USA). The problem that each government wants to solve is the same,

**Challenge #1**

> How many bikes do we need to keep available in the city at every hour for the next few years? In other words, how many bikes will be rented at every hour of the day?

Each government captures similar data but, as you can imagine, they all use different names and units for the same variables. This means that our first job, before we can answer the question above, is to fix the data and put it in a more user-friendly form. It would also help us a lot to automate the extraction, transformation, and loading of our data, since we will continue to receive data from the governments in the future. This means that our first real problem is:

**Challenge #0**

> Create a data pipeline that extracts, transforms and loads the necessary data.

## 4. Data

![bikes](https://camo.githubusercontent.com/87d0f6a329d5dd8915136dcf9b121b789bfa613abac31d591f5629cdfb072595/68747470733a2f2f696d672e6b6f72656174696d65732e636f2e6b722f75706c6f61642f6e65777356322f696d616765732f3230323130332f33653962353830316334333034386563613331623333303931373663386461392e6a7067)

All three data files contain similar information about how many bicycles have been rented each hour, day, week and month, for several years and for each city.

You can get more information about the data of each city using the following links.

- [Seoul, South Korea](https://archive.ics.uci.edu/ml/datasets/Seoul+Bike+Sharing+Demand#)
- [London, England, UK](https://www.kaggle.com/hmavrodiev/london-bike-sharing-dataset)
- [Washington, DC, USA](https://www.kaggle.com/marklvl/bike-sharing-dataset?select=hour.csv)

Here are the variables that appear in the three data files.

| London | Seoul | Washington |
|:------:|:------:|:------:|
| date            | date            | instant   |
| count           | count           | date      |
| temperature     | hour            | seasons   |
| temp_feels_like | temperature     | year      |
| humidity        | humidity        | month     |
| wind_speed      | wind_speed      | hour           |
| weather_code    | visibility      | is_holiday     |
| is_holiday      | dew_point_temp  | weekday        |
| is_weekend      | solar_radiation | workingday     |
| seasons         | rainfall        | weathersit     |
|                 | snowfall        | temperature    |
|                 | seasons         | temp_feels_like |
|                 | is_holiday      | humidity        |
|                 | functioning_day | wind_speed      |
|                 |                 | casual     |
|                 |                 | registered |
|                 |                 | count      |


In [None]:
london_path = join('..', 'data', "01_part", 'raw', 'london', 'london_bikes.db')
seoul_path = join('..', 'data', "01_part", 'raw', 'seoul', 'SeoulBikeData.csv')
wash_dc_path = join('..', 'data', "01_part", 'raw', 'wash_dc', 'washington.json')

We will need to save our new file in a database or in a format that balances the space the file takes up on disk with the speed at which we can open and use it. So we will create two paths, with file names, that we will use later.

In [None]:
clean_path = join('..', 'data', "01_part", 'processed', 'clean.parquet')
clean_db_path = join('..', 'data', "01_part", 'processed', 'bikes.db')

The London bike data lives in a SQLite database, so to read it we first need to create a connection to the database. The next step is to read the data with the pandas `read_sql_query` function, which takes two arguments: the SQL query that selects the data and the connection to the database.
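If you want to try this pattern without the workshop files, the following sketch builds a throwaway in-memory SQLite database and reads it back. The table `demo_bikes` and its columns are hypothetical and are not part of the London data.

```python
import sqlite3
from contextlib import closing

import pandas as pd

# ":memory:" creates a temporary database that disappears when the connection closes
with closing(sqlite3.connect(":memory:")) as demo_conn:
    # Hypothetical table and values, for illustration only
    demo_conn.execute("CREATE TABLE demo_bikes (hour INTEGER, rentals INTEGER)")
    demo_conn.executemany(
        "INSERT INTO demo_bikes VALUES (?, ?)", [(0, 12), (1, 7), (2, 3)]
    )
    demo_conn.commit()

    # First argument: the SQL query. Second argument: the open connection.
    demo = pd.read_sql_query("SELECT * FROM demo_bikes WHERE rentals > 5", demo_conn)
```

Wrapping the connection in `closing` makes sure it is closed once the data has been read into the DataFrame.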

In [None]:
conn = sqlite3.connect(london_path)
query = "SELECT * FROM uk_bikes"

In [None]:
london = pd.read_sql_query(query, conn)
london.head()

The Seoul data is a comma-separated text file, and the DC data is in JSON format. For these two we can use `pd.read_csv` and `pd.read_json`, respectively.
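As a quick, self-contained check of how the two readers relate, this sketch parses the same two made-up records from a CSV string and from a JSON string. The field names are assumptions for the example only.

```python
from io import StringIO

import pandas as pd

# The same two hypothetical records, once as CSV and once as JSON
csv_text = "hour,rentals\n0,12\n1,7\n"
json_text = '[{"hour": 0, "rentals": 12}, {"hour": 1, "rentals": 7}]'

# StringIO lets the readers treat an in-memory string like a file
from_csv = pd.read_csv(StringIO(csv_text))
from_json = pd.read_json(StringIO(json_text))
```

Both calls return a DataFrame, so from this point on the rest of the pipeline does not need to know which format the data arrived in.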

In [None]:
seoul = pd.read_csv(seoul_path)
seoul.head()

In [None]:
washington = pd.read_json(wash_dc_path)
washington.head()

## 5. Data Pipelines with 🐼's `pipe`

![](https://camo.githubusercontent.com/45ae53e215244585378c3e414ce05abb4f5f6be3/68747470733a2f2f6d656469612e67697068792e636f6d2f6d656469612f4978365150753533576c4236772f67697068792e676966)

`pipe` is a pandas method that allows you to chain functions that take a data set, modify it, and return the modified version. In essence, it allows us to move the data through a series of steps until it reaches the structure we want.

For example, imagine that we have a data set and 4 functions to fix it. The chain of operations would look like this.

```python 
(data.pipe(change_cols, list_of_cols)
     .pipe(clean_numeric_vars, list_of_numeric_vars)
     .pipe(add_dates_and_location, 'Auckland', 'NZ')
     .pipe(fix_and_drop, 'column_to_fix', seasons_NZ, cols_drop_NZ))
```
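The chain above uses placeholder names. A runnable sketch with two made-up helper functions (`add_constant` and `keep_rows_above` are hypothetical, not part of the workshop) shows that `data.pipe(f, x)` is simply another way of writing `f(data, x)`:

```python
import pandas as pd


def add_constant(data, column, value):
    # Return a new DataFrame with `value` added to `column`
    return data.assign(**{column: data[column] + value})


def keep_rows_above(data, column, threshold):
    # Return only the rows where `column` is greater than `threshold`
    return data[data[column] > threshold]


demo = pd.DataFrame({"rentals": [1, 5, 10]})

# Nested calls read inside-out...
nested = keep_rows_above(add_constant(demo, "rentals", 2), "rentals", 5)

# ...while pipe reads top to bottom, in the order the steps run
piped = (demo.pipe(add_constant, "rentals", 2)
             .pipe(keep_rows_above, "rentals", 5))
```

Both versions produce the same result; `pipe` only changes how the chain reads.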

Another way to picture what happens with pandas' `pipe` is the following cooking process, where we start with raw ingredients and end up with a finished meal.

![img](../images/pandas_pipe.png)

Let's start with a small example without `pipe` first.

In [None]:
toy_data = pd.DataFrame({"Postal Codes": [22345, 32442, 20007], 
                         "Cities": ["Miami", "Dallas", "Washington"],
                         "Date": pd.date_range(start='9/27/2021', periods=3)})
toy_data

In [None]:
def change_cols(data, cols_list):
    data.columns = cols_list
    return data

In [None]:
change_cols(toy_data, ["postal_code", "city", "date"])

As you can see, with a single function it doesn't make much sense to pipe the process but with a chain of functions, the story changes.

And since our three data files use different column names, let's create 3 lists of standardized names, one per file, since we will need them soon.

In [None]:
london_cols = ['date', 'count', 'temperature', 'temp_feels_like', 'humidity', 'wind_speed', 'weather_code', 'is_holiday', 'is_weekend', 'seasons']
seoul_cols = ['date', 'count', 'hour', 'temperature', 'humidity', 'wind_speed', 'visibility', 'dew_point_temp', 'solar_radiation', 'rainfall', 'snowfall', 'seasons', 'is_holiday', 'functioning_day']
wa_dc_cols = ['instant', 'date', 'seasons', 'year', 'month', 'hour', 'is_holiday', 'weekday', 'workingday', 'weathersit', 'temperature', 'temp_feels_like', 'humidity', 'wind_speed', 'casual', 'registered', 'count']

The next thing we want to do is add information about the dates we have in each file. We can achieve this after converting the `date` variable to `datetime` format, which gives us access to the year, month, week, etc. of each date.

In [None]:
def add_dates_and_location(data, city, country):
    
    data['date'] = pd.to_datetime(data['date'])
    data["year"] = data['date'].dt.year
    data["month"] = data['date'].dt.month
    data["week"] = data['date'].dt.isocalendar().week.astype(int)
    data["day"] = data['date'].dt.day
    data["weekday"] = data['date'].dt.dayofweek
    data["is_weekend"] = (data["weekday"] > 4).astype(int)
    # note that we don't want to overwrite data that already has the hour as a column
    if 'hour' not in data.columns: 
        data["hour"] = data['date'].dt.hour
    data['date'] = data['date'].dt.date
    data['city'] = city
    data['country'] = country
    
    return data

In [None]:
add_dates_and_location(toy_data, "Sydney", "AU")

As you can see, we added a lot of information to our data with a simple function, but what happens when we want to chain two, three, or four operations together? The following would not be very easy to read, right? `add_dates_and_location(change_cols(toy_data, ["postal_code", "city", "date"]), "Sydney", "AU")`. Let's move on to using `pipe` now.

In [None]:
toy_data = pd.DataFrame({"Postal Codes": [22345, 32442, 20007], 
                         "Cities": ["Miami", "Dallas", "Washington"],
                         "Date": pd.date_range(start='9/27/2021', periods=3)})

In [None]:
(
    toy_data.pipe(change_cols, ["zip_code", "city", "date"])
            .pipe(add_dates_and_location, "Sydney", "AU")
)

As you can see, the chain of functions is now more readable than before, and we can keep chaining even more functions in the same fashion.
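One detail worth noticing: `change_cols` assigns to `data.columns`, so it modifies the DataFrame it receives, which is why `toy_data` was re-created before this chain. The sketch below contrasts that behavior with a copying variant. Both function names here are hypothetical and this is only one possible way to avoid the side effect.

```python
import pandas as pd


def change_cols_inplace(data, cols_list):
    # Same approach as change_cols: renames the caller's DataFrame directly
    data.columns = cols_list
    return data


def change_cols_copy(data, cols_list):
    # Hypothetical alternative: rename a copy and leave the input untouched
    renamed = data.copy()
    renamed.columns = cols_list
    return renamed


original = pd.DataFrame({"Postal Codes": [22345], "Cities": ["Miami"]})

renamed = original.pipe(change_cols_copy, ["postal_code", "city"])
names_after_copy = list(original.columns)  # still the original names

mutated = original.pipe(change_cols_inplace, ["postal_code", "city"])
names_after_inplace = list(original.columns)  # now the new names
```

The copying version costs some extra memory, but it lets you re-run a cell without rebuilding the input data first.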

In our data the seasons of the year are encoded differently in each file, and we also have variables that we do not need or that do not appear in all three files. Let's fix both!

In [None]:
seasons_london = {0: 'Spring', 1: 'Summer', 2: 'Fall', 3: 'Winter'}
seasons_wa_dc = {1: 'Spring', 2: 'Summer', 3: 'Fall', 4: 'Winter'}
holidays_seoul = {'No Holiday': 0, 'Holiday': 1}

In [None]:
cols_drop_london = ['temp_feels_like', 'weather_code']
cols_drop_seoul = ['visibility', 'dew_point_temp', 'solar_radiation', 'rainfall', 'snowfall', 'functioning_day']
cols_drop_wa_dc = ['instant', 'temp_feels_like', 'casual', 'registered', 'workingday', 'weathersit']

In [None]:
def fix_and_drop(data, col_to_fix, mapping, cols_to_drop):
    data[col_to_fix] = data[col_to_fix].map(mapping)
    return data.drop(cols_to_drop, axis=1)

Let's test the `pipe` operator but with the data from DC now and with the list of columns that we created a while ago.

In [None]:
washington.head()

In [None]:
(washington.pipe(change_cols, wa_dc_cols)
           .pipe(fix_and_drop, 'seasons', seasons_wa_dc, cols_drop_wa_dc)).head()

Lastly, we need to rescale some of the DC variables, since their values have been altered a bit.

In [None]:
def normalize_vars(data):
    data['temperature'] = data['temperature'].apply(lambda x: (x * 47) - 8)
    data['humidity'] = data['humidity'].apply(lambda x: (x / 100))
    data['wind_speed'] = data['wind_speed'].apply(lambda x: (x / 67))
    return data

In [None]:
def extract_data(path):
    return pd.read_json(path)

Finally, we can use our `pipe` operator again to complete the process and see the end result.

In [None]:
washington = (extract_data(wash_dc_path).pipe(change_cols, wa_dc_cols)
                                        .pipe(add_dates_and_location, 'DC', 'USA')
                                        .pipe(fix_and_drop, 'seasons', seasons_wa_dc, cols_drop_wa_dc)
                                        .pipe(normalize_vars))
washington.head()

## Exercise

1. Create a function to extract the London data.
2. Create a data pipeline similar to the one for Washington using pandas' `pipe`.

## 6. Extract

Depending on where the data is, in what format it is stored, and how we can access it, this can either be one of the shortest steps or one of the longest ones in our data pipeline. Here are some of the formats that you might encounter in your day to day.

- Text: plain-text formats look similar to what we see in Microsoft Excel but without formulas or graphics. For example, CSV or TSV.
- JSON: JavaScript Object Notation is a very popular data format thanks to its simple syntax.
- Databases: These can be SQL, NoSQL, MPP (massively parallel processing), among others.
- GeoJSON: It is a type of format for data that contains geographic information. There are many more types of data for GIS.
- HTML: Refers to Hyper Text Markup Language and represents the skeleton of almost all web pages in existence.
- ...

Since we already learned how to create useful pipelines with pandas, we now need to create functions for our main ETL pipeline to automate the process. We can achieve this using the Prefect decorator `@task` from earlier. This decorator takes note of the functions that we want to link together and helps us create a network (a directed acyclic graph) in which each node is a function and each link states which function must run before another.
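To picture that network, here is a small sketch using `graphlib` from the Python standard library (Python 3.9+). It is not how Prefect is implemented. It only illustrates how a set of "runs before" links determines a valid execution order, and the step names are plain labels chosen to mirror this lesson.

```python
from graphlib import TopologicalSorter

# Each key is a step; each value is the set of steps that must finish first
dependencies = {
    "extract_london": set(),
    "extract_seoul": set(),
    "extract_washington": set(),
    "create_table": set(),
    "transform": {"extract_london", "extract_seoul", "extract_washington"},
    "load": {"transform", "create_table"},
}

# static_order() yields the steps so that every dependency comes before its dependents
run_order = list(TopologicalSorter(dependencies).static_order())
```

Several orders can be valid (the three extract steps are independent of each other), but `transform` always comes after every extract and `load` always comes last.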

Remember, the decorator `@task` is also a function within prefect and as such, we can pass several arguments to it that help us modify the behavior of each function in our pipeline.

You can learn more about the `task` API in the official docs [here](https://docs.prefect.io/core/concepts/tasks.html#overview).

In [None]:
task??

In [None]:
@task
def extract_1(path):
    return pd.read_csv(path)

In [None]:
@task
def extract_2(path):
    conn = sqlite3.connect(path)
    query = "SELECT * FROM uk_bikes"
    return pd.read_sql_query(query, conn)

In [None]:
@task
def extract_3(path):
    return pd.read_json(path)

## 7. Transform

The most common transformations that happen at this stage are usually the ones we created earlier. In short,

- Clean text data
- Normalize columns
- Convert numeric variables to the same unit
- Deal with missing values
- Join data

Our transform step will be a parent function for the `pipe` chains from earlier. In other words, a combination of functions handled by a single one.

In [None]:
def order_and_merge(data_lists):
    
    pick_order = data_lists[0].columns
    new_list = [d.reindex(columns=pick_order).sort_values(['date', 'hour']) for d in data_lists]
    df = pd.concat(new_list)
    return df
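
To see what `reindex(columns=...)` and `pd.concat` do in `order_and_merge`, here is a sketch with two tiny, made-up DataFrames that share the same columns in a different order:

```python
import pandas as pd

# Hypothetical data: same columns, different column order
first = pd.DataFrame({"date": ["2021-01-02", "2021-01-01"], "hour": [0, 0], "count": [5, 3]})
second = pd.DataFrame({"count": [9], "hour": [1], "date": ["2021-01-01"]})

# Use the first DataFrame's column order for every DataFrame, then sort each one
column_order = first.columns
aligned = [df.reindex(columns=column_order).sort_values(["date", "hour"])
           for df in (first, second)]

# Stack the aligned DataFrames on top of each other
merged = pd.concat(aligned)
```

Note that each DataFrame keeps its own index labels after `pd.concat`, so labels can repeat in `merged`; passing `ignore_index=True` to `pd.concat` would renumber the rows instead.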

In [None]:
@task
def transform(london, seoul, washington):
    
    london = (london.pipe(change_cols, london_cols)
                    .pipe(add_dates_and_location, 'London', 'UK')
                    .pipe(fix_and_drop, 'seasons', seasons_london, cols_drop_london))
    
    seoul = (seoul.pipe(change_cols, seoul_cols)
                  .pipe(add_dates_and_location, 'Seoul', 'SK')
                  .pipe(fix_and_drop, 'is_holiday', holidays_seoul, cols_drop_seoul))
    
    wash_dc = (washington.pipe(change_cols, wa_dc_cols)
                         .pipe(add_dates_and_location, 'DC', 'USA')
                         .pipe(fix_and_drop, 'seasons', seasons_wa_dc, cols_drop_wa_dc)
                         .pipe(normalize_vars))
    
    return order_and_merge([london, seoul, wash_dc])

## 8. Load

We will have a function for saving the data in a database. For this, Prefect provides a wrapper around SQLite called `SQLiteScript`, which allows us to create a database and run a SQL script on it. This comes in handy for large and small operations and prototypes alike.

In [None]:
new_table = SQLiteScript(
    db=clean_db_path,
    script="""CREATE TABLE IF NOT EXISTS bike_sharing (date text, count integer, temperature real, humidity real,
              wind_speed real, is_holiday real, is_weekend integer, seasons text, year integer,
              month integer, week integer, day integer,hour integer, weekday integer, city text,
              country text)"""
    )

In [None]:
@task
def load(data, path_and_name):
    
    data = list(data.itertuples(name='Bikes', index=False))

    insert_cmd = "INSERT INTO bike_sharing VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    with closing(sqlite3.connect(path_and_name)) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.executemany(insert_cmd, data)
            conn.commit()
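
The `executemany` pattern used in `load` can be tried on its own with an in-memory database. In this sketch the table `demo` and the `Bike` record are hypothetical and much smaller than the real `bike_sharing` table:

```python
import sqlite3
from collections import namedtuple
from contextlib import closing

# Hypothetical records, similar in spirit to what DataFrame.itertuples produces
Bike = namedtuple("Bike", ["city", "hour", "rentals"])
records = [Bike("London", 0, 12), Bike("Seoul", 0, 7)]

with closing(sqlite3.connect(":memory:")) as demo_conn:
    demo_conn.execute(
        "CREATE TABLE IF NOT EXISTS demo (city TEXT, hour INTEGER, rentals INTEGER)"
    )
    with closing(demo_conn.cursor()) as cursor:
        # One "?" placeholder per column; each tuple in `records` fills one row
        cursor.executemany("INSERT INTO demo VALUES (?, ?, ?)", records)
        demo_conn.commit()

    stored = demo_conn.execute(
        "SELECT city, rentals FROM demo ORDER BY rentals"
    ).fetchall()
```

Keep in mind that a plain `INSERT` appends rows, so running the same insert twice stores the same records twice.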

## 9. Launch the Pipeline

In the same fashion as before, we will combine our functions in a `Flow` with a context manager but this time, we will set up an upstream task with Prefect's `set_upstream()` function as we need our database to be created before we can load the data in it. As you can imagine by the name of the function, it is also possible to set up downstream tasks if need be.

In [None]:
with Flow('bikes-ETL') as flow:
    
    the_table = new_table()
    
    london = extract_2(london_path)
    seoul = extract_1(seoul_path)
    wash_dc = extract_3(wash_dc_path)
    
    transformed = transform(london, seoul, wash_dc)
        
    data_loaded = load(transformed, clean_db_path)
    data_loaded.set_upstream(the_table)

In [None]:
flow.visualize()

In [None]:
flow.run()

In [None]:
pd.read_sql_query("SELECT * FROM bike_sharing", sqlite3.connect(clean_db_path)).head()

## Exercise

Change the `load()` function so that it saves the results in `parquet` format. Run the pipeline again and make sure the results are the same as above by reading the data back with the pandas function for parquet files.

## 10. Automate

Our boss tells us that the data of the 3 cities will be updated every Saturday so we have to automate the interval in which we want our program to run. For that we have the `IntervalSchedule` function in prefect, which allows us to set the time interval we need. Whether it's a minute, two weeks, or a month, adding this detail to our pipeline is a trivial task.

In [None]:
from prefect.schedules import IntervalSchedule
import datetime

In [None]:
schedule = IntervalSchedule(interval=datetime.timedelta(minutes=1), 
                            # start_date=datetime.datetime(2021, 11, 5)
                           )

In [None]:
with Flow('bikes-ETL', schedule=schedule) as flow:
    
    the_table = new_table()
    
    london = extract_2(london_path)
    seoul = extract_1(seoul_path)
    wash_dc = extract_3(wash_dc_path)
    
    transformed = transform(london, seoul, wash_dc)
        
    data_loaded = load(transformed, clean_db_path)
    data_loaded.set_upstream(the_table)

In [None]:
flow.run()

To learn more about how to schedule and update your pipelines, please visit the official documentation at [prefect schedules](https://docs.prefect.io/core/concepts/schedules.html#simple-schedules).
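The schedule in this section runs every minute so that we can see it working. For the weekly Saturday update described earlier, one possible approach is to compute the first Saturday as a start date and use a one-week interval. The sketch below uses only the standard library; `next_saturday` is a hypothetical helper, and the resulting values are meant as candidates for the `start_date` and `interval` arguments shown above, which you should confirm against the Prefect documentation.

```python
import datetime


def next_saturday(today):
    """Return the first Saturday on or after `today` (a datetime.date)."""
    # date.weekday() numbers Monday as 0, so Saturday is 5
    days_ahead = (5 - today.weekday()) % 7
    return today + datetime.timedelta(days=days_ahead)


# Example: starting from Wednesday, November 3, 2021
first_run = next_saturday(datetime.date(2021, 11, 3))

# A weekly interval, in place of the one-minute interval used for the demo
weekly = datetime.timedelta(weeks=1)
second_run = first_run + weekly
```

Since the commented-out `start_date` in the cell above is a `datetime.datetime`, you would likely combine `first_run` with a time of day before passing it to the schedule.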

## 11. Summary

1. Creating ETL pipelines helps you save time cleaning your data.
2. pandas `pipe` helps you create chains of functions and save time and lines of code.
3. Prefect saves you time since it chains functions for you and helps you schedule your pipelines.
4. No matter what type of data professional you are, moving and cleaning your data is an invaluable skill that is worth learning.