# Silver - Filtros e Transformações CNPJ (Incremental)

In [0]:
import sys
sys.path.append("/Workspace/Users/kgenuins@emeal.nttdata.com/project-insight-lab-databricks")

from Config.spark_config import apply_storage_config
from Config.storage_config import *
from Utils.utils import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, trim, when, lit, upper, lower, regexp_replace, translate,
    lpad, to_date, current_timestamp, decode, encode, concat_ws,
    collect_list, explode, split, substring, xxhash64, percentile_approx, max, length,coalesce
)
from pyspark.sql.types import IntegerType, DoubleType, StringType
import uuid
import builtins
from typing import Any

apply_storage_config(spark)

In [0]:
# Localizações dos dados
path_storage_bronze = f"{bronze_path}"
path_storage_silver = f"{silver_path}"
dbutils.fs.ls(bronze_path)

## Tabela de controle incremental

In [0]:
def criar_tabela_controle_silver():
    # Criar tabela de controle se não existir
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS silver.transformacoes_processadas_cnpj (
            tabela_silver STRING,
            data_ultima_transformacao TIMESTAMP,
            total_registros BIGINT,
            data_execucao TIMESTAMP
        )
        USING DELTA
    """)
    print("Tabela de controle de transformações Silver criada!")

criar_tabela_controle_silver()

## Funções auxiliares para controle incremental

In [0]:
def obter_data_ultima_transformacao(tabela_silver):
    """
    Obtém a data da última transformação realizada para uma tabela
    """
    try:
        resultado = spark.sql(f"""
            SELECT MAX(data_ultima_transformacao) as ultima_data
            FROM silver.transformacoes_processadas_cnpj
            WHERE tabela_silver = '{tabela_silver}'
        """).collect()
        
        if resultado and resultado[0].ultima_data:
            return resultado[0].ultima_data
        else:
            return None
    except:
        return None

In [0]:
def registrar_transformacao_silver(tabela_silver, total_registros):
    """
    Registra transformação realizada na Silver
    """
    spark.sql(f"""
        INSERT INTO silver.transformacoes_processadas_cnpj
        VALUES (
            '{tabela_silver}',
            current_timestamp(),
            {total_registros},
            current_timestamp()
        )
    """)

## Funções utilitárias do Projeto

In [0]:
def latin1_to_utf8(df: DataFrame, columns: list[str]) -> DataFrame:
    """ Tenta converter colunas do tipo StringType de iso-8859-1(latin1) para UTF-8 """
    for column_name in columns:
        df = df.withColumn(
            column_name, 
            decode(encode(col(column_name), "ISO-8859-1"), "UTF-8")
        )
    return df


def strip_df(df: DataFrame, columns: list[str]) -> DataFrame:
    """Remove espaços em branco do início e do fim das strings"""
    for column_name in columns:
        df = df.withColumn(column_name, trim(column_name))
    return df


def remove_accents(df: DataFrame, columns: list[str]) -> DataFrame:
    """Mapeia e remove acentos"""
    accented = "áàâãäéèêëíìîïóòôõöúùûüçÁÀÂÃÄÉÈÊËÍÌÎÏÓÒÔÕÖÚÙÛÜÇ"
    unaccented = "aaaaaeeeeiiiiooooouuuucAAAAAEEEEIIIIOOOOOUUUUC"

    for column_name in columns:
        df = df.withColumn(
            column_name,
            regexp_replace(
                translate(col(column_name), accented, unaccented),
                r"[^a-zA-Z0-9\s]",
                ""
            )
        )
    return df


def cast_dates(df: DataFrame, columns: list[str]) -> DataFrame:
    """Converte colunas para tipo DATE"""
    for column_name in columns:
        df = df.withColumn(column_name, to_date(col(column_name), "yyyyMMdd"))
    return df


def normalize_blanks_to_null(df: DataFrame, columns: list[str]) -> DataFrame:
    """
    Normaliza para NULL:
      - valores já nulos (None/null)
      - strings vazias ("")
      - strings contendo apenas espaços ("   ", "\t", etc. após trim)
    """
    if columns is None:
        target_cols = [c for c, t in df.dtypes if t == "string"]
    else:
        string_cols = {c for c, t in df.dtypes if t == "string"}
        target_cols = [c for c in columns if c in string_cols]

    for c in target_cols:
        df = df.withColumn(
            c,
            when(col(c).isNull() | (trim(col(c)) == ""), None).otherwise(col(c))
        )
    return df


# def non_values_to_not_informed(df: DataFrame, columns: list[str], target_values: list[str], not_informed_str="NAO INFORMADO") -> DataFrame:
#     """
#     Converte para 'NAO INFORMADO':
#       - valores contidos em 'target_values'
#       - strings com valor "0"
#       - strings vazias ("")
#       - strings contendo apenas espaços
#     """
#     if columns is None:
#         target_cols = [c for c, t in df.dtypes if t == "string"]
#     else:
#         string_cols = {c for c, t in df.dtypes if t == "string"}
#         target_cols = [c for c in columns if c in string_cols]

