# Orquestrador de Cargas - Camada Silver - Versão Paralela


In [0]:

# Databricks notebook source
import json
import uuid
import time
import traceback
from datetime import datetime, date
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
from pyspark.dbutils import DBUtils
from pyspark.sql import Row


# Initialise the Spark session and the DBUtils handle used by the cells below.
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

In [0]:
# Path of the JSON configuration file that is loaded to obtain the pipeline name and job list.
json_path = "/Volumes/vitivinicultura/default/landing_zone/orquestrador_camada_silver_paralel.json"

# Fully qualified name of the Unity Catalog table that receives the execution logs.
tabela_logs = "vitivinicultura.logs.pipeline_logs"

In [0]:
# Fixed schema for the log table (avoids schema-inference errors).
# Every column is nullable; the column order must match the tuple built in salvar_log.
schema_log = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ("log_id", StringType),
        ("execution_id", StringType),
        ("pipeline_name", StringType),
        ("job_name", StringType),
        ("status", StringType),
        ("start_time", TimestampType),
        ("end_time", TimestampType),
        ("duration_sec", LongType),
        ("data_execucao", StringType),
        ("user", StringType),
        ("environment", StringType),
        ("error_message", StringType),
        ("tipo_erro", StringType),
        ("initial_row_count", LongType),
        ("final_row_count", LongType),
        ("rows_loaded", LongType),
        ("target_table", StringType),
    )
])

In [0]:

def salvar_log(execution_id, pipeline_name, job_name, status, start_time, end_time, data_execucao, user, environment,
               error_message=None, tipo_erro=None, rows_before=None, rows_after=None, rows_inserted=None, target_table=None):
    """Append one log row describing a job execution to the log table.

    The row gets a freshly generated ``log_id`` and a ``duration_sec`` equal to
    the whole number of seconds between ``start_time`` and ``end_time``. The
    remaining arguments are stored as given, in the column order of
    ``schema_log`` (``rows_before``/``rows_after``/``rows_inserted`` fill the
    ``initial_row_count``/``final_row_count``/``rows_loaded`` columns).

    Returns:
        None. The row is written to the Delta table named by ``tabela_logs``.
    """
    elapsed_seconds = int((end_time - start_time).total_seconds())
    row_id = str(uuid.uuid4())

    record = (
        row_id,
        execution_id,
        pipeline_name,
        job_name,
        status,
        start_time,
        end_time,
        elapsed_seconds,
        data_execucao,
        user,
        environment,
        error_message,
        tipo_erro,
        rows_before,
        rows_after,
        rows_inserted,
        target_table,
    )

    # Single-row DataFrame with the explicit schema, appended to the log table.
    log_frame = spark.createDataFrame([record], schema=schema_log)
    log_frame.write.format("delta").mode("append").saveAsTable(tabela_logs)


def get_user_safe():
    """Return the user name from the notebook context, or ``"unknown_user"``.

    The lookup goes through the notebook context reachable from ``dbutils``.
    It is best-effort: any ordinary failure yields the placeholder instead of
    an error.

    Returns:
        str: the user name reported by the notebook context, or
        ``"unknown_user"`` when the lookup fails.
    """
    try:
        return dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
    except Exception:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt and SystemExit still propagate.
        return "unknown_user"

# Run-level context shared by every job of this execution.
execution_id = str(uuid.uuid4())        # unique identifier of this pipeline run
environment = "dev"
user = get_user_safe()
data_hoje = date.today().isoformat()    # today's date as YYYY-MM-DD


In [0]:

# Load the pipeline definition. JSON text is UTF-8, so the encoding is given
# explicitly instead of depending on the platform's default locale.
with open(json_path, "r", encoding="utf-8") as f:
    config = json.load(f)

pipeline_name = config.get("pipeline_name", "pipeline_desconhecido")
fail_fast = config.get("fail_fast", True)
jobs = config["jobs"]

# Replace the "{data_atual}" placeholder with today's date in every string
# parameter. A missing or null "params" entry is normalised to an empty dict,
# so each job ends up with a "params" dict.
for job in jobs:
    job["params"] = {
        key: value.replace("{data_atual}", data_hoje) if isinstance(value, str) else value
        for key, value in (job.get("params") or {}).items()
    }

print(f"üöÄ Pipeline iniciado: {pipeline_name} - {len(jobs)} jobs")

In [0]:
# Lookup tables used by the scheduler:
#   jobs_by_name  job name -> job definition
#   dependencies  job name -> list of job names it depends on (empty when absent)
jobs_by_name = {}
dependencies = {}
for job in jobs:
    jobs_by_name[job["name"]] = job
    dependencies[job["name"]] = job.get("depends_on", [])

# Names of the jobs that finished with success / with error so far.
completed = set()
failed = set()

In [0]:

def _classify_error(full_trace):
    """Return the pipeline error category for a formatted traceback.

    The markers are checked in order and the first one found in the text wins;
    a traceback matching none of them is reported as ``ERRO_DESCONHECIDO``.
    """
    markers = (
        ("AnalysisException", "ERRO_SQL"),
        ("Permission", "ERRO_PERMISSAO"),
        ("java.io", "ERRO_IO"),
        ("Delta", "ERRO_DELTA"),
    )
    for marker, category in markers:
        if marker in full_trace:
            return category
    return "ERRO_DESCONHECIDO"


def run_notebook(job):
    """Run one job's notebook, write its log row and return a summary.

    Args:
        job: Job definition dict. ``name`` and ``path`` are required;
            ``params`` (notebook arguments) and ``target_table`` (table whose
            row count is compared before and after the run) are optional.

    Returns:
        dict with the keys ``name``, ``status`` (``"OK"`` or ``"ERROR"``) and
        ``log`` (whatever ``salvar_log`` returned).

    Raises:
        Exception: the job's failure is re-raised when the module-level
            ``fail_fast`` flag is truthy, after the log row has been written.
    """
    job_name = job["name"]
    path = job["path"]
    params = job.get("params", {})
    target_table = job.get("target_table")
    start_time = datetime.now()
    status = "OK"
    rows_before = rows_after = rows_loaded = None

    print(f"\n‚ñ∂Ô∏è Iniciando job: {job_name}")
    print(f"üìÇ Notebook: {path}")
    print(f"‚öôÔ∏è Params: {params}")

    # Row count of the target table before the run. Best-effort: when the
    # table cannot be read, the baseline is taken as 0.
    if target_table:
        try:
            rows_before = spark.table(target_table).count()
        except Exception:
            rows_before = 0

    try:
        dbutils.notebook.run(path, timeout_seconds=3600, arguments=params)

        # Row count after the run; a shrinking table is reported as 0 loaded rows.
        if target_table:
            rows_after = spark.table(target_table).count()
            rows_loaded = max(rows_after - (rows_before or 0), 0)

        print(f"‚úÖ Job conclu√≠do: {job_name} | Linhas carregadas: {rows_loaded}")

    except Exception as e:
        end_time = datetime.now()
        status = "ERROR"
        error_message = str(e)[:1000]  # keep the stored message bounded
        tipo_erro = _classify_error(traceback.format_exc())

        # Persist the failure before (possibly) propagating it.
        log = salvar_log(
            execution_id, pipeline_name, job_name, status,
            start_time, end_time, data_hoje, user, environment,
            error_message=error_message, tipo_erro=tipo_erro,
            rows_before=rows_before, rows_after=rows_after,
            rows_inserted=rows_loaded, target_table=target_table
        )

        if fail_fast:
            # Bare raise keeps the original traceback intact.
            raise

    else:
        end_time = datetime.now()
        log = salvar_log(
            execution_id, pipeline_name, job_name, status,
            start_time, end_time, data_hoje, user, environment,
            error_message=None, tipo_erro=None,
            rows_before=rows_before, rows_after=rows_after,
            rows_inserted=rows_loaded, target_table=target_table
        )

    return {
        "name": job_name,
        "status": status,
        "log": log
    }

