# ETLs, ELTs e Pipelines de Dados


## Introdução


---

Nos dias de hoje, as empresas lidam com grandes quantidades de dados em diferentes sistemas. Para tomar decisões informadas e eficientes, é crucial ter acesso a todos esses dados de forma integrada e atualizada. Aqui estão algumas razões pelas quais as extrações automatizadas de dados são necessárias:


1. **Aumento da eficiência:** extrair manualmente dados de diferentes sistemas é um processo demorado e propenso a erros. Com extrações automatizadas, é possível obter dados rapidamente e com precisão, permitindo uma tomada de decisão mais rápida e eficiente;
2. **Redução de erros:** as extrações automatizadas removem a possibilidade de erros humanos, como digitação incorreta ou omissão de informações;
3. **Integração de dados:** as extrações automatizadas permitem integrar dados de diferentes sistemas, fornecendo uma visão mais completa e precisa do negócio; e
4. **Tempo e recursos:** automatizar o processo de extração de dados economiza tempo e recursos, permitindo que os funcionários se concentrem em tarefas mais importantes.


Em resumo, as extrações automatizadas de dados são fundamentais para obter dados precisos, integrados e atualizados, aumentando a eficiência e efetividade da tomada de decisão.


<br><br>


## Arquitetura de soluções de extração de dados

---

A arquitetura de soluções de extração de dados pode envolver o uso de APIs (Application Programming Interfaces), webhooks, planilhas, etc. para obter dados de diferentes fontes de forma programática. Neste tipo de solução, a extração de dados é automatizada usando scripts escritos em alguma linguagem de programação (como por exemplo, Python). De forma simples, podemos definir um processo de extração de dados em 5 partes:


1. **Fonte de dados:** a primeira etapa é identificar a fonte de dados desejada. Isso pode incluir bancos de dados, sistemas de gerenciamento de conteúdo (CMS), APIs, sites, plataformas de e-commerce, arquivos exce, entre outros;
2. **Requisição/Acesso/Webscrapping/Raspagem dos dados:** a próxima etapa é acessar sua fonte de dados e extrair a informação;
4. **Tratamento de dados:** após a recepção dos dados, precisamos tratá-los antes de serem usados. Isso pode incluir a limpeza dos dados, a conversão para um formato desejado e sua posterior análise. Em Python, podemos utilizar bibliotecas como Pandas ou Numpy para realizar tais tarefas;
5. **Armazenamento de dados:** por fim, os dados extraídos precisam ser armazenados em algum local para uso futuro. E, isso pode ser feito em bancos de dados locais ou na nuvem, como o Amazon S3 ou o Google Cloud Storage.


<img src = "https://streamsets.b-cdn.net/wp-content/uploads/data-integration-and-data-engineering-streamsets.png" width=800>

<img src="https://www.striim.com/wp-content/uploads/2022/06/image1-2.png" width=800>

É importante destacar que a soma destas partes permite a automatização da extração de dados, tornando-a mais eficiente e confiável. Além disso, a utilização de Python como linguagem de programação permite a facilidade de uso e a ampla disponibilidade de bibliotecas para tratamento e armazenamento de dados.

<br>

## ETLs & ELTs


---

No mundo de gerenciamento de dados o processo de **extrair, transformar e carregar (ETL)** é uma etapa fundamental na coleta, processamento e preparação de dados para análise. 
ETL envolve extrair dados de diversas fontes, transformá-los em um formato estruturado e carregá-los em um destino onde possam ser analisados.
Este processo é crucial para manter a qualidade e a consistência dos dados.
Vamos entender melhor o que é cada uma das siglas:

### 1. Extração (E)
A primeira etapa do ETL é a extração, na qual os dados são coletados de diferentes fontes. Essas Fontes podem incluir bancos de dados, planilhas, APIs, logs e muito mais. Aqui estão alguns exemplos:

- extraindo dados de um banco de dados

```sql
select * from sales_data
```

- extraindo dados de uma API REST:

```python
import request

response = request.get("https://api.exemple.com/data")
data = response.json()
```

### 2. Transformação (T)

Após a extração de dados, a próxima etapa é a transformação. Durante esta fase os dados são limpos, enriquecidos e transformados em um formato consistente. A transformação pode envolver: 
- remoção de duplicatas
- tratamento de valores ausentes
- agregação de dados por exemplo
- transformações de tipos de dados (string > float)
- transformando um formato de data