#     for c in target_cols:
#         for target_value in target_values:
#             df = df.withColumn(
#                 c,
#                 when((col(c) == target_value | col(c).isNull()), lit(not_informed_str)).otherwise(col(c))
#             )
#     return df



def non_values_to_not_informed(
    df: DataFrame,
    columns: list[str] | None,
    target_values: list[str],
    not_informed_str: str = "NAO INFORMADO"
) -> DataFrame:
    """
    Converte para 'NAO INFORMADO':
      - valores contidos em 'target_values'
      - strings com valor "0"
      - strings vazias ("")
      - strings contendo apenas espaços
      - valores nulos (NULL)
    Observações:
      - Só aplica em colunas de tipo string.
      - Se 'columns' for None, aplica em todas as colunas string do DataFrame.
    """
    # Seleciona colunas string (evita tentar comparar tipos não-string)
    string_cols = [c for c, t in df.dtypes if t == "string"]

    if columns is None:
        target_cols = string_cols
    else:
        # Filtra apenas colunas string dentre as solicitadas
        target_cols = [c for c in columns if c in string_cols]

    # Conjunto de valores alvo inclui também "0" e "" (o trim resolverá espaços)
    # Observação: "" já é coberto pela condição de trim(col) == ""
    base_targets = set(target_values or [])
    # "0" explicitamente pedido pela regra
    base_targets.add("0")

    for c in target_cols:
        # Condições:
        # - nulo
        # - valor em target_values OU "0"
        # - vazio após trim ("" ou só espaços)
        condition = (
            col(c).isNull()
            | trim(col(c)).isin(list(base_targets))
            | (trim(col(c)) == "")
        )

        df = df.withColumn(
            c,
            when(condition, lit(not_informed_str)).otherwise(col(c))
        )

    return df



def eliminate_row_if_null_or_blank(df: DataFrame, columns_to_check: list[str]) -> DataFrame:
    """Elimina linhas onde as colunas listadas forem nulas ou vazias"""
    for column_name in columns_to_check:
        df = df.filter(~(col(column_name).isNull()) & ~(col(column_name) == "") & ~(col(column_name) == " "))
    return df


def recommend_salt_simple(df, col):
    """
    Recomenda um valor de salt (número de partições extras) para balancear dados em joins ou writes
    """
    counts = df.groupBy(col).count()

    stats = counts.agg(
        max("count").alias("max_count"),
        percentile_approx("count", 0.5).alias("median_count")
    ).collect()[0]

    max_c = stats["max_count"]
    med_c = stats["median_count"]

    if med_c == 0:
        return 1

    ratio = max_c / med_c

    if ratio < 1.5:
        salt = 1
    elif ratio < 2.5:
        salt = 4
    elif ratio < 4:
        salt = 8
    else:
        salt = 16

    return salt


def _estimate_row_size_parquet(df, sample_rows=200_000, tmp_dir="dbfs:/tmp/row_size_estimate"):
    """Estima o tamanho médio (em bytes) de uma linha de um DataFrame Spark ao ser gravada em formato Parquet"""
    tmp_path = f"{tmp_dir}/est_{uuid.uuid4().hex}"
    
    try:
        sample = df.limit(sample_rows)
        sample.write.format("parquet").mode("overwrite").option("compression", "snappy").save(tmp_path)
        
        files = dbutils.fs.ls(tmp_path)
        total_bytes = builtins.sum(f.size for f in files if f.name.endswith(".parquet"))
        
        row_count = sample.count()
        avg_bytes = total_bytes / row_count if row_count > 0 else 0.0
        
        return avg_bytes
    finally:
        try:
            dbutils.fs.rm(tmp_path, recurse=True)
        except:
            pass


