### Baixando dependencias

In [0]:
%pip install py7zr

Collecting py7zr
  Downloading py7zr-1.1.0-py3-none-any.whl.metadata (17 kB)
Collecting texttable (from py7zr)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting pycryptodomex>=3.20.0 (from py7zr)
  Downloading pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting brotli>=1.2.0 (from py7zr)
  Downloading brotli-1.2.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (6.1 kB)
Collecting backports.zstd>=1.0.0 (from py7zr)
  Downloading backports_zstd-1.3.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (6.9 kB)
Collecting pyppmd>=1.3.1 (from py7zr)
  Downloading pyppmd-1.3.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (5.4 kB)
Collecting pybcj>=1.0.6 (from py7zr)
  Downloading pybcj-1.0.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (3.9 kB)
Collecting multi

### Conectando com o Blob Storage

In [0]:
STORAGE_ACCOUNT = ""

ACCESS_KEY = ""

spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT}.blob.core.windows.net",
    ACCESS_KEY
)

BRONZE_CONTAINER = "bronze"
SILVER_CONTAINER = "silver"

BASE_PATH_BRONZE = f"wasbs://{BRONZE_CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/"
BASE_PATH_SILVER = f"wasbs://{SILVER_CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/movimentacoes_detalhadas"

print(f"Conectado ao Storage: {STORAGE_ACCOUNT}")

Conectado ao Storage: cagedstorage


In [0]:
import os
import shutil
import py7zr
from pyspark.sql.functions import col, when, regexp_replace
from pyspark.sql.types import DoubleType, IntegerType

# --- CONFIGURA√á√ïES ---
STORAGE_ACCOUNT = ""
CONTAINER_BRONZE = "bronze"
CONTAINER_SILVER = "silver"

# Insira sua chave de acesso (KEY) abaixo
ACCESS_KEY = ""

# Configura√ß√£o de credenciais Spark
spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT}.blob.core.windows.net",
    ACCESS_KEY
)

# Caminhos (Protocolo WASBS)
PATH_BRONZE = f"wasbs://{CONTAINER_BRONZE}@{STORAGE_ACCOUNT}.blob.core.windows.net/"
PATH_SILVER = f"wasbs://{CONTAINER_SILVER}@{STORAGE_ACCOUNT}.blob.core.windows.net/movimentacoes_detalhadas"
DIR_TEMP_LOCAL = "/tmp/tmp_caged_processamento/"

def registrar_log(msg):
    print(f"[INFO] {msg}")

def listar_arquivos_recursivo(caminho_base):
    lista_arquivos = []
    try:
        items = dbutils.fs.ls(caminho_base)
        for item in items:
            if item.isDir():
                lista_arquivos.extend(listar_arquivos_recursivo(item.path))
            elif item.path.endswith(".7z"):
                lista_arquivos.append(item.path)
    except Exception as e:
        print(f"[AVISO] Erro ao listar {caminho_base}: {e}")
    return lista_arquivos

# --- ETAPA 1: C√ìPIA E EXTRA√á√ÉO ---
registrar_log("Iniciando varredura no Azure...")

if os.path.exists(DIR_TEMP_LOCAL):
    shutil.rmtree(DIR_TEMP_LOCAL)
os.makedirs(DIR_TEMP_LOCAL, exist_ok=True)

arquivos_azure = listar_arquivos_recursivo(PATH_BRONZE)
registrar_log(f"Arquivos .7z encontrados: {len(arquivos_azure)}")

arquivos_processar = []

for arquivo_remoto in arquivos_azure:
    nome_arquivo = arquivo_remoto.split("/")[-1]
    caminho_local_7z = os.path.join(DIR_TEMP_LOCAL, nome_arquivo)
    
    # Otimiza√ß√£o: S√≥ copia se n√£o existir localmente (caso o cluster n√£o tenha reiniciado)
    if not os.path.exists(caminho_local_7z):
        try:
            dbutils.fs.cp(arquivo_remoto, f"file:{caminho_local_7z}")
        except Exception as e:
            print(f"[ERRO] Falha ao copiar {nome_arquivo}: {e}")
            continue

    try:
        with py7zr.SevenZipFile(caminho_local_7z, mode='r') as z:
            z.extractall(path=DIR_TEMP_LOCAL)
            for nome in z.getnames():
                # Filtra apenas os arquivos de dados (txt ou csv)
                if nome.lower().endswith(".txt") or nome.lower().endswith(".csv"):
                    arquivos_processar.append(os.path.join(DIR_TEMP_LOCAL, nome))
        
        # Remove o compactado para liberar espa√ßo
        os.remove(caminho_local_7z)
    except Exception as e:
        print(f"[ERRO] Falha ao descompactar {nome_arquivo}: {e}")

