# Criando as bases RAW


In [1]:
# =========================
# RAW CSV/TXT/TSV  ->  RAW Parquet (mesmo prefixo RAW)
# Converte TODOS os arquivos de RAW. Força \t para .tsv.
# === Versão com detecção de ENCODING + fallbacks robustos ===
# =========================
import sys, re, unicodedata, csv, boto3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T

# ---------- Bootstrap Glue ----------
spark = None
try:
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    args = getResolvedOptions(
        sys.argv,
        ["JOB_NAME", "BUCKET", "RAW_PREFIX", "NORMALIZE_COLUMNS", "FLATTEN", "OVERWRITE_EXISTING"]
    )
    job = Job(glueContext); job.init(args["JOB_NAME"], args)
except Exception:
    # Fallback p/ notebook
    spark = SparkSession.builder.appName("raw_to_parquet_all").getOrCreate()
    args = {
        "BUCKET": "bucket-etl-edb011",
        "RAW_PREFIX": "raw",
        "NORMALIZE_COLUMNS": "true",
        "FLATTEN": "true",
        "OVERWRITE_EXISTING": "true",
    }

# ---------- Parâmetros ----------
BUCKET   = args.get("BUCKET")
RAW_PREF = args.get("RAW_PREFIX", "raw").strip("/")
DO_NORM  = args.get("NORMALIZE_COLUMNS", "true").lower() == "true"
DO_FLAT  = args.get("FLATTEN", "true").lower() == "true"
OVERWRITE= args.get("OVERWRITE_EXISTING", "true").lower() == "true"

if not BUCKET or not RAW_PREF:
    raise RuntimeError("Informe --BUCKET e --RAW_PREFIX.")

bucket_uri = f"s3://{BUCKET}"
raw_uri    = f"{bucket_uri}/{RAW_PREF}"

print(f"[INFO] BUCKET={BUCKET} | RAW=s3://{BUCKET}/{RAW_PREF}/ | NORMALIZE_COLUMNS={DO_NORM} | FLATTEN={DO_FLAT} | OVERWRITE_EXISTING={OVERWRITE}")

s3c = boto3.client("s3")

# ---------- Detecção de encoding ----------
try:
    import chardet
    _HAVE_CHARDET = True
except Exception:
    _HAVE_CHARDET = False

def _normalize_charset(name: str) -> str:
    """Mapeia nomes diversos para o que o Spark/Java aceita melhor."""
    if not name:
        return "UTF-8"
    n = name.strip().lower().replace("_", "-")
    if n in ("utf-8", "utf8", "utf-8-sig", "utf8-sig", "ascii"):
        # ascii é subconjunto de utf-8
        return "UTF-8"
    if n in ("iso-8859-1", "latin-1", "latin1"):
        return "ISO-8859-1"
    if n in ("cp1252", "windows-1252", "windows1252"):
        return "windows-1252"
    # fallback genérico: tenta usar em Java tal como está (pode funcionar)
    return name

def detect_encoding_s3(key: str, sample_bytes: int = 65536) -> tuple[str, float]:
    """
    Lê um pedaço do arquivo no S3 e tenta detectar o encoding.
    Retorna (encoding_normalizado, confiança[0..1]).
    """
    try:
        obj = s3c.get_object(Bucket=BUCKET, Key=key, Range=f"bytes=0-{sample_bytes}")
        raw = obj["Body"].read()
        if _HAVE_CHARDET:
            res = chardet.detect(raw)
            enc = _normalize_charset(res.get("encoding") or "UTF-8")
            conf = float(res.get("confidence") or 0.0)
            return enc, conf
        else:
            # Sem chardet, assume UTF-8 e deixa fallback cuidar do resto
            return "UTF-8", 0.0
    except Exception:
        return "UTF-8", 0.0

# ---------- Listagem: inclui .csv, .txt e .tsv ----------
def list_raw_files(prefix: str):
    cont = None
    while True:
        kw = {"Bucket": BUCKET, "Prefix": prefix}
        if cont: kw["ContinuationToken"] = cont
        resp = s3c.list_objects_v2(**kw)
        for o in resp.get("Contents", []):
            k = o["Key"]
            if not k.endswith("/") and k.lower().endswith((".csv", ".txt", ".tsv")):
                yield k
        if resp.get("IsTruncated"):
            cont = resp.get("NextContinuationToken")
        else:
            break

# ---------- Util: normalizar nomes de coluna (remove BOM e acentos) ----------
def to_snake(s: str) -> str:
    # Remove BOM se aparecer no início
    s = s.lstrip("\ufeff")
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = re.sub(r"\s+", "_", s.strip())
    s = re.sub(r"[^0-9a-zA-Z_]", "_", s)
    s = re.sub(r"_+", "_", s)
    return s.lower().strip("_")[:64]

# ---------- Inferência de delimitador (usa encoding detectado); força \t para .tsv ----------
def infer_delimiter_s3(key: str, encoding_hint: str | None = None, sample_bytes: int = 65536) -> str:
    low = key.lower()
    if low.endswith(".tsv"):
        return "\t"  # TSV = tab
    try:
        obj = s3c.get_object(Bucket=BUCKET, Key=key, Range=f"bytes=0-{sample_bytes}")
        head = obj["Body"].read()
        enc = encoding_hint or detect_encoding_s3(key, sample_bytes=sample_bytes)[0]
        sample = head.decode(enc, errors="ignore")
    except Exception:
        sample = ""

    try:
        dialect = csv.Sniffer().sniff(sample, delimiters=[",", ";", "|", "\t"])
        return dialect.delimiter
    except Exception:
        # heurística simples
        c_comma = sample.count(",")
        c_semi  = sample.count(";")
        c_pipe  = sample.count("|")
        if c_semi >= max(c_comma, c_pipe): return ";"
        if c_pipe > c_comma: return "|"
        if "\t" in sample: return "\t"
        return ","

