### Parâmetros iniciais

In [0]:
%pip install openpyxl
import os, io, csv, hashlib, datetime, re, requests
import pandas as pd
from io import BytesIO
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from pyspark.sql import functions as F, types as T

# Unity Catalog destination for the Bronze tables.
CATALOG = "sinesp"
SCHEMA_BRONZE = "bronze"

# Volume folder where the SINESP municipal XLSX files land.
LANDING_MUNIC = "/Volumes/sinesp/source/landing/sinesp/municipios"

# The 27 UF abbreviations; only workbook sheets with one of these names are processed.
UFS = ("AC AL AP AM BA CE DF ES GO MA MT MS "
       "MG PA PB PR PE PI RJ RN RS RO RR SC SP SE TO").split()

# Ensure the Bronze schema exists before any table is written into it.
bronze_schema_ddl = f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA_BRONZE}"
spark.sql(bronze_schema_ddl)

### Localizar XLSX mais recente na landing

In [0]:
# List every .xlsx under the municipal landing Volume (recursively) together
# with its modification time.
df_files = (spark.read.format("binaryFile")
            .option("recursiveFileLookup", "true")
            .load(LANDING_MUNIC)
            .filter(F.lower(F.col("path")).endswith(".xlsx"))
            # Skip Office lock files ("~$<name>.xlsx"): they also end in .xlsx but
            # are not workbooks, and can be the newest entry in the folder.
            .filter(~F.col("path").contains("/~$"))
            .select("path", "modificationTime"))

# Newest file wins; the path is a tie-breaker so the choice is deterministic
# when two files share the same modification time.
latest = (df_files
          .orderBy(F.col("modificationTime").desc(), F.col("path").desc())
          .limit(1)
          .collect())
if not latest:
    raise FileNotFoundError(f"Nenhum XLSX encontrado em {LANDING_MUNIC}")

latest_path = latest[0]["path"]
print(f"Usando arquivo de municípios: {latest_path}")

### Abrir XLSX via Spark -> Bytes -> pandas

In [0]:
# Read the selected workbook's raw bytes through Spark (the binaryFile source
# exposes them in the `content` column) and open them in memory with pandas.
file_bytes = spark.read.format("binaryFile").load(latest_path).select("content").head()["content"]
xls = pd.ExcelFile(BytesIO(file_bytes))

# Only sheets named after a UF are processed further down.
sheet_ufs = [name for name in xls.sheet_names if name in UFS]
print("Abas (UFs) detectadas:", sheet_ufs)

### Ingestão Bronze por UF (requer os helpers `normalize_columns_pdf` e `parse_mes_ano`)

In [0]:
# --- Ingestão + transformação Bronze (loop por UF) ---
# For each UF sheet of the selected workbook:
#   - read it as strings and normalize the column names;
#   - apply minimal casts and derive year/month/year_month;
#   - run light checks and write one Delta table per UF.
# Finally, write a unified table partitioned by uf/year/month.
#
# Uses names from earlier cells: spark, xls, sheet_ufs, latest_path, CATALOG
# and SCHEMA_BRONZE. normalize_columns_pdf and parse_mes_ano are not defined
# in any visible cell, so they must already exist in the notebook namespace.
_missing_helpers = [name for name in ("normalize_columns_pdf", "parse_mes_ano")
                    if name not in globals()]
if _missing_helpers:
    raise NameError(
        "Helpers de normalização ausentes: " + ", ".join(_missing_helpers)
        + ". Defina-os antes de executar esta célula."
    )

# One timestamp for the whole run (previously undefined), shared by every UF table.
ingestion_ts = datetime.datetime.now(datetime.timezone.utc)

# Main columns kept from each sheet ('mes_ano' stays as a rescue column).
BRONZE_COLUMNS = ["cod_ibge", "municipio", "uf", "regiao", "mes_ano", "vitimas"]

# Explicit schema: Spark cannot infer a type for an all-null column (e.g.
# year/month when a sheet has no 'mes_ano'), and inference rejects pd.NA/NaT.
BRONZE_SCHEMA = T.StructType([
    T.StructField("cod_ibge",      T.LongType()),
    T.StructField("municipio",     T.StringType()),
    T.StructField("uf",            T.StringType()),
    T.StructField("regiao",        T.StringType()),
    T.StructField("mes_ano",       T.StringType()),
    T.StructField("vitimas",       T.IntegerType()),
    T.StructField("year",          T.IntegerType()),
    T.StructField("month",         T.IntegerType()),
    T.StructField("year_month",    T.DateType()),
    T.StructField("_ingestion_ts", T.TimestampType()),
    T.StructField("_source_path",  T.StringType()),
])