# --- ETAPA 2: PROCESSAMENTO SPARK ---
if arquivos_processar:
    registrar_log(f"Processando {len(arquivos_processar)} arquivos extra√≠dos...")

    try:
        # CORRE√á√ÉO: Encoding UTF-8 para suportar acentos corretamente
        df_bruto = spark.read \
            .option("header", "true") \
            .option("delimiter", ";") \
            .option("encoding", "UTF-8") \
            .option("inferSchema", "false") \
            .csv(f"file:{DIR_TEMP_LOCAL}")

        # Mapeamento exato conforme colunas informadas
        df_final = df_bruto.select(
            col("compet√™nciamov").cast(IntegerType()).alias("competencia"),
            col("munic√≠pio").cast(IntegerType()).alias("id_municipio"),
            col("subclasse").alias("id_cnae_subclasse"),
            col("cbo2002ocupa√ß√£o").alias("id_cbo"),
            col("saldomovimenta√ß√£o").cast(IntegerType()).alias("saldo_movimentacao"),
            col("tipomovimenta√ß√£o").cast(IntegerType()).alias("tipo_movimentacao"),
            col("ra√ßacor").cast(IntegerType()).alias("id_raca"),
            col("sexo").cast(IntegerType()).alias("id_sexo"),
            col("graudeinstru√ß√£o").cast(IntegerType()).alias("id_instrucao"),
            col("idade").cast(IntegerType()).alias("idade"),
            
            # Tratamento num√©rico (V√≠rgula para Ponto)
            regexp_replace(col("sal√°rio"), ",", ".").cast(DoubleType()).alias("salario_bruto"),
            col("unidadesal√°rioc√≥digo").cast(IntegerType()).alias("tipo_unidade_salarial"),
            regexp_replace(col("horascontratuais"), ",", ".").cast(DoubleType()).alias("horas_contratuais")
        ).withColumn(
            # Regra de c√°lculo salarial mensalizado
            "salario_mensal_final",
            when(col("tipo_unidade_salarial") == 5, col("salario_bruto")) # Mensal
            .when(col("tipo_unidade_salarial") == 1, col("salario_bruto") * col("horas_contratuais") * 4.33) # Horista
            .when(col("tipo_unidade_salarial") == 3, col("salario_bruto") * 4.33) # Semanal
            .when(col("tipo_unidade_salarial") == 4, col("salario_bruto") * 2) # Quinzenal
            .otherwise(None)
        ).filter(
            # Filtros de consist√™ncia de dados
            (col("salario_mensal_final") >= 1000) & 
            (col("salario_mensal_final") <= 50000) &
            (col("idade") >= 14) & 
            (col("idade") <= 80)
        ).withColumn("ano", (col("competencia") / 100).cast(IntegerType())) \
         .withColumn("mes", (col("competencia") % 100).cast(IntegerType()))

        # --- ETAPA 3: GRAVA√á√ÉO ---
        registrar_log("Gravando tabela Delta na camada Silver...")
        
        df_final.write \
            .format("delta") \
            .mode("overwrite") \
            .partitionBy("ano", "mes") \
            .save(PATH_SILVER)

        registrar_log(f"Processamento conclu√≠do com sucesso em: {PATH_SILVER}")

    except Exception as e:
        print(f"[ERRO CR√çTICO NO SPARK] {e}")
        # Importante: N√£o relan√ßa o erro imediatamente para permitir a limpeza do finally
    
    finally:
        # Limpeza obrigat√≥ria do disco tempor√°rio
        if os.path.exists(DIR_TEMP_LOCAL):
            shutil.rmtree(DIR_TEMP_LOCAL)
            registrar_log("Arquivos tempor√°rios removidos.")
else:
    registrar_log("Nenhum arquivo encontrado para processar.")

[INFO] Iniciando varredura no Azure...
[INFO] Arquivos .7z encontrados: 71
[INFO] Processando 71 arquivos extra√≠dos...
[INFO] Gravando tabela Delta na camada Silver...
[INFO] Processamento conclu√≠do com sucesso em: wasbs://silver@cagedstorage.blob.core.windows.net/movimentacoes_detalhadas
[INFO] Arquivos tempor√°rios removidos.


#### criando catalogo

In [0]:
# Garante que as chaves est√£o na sess√£o
spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT}.blob.core.windows.net",
    ACCESS_KEY
)

CAMINHO_SILVER = f"wasbs://{CONTAINER_SILVER}@{STORAGE_ACCOUNT}.blob.core.windows.net/movimentacoes_detalhadas"

print("1. Criando Banco de Dados no Cat√°logo Legado...")
# Usamos 'hive_metastore' para fugir da trava do Unity Catalog
spark.sql("CREATE DATABASE IF NOT EXISTS hive_metastore.banco_caged")

print(f"2. Registrando tabela apontando para: {CAMINHO_SILVER}")
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS hive_metastore.banco_caged.tabela_silver
    USING DELTA
    LOCATION '{CAMINHO_SILVER}'
""")

print("Tabela 'hive_metastore.banco_caged.tabela_silver' registrada com sucesso!")

1. Criando Banco de Dados no Cat√°logo Legado...
2. Registrando tabela apontando para: wasbs://silver@cagedstorage.blob.core.windows.net/movimentacoes_detalhadas
Tabela 'hive_metastore.banco_caged.tabela_silver' registrada com sucesso!


In [0]:
# --- DIAGN√ìSTICO DA TABELA ---
spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT}.blob.core.windows.net", ACCESS_KEY)

print("Verificando sa√∫de da tabela...")

# 1. Conta quantas linhas existem no total
total = spark.sql("SELECT count(*) FROM hive_metastore.banco_caged.tabela_silver").collect()[0][0]
print(f"Total de linhas na tabela: {total}")

if total > 0:
    # 2. Se tiver dados, mostra quais ANOS est√£o dispon√≠veis
    print("\nAnos dispon√≠veis:")
    display(spark.sql("SELECT ano, count(*) as qtd FROM hive_metastore.banco_caged.tabela_silver GROUP BY ano ORDER BY ano"))
    
    # 3. Mostra uma amostra de 5 linhas para checar se os c√≥digos est√£o certos
    print("\n Amostra dos dados :")
    display(spark.sql("SELECT * FROM hive_metastore.banco_caged.tabela_silver LIMIT 5"))
else:
    print("\nA TABELA EST√Å VAZIA!")
    print("Isso significa que o passo de 'Processamento' (Etapa 2) n√£o salvou nada.")
    print("SOLU√á√ÉO: Volte na c√©lula de processamento (aquela grande do Python) e rode ela novamente agora que o Cluster est√° corrigido.")

Verificando sa√∫de da tabela...
Total de linhas na tabela: 215808549

Anos dispon√≠veis:


ano,qtd
2020,22153258
2021,30652934
2022,37294617
2023,39359513
2024,43610336
2025,42737891



üëÄ Amostra dos dadosüëÄüëÄüëÄüëÄ:


competencia,id_municipio,id_cnae_subclasse,id_cbo,saldo_movimentacao,tipo_movimentacao,id_raca,id_sexo,id_instrucao,idade,salario_bruto,tipo_unidade_salarial,horas_contratuais,salario_mensal_final,ano,mes
202207,354100,8299799,517410,-1,31,6,1,7,37,1607.97,5,44.0,1607.97,2022,7
202207,354980,9329899,521140,-1,43,3,3,7,34,1617.32,5,44.0,1617.32,2022,7
202207,355030,8130300,783225,-1,31,1,1,7,34,1239.8,5,44.0,1239.8,2022,7
202207,110020,6021700,376320,-1,31,6,3,9,38,2348.66,5,30.0,2348.66,2022,7
202207,430930,2822401,731155,-1,31,2,1,7,48,2889.02,5,44.0,2889.02,2022,7


In [0]:
# --- DIAGN√ìSTICO DA TABELA ---
spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT}.blob.core.windows.net", ACCESS_KEY)

df = spark.sql("""
CREATE TABLE cageddatabricks.banco_caged.tabela_silver
USING DELTA
AS SELECT * FROM hive_metastore.banco_caged.tabela_silver;
                  """)

# display(df)