def normalize_columns_df(df):
    if not DO_NORM:
        return df
    cols = []
    for c in df.columns:
        if c is None:
            continue
        name = str(c).lstrip("\ufeff")  # remove BOM se vier do header
        if name.lower().startswith("unnamed"):
            continue
        cols.append(F.col(c).alias(to_snake(name)))
    df = df.select(*cols)
    # força string (seguro p/ tipos heterogêneos)
    for c in df.columns:
        df = df.withColumn(c, F.col(c).cast(T.StringType()))
    return df

def write_parquet_in_raw(df, base_no_ext: str):
    """
    Se FLATTEN=true: escreve em RAW/__tmp/<base>/ e copia part-*.parquet para RAW/<base>.parquet
    Se FLATTEN=false: escreve em RAW/<base>/ (sem copiar) — padrão Spark.
    """
    if DO_FLAT:
        tmp_dir = f"{raw_uri}/__tmp/{base_no_ext}/"
        df.coalesce(1).write.mode("overwrite").option("compression", "snappy").parquet(tmp_dir)
        print(f"[WRITE] tmp: {tmp_dir}")

        # localizar part-*.parquet e copiar para RAW/<base>.parquet
        part_key = None
        prefix_tmp = f"{RAW_PREF}/__tmp/{base_no_ext}/"
        resp = s3c.list_objects_v2(Bucket=BUCKET, Prefix=prefix_tmp)
        for o in resp.get("Contents", []):
            k = o["Key"]
            if re.search(r"/part-.*\.parquet$", k):
                part_key = k; break
        if not part_key:
            raise RuntimeError(f"part-*.parquet não encontrado em s3://{BUCKET}/{prefix_tmp}")

        dest_key = f"{RAW_PREF}/{base_no_ext}.parquet"
        s3c.copy_object(Bucket=BUCKET, CopySource={"Bucket": BUCKET, "Key": part_key}, Key=dest_key)

        # limpar tmp
        del_resp = s3c.list_objects_v2(Bucket=BUCKET, Prefix=prefix_tmp)
        for o in del_resp.get("Contents", []):
            s3c.delete_object(Bucket=BUCKET, Key=o["Key"])

        print(f"[FLATTEN] OK → s3://{BUCKET}/{dest_key}")
        return f"s3://{BUCKET}/{dest_key}"
    else:
        out_dir = f"{raw_uri}/{base_no_ext}/"
        df.write.mode("overwrite").option("compression", "snappy").parquet(out_dir)
        print(f"[WRITE] parquet (pasta): {out_dir}")
        return out_dir

# ---------- Processamento ----------
src_prefix = RAW_PREF + "/"
keys = list(list_raw_files(src_prefix))
print(f"[INFO] Arquivos (.csv/.txt/.tsv) encontrados em RAW: {len(keys)}")
if not keys:
    raise RuntimeError(f"Não há arquivos em s3://{BUCKET}/{RAW_PREF}/")

ok = 0; fail = 0
for key in keys:
    try:
        filename = key.rsplit("/", 1)[-1]
        base_no_ext = re.sub(r"\.[^.]+$", "", filename)

        # Se não for overwrite, pula quando já existir o parquet final
        dest_probe = f"{RAW_PREF}/{base_no_ext}.parquet"
        if not OVERWRITE:
            try:
                s3c.head_object(Bucket=BUCKET, Key=dest_probe)
                print(f"[SKIP] já existe (use OVERWRITE_EXISTING=true p/ recriar): s3://{BUCKET}/{dest_probe}")
                continue
            except Exception:
                pass

        # Detecta encoding (com confiança) e depois o separador usando o mesmo sample
        enc_detected, conf = detect_encoding_s3(key)
        sep = infer_delimiter_s3(key, encoding_hint=enc_detected)
        src_uri = f"{bucket_uri}/{key}"
        print(f"[FILE] {src_uri} | enc={enc_detected} (conf={conf:.2f}) | sep='{sep}' → RAW/{base_no_ext}.parquet")

        # Leitura com cascata de fallbacks
        enc_candidates = [enc_detected, "UTF-8", "ISO-8859-1", "windows-1252"]
        read_ok = False
        last_err = None
        for enc in enc_candidates:
            try:
                df = (spark.read
                        .option("header", True)
                        .option("sep", sep)
                        .option("encoding", enc)
                        .option("mode", "PERMISSIVE")
                        .csv(src_uri))
                # Se vier sem colunas (às vezes por BOM/encoding), força erro p/ tentar próximo encoding
                if not df.columns:
                    raise Exception(f"Sem colunas com encoding={enc}.")
                read_ok = True
                break
            except Exception as e:
                last_err = e
                continue
        if not read_ok:
            raise last_err or Exception("Falha de leitura em todos os encodings testados.")

        df = normalize_columns_df(df)
        write_parquet_in_raw(df, base_no_ext)

        ok += 1
    except Exception as e:
        fail += 1
        print(f"[ERROR] Falha no arquivo {key}: {e}")

print(f"[SUMMARY] Sucesso: {ok} | Falhas: {fail}")

# ---------- Commit Glue (se aplicável) ----------
try:
    job.commit()
except Exception:
    pass


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: dfb36435-2779-455a-b289-e487e1c1d6fe
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session dfb36435-2779-455a-b289-e487e1c1d6fe to get into ready status...
Session dfb36435-2779-455a-b289-e487e1c1d6fe has been created.
[INFO] BUCKET=bucket-etl-edb011 | RAW=s3://bucket-etl-edb011/raw/ | NORMALIZE_COLUMNS=True | FLATTEN=True | OVERWRITE_EXISTING=True
[INFO] Arquivos (.csv/.txt/.tsv) encontrados em RAW: 10
[FILE] s3://bucket-etl-edb011/raw/2021_tri_01.csv | enc=windows-1252 (conf=0.73) | sep=';' → 

# Base Trusted Bancos

In [2]:
# =========================
# Enquadramento -> TRUSTED/base_bancos_TRUSTED.parquet
# === Versão à prova de BOM em nomes de colunas + renomeação robusta ===
# =========================
import re as _re
import unicodedata
from pyspark.sql import functions as F, types as T

