O pipeline é o fluxo pelo qual o dado passa para o seu processamento

No pipeline, a coleta, transformação, carregamento/armazenamento e monitoramento de dados pode ocorre através de automação

Qual a diferença entre data wrangling e pipelines?

- O data wrangling é algo mais simples e utilizada dados já prontos, há uma manipulação dos dados para torná-los 'usáveis'
- O pipeline trabalha de maneira mais completa, podendo coletar/extrair de diferentes fontes para transformar os dados e torná-los 'usáveis'

Em um exemplo:

- Data wrangling: suponha que há um conjunto de dados com informações sobre clientes, mas as datas estão em um formato bagunçado. O data wrangling responsável pelo processo de organização desses dados para que possam ser facilmente compreendidos e utilizados, convertendo com uma espécie de padrão. A manipulação de dados é como organizar e preparar os dados. Isso envolve limpar os dados, moldá-los do jeito certo e garantir que tudo esteja adequado para a utilização.
- Pipeline: em um processo de dados de de vendas em diferentes lojas, deseja-se analisar o desempenho total, o que envolve a extração de dados de cada loja, a transformação para garantir a consistência (a exemplo, unifcar formato de datas, moedas) e caregar os dados em único local para analisar. Nesse caso, o processo de mover os dados de um lugar para outro, transformá-los para atender as necessidades específicas e carregá-los em um local onde possam ser utilizados existe. Para esse processo dar-se o nome de ETL: E(Extract -> Coleta), T (Tranform -> Transformação) e L (Load -> Carregamento)

Etapas:

- Coleta de dados: como os dados são coletados de diferentes fontes, é necessário checá-los antes. Normalmente em um projeto há as fontes de dados disponíveis, mas em outros ainda pode ser necessário uma busca mais aprofundada. 
    - Exploração de dados: verifica os tipos de dados das características, valores únicos e descrição dos dados.
- Transformação: A limpeza e separação são trabalhados juntos, já que o processo de transformalçao dos dados ocorre de acordo com as necessidades do projeto, incluindo limpeza, normalização, agregação.
    - Limpeza de dados: remoção de valores nulos, duplicatas ou outliers (dado errado que pode interferir nos resultados, como a altura de uma pessoa descrita como 300cm/300m) que possam afetar a qualidade dos dados
    - Feature Engineering: novas caractrísticas são criadas para agregar nos dados finais (a exemplo, tenho a data de nascimento nos dados e quero converter para a idade)
    - Transformação de dados: transforma os dados para melhor atender as preferências e/ou requisitos específicos (a exemplo converter os dados de uma moeda)

Exemplo de transformação:

In [9]:
import pandas as pd

#Criando um dataframe exemplo
data = {'Produto': ['A', 'B', 'C', 'D', 'E'],
        'Preço': [100.0, 120.0, None, 150.0, 5000.0]} #Incluindo um valor nulo (None) e outlier (5000)
df = pd.DataFrame(data)


In [10]:
#Exibindo o dataframe original
print('Dataframe original:')
print(df)

Dataframe original:
  Produto   Preço
0       A   100.0
1       B   120.0
2       C     NaN
3       D   150.0
4       E  5000.0


In [11]:
#Limpeza de Dados: tratando valores nulos e outliers na coluna 'Preço'
#Substituindo valores nulos pela média e removendo outliers (valores acima de 1000)
df['Preço'].fillna(df['Preço'].mean(), inplace=True) #-> Forma antiga
#df.fillna({'Preço': 'Preço'}, inplace = True) #nova forma a partir do pandas 3.0 
#df = df['Preço'].fillna(df['Preço'].mean()) #-> também pode ser usado no pandas 3.0
df = df[df['Preço'] < 1000]

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['Preço'].fillna(df['Preço'].mean(), inplace=True) #-> Forma antiga


In [12]:
#Exibindo o dataframe após a limpeza
print('\nDataFrame após a limpeza:')
print(df)


DataFrame após a limpeza:
  Produto  Preço
0       A  100.0
1       B  120.0
3       D  150.0


