In [0]:
# 1. Obter os parâmetros de entrada do job
catalog_name = dbutils.widgets.get("catalog")
schema_name = dbutils.widgets.get("schema")
folder_volume = dbutils.widgets.get("folder_volume") # Ex: 20250617_1930

# 2. Configurar os caminhos e o Spark
volume_name = "raw"
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {schema_name}")

# Caminho base onde todos os arquivos da execução foram salvos
source_path_base = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/{folder_volume}"
print(f"Iniciando processamento para a pasta: {source_path_base}")

# 3. Listar todos os diretórios de tabelas que foram enviados nesta execução
# O Meltano cria um subdiretório para cada tabela (ex: ./output/Production-Product/)
try:
    table_directories = dbutils.fs.ls(source_path_base)
except Exception as e:
    print(f"Erro ao listar diretórios em '{source_path_base}'. Verifique se a pasta e os arquivos foram enviados corretamente. Erro: {e}")
    dbutils.notebook.exit("Falha ao encontrar o diretório de origem.")

# 4. Loop para processar cada tabela
for table_dir_info in table_directories:
    # O nome do diretório é o nome da stream do Meltano (ex: 'Production-Product')
    table_stream_name = table_dir_info.name.strip('/')
    parquet_source_path = table_dir_info.path
    
    # 5. Construir o nome da tabela Delta final
    # Converte 'Production-Product' para 'raw_meltano_production_product'
    table_name_final = f"raw_meltano_{table_stream_name.lower().replace('-', '_')}"
    
    print(f"--- Processando Tabela ---")
    print(f"Origem Parquet: {parquet_source_path}")
    print(f"Destino Delta: {table_name_final}")

    # 6. Ingestão dos dados usando COPY INTO ou Read/Write
    try:
        # COPY INTO é mais eficiente e idempotente para ingestão incremental
        spark.sql(f"""
            COPY INTO {catalog_name}.{schema_name}.{table_name_final}
            FROM '{parquet_source_path}'
            FILEFORMAT = PARQUET
            COPY_OPTIONS ('mergeSchema' = 'true')
        """)
        print(f"Sucesso ao usar COPY INTO para a tabela {table_name_final}.")

    except Exception as e:
        # Fallback para criar a tabela se ela não existir
        print(f"COPY INTO falhou (talvez a tabela não exista ainda), tentando criar com Read/Write. Erro: {e}")
        try:
            df = spark.read.format("parquet").load(parquet_source_path)
            df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{catalog_name}.{schema_name}.{table_name_final}")
            print(f"Sucesso ao criar a tabela {table_name_final} com Read/Write.")
        except Exception as e_write:
            print(f"ERRO: Falha ao processar os dados para a tabela {table_name_final}. Erro: {e_write}")
            # Continue para a próxima tabela em caso de falha em uma específica
            continue

print("--- Processamento de todas as tabelas concluído. ---")

bronze_productionproduct