def recommend_partitions(df, desired_file_mb=128, sample_rows=200_000):
    """
    Recomenda o número de partições para atingir um tamanho de arquivo desejado
    """
    avg_row_bytes = _estimate_row_size_parquet(df, sample_rows=sample_rows)
    total_rows = df.count()
    desired_bytes = desired_file_mb * 1024 * 1024
    total_bytes = total_rows * avg_row_bytes

    num_partitions = builtins.max(1, int(total_bytes / desired_bytes)) if desired_bytes > 0 else 1

    return {
        "avg_row_bytes": float(avg_row_bytes),
        "total_rows": int(total_rows),
        "total_mb_estimated": float(total_bytes / (1024*1024)),
        "desired_file_mb": int(desired_file_mb),
        "recommended_partitions": int(num_partitions),
    }

## Tratamentos CNAE

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES CNAE")
print("=" * 60)

ultima_transformacao_cnae = obter_data_ultima_transformacao("cnaes_tratado")

cnaes_df = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/cnaes")

if ultima_transformacao_cnae:
    cnaes_df = cnaes_df.filter(col("ingestion_dt") > ultima_transformacao_cnae)
    print(f"Filtrando CNAE desde: {ultima_transformacao_cnae}")
else:
    print("Primeira execução: processando todos os CNAE")

print(f"Total de registros CNAE para processar: {cnaes_df.count()}")

In [0]:
# Sequência de tratamentos
cnaes_df = strip_df(cnaes_df, ["descricao"])
cnaes_df = normalize_blanks_to_null(cnaes_df, ["descricao"])
cnaes_df = remove_accents(cnaes_df, ["descricao"])

cnaes_df = (
    cnaes_df
    .withColumn("descricao", upper(col("descricao")))
    .withColumn("codigo", lpad(col("codigo"), 7, "0"))
    .drop("origin_file_name", "file_name_only")
    .dropDuplicates()
)
print(f"CNAE transformados: {cnaes_df.count()} registros")

In [0]:
# Escrita incremental (append) na Silver
(
    cnaes_df.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/cnaes_tratado")
)

# Criar tabela se não existir
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.cnaes_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/cnaes_tratado'
""")

registrar_transformacao_silver("cnaes_tratado", cnaes_df.count())

print("CNAE salvos em Silver")

## Tratamentos Empresas

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES EMPRESAS")
print("=" * 60)

ultima_transformacao_empresas = obter_data_ultima_transformacao("empresas_tratado")

empresas_df = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/empresas")

if ultima_transformacao_empresas:
    empresas_df = empresas_df.filter(col("ingestion_dt") > ultima_transformacao_empresas)
    print(f"Filtrando Empresas desde: {ultima_transformacao_empresas}")
else:
    print("Primeira execução: processando todas as Empresas")

# display(empresas_df)

print(f"Total de registros Empresas para processar: {empresas_df.count()}")

In [0]:
# Sequência de tratamentos
empresas_df = eliminate_row_if_null_or_blank(empresas_df, ["CNPJ_BASICO"])
empresas_df = empresas_df.filter(~((empresas_df.CNPJ_BASICO == "00000000") | (empresas_df.CNPJ_BASICO == "") | (empresas_df.CNPJ_BASICO.isNull())))

empresas_df = empresas_df.drop("anomes","origin_file_name","file_name_only")

colunas_tratamento = [
    "CNPJ_BASICO", 
    "RAZAO_SOCIAL_NOME_EMPRESARIAL",  
    "NATUREZA_JURIDICA", 
    "QUALIFICACAO_DO_RESPONSAVEL", 
    "CAPITAL_SOCIAL_DA_EMPRESA", 
    "PORTE_DA_EMPRESA", 
    "ENTE_FEDERATIVO_RESPONSAVEL"
]
empresas_df = strip_df(empresas_df, colunas_tratamento)
empresas_df = remove_accents(empresas_df, colunas_tratamento)
empresas_df = normalize_blanks_to_null(empresas_df, colunas_tratamento)

empresas_df = empresas_df.withColumn("CNPJ_BASICO", lpad(col('CNPJ_BASICO'), 8, "0"))
empresas_df = empresas_df.withColumn("RAZAO_SOCIAL_NOME_EMPRESARIAL", upper(col("RAZAO_SOCIAL_NOME_EMPRESARIAL")))
empresas_df = empresas_df.withColumn("ENTE_FEDERATIVO_RESPONSAVEL", when(col("ENTE_FEDERATIVO_RESPONSAVEL").isNull(), "NAO INFORMADO").otherwise(col("ENTE_FEDERATIVO_RESPONSAVEL")))
empresas_df = empresas_df.withColumn("PORTE_DA_EMPRESA", when(col("PORTE_DA_EMPRESA").isNull(), "NAO INFORMADO").otherwise(col("PORTE_DA_EMPRESA")))
empresas_df = empresas_df.dropDuplicates()

print(f"Empresas transformadas: {empresas_df.count()} registros")

In [0]:
# Escrita incremental (append) na Silver
(
    empresas_df.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/empresas_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.empresas_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/empresas_tratado'
""")