# ---- fallbacks se variáveis não existirem (rode apenas se necessário) ----
try:
    BUCKET, TRUSTED_PREFIX, RAW_PREF, spark, s3c
except NameError:
    from pyspark.sql import SparkSession
    import boto3
    spark = SparkSession.builder.appName("bancos_trusted").getOrCreate()
    BUCKET = "bucket-etl-edb011"      # ajuste se preciso
    RAW_PREF = "raw"
    TRUSTED_PREFIX = "trusted"
    s3c = boto3.client("s3")

bucket_uri    = f"s3://{BUCKET}"
raw_uri       = f"{bucket_uri}/{RAW_PREF}"
trusted_uri   = f"{bucket_uri}/{TRUSTED_PREFIX}"

# ---------- helpers de nomes (remove BOM, acentos, espaços estranhos) ----------
def _clean_name(name: str) -> str:
    if name is None:
        return ""
    # remove BOM, trims e normaliza espaços
    name = str(name).lstrip("\ufeff").strip()
    # normaliza acentos -> base ASCII (evita colunas com "ç, á, ã" etc.)
    name = unicodedata.normalize("NFKD", name)
    name = "".join(ch for ch in name if not unicodedata.combining(ch))
    # colapsa espaços internos
    name = _re.sub(r"\s+", " ", name)
    return name

def _lower_key(name: str) -> str:
    # chave para comparações: lower + sem BOM + trims
    return _clean_name(name).lower()

# 1) Lê o parquet do RAW (nome-base do .tsv convertido anteriormente)
src_key  = f"{RAW_PREF}/EnquadramentoInicia_v2.parquet"   # se o nome do arquivo diferir, ajuste aqui
src_path = f"{bucket_uri}/{src_key}"

df = spark.read.parquet(src_path)

# 1.1) normaliza *internamente* os nomes de colunas (remove BOM e espaços/acentos), mas mantendo forma original para matching
orig_cols = df.columns
clean_map = {}
for c in orig_cols:
    cc = _clean_name(c)
    if cc != c:
        clean_map[c] = cc

# aplica renomeações "cosméticas" só para remover BOM/espaços/acentos do nome físico
for old, new in clean_map.items():
    if new and new != old:
        # cuidado com colisões; se já existir, pula
        if new not in df.columns:
            df = df.withColumnRenamed(old, new)

# 2) Renomeia colunas como no seu pandas:
#    segmento -> Segmento_Bancos
#    cnpj     -> CNPJ_Bancos
#    nome     -> Nome_Bancos
# Faremos um matching case-insensitive e tolerante a espaços/BOM/acentos
rename_targets = {
    "segmento": "Segmento_Bancos",
    "cnpj": "CNPJ_Bancos",
    "nome": "Nome_Bancos",
}

# constrói índice (chave baixa e limpa -> nome atual)
idx = {_lower_key(c): c for c in df.columns}

for k_low, final_name in rename_targets.items():
    if k_low in idx:
        current = idx[k_low]
        if current != final_name:
            # evita sobrescrever coluna existente
            if final_name not in df.columns:
                df = df.withColumnRenamed(current, final_name)

# 3) Trata documento (equivalente ao seu trata_documento):
#    - remove não-dígitos
#    - se tiver <8 dígitos, faz zero-fill até 8; senão mantém como está (sem truncar)
if "CNPJ_Bancos" not in df.columns:
    raise RuntimeError("Coluna 'CNPJ_Bancos' não encontrada após o rename. Verifique o schema do RAW.")

digits = F.regexp_replace(F.col("CNPJ_Bancos").cast(T.StringType()), r"[^0-9]", "")
df = df.withColumn(
    "CNPJ_Bancos",
    F.when(F.length(digits) < 8, F.lpad(digits, 8, "0")).otherwise(digits)
)

# 4) (opcional) garante tipos string nas demais colunas
#    aqui convertemos todas as colunas para StringType para evitar surpresas de encoding ao exportar/ler depois
for c in df.columns:
    df = df.withColumn(c, F.col(c).cast(T.StringType()))

# 5) Escreve no TRUSTED com flatten: trusted/base_bancos_TRUSTED.parquet
final_name = "base_bancos_TRUSTED"
tmp_dir = f"{trusted_uri}/{final_name}/"
df.coalesce(1).write.mode("overwrite").option("compression", "snappy").parquet(tmp_dir)
print(f"[TRUSTED] tmp gravado: {tmp_dir}")

# Copia part-*.parquet para arquivo solto e apaga a pasta temporária
prefix_tmp = f"{TRUSTED_PREFIX}/{final_name}/"
part_key = None
resp = s3c.list_objects_v2(Bucket=BUCKET, Prefix=prefix_tmp)
for o in resp.get("Contents", []):
    k = o["Key"]
    if _re.search(r"/part-.*\.parquet$", k):
        part_key = k; break
if not part_key:
    raise RuntimeError(f"part-*.parquet não encontrado em s3://{BUCKET}/{prefix_tmp}")

dest_key = f"{TRUSTED_PREFIX}/{final_name}.parquet"
s3c.copy_object(Bucket=BUCKET, CopySource={"Bucket": BUCKET, "Key": part_key}, Key=dest_key)

# limpa tmp
for o in s3c.list_objects_v2(Bucket=BUCKET, Prefix=prefix_tmp).get("Contents", []):
    s3c.delete_object(Bucket=BUCKET, Key=o["Key"])

print(f"[OK] TRUSTED → s3://{BUCKET}/{dest_key}")


[TRUSTED] tmp gravado: s3://bucket-etl-edb011/trusted/base_bancos_TRUSTED/
[OK] TRUSTED → s3://bucket-etl-edb011/trusted/base_bancos_TRUSTED.parquet


# Base Trusted Empregados

In [3]:
import re, unicodedata, pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, broadcast
import boto3

# -------- Config --------
BUCKET = "bucket-etl-edb011"
RAW_PREF = "raw"
TRUSTED_PREFIX = "trusted"

