# Processamento de Dados: Camada Raw para Silver
**Disciplina:** Sistemas de Banco de Dados 2  
**Semestre:** 2025/2  
**Professor:** Thiago Luiz de Souza Gomes  
**Grupo 15**

**Integrantes:**
* Caio Ferreira Duarte (231026901)
* Laryssa Felix Ribeiro Lopes (231026840)
* Luísa de Souza Ferreira (232014807)
* Henrique Fontenelle Galvão Passos (231030771)
* Marjorie Mitzi Cavalcante Rodrigues (231039140)

---

## 1. Contextualização e Objetivos
Este notebook documenta a etapa de ETL (Extração, Transformação e Carga) do projeto. Na etapa anterior ("Analytics"), realizamos a exploração dos dados brutos de acidentes aéreos e identificamos diversas inconsistências, como problemas de codificação de texto, colunas com tipagem mista e valores nulos em métricas essenciais.

O objetivo deste script é ler o arquivo CSV original, aplicar as regras de limpeza definidas no Dicionário de Dados e persistir o resultado na tabela `silver.aviao` no PostgreSQL. Esta tabela servirá como fonte confiável para a construção do modelo dimensional (Star Schema) na próxima etapa.

In [293]:
import pandas as pd
import numpy as np
import os
import warnings
from sqlalchemy import create_engine, text

# Ignoramos avisos de depreciação futura do Pandas para manter a saída do console limpa
warnings.simplefilter(action='ignore', category=FutureWarning)

# Configuração para visualizar todas as colunas no output do Jupyter
pd.set_option('display.max_columns', None)

# Definição dos caminhos relativos dos arquivos
# Optamos por usar caminhos relativos para que o código funcione em qualquer máquina do grupo
ARQUIVO_RAW = '../Data_Layer/raw/dados_brutos.csv'
ARQUIVO_DDL = '../Data_Layer/silver/ddl.sql'

# String de conexão com o banco de dados (Container Docker)
# Utiliza o driver psycopg2 via SQLAlchemy
DB_URI = "postgresql://admin:admin@localhost:5432/db_aviao"

print("Bibliotecas importadas e variáveis de ambiente configuradas.")

Bibliotecas importadas e variáveis de ambiente configuradas.


## 2. Extração dos Dados (Camada Bronze)

Durante a análise exploratória, notamos que o arquivo `dados_brutos.csv` não utiliza a codificação padrão UTF-8. Ao tentarmos abrir o arquivo convencionalmente, caracteres especiais eram corrompidos. Identificamos que a codificação correta é `cp1252` (Windows-1252).

Abaixo, realizamos a leitura forçando esse encoding.

In [294]:
# Verificação de existência do arquivo antes da leitura
if not os.path.exists(ARQUIVO_RAW):
    print(f"[ERRO] O arquivo não foi encontrado no caminho: {ARQUIVO_RAW}")
else:
    print(f"Lendo arquivo: {ARQUIVO_RAW}")

# Leitura do CSV
try:
    # low_memory=False é utilizado pois o Pandas identificou tipos mistos em algumas colunas
    # durante a inferência inicial, o que consome mais memória mas garante a leitura correta.
    df = pd.read_csv(ARQUIVO_RAW, encoding='cp1252', low_memory=False)
    
    print("Leitura concluída.")
    print(f"Total de registros carregados: {df.shape[0]}")
    
except Exception as e:
    print(f"Falha na leitura do CSV. Detalhes: {e}")

Lendo arquivo: ../Data_Layer/raw/dados_brutos.csv
Leitura concluída.
Total de registros carregados: 88889


## 3. Padronização de Schema

Para adequar os dados às boas práticas de banco de dados e ao padrão definido no nosso Dicionário de Dados, renomeamos as colunas originais (que usam pontos e CamelCase) para o padrão `snake_case` (minúsculas separadas por underline). Colunas irrelevantes para as perguntas de negócio do projeto foram descartadas nesta etapa.

In [295]:
# Dicionário de mapeamento (De: Nome Original -> Para: Nome no Banco)
mapa_colunas = {
    'Event.Id': 'event_id',
    'Investigation.Type': 'investigation_type',
    'Accident.Number': 'accident_number',
    'Event.Date': 'event_date',
    'Location': 'location',
    'Country': 'country',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
    'Airport.Code': 'airport_code',
    'Airport.Name': 'airport_name',
    'Injury.Severity': 'injury_severity',
    'Aircraft.damage': 'aircraft_damage',
    'Aircraft.Category': 'aircraft_category',
    'Registration.Number': 'registration_number',
    'Make': 'make',
    'Model': 'model',
    'Amateur.Built': 'amateur_built',
    'Number.of.Engines': 'number_of_engines',
    'Engine.Type': 'engine_type',
    'FAR.Description': 'far_description',
    'Schedule': 'schedule',
    'Purpose.of.flight': 'purpose_of_flight',
    'Air.carrier': 'air_carrier',
    'Total.Fatal.Injuries': 'total_fatal_injuries',
    'Total.Serious.Injuries': 'total_serious_injuries',
    'Total.Minor.Injuries': 'total_minor_injuries',
    'Total.Uninjured': 'total_uninjured',
    'Weather.Condition': 'weather_condition',
    'Broad.phase.of.flight': 'broad_phase_of_flight',
    'Report.Status': 'report_status',
    'Publication.Date': 'publication_date'
}

# Seleciona apenas as colunas mapeadas e renomeia
df = df[list(mapa_colunas.keys())].rename(columns=mapa_colunas)

print("Esquema de colunas padronizado.")

Esquema de colunas padronizado.


## 4. Tratamento e Limpeza de Dados

Nesta etapa aplicamos as correções para os problemas de qualidade de dados identificados:

1.  **Conversão de Datas:** As datas vieram como texto e em formatos mistos. Utilizamos a função `to_datetime` com tratamento de erros.
2.  **Valores Nulos em Métricas:** Campos como `total_fatal_injuries` possuíam valores nulos (`NaN`). Para viabilizar somas e médias no SQL, assumimos que a ausência de informação indica zero feridos.
3.  **Campo Severidade:** A coluna continha dados sujos como "Fatal(2)". Extraímos apenas a categoria textual ("Fatal").
4.  **Tipagem:** Conversão explícita de colunas numéricas e booleanas para garantir integridade na inserção no banco.

In [296]:
# --- 4. TRANSFORM: DATA CLEANING E TYPE CASTING (CORRIGIDO) ---
print("Iniciando tratamento de tipos e valores nulos...")

# 1. Tratamento de Datas
colunas_data = ['event_date', 'publication_date']
for col in colunas_data:
    if col in df.columns:
        df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True)

# 2. Tratamento de Inteiros (Vítimas e Motores)
colunas_numericas = [
    'total_fatal_injuries', 'total_serious_injuries', 
    'total_minor_injuries', 'total_uninjured', 'number_of_engines'
]
for col in colunas_numericas:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)

# 3. Limpeza da coluna Severidade (Regra de Negócio)
if 'injury_severity' in df.columns:
    df['injury_severity'] = df['injury_severity'].astype(str).str.split('(').str[0].str.strip()

# 4. Tratamento de Booleano (Amateur Built)
if 'amateur_built' in df.columns:
    df['amateur_built'] = df['amateur_built'].astype(str).str.lower().isin(['yes', 'y', 'true', '1'])

# --- CORREÇÃO DO TRUNCATE (CORTE DE TEXTOS LONGOS) ---
# A correção aqui remove o 'case=False' e usa uma lista explícita de valores para substituir

# Grupo A: Colunas com limite de 50 caracteres (VARCHAR 50)
cols_limit_50 = [
    'event_id', 'investigation_type', 'accident_number', 'injury_severity',
    'aircraft_damage', 'aircraft_category', 'registration_number', 
    'engine_type', 'schedule', 'weather_condition', 'report_status'
]

print("   Aplicando limite de 50 caracteres...")
for col in cols_limit_50:
    if col in df.columns:
        # 1. Converte para string e remove espaços
        df[col] = df[col].astype(str).str.strip()
        # 2. Remove o texto 'nan' que aparece ao converter nulos para string
        df[col] = df[col].replace(['nan', 'NaN', 'NAN', 'None'], '')
        # 3. Corta no limite e garante que vazio vire None (para o banco entender como NULL)
        df[col] = df[col].apply(lambda x: x[:50] if x else None)

# Grupo B: Colunas com limite de 100 caracteres (VARCHAR 100)
cols_limit_100 = ['make', 'model', 'country', 'purpose_of_flight', 'broad_phase_of_flight']

print("   Aplicando limite de 100 caracteres...")
for col in cols_limit_100:
    if col in df.columns:
        df[col] = df[col].astype(str).str.strip()
        df[col] = df[col].replace(['nan', 'NaN', 'NAN', 'None'], '')
        df[col] = df[col].apply(lambda x: x[:100] if x else None)

# Grupo C: Colunas com limite de 200 caracteres (VARCHAR 200)
cols_limit_200 = ['location', 'airport_name', 'air_carrier', 'far_description']

print("   Aplicando limite de 200 caracteres...")
for col in cols_limit_200:
    if col in df.columns:
        df[col] = df[col].astype(str).str.strip()
        df[col] = df[col].replace(['nan', 'NaN', 'NAN', 'None'], '')
        df[col] = df[col].apply(lambda x: x[:200] if x else None)
        
print("Tipagem e ajuste de tamanho concluídos.")

Iniciando tratamento de tipos e valores nulos...
   Aplicando limite de 50 caracteres...


  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True)


   Aplicando limite de 100 caracteres...
   Aplicando limite de 200 caracteres...
Tipagem e ajuste de tamanho concluídos.


## 5. Validação de Regras de Qualidade (Data Quality)

Antes da carga, aplicamos filtros de consistência:
* **Geolocalização:** Coordenadas com valores impossíveis (Latitude > 90 ou Longitude > 180) são convertidas para nulo, pois indicam erro de preenchimento.
* **Unicidade:** O campo `event_id` é a chave primária da tabela. Removemos registros duplicados mantendo a primeira ocorrência.

In [297]:
# Função para validar limites geográficos
def validar_coordenada(valor, limite_maximo):
    try:
        valor_float = float(valor)
        if abs(valor_float) <= limite_maximo:
            return valor_float
        return None # Retorna nulo se estiver fora do limite
    except:
        return None

# Aplicação da validação
df['latitude'] = df['latitude'].apply(lambda x: validar_coordenada(x, 90))
df['longitude'] = df['longitude'].apply(lambda x: validar_coordenada(x, 180))

# Remoção de duplicatas baseada no ID do evento
qtd_inicial = len(df)
df = df.drop_duplicates(subset=['event_id'], keep='first')
qtd_removida = qtd_inicial - len(df)

if qtd_removida > 0:
    print(f"Atenção: Foram removidos {qtd_removida} registros duplicados.")
else:
    print("Nenhuma duplicidade de ID encontrada.")

Atenção: Foram removidos 938 registros duplicados.


## 6. Carga no Data Warehouse (Load)

Os dados tratados serão inseridos no esquema `silver` do PostgreSQL.
Utilizamos a estratégia de **Truncate and Load** (limpar a tabela e inserir tudo novamente). Isso foi escolhido para garantir que, caso o script precise ser rodado várias vezes durante o desenvolvimento, não geremos dados duplicados no banco.

Primeiramente, garantimos que a tabela existe executando o script DDL. Em seguida, realizamos a carga em lotes.

In [298]:
print("Iniciando conexão com o Banco de Dados...")

try:
    engine = create_engine(DB_URI)
    
    # Passo 1: Execução do DDL (Garantia de Estrutura)
    if os.path.exists(ARQUIVO_DDL):
        with open(ARQUIVO_DDL, 'r', encoding='utf-8') as f:
            sql_ddl = f.read()
            
        with engine.connect() as conn:
            conn.execute(text(sql_ddl))
            conn.commit()
            print("DDL executado com sucesso (Tabela criada/atualizada).")
    
    # Passo 2: Limpeza da Tabela (Truncate)
    with engine.connect() as conn:
        conn.execute(text("TRUNCATE TABLE silver.aviao CASCADE;"))
        conn.commit()
        print("Tabela silver.aviao limpa.")
    
    # Passo 3: Inserção dos Dados
    print("Inserindo dados...")
    df.to_sql(
        name='aviao',
        con=engine,
        schema='silver',
        if_exists='append', # Adiciona aos dados existentes (que acabamos de limpar)
        index=False,
        chunksize=1000 # Insere em lotes de 1000 linhas para melhor performance
    )
    
    print(f"Processo finalizado. {len(df)} registros inseridos com sucesso.")

except Exception as e:
    print(f"Ocorreu um erro durante a carga no banco: {e}")

Iniciando conexão com o Banco de Dados...
DDL executado com sucesso (Tabela criada/atualizada).
Tabela silver.aviao limpa.
Inserindo dados...
Processo finalizado. 87951 registros inseridos com sucesso.


In [299]:
# Verificação simples dos dados inseridos
print("\n--- Relatório Final de Carga ---")

try:
    with engine.connect() as conn:
        # Contagem total
        result = conn.execute(text("SELECT COUNT(*) FROM silver.aviao"))
        total = result.scalar()
        print(f"Total de linhas na tabela silver.aviao: {total}")
        
        # Amostra de dados
        print("\nAmostra dos dados carregados:")
        df_amostra = pd.read_sql("SELECT event_id, event_date, make, injury_severity FROM silver.aviao LIMIT 5", conn)
        display(df_amostra)
        
except Exception as e:
    print("Não foi possível realizar a consulta de validação.")


--- Relatório Final de Carga ---
Total de linhas na tabela silver.aviao: 87951

Amostra dos dados carregados:


Unnamed: 0,event_id,event_date,make,injury_severity
0,20001218X45444,1948-10-24,Stinson,Fatal
1,20001218X45447,1962-07-19,Piper,Fatal
2,20061025X01555,1974-08-30,Cessna,Fatal
3,20001218X45448,1977-06-19,Rockwell,Fatal
4,20041105X01764,1979-08-02,Cessna,Fatal