In [0]:
def execute_pipeline(max_parallel=5):
    """Run all configured jobs in dependency order, several at a time.

    Reads the module-level ``jobs_by_name``, ``dependencies`` and ``fail_fast``
    and updates the module-level ``completed`` and ``failed`` sets in place.
    A job is submitted once every job it depends on is in ``completed``.

    Jobs that can never become ready (a dependency failed, names a job that
    does not exist, or is part of a cycle) are added to ``failed`` and
    reported with status ``"SKIPPED"``, so the loop always terminates.

    Args:
        max_parallel: Maximum number of jobs running at the same time.

    Returns:
        list of result dicts, one per finished or skipped job, each with the
        keys ``name``, ``status`` and ``log``.

    Raises:
        Exception: whatever ``run_notebook`` raises for a job; the exception
            propagates through ``future.result()``.
    """
    # Local import: the file's import cell only brings in ThreadPoolExecutor
    # and as_completed.
    from concurrent.futures import FIRST_COMPLETED, wait

    all_jobs = set(jobs_by_name)
    in_flight = {}  # future -> job name, for jobs submitted but not yet collected
    results = []

    with ThreadPoolExecutor(max_workers=max_parallel) as executor:
        try:
            while (completed | failed) != all_jobs:
                # Submit every job whose dependencies are all completed and
                # that is neither finished nor already submitted.
                submitted = set(in_flight.values())
                for job_name, deps in dependencies.items():
                    if job_name in completed or job_name in failed or job_name in submitted:
                        continue
                    if all(dep in completed for dep in deps):
                        in_flight[executor.submit(run_notebook, jobs_by_name[job_name])] = job_name

                if not in_flight:
                    # Nothing is running and nothing could be submitted: the
                    # remaining jobs can never start. Record them and stop
                    # instead of looping forever.
                    blocked = sorted(all_jobs - completed - failed)
                    if blocked:
                        print(f"Jobs não executados por dependências não satisfeitas: {blocked}")
                    for job_name in blocked:
                        failed.add(job_name)
                        results.append({"name": job_name, "status": "SKIPPED", "log": None})
                    break

                # Wake up as soon as any job finishes so that its dependants
                # can be scheduled without waiting for the rest of the batch.
                done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
                for future in done:
                    job_name = in_flight.pop(future)
                    result = future.result()
                    results.append(result)

                    if result["status"] == "OK":
                        completed.add(job_name)
                    else:
                        failed.add(job_name)
                        if fail_fast:
                            print(f"üõë Interrompendo pipeline por erro (fail_fast).")
                            return results
        finally:
            # On an early exit (fail-fast return or a raised job error), drop
            # work that is still queued; jobs already running are left to
            # finish. After a normal run in_flight is empty and this is a no-op.
            for pending in in_flight:
                pending.cancel()

    return results

In [0]:
# =========================================
# Main execution
# =========================================
start_time = time.time()
results = execute_pipeline(max_parallel=5)
total_time = round(time.time() - start_time, 2)


# Consolidate the log entries returned by the jobs.
# NOTE(review): run_notebook stores the return value of salvar_log under "log",
# and salvar_log as written in this file has no return statement, so these
# entries are None and the "no log returned" branch is the one that runs.
# Confirm whether this consolidation step is still wanted.
try:
    # Keep only the results that carry a dict under "log".
    all_logs = [
        r["log"]
        for r in results
        if r and "log" in r and isinstance(r["log"], dict)
    ]

    if not all_logs:
        print("‚ö†Ô∏è Nenhum log foi retornado para grava√ß√£o.")
    else:
        logs_df = spark.createDataFrame([Row(**log) for log in all_logs], schema=schema_log)

        # Append to the Delta log table; mergeSchema is enabled as a safeguard
        # for schema evolution.
        logs_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tabela_logs)

        print(f"üßæ {len(all_logs)} logs gravados com sucesso em {tabela_logs}")

except Exception as e:
    # A failure here is reported but does not stop the summary below.
    print(f"‚ùå Erro ao salvar logs: {str(e)}")


# =========================================
# Final summary
# =========================================
success_count = len([r for r in results if r["status"] == "OK"])
fail_count = len([r for r in results if r["status"] == "ERROR"])

print("\nüìã RESUMO FINAL")
print(f"Pipeline: {pipeline_name}")
print(f"Dura√ß√£o total: {total_time}s")
print(f"Jobs conclu√≠dos: {success_count}")
print(f"Jobs com erro: {fail_count}")
print(f"‚úÖ Conclu√≠dos: {completed}")
print(f"‚ùå Falharam: {failed}" if failed else "‚úÖ Todos os jobs executados com sucesso!")

In [0]:
%sql
-- Ad-hoc inspection of the pipeline log table.
SELECT * FROM vitivinicultura.logs.pipeline_logs

-- Commented-out statements below are not executed.
-- NOTE(review): they look like manual maintenance snippets for the log table; confirm before enabling any of them.
--DELETE FROM vitivinicultura.logs.pipeline_logs

--INSERT INTO vitivinicultura.logs.pipeline_logs FROM vitivinicultura.logs.pipeline_logs
