# ETL Pipelines 4 Data Professionals

> “Sin una forma sistemática de iniciar y mantener los datos limpios, los datos incorrectos ocurrirán.” — Donato Diorio

![data_flow](https://cdn.dribbble.com/users/1752792/screenshots/5652276/media/12db9ebc672c30dcb4d0fd125f70fb41.png)

Source: [Mindaugas Sadūnas](https://dribbble.com/shots/5652276-User-flow/attachments/10982649?mode=media)

## Resultados del Aprendizaje

Al final de este taller,
1. Entenderas mejor por que necesitamos mover los datos de un punto a otro a la misma ves que los limpiamos.
2. Comprenderas como combinar datos que vienen de diferentes fuentes
3. Tendras el conocimiento de como crear tus proprias tuberias de datos con Python
4. Aprenderas un poco mas como manipular y moldear tus datos en la forma en que los necesitas.
5. Entenderas como visualisar las tuberias que creas para ayudarte en su desarrollo.

## Tabla de Contenidos

1. Que son las Tuberias ETL y Por Que Deberias Aprender a Crearlas?
2. Herramientas Para la Sesion
3. Nuestro Caso Para Este Taller
4. Datos
5. Tuberias Pequeñas con 🐼's `pipe`
6. Extraer
7. Transformar
8. Descargar
9. Lanza La Tuberia
10. Automatizar
11. Resumen

## 1. Que son las Tuberias ETL y Por Que Deberias Aprender a Crearlas?

![etl_pipe](https://databricks.com/wp-content/uploads/2021/05/ETL-Process.jpg)

**Que son las Tuberias ETL?**

El acrónimo ETL significa Extract (Extraer), Transform (Transformar), y Load (Descargar). Este es el proceso por el cual pasan los datos que consumimos como analistas, científicos de datos, investigadores científicos, ect..., antes de que lleguen a nuestras manos.

**Por Que Deberias Aprender a Crearlas?**

Como profesionales de datos, nuestro tarea es crear valor para nuestras organizaciones, nuestros clientes y nuestros colaboradores usando todos los datos que tengamos a nuestra disposicion. Sin embargo, para sacarle el maximo provecho a los datos que tenemos a mano, necesitamos
1. Informacion sobre proceso por el cual se generaron los datos, Por ejemplo,
    - Punto de ventas
    - Clicks en un mercado en línea como Amazon, Etzy, Ebay, ect.
    - Estudio epidemiológico
    - ...
2. Informacion acerca de las transformaciones que ocurrieron durante el proceso de limpieza y combinacion. Por ejemplo,
    - Grados Celcius fueron convertidos a fahrenheit
    - Precios en pesos Chilenos fueron convertidos a {inserta tu 💸 preferida}
    - Observaciones no numericas y no disponibles ahora contienen "No Disponible"
    - Observaciones numericas ahora contienen el valor promedio de su respectiva variable, por ejemplo, un variable con el salario de todos los empleados de una compañía ahora contiene $40,000$/año USD en los valores que no estaban disponibles
    - ...
3. Informacion acerca de como se almacenaron los datos y en donde. Por ejemplo,
    - Parquet format
    - NOSQL or SQL base de datos
    - CSV
    - ...

El entender como fluyen los tres procesos descritos arriba nos ayudara a tener mas conocimiento acerca de los datos que vamos a usar, y una de las mejores maneras para entender ese proceso es a traves de la creacion de tuberias de datos.

**Cuales Profesionales de Datos Usan Estas Tuberias?**

- Cientificos de datos
- Analistas de datos
- Ingenieros de datos
- Machine Learning Engineers
- Programadores
- DevOps Engineers
- Investigadores de Ciencias Sociales

En conclusion, entender como fluyen los datos en tu organizacion te ayudara a
- Utilizar los datos limpios para tus analysis mientras dejas los datos originales intactos.
- Detectar inconsistencias en los datos originales.
- Usar el tiempo que tienes para analizar e informar sobre tus hallazgos de manera mas eficiente.

## 2. Herramientas Para la Sesion

Las herramientas que utilizaremos en el taller son las siguientes.

- [pandas](https://pandas.pydata.org/) - "es una herramienta de análisis y manipulación de datos de código abierto rápida, potente, flexible y fácil de usar, construida sobre el lenguaje de programación Python."
- [Prefect](https://docs.prefect.io/) - "es un nuevo sistema de gestión de flujo de trabajos, diseñado para una infraestructura moderna e impulsado por el motor de flujo de trabajo de código abierto llamado, Prefect Core. Los usuarios organizan las tareas en `Tasks` y `Flows`, y Prefect se encarga del resto."
- [sqlite3](https://docs.python.org/3/library/sqlite3.html) - "SQLite es una biblioteca escrita en C que proporciona una base de datos ligera basada en disco que no requiere un proceso de servidor independiente y permite acceder a la base de datos mediante una variante no estándar del lenguaje de consulta SQL."

Antes de continuar, carguemos los modulos que necesitaremos y examinemos un ejemplo de prefect.

In [None]:
import pandas as pd
from prefect import task, Flow
import sqlite3
from os.path import join
from contextlib import closing
from prefect.tasks.database.sqlite import SQLiteScript

pd.options.display.max_rows = None
pd.options.display.max_columns = None

Imaginate que tenemos unos datos acerca de todos incendios forestales entre 1983-2020 en los Estados Unidos.

Puedes encontrar mas informacion sober los datos [aqui](https://www.kaggle.com/kkhandekar/total-wildfires-acres-affected-1983-2020).

In [None]:
pd.read_csv(join("..", "data", "example",
                 "Federal Firefighting Costs (Suppression Only).csv")).head()

Como puedes ver, la majoria de las variables necesitan un poco de arreglo ya que en Python no podemos, por ejemplo, numeros con formatos como `$70,890`. Tambien, ya que necesitaremos los nuevos datos todos los meses, crearemos una tuberia ETL para no tener que repetir el proceso de nuevo.

Cuando usas prefect tienes dos API's importantes, una es `task` y la otra es `Flow`. `task` se usa como un decorador arriba de funciones y te permite decirle a prefect que esa funcion tomara parte en tu tuberia de datos a traves del `Flow` API.

Por ejemplo, creemos 3 funciones, una que extraiga los datos que necesitamos, otra que los limpie, y otra que los descargue, y pongamoles a las 3 el decorados `task`.

In [None]:
path = join("..", "data", "example", "Federal Firefighting Costs (Suppression Only).csv")

In [None]:
@task
def extract(path):
    return pd.read_csv(path)

Como viste arriba, solo las ultimas 5 variables tienen comas (`,`) y simbolos de dinero (`$`) asi que crearemos un `for` loop y a cada una de las variables les reemplazaremos ambas por un espacio vacio (`""`).

Para el proceso de descarga, guardaremos los datos en el formato `parquet`. Este es uno de los formatos mas populares ya que tiene una orientacion columnar en ves de por fila.

![colvsrow](https://3.bp.blogspot.com/-3aUydn8zCsQ/VjslzWCu3pI/AAAAAAAAAI8/XOi77xQNmm0/s1600/Difference-between-Column-based-and-Row-based-Tables.png)

Source: [SAP HANA Central](http://www.hanaexam.com/p/row-store-vs-column-store.html)

In [None]:
@task
def transform(data):
    for col in data.iloc[:, 1:].columns:
        data[col] = data[col].str.replace(',', '').str.replace('$', '').astype(int)
    return data

In [None]:
@task
def load(data, path):
    data.to_parquet(path, compression='snappy')

Cuando tenemos todos los pasos listos, creamos un gestor de contexto en Python usando el `Flow` API. A esta funcion le podemos dar un nombre, por ejemplo, `"Ejemplo ETL"` y despues asignar lo que pasa adentro del contexto a una variable de nombre `flow` (sin mayuscula). Adentro del contexto podemos instanciar nuestras 3 funciones y enlazar una con la otra.

In [None]:
with Flow("Ejemplo ETL") as flow:
    data = extract()
    data_clean = transform(data)
    load(data_clean, join("..", "data", "example", "my_test.parquet"))

Puedes ver el resultado de los pasos a seguir en nuestra tuberia usando `flow.visualize()` y puedes iniciarla con `flow.run()`. 

In [None]:
flow.visualize()

In [None]:
flow.run()

Para cerciorarnos de que tenemos los datos correctos, creemos una visualisacion con pandas y hvplot que nos permite agregar interactividad a nuestros graficos.

In [None]:
import hvplot.pandas

In [None]:
pd.read_parquet(join("..", "data", "example", "my_test.parquet")).hvplot(x='Year', y="ForestService")

## 3. Nuestro Caso Para Este Taller

Imagina que trabajas para una consultoria de ciencias de datos que se llama, Beautiful Analytics. Tu jefa te dice que tiene un projecto para ti en el cual trabajaras para tres gobiernos usando datos sobre las bicicletas compartidas en las ciudades de Londres (England, UK), Seoul (South Korea), y Washington (DC, USA). El problema que cada gobierno quere resolver es el mismo,

**Desafio # 1**

> cuantas bicicletas necesitamos mantener disponible en la ciudad a cada hora durante los proximos años?

Cada gobierno captura datos similares pero, como ya te puedes imaginar, todos usan palabras y medidas diferentes en referencia a la misma variable. Lo que quiere decir que nuestro primer trabajo antes de poder responder la prgunta de arriba es, arreglar los datos y ponerlos de una manera en la que los podamos usar mejor. De paso, lo que de verdad nos ayudaria un monton es automatizar la extraccion, transformacion y descarga de nuestros datos cuando ya esten limpios, ya que en el futuro seguiremos recibiendo los datos por parte de los gobiernos. Esto quiere decir que nuestro primer real problema es,

**Desafio # 0**

> Crea una tuberia de datos que extraiga, transforme y descargue los datos necesarios.

## 4. Datos

![bikes](https://camo.githubusercontent.com/87d0f6a329d5dd8915136dcf9b121b789bfa613abac31d591f5629cdfb072595/68747470733a2f2f696d672e6b6f72656174696d65732e636f2e6b722f75706c6f61642f6e65777356322f696d616765732f3230323130332f33653962353830316334333034386563613331623333303931373663386461392e6a7067)

Los tres archivos de datos contienen informacion similar acerca de cuantas bicicletas se han necesitado a cada hora durante varios anos, para cada ciudad.

Puedes obtener mas informacion acerca de los datos de cada ciudad usando los siguientes enlaces.

- [Seoul, Korea del Sur](https://archive.ics.uci.edu/ml/datasets/Seoul+Bike+Sharing+Demand#)
- [London, England, UK](https://www.kaggle.com/hmavrodiev/london-bike-sharing-dataset)
- [Washington, DC, USA](https://www.kaggle.com/marklvl/bike-sharing-dataset?select=hour.csv)

Aqui estan las variables que aparecen and los tres archivos de datos.

| London | Seoul | Washington |
|:------:|:------:|:------:|
| date            | date            | instant   |
| count           | count           | date      |
| temperature     | hour            | seasons   |
| temp_feels_like | temperature     | year      |
| humidity        | humidity        | month     |
| wind_speed      | wind_speed      | hour           |
| weather_code    | visibility      | is_holiday     |
| is_holiday      | dew_point_temp  | weekday        |
| is_weekend      | solar_radiation | workingday     |
| seasons         | rainfall        | weathersit     |
|                 | snowfall        | temperature    |
|                 | seasons         | temp_feels_like |
|                 | is_holiday      | humidity        |
|                 | functioning_day | wind_speed      |
|                 |                 | casual     |
|                 |                 | registered |
|                 |                 | count      |


In [None]:
london_path = join('..', 'data', 'raw', 'london', 'london_bikes.db')
seoul_path = join('..', 'data', 'raw', 'seoul', 'SeoulBikeData.csv')
wash_dc_path = join('..', 'data', 'raw', 'wash_dc', 'washington.json')

Necesitaremos guardar nuestro nuevo archivo en una base de datos o en un formato en el cual se nos facilite tanto lo que ocupa el archivo como la velocidad con la que lo podemos abrir e utilizar. Por eso crearemos dos caminos y dos nombres para los archivos que usaremos luego.

In [None]:
clean_path = join('..', 'data', 'processed', 'clean.parquet')
clean_db_path = join('..', 'data', 'processed', 'bikes.db')

Los datos que tenemos de las bicicletas en Londres estan en una base de datos SQLite y para leerlos primero necesitamos crear una coneccion a la base de datos. El siguiente paso es usar la funcion de pandas `read_sql_query` para leer los datos. Esta funcion toma como argumento dos cosas, el programa para agarrar los datos y la coneccion a la base de datos.

In [None]:
conn = sqlite3.connect(london_path)
query = "SELECT * FROM uk_bikes"

In [None]:
london = pd.read_sql_query(query, conn)
london.head()

Los datos de Seoul estan en forma de texto y separados por comas, y los datos de Washington estan en el formato JSON. Para estos dos podemos utilizar `pd.read_csv` y `pd.read_json`, respectivamente.

In [None]:
seoul = pd.read_csv(seoul_path)
seoul.head()

In [None]:
washington = pd.read_json(wash_dc_path)
washington.head()

## 5. Tuberias de datos con 🐼's `pipe`

![](https://camo.githubusercontent.com/45ae53e215244585378c3e414ce05abb4f5f6be3/68747470733a2f2f6d656469612e67697068792e636f6d2f6d656469612f4978365150753533576c4236772f67697068792e676966)

El operador `pipe` es una función de pandas que te permite encadenar operaciones que toman un conjunto de datos, lo modifican y te devuelven la versión modificada de los datos originales. En esencia, nos permite mover los datos a través de una serie de pasos hasta que alcancemos la estructura que deseamos.

Por ejemplo, imaginate que tenemos un grupo de datos y 4 funciones para arreglarlo, la cadena se veria de la siguiente manera.

```python 
(data.pipe(change_cols, list_of_cols)
     .pipe(clean_numeric_vars, list_of_numeric_vars)
     .pipe(add_dates_and_location, 'Auckland', 'NZ')
     .pipe(fix_and_drop, 'column_to_fix', seasons_NZ, cols_drop_NZ))
```

Otra manera de visualizar lo que sucede con pandas' `pipe` es a traves de la siguiente imagen.

![img](images/pandas_pipe.png)

Empecemos con un pequeño ejemplo primero sin usar `pipe`.

In [None]:
toy_data = pd.DataFrame({"Postal Codes": [22345, 32442, 20007], 
                         "Cities": ["Miami", "Dallas", "Washington"],
                         "Date": pd.date_range(start='9/27/2021', periods=3)})
toy_data

In [None]:
def change_cols(data, cols_list):
    data.columns = cols_list
    return data

In [None]:
change_cols(toy_data, ["postal_code", "city", "date"])

Como puedes ver, con una solo funcion no tiene mucho sentido pasarla por el `pipe` pero con una cadena de funciones, la historia cambia.

Y a que tenemos columnas con nombres differentes, creemos 3 listas con los mismos nombres para los tres archivos de datos ya que pronto los necesitaremos.

In [None]:
london_cols = ['date', 'count', 'temperature', 'temp_feels_like', 'humidity', 'wind_speed', 'weather_code', 'is_holiday', 'is_weekend', 'seasons']
seoul_cols = ['date', 'count', 'hour', 'temperature', 'humidity', 'wind_speed', 'visibility', 'dew_point_temp', 'solar_radiation', 'rainfall', 'snowfall', 'seasons', 'is_holiday', 'functioning_day']
wa_dc_cols = ['instant', 'date', 'seasons', 'year', 'month', 'hour', 'is_holiday', 'weekday', 'workingday', 'weathersit', 'temperature', 'temp_feels_like', 'humidity', 'wind_speed', 'casual', 'registered', 'count']

Lo siguiente que queremos hacer es agregar informacion adicional acerca de las fechas que tenemos para cada archivo. Esto lo podemos lograr despues de convertir la variable `date` a formato `datetime`, el cual nos permitira accesar el año, mes, semana, ect. de adentro de cada fecha.

In [None]:
def add_dates_and_location(data, city, country):
    
    data['date'] = pd.to_datetime(data['date'])
    data["year"] = data['date'].dt.year
    data["month"] = data['date'].dt.month
    data["week"] = data['date'].dt.isocalendar().week.astype(int)
    data["day"] = data['date'].dt.day
    data["hour"] = data['date'].dt.hour
    data["weekday"] = data['date'].dt.dayofweek
    data["is_weekend"] = (data["weekday"] > 4).astype(int)
    data['date'] = data['date'].dt.date
    data['city'] = city
    data['country'] = country
    
    return data

In [None]:
add_dates_and_location(toy_data, "Sydney", "AU")

Como puedes ver, agregamos un monton de informacion a nuestros datos con una simple funcion, pero que pasa cuando queremos encadenar dos o tres or cuatro? Lo siguiente no estaria muy facil de leer cierto? `add_dates_and_location(change_cols(toy_data, ["postal_code", "city", "date"]), "Sydney", "AU")`. Movamonos al `pipe` operator.

In [None]:
toy_data = pd.DataFrame({"Postal Codes": [22345, 32442, 20007], 
                         "Cities": ["Miami", "Dallas", "Washington"],
                         "Date": pd.date_range(start='9/27/2021', periods=3)})

In [None]:
(
    toy_data.pipe(change_cols, ["zip_code", "city", "date"])
            .pipe(add_dates_and_location, "Sydney", "AU")
)

Como puedes ver, ahora la cadena de nuestras funciones es mas legible que antes y podemos continuar y encadenar aun mas funciones en este mismo estylo.

En nuestros datos tenemos las etapas del año con diferentes nombres y tambien tenemos variables que no necesitamos o que no estan en los tres archivos. Arreglemos ambas!

In [None]:
seasons_london = {0: 'Spring', 1: 'Summer', 2: 'Fall', 3: 'Winter'}
seasons_wa_dc = {1: 'Spring', 2: 'Summer', 3: 'Fall', 4: 'Winter'}
holidays_seoul = {'No Holiday': 0, 'Holiday': 1}

In [None]:
cols_drop_london = ['temp_feels_like', 'weather_code']
cols_drop_seoul = ['visibility', 'dew_point_temp', 'solar_radiation', 'rainfall', 'snowfall', 'functioning_day']
cols_drop_wa_dc = ['instant', 'temp_feels_like', 'casual', 'registered', 'workingday', 'weathersit']

In [None]:
def fix_and_drop(data, col_to_fix, mapping, cols_to_drop):
    data[col_to_fix] = data[col_to_fix].map(mapping)
    return data.drop(cols_to_drop, axis=1)

Probemos el `pipe` pero con los datos de Washington, DC ahora y con la lista de columnas que creamos hace rato.

In [None]:
washington.head()

In [None]:
(washington.pipe(change_cols, wa_dc_cols)
           .pipe(fix_and_drop, 'seasons', seasons_wa_dc, cols_drop_wa_dc)).head()

Por último, necesitamos normalizar los datos para Washington DC ya que las columnas se han alterado un poco.

In [None]:
def normalize_vars(data):
    data['temperature'] = data['temperature'].apply(lambda x: (x * 47) - 8)
    data['humidity'] = data['humidity'].apply(lambda x: (x / 100))
    data['wind_speed'] = data['wind_speed'].apply(lambda x: (x / 67))
    return data

In [None]:
def extract_data(path):
    return pd.read_json(path)

Finalmente, podemos usar nuestro operador de tubería de nuevo para completar el proceso.

In [None]:
washington = (extract_data(wash_dc_path).pipe(change_cols, wa_dc_cols)
                                        .pipe(add_dates_and_location, 'DC', 'USA')
                                        .pipe(fix_and_drop, 'seasons', seasons_wa_dc, cols_drop_wa_dc)
                                        .pipe(normalize_vars))
washington.head()

## Ejercicio

1. Crea una funcion para extraer los datos de Londres.
2. Crea una tuberia de datos similar a la de Washington usando pandas' `pipe`.

## 6. Extraer

Dependiendo de donde esten los datos, en que formato esten almacenados, y como podemos accesarlos, este puede ser uno de los pasos mas cortos tanto como el mas largo en nuestra tuberia de datos. Aqui estan algunos de los formatos que podrias encontrar en tu dia a dia.

- Texto: usualmente el formato de texto es similar al que vemos en Microsoft Excel pero sin formulas o graficos. Por ejemplo, CVS o TSV.
- JSON: JavaScript Object Notation es un sub-lenguage bastante popular por su sintactica simple
- Databases: Estas pueden ser SQL, NOSQL, MPP (massively parallel processing), entre otras.
- GeoJSON: Es un tipo de formato para datos que contienen informacion geografica. Existen muchos mas tipos de datos para GIS.
- HTML: Se refiere a Hyper Text Markup Language y representa el esqueleto de casi todas las paginas web en existencia.
- ...

Ya que aprendimos a como crear tuberias con pandas, ahora necesitamos crear funciones para nuestra tuberia ETL principal y esto lo podemos lograr usando el decorador de prefect, `@task`. Este decorador recuerda las funciones que queremos enlazar y nos ayuda a crear una red en la cual cada nodo es una funcion y cada enlaze conecta una o mas funciones una sola vez.

Recuerda, al decorador `@task` es una funcion dentro de prefect y como tal, le podemos pasar varios argumentos que nos ayudan a modificar el comportamiento de cada function en nuestra tuberia.

Puedes aprender mas acerca del `task` API en los [documentos oficiales aqui](https://docs.prefect.io/core/concepts/tasks.html#overview).

In [None]:
task??

In [None]:
@task
def extract_1(path):
    return pd.read_csv(path)

In [None]:
@task
def extract_2(path):
    conn = sqlite3.connect(path)
    query = "SELECT * FROM uk_bikes"
    return pd.read_sql_query(query, conn)

In [None]:
@task
def extract_3(path):
    return pd.read_json(path)

## 7. Transformar

Las transformaciones mas comunes que suceden en esta etapa usualmente son las que creamos anteriormente. En resumen,

- Limpiar datos
- Convertir variables numericas a la misma unidad
- Unir datos


In [None]:
def order_and_merge(data_lists):
    
    pick_order = data_lists[0].columns
    new_list = [d.reindex(columns=pick_order).sort_values(['date', 'hour']) for d in data_lists]
    df = pd.concat(new_list)
    return df

In [None]:
@task
def transform(london, seoul, washington):
    
    london = (london.pipe(change_cols, london_cols)
                    .pipe(add_dates_and_location, 'London', 'UK')
                    .pipe(fix_and_drop, 'seasons', seasons_london, cols_drop_london))
    
    seoul = (seoul.pipe(change_cols, seoul_cols)
                  .pipe(add_dates_and_location, 'Seoul', 'SK')
                  .pipe(fix_and_drop, 'is_holiday', holidays_seoul, cols_drop_seoul))
    
    wash_dc = (washington.pipe(change_cols, wa_dc_cols)
                         .pipe(add_dates_and_location, 'DC', 'USA')
                         .pipe(fix_and_drop, 'seasons', seasons_wa_dc, cols_drop_wa_dc)
                         .pipe(normalize_vars))
    
    return order_and_merge([london, seoul, wash_dc])

## 8. Descargar

In [None]:
new_table = SQLiteScript(
    db=clean_db_path,
    script="""CREATE TABLE IF NOT EXISTS bike_sharing (date text, count integer, temperature real, humidity real,
              wind_speed real, is_holiday real, is_weekend integer, seasons text, year integer,
              month integer, week integer, day integer,hour integer, weekday integer, city text,
              country text)"""
    )

In [None]:
@task
def load(data, path_and_name):
    
    data = list(data.itertuples(name='Bikes', index=False))
    
    insert_cmd = "INSERT INTO bike_sharing VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    with closing(sqlite3.connect(path_and_name)) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.executemany(insert_cmd, data)
            conn.commit()

## 9. Lanza La Tuberia

In [None]:
with Flow('bikes-ETL') as flow:
    
    the_table = new_table()
    
    london = extract_2(london_path)
    seoul = extract_1(seoul_path)
    wash_dc = extract_3(wash_dc_path)
    
    transformed = transform(london, seoul, wash_dc)
        
    data_loaded = load(transformed, clean_db_path)
    data_loaded.set_upstream(the_table)

In [None]:
flow.visualize()

In [None]:
flow.run()

In [None]:
pd.read_sql_query("SELECT * FROM bike_sharing", sqlite3.connect(clean_db_path)).head()

## Ejercicio

Cambia la funcion para descargar (`load()`) y haz que guarde los resultados en formato `parquet`. Corre la tuberia de nuevo y cerciorate de que los resultados sean iguales a los anteriores.

## 10. Automatizar

Nuestra jefa nos comenta que los datos de las 3 ciudades seran actualizados todos los sabados asi que tenemos que automatizar el intervalo en el cual queremos que nuestro programa corra. Para ese tenoms la funcion de `IntervalSchedule` en prefect, y esta nos permite establecer el intervalo de tiempo que necesitamos. Ya sea un minuto, dos semanas, o un mes, anadir este detallo es algo trivial.

In [None]:
from prefect.schedules import IntervalSchedule
import datetime

In [None]:
schedule = IntervalSchedule(interval=datetime.timedelta(minutes=1), 
                            # start_date=datetime.datetime(2021, 11, 5)
                           )

In [None]:
with Flow('bikes-ETL', schedule=schedule) as flow:
    
    the_table = new_table()
    
    london = extract_2(london_path)
    seoul = extract_1(seoul_path)
    wash_dc = extract_3(wash_dc_path)
    
    transformed = transform(london, seoul, wash_dc)
        
    data_loaded = load(transformed, clean_db_path)
    data_loaded.set_upstream(the_table)

In [None]:
flow.run()

Para aprender mas sobre como programar y actualizar tus tuberias, por favor visita la documentacion oficial en [prefect schedules](https://docs.prefect.io/core/concepts/schedules.html#simple-schedules).

## 11. Resumen

1. Crear tuberias ETL te ayuda a ahorar tiempo con la limpieza de tus datos.
2. pandas `pipe` te ayuda a crear cadenas de funciones y ahorrar tiempo y lineas de codigo.
3. Prefect te ahora tiempo ya que encadena mas funciones por ti y te ayuda a crear horarios para tus funciones.
4. No importa que tipo de profesional seas, mover y limpiar tus datos es una herramienta invaluable que no esta de mas saber.