In [None]:
#Supondo que 'df' seja o DataFrame com os dados de vendas
df['Data'] = pd.to_datetime(df['Data']) #Convertendo para o formato de data

#Criando uma nova coluna com o dia da semana
df['DiaDaSemana'] = df['Data'].dt.day_name()

#One-Hot Encondig para a coluna 'DiaDaSemana' 
df = pdf.get_dummies(df, columns['DiaDaSemana'])

#Exibindo as primeiras linhas do DataFrame após a transformação
df.head()

- Carregamento e armazenamento: armazenz os dados transformados em um local de destino, como banco de dados, um data warehouse ou mesmo um arquivo
    - Carregamento de dados: coleta de todos dados de suas respectivas fontes
    - Destino: depois de coletar os dados necessários (carregamento de dados), os dados são reunidos em um único arquivo e começam a ser preparados/moldados
    - Análise/Relatórios: análise e/ou relatório ou outras ações que se deseja realizar nos dados

In [None]:
import pandas as pd
from sqlalchemy import create_engine

#Suponhamos que há um arquivo CSV, banco de dados SQL e uma API
df_csv = pd.read_csv('dados_csv.csv')

#Conectando ao banco de dados SQL (Como o SQLite)
engine = create_engine('sqlite:///banco_sql.bd')
query_sql = '%SELECT * FROM tabela_sql'
df_sql = pd.read_sql_query(query_sql, engine)

#Supondo uma API fictícia (precisa da URL real da API e as credencias)
api_url = 'https://api.exemplo.com/dados'
df_api = pd.read_json(api_url)

#Agora temos os dados de diferentes fontes. Vamos transformá-los

#Transformação: a exemplo, adicionar uma nova coluna aos DataFrames
df_csv['Nova_Coluna'] = df_csv['Coluna_A'] + df_csv['Coluna_B']
df_sql['Nova_Coluna'] = df_sql['Coluna_C'] * 2
df_api['Nova_Coluna'] = df_api['Coluna X'] - df_api['Coluna_Y']

#Agora vamos concatenar (ou combinar) os DataFrames em um único DataFrame
df_final = pd.concat([df_csv, df_sql, df_api], ignore_index = True)

#Por fim, vamos carregar o DataFrame final em um arquivo CSV ou em um banco de dados

#Salvando em um arquivo CSV
df_final.to_csv('dados_final.csv', index=False)

#Salvando no mesmo banco de ddos SQL
df_final.to_sql('tabela_final', engine, if_exists='replace', index=False)

- Monitoramento: monitorar o desempenho da pipeline, gerenciar falhas e lidar com situações excepcionais são partes críticas da administração de ume pipeline de dados
    - Monitoramento de desempenho: monitorar o tempo que cada etapa leva para ser concluída
    - Monitoramento de qualidade: verficar a qualidade dos dados em cada etapa
    - Monitoramento de recursos: monitora o uso de recursos como capacidade de armazenamento e poder de processamento

    - Diferentes tecnologias para monitoramento de dados:
        - Apache Airflow
        - Prometheus
        - Grafana
        - Datadog
        - Azure Application Insights
        - Python usando a bibloteca logging

In [None]:
import pandas as pd
import logging

#Configurando o logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

#Função de Extração
def extract_data(file_path):
    logging.info('Iniciando extração de dados...')
    df = pd.read_csv(file_path)
    loggind.info('Extração de dados concluída')
    return df

- Automação: é a capacidade de realizar tarefas e processos de forma programada e sem intervenção manual. Isso é essencial para garantir eficiência, consistência e confiabilidade em todo o fluxo de dados. Aqui estão alguns aspectos-chhave da automação em uma pipeline de dados:
    - Agendamento de tarefas
    - Orquestração de fluxo de trabalho
    - Gestão de dependências
    - Monitoramento e notificação
    - Tratamento de erros e retentativas
    - Gerenciamento de configuração
    - Atualizações automáticas
    - Integração contínua e implantação contínua (CI/CD)
    - Escalonamento automático