In [0]:
import sys

from databricks.sdk import WorkspaceClient
from loguru import logger
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType, TimestampType

# from databricks.connect import DatabricksSession
from utils import load_config, setup_logging

# Set up logging
setup_logging(log_file="")

# Pipeline overview:
#   1. Load the model-serving payload (inference) table.
#   2. Parse the JSON request/response columns and emit one row per scored record.
#   3. Attach the ground-truth label from the train set or the inference sets.
#   4. Join the feature table and append the result to the `model_monitoring` table.
#   5. Trigger a refresh of the quality monitor attached to that table.
# Any failure is logged (with traceback) and the process exits with status 1.
try:
    # Initialize Spark session
    spark = SparkSession.builder.getOrCreate()
    logger.info("Spark session initialized.")

    # Initialize Databricks workspace client
    workspace = WorkspaceClient()
    logger.info("Databricks workspace client initialized.")

    # Extract configuration details
    config = load_config("../project_config.yml")
    catalog_name = config.catalog_name
    schema_name = config.schema_name
    target = config.target[0].new_name
    logger.debug(f"Catalog: {catalog_name}, Schema: {schema_name}")

    # Load inference table
    logger.info("Loading inference table...")
    inf_table = spark.sql(f"SELECT * FROM {catalog_name}.{schema_name}.`credit-default-model-serving-feature_payload`")
    logger.success("Inference table loaded successfully.")

    ## Dataframe records on payload table under response column
    # {"dataframe_records": [{"Id": "43565", "Limit_bal": 198341.0, "Sex": 2.0,
    # "Education": 2.0, "Marriage": 2.0, "Age": 26.0, "Pay_0": 2.0, "Pay_2": 1.0,
    # "Pay_3": 6.0, "Pay_4": 4.0, "Pay_5": 8.0, "Pay_6": 6.0, "Bill_amt1": -44077.0,
    # "Bill_amt2": 15797.0, "Bill_amt3": 66567.0, "Bill_amt4": 54582.0, "Bill_amt5": 79211.0,
    # "Bill_amt6": 129060.0, "Pay_amt1": 13545.0, "Pay_amt2": 20476.0, "Pay_amt3": 8616.0,
    # "Pay_amt4": 3590.0, "Pay_amt5": 22999.0, "Pay_amt6": 3605.0}]}
    logger.info("Defining schemas...")
    # Numeric request fields, in payload order. There is no "Pay_1" field:
    # the sequence goes Pay_0, Pay_2, ..., Pay_6 (see the sample payload above).
    numeric_feature_names = [
        "Limit_bal",
        "Sex",
        "Education",
        "Marriage",
        "Age",
        "Pay_0",
        *(f"Pay_{i}" for i in range(2, 7)),
        *(f"Bill_amt{i}" for i in range(1, 7)),
        *(f"Pay_amt{i}" for i in range(1, 7)),
    ]
    record_schema = StructType(
        [StructField("Id", StringType(), True)]
        + [StructField(name, DoubleType(), True) for name in numeric_feature_names]
    )
    request_schema = StructType([StructField("dataframe_records", ArrayType(record_schema), True)])

    # Standard Databricks schema for the response
    response_schema = StructType(
        [
            StructField("predictions", ArrayType(DoubleType()), True),
            StructField(
                "databricks_output",
                StructType(
                    [StructField("trace", StringType(), True), StructField("databricks_request_id", StringType(), True)]
                ),
                True,
            ),
        ]
    )
    logger.success("Schemas defined successfully.")

    # Parse request and response
    logger.info("Parsing request and response columns...")
    inf_table_parsed = inf_table.withColumn(
        "parsed_request", F.from_json(F.col("request"), request_schema)
    ).withColumn("parsed_response", F.from_json(F.col("response"), response_schema))

    # One output row per request record. posexplode also yields the record's
    # position so that each record is paired with its own prediction; always
    # taking predictions[0] would mislabel every record after the first in a
    # multi-record request.
    df_exploded = inf_table_parsed.select(
        "*",
        F.posexplode(F.col("parsed_request.dataframe_records")).alias("record_pos", "record"),
    )

    df_final = df_exploded.select(
        F.from_unixtime(F.col("timestamp_ms") / 1000).cast("timestamp").alias("timestamp"),
        "timestamp_ms",
        "databricks_request_id",
        "execution_time_ms",
        F.col("record.Id").alias("Id"),
        F.col("parsed_response.predictions")[F.col("record_pos")].alias("prediction"),
        F.lit("credit_model_feature").alias("model_name"),
    )
    logger.success("Request and response parsed successfully.")

    # Join data with train/test/inference sets
    logger.info("Joining data with train/test/inference sets...")
    # NOTE(review): labels are read from `train_set` (not `test_set`), even
    # though the log message above mentions a test set - confirm this is intended.
    train_set = spark.table(f"{catalog_name}.{schema_name}.train_set")
    inference_set_normal = spark.table(f"{catalog_name}.{schema_name}.inference_set_normal")
    inference_set_skewed = spark.table(f"{catalog_name}.{schema_name}.inference_set_skewed")

    # DataFrame.union matches columns by position, so project the two needed
    # columns first; this keeps the union correct even if the tables order
    # their columns differently.
    inference_labels = inference_set_normal.select("Id", target).union(inference_set_skewed.select("Id", target))

    # Left joins keep every scored record; a record found in neither label
    # source ends up with a null `default`.
    df_final_with_status = (
        df_final.join(train_set.select("Id", target), on="Id", how="left")
        .withColumnRenamed(target, "default_train")
        .join(inference_labels, on="Id", how="left")
        .withColumnRenamed(target, "default_inference")
        .select("*", F.coalesce(F.col("default_train"), F.col("default_inference")).alias("default"))
        .drop("default_train", "default_inference")
        .withColumn("default", F.col("default").cast("double"))
        .withColumn("prediction", F.col("prediction").cast("double"))
    )
    logger.success("Data joined successfully.")

    # Join with features and write to model monitoring table
    logger.info("Joining with features and writing to model monitoring table...")
    features_balanced = spark.table(f"{catalog_name}.{schema_name}.features_balanced")
    df_final_with_features = df_final_with_status.join(features_balanced, on="Id", how="left")
    # NOTE(review): the append requires the DataFrame schema to be compatible
    # with the existing table; if the schema has legitimately changed, the
    # writer option mergeSchema=true may be needed - verify before enabling.
    df_final_with_features.write.mode("append").saveAsTable(f"{catalog_name}.{schema_name}.model_monitoring")

    logger.success("Data written to model monitoring table successfully.")

    # Run quality monitors
    logger.info("Running quality monitors...")
    workspace.quality_monitors.run_refresh(table_name=f"{catalog_name}.{schema_name}.model_monitoring")
    logger.success("Quality monitors refreshed successfully.")