registrar_transformacao_silver("empresas_tratado", empresas_df.count())

print("Empresas salvas em Silver")

## Tratamentos Estabelecimentos

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES ESTABELECIMENTOS")
print("=" * 60)

ultima_transformacao_estabelecimentos = obter_data_ultima_transformacao("estabelecimentos_tratado")

estabelecimentos_df = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/estabelecimentos")

if ultima_transformacao_estabelecimentos:
    estabelecimentos_df = estabelecimentos_df.filter(col("ingestion_dt") > ultima_transformacao_estabelecimentos)
    print(f"Filtrando Estabelecimentos desde: {ultima_transformacao_estabelecimentos}")
else:
    print("Primeira execução: processando todos os Estabelecimentos")

print(f"Total de registros Estabelecimentos para processar: {estabelecimentos_df.count()}")

In [0]:
# Sequência de tratamentos
estabelecimentos_df = eliminate_row_if_null_or_blank(estabelecimentos_df, ["CNPJ_BASICO"])
estabelecimentos_df = estabelecimentos_df.filter(~((estabelecimentos_df.CNPJ_BASICO == "00000000") | (estabelecimentos_df.CNPJ_BASICO == "") | (estabelecimentos_df.CNPJ_BASICO.isNull())))

estabelecimentos_df = estabelecimentos_df.drop("ingestion_dt","anomes","origin_file_name","file_name_only")

columns_to_remove = [
    "DDD1", "TELEFONE1", "DDD2", "TELEFONE2", "DDD_FAX", "FAX", "CORREIO_ELETRONICO", "TIPO_LOGRADOURO", "LOGRADOURO", "NUMERO", "COMPLEMENTO", "CEP",
    "SITUACAO_ESPECIAL", "DATA_SITUACAO_ESPECIAL", 'ingestion_dt'
]
estabelecimentos_df = estabelecimentos_df.drop(*columns_to_remove)

colunas_tratamento = ["CNPJ_BASICO", "CNPJ_ORDEM", "CNPJ_DV", "NOME_FANTASIA", "SITUACAO_CADASTRAL", "DATA_SITUACAO_CADASTRAL", "MOTIVO_SITUACAO_CADASTRAL", "NOME_CIDADE_EXTERIOR", "PAIS", "DT_INICIO_ATIVIDADE", "CNAE_FISCAL_PRINCIPAL", "CNAE_FISCAL_SECUNDARIA", "BAIRRO", "UF", "MUNICIPIO", "IDENTIFICADOR_MATRIZ_FILIAL"]
estabelecimentos_df = strip_df(estabelecimentos_df, colunas_tratamento)
estabelecimentos_df = remove_accents(estabelecimentos_df, colunas_tratamento)
estabelecimentos_df = normalize_blanks_to_null(estabelecimentos_df, colunas_tratamento)

colunas_tratamento_datas = ["DATA_SITUACAO_CADASTRAL", "DT_INICIO_ATIVIDADE"]
estabelecimentos_df = cast_dates(estabelecimentos_df, colunas_tratamento_datas)

estabelecimentos_df = estabelecimentos_df.withColumn("CNPJ_BASICO", lpad(col('CNPJ_BASICO'), 8, "0"))
estabelecimentos_df = estabelecimentos_df.withColumn("CNPJ_ORDEM", lpad(col('CNPJ_ORDEM'), 4, "0"))
estabelecimentos_df = estabelecimentos_df.withColumn("CNPJ_DV", lpad(col('CNPJ_DV'), 2, "0"))
estabelecimentos_df = estabelecimentos_df.withColumn("CNAE_FISCAL_PRINCIPAL", lpad(col('CNAE_FISCAL_PRINCIPAL'), 7, "0"))

estabelecimentos_df = estabelecimentos_df.withColumn("NOME_FANTASIA", upper(col("NOME_FANTASIA")))
estabelecimentos_df = estabelecimentos_df.withColumn("BAIRRO", upper(col("BAIRRO")))