Exemplo transformando um formato de data de MM/DD/AAAA para
AAAA-MM-DD:

In [0]:
import pandas as pd 

datas = ["08/02/2023", "09/19/2023", "03/02/2023"] 
df = pd.DataFrame(datas, columns=["date"])

df["new_date"] = pd.to_datetime(df.date, format="%m/%d/%Y")#.dt.strftime(format="")
df



Unnamed: 0,date,new_date
0,08/02/2023,2023-08-02
1,09/19/2023,2023-09-19
2,03/02/2023,2023-03-02


### 3. Carga (L)
A etapa final do ETL é carregar os dados transformados de um destino, normalmente em um data warehouse, banco de dados ou data lake. 
Aqui estão alguns exemplos de carregamento:

- carregando dados em um banco de dados SQL: 

```sql
INSERT INTO target_table
SELECT * FROM transformed_data
```

- carregar dados em um data warehouse na nuvem como Amazon Redshift 

```python
import psycopg2

# conectando ao Redshift
conn = psycopg2.connect(database="mydb", user="user", password="password", host="redshift-cluster-url")

# Inserindo dados na tabela
cur = conn.cursor()
cur.execute("COPY target_table FROM 's3://data-bucket/trasformed_data.csv' CSV;")

```

### Quais as diferenças entre ETL e ELT?

ETL, ELT e pipeline de dados são conceitos importantes na gestão de dados. E, podemos defini-los da seguinte forma:


1. **ETL (Extração, Transformação e Carga):** ETL é uma sigla que se refere ao processo de extração de dados de várias fontes, as transformações utilizadas para torná-los compatíveis com o sistema de destino e ao seu carregamento no sistema de destino. Esse processo é importante para garantir que os dados sejam integrados corretamente em um sistema único e possam posteriormente serem utilizados para a tomada de decisão informada.


2. **ELT (Extração, Carga e Transformação):** a ELT é semelhante ao ETL, mas a ordem dos passos é diferente. Em uma ELT, os dados são extraídos de fontes externas, carregados no sistema de destino e, em seguida, transformados para serem compatíveis com o sistema de destino. A ELT é uma abordagem popular quando se tem um hardware poderoso disponível para transformação de dados no lado do sistema de destino.

<center>
<img src="https://miro.medium.com/v2/resize:fit:2000/format:webp/1*-6tNymvTTqGIWJlzQHwBaw.png"  width="60%" height="30%" texto="Fonte: https://miro.medium.com/v2/resize:fit:2000/format:webp/1*-6tNymvTTqGIWJlzQHwBaw.png">
</center>

No primeiro caso o load geralmente acontece em bd tabelares como redshift enquanto o segundo pode exigir que vc tenha um data lake que aceite diferentes formatos



#### Casos de uso para ETL

- Quando os dados precisam de limpeza, enriquecimento e integração extensivos antes da análise 

- Quando a empresa utiliza data warehouse tradicional com fontes de dados estruturados

- Quando os dados devem ser transformados num formato consistente para a elaboração de relatório e/ou precisam passar por algum processo de agregação.

- Quando o dado é utilizado diretamente pelo time de negócio.

#### Casos de uso para ELT

- Quando é necessário lidar com grandes volumes de dados brutos e não estruturados

- Quanda a empresa utiliza arquiteturas modernas de data Lake 

- Quando a exploração e análise de dados é realizada diretamente nos dados brutos.


## Pipeline de dados com extração ativa ou passiva

Pipeline de dados se referem ao processo de fluxo contínuo de dados de uma fonte para um sistema de destino. Elas podem ser divididas em duas categorias:

- **Pipelines de extração ativa:** são aquelas em que os dados são extraídos da fonte em intervalos regulares de tempo; e
- **Pipelines de extração passiva:** são aquelas em que os dados são enviados assincronamente pela sistema originador da informação;

Em suma, ETL, ELT e pipeline de dados são conceitos importantes para a gestão de dados que ajudam a garantir que os dados sejam integrados corretamente em um único sistema e possam ser usados para tomar decisões informadas. A escolha entre ETL e ELT depende de fatores como a quantidade de dados e o poder de processamento disponível, enquanto que a escolha entre extração ativa ou passiva irá depender de outros fatores como a volumetria dos dados e o tempo de latência requisitado.