except Exception as e:
    # logger.exception logs at ERROR level and attaches the traceback, which
    # would otherwise be lost once sys.exit() replaces the exception.
    logger.exception(f"An error occurred: {e}")
    sys.exit(1)  # Exit with a failure code

[32m2025-04-04 20:53:18[0m | [1mINFO[0m | [36mcommand-7746108655932807-576299281[0m:[36m<module>[0m - [1mSpark session initialized.[0m
[32m2025-04-04 20:53:18[0m | [1mINFO[0m | [36mcommand-7746108655932807-576299281[0m:[36m<module>[0m - [1mDatabricks workspace client initialized.[0m
[32m2025-04-04 20:53:18[0m | [1mINFO[0m | [36mutils[0m:[36mload_config[0m - [1mLoaded configuration from ../project_config.yml[0m
[32m2025-04-04 20:53:18[0m | [34m[1mDEBUG[0m | [36mcommand-7746108655932807-576299281[0m:[36m<module>[0m - [34m[1mCatalog: credit, Schema: default[0m
[32m2025-04-04 20:53:18[0m | [1mINFO[0m | [36mcommand-7746108655932807-576299281[0m:[36m<module>[0m - [1mLoading inference table...[0m
[32m2025-04-04 20:53:18[0m | [32m[1mSUCCESS[0m | [36mcommand-7746108655932807-576299281[0m:[36m<module>[0m - [32m[1mInference table loaded successfully.[0m
[32m2025-04-04 20:53:18[0m | [1mINFO[0m | [36mcommand-7746108655932807-57629

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


An exception has occurred, use %tb to see the full traceback.

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7746108655932807>, line 143[0m
[1;32m    141[0m features_balanced [38;5;241m=[39m spark[38;5;241m.[39mtable([38;5;124mf[39m[38;5;124m"[39m[38;5;132;01m{[39;00mcatalog_name[38;5;132;01m}[39;00m[38;5;124m.[39m[38;5;132;01m{[39;00mschema_name[38;5;132;01m}[39;00m[38;5;124m.features_balanced[39m[38;5;124m"[39m)
[1;32m    142[0m df_final_with_features [38;5;241m=[39m df_final_with_status[38;5;241m.[39mjoin(features_balanced, on[38;5;241m=[39m[38;5;124m"[39m[38;5;124mId[39m[38;5;124m"[39m, how[38;5;241m=[39m[38;5;124m"[39m[38;5;124mleft[39m[38;5;124m"[39m)
[0;32m--> 143[0m df_final_with_features[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[