# ETL — RAW → SILVER (Sinistros 2025) usando DDL (PostgreSQL)

Este notebook:
1. Lê o CSV **RAW** (`datatran2025.csv`)
2. Aplica as transformações definidas para a **SILVER**
3. Executa o **DDL** para criar `silver.silver_sinistros` no PostgreSQL
4. Carrega os dados na tabela (modo padrão: **reload**)

> Rodar no **VS Code** (kernel local). PostgreSQL no Docker.

# Dependências

In [1]:
!python -m pip install pandas numpy sqlalchemy psycopg2-binary sqlparse



# Configurações

In [2]:

import getpass
import os
from pathlib import Path

import numpy as np
import pandas as pd
import sqlparse
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

# CONFIG (adjust to your environment)
# Non-secret settings: an existing environment variable wins, otherwise a local
# development default is used. The effective value is written back to
# os.environ so anything else reading these variables sees the same settings.
os.environ["POSTGRES_HOST"] = os.getenv("POSTGRES_HOST", "127.0.0.1")
os.environ["POSTGRES_PORT"] = os.getenv("POSTGRES_PORT", "5433")
os.environ["POSTGRES_DB"] = os.getenv("POSTGRES_DB", "sinistros_2025")
os.environ["POSTGRES_USER"] = os.getenv("POSTGRES_USER", "postgres")

# The password is a secret and must not live in the notebook source. It is read
# from POSTGRES_PASSWORD; when the variable is not set, ask for it interactively
# (the typed value is kept only in this process's environment).
if "POSTGRES_PASSWORD" not in os.environ:
    os.environ["POSTGRES_PASSWORD"] = getpass.getpass("POSTGRES_PASSWORD: ")

user = os.environ["POSTGRES_USER"]
pwd  = os.environ["POSTGRES_PASSWORD"]
host = os.environ["POSTGRES_HOST"]
port = os.environ["POSTGRES_PORT"]
db   = os.environ["POSTGRES_DB"]

# URL.create escapes each component, so credentials containing characters such
# as "@", ":" or "/" no longer corrupt the connection string.
engine = create_engine(
    URL.create(
        "postgresql+psycopg2",
        username=user,
        password=pwd,
        host=host,
        port=int(port),
        database=db,
    ),
    pool_pre_ping=True,
)

# Connection smoke test: fail here, early, if the database is unreachable.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
print(f"Conectado! host={host} port={port} db={db} user={user}")


Conectado! host=127.0.0.1 port=5433 db=sinistros_2025 user=postgres


# Extração RAW

In [3]:
def find_raw_csv() -> Path:
    """Locate the RAW CSV file ``datatran2025.csv``.

    Looks for ``Data Layer/raw/datatran2025.csv`` under the current working
    directory and then under each of its parents, inspecting at most 10
    directories in total. If none of them contains the file, falls back to
    ``datatran2025.csv`` relative to the current working directory.

    Returns:
        Path of the first candidate that exists.

    Raises:
        FileNotFoundError: if no candidate exists.
    """
    start = Path.cwd()
    # Working directory first, then its ancestors, capped at 10 levels.
    search_roots = [start, *start.parents][:10]
    for folder in search_roots:
        nested = folder / "Data Layer" / "raw" / "datatran2025.csv"
        if nested.exists():
            return nested

    # Last resort: the file sitting right next to where the notebook runs.
    local = Path("datatran2025.csv")
    if not local.exists():
        raise FileNotFoundError("Não encontrei o CSV RAW. Coloque em 'Data Layer/raw/' ou ajuste o path")
    return local

# Resolve where the RAW file lives and report it.
RAW_PATH = find_raw_csv()
print("CSV encontrado em:", RAW_PATH)

# Load the RAW CSV: semicolon-separated, latin-1 encoded; low_memory=False makes
# pandas read each column in one pass instead of inferring dtypes chunk by chunk.
df = pd.read_csv(
    RAW_PATH,
    sep=";",
    encoding="latin-1",
    low_memory=False,
)
print(f"Shape inicial: {df.shape}")
df.head(3)


