In [None]:
# Databricks notebook source
# MAGIC %md 
# # üöÄ Ingest√£o de M√∫ltiplos Arquivos com Valida√ß√£o de Vazios
# - Autor: Luciana Sampaio
# - Data: 2025-07-02
# - Finalidade: Validar m√∫ltiplos arquivos e registrar ingest√µes em log Delta

# COMMAND ----------

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, current_timestamp, lit

# 1Ô∏è‚É£ Cria sess√£o Spark
spark = SparkSession.builder.appName("Ingestao_Arquivos_Multiplos").getOrCreate()

# 2Ô∏è‚É£ Configura par√¢metros principais
caminhos_arquivos = [
    "dbfs:/mnt/meulake/bronze/dados/arq1.csv",
    "dbfs:/mnt/meulake/bronze/dados/arq2.csv",
    "dbfs:/mnt/meulake/bronze/dados/arq3.csv"
]

tabela_destino = "meu_catalogo.bronze.minha_tabela"
tabela_log = "meu_catalogo.audit.log_ingestao"

arquivos_processados = []
arquivos_vazios = []

# 3Ô∏è‚É£ Loop de ingest√£o com valida√ß√£o
for caminho in caminhos_arquivos:
    print(f"üì• Verificando arquivo: {caminho}")
    try:
        df = spark.read.format("csv").option("header", True).load(caminho)

        if df.rdd.isEmpty():
            print(f"‚ö†Ô∏è Arquivo vazio! Ignorado: {caminho}")
            arquivos_vazios.append(caminho)

            # Registra no log Delta
            log_df = spark.createDataFrame(
                [(caminho, "VAZIO", current_timestamp())],
                ["arquivo", "status", "data_registro"]
            )
            log_df.write.mode("append").format("delta").saveAsTable(tabela_log)

            continue

        print(f"‚úÖ Arquivo OK: {caminho} | Linhas: {df.count()}")

        # Adiciona colunas de auditoria
        df_final = (
            df.withColumn("ddata_movto", current_date())
            .withColumn("adata_movto_ctrl", current_timestamp())
            .withColumn("rnome_arq", lit(caminho.split("/")[-1]))
        )

        df_final.write.mode("append").format("delta").saveAsTable(tabela_destino)

        arquivos_processados.append(caminho)

        # Registra no log Delta
        log_df = spark.createDataFrame(
            [(caminho, "PROCESSADO", current_timestamp())],
            ["arquivo", "status", "data_registro"]
        )
        log_df.write.mode("append").format("delta").saveAsTable(tabela_log)

    except Exception as e:
        print(f"‚ùå Erro ao processar {caminho}: {e}")
        # Tamb√©m pode logar falhas se quiser!

# 4Ô∏è‚É£ Resumo final
print(f"‚úÖ Processados: {arquivos_processados}")
print(f"‚ö†Ô∏è Vazios: {arquivos_vazios}")

# COMMAND ----------

# MAGIC %md
# ## ‚úÖ Checkpoint Final
# - Total de arquivos processados: `{len(arquivos_processados)}`
# - Total ignorados (vazios): `{len(arquivos_vazios)}`
# - Log de auditoria salvo em: `{tabela_log}`