def _prepare_uf_frame(raw, uf, source_path, ingested_at):
    """Build the Bronze pandas frame for one UF sheet.

    Args:
        raw: the sheet as returned by ``pd.read_excel(..., dtype=str)``.
        uf: the sheet name (UF abbreviation); it overwrites any ``uf`` column.
        source_path: workbook path stored in ``_source_path``.
        ingested_at: timestamp stored in ``_ingestion_ts``.

    Returns:
        A ``(pdf, uf_mismatches)`` tuple.
        - ``pdf`` has every BRONZE_COLUMNS column (absent ones filled with
          nulls) plus year/month/year_month and the metadata columns.
        - ``uf_mismatches`` counts rows whose own ``uf`` value differed from
          the sheet name before it was overwritten.
    """
    pdf = normalize_columns_pdf(raw)
    has_mes_ano = "mes_ano" in pdf.columns

    # Compare the UF written in the rows with the sheet name *before* it is
    # overwritten; a comparison made afterwards would always be true.
    uf_mismatches = 0
    if "uf" in pdf.columns:
        declared = pdf["uf"].astype("string").str.strip().str.upper()
        uf_mismatches = int((declared.notna() & (declared != uf)).sum())

    # Keep only the expected columns. Absent ones become all-null, so every UF
    # table shares one schema and the Spark casts never hit a missing column.
    pdf = pdf.reindex(columns=BRONZE_COLUMNS)
    pdf["uf"] = uf

    # Minimal casts; unparseable values become null instead of failing.
    pdf["cod_ibge"] = pd.to_numeric(pdf["cod_ibge"], errors="coerce").astype("Int64")
    pdf["vitimas"] = pd.to_numeric(pdf["vitimas"], errors="coerce").astype("Int64")

    if has_mes_ano:
        # NOTE(review): assumes parse_mes_ano returns a (year, month) pair per value.
        parsed = pdf["mes_ano"].apply(parse_mes_ano)
        pdf["year"] = parsed.apply(lambda t: t[0] if pd.notna(t[0]) else pd.NA).astype("Int64")
        pdf["month"] = parsed.apply(lambda t: t[1] if pd.notna(t[1]) else pd.NA).astype("Int64")
        first_of_month = (pdf["year"].astype("string") + "-"
                          + pdf["month"].astype("string").str.zfill(2) + "-01")
        pdf["year_month"] = pd.to_datetime(first_of_month, errors="coerce")
    else:
        pdf["year"] = pd.Series(pd.NA, index=pdf.index, dtype="Int64")
        pdf["month"] = pd.Series(pd.NA, index=pdf.index, dtype="Int64")
        pdf["year_month"] = pd.Series(pd.NaT, index=pdf.index, dtype="datetime64[ns]")

    pdf["_ingestion_ts"] = ingested_at
    pdf["_source_path"] = source_path
    return pdf, uf_mismatches


def _to_spark_frame(pdf):
    """Convert a frame from _prepare_uf_frame into a Spark DataFrame typed by BRONZE_SCHEMA."""
    out = pdf[BRONZE_SCHEMA.fieldNames()].copy()
    # DateType takes datetime.date values; missing dates are nulled below.
    out["year_month"] = out["year_month"].dt.date
    out = out.astype(object)
    # Spark does not accept pd.NA / NaT / NaN objects: every missing value becomes None.
    out = out.where(out.notna(), None)
    return spark.createDataFrame(out, schema=BRONZE_SCHEMA)


accum = None
rows_written_by_uf = []

for uf in sheet_ufs:
    print(f"Processando aba {uf}...")

    raw = pd.read_excel(xls, sheet_name=uf, dtype=str)
    if raw.empty:
        print(f"  [skip] aba {uf} vazia (sem linhas)")
        continue

    pdf, uf_mismatches = _prepare_uf_frame(raw, uf, latest_path, ingestion_ts)
    if uf_mismatches:
        print(f"  [warn] {uf_mismatches} linha(s) com UF diferente da aba; UF ajustada para {uf}")

    # Light checks (a null year_month does not drop rows at the Bronze level).
    before = len(pdf)
    pdf = pdf[pdf["vitimas"].isna() | (pdf["vitimas"] >= 0)]  # no negative counts
    removed = before - len(pdf)
    if removed:
        print(f"  [warn] {removed} linha(s) removida(s) por checks leves")

    if pdf.empty:
        print(f"  [skip] aba {uf} ficou vazia após checks leves")
        continue

    df = _to_spark_frame(pdf)

    # One table per UF (overwrite on the first load; switch to 'append' later).
    tbl = f"{CATALOG}.{SCHEMA_BRONZE}.DadosMunicipio{uf}"
    (df.write
       .format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .saveAsTable(tbl))
    # createDataFrame keeps every pandas row, so this equals df.count()
    # without launching another Spark job.
    n = len(pdf)
    rows_written_by_uf.append((uf, n))
    print(f"  [ok] gravado: {tbl} | linhas: {n}")

    accum = df if accum is None else accum.unionByName(df, allowMissingColumns=True)

# --- Tabela unificada (fora do loop) ---
if accum is not None:
    unified_tbl = f"{CATALOG}.{SCHEMA_BRONZE}.municipios_raw_row"
    (accum.write
       .format("delta")
       .mode("overwrite")              # switch to 'append' on subsequent runs
       .option("overwriteSchema", "true")
       .partitionBy("uf", "year", "month")
       .saveAsTable(unified_tbl))
    print(f"[ok] gravado também: {unified_tbl} (particionado por uf/year/month)")
else:
    print("Nenhuma aba processada.")

print("Resumo por UF (linhas escritas):", rows_written_by_uf)
