# ETL RAW → SILVER: Airline On-Time / Delay Causes

## Introdução

Este notebook implementa o processo de ETL (Extract, Transform, Load) para transformar os dados brutos da camada **RAW** em dados limpos e padronizados na camada **SILVER**, seguindo a arquitetura de **Medalhão** (Medallion Architecture).

### O que este ETL faz?

Este pipeline:
1. **Extrai** os dados do CSV bruto (`data_raw.csv`)
2. **Transforma** os dados aplicando limpezas, padronizações e validações
3. **Carrega** os dados transformados na tabela `silver.silver_airline_on_time` do PostgreSQL

### Por que é importante no contexto do Medalhão?

- **RAW (Bronze)**: Dados brutos, sem tratamento, mantendo a fidelidade original
- **SILVER**: Dados limpos, padronizados e validados, prontos para análise e consumo
- **GOLD** (futuro): Dados agregados e modelados para dashboards e relatórios

A camada SILVER é crucial porque:
- Remove inconsistências que poderiam enviesar análises
- Padroniza formatos para facilitar consultas SQL
- Adiciona flags e metadados úteis (ex: outliers)
- Garante integridade referencial e tipos corretos


In [21]:
# Importações necessárias
from __future__ import annotations  # Permite usar type hints sem importar os tipos
import pandas as pd
import numpy as np
import os
from sqlalchemy import create_engine, text, inspect
import warnings

warnings.filterwarnings('ignore')

# Configuração de exibição
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)


## 1. Extract - Leitura do CSV

### O que fazemos aqui?

Carregamos o arquivo CSV bruto da camada RAW, lidando com possíveis problemas de encoding e exibindo informações básicas sobre os dados.

### Por que é importante?

- Garantir que os dados sejam lidos corretamente, independente do encoding
- Entender a estrutura inicial dos dados antes de qualquer transformação
- Documentar o estado original para comparação posterior


In [22]:
def load_csv(file_path: str) -> pd.DataFrame:
    """
    Carrega o CSV com tratamento de encoding.
    
    Args:
        file_path: Caminho para o arquivo CSV
        
    Returns:
        DataFrame com os dados carregados
    """
    try:
        df = pd.read_csv(file_path, encoding='utf-8')
        print("CSV carregado com encoding UTF-8")
    except UnicodeDecodeError:
        print("UTF-8 falhou, tentando latin1...")
        df = pd.read_csv(file_path, encoding='latin1')
        print("CSV carregado com encoding latin1")
    
    return df

# Caminho do arquivo
csv_path = '../Data Layer/raw/data_raw.csv'

# Carregar dados
print("=" * 80)
print("ETAPA 1: EXTRACT - Carregando dados do CSV")
print("=" * 80)
df_raw = load_csv(csv_path)

# Informações básicas
print(f"\nShape inicial: {df_raw.shape[0]:,} linhas × {df_raw.shape[1]} colunas")
print(f"\nPrimeiras linhas:")
display(df_raw.head())
print(f"\nInformações do DataFrame:")
df_raw.info()


ETAPA 1: EXTRACT - Carregando dados do CSV
CSV carregado com encoding UTF-8

Shape inicial: 279,182 linhas × 22 colunas

Primeiras linhas:


Unnamed: 0,year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,Unnamed: 21
0,2004,1,DL,Delta Air Lines Inc.,PBI,"West Palm Beach/Palm Beach, FL: Palm Beach Int...",650.0,126.0,21.06,6.44,51.58,1.0,45.92,4.0,0.0,5425.0,881.0,397.0,2016.0,15.0,2116.0,
1,2004,1,DL,Delta Air Lines Inc.,PDX,"Portland, OR: Portland International",314.0,61.0,14.09,2.61,34.25,0.0,10.05,30.0,3.0,2801.0,478.0,239.0,1365.0,0.0,719.0,
2,2004,1,DL,Delta Air Lines Inc.,PHL,"Philadelphia, PA: Philadelphia International",513.0,97.0,27.6,0.42,51.86,0.0,17.12,15.0,0.0,4261.0,1150.0,16.0,2286.0,0.0,809.0,
3,2004,1,DL,Delta Air Lines Inc.,PHX,"Phoenix, AZ: Phoenix Sky Harbor International",334.0,78.0,20.14,2.02,39.39,0.0,16.45,3.0,1.0,3400.0,1159.0,166.0,1295.0,0.0,780.0,
4,2004,1,DL,Delta Air Lines Inc.,PIT,"Pittsburgh, PA: Pittsburgh International",217.0,47.0,8.08,0.44,21.89,0.0,16.59,4.0,1.0,1737.0,350.0,28.0,522.0,0.0,837.0,



Informações do DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 279182 entries, 0 to 279181
Data columns (total 22 columns):
 #   Column               Non-Null Count   Dtype  
---  ------               --------------   -----  
 0   year                 279182 non-null  int64  
 1    month               279182 non-null  int64  
 2   carrier              279182 non-null  object 
 3   carrier_name         279182 non-null  object 
 4   airport              279182 non-null  object 
 5   airport_name         279182 non-null  object 
 6   arr_flights          278806 non-null  float64
 7   arr_del15            278744 non-null  float64
 8   carrier_ct           278806 non-null  float64
 9    weather_ct          278806 non-null  float64
 10  nas_ct               278806 non-null  float64
 11  security_ct          278806 non-null  float64
 12  late_aircraft_ct     278806 non-null  float64
 13  arr_cancelled        278806 non-null  float64
 14  arr_diverted         278806 non-null  flo

## 2. Transform - Limpeza e Padronização

### O que fazemos aqui?

Aplicamos uma série de transformações para limpar e padronizar os dados:
1. Padronização de nomes de colunas
2. Remoção de colunas 100% nulas
3. Remoção de duplicados
4. Conversão de tipos de dados
5. Tratamento de valores nulos
6. Validação de integridade (valores negativos)
7. Detecção de outliers
8. Normalização de campos categóricos

### Por que é importante?

- **Padronização de colunas**: Facilita consultas SQL e evita erros por espaços/caracteres especiais
- **Remoção de colunas vazias**: Reduz ruído e economiza espaço
- **Tipos corretos**: Permite operações matemáticas e agregações eficientes
- **Tratamento de nulos**: Evita erros em cálculos e mantém consistência
- **Flags de outliers**: Permite análises futuras sem perder dados


In [23]:
# Inicializar relatório de ETL
etl_report = {
    'linhas_inicial': len(df_raw),
    'colunas_inicial': len(df_raw.columns),
    'colunas_removidas': [],
    'duplicados_removidos': 0,
    'colunas_nulos_preenchidos': {},
    'valores_negativos_corrigidos': {},
    'outliers_detectados': 0
}

df = df_raw.copy()
print("=" * 80)
print("ETAPA 2: TRANSFORM - Iniciando transformações")
print("=" * 80)


ETAPA 2: TRANSFORM - Iniciando transformações


In [24]:
def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Padroniza nomes de colunas: strip, lowercase, espaços por _, remove caracteres especiais.
    
    Args:
        df: DataFrame original
        
    Returns:
        DataFrame com colunas padronizadas
    """
    df = df.copy()
    new_columns = {}
    
    for col in df.columns:
        # Strip, lowercase, substituir espaços por underscore
        new_col = col.strip().lower().replace(' ', '_')
        # Remover caracteres especiais (manter apenas letras, números e underscore)
        new_col = ''.join(c if c.isalnum() or c == '_' else '' for c in new_col)
        # Remover underscores múltiplos
        while '__' in new_col:
            new_col = new_col.replace('__', '_')
        # Remover underscore no início/fim
        new_col = new_col.strip('_')
        new_columns[col] = new_col
    
    df.rename(columns=new_columns, inplace=True)
    print(f"Colunas padronizadas: {len(new_columns)} colunas")
    return df

df = standardize_columns(df)
print(f"\nColunas após padronização:")
print(df.columns.tolist())


Colunas padronizadas: 22 colunas

Colunas após padronização:
['year', 'month', 'carrier', 'carrier_name', 'airport', 'airport_name', 'arr_flights', 'arr_del15', 'carrier_ct', 'weather_ct', 'nas_ct', 'security_ct', 'late_aircraft_ct', 'arr_cancelled', 'arr_diverted', 'arr_delay', 'carrier_delay', 'weather_delay', 'nas_delay', 'security_delay', 'late_aircraft_delay', 'unnamed_21']


In [25]:
# Remover colunas 100% nulas
print("\n" + "=" * 80)
print("2.1 Removendo colunas 100% nulas")
print("=" * 80)

colunas_antes = set(df.columns)
null_counts = df.isnull().sum()
colunas_100_nulas = null_counts[null_counts == len(df)].index.tolist()

if colunas_100_nulas:
    df = df.drop(columns=colunas_100_nulas)
    etl_report['colunas_removidas'].extend(colunas_100_nulas)
    print(f"Removidas {len(colunas_100_nulas)} colunas 100% nulas: {colunas_100_nulas}")
else:
    print("Nenhuma coluna 100% nula encontrada")

print(f"Colunas restantes: {len(df.columns)}")



2.1 Removendo colunas 100% nulas
Removidas 1 colunas 100% nulas: ['unnamed_21']
Colunas restantes: 21


In [26]:
# Remover duplicados
print("\n" + "=" * 80)
print("2.2 Removendo registros duplicados")
print("=" * 80)

linhas_antes = len(df)
df = df.drop_duplicates()
linhas_depois = len(df)
duplicados = linhas_antes - linhas_depois

etl_report['duplicados_removidos'] = duplicados
print(f"Duplicados removidos: {duplicados:,} registros")
print(f"Linhas restantes: {linhas_depois:,}")



2.2 Removendo registros duplicados
Duplicados removidos: 0 registros
Linhas restantes: 279,182


In [27]:
def cast_types(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converte tipos de dados conforme regras de negócio.
    
    Args:
        df: DataFrame a ser convertido
        
    Returns:
        DataFrame com tipos convertidos
    """
    df = df.copy()
    
    # Converter year e month para Int64
    if 'year' in df.columns:
        df['year'] = pd.to_numeric(df['year'], errors='coerce').astype('Int64')
        print("year convertido para Int64")
    
    if 'month' in df.columns:
        df['month'] = pd.to_numeric(df['month'], errors='coerce').astype('Int64')
        print("month convertido para Int64")
    
    # Colunas de contagem (terminadas com _ct, _flights, _cancelled, _diverted, _del15)
    count_patterns = ['_ct', '_flights', '_cancelled', '_diverted', '_del15']
    count_cols = [col for col in df.columns 
                  if any(col.endswith(pattern) for pattern in count_patterns)]
    
    for col in count_cols:
        if col in df.columns:
            # Converter para float primeiro, depois arredondar e converter para Int64
            df[col] = pd.to_numeric(df[col], errors='coerce')
            # Arredondar valores decimais para inteiros (mantém NaN)
            df[col] = df[col].round()
            # Converter para Int64 nullable (suporta NaN)
            df[col] = df[col].astype('Int64')
            print(f"{col} convertido para Int64")
    
    # Colunas de delay (terminadas com _delay, arr_delay)
    delay_patterns = ['_delay', 'arr_delay']
    delay_cols = [col for col in df.columns 
                  if any(col.endswith(pattern) for pattern in delay_patterns) 
                  and col not in count_cols]
    
    for col in delay_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').astype('Float64')
            print(f"{col} convertido para Float64")
    
    return df

print("\n" + "=" * 80)
print("2.3 Convertendo tipos de dados")
print("=" * 80)
df = cast_types(df)



2.3 Convertendo tipos de dados
year convertido para Int64
month convertido para Int64
arr_flights convertido para Int64
arr_del15 convertido para Int64
carrier_ct convertido para Int64
weather_ct convertido para Int64
nas_ct convertido para Int64
security_ct convertido para Int64
late_aircraft_ct convertido para Int64
arr_cancelled convertido para Int64
arr_diverted convertido para Int64
arr_delay convertido para Float64
carrier_delay convertido para Float64
weather_delay convertido para Float64
nas_delay convertido para Float64
security_delay convertido para Float64
late_aircraft_delay convertido para Float64


In [28]:
def clean_nulls(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """
    Trata valores nulos: preenche com 0 em colunas numéricas de delay/contagem quando faz sentido.
    
    Args:
        df: DataFrame a ser limpo
        
    Returns:
        Tupla (DataFrame limpo, dicionário com colunas preenchidas)
    """
    df = df.copy()
    colunas_preenchidas = {}
    
    # Colunas numéricas de delay e contagem
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    
    # Colunas de delay e contagem que faz sentido preencher com 0
    fill_zero_cols = [col for col in numeric_cols 
                      if any(pattern in col for pattern in ['_delay', '_ct', '_flights', '_cancelled', '_diverted', '_del15'])]
    
    for col in fill_zero_cols:
        null_count = df[col].isnull().sum()
        if null_count > 0:
            df[col] = df[col].fillna(0)
            colunas_preenchidas[col] = null_count
            print(f"{col}: {null_count:,} nulos preenchidos com 0")
    
    return df, colunas_preenchidas

print("\n" + "=" * 80)
print("2.4 Tratando valores nulos")
print("=" * 80)
df, nulos_preenchidos = clean_nulls(df)
etl_report['colunas_nulos_preenchidos'] = nulos_preenchidos



2.4 Tratando valores nulos
arr_flights: 376 nulos preenchidos com 0
arr_del15: 438 nulos preenchidos com 0
carrier_ct: 376 nulos preenchidos com 0
weather_ct: 376 nulos preenchidos com 0
nas_ct: 376 nulos preenchidos com 0
security_ct: 376 nulos preenchidos com 0
late_aircraft_ct: 376 nulos preenchidos com 0
arr_cancelled: 376 nulos preenchidos com 0
arr_diverted: 376 nulos preenchidos com 0
arr_delay: 376 nulos preenchidos com 0
carrier_delay: 376 nulos preenchidos com 0
weather_delay: 376 nulos preenchidos com 0
nas_delay: 376 nulos preenchidos com 0
security_delay: 376 nulos preenchidos com 0
late_aircraft_delay: 376 nulos preenchidos com 0


In [29]:
def validate_integrity(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """
    Valida integridade: não permite valores negativos em contagens e delays.
    Transforma negativos em NaN e depois em 0.
    
    Args:
        df: DataFrame a ser validado
        
    Returns:
        Tupla (DataFrame validado, dicionário com correções)
    """
    df = df.copy()
    correcoes = {}
    
    # Colunas de contagem e delay
    count_delay_cols = [col for col in df.select_dtypes(include=[np.number]).columns 
                        if any(pattern in col for pattern in ['_ct', '_flights', '_cancelled', '_diverted', '_del15', '_delay'])]
    
    for col in count_delay_cols:
        negativos = (df[col] < 0).sum()
        if negativos > 0:
            # Transformar negativos em NaN e depois em 0
            df.loc[df[col] < 0, col] = np.nan
            df[col] = df[col].fillna(0)
            correcoes[col] = negativos
            print(f"{col}: {negativos:,} valores negativos corrigidos para 0")
    
    return df, correcoes

print("\n" + "=" * 80)
print("2.5 Validando integridade (valores negativos)")
print("=" * 80)
df, correcoes_negativos = validate_integrity(df)
etl_report['valores_negativos_corrigidos'] = correcoes_negativos



2.5 Validando integridade (valores negativos)
nas_delay: 2 valores negativos corrigidos para 0


In [30]:
def add_outlier_flags(df: pd.DataFrame, column: str = 'arr_delay') -> tuple[pd.DataFrame, int]:
    """
    Adiciona flag de outlier usando IQR (Interquartile Range) para uma coluna específica.
    
    Args:
        df: DataFrame
        column: Nome da coluna para detectar outliers
        
    Returns:
        Tupla (DataFrame com flag, número de outliers detectados)
    """
    df = df.copy()
    
    if column not in df.columns:
        print(f"Coluna {column} não encontrada, pulando detecção de outliers")
        return df, 0
    
    # Calcular IQR
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    
    # Limites
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    # Flag de outlier
    flag_col = f'is_outlier_{column}'
    df[flag_col] = (df[column] < lower_bound) | (df[column] > upper_bound)
    df[flag_col] = df[flag_col].astype('Int64')
    
    outliers_count = df[flag_col].sum()
    
    print(f"Flag {flag_col} criada")
    print(f"  - Q1: {Q1:.2f}, Q3: {Q3:.2f}, IQR: {IQR:.2f}")
    print(f"  - Limites: [{lower_bound:.2f}, {upper_bound:.2f}]")
    print(f"  - Outliers detectados: {outliers_count:,} ({outliers_count/len(df)*100:.2f}%)")
    
    return df, outliers_count

print("\n" + "=" * 80)
print("2.6 Adicionando flags de outliers")
print("=" * 80)
df, outliers_count = add_outlier_flags(df, 'arr_delay')
etl_report['outliers_detectados'] = outliers_count



2.6 Adicionando flags de outliers
Flag is_outlier_arr_delay criada
  - Q1: 504.00, Q3: 3257.00, IQR: 2753.00
  - Limites: [-3625.50, 7386.50]
  - Outliers detectados: 30,979 (11.10%)


In [31]:
def normalize_categoricals(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normaliza campos categóricos: strip() e uppercase em códigos.
    
    Args:
        df: DataFrame
        
    Returns:
        DataFrame com campos categóricos normalizados
    """
    df = df.copy()
    
    # Campos de identificação (códigos)
    code_cols = ['carrier', 'airport']
    
    for col in code_cols:
        if col in df.columns:
            # Converter para string, strip, uppercase
            df[col] = df[col].astype(str).str.strip().str.upper()
            print(f"{col} normalizado (uppercase + strip)")
    
    # Campos de texto (nomes)
    text_cols = ['carrier_name', 'airport_name']
    
    for col in text_cols:
        if col in df.columns:
            # Apenas strip
            df[col] = df[col].astype(str).str.strip()
            print(f"{col} normalizado (strip)")
    
    return df

print("\n" + "=" * 80)
print("2.7 Normalizando campos categóricos")
print("=" * 80)
df = normalize_categoricals(df)



2.7 Normalizando campos categóricos
carrier normalizado (uppercase + strip)
airport normalizado (uppercase + strip)
carrier_name normalizado (strip)
airport_name normalizado (strip)


In [32]:
# Atualizar relatório final
etl_report['linhas_final'] = len(df)
etl_report['colunas_final'] = len(df.columns)

print("\n" + "=" * 80)
print("RESUMO DAS TRANSFORMAÇÕES")
print("=" * 80)
print(f"Linhas: {etl_report['linhas_inicial']:,} → {etl_report['linhas_final']:,}")
print(f"Colunas: {etl_report['colunas_inicial']} → {etl_report['colunas_final']}")
print(f"Duplicados removidos: {etl_report['duplicados_removidos']:,}")
print(f"Colunas removidas: {len(etl_report['colunas_removidas'])}")
print(f"Outliers detectados: {etl_report['outliers_detectados']:,}")

print("\nPrimeiras linhas após transformação:")
display(df.head())

print("\nInformações do DataFrame transformado:")
df.info()



RESUMO DAS TRANSFORMAÇÕES
Linhas: 279,182 → 279,182
Colunas: 22 → 22
Duplicados removidos: 0
Colunas removidas: 1
Outliers detectados: 30,979

Primeiras linhas após transformação:


Unnamed: 0,year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,is_outlier_arr_delay
0,2004,1,DL,Delta Air Lines Inc.,PBI,"West Palm Beach/Palm Beach, FL: Palm Beach Int...",650,126,21,6,52,1,46,4,0,5425.0,881.0,397.0,2016.0,15.0,2116.0,0
1,2004,1,DL,Delta Air Lines Inc.,PDX,"Portland, OR: Portland International",314,61,14,3,34,0,10,30,3,2801.0,478.0,239.0,1365.0,0.0,719.0,0
2,2004,1,DL,Delta Air Lines Inc.,PHL,"Philadelphia, PA: Philadelphia International",513,97,28,0,52,0,17,15,0,4261.0,1150.0,16.0,2286.0,0.0,809.0,0
3,2004,1,DL,Delta Air Lines Inc.,PHX,"Phoenix, AZ: Phoenix Sky Harbor International",334,78,20,2,39,0,16,3,1,3400.0,1159.0,166.0,1295.0,0.0,780.0,0
4,2004,1,DL,Delta Air Lines Inc.,PIT,"Pittsburgh, PA: Pittsburgh International",217,47,8,0,22,0,17,4,1,1737.0,350.0,28.0,522.0,0.0,837.0,0



Informações do DataFrame transformado:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 279182 entries, 0 to 279181
Data columns (total 22 columns):
 #   Column                Non-Null Count   Dtype  
---  ------                --------------   -----  
 0   year                  279182 non-null  Int64  
 1   month                 279182 non-null  Int64  
 2   carrier               279182 non-null  object 
 3   carrier_name          279182 non-null  object 
 4   airport               279182 non-null  object 
 5   airport_name          279182 non-null  object 
 6   arr_flights           279182 non-null  Int64  
 7   arr_del15             279182 non-null  Int64  
 8   carrier_ct            279182 non-null  Int64  
 9   weather_ct            279182 non-null  Int64  
 10  nas_ct                279182 non-null  Int64  
 11  security_ct           279182 non-null  Int64  
 12  late_aircraft_ct      279182 non-null  Int64  
 13  arr_cancelled         279182 non-null  Int64  
 14  arr_diverted

In [33]:
# ==========================================================
# 2.8 Engenharia de Dados: Criação de Período Unificado
# ==========================================================

print("\n" + "=" * 80)
print("2.8 Criando coluna flight_date (mantendo year e month)")
print("=" * 80)

df['flight_date'] = pd.to_datetime(
    df['year'].astype(str) + '-' + df['month'].astype(str).str.zfill(2) + '-01'
)

print("Coluna 'flight_date' criada com sucesso!")
print(f"Total de colunas agora: {len(df.columns)}")
display(df[['year', 'month', 'flight_date']].head())


2.8 Criando coluna flight_date (mantendo year e month)
Coluna 'flight_date' criada com sucesso!
Total de colunas agora: 23


Unnamed: 0,year,month,flight_date
0,2004,1,2004-01-01
1,2004,1,2004-01-01
2,2004,1,2004-01-01
3,2004,1,2004-01-01
4,2004,1,2004-01-01


## 3. Load - Carregar no PostgreSQL

### O que fazemos aqui?

1. Conectamos ao PostgreSQL usando SQLAlchemy
2. Criamos o schema `silver` se não existir
3. Criamos a tabela `silver.silver_airline_on_time` se não existir
4. Carregamos os dados transformados na tabela
5. Criamos índices para melhorar performance de consultas

### Por que é importante?

- **Conexão padronizada**: SQLAlchemy facilita migração entre SGBDs
- **Schema organizado**: Separação lógica dos dados por camada
- **CREATE TABLE IF NOT EXISTS**: Idempotência (pode rodar múltiplas vezes)
- **if_exists='replace'**: Facilita reprocessamento durante desenvolvimento
- **Índices**: Melhoram performance de consultas por year, month, carrier, airport


In [34]:

POSTGRES_DB = os.getenv('POSTGRES_DB', 'airline_delay_causes')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'postgres')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'postgres')
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = 5432

connection_string = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

print("=" * 80)
print("ETAPA 3: LOAD - Conectando ao PostgreSQL")
print("=" * 80)
print(f"Host: {POSTGRES_HOST}:{POSTGRES_PORT}")
print(f"Database: {POSTGRES_DB}")
print(f"User: {POSTGRES_USER}")

try:
    engine = create_engine(connection_string, echo=False)
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version();"))
        version = result.fetchone()[0]
        print(f"Conectado ao PostgreSQL: {version.split(',')[0]}")
        
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS silver;"))
        conn.commit()
        print("Schema 'silver' criado/verificado")
except Exception as e:
    print(f"Erro ao conectar: {e}")
    raise


ETAPA 3: LOAD - Conectando ao PostgreSQL
Host: localhost:5432
Database: airline_delay_causes
User: postgres
Conectado ao PostgreSQL: PostgreSQL 15.15 on x86_64-pc-linux-musl
Schema 'silver' criado/verificado


In [35]:
def generate_ddl(df: pd.DataFrame, schema: str = 'silver', table_name: str = 'silver_airline_on_time') -> str:
    """
    Gera o DDL SQL completo para criar a tabela baseado no DataFrame.
    Inclui schema, drop table, primary key composta, comentários e constraints.
    """
    full_table_name = f"{schema}.{table_name}"
    ddl_parts = []
    
    ddl_parts.append(f"CREATE SCHEMA IF NOT EXISTS {schema};")
    ddl_parts.append("")
    ddl_parts.append(f"DROP TABLE IF EXISTS {full_table_name};")
    ddl_parts.append("")
    ddl_parts.append(f"CREATE TABLE {full_table_name} (")
    
    column_definitions = []
    column_comments = []
    not_null_cols = {'year', 'month', 'carrier', 'airport', 'arr_flights', 
                     'arr_del15', 'arr_delay', 'is_outlier_arr_delay'}
    
    for col in df.columns:
        dtype = df[col].dtype
        sql_type = None
        comment = None
        
        if col == 'year':
            sql_type = "INTEGER NOT NULL"
            comment = "Ano do registro."
        elif col == 'month':
            sql_type = "INTEGER NOT NULL"
            comment = "Mês do registro (1-12)."
        elif col == 'carrier':
            sql_type = "CHAR(2) NOT NULL"
            comment = "Código IATA da companhia aérea."
        elif col == 'airport':
            sql_type = "CHAR(3) NOT NULL"
            comment = "Código IATA do aeroporto."
        elif col == 'carrier_name':
            sql_type = "VARCHAR(255)"
            comment = "Nome completo da companhia aérea."
        elif col == 'airport_name':
            sql_type = "VARCHAR(255)"
            comment = "Nome completo do aeroporto."
        elif col == 'arr_flights':
            sql_type = "INTEGER NOT NULL"
            comment = "Número total de voos de chegada."
        elif col == 'arr_del15':
            sql_type = "INTEGER NOT NULL"
            comment = "Número de voos com atraso >= 15 minutos."
        elif col == 'carrier_ct':
            sql_type = "INTEGER"
            comment = "Contagem de atrasos causados pela companhia aérea."
        elif col == 'weather_ct':
            sql_type = "INTEGER"
            comment = "Contagem de atrasos causados por condições climáticas."
        elif col == 'nas_ct':
            sql_type = "INTEGER"
            comment = "Contagem de atrasos causados pelo Sistema Nacional de Espaço Aéreo (NAS)."
        elif col == 'security_ct':
            sql_type = "INTEGER"
            comment = "Contagem de atrasos causados por segurança."
        elif col == 'late_aircraft_ct':
            sql_type = "INTEGER"
            comment = "Contagem de atrasos causados por aeronave atrasada."
        elif col == 'arr_cancelled':
            sql_type = "INTEGER"
            comment = "Número de voos cancelados."
        elif col == 'arr_diverted':
            sql_type = "INTEGER"
            comment = "Número de voos desviados."
        elif col == 'arr_delay':
            sql_type = "NUMERIC(10,2) NOT NULL"
            comment = "Total de minutos de atraso na chegada."
        elif col == 'carrier_delay':
            sql_type = "NUMERIC(10,2)"
            comment = "Minutos de atraso causados pela companhia aérea."
        elif col == 'weather_delay':
            sql_type = "NUMERIC(10,2)"
            comment = "Minutos de atraso causados por condições climáticas."
        elif col == 'nas_delay':
            sql_type = "NUMERIC(10,2)"
            comment = "Minutos de atraso causados pelo Sistema Nacional de Espaço Aéreo (NAS)."
        elif col == 'security_delay':
            sql_type = "NUMERIC(10,2)"
            comment = "Minutos de atraso causados por segurança."
        elif col == 'late_aircraft_delay':
            sql_type = "NUMERIC(10,2)"
            comment = "Minutos de atraso causados por aeronave atrasada."
        elif col == 'is_outlier_arr_delay':
            sql_type = "INTEGER NOT NULL"
            comment = "Flag indicando se arr_delay é outlier (1=sim, 0=não, calculado via IQR)."
        elif col == 'flight_date':
            sql_type = "DATE"
            comment = "Data do voo (primeiro dia do mês, derivado de year e month)."
        else:
            if pd.api.types.is_integer_dtype(dtype):
                sql_type = "INTEGER"
            elif pd.api.types.is_float_dtype(dtype):
                sql_type = "DOUBLE PRECISION"
            elif pd.api.types.is_datetime64_any_dtype(dtype):
                sql_type = "DATE"
            else:
                sql_type = "TEXT"
            comment = f"Coluna {col}."
        
        if col in not_null_cols and 'NOT NULL' not in sql_type:
            sql_type = sql_type.replace("INTEGER", "INTEGER NOT NULL")
            sql_type = sql_type.replace("NUMERIC(10,2)", "NUMERIC(10,2) NOT NULL")
        
        column_definitions.append(f"    {col} {sql_type}")
        
        if comment:
            column_comments.append(f"COMMENT ON COLUMN {full_table_name}.{col} IS '{comment}';")
    
    column_definitions.append("    CONSTRAINT pk_silver_airline_on_time PRIMARY KEY (year, month, carrier, airport)")
    
    ddl_parts.append(",\n".join(column_definitions))
    ddl_parts.append(");")
    ddl_parts.append("")
    ddl_parts.append(f"COMMENT ON TABLE {full_table_name} IS 'Camada SILVER: tabela única com dados de atrasos de voos limpos e enriquecidos.';")
    ddl_parts.append("")
    ddl_parts.extend(column_comments)
    
    return "\n".join(ddl_parts)

# Gerar DDL
ddl_sql = generate_ddl(df, 'silver', 'silver_airline_on_time')

print("=" * 80)
print("3.1 Gerando DDL")
print("=" * 80)
print(ddl_sql)

# Salvar DDL em arquivo
ddl_path = '../Data Layer/silver/ddl.sql'
os.makedirs(os.path.dirname(ddl_path), exist_ok=True)
with open(ddl_path, 'w', encoding='utf-8') as f:
    f.write(ddl_sql)
print(f"\nDDL salvo em: {ddl_path}")

3.1 Gerando DDL
CREATE SCHEMA IF NOT EXISTS silver;

DROP TABLE IF EXISTS silver.silver_airline_on_time;

CREATE TABLE silver.silver_airline_on_time (
    year INTEGER NOT NULL,
    month INTEGER NOT NULL,
    carrier CHAR(2) NOT NULL,
    carrier_name VARCHAR(255),
    airport CHAR(3) NOT NULL,
    airport_name VARCHAR(255),
    arr_flights INTEGER NOT NULL,
    arr_del15 INTEGER NOT NULL,
    carrier_ct INTEGER,
    weather_ct INTEGER,
    nas_ct INTEGER,
    security_ct INTEGER,
    late_aircraft_ct INTEGER,
    arr_cancelled INTEGER,
    arr_diverted INTEGER,
    arr_delay NUMERIC(10,2) NOT NULL,
    carrier_delay NUMERIC(10,2),
    weather_delay NUMERIC(10,2),
    nas_delay NUMERIC(10,2),
    security_delay NUMERIC(10,2),
    late_aircraft_delay NUMERIC(10,2),
    is_outlier_arr_delay INTEGER NOT NULL,
    flight_date DATE,
    CONSTRAINT pk_silver_airline_on_time PRIMARY KEY (year, month, carrier, airport)
);

COMMENT ON TABLE silver.silver_airline_on_time IS 'Camada SILVER: tabe

In [36]:
def load_to_postgres(df: pd.DataFrame, engine, schema: str = 'silver', table_name: str = 'silver_airline_on_time'):
    """
    Carrega DataFrame no PostgreSQL.
    
    Args:
        df: DataFrame a ser carregado
        engine: SQLAlchemy engine
        schema: Nome do schema
        table_name: Nome da tabela
    """
    print("\n" + "=" * 80)
    print("3.2 Criando tabela e carregando dados")
    print("=" * 80)
    
    with engine.connect() as conn:
        statements = [s.strip() for s in ddl_sql.split(';') if s.strip()]
        for stmt in statements:
            if stmt:
                conn.execute(text(stmt))
        conn.commit()
        print(f"Tabela {schema}.{table_name} criada/verificada")

    print(f"Carregando {len(df):,} linhas...")

    df_clean = df.replace({np.nan: None})
    
    df_clean.to_sql(
        name=table_name,
        con=engine,
        schema=schema,
        if_exists='replace',
        index=False,
        chunksize=5000,
        method='multi'
    )
    
    print("Dados carregados com sucesso!")

load_to_postgres(df, engine, 'silver', 'silver_airline_on_time')



3.2 Criando tabela e carregando dados
Tabela silver.silver_airline_on_time criada/verificada
Carregando 279,182 linhas...
Dados carregados com sucesso!


In [37]:
# Criar índices
print("\n" + "=" * 80)
print("3.3 Criando índices")
print("=" * 80)

index_columns = []
if 'year' in df.columns:
    index_columns.append('year')
if 'month' in df.columns:
    index_columns.append('month')
if 'carrier' in df.columns:
    index_columns.append('carrier')
if 'airport' in df.columns:
    index_columns.append('airport')
if 'flight_date' in df.columns:
    index_columns.append('flight_date')

with engine.connect() as conn:
    for col in index_columns:
        index_name = f"idx_{col}"
        try:
            conn.execute(text(f"CREATE INDEX IF NOT EXISTS {index_name} ON silver.silver_airline_on_time ({col});"))
            conn.commit()
            print(f"Indice criado: {index_name} em {col}")
        except Exception as e:
            print(f"Erro ao criar indice {index_name}: {e}")



3.3 Criando índices
Indice criado: idx_year em year
Indice criado: idx_month em month
Indice criado: idx_carrier em carrier
Indice criado: idx_airport em airport
Indice criado: idx_flight_date em flight_date


In [38]:
# Verificações pós-carga
print("\n" + "=" * 80)
print("3.4 Verificações pós-carga")
print("=" * 80)

with engine.connect() as conn:
    # COUNT(*)
    result = conn.execute(text("SELECT COUNT(*) FROM silver.silver_airline_on_time;"))
    count = result.fetchone()[0]
    print(f"Total de registros na tabela: {count:,}")
    
    # Primeiras 5 linhas
    print(f"\nPrimeiras 5 linhas:")
    result = conn.execute(text("SELECT * FROM silver.silver_airline_on_time LIMIT 5;"))
    df_sample = pd.DataFrame(result.fetchall(), columns=result.keys())
    display(df_sample)
    
    # Agregação por year e month (se existirem)
    if 'year' in df.columns and 'month' in df.columns:
        print(f"\nAgregacao por year e month (primeiros 12):")
        result = conn.execute(text("""
            SELECT year, month, COUNT(*) as total
            FROM silver.silver_airline_on_time
            GROUP BY year, month
            ORDER BY year, month
            LIMIT 12;
        """))
        df_agg = pd.DataFrame(result.fetchall(), columns=result.keys())
        display(df_agg)



3.4 Verificações pós-carga
Total de registros na tabela: 279,182

Primeiras 5 linhas:


Unnamed: 0,year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,is_outlier_arr_delay,flight_date
0,2004,1,DL,Delta Air Lines Inc.,PBI,"West Palm Beach/Palm Beach, FL: Palm Beach Int...",650,126,21,6,52,1,46,4,0,5425.0,881.0,397.0,2016.0,15.0,2116.0,0,2004-01-01
1,2004,1,DL,Delta Air Lines Inc.,PDX,"Portland, OR: Portland International",314,61,14,3,34,0,10,30,3,2801.0,478.0,239.0,1365.0,0.0,719.0,0,2004-01-01
2,2004,1,DL,Delta Air Lines Inc.,PHL,"Philadelphia, PA: Philadelphia International",513,97,28,0,52,0,17,15,0,4261.0,1150.0,16.0,2286.0,0.0,809.0,0,2004-01-01
3,2004,1,DL,Delta Air Lines Inc.,PHX,"Phoenix, AZ: Phoenix Sky Harbor International",334,78,20,2,39,0,16,3,1,3400.0,1159.0,166.0,1295.0,0.0,780.0,0,2004-01-01
4,2004,1,DL,Delta Air Lines Inc.,PIT,"Pittsburgh, PA: Pittsburgh International",217,47,8,0,22,0,17,4,1,1737.0,350.0,28.0,522.0,0.0,837.0,0,2004-01-01



Agregacao por year e month (primeiros 12):


Unnamed: 0,year,month,total
0,2003,6,1248
1,2003,7,1249
2,2003,8,1247
3,2003,9,1248
4,2003,10,1246
5,2003,11,1253
6,2003,12,1272
7,2004,1,1393
8,2004,2,1380
9,2004,3,1378


## 4. Validação Pós-Carga

### O que fazemos aqui?

Validamos que os dados foram carregados corretamente comparando:
- Número de linhas no DataFrame vs PostgreSQL
- Colunas no DataFrame vs colunas na tabela
- Nulos por coluna
- Min/max de métricas numéricas importantes

### Por que é importante?

- Garantir integridade dos dados após o load
- Detectar problemas de conversão de tipos
- Validar que não houve perda de dados
- Documentar qualidade dos dados na Silver


In [39]:
def validate_load(df: pd.DataFrame, engine, schema: str = 'silver', table_name: str = 'silver_airline_on_time'):
    """
    Valida o carregamento comparando DataFrame e tabela PostgreSQL.
    
    Args:
        df: DataFrame original
        engine: SQLAlchemy engine
        schema: Nome do schema
        table_name: Nome da tabela
    """
    print("=" * 80)
    print("ETAPA 4: VALIDACAO POS-CARGA")
    print("=" * 80)
    
    full_table_name = f"{schema}.{table_name}"
    
    # 1. Comparar número de linhas
    print("\n1. Comparacao de linhas:")
    df_rows = len(df)
    with engine.connect() as conn:
        result = conn.execute(text(f"SELECT COUNT(*) FROM {full_table_name};"))
        pg_rows = result.fetchone()[0]
    
    print(f"   DataFrame: {df_rows:,} linhas")
    print(f"   PostgreSQL: {pg_rows:,} linhas")
    if df_rows == pg_rows:
        print("   Numero de linhas coincide!")
    else:
        print(f"   Diferenca de {abs(df_rows - pg_rows):,} linhas")
    
    # 2. Comparar colunas
    print("\n2. Comparacao de colunas:")
    df_cols = set(df.columns)
    with engine.connect() as conn:
        inspector = inspect(engine)
        pg_cols = set([col['name'] for col in inspector.get_columns(table_name, schema=schema)])
    
    print(f"   DataFrame: {len(df_cols)} colunas")
    print(f"   PostgreSQL: {len(pg_cols)} colunas")
    
    missing_in_pg = df_cols - pg_cols
    extra_in_pg = pg_cols - df_cols
    
    if not missing_in_pg and not extra_in_pg:
        print("   Todas as colunas coincidem!")
    else:
        if missing_in_pg:
            print(f"   Colunas no DataFrame mas nao no PG: {missing_in_pg}")
        if extra_in_pg:
            print(f"   Colunas no PG mas nao no DataFrame: {extra_in_pg}")
    
    # 3. Nulos por coluna (amostra)
    print("\n3. Nulos por coluna (top 10 com mais nulos):")
    null_counts = df.isnull().sum().sort_values(ascending=False)
    null_counts = null_counts[null_counts > 0].head(10)
    if len(null_counts) > 0:
        for col, count in null_counts.items():
            pct = count / len(df) * 100
            print(f"   {col}: {count:,} ({pct:.2f}%)")
    else:
        print("   Nenhum nulo encontrado nas principais colunas")
    
    # 4. Min/Max de métricas numéricas
    print("\n4. Min/Max de metricas numericas:")
    metric_cols = [col for col in df.select_dtypes(include=[np.number]).columns 
                   if any(pattern in col for pattern in ['_delay', '_flights', '_ct'])]
    
    for col in metric_cols[:5]:  # Apenas primeiras 5
        if col in df.columns:
            min_val = df[col].min()
            max_val = df[col].max()
            print(f"   {col}: min={min_val:,.2f}, max={max_val:,.2f}")

validate_load(df, engine, 'silver', 'silver_airline_on_time')


ETAPA 4: VALIDACAO POS-CARGA

1. Comparacao de linhas:
   DataFrame: 279,182 linhas
   PostgreSQL: 279,182 linhas
   Numero de linhas coincide!

2. Comparacao de colunas:
   DataFrame: 23 colunas
   PostgreSQL: 23 colunas
   Todas as colunas coincidem!

3. Nulos por coluna (top 10 com mais nulos):
   Nenhum nulo encontrado nas principais colunas

4. Min/Max de metricas numericas:
   arr_flights: min=0.00, max=21,977.00
   carrier_ct: min=0.00, max=1,792.00
   weather_ct: min=0.00, max=718.00
   nas_ct: min=0.00, max=4,091.00
   security_ct: min=0.00, max=81.00


## 5. Relatório Final do ETL

### O que foi transformado?

Este relatório resume todas as transformações aplicadas durante o processo ETL, permitindo rastreabilidade e auditoria.

### Por que é importante?

- **Rastreabilidade**: Documenta exatamente o que foi feito
- **Auditoria**: Permite verificar qualidade e completude
- **Debugging**: Facilita identificar problemas no pipeline
- **Melhoria contínua**: Base para otimizações futuras


In [40]:
print("=" * 80)
print("RELATÓRIO FINAL DO ETL")
print("=" * 80)

# Criar DataFrame de resumo
resumo_data = {
    'Métrica': [
        'Linhas Inicial (RAW)',
        'Linhas Final (SILVER)',
        'Linhas Removidas (Duplicados)',
        'Colunas Inicial',
        'Colunas Final',
        'Colunas Removidas (100% Nulas)',
        'Outliers Detectados (arr_delay)',
        'Colunas com Nulos Preenchidos',
        'Colunas com Valores Negativos Corrigidos'
    ],
    'Valor': [
        f"{etl_report['linhas_inicial']:,}",
        f"{etl_report['linhas_final']:,}",
        f"{etl_report['duplicados_removidos']:,}",
        etl_report['colunas_inicial'],
        etl_report['colunas_final'],
        len(etl_report['colunas_removidas']),
        f"{etl_report['outliers_detectados']:,}",
        len(etl_report['colunas_nulos_preenchidos']),
        len(etl_report['valores_negativos_corrigidos'])
    ]
}

df_resumo = pd.DataFrame(resumo_data)
display(df_resumo)

# Detalhes adicionais
if etl_report['colunas_removidas']:
    print(f"\nColunas removidas: {etl_report['colunas_removidas']}")

if etl_report['colunas_nulos_preenchidos']:
    print(f"\nColunas com nulos preenchidos:")
    for col, count in etl_report['colunas_nulos_preenchidos'].items():
        print(f"   - {col}: {count:,} nulos preenchidos com 0")

if etl_report['valores_negativos_corrigidos']:
    print(f"\nColunas com valores negativos corrigidos:")
    for col, count in etl_report['valores_negativos_corrigidos'].items():
        print(f"   - {col}: {count:,} valores negativos corrigidos para 0")

print("\n" + "=" * 80)
print("ETL RAW -> SILVER CONCLUIDO COM SUCESSO!")
print("=" * 80)


RELATÓRIO FINAL DO ETL


Unnamed: 0,Métrica,Valor
0,Linhas Inicial (RAW),279182
1,Linhas Final (SILVER),279182
2,Linhas Removidas (Duplicados),0
3,Colunas Inicial,22
4,Colunas Final,22
5,Colunas Removidas (100% Nulas),1
6,Outliers Detectados (arr_delay),30979
7,Colunas com Nulos Preenchidos,15
8,Colunas com Valores Negativos Corrigidos,1



Colunas removidas: ['unnamed_21']

Colunas com nulos preenchidos:
   - arr_flights: 376 nulos preenchidos com 0
   - arr_del15: 438 nulos preenchidos com 0
   - carrier_ct: 376 nulos preenchidos com 0
   - weather_ct: 376 nulos preenchidos com 0
   - nas_ct: 376 nulos preenchidos com 0
   - security_ct: 376 nulos preenchidos com 0
   - late_aircraft_ct: 376 nulos preenchidos com 0
   - arr_cancelled: 376 nulos preenchidos com 0
   - arr_diverted: 376 nulos preenchidos com 0
   - arr_delay: 376 nulos preenchidos com 0
   - carrier_delay: 376 nulos preenchidos com 0
   - weather_delay: 376 nulos preenchidos com 0
   - nas_delay: 376 nulos preenchidos com 0
   - security_delay: 376 nulos preenchidos com 0
   - late_aircraft_delay: 376 nulos preenchidos com 0

Colunas com valores negativos corrigidos:
   - nas_delay: 2 valores negativos corrigidos para 0

ETL RAW -> SILVER CONCLUIDO COM SUCESSO!