## Exemplo de um ETL

1. Extração de dados em formato de arquivos do tipo csv, da base de qualidade do ar do link https://raw.githubusercontent.com/PacktPublishing/Time-Series-Analysis-with-Python-Cookbook./main/datasets/Ch2/AirQualityUCI.csv ;

2. Transformação desses dados;

3. Armazenamento dos dados transformados em arquivos do tipo csv.

Metadados:

- Date (DD/MM/YYYY)
- Time (HH.MM.SS)
- True hourly averaged concentration CO in mg/m^3 (reference analyzer)
- PT08.S1 (tin oxide) hourly averaged sensor response (nominally CO targeted)
- True hourly averaged overall Non Metanic HydroCarbons concentration in microg/m^3 (reference analyzer)
- True hourly averaged Benzene concentration in microg/m^3 (reference analyzer)
- PT08.S2 (titania) hourly averaged sensor response (nominally NMHC targeted)
- True hourly averaged NOx concentration in ppb (reference analyzer)
- PT08.S3 (tungsten oxide) hourly averaged sensor response (nominally NOx targeted)
- True hourly averaged NO2 concentration in microg/m^3 (reference analyzer)
-  PT08.S4 (tungsten oxide) hourly averaged sensor response (nominally NO2 targeted)
-  PT08.S5 (indium oxide) hourly averaged sensor response (nominally O3 targeted)
-  Temperature in °C
-  Relative Humidity (%)
-  AH Absolute Humidity

https://archive.ics.uci.edu/dataset/360/air+quality

In [0]:
import pandas as pd

url = "https://raw.githubusercontent.com/PacktPublishing/Time-Series-Analysis-with-Python-Cookbook./main/datasets/Ch2/AirQualityUCI.csv"
df = pd.read_csv(url, sep=";")
df.head()

Unnamed: 0,Date,Time,CO(GT),PT08.S1(CO),NMHC(GT),C6H6(GT),PT08.S2(NMHC),NOx(GT),PT08.S3(NOx),NO2(GT),PT08.S4(NO2),PT08.S5(O3),T,RH,AH
0,10/03/2004,18:00:00,2.6,1360.0,150,11.881723,1045.5,166.0,1056.25,113.0,1692.0,1267.5,13.6,48.875001,0.757754
1,10/03/2004,19:00:00,2.0,1292.25,112,9.397165,954.75,103.0,1173.75,92.0,1558.75,972.25,13.3,47.7,0.725487
2,10/03/2004,20:00:00,2.2,1402.0,88,8.997817,939.25,131.0,1140.0,114.0,1554.5,1074.0,11.9,53.975,0.750239
3,10/03/2004,21:00:00,2.2,1375.5,80,9.228796,948.25,172.0,1092.0,122.0,1583.75,1203.25,11.0,60.0,0.786713
4,10/03/2004,22:00:00,1.6,1272.25,51,6.518224,835.5,131.0,1205.0,116.0,1490.0,1110.0,11.15,59.575001,0.788794


### Transformação

Suponha que o time de analytics precisa dos dados de valor máximo de CO e média de temperatura mensal.

In [0]:
df_grouped = df.assign(mes=pd.to_datetime(df.Date, format="%d/%m/%Y").dt.month).groupby("mes").agg({"CO(GT)": "max", "T": "mean"})
df_grouped

Unnamed: 0_level_0,CO(GT),T
mes,Unnamed: 1_level_1,Unnamed: 2_level_1
1,8.7,-8.85168
2,8.4,-16.312587
3,8.1,13.522721
4,7.3,9.530184
5,6.5,16.13489
6,6.4,14.521481
7,5.3,29.11017
8,3.5,14.039819
9,7.5,19.005671
10,9.5,20.20009


### Load

Criar um diretório e salvar os dados em um arquivo csv.

In [0]:
dbutils.fs.mkdirs("FileStore/tables/aula_1/etl/")
dbutils.fs.ls("FileStore/tables/aula_1/")

Out[7]: [FileInfo(path='dbfs:/FileStore/tables/aula_1/elt/', name='elt/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/aula_1/etl/', name='etl/', size=0, modificationTime=0)]

In [0]:
import pyspark.pandas as ps

dfs = ps.DataFrame(df_grouped)
dfs.to_csv("FileStore/tables/aula_1/etl/bi_dashboard.csv", index=False)

In [0]:
dbutils.fs.ls("FileStore/tables/aula_1/etl/")

Out[9]: [FileInfo(path='dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/', name='bi_dashboard.csv/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/aula_1/etl/raw_data/', name='raw_data/', size=0, modificationTime=0)]

In [0]:
display(dbutils.fs.ls("FileStore/tables/aula_1/etl/bi_dashboard.csv/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_SUCCESS,_SUCCESS,0,1711145112000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_committed_1989785213703169077,_committed_1989785213703169077,1439,1711145111000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_committed_6614323851782428056,_committed_6614323851782428056,728,1710983810000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_committed_7951968242058302433,_committed_7951968242058302433,1442,1711144661000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_committed_vacuum3698962440085877290,_committed_vacuum3698962440085877290,96,1711144661000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_started_1989785213703169077,_started_1989785213703169077,0,1711145111000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/_started_7951968242058302433,_started_7951968242058302433,0,1711144660000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/part-00000-tid-1989785213703169077-47771cfd-e8e7-4430-be9d-9383b8b4ebea-396-1-c000.csv,part-00000-tid-1989785213703169077-47771cfd-e8e7-4430-be9d-9383b8b4ebea-396-1-c000.csv,32,1711145111000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/part-00001-tid-1989785213703169077-47771cfd-e8e7-4430-be9d-9383b8b4ebea-397-1-c000.csv,part-00001-tid-1989785213703169077-47771cfd-e8e7-4430-be9d-9383b8b4ebea-397-1-c000.csv,55,1711145111000
dbfs:/FileStore/tables/aula_1/etl/bi_dashboard.csv/part-00002-tid-1989785213703169077-47771cfd-e8e7-4430-be9d-9383b8b4ebea-398-1-c000.csv,part-00002-tid-1989785213703169077-47771cfd-e8e7-4430-be9d-9383b8b4ebea-398-1-c000.csv,31,1711145111000


## Exemplo de um ELT

1. Extração de dados em formato de arquivos do tipo csv, da base de qualidade do ar do link https://raw.githubusercontent.com/PacktPublishing/Time-Series-Analysis-with-Python-Cookbook./main/datasets/Ch2/AirQualityUCI.csv ;

2. Armazenamento dos dados extraídos em arquivo local;

3. Transformação desses dados;

In [0]:
url = 'https://raw.githubusercontent.com/PacktPublishing/Time-Series-Analysis-with-Python-Cookbook./main/datasets/Ch2/AirQualityUCI.csv'
df_elt = pd.read_csv(url, delimiter=';')
df_elt.head()

Unnamed: 0,Date,Time,CO(GT),PT08.S1(CO),NMHC(GT),C6H6(GT),PT08.S2(NMHC),NOx(GT),PT08.S3(NOx),NO2(GT),PT08.S4(NO2),PT08.S5(O3),T,RH,AH
0,10/03/2004,18:00:00,2.6,1360.0,150,11.881723,1045.5,166.0,1056.25,113.0,1692.0,1267.5,13.6,48.875001,0.757754
1,10/03/2004,19:00:00,2.0,1292.25,112,9.397165,954.75,103.0,1173.75,92.0,1558.75,972.25,13.3,47.7,0.725487
2,10/03/2004,20:00:00,2.2,1402.0,88,8.997817,939.25,131.0,1140.0,114.0,1554.5,1074.0,11.9,53.975,0.750239
3,10/03/2004,21:00:00,2.2,1375.5,80,9.228796,948.25,172.0,1092.0,122.0,1583.75,1203.25,11.0,60.0,0.786713
4,10/03/2004,22:00:00,1.6,1272.25,51,6.518224,835.5,131.0,1205.0,116.0,1490.0,1110.0,11.15,59.575001,0.788794


### Load

Criar um diretório e salvar os dados em um arquivo csv.

In [0]:
dbutils.fs.mkdirs("FileStore/tables/aula_1/elt/bronze/")
df_elt_ps = ps.DataFrame(df_elt)
df_elt_ps.to_csv("/FileStore/tables/aula_1/elt/bronze/AirQuality.csv", index=True)

### Transformação

Suponha que o time de analytics precisa dos dados de valor máximo de CO e média de temperatura mensal.

In [0]:
df_elt = ps.read_csv("/FileStore/tables/aula_1/elt/bronze/AirQuality.csv")

df_elt["mes"] = ps.to_datetime(df_elt['Date']).dt.month
df_grouped = df_elt.groupby("mes").agg({"CO(GT)": "max", "T": "mean"})
df_grouped

Unnamed: 0_level_0,CO(GT),T
mes,Unnamed: 1_level_1,Unnamed: 2_level_1
9,7.5,19.005671
10,9.5,20.20009
11,11.9,13.482384
6,6.4,14.521481
5,6.5,16.13489
4,7.3,9.530184
3,8.1,13.522721
2,8.4,-16.312587
7,5.3,29.11017
8,3.5,14.039819


## Exercício

Realizar o processo de ETL e ELT utilizando os dados da [covid](https://en.wikipedia.org/wiki/COVID-19_pandemic_by_country_and_territory) e o `pd.read_html()` de forma a trazer os dados agrupados por América, África, Asia e Europa, bem como uma coluna com o total de pessoas vacinadas. Renomeie as colunas no padrão snake case removendo valores desnecessários.

Obs: a saída do read_html é uma lista de dataframes, escolha o item 15 para trabalhar (item que tem as colunas 'Region[30]', 'Total cases', 'Total deaths', 'Cases per million', 'Deaths per million', 'Current weekly cases', 'Current weekly deaths', 'Population millions', 'Vaccinated %[31]')


In [0]:
pip install lxml

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
dbutils.library.restartPython()

In [0]:
import pyspark.pandas as ps
df = ps.read_html("https://en.wikipedia.org/wiki/COVID-19_pandemic_by_country_and_territory")
df

Out[1]: [                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 COVID-19 pandemic                                                                                                                                                             

In [0]:
df[15]

Unnamed: 0,Region[30],Total cases,Total deaths,Cases per million,Deaths per million,Current weekly cases,Current weekly deaths,Population millions,Vaccinated %[31]
0,European Union,179537758,1185108,401363,2649,886074,3985,447,75.1
1,North America,103783777,1133607,281404,3074,476376,2975,369,76.1
2,Other Europe,57721948,498259,247054,2133,74354,248,234,61.2
3,South America,65835789,1313061,153151,3055,378622,1252,430,81.7
4,Russia and Central Asia,25646533,434988,108307,1837,49022,393,237,55.9
5,Central America,11338600,380660,63108,2119,60268,263,180,69.0
6,Middle East,22549784,238106,86400,912,13457,65,261,51.9
7,Oceania and islands in East Asia,60806544,318455,105317,552,1390401,2777,577,72.1
8,Caribbean,2605473,26237,60179,606,737,25,43,46.4
9,South Asia,50347136,620218,27121,334,1911,24,1856,69.4


In [0]:
df.dtypes


Out[5]: Region                    object
Total Cases                int64
Total Deaths               int64
Cases Per Million          int64
Deaths Per Million         int64
Current Weekly Cases       int64
Current Weekly Deaths      int64
Population Millions        int64
Vaccinated %[31]         float64
dtype: object

In [0]:
import pyspark.pandas as ps

df = ps.read_html("https://en.wikipedia.org/wiki/COVID-19_pandemic_by_country_and_territory")[15]

new_columns_name = {
    "Region[30]": "Region",
    "Total cases": "Total Cases",
    "Total deaths": "Total Deaths",
    "Cases per million": "Cases Per Million",
    "Deaths per million": "Deaths Per Million",
    "Current weekly cases": "Current Weekly Cases",
    "Current weekly deaths": "Current Weekly Deaths",
    "Population millions": "Population Millions",
    "Vaccinated %[31]": "Vaccinated"
}

df = df.rename(columns=new_columns_name)

print(df.head())





                    Region  Total Cases  Total Deaths  Cases Per Million  Deaths Per Million  Current Weekly Cases  Current Weekly Deaths  Population Millions  Vaccinated %[31]
0           European Union    179537758       1185108             401363                2649                886074                   3985                  447              75.1
1            North America    103783777       1133607             281404                3074                476376                   2975                  369              76.1
2             Other Europe     57721948        498259             247054                2133                 74354                    248                  234              61.2
3            South America     65835789       1313061             153151                3055                378622                   1252                  430              81.7
4  Russia and Central Asia     25646533        434988             108307                1837                 49022 

## [Avaliação anônima](https://forms.gle/tShxhxNYhvi6ZmQm8)