target_values = ["0", "00"]
colunas_tratamento_not_informed = ["NOME_FANTASIA", "NOME_CIDADE_EXTERIOR", "PAIS", "BAIRRO"]
estabelecimentos_df = non_values_to_not_informed(estabelecimentos_df, colunas_tratamento_not_informed, target_values)

# estabelecimentos_df = estabelecimentos_df.withColumn("CNAE_FISCAL_SECUNDARIA_EXP", explode(split(col("CNAE_FISCAL_SECUNDARIA"), ",")))
# estabelecimentos_df = estabelecimentos_df.withColumn("CNAE_FISCAL_SECUNDARIA_EXP", lpad(col('CNAE_FISCAL_SECUNDARIA_EXP'), 7, "0"))

# group_columns = ['CNPJ_BASICO', 'CNPJ_ORDEM', 'CNPJ_DV', 'IDENTIFICADOR_MATRIZ_FILIAL', 'NOME_FANTASIA', 'SITUACAO_CADASTRAL', 'DATA_SITUACAO_CADASTRAL', 'MOTIVO_SITUACAO_CADASTRAL', 'NOME_CIDADE_EXTERIOR', 'PAIS', 'DT_INICIO_ATIVIDADE', 'CNAE_FISCAL_PRINCIPAL', 'BAIRRO', 'UF', 'MUNICIPIO']
# estabelecimentos_df = estabelecimentos_df.groupBy(*group_columns).agg(collect_list("CNAE_FISCAL_SECUNDARIA_EXP").alias("CNAE_FISCAL_SECUNDARIA_BACK"))
# estabelecimentos_df = estabelecimentos_df.withColumn("CNAE_FISCAL_SECUNDARIA", concat_ws(",", col("CNAE_FISCAL_SECUNDARIA_BACK"))).drop("CNAE_FISCAL_SECUNDARIA_BACK")

estabelecimentos_df = estabelecimentos_df.withColumn("CNPJ_BASICO_2D", substring(col("CNPJ_BASICO"), 1, 2))
estabelecimentos_df = estabelecimentos_df.withColumn("IS_MATRIZ", when(col("IDENTIFICADOR_MATRIZ_FILIAL") == "1", True).otherwise(False))
estabelecimentos_df = estabelecimentos_df.withColumn("IS_ATIVA", when(col("SITUACAO_CADASTRAL") == "02", True).otherwise(False))

# filiais_df = estabelecimentos_df.filter(col("IS_MATRIZ") == False).groupBy("CNPJ_BASICO").count()
# filiais_df = filiais_df.withColumnRenamed("count", "QTDE_FILIAIS")
# estabelecimentos_df = estabelecimentos_df.join(filiais_df, on="CNPJ_BASICO", how="left")

recommendations = recommend_partitions(estabelecimentos_df)
print(f"RECOMMENDATIONS: {str(recommendations)}")
partitions_number = builtins.max(1, int(recommendations['recommended_partitions']))

salt_buckets = builtins.int(recommend_salt_simple(estabelecimentos_df, "CNPJ_BASICO_2D"))
salt_buckets = builtins.max(1, salt_buckets)

if salt_buckets == 1:
    estabelecimentos_df = estabelecimentos_df.withColumn("salt", lit(0).cast("int"))
else:
    estabelecimentos_df = estabelecimentos_df.withColumn(
        "salt",
        (xxhash64(col("CNPJ_BASICO_2D")) % lit(salt_buckets)).cast("int")
    )

estabelecimentos_df = estabelecimentos_df.dropDuplicates()

estabelecimentos_df = estabelecimentos_df.repartition(partitions_number, col("salt"))

print(f"Estabelecimentos transformados: {estabelecimentos_df.count()} registros")


display(estabelecimentos_df)

In [0]:
# Escrita incremental (append) na Silver
(
    estabelecimentos_df.write
    .format("delta")
    .mode("append")
    .partitionBy("salt")
    .save(f"{path_storage_silver}/cnpj/estabelecimentos_tratado")
)


# spark.sql(f"""
#     CREATE TABLE IF NOT EXISTS silver.estabelecimentos_tratado
#     USING DELTA
#     LOCATION '{path_storage_silver}/cnpj/estabelecimentos_tratado'
# """)

estabelecimentos_df.write.format("delta").mode("append").saveAsTable("silver.estabelecimentos_tratado")

registrar_transformacao_silver("estabelecimentos_tratado", estabelecimentos_df.count())

print("Estabelecimentos salvos em Silver")

## Tratamentos Municípios

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES MUNICÍPIOS")
print("=" * 60)

ultima_transformacao_municipios = obter_data_ultima_transformacao("municipios_tratado")

df_municipios = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/municipios")

if ultima_transformacao_municipios:
    df_municipios = df_municipios.filter(col("ingestion_dt") > ultima_transformacao_municipios)
    print(f"Filtrando Municípios desde: {ultima_transformacao_municipios}")
else:
    print("Primeira execução: processando todos os Municípios")

print(f"Total de registros Municípios para processar: {df_municipios.count()}")

In [0]:
colunas_tratamento = ["codigo", "descricao"]
df_municipios = strip_df(df_municipios, colunas_tratamento)
df_municipios = remove_accents(df_municipios, colunas_tratamento)
df_municipios = normalize_blanks_to_null(df_municipios, colunas_tratamento)
df_municipios = df_municipios.withColumn("descricao", upper(col("descricao")))
df_municipios = df_municipios.dropDuplicates()
df_municipios = df_municipios.drop("origin_file_name")
df_municipios = df_municipios.drop("file_name_only")

print(f"Municípios transformados: {df_municipios.count()} registros")

In [0]:
(
    df_municipios.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/municipios_Tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.municipios_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/municipios_Tratado'
""")

registrar_transformacao_silver("municipios_tratado", df_municipios.count())

print("Municípios salvos em Silver")

## Tratamentos Natureza Jurídica

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES NATUREZA JURÍDICA")
print("=" * 60)

ultima_transformacao_natureza = obter_data_ultima_transformacao("Naturezas_Tratado")

df_natureza_juridica = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/naturezas")

if ultima_transformacao_natureza:
    df_natureza_juridica = df_natureza_juridica.filter(col("ingestion_dt") > ultima_transformacao_natureza)
    print(f"Filtrando Natureza Jurídica desde: {ultima_transformacao_natureza}")
else:
    print("Primeira execução: processando todas as Naturezas Jurídicas")

print(f"Total de registros Natureza Jurídica para processar: {df_natureza_juridica.count()}")

In [0]:
colunas_tratamento = ["codigo", "descricao"]
df_natureza_juridica = strip_df(df_natureza_juridica, colunas_tratamento)
df_natureza_juridica = remove_accents(df_natureza_juridica, colunas_tratamento)
df_natureza_juridica = normalize_blanks_to_null(df_natureza_juridica, colunas_tratamento)
df_natureza_juridica = df_natureza_juridica.withColumn("descricao", upper(col("descricao")))
df_natureza_juridica = df_natureza_juridica.drop("origin_file_name")
df_natureza_juridica = df_natureza_juridica.drop("file_name_only")
df_natureza_juridica = df_natureza_juridica.dropDuplicates()

print(f"Natureza Jurídica transformada: {df_natureza_juridica.count()} registros")

In [0]:
(
    df_natureza_juridica.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/natureza_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.natureza_juridica_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/natureza_tratado'
""")

registrar_transformacao_silver("natureza_tratado", df_natureza_juridica.count())

print("Natureza Jurídica salva em Silver")

## Tratamentos Países

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES PAÍSES")
print("=" * 60)

ultima_transformacao_paises = obter_data_ultima_transformacao("Paises_Tratado")

df_paises = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/paises")

if ultima_transformacao_paises:
    df_paises = df_paises.filter(col("ingestion_dt") > ultima_transformacao_paises)
    print(f"Filtrando Países desde: {ultima_transformacao_paises}")
else:
    print("Primeira execução: processando todos os Países")

print(f"Total de registros Países para processar: {df_paises.count()}")

In [0]:
colunas_tratamento = ["codigo", "descricao"]
df_paises = strip_df(df_paises, colunas_tratamento)
df_paises = remove_accents(df_paises, colunas_tratamento)
df_paises = normalize_blanks_to_null(df_paises, colunas_tratamento)
df_paises = df_paises.withColumn("descricao", upper(col("descricao")))
df_paises = df_paises.drop("origin_file_name")
df_paises = df_paises.drop("file_name_only")
df_paises = df_paises.dropDuplicates()

print(f"Países transformados: {df_paises.count()} registros")

