# Introdução 

Nesse tutorial vamos aprender o básico sobre as etapas de uma rotina de ETL(extract, transform and load). A ideia aqui é utilizar um dataset público real para demonstrar como implementar as diferentes etapas dessa rotina. Vamos explorar alguns conceitos básicos da Engenharia de dados, como implementar rotinas para extração de arquivos e como manipular dados tabulares com o Pandas. Lembrando que toda a implementação aqui é focada em *small data*.

# O que é ETL

**ETL(Extract, transform and load)** é um processo que consiste em integrar dados de diferentes fontes buscando consolidá-los de uma maneira que facilite o processo de análise. O termo se popularizou com o surgimento das *data warehouses*, que nasceram da necessidade de centralizar diversas fontes de dados para permitir criar análises que ajudassem as empresas na tomada de decisão.

O processo consiste de três etapas:

1. *Extract*: extrair dados de uma fonte de informações, seja banco de dados, API, arquivos, etc. Para serem processados posteriormente;
2. *Transform*: nessa etapa os dados extraídos passam por uma transformação para atender os requisitos das aplicações cliente. A transormação envolve:
    - Limpar e validar os dados para garantir qualidade;
    - Transformar o formato dos dados para, por exemplo, facilitar usabilidade e busca;
    - Combinar diferentes fontes para compor as informações necessárias;
    - Aplicar regras de negócio.
3. *Load*: carregar resultado das transformações no sistema de destino, como *date warehouses* ou *data lakes*.

# Data warehouse e data lake 

**Data warehouses**, segundo Bill Inmon, é uma coleção de dados orientada à assunto, não volátil, integrada e variável no tempo para apoiar as decisões de negócio. Exemplos de ferramentas em nuvem:

<div style="background-color: white">
  <img src="./img/redshift.png">
   <img src="./img/bigquery.png">
</div>


**Data warehouse**: Somente dados estruturados, geralmente alimenta BI's;

**Data lake**: Dados estruturados e não estruturados, utilizado por cientistas de dados.

![](./img/etl_diagram.png)

# A fonte de dados 

Vamos trabalhar com a fonte de dados abertos da [Comissão de Valores Mobiliários](http://www.cvm.gov.br/) contendo a cotação diária dos fundos de investimentos negociados no mercado brasileiro. Essa fonte é atualizada diariamente com os dados de fechamento do dia anterior e as cotações são agrupadas por arquivos correspondentes a cada mês do ano.

Esse é um cenário bem comum em fontes de dados abertas, uma série de arquivos no formato CSV agrupando informações de acordo com a data, então a solução que vamos implementar durante o tutorial é reaproveitável para outras fontes de dados.

Primeiro é importante analisar a fonte de dados, entender como ela está estruturada, quais campos compõem o dataset e como nós podemos automatizar a coleta dos dados.

A fonte de dados contendo a cotação diária dos fundos pode ser acessada através do portal de dados abertos pelo link:

http://www.dados.gov.br/dataset/fi-doc-inf_diario

## Como automatizar o download 

Podemos observar que no site existe um link para todos os datasets dos últimos 12 meses. Primeiro precisamos analisar se existe um padrão nas urls de download dos arquivos, para isso vamos copiar alguns endereços e compará-los.

Copiei três links e vamos compará-los a seguir:

http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_201907.csv

http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_201910.csv

http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_202003.csv

Como podemos observar existe um padrão claro nos links:

`dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_YYYYMM.csv`

# ETL

In [7]:
import pandas as pd
import requests
from tqdm import tqdm

## Extrair 

O processo de extração consiste nesse caso em fazer o download de todos os arquivos da janela de tempo q nos interessa. As etapas pra nós atingirmos esse objetivo são:

1. Automatizar a geração do nome dos arquivos: já que a única coisa que varia nos links é a data dos arquivos precisamos automatizar a geração dessas datas de acordo com a janela de tempo do nosso interesse;

2. Requisitar o arquivo: precisamos enviar uma requisição para o portal de dados abertos do arquivo que queremos fazer o download;

3. Salvar arquivo: o portal de dados abertos vai nos enviar o arquivo requisitado e precisamos salvá-lo no nosso computador.

### Gera lista de datas

In [9]:
data = pd.date_range(start='2019-01', end='2020-01', freq='M')[0]

In [31]:
date_list = generate_date('2020-01', '2020-03')

### Requisita e salva os arquivos 

1. Chamar método `get`
2. Checar se a requisição teve sucesso olhando o atributo `status_code`
3. Salvar o conteúdo no computar

In [19]:
response = requests.get('http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_202003.csv')

In [22]:
base_url = 'http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_{date}.csv'

In [32]:
files = [base_url.format(date=date) for date in date_list]

In [51]:
def download_files(file_list):
    save_list = []
    for file in file_list:
        response = requests.get(file)
        
        if response.status_code != 200:
            print('Erro ao baixar o arquivo {}'.format(file))
            continue
        
        file_save_name = 'data/{}'.format(file.split('/')[-1])
        
        with open(file_save_name, 'wb') as f:
            f.write(response.content)
        
        save_list.append(file_save_name)
        
    return save_list

In [52]:
saved_files = download_files(files)

In [53]:
saved_files

['data/inf_diario_fi_202001.csv', 'data/inf_diario_fi_202002.csv']

## Análise

Aqui vou abrir um parênteses para uma análise rápido do nosso dateframe, é claro que só a parte de exploração dos dados vale um tutorial completo, então não vou explorar muito esses aspecto aqui, mas de qualquer forma é importante ter noção de alguns pontos básicos quando se trabalha com uma fonte de dados:

- Qual o tipo de cada coluna
- Quantos valores nulos que existem
- Como estão formatados

Essa etapa não faz parte do ETL, na verdade essa exploração inicial normalmente é feita antes de construir a *pipeline* para entender a fonte de dados utilizada.

In [55]:
# Leitura de um arquivo
df = pd.read_csv('data/inf_diario_fi_202001.csv', sep=';')

In [56]:
# Formato do dataframe (linhas, colunas)
df.shape

(369894, 8)

In [57]:
# Quais são nossas colunas, seus tipos e se existem valores nulos
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 369894 entries, 0 to 369893
Data columns (total 8 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   CNPJ_FUNDO     369894 non-null  object 
 1   DT_COMPTC      369894 non-null  object 
 2   VL_TOTAL       369894 non-null  float64
 3   VL_QUOTA       369894 non-null  float64
 4   VL_PATRIM_LIQ  369894 non-null  float64
 5   CAPTC_DIA      369894 non-null  float64
 6   RESG_DIA       369894 non-null  float64
 7   NR_COTST       369894 non-null  int64  
dtypes: float64(5), int64(1), object(2)
memory usage: 22.6+ MB


In [62]:
df.head()

Unnamed: 0,CNPJ_FUNDO,DT_COMPTC,VL_TOTAL,VL_QUOTA,VL_PATRIM_LIQ,CAPTC_DIA,RESG_DIA,NR_COTST
0,00.017.024/0001-53,2020-01-02,1132491.66,27.225023,1123583.0,0.0,0.0,1
1,00.017.024/0001-53,2020-01-03,1132685.12,27.224496,1123561.25,0.0,0.0,1
2,00.017.024/0001-53,2020-01-06,1132881.43,27.225564,1123605.31,0.0,0.0,1
3,00.017.024/0001-53,2020-01-07,1133076.85,27.226701,1123652.24,0.0,0.0,1
4,00.017.024/0001-53,2020-01-08,1132948.59,27.227816,1123698.26,0.0,0.0,1


In [63]:
len(df.CNPJ_FUNDO.unique())

16977

## Transformar 

Nessa etapa é necessário consolidar todas as informações extraídas, aplicar os tratamentos e regras de negócio que fazem sentido para a aplicação cliente, defini alguns objetivos para nos orientar durante essa etapa:

1. Consolidar todos os arquivos em um *dataframe*;
2. Transformar tipo da coluna de data(`DT_COMPTC`) para `datetime`;
3. Manter somente fundos com mais de `1000` cotistas;
4. Manter somente informações sobre: data, CNPJ do fundo e valor da cota;
5. Mudar o formato do dataframe para:

|            | 00.017.024/0001-53 | 97.929.213/0001-34 | 00.068.305/0001-35 | ... |
|------------|--------------------|--------------------|--------------------|-----|
| 2020-01-02 | 27.225023          | 27.112737          | 1.733476e+08       | ... |
| 2020-01-03 | 27.224496          | 27.115661          | 6.611408e+07       | ... |
| ...        | ...                | ...                | ...                | ... |

### 1. Consolida todos os arquivos 

In [64]:
raw_df_list = [pd.read_csv(file, sep=';') for file in saved_files]
raw_df = pd.concat(raw_df_list)

In [65]:
raw_df.shape

(675754, 8)

### 2. Converte coluna para tipo data 

In [68]:
raw_df.DT_COMPTC = pd.to_datetime(raw_df.DT_COMPTC, format='%Y-%m-%d')

In [70]:
raw_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 675754 entries, 0 to 305859
Data columns (total 8 columns):
 #   Column         Non-Null Count   Dtype         
---  ------         --------------   -----         
 0   CNPJ_FUNDO     675754 non-null  object        
 1   DT_COMPTC      675754 non-null  datetime64[ns]
 2   VL_TOTAL       675754 non-null  float64       
 3   VL_QUOTA       675754 non-null  float64       
 4   VL_PATRIM_LIQ  675754 non-null  float64       
 5   CAPTC_DIA      675754 non-null  float64       
 6   RESG_DIA       675754 non-null  float64       
 7   NR_COTST       675754 non-null  int64         
dtypes: datetime64[ns](1), float64(5), int64(1), object(1)
memory usage: 46.4+ MB


### 3. Filtra por quantidade de cotistas

In [72]:
df_final_date = raw_df[raw_df.DT_COMPTC == raw_df.DT_COMPTC.max()]

In [75]:
qtd_shareholders = 1000
valid_cnpjs = df_final_date.query('NR_COTST >= @qtd_shareholders').CNPJ_FUNDO

In [76]:
df_filtered = raw_df.query('CNPJ_FUNDO in @valid_cnpjs')

In [78]:
df_filtered.shape

(44403, 8)

### 4. Altera formato do dataframe 

In [80]:
result = pd.DataFrame(data={'DT_COMPTC': df_filtered.DT_COMPTC.sort_values().unique()})

In [82]:
group_cnpjs = df_filtered.groupby('CNPJ_FUNDO')

In [92]:
for group in tqdm(group_cnpjs.groups):
    df_cnpj = group_cnpjs.get_group(group)
    result = pd.merge(result, df_cnpj[['DT_COMPTC', 'VL_QUOTA']], on='DT_COMPTC', how='left')\
               .rename(columns={'VL_QUOTA': group})

100%|██████████| 1113/1113 [01:54<00:00,  9.74it/s]


In [94]:
result.set_index('DT_COMPTC', inplace=True)

In [96]:
result.head()

Unnamed: 0_level_0,00.068.305/0001-35,00.071.477/0001-68,00.180.995/0001-10,00.222.725/0001-24,00.222.816/0001-60,00.280.302/0001-60,00.306.278/0001-91,00.322.699/0001-06,00.360.293/0001-18,00.398.561/0001-90,...,67.976.449/0001-60,68.599.141/0001-06,68.623.479/0001-56,68.670.512/0001-07,68.971.183/0001-26,73.899.759/0001-21,88.002.696/0001-36,88.198.056/0001-43,97.519.703/0001-62,97.519.794/0001-36
DT_COMPTC,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2020-01-02,27.112737,9.979337,550.381807,3695.421072,7689.303576,4696.049724,18.811236,13.04165,22.378766,227.998183,...,12.634536,4.013766,5.693611,1.864455,1.100212,40.429076,0.711309,6107.57366,22.081756,21.002109
2020-01-03,27.115661,9.97977,550.468081,3697.159049,7690.674711,4696.806615,18.813887,13.043727,22.501972,225.529173,...,12.541972,4.013966,5.694228,1.853287,1.100384,40.128902,0.709291,6119.45635,22.085839,21.005407
2020-01-06,27.118216,9.980323,550.475403,3697.272064,7690.802997,4697.423331,18.816628,13.045941,22.544107,223.095944,...,12.452326,4.014164,5.694767,1.81677,1.100555,39.842954,0.703597,6091.10393,22.089976,21.008664
2020-01-07,27.121996,9.980816,550.688536,3697.922105,7694.073773,4698.291493,18.819392,13.047728,22.593079,223.376967,...,12.428292,4.014379,5.695564,1.816762,1.100703,39.768262,0.702067,6132.30651,22.095126,21.011485
2020-01-08,27.126159,9.981363,550.89919,3698.652972,7697.353404,4699.138945,18.822258,13.050008,22.501116,223.389476,...,12.382488,4.014584,5.696441,1.814723,1.100884,39.617461,0.69876,6130.89475,22.099806,21.014944


## Carregar 

Para etapa de carregamento vamos exportar o resultado em formato CSV e também vou demostrar como subir esse arquivo no [S3 da Amazon](https://aws.amazon.com/pt/s3/), que é um serviço de armazenamento de objetos na cloud, comumente usado como *data lake*.

In [97]:
result.to_csv('result.csv')

**Bônus**: Essa função utiliza do SDK do AWS para fazer o upload do arquivo resultante em um *bucket* no S3 da AWS, não vou entrar em detalhes sobre a configuração dessa ferramenta, mas se tiver interesse em saber mais a [documentação do boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#installation) tem um passo a passo sobre como utilizar o SDK.

In [100]:
import boto3

def upload_s3(file_path, remote_file_name, bucket):
    s3 = boto3.resource('s3')
    data = open(file_path, 'rb')
    s3.Bucket(bucket).put_object(Key=remote_file_name, Body=data)

In [101]:
upload_s3('result.csv', 'pythonbrasil.csv', 'pythonbrasil')

# Conclusão

Durante o tutorial passamos por todas as etapas do processo ETL, claro que a solução que implementamos aqui é simples e trabalha com um pequeno volume de dados, a partir do momento em que o volume de dados aumenta é preciso buscar ferramentas otimizadas, mas o processo no geral continua o mesmo. E para levar essa *pipeline* para produção o que falta? 

Em produção é importante utilizar ferramentas que paralelizem a execução das tarefas e que também sejam capazes de lidar com falhas durante o processo, no caso de *pipelines* que realizam processamento em lote(como a que nós implementamos), temos por exemplo ferramentas como:
- [Apache Airflow](https://airflow.apache.org/)
- [Luigi](https://github.com/spotify/luigi)
- [Apache Beam](https://beam.apache.org/)
- [Prefect](https://www.prefect.io/)

Essa mesma *pipeline* que nós implementamos foi a primeira versão do que eu fiz para alimentar o [fundos.sharke.com.br](fundos.sharke.com.br), hoje está bem mais complexa e roda no [Apache Airflow](https://airflow.apache.org/):

![](img/airflow_dag.png)