In [0]:
# ================================================================
#  Databricks notebook: Pipeline Runner
#  ---------------------------------------------------------------
#  Purpose: Execute the full ETL pipeline (Steps 01–05)
#  Environment: Databricks Runtime 16.4 LTS / markscope secrets
#  Author: M. Holahan
# ================================================================

# COMMAND ----------
# ✅ Environment bootstrap
!pip install -q adlfs fsspec

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
import inspect
import time
import json
import time
import pandas as pd

# Obtain the Spark session (creating one if none exists) and wrap it so that
# dbutils utilities such as the secrets API can be used below.
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)


def _read_secret(key):
    """Return the secret stored under *key* in the "markscope" scope, stripped of surrounding whitespace."""
    return dbutils.secrets.get("markscope", key).strip()


# Storage credentials come from the secret scope. Note that nothing here checks
# the values beyond the lookup succeeding, and `storage_key` is not referenced
# again in the visible part of this notebook.
storage_account = _read_secret("azure-storage-account-name")
storage_key = _read_secret("azure-storage-account-key")

# Root URI of the "raw" container; the message below only echoes this URI,
# no connection attempt is made in this cell.
base_uri = f"abfss://raw@{storage_account}.dfs.core.windows.net"
print(f"Connected to ADLS container: {base_uri}")

# COMMAND ----------
# 🧩 Parameter block
# Optional row cap for test runs; it is forwarded to every step whose run()
# accepts a `limit` argument (None presumably means "no cap" - confirm per step).
limit = None
# Step numbers to execute, in this order; each must be a key of `pipeline_steps`.
active_steps = [1, 2, 3, 4]

# COMMAND ----------
# 📦 Import pipeline steps
from scripts_pandas.prepare_tmdb_discogs_candidates import Step03PrepareTMDBDiscogsCandidates
from scripts_pandas.validate_schema_alignment import Step04ValidateSchemaAlignment
from scripts_spark.extract_spark_tmdb import Step01ExtractSparkTMDB
from scripts_spark.extract_spark_discogs import Step02ExtractSparkDiscogs

# Step registry
# Step registry: maps the step number used in `active_steps` to the class that
# implements it. Steps are numbered consecutively from 1 in the order listed.
pipeline_steps = dict(
    enumerate(
        (
            Step01ExtractSparkTMDB,
            Step02ExtractSparkDiscogs,
            Step03PrepareTMDBDiscogsCandidates,
            Step04ValidateSchemaAlignment,
        ),
        start=1,
    )
)

# COMMAND ----------
# 🚀 Run pipeline (robust & always logs metrics)

# One metrics dict per executed step (step number, class name, duration and
# status); the summary cell turns this list into a DataFrame.
results = []

for step_no in active_steps:
    # An unknown step number raises KeyError here on purpose: it is a
    # configuration mistake, not a step failure.
    StepClass = pipeline_steps[step_no]

    print(f"üöÄ Running Step {step_no}: {StepClass.__name__}")
    status = "success"
    t0 = time.time()
    try:
        # Build the step inside the try block so that a failing constructor is
        # recorded as a failed step instead of aborting the whole pipeline.
        init_params = inspect.signature(StepClass.__init__).parameters
        # Only pass Spark if the class supports it.
        step = StepClass(spark=spark) if "spark" in init_params else StepClass()

        # Restart the clock so that duration_sec keeps measuring run() only.
        t0 = time.time()

        # Inspect the real call signature rather than __code__.co_varnames:
        # co_varnames also lists local variables and does not see through
        # functools.wraps-style decorators, so it can both over- and
        # under-report whether `limit` is an accepted parameter.
        run_params = inspect.signature(step.run).parameters
        if "limit" in run_params:
            df_out = step.run(limit=limit)
        else:
            df_out = step.run()

    except Exception as e:
        # Deliberate best-effort boundary: log the failure, record it in the
        # metrics below and carry on with the next step.
        status = f"failed: {type(e).__name__}"
        print(f"‚ö†Ô∏è Step {step_no} ({StepClass.__name__}) failed: {e}")

    duration = round(time.time() - t0, 2)

    # Always record a result, even on failure or when run() returned None.
    results.append({
        "step": step_no,
        "name": StepClass.__name__,
        "duration_sec": duration,
        "status": status
    })

    print(f"‚úÖ Step {step_no} ({StepClass.__name__}) finished with status '{status}' in {duration}s")

# COMMAND ----------
# 🧾 Summary & export
# Destination of the JSON export (a path under the /dbfs mount).
summary_path = "/dbfs/tmp/pipeline_summary.json"

# One row per executed step, built from the metric dicts collected in `results`.
summary_df = pd.DataFrame(data=results)
display(summary_df)

# Write the table as an indented JSON array with one object per row.
summary_df.to_json(path_or_buf=summary_path, orient="records", indent=2)
print(f"üìä Summary written to: {summary_path}")