In [0]:
(
    df_paises.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/paises_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.paises_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/paises_tratado'
""")

registrar_transformacao_silver("paises_tratado", df_paises.count())

print("Países salvos em Silver")

## Tratamentos Qualificações

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES QUALIFICAÇÕES")
print("=" * 60)

ultima_transformacao_qualificacoes = obter_data_ultima_transformacao("Qualificacoes_Tratado")

df_qualificacoes = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/qualificacoes")

if ultima_transformacao_qualificacoes:
    df_qualificacoes = df_qualificacoes.filter(col("ingestion_dt") > ultima_transformacao_qualificacoes)
    print(f"Filtrando Qualificações desde: {ultima_transformacao_qualificacoes}")
else:
    print("Primeira execução: processando todas as Qualificações")

print(f"Total de registros Qualificações para processar: {df_qualificacoes.count()}")

In [0]:
colunas_tratamento = ["codigo", "descricao"]
df_qualificacoes = strip_df(df_qualificacoes, colunas_tratamento)
df_qualificacoes = remove_accents(df_qualificacoes, colunas_tratamento)
df_qualificacoes = normalize_blanks_to_null(df_qualificacoes, colunas_tratamento)
df_qualificacoes = df_qualificacoes.withColumn("descricao", upper(col("descricao")))
df_qualificacoes = df_qualificacoes.drop("origin_file_name")
df_qualificacoes = df_qualificacoes.drop("file_name_only")
df_qualificacoes = df_qualificacoes.dropDuplicates()

print(f"Qualificações transformadas: {df_qualificacoes.count()} registros")

In [0]:
(
    df_qualificacoes.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/qualificacoes_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.qualificacoes_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/qualificacoes_tratado'
""")

registrar_transformacao_silver("qualificacoes_tratado", df_qualificacoes.count())

print("Qualificações salvas em Silver")

## Tratamentos Simples

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES SIMPLES")
print("=" * 60)

ultima_transformacao_simples = obter_data_ultima_transformacao("simples_tratado")

df_simples = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/simples")

if ultima_transformacao_simples:
    df_simples = df_simples.filter(col("ingestion_dt") > ultima_transformacao_simples)
    print(f"Filtrando Simples desde: {ultima_transformacao_simples}")
else:
    print("Primeira execução: processando todos os Simples")

print(f"Total de registros Simples para processar: {df_simples.count()}")

In [0]:
# 0. Reparticionamento estratégico (logo após leitura da Bronze)
df_simples = df_simples.repartition(200, "CNPJ_BASICO")

# 1. Tratamentos de string (só colunas necessárias)
colunas_string = ["CNPJ_BASICO", "OPCAO_PELO_SIMPLES", "OPCAO_PELO_MEI"]
df_simples = strip_df(df_simples, colunas_string)
df_simples = remove_accents(df_simples, colunas_string)
df_simples = normalize_blanks_to_null(df_simples, colunas_string)

# 2. Conversão de datas
colunas_datas = [
    "DATA_DE_OPCAO_PELO_SIMPLES",
    "DATA_DE_EXCLUSAO_DO_SIMPLES",
    "DATA_DE_OPCAO_PELO_MEI",
    "DATA_DE_EXCLUSAO_DO_MEI"
]
df_simples = cast_dates(df_simples, colunas_datas)

# 3. Transformações encadeadas + filtro antecipado (reduz volume antes do shuffle)
df_simples = (
    df_simples
    .withColumn("CNPJ_BASICO", lpad(col("CNPJ_BASICO"), 8, "0"))
    .filter(
        col("CNPJ_BASICO").isNotNull() & 
        (length(col("CNPJ_BASICO")) == 8)
    )
    .withColumn("OPCAO_PELO_SIMPLES", upper(col("OPCAO_PELO_SIMPLES")))
    .withColumn("OPCAO_PELO_MEI", upper(col("OPCAO_PELO_MEI")))
    .drop("origin_file_name", "file_name_only")
)

# 4. Deduplicação eficiente com window function 
w = Window.partitionBy("CNPJ_BASICO").orderBy(
    desc(coalesce("ingestion_dt", "DATA_DE_OPCAO_PELO_SIMPLES"))
)

# 4. Count uma vez só
total = df_simples.count()
print(f"Simples transformados: {total} registros")