bucket_uri  = f"s3://{BUCKET}"
raw_uri     = f"{bucket_uri}/{RAW_PREF}"
trusted_uri = f"{bucket_uri}/{TRUSTED_PREFIX}"

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
s3c = boto3.client("s3")

# -------- Helpers de texto/encoding para usar dentro das UDFs --------
_MOJIBAKE_BAD = ("Ã", "Â", "ï¿½")  # padrões comuns quando utf-8/latin1 cruza errado

def _fix_mojibake_series(s: pd.Series) -> pd.Series:
    """Tenta corrigir casos típicos de mojibake (latin1->utf8) e remove BOM/ZWSP."""
    s = s.astype("string")
    # remove BOM e zero-width
    s = s.str.replace("\ufeff", "", regex=False).str.replace("\u200b", "", regex=False).str.strip()

    # tentativa de recodificação latin1->utf8
    def _try_fix(x: pd._libs.missing.NAType | str) -> pd._libs.missing.NAType | str:
        if x is pd.NA or x is None:
            return pd.NA
        t = str(x)
        try:
            t1 = t.encode("latin1", errors="ignore").decode("utf-8", errors="ignore")
        except Exception:
            t1 = t
        # heurística: se a versão “t1” tem menos artefatos, fique com ela
        score_t  = sum(ch in _MOJIBAKE_BAD for ch in t)
        score_t1 = sum(ch in _MOJIBAKE_BAD for ch in t1)
        out = t1 if score_t1 < score_t else t
        # normaliza composição unicode (NFC)
        out = unicodedata.normalize("NFC", out)
        out = out.strip()
        return out if out != "" else pd.NA

    return s.apply(_try_fix)

# -------- Pandas UDFs --------
@pandas_udf("string")
def limpar_nome_pudf(s: pd.Series) -> pd.Series:
    """Normaliza nomes: corrige mojibake, remove acento, caixa alta e remove termos ruidosos."""
    s = _fix_mojibake_series(s)

    def limp(x):
        if x is pd.NA:
            return None
        t = unicodedata.normalize("NFKD", str(x))
        t = "".join(ch for ch in t if not unicodedata.combining(ch))
        t = t.upper()

        # remoção de termos ruidosos comuns em razão social de bancos
        remove_lista = [
            'S/A','S.A.','S.A','BANCO','FINANCEIRA','(BRASIL)','BRASIL','MULTIPLO','CREDITO',
            'CREDITO, FINANCIAMENTO E INVESTIMENTOS','INSTITUICAO DE PAGAMENTO','(CONGLOMERADO)',
            'MEDIUM','FINANCEIRO','CRED','SCFI','FINANCIAMENTO','INDUSTRIAL','GRUPO','SEGURO',
            'BANK','INVESTIMENTOS','CFI','BS2',' - PRUDENCIAL','PRUDENCIAL','INVESTIMENTO',
            'CAPITAL','SOCIEDADE','DE'
        ]
        for palavra in remove_lista:
            t = t.replace(palavra, '')

        # limpa pontuação/espacos múltiplos
        t = re.sub(r"[.,\-]", " ", t)
        t = re.sub(r"\s+", " ", t).strip()
        return t if t else None

    return s.apply(limp)

@pandas_udf("string")
def clean_doc_pudf(s: pd.Series) -> pd.Series:
    """Mantém apenas dígitos e faz zero-fill até 8; vazio vira None."""
    s = _fix_mojibake_series(s)
    s = s.astype("string").fillna("")
    s = s.str.replace(r"[^0-9]", "", regex=True)
    return s.apply(lambda x: x.zfill(8) if x else None)

@pandas_udf("string")
def fix_text_pudf(s: pd.Series) -> pd.Series:
    """Correção geral de texto (BOM/ZWSP/mojibake + NFC)."""
    out = _fix_mojibake_series(s)
    # vazio -> None
    return out.apply(lambda x: None if (x is pd.NA or x is None or str(x).strip() == "") else x)

def fix_all_string_columns(df):
    """Aplica fix_text_pudf em todas as colunas string (útil após joins)."""
    for c, dtype in df.dtypes:
        if dtype == "string":
            df = df.withColumn(c, fix_text_pudf(F.col(c)))
    return df

# -------- Leitura (Parquet não tem encoding de arquivo) --------
g1_path = f"{raw_uri}/glassdoor_consolidado_join_match_v2.parquet"
g2_path = f"{raw_uri}/glassdoor_consolidado_join_match_less_v2.parquet"
bancos_path = f"{trusted_uri}/base_bancos_TRUSTED.parquet"

df1 = spark.read.parquet(g1_path)
df2 = spark.read.parquet(g2_path)
df_bancos = spark.read.parquet(bancos_path)

# -------- Normalização de texto antes do match --------
# Corrige mojibake/BOM nas colunas relevantes
if "nome" in df1.columns:
    df1 = df1.withColumn("nome", fix_text_pudf(F.col("nome")))
if "nome" in df2.columns:
    df2 = df2.withColumn("nome", fix_text_pudf(F.col("nome")))
if "Nome_Bancos" in df_bancos.columns:
    df_bancos = df_bancos.withColumn("Nome_Bancos", fix_text_pudf(F.col("Nome_Bancos")))

# Nome normalizado para o join
df1 = df1.withColumn("nome_normalizado", limpar_nome_pudf(F.col("nome")))
df2 = df2.withColumn("nome_normalizado", limpar_nome_pudf(F.col("nome")))
df_bancos = df_bancos.withColumn("nome_normalizado", limpar_nome_pudf(F.col("Nome_Bancos")))

sel_bancos = (
    df_bancos
    .select("Nome_Bancos", "Segmento_Bancos", "CNPJ_Bancos", "nome_normalizado")
    .dropDuplicates(["nome_normalizado"])
)

df_main  = df1.join(broadcast(sel_bancos), on="nome_normalizado", how="left").drop("nome_normalizado")
df_main2 = df2.join(broadcast(sel_bancos), on="nome_normalizado", how="left").drop("nome_normalizado")

# -------- Ajustes pós-join --------
if "CNPJ_Bancos" in df_main.columns:
    df_main  = df_main.withColumnRenamed("CNPJ_Bancos", "cnpj").drop("Segmento_Bancos")
if "Segmento_Bancos" in df_main2.columns:
    df_main2 = df_main2.withColumnRenamed("Segmento_Bancos", "segmento").drop("CNPJ_Bancos")

df_emp = df_main.unionByName(df_main2, allowMissingColumns=True)

# Limpeza geral de strings (corrige eventuais sobras de mojibake/BOM)
df_emp = fix_all_string_columns(df_emp)

# -------- Trata CNPJ --------
if "cnpj" in df_emp.columns:
    df_emp = df_emp.withColumn("cnpj", clean_doc_pudf(F.col("cnpj")))

# -------- Grava temporário com 1 partição --------
tmp_prefix = f"{TRUSTED_PREFIX}/tmp_base_empregados_TRUSTED"
tmp_path = f"{bucket_uri}/{tmp_prefix}"

df_emp.coalesce(1).write.mode("overwrite").option("compression", "snappy").parquet(tmp_path)

# -------- Copia part-*.parquet para nome final --------
resp = s3c.list_objects_v2(Bucket=BUCKET, Prefix=tmp_prefix)
part_key = None
for o in resp.get("Contents", []):
    if re.search(r"part-.*\.parquet$", o["Key"]):
        part_key = o["Key"]
        break

if part_key:
    final_key = f"{TRUSTED_PREFIX}/base_empregados_TRUSTED.parquet"
    s3c.copy_object(Bucket=BUCKET, CopySource={"Bucket": BUCKET, "Key": part_key}, Key=final_key)
    # Apaga temporário
    for o in resp.get("Contents", []):
        s3c.delete_object(Bucket=BUCKET, Key=o["Key"])
    print(f"[OK] Arquivo final: s3://{BUCKET}/{final_key}")
else:
    print("[ERRO] Arquivo part-*.parquet não encontrado no tmp.")


{'ResponseMetadata': {'RequestId': 'YZCS5Y3TSGPMRA90', 'HostId': 'O7M3riE+xGIdxILIHclflEvRDYky5ZHOB+n9rbYKysEQ9IC4PrMWxIFdD9h5Ttd6u5+opK7MKDI=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'O7M3riE+xGIdxILIHclflEvRDYky5ZHOB+n9rbYKysEQ9IC4PrMWxIFdD9h5Ttd6u5+opK7MKDI=', 'x-amz-request-id': 'YZCS5Y3TSGPMRA90', 'date': 'Mon, 11 Aug 2025 01:33:38 GMT', 'x-amz-server-side-encryption': 'AES256', 'content-type': 'application/xml', 'content-length': '275', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'ServerSideEncryption': 'AES256', 'CopyObjectResult': {'ETag': '"64b87160bbdc1a501ce3f1056bd5314b"', 'LastModified': datetime.datetime(2025, 8, 11, 1, 33, 38, tzinfo=tzlocal())}}
{'ResponseMetadata': {'RequestId': 'YZCYP7QKAVVFJJA4', 'HostId': '2EAh0Iy1ZjbXS9w5yqKq0PRdoUYJfLgvUqECEyvI/HY7tXe3PEjPWKZpGOwx4n/EIExdAWwWYaw=', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': '2EAh0Iy1ZjbXS9w5yqKq0PRdoUYJfLgvUqECEyvI/HY7tXe3PEjPWKZpGOwx4n/EIExdAWwWYaw=', 'x-amz-request-id': 'YZCYP7QKAVVFJJA4'

# Base Trusted Reclamações

In [5]:
# =========================
# Reclamações -> TRUSTED/base_reclamacoes_TRUSTED.parquet (arquivo único)
# === Versão à prova de BOM/mojibake e com detecção robusta de colunas ===
# =========================
import re, unicodedata, pandas as pd, boto3
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, broadcast

# ======= CONFIGS =======
BUCKET          = "bucket-etl-edb011"   # ajuste se necessário
RAW_PREFIX      = "raw"
TRUSTED_PREFIX  = "trusted"
TARGET_LEN_DOC  = 8          # 8 = ISPB; use 14 p/ CNPJ completo

# ======= Bootstrap Spark / Arrow =======
try:
    spark
except NameError:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("base_reclamacoes_trusted_flat").getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

bucket_uri   = f"s3://{BUCKET}"
raw_uri      = f"{bucket_uri}/{RAW_PREFIX}"
trusted_uri  = f"{bucket_uri}/{TRUSTED_PREFIX}"
s3c = boto3.client("s3")

# ======= Helpers de encoding/texto =======
_MOJIBAKE_BAD = ("Ã", "Â", "ï¿½")   # artefatos comuns de latin1/utf8

def remove_acentos(s: str) -> str:
    s = unicodedata.normalize("NFKD", s)
    return "".join(ch for ch in s if not unicodedata.combining(ch))

def strip_bom_zwsp(s: str) -> str:
    return s.replace("\ufeff", "").replace("\u200b", "").strip()

def fix_mojibake_series(s: pd.Series) -> pd.Series:
    """Remove BOM/ZWSP e tenta corrigir mojibake latin1->utf8; normaliza para NFC."""
    s = s.astype("string")
    s = s.str.replace("\ufeff", "", regex=False).str.replace("\u200b", "", regex=False).str.strip()
    def _fix(x):
        if x is pd.NA or x is None:
            return pd.NA
        t = str(x)
        try:
            t1 = t.encode("latin1", errors="ignore").decode("utf-8", errors="ignore")
        except Exception:
            t1 = t
        score_t  = sum(ch in _MOJIBAKE_BAD for ch in t)
        score_t1 = sum(ch in _MOJIBAKE_BAD for ch in t1)
        out = t1 if score_t1 < score_t else t
        out = unicodedata.normalize("NFC", out).strip()
        return out if out != "" else pd.NA
    return s.apply(_fix)

def pick_smart(df, exact_candidates, keywords_all=None):
    """
    Escolhe coluna por candidatos exatos (case-insensitive; tolera acento/BOM) ou por keywords.
    """
    def _clean_key(x: str) -> str:
        return remove_acentos(strip_bom_zwsp(x)).lower()
    idx = {_clean_key(c): c for c in df.columns}

    for cand in exact_candidates:
        k = remove_acentos(strip_bom_zwsp(cand)).lower()
        if k in idx:
            return idx[k]

    if keywords_all:
        keys = []
        for c in df.columns:
            cc = remove_acentos(strip_bom_zwsp(c)).lower()
            if all(k in cc for k in keywords_all):
                keys.append(c)
        if keys:
            return sorted(keys, key=len)[0]
    return None

@pandas_udf("string")
def limpar_nome_pudf(s: pd.Series) -> pd.Series:
    """Normaliza nomes: corrige mojibake, remove acento, upper, tira termos ruidosos e pontuação."""
    s = fix_mojibake_series(s)
    def limp(x):
        if x is pd.NA:
            return None
        t = remove_acentos(str(x)).upper()
        remove_lista = [
            'S/A','S.A.','S.A','BANCO','FINANCEIRA','(BRASIL)','BRASIL','MULTIPLO','CREDITO',
            'CREDITO, FINANCIAMENTO E INVESTIMENTOS','INSTITUICAO DE PAGAMENTO','(CONGLOMERADO)',
            'MEDIUM','FINANCEIRO','CRED','SCFI','FINANCIAMENTO','INDUSTRIAL','GRUPO','SEGURO',
            'BANK','INVESTIMENTOS','CFI','BS2',' - PRUDENCIAL','PRUDENCIAL','INVESTIMENTO',
            'CAPITAL','SOCIEDADE','DE'
        ]
        for palavra in remove_lista:
            t = t.replace(palavra, '')
        t = re.sub(r"[.,\-]", " ", t)
        t = re.sub(r"\s+", " ", t).strip()
        return t if t else None
    return s.apply(limp)

@pandas_udf("string")
def clean_doc_pudf(s: pd.Series) -> pd.Series:
    """Mantém dígitos e aplica zero-fill; vazio -> None."""
    s = fix_mojibake_series(s).astype("string").fillna("")
    s = s.str.replace(r"[^0-9]", "", regex=True)
    return s.apply(lambda x: x.zfill(TARGET_LEN_DOC) if x else None)

# ======= util S3 =======
def s3_key_exists(bucket: str, key: str) -> bool:
    try:
        s3c.head_object(Bucket=bucket, Key=key)
        return True
    except Exception:
        return False

# ======= 1) Leitura dos trimestres (RAW) =======
tri_basenames = [
    "2021_tri_01","2021_tri_02","2021_tri_03","2021_tri_04",
    "2022_tri_01","2022_tri_03","2022_tri_04",
]

tri_paths = []
for base in tri_basenames:
    for cand in (f"{RAW_PREFIX}/{base}.parquet", f"{RAW_PREFIX}/{base}_raw.parquet"):
        if s3_key_exists(BUCKET, cand):
            tri_paths.append(f"{bucket_uri}/{cand}")
            break
    else:
        print(f"[WARN] não encontrei RAW/{base}.parquet nem RAW/{base}_raw.parquet")

if not tri_paths:
    raise RuntimeError("Nenhum trimestre encontrado no RAW.")

dfs = [spark.read.parquet(p) for p in tri_paths]
df2 = dfs[0]
for d in dfs[1:]:
    df2 = df2.unionByName(d, allowMissingColumns=True)

# remove colunas 'unnamed' (inclui casos com BOM/acentos)
for c in list(df2.columns):
    cname = remove_acentos(strip_bom_zwsp(c)).lower()
    if cname.startswith("unnamed"):
        df2 = df2.drop(c)

# ======= 2) Base de bancos (TRUSTED) =======
bancos_path = f"{trusted_uri}/base_bancos_TRUSTED.parquet"
df_bancos   = spark.read.parquet(bancos_path)
if "Nome_Bancos" not in df_bancos.columns:
    raise RuntimeError("Esperava 'Nome_Bancos' em base_bancos_TRUSTED.parquet.")

# sanitiza texto de match antes de normalizar
df_bancos = df_bancos.withColumn(
    "Nome_Bancos",
    F.udf(lambda x: strip_bom_zwsp(x) if x is not None else None, "string")(F.col("Nome_Bancos"))
)

# ======= 3) Detecta colunas-alvo =======
col_inst = pick_smart(
    df2,
    exact_candidates=[
        "instituição_financeira","instituicao_financeira","nome_instituicao",
        "instituição financeira","instituicao financeira","institui_o_financeira"
    ],
    keywords_all=["institu","finan"]
)
col_cnpjif = pick_smart(
    df2,
    exact_candidates=["cnpj_if","cnpjif","cnpj_if_","cnpj if"],
    keywords_all=["cnpj","if"]
)

if not col_inst:
    raise RuntimeError(f"Coluna de instituição financeira não encontrada. Colunas: {df2.columns}")
if not col_cnpjif:
    raise RuntimeError(f"Coluna 'cnpj_if' não encontrada. Colunas: {df2.columns}")

print(f"[PICK] instituicao_financeira = '{col_inst}'")
print(f"[PICK] cnpj_if               = '{col_cnpjif}'")

# ======= 4) Split ready / nready =======
is_blank = (F.col(col_cnpjif).isNull()) | (F.trim(F.col(col_cnpjif)) == "")
df_ready  = df2.filter(~is_blank)
df_nready = df2.filter(is_blank)

# elimina qualquer 'cnpj_if' residual para evitar ambiguidade no join
for c in list(df_nready.columns):
    if remove_acentos(strip_bom_zwsp(c)).lower() == "cnpj_if":
        df_nready = df_nready.drop(c)

# ======= 5) Preparação do match (=100) =======
# Corrige mojibake/BOM nas colunas de texto ANTES do normalizador
df_aux = (
    df2.select(F.col(col_inst).alias("instituicao_financeira"),
               F.col(col_cnpjif).alias("cnpj_if"))
      .filter(is_blank)
      .dropDuplicates(["instituicao_financeira"])
)
df_aux = df_aux.withColumn(
    "instituicao_financeira",
    F.udf(lambda x: strip_bom_zwsp(x) if x is not None else None, "string")(F.col("instituicao_financeira"))
)

aux_norm    = df_aux.select("*", limpar_nome_pudf(F.col("instituicao_financeira")).alias("nome_norm"))
bancos_norm = (df_bancos
               .select("Nome_Bancos","CNPJ_Bancos")
               .select("*", limpar_nome_pudf(F.col("Nome_Bancos")).alias("nome_norm"))
               .dropDuplicates(["nome_norm"]))

df_merge = (aux_norm
            .join(broadcast(bancos_norm.select("nome_norm","CNPJ_Bancos")), on="nome_norm", how="left")
            .drop("nome_norm")
            .withColumnRenamed("CNPJ_Bancos","cnpj_if_sug"))

# ======= 6) Une sugestão no nready (sem ambiguidade) =======
left_col = col_inst  # ex.: "instituicao_financeira"
df_nready_left = df_nready.withColumnRenamed(left_col, "inst_left")

df_merge_right = (
    df_merge
    .select(F.col("instituicao_financeira").alias("inst_right"), "cnpj_if_sug")
)

df_nready = (
    df_nready_left
    .join(df_merge_right, df_nready_left["inst_left"] == df_merge_right["inst_right"], "left")
    .drop("inst_right")
)

# aplica sugestão e limpa temporários
if "cnpj_if" in df_nready.columns:
    df_nready = df_nready.withColumn("cnpj_if", F.coalesce(F.col("cnpj_if"), F.col("cnpj_if_sug")))
else:
    df_nready = df_nready.withColumnRenamed("cnpj_if_sug", "cnpj_if")
df_nready = df_nready.drop("cnpj_if_sug").withColumnRenamed("inst_left", left_col)

# ======= 7) Concat final + tratamento do documento =======
df_main_2 = df_ready.unionByName(df_nready, allowMissingColumns=True)
df_main_2 = df_main_2.withColumn("cnpj_if", clean_doc_pudf(F.col("cnpj_if").cast("string")))

# ======= 8) Escreve como ARQUIVO ÚNICO (flatten) =======
tmp_prefix = f"{TRUSTED_PREFIX}/__tmp_base_reclamacoes_TRUSTED"
tmp_path   = f"{bucket_uri}/{tmp_prefix}"

(df_main_2
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("compression","snappy")
    .parquet(tmp_path))

# copia o part-*.parquet para o nome final e limpa tmp
resp = s3c.list_objects_v2(Bucket=BUCKET, Prefix=tmp_prefix)
part_key = None
for o in resp.get("Contents", []):
    if re.search(r"part-.*\.parquet$", o["Key"]):
        part_key = o["Key"]; break

if not part_key:
    raise RuntimeError("part-*.parquet não encontrado no tmp.")

final_key = f"{TRUSTED_PREFIX}/base_reclamacoes_TRUSTED.parquet"
s3c.copy_object(Bucket=BUCKET, CopySource={"Bucket": BUCKET, "Key": part_key}, Key=final_key)

# limpa temporários
for o in resp.get("Contents", []):
    s3c.delete_object(Bucket=BUCKET, Key=o["Key"])

print(f"[OK] TRUSTED (arquivo único): s3://{BUCKET}/{final_key}")


[PICK] instituicao_financeira = 'instituicao_financeira'
[PICK] cnpj_if               = 'cnpj_if'
[OK] TRUSTED (arquivo único): s3://bucket-etl-edb011/trusted/base_reclamacoes_TRUSTED.parquet


# Base Delivery final

In [6]:
import re, unicodedata, pandas as pd, boto3
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# ========================
# CONFIGURAÇÕES
# ========================
BUCKET          = "bucket-etl-edb011"
TRUSTED_PREFIX  = "trusted"
DELIVERY_PREFIX = "delivery"
FLATTEN         = True  # True = gera 1 arquivo único .parquet
TARGET_LEN_DOC  = 8     # 8 = ISPB; use 14 para CNPJ completo, se preferir

# Bootstrap Spark (caso esteja fora do Glue)
try:
    spark
except NameError:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("base_final_delivery").getOrCreate()

bucket_uri   = f"s3://{BUCKET}"
trusted_uri  = f"{bucket_uri}/{TRUSTED_PREFIX}"
delivery_uri = f"{bucket_uri}/{DELIVERY_PREFIX}"
s3c = boto3.client("s3")

# ========================
# Funções de limpeza texto (à prova de BOM/mojibake)
# ========================
_MOJIBAKE_BAD = ("Ã", "Â", "ï¿½")  # artefatos comuns de latin1/utf8

@pandas_udf("string")
def fix_text_pudf(s: pd.Series) -> pd.Series:
    """Remove BOM/ZWSP, tenta corrigir mojibake latin1->utf8 e normaliza para NFC."""
    s = s.astype("string")
    # remove BOM e zero-width + trim
    s = s.str.replace("\ufeff", "", regex=False).str.replace("\u200b", "", regex=False).str.strip()

    def _fix(x):
        if x is pd.NA or x is None:
            return None
        t = str(x)
        try:
            t1 = t.encode("latin1", errors="ignore").decode("utf-8", errors="ignore")
        except Exception:
            t1 = t
        # escolhe a versão com menos artefatos típicos
        score_t  = sum(ch in _MOJIBAKE_BAD for ch in t)
        score_t1 = sum(ch in _MOJIBAKE_BAD for ch in t1)
        out = t1 if score_t1 < score_t else t
        out = unicodedata.normalize("NFC", out).strip()
        return out if out != "" else None

    return s.apply(_fix)

def fix_all_string_columns(df):
    """Aplica fix_text_pudf em todas as colunas string (bom contra BOM/ZWSP/mojibake)."""
    for c, dtype in df.dtypes:
        if dtype == "string":
            df = df.withColumn(c, fix_text_pudf(F.col(c)))
    return df

@pandas_udf("string")
def clean_doc_pudf(s: pd.Series) -> pd.Series:
    """Mantém apenas dígitos e faz zero-fill; vazio -> None."""
    s = s.astype("string").fillna("")
    s = s.str.replace(r"[^0-9]", "", regex=True)
    return s.apply(lambda x: x.zfill(TARGET_LEN_DOC) if x else None)

# ========================
# Ler bases TRUSTED
# ========================
df_main   = spark.read.parquet(f"{trusted_uri}/base_empregados_TRUSTED.parquet")
df_main_2 = spark.read.parquet(f"{trusted_uri}/base_reclamacoes_TRUSTED.parquet")

# ========================
# Preparar DataFrames
# ========================
df_final_1 = df_main
df_final_2 = df_main_2.withColumnRenamed("cnpj_if", "cnpj")

# Sanear CNPJ/ISPB antes do join (evita diferenças por encoding/whitespace)
if "cnpj" in df_final_1.columns:
    df_final_1 = df_final_1.withColumn("cnpj", clean_doc_pudf(F.col("cnpj").cast("string")))
if "cnpj" in df_final_2.columns:
    df_final_2 = df_final_2.withColumn("cnpj", clean_doc_pudf(F.col("cnpj").cast("string")))

# ========================
# Join outer no campo CNPJ
# ========================
df_final = df_final_1.join(df_final_2, on=["cnpj"], how="outer")

# ========================
# Corrigir encoding (todas as strings)
# ========================
df_final = fix_all_string_columns(df_final)

# ========================
# Salvar na pasta DELIVERY
# ========================
if not FLATTEN:
    (df_final
        .write
        .mode("overwrite")
        .option("compression", "snappy")
        .parquet(f"{delivery_uri}/base_final_delivery"))
    print(f"[OK] Pasta Parquet: {delivery_uri}/base_final_delivery")
else:
    tmp_prefix = f"{DELIVERY_PREFIX}/__tmp_base_final"
    tmp_path   = f"{bucket_uri}/{tmp_prefix}"

    (df_final
        .coalesce(1)
        .write
        .mode("overwrite")
        .option("compression", "snappy")
        .parquet(tmp_path))

    # Localiza o part-*.parquet e copia para o nome final (arquivo único)
    resp = s3c.list_objects_v2(Bucket=BUCKET, Prefix=tmp_prefix)
    part_key = None
    for o in resp.get("Contents", []):
        if re.search(r"part-.*\.parquet$", o["Key"]):
            part_key = o["Key"]
            break

    if not part_key:
        raise RuntimeError("part-*.parquet não encontrado no tmp.")

    final_key = f"{DELIVERY_PREFIX}/base_final_delivery.parquet"
    s3c.copy_object(Bucket=BUCKET, CopySource={"Bucket": BUCKET, "Key": part_key}, Key=final_key)

    # Limpa temporários
    for o in resp.get("Contents", []):
        s3c.delete_object(Bucket=BUCKET, Key=o["Key"])

    print(f"[OK] Arquivo único: s3://{BUCKET}/{final_key}")


{'ResponseMetadata': {'RequestId': 'M94PM6N5R0VK4364', 'HostId': '0q+WRSfV+HhDdCCMV5Odca4V7gyiyrdHSox9EQt9acS+RlYCzWVvaCO3MdZdhISpnfQSTZ7PYIQ=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '0q+WRSfV+HhDdCCMV5Odca4V7gyiyrdHSox9EQt9acS+RlYCzWVvaCO3MdZdhISpnfQSTZ7PYIQ=', 'x-amz-request-id': 'M94PM6N5R0VK4364', 'date': 'Mon, 11 Aug 2025 01:39:00 GMT', 'x-amz-server-side-encryption': 'AES256', 'content-type': 'application/xml', 'content-length': '275', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'ServerSideEncryption': 'AES256', 'CopyObjectResult': {'ETag': '"2af978aa1d6a72eb0e4a8be2438ed873"', 'LastModified': datetime.datetime(2025, 8, 11, 1, 39, tzinfo=tzlocal())}}
{'ResponseMetadata': {'RequestId': 'AJ45HGJCDSDC4B1V', 'HostId': '1N1FxXSqnyoB8BcFtxCVPO6IFiHgErDVh8O552/UZKY4+yodrVa4HhJ5J9I6LQScKnu9wJrERvY=', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': '1N1FxXSqnyoB8BcFtxCVPO6IFiHgErDVh8O552/UZKY4+yodrVa4HhJ5J9I6LQScKnu9wJrERvY=', 'x-amz-request-id': 'AJ45HGJCDSDC4B1V', 'd

In [7]:
# lê o parquet
df = spark.read.parquet("s3://bucket-etl-edb011/delivery/base_final_delivery.parquet")

# gera um CSV de amostra para inspeção
MAX_ROWS = 200_000
(df.limit(MAX_ROWS)
   .coalesce(1)
   .write.mode("overwrite")
   .option("header", True)
   .csv("s3://bucket-etl-edb011/tmp/base_final_teste_csv/"))

print("Baixe em: s3://bucket-etl-edb011/tmp/base_final_teste_csv/")


Baixe em: s3://bucket-etl-edb011/tmp/base_final_teste_csv/


# Salvando em arquivo MySQL

In [12]:
import os
import sqlite3

# Caminho do arquivo SQLite na pasta delivery
db_path = "s3://bucket-etl-edb011/delivery/base_final_delivery.db"

# Garante que a pasta existe
os.makedirs(os.path.dirname(db_path), exist_ok=True)

# Conecta ao banco (cria se não existir)
conn = sqlite3.connect(db_path)

# Salva o DataFrame no banco
# Aqui assumo que df_final é um DataFrame Pandas
df_final.toPandas().to_sql("base_final", conn, if_exists="replace", index=False)

# Fecha conexão
conn.close()

print(f"[OK] Banco SQLite criado em: {db_path}")


[OK] Banco SQLite criado em: s3://bucket-etl-edb011/delivery/base_final_delivery.db