CSV encontrado em: c:\Users\Daniel\OneDrive\Documentos\Sinistros2\Sinistros_Transito\Data Layer\raw\datatran2025.csv
Shape inicial: (65683, 30)


Unnamed: 0,id,data_inversa,dia_semana,horario,uf,br,km,municipio,causa_acidente,tipo_acidente,...,feridos_graves,ilesos,ignorados,feridos,veiculos,latitude,longitude,regional,delegacia,uop
0,652493,01/01/2025,quarta-feira,06:20:00,SP,116,225,GUARULHOS,Reação tardia ou ineficiente do condutor,Tombamento,...,0,0,1,1,2,-2348586772,-4654075317,SPRF-SP,DEL01-SP,UOP01-DEL01-SP
1,652519,01/01/2025,quarta-feira,07:50:00,CE,116,5462,PENAFORTE,Pista esburacada,Colisão frontal,...,0,1,4,1,6,-7812288,-3908333306,SPRF-CE,DEL05-CE,UOP03-DEL05-CE
2,652522,01/01/2025,quarta-feira,08:45:00,PR,369,882,CORNELIO PROCOPIO,Reação tardia ou ineficiente do condutor,Colisão traseira,...,0,2,0,3,2,-23182565,-50637228,SPRF-PR,DEL07-PR,UOP05-DEL07-PR


# Transformação 

In [4]:
def sim_nao_to_bool(s: pd.Series) -> pd.Series:
    """Convert yes/no style text values to a nullable boolean Series.

    Values are compared after trimming surrounding whitespace and upper-casing.
    ``SIM``, ``S`` and ``TRUE`` become True; ``NÃO``, ``NAO``, ``N`` and
    ``FALSE`` become False; anything else (including missing values) becomes
    ``<NA>``.

    Args:
        s: Series holding the raw values.

    Returns:
        Series of pandas ``boolean`` dtype with the same index as ``s``.
    """
    truthy = ("SIM", "S", "TRUE")
    falsy = ("NÃO", "NAO", "N", "FALSE")
    lookup = {**dict.fromkeys(truthy, True), **dict.fromkeys(falsy, False)}

    normalized = s.astype("string").str.strip().str.upper()
    mapped = normalized.map(lookup)
    return mapped.astype("boolean")


In [5]:
# Drop fully duplicated rows.
df = df.drop_duplicates()

# Text columns: replace nulls with "" and normalise to trimmed upper case.
# Both "object" and "string" are selected on purpose: text read by read_csv is
# typically stored as `object`, which a "string"-only selection does not match,
# so that selection would leave text_cols empty and skip this step silently.
text_cols = df.select_dtypes(include=["object", "string"]).columns.tolist()
for c in text_cols:
    df[c] = df[c].fillna("").astype(str).str.strip().str.upper()

# Count columns: coerce to numeric (unparseable -> null), turn negative values
# into nulls, then round and store as nullable integers. Nulls are NOT filled
# with 0 here.
num_cols = ["pessoas","mortos","ilesos","feridos","feridos_leves","feridos_graves","ignorados","veiculos"]
for c in num_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")
        df.loc[df[c]<0, c] = np.nan
        df[c] = df[c].round(0).astype("Int64")


In [6]:
# Columns that must not be null, written with their SILVER names.
required_cols = [
    "id", "data_acidente", "hora_acidente", "uf",
    "municipio", "br", "pessoas", "mortos", "feridos", "ilesos", "veiculos"
]
# Keep only the names that exist in the frame right now; later cells iterate
# over this list, so its contents are left exactly as before.
required_cols = [c for c in required_cols if c in df.columns]

# The date/time columns may still carry their RAW names at this point (the
# rename to SILVER names is done in a later cell). Without this mapping they
# would be filtered out above and rows lacking date or time would never be
# dropped.
RAW_NAME_OF = {"data_acidente": "data_inversa", "hora_acidente": "horario"}
null_check_cols = required_cols + [
    raw_name
    for silver_name, raw_name in RAW_NAME_OF.items()
    if silver_name not in df.columns and raw_name in df.columns
]

before = len(df)
df = df.dropna(subset=null_check_cols).copy()
after = len(df)
print(f"Removidas {before - after:,} linhas com NULLs em colunas obrigatórias")


Removidas 0 linhas com NULLs em colunas obrigatórias


In [7]:
# Columns removed from the frame at this stage.
DROP_COLS = ["km","feridos_leves","feridos_graves","ignorados","regional","delegacia","uop"]
present_to_drop = [name for name in DROP_COLS if name in df.columns]
df = df.drop(columns=present_to_drop, errors="ignore")

# RAW name -> SILVER name; only entries whose RAW column exists are applied.
RENAME = {"data_inversa":"data_acidente", "horario":"hora_acidente", "uso_solo":"area_urbana"}
applicable_renames = {old: new for old, new in RENAME.items() if old in df.columns}
df = df.rename(columns=applicable_renames)

# area_urbana arrives as SIM/NÃO text and is stored as a nullable boolean.
if "area_urbana" in df.columns:
    df["area_urbana"] = sim_nao_to_bool(df["area_urbana"])


In [8]:
# Dates: parsed day-first; values that cannot be parsed become null.
if "data_acidente" in df.columns:
    parsed_dates = pd.to_datetime(df["data_acidente"], dayfirst=True, errors="coerce")
    df["data_acidente"] = parsed_dates.dt.date

# Times: accept HH:MM or HH:MM:SS, preferring the HH:MM parse when it succeeds.
if "hora_acidente" in df.columns:
    raw_time = df["hora_acidente"].astype("string")
    as_hhmm = pd.to_datetime(raw_time, format="%H:%M", errors="coerce")
    as_hhmmss = pd.to_datetime(raw_time, format="%H:%M:%S", errors="coerce")
    df["hora_acidente"] = as_hhmm.fillna(as_hhmmss).dt.time

# Coordinates: swap the decimal comma for a dot, convert to numbers, and null
# out anything outside the valid range (±90 for latitude, ±180 for longitude).
for coord, limit in (("latitude", 90), ("longitude", 180)):
    if coord in df.columns:
        as_text = df[coord].astype("string").str.replace(",", ".", regex=False)
        df[coord] = pd.to_numeric(as_text, errors="coerce")
        df.loc[~df[coord].between(-limit, limit), coord] = np.nan


In [9]:
# Fill whatever nulls remain in the required columns, choosing the filler by
# dtype: 0 (stored as nullable integer) for numeric columns, "UNKNOWN" for text.
for col in required_cols:
    kind = df[col].dtype.name
    if kind in ("Int64", "float64"):
        df[col] = df[col].fillna(0).astype("Int64")
    elif kind in ("object", "string"):
        df[col] = df[col].fillna("UNKNOWN")


In [10]:
# Columns that must be fully populated for the DW / merges.
required_cols_dw = [
    "id", "data_acidente", "hora_acidente", "uf", "municipio", "br",
    "tipo_acidente", "causa_acidente", "classificacao_acidente",
    "fase_dia", "condicao_metereologica", "tipo_pista", "tracado_via", "sentido_via",
    "pessoas","mortos","feridos","ilesos","veiculos","area_urbana",
    "latitude","longitude"
]

# Numeric columns: nulls (and unparseable values) become 0. Counters are stored
# as nullable integers; the coordinates keep their numeric dtype.
num_cols = ["pessoas","mortos","feridos","ilesos","veiculos","latitude","longitude"]
for col in (name for name in num_cols if name in df.columns):
    filled = pd.to_numeric(df[col], errors="coerce").fillna(0)
    df[col] = filled
    if col not in ("latitude", "longitude"):
        df[col] = df[col].astype("Int64")

# area_urbana: enforce the nullable boolean dtype and treat missing as False.
if "area_urbana" in df.columns:
    as_boolean = df["area_urbana"].astype("boolean")
    df["area_urbana"] = as_boolean.fillna(False)

# Report how many nulls are left across the required columns.
null_counts = df[required_cols_dw].isna().sum()
nulls_remaining = null_counts.sum()
print(f"Total de valores nulos restantes nas colunas obrigatórias: {nulls_remaining}")


Total de valores nulos restantes nas colunas obrigatórias: 0


# Carregamento dos dados na Silver

In [11]:
# Location of the SILVER DDL script, resolved without a machine-specific
# absolute path:
#   1. the SILVER_DDL_PATH environment variable, when set;
#   2. otherwise "silver/ddl.sql" next to the folder that holds the RAW CSV
#      (i.e. "<Data Layer>/silver/ddl.sql" when the CSV is in "<Data Layer>/raw/").
ddl_override = os.getenv("SILVER_DDL_PATH")
if ddl_override:
    DDL_PATH = Path(ddl_override)
else:
    DDL_PATH = RAW_PATH.resolve().parent.parent / "silver" / "ddl.sql"

if not DDL_PATH.is_file():
    raise FileNotFoundError(
        f"DDL não encontrado em {DDL_PATH}. Defina SILVER_DDL_PATH ou ajuste o caminho"
    )

# read_text opens, reads and closes the file; a bare open(...).read() leaves
# the handle open until garbage collection.
ddl_sql = DDL_PATH.read_text(encoding="utf-8")

# Strip SQL comments, then split the script into individual statements.
ddl_clean = sqlparse.format(ddl_sql, strip_comments=True)
commands = [cmd.strip() for cmd in sqlparse.split(ddl_clean) if cmd.strip()]

# Run every statement inside one transaction: either the whole DDL is applied
# or none of it is.
with engine.begin() as conn:
    for cmd in commands:
        conn.execute(text(cmd))

print("DDL Silver executado com sucesso!")


DDL Silver executado com sucesso!


In [12]:
TARGET_SCHEMA = "silver"
TARGET_TABLE  = "silver_sinistros"

# Create the schema when it does not exist yet.
with engine.begin() as conn:
    conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TARGET_SCHEMA}"))

# Column names of the target table, in table order. Schema/table are passed as
# bind parameters instead of being formatted into the SQL text.
cols_in_db = pd.read_sql(
    text(
        """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = :schema AND table_name = :table
        ORDER BY ordinal_position
        """
    ),
    engine,
    params={"schema": TARGET_SCHEMA, "table": TARGET_TABLE},
)["column_name"].tolist()

# An empty list means the table is missing; continuing would select zero
# columns from the DataFrame and still report a successful load.
if not cols_in_db:
    raise RuntimeError(
        f"Tabela {TARGET_SCHEMA}.{TARGET_TABLE} não encontrada. Execute o DDL antes da carga"
    )

# Keep only the DataFrame columns that exist in the table.
df_load = df[[c for c in df.columns if c in cols_in_db]].copy()

# Make table columns that will not receive any value from the DataFrame visible.
missing_in_df = [c for c in cols_in_db if c not in df_load.columns]
if missing_in_df:
    print("Aviso: colunas da tabela sem dados no DataFrame:", missing_in_df)

# Discard rows without an id.
if "id" in df_load.columns:
    df_load = df_load[df_load["id"].notna()].copy()

# Reload: empty the table and insert the new rows in the same transaction, so
# re-running the notebook replaces the data instead of appending to it, and a
# failed insert rolls the truncate back as well.
with engine.begin() as conn:
    conn.execute(text(f'TRUNCATE TABLE "{TARGET_SCHEMA}"."{TARGET_TABLE}"'))
    df_load.to_sql(
        name=TARGET_TABLE,
        con=conn,
        schema=TARGET_SCHEMA,
        if_exists="append",
        index=False,
        chunksize=5000,
        method="multi"
    )

print(f"Silver carregada: {len(df_load):,} registros")


Silver carregada: 65,683 registros