In [0]:
(
    df_simples.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/simples_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.simples_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/simples_tratado'
""")

registrar_transformacao_silver("simples_tratado", df_simples.count())

print("Simples salvos em Silver")

## Tratamentos Sócios

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES SÓCIOS")
print("=" * 60)

ultima_transformacao_socios = obter_data_ultima_transformacao("socios_tratado")

df_socios = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/socios")

if ultima_transformacao_socios:
    df_socios = df_socios.filter(col("ingestion_dt") > ultima_transformacao_socios)
    print(f"Filtrando Sócios desde: {ultima_transformacao_socios}")
else:
    print("Primeira execução: processando todos os Sócios")

print(f"Total de registros Sócios para processar: {df_socios.count()}")

In [0]:
colunas_tratamento = ["CNPJ_BASICO","NOME_DO_SOCIO","PAIS","QUALIFICACAO_DO_SOCIO","IDENTIFICADOR_DO_SOCIO","CPF_CNPJ_DO_SOCIO","FAIXA_ETARIA","DATA_DE_ENTRADA_NA_SOCIEDADE"]
df_socios = strip_df(df_socios, colunas_tratamento)
df_socios = remove_accents(df_socios, colunas_tratamento)
df_socios = normalize_blanks_to_null(df_socios, colunas_tratamento)

df_socios = df_socios.withColumn("CNPJ_BASICO", lpad(col('CNPJ_BASICO'), 8, "0"))

colunas_tratamento_data = ["DATA_DE_ENTRADA_NA_SOCIEDADE"]
df_socios = cast_dates(df_socios, colunas_tratamento_data)

df_socios = df_socios.drop("nome_do_representante_legal","qualificacao_do_representante_legal","representante_legal","anomes","ingestion_dt")

df_socios = df_socios.withColumn(
    "PAIS",
    when(
        (col("PAIS").isNull()) |
        (trim(col("PAIS")) == "") |
        (col("PAIS") == "null"),
        "105"
    ).otherwise(col("PAIS"))
)

df_socios = df_socios.withColumn("NOME_DO_SOCIO", upper(col("NOME_DO_SOCIO"))) \
                     .withColumn("QUALIFICACAO_DO_SOCIO", upper(col("QUALIFICACAO_DO_SOCIO"))) \
                     .withColumn("PAIS", upper(col("PAIS")))

df_socios = df_socios.filter(
    col("CNPJ_BASICO").isNotNull() &
    (length(col("CNPJ_BASICO")) == 8)
)

df_socios = df_socios.dropDuplicates(["CNPJ_BASICO","IDENTIFICADOR_DO_SOCIO"])
df_socios = df_socios.drop('anomes','origin_path_name')

print(f"Sócios transformados: {df_socios.count()} registros")

In [0]:
(
    df_socios.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/socios_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.socios_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/socios_tratado'
""")

registrar_transformacao_silver("socios_tratado", df_socios.count())

print("Sócios salvos em Silver")

# Tratamentos Motivos

In [0]:
print("=" * 60)
print("INICIANDO TRANSFORMAÇÕES MOTIVOS")
print("=" * 60)

ultima_transformacao_motivos = obter_data_ultima_transformacao("motivos_tratado")

df_motivos = spark.read.format("delta").load(f"{path_storage_bronze}/cnpj/motivos")

if ultima_transformacao_motivos:
    df_motivos = df_motivos.filter(col("ingestion_dt") > ultima_transformacao_motivos)
    print(f"Filtrando motivos desde: {ultima_transformacao_motivos}")
else:
    print("Primeira execução: processando todos os motivos")

print(f"Total de registros motivos para processar: {df_motivos.count()}")

In [0]:
colunas_tratamento = ["codigo", "descricao"]
df_motivos = strip_df(df_motivos, colunas_tratamento)
df_motivos = remove_accents(df_motivos, colunas_tratamento)
df_motivos = normalize_blanks_to_null(df_motivos, colunas_tratamento)
df_motivos = df_motivos.withColumn("descricao", upper(col("descricao")))
df_motivos = df_motivos.drop("origin_file_name")
df_motivos = df_motivos.drop("file_name_only")
df_motivos = df_motivos.dropDuplicates()

print(f"Motivos transformados: {df_motivos.count()} registros")

In [0]:
(
    df_motivos.write
    .format("delta")
    .mode("append")
    .save(f"{path_storage_silver}/cnpj/motivos_tratado")
)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.motivos_tratado
    USING DELTA
    LOCATION '{path_storage_silver}/cnpj/motivos_tratado'
""")

registrar_transformacao_silver("qualificacoes_tratado", df_motivos.count())

print("Motivos salvos em Silver")