In [0]:
from scipy.stats import chi2_contingency
import pandas as pd
import numpy as np
from datetime import date
from pyspark.sql import functions as F
import os
import json
from datetime import datetime,date
from pyspark.sql.types import *

In [0]:
# Read the last 20 days of logged requests that have an execution duration.
# The visible downstream cells only consume the raw `request` JSON column, so
# it is projected before toPandas() to avoid collecting unused columns to the
# driver.
# NOTE(review): the drift cell further down uses a 7-day window — confirm the
# two windows are meant to differ.
curr_df = (
    spark.read.table("ispl_databricks.model_logs.ff_bd_payload")
    .filter("request_time >= current_date() - INTERVAL 20 DAYS")
    .filter(F.col("execution_duration_ms").isNotNull())
    .select("request")
    .toPandas()
)

In [0]:
# Every logged request is a JSON string. Decode it and keep the first entry of
# its 'dataframe_records' list, then rebuild curr_df with one row per request.
result_list = [
    json.loads(raw_request)['dataframe_records'][0]
    for raw_request in curr_df['request']
]
curr_df = pd.DataFrame(result_list)

In [0]:
# Preview the first rows of the parsed request records.
curr_df.head()

In [0]:
# Load the full feature-store table into pandas; it serves as the reference
# data for the schema and null-rate comparisons below.
features_df = spark.table("ispl_databricks.model_logs.bd_final_feature_stores").toPandas()

In [0]:
# Column name -> pandas dtype for the current (request) data.
curr_df_schema = curr_df.dtypes.to_dict()

In [0]:
# Column name -> pandas dtype for the reference (feature-store) data.
ref_df_schema  = features_df.dtypes.to_dict()

In [0]:
# Display the current-data schema for inspection.
curr_df_schema

In [0]:
def schema_check(schema1,schema2):
    """Compare the dtype of every column in ``schema1`` against ``schema2``.

    Args:
        schema1: Mapping of column name -> dtype for the data being checked.
        schema2: Mapping of column name -> dtype to compare against.

    Returns:
        A list with one boolean per key of ``schema1``, in iteration order:
        ``True`` when ``schema2`` holds the same key with an equal dtype,
        ``False`` when the dtype differs or the key is absent from
        ``schema2``. Keys that exist only in ``schema2`` are not reported.
    """
    check = []
    for name, dtype in schema1.items():
        # A column unknown to schema2 is a mismatch, not a KeyError.
        matches = name in schema2 and bool(dtype == schema2[name])
        check.append(matches)
    return check


In [0]:
# One boolean per column of the current schema (True = dtype matches reference).
check = schema_check(curr_df_schema,ref_df_schema)

In [0]:
# Display the per-column schema comparison result.
check

In [0]:
# A single mismatching column is enough to flag schema drift. The flag is
# stored as the string 'True' / 'False'.
schema_drift = 'True' if False in check else 'False'

In [0]:
def dq_checks(df):
    """Report whether any row breaks one of the hard-coded upper bounds.

    Each monitored column is compared against its upper bound in a single
    vectorised pass instead of iterating row by row.

    Args:
        df: pandas DataFrame containing the monitored columns.

    Returns:
        ``True`` if at least one value in any monitored column is strictly
        greater than its bound, otherwise ``False``. An empty frame returns
        ``False``. NaN values never exceed a bound.

    Raises:
        KeyError: if a monitored column is missing from a non-empty frame
            (raised when that column is reached).
    """
    # Column -> largest value still considered valid.
    upper_bounds = {
        'phone_phoneNameDigitalAge': 20000,
        'email_emailNameDigitalAge': 20000,
        'bue_age': 100,
        'email_nameEmailMatch': 100,
        'upi_name_similarity': 100,
    }
    # With no rows nothing can violate a bound; return before touching any
    # column so an empty frame without the columns still yields False.
    if len(df) == 0:
        return False
    return any(
        bool((df[column] > bound).any())
        for column, bound in upper_bounds.items()
    )


In [0]:
# Run the data-quality bounds check on the current data and keep the outcome
# as the string 'True' / 'False'.
dq_drift = 'True' if dq_checks(curr_df) else 'False'

In [0]:
%python
def null_value_checks(curr_df, ref_df, tolerance_pct=10):
    """Report whether the share of ``-1`` values shifted for any shared column.

    For every column of ``ref_df`` that also exists in ``curr_df``, the
    percentage of values equal to ``-1`` (presumably the missing-value
    sentinel — confirm with the feature pipeline) is computed in both frames.

    Args:
        curr_df: pandas DataFrame with the current data.
        ref_df: pandas DataFrame with the reference data.
        tolerance_pct: Allowed absolute difference, in percentage points,
            between the two shares before a column counts as drifted.

    Returns:
        ``True`` as soon as one shared column differs by more than
        ``tolerance_pct`` percentage points in either direction, otherwise
        ``False``. Returns ``False`` when either frame has no rows.
    """
    # Percentages are undefined without rows; avoid the division by zero.
    if len(curr_df) == 0 or len(ref_df) == 0:
        return False
    for x in ref_df.columns:
        # Columns that exist only in the reference cannot be compared and
        # must not raise KeyError.
        if x not in curr_df.columns:
            continue
        miss_count_ref_percentage = (
            (ref_df[x] == -1).sum() / len(ref_df) * 100
        )
        miss_count_curr_percentage = (
            (curr_df[x] == -1).sum() / len(curr_df) * 100
        )
        if (
            miss_count_curr_percentage > miss_count_ref_percentage + tolerance_pct
            or miss_count_curr_percentage < miss_count_ref_percentage - tolerance_pct
        ):
            return True
    return False
    

In [0]:
# Store the result under its own name. Assigning it back to
# `null_value_checks` would replace the function with a bool and make this
# cell fail on re-run with "'bool' object is not callable".
has_null_value_drift = null_value_checks(curr_df, features_df)
# The flag is kept as the string 'True' / 'False'.
null_value_drift = 'True' if has_null_value_drift else 'False'

In [0]:
%sql
-- Deletes every payload row whose execution_duration_ms is NULL.
-- NOTE(review): this is destructive and runs on every execution of the notebook,
-- while the Python cells already filter these rows out on read — confirm the delete is intended.
delete  from ispl_databricks.model_logs.ff_bd_payload
where   execution_duration_ms is  null

In [0]:
%sql
-- Inspect the full payload table (no filter, no row limit).
select * from ispl_databricks.model_logs.ff_bd_payload

In [0]:
# Databricks widgets (Job Parameters)
# Each widget declares a text parameter with a default value.
dbutils.widgets.text("model_name", "ff_bd", "Model Name")
dbutils.widgets.text("mean_diff_threshold", "0.2", "Mean Diff Threshold (Fraction)")
dbutils.widgets.text("std_diff_threshold", "0.3", "Std Diff Threshold (Fraction)")
dbutils.widgets.text("pvalue_threshold", "0.05", "Chi-Square p-value Threshold")

# Read job parameter values; the numeric thresholds are converted with float().
model_name = dbutils.widgets.get("model_name")
mean_diff_threshold = float(dbutils.widgets.get("mean_diff_threshold"))
std_diff_threshold = float(dbutils.widgets.get("std_diff_threshold"))
pvalue_threshold = float(dbutils.widgets.get("pvalue_threshold"))

# The emoji in these messages were stored as mis-decoded UTF-8; restored.
print(f"🏃 Running data drift detection for model: {model_name}")
print(f"🔹 Mean threshold: {mean_diff_threshold}, Std threshold: {std_diff_threshold}, p-value: {pvalue_threshold}")

# Import dependencies
from scipy.stats import chi2_contingency
import pandas as pd
import numpy as np
from datetime import date
from pyspark.sql import functions as F
import os
import json
from datetime import datetime,date
# --- Load reference and current (inference) datasets ---

# Last 7 days of logged requests that have an execution duration. Only the raw
# `request` JSON column is used below, so it is projected before toPandas() to
# avoid collecting unused columns to the driver.
curr_df = (
    spark.read.table("ispl_databricks.model_logs.ff_bd_payload")
    .filter("request_time >= current_date() - INTERVAL 7 DAYS")
    .filter(F.col("execution_duration_ms").isNotNull())
    .select("request")
    .toPandas()
)

# Decode every request and keep the first entry of its 'dataframe_records'
# list, giving one row per logged request.
result_list = [
    json.loads(raw_request)['dataframe_records'][0]
    for raw_request in curr_df['request']
]
curr_df = pd.DataFrame(result_list)
print(curr_df.columns)
features_df = spark.table("ispl_databricks.model_logs.bd_final_feature_stores").toPandas()

# The reference frame is the feature-store table without its loan_id column.
ref_df = features_df.drop(["loan_id"], axis=1)
# curr_df = curr_df.drop(["loan_id"],axis = 1)
features = curr_df.columns.tolist()
# feature_imp_df = pd.DataFrame({'feature_name':model.feature_name_,'importance':model.feature_importances_})

# feature_imp_df.sort_values(by=["importance"],ascending=False,inplace=True)

# top_n_features = feature_imp_df.loc[:,'feature_name'].tolist()
# feature_imp_df.to_csv(os.path.join(output_dir_version,"all_gain_feautures.csv"))

# top_n_features = feature_imp_df.iloc[0:20]
# curr_df = curr_df[top_n_features]
# ref_df = ref_df.drop(["loan_id"], axis=1)






# --- Define features to monitor ---
# Column types are taken from the reference frame. `results` collects one
# tuple per monitored feature, all stamped with today's date.
evaluation_date = date.today()
results = []

num_cols = list(ref_df.select_dtypes(include=["int", "float", "number"]).columns)
cat_cols = list(ref_df.select_dtypes(include=["object", "category"]).columns)

# --- 1. Categorical Columns: Chi-Square Test ---
# For each categorical reference column, compare the category frequencies of
# the reference and current data with a chi-square test of independence.
for col in cat_cols:
    try:
        ref_counts = ref_df[col].value_counts()
        cur_counts = curr_df[col].value_counts()

        # Align both frequency vectors on the union of observed categories;
        # a category unseen on one side gets a count of 0.
        all_categories = set(ref_counts.index).union(set(cur_counts.index))
        ref_aligned = [ref_counts.get(c, 0) for c in all_categories]
        cur_aligned = [cur_counts.get(c, 0) for c in all_categories]

        _, p, _, _ = chi2_contingency([ref_aligned, cur_aligned])
        drift_status_cat = "Drift" if p < pvalue_threshold else "Stable"

        # Tuple order must match the fields of `schema` defined later in this cell.
        results.append((
            evaluation_date,
            model_name,  # job parameter instead of a hard-coded "ff_bd"
            col,
            "categorical",
            "chi_square",
            float(p),
            drift_status_cat,
            int(len(ref_df)),
            int(len(curr_df)),
            None,
            datetime.now()
        ))

    except Exception as e:
        # Best effort: a column that cannot be tested (e.g. missing from the
        # current data) is reported and skipped instead of aborting the run.
        print(f"⚠️ Skipped {col} due to error: {e}")

# --- 2. Numeric Columns: Mean & Std Deviation Comparison ---
# For each numeric reference column, measure the relative change of the mean
# and of the standard deviation between reference and current data.
for col in num_cols:
    try:
        mean_ref, mean_cur = ref_df[col].mean(), curr_df[col].mean()
        std_ref, std_cur = ref_df[col].std(), curr_df[col].std()

        # The 1e-6 term keeps the ratio finite when the reference value is 0.
        mean_diff = abs(mean_cur - mean_ref) / (abs(mean_ref) + 1e-6)
        std_diff = abs(std_cur - std_ref) / (abs(std_ref) + 1e-6)

        drift_status_num = "Drift" if (mean_diff > mean_diff_threshold or std_diff > std_diff_threshold) else "Stable"

        # Tuple order must match the fields of `schema` defined later in this cell.
        results.append((
            evaluation_date,
            model_name,  # job parameter instead of a hard-coded "ff_bd"
            col,
            "numeric",
            "mean_std",
            float(max(mean_diff, std_diff)),  # the larger of the two relative changes
            drift_status_num,
            int(len(ref_df)),
            int(len(curr_df)),
            None,
            datetime.now()
        ))

    except Exception as e:
        # Best effort: a column that cannot be compared (e.g. missing from the
        # current data) is reported and skipped instead of aborting the run.
        print(f"⚠️ Skipped {col} due to error: {e}")

# -------------------------------------------------------------
# ⭐ NEW: GLOBAL DRIFT STATUS (Do not modify per-column results)
# -------------------------------------------------------------
# Number of drifting features required before the whole run is flagged.
GLOBAL_DRIFT_MIN_FEATURES = 20
# Position of drift_status inside each tuple appended to `results`.
DRIFT_STATUS_INDEX = 6

drift_count = sum(1 for r in results if r[DRIFT_STATUS_INDEX] == "Drift")
drift_status = "Drift" if drift_count >= GLOBAL_DRIFT_MIN_FEATURES else "Stable"

print(f"🔔 Per-feature drift count = {drift_count}")
print(f"🌐 Global drift status = {drift_status}")
# -------------------------------------------------------------

# --- Create DataFrame & Write to Delta Table ---
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    DateType,
    DoubleType,
    TimestampType

)

# Field order must match the tuples appended to `results` above.
schema = StructType([
    StructField("evaluation_date", DateType(), True),
    StructField("model_name", StringType(), True),
    StructField("feature_name", StringType(), True),
    StructField("feature_type", StringType(), True),
    StructField("metric_used", StringType(), True),
    StructField("metric_value", DoubleType(), True),
    StructField("drift_status", StringType(), True),
    StructField("ref_sample_size", IntegerType(), True),
    StructField("cur_sample_size", IntegerType(), True),
    StructField("comment", StringType(), True),
    StructField("created_at", TimestampType(), True),
])

if results:
    drift_df = spark.createDataFrame(results, schema)
    # Replace the per-row Python timestamps with a Spark-side timestamp.
    drift_df = drift_df.withColumn("created_at", F.current_timestamp())
    # metric_value is already DoubleType via `schema` and there is no
    # "global_drift_status" column, so no extra cast or drop is needed.
    drift_df.write.format("delta").mode("append").saveAsTable("ispl_databricks.model_logs.data_drift_log")
    # printSchema() prints the schema itself and returns None, so it must not
    # be wrapped in display().
    drift_df.printSchema()
else:
    print("⚠️ No results to log — check your reference and inference tables.")

In [0]:
%sql
-- Inspect payload rows that have an execution duration.
select * from ispl_databricks.model_logs.ff_bd_payload where execution_duration_ms is not null

In [0]:
%sql
-- Inspect the full payload table (no filter, no row limit).
select * from ispl_databricks.model_logs.ff_bd_payload

In [0]:
%sql
-- Inspect the drift log table that the drift cell appends to.
select * from ispl_databricks.model_logs.data_drift_log

In [0]:
%sql
-- Inspect the full payload table (same query as an earlier cell).
select * from ispl_databricks.model_logs.ff_bd_payload

In [0]:
# Publish the global drift status ("Drift" / "Stable") as the job task value "drift_flag".
dbutils.jobs.taskValues.set("drift_flag", drift_status)

In [0]:
# Publish the schema-drift flag (string 'True' / 'False') as the job task value "schema_drift".
dbutils.jobs.taskValues.set("schema_drift", schema_drift)

In [0]:
# Publish the data-quality flag (string 'True' / 'False') as the job task value "dq_drift".
dbutils.jobs.taskValues.set("dq_drift", dq_drift)

In [0]:
# Publish the null-rate drift flag (string 'True' / 'False') as the job task value "null_value_drift".
dbutils.jobs.taskValues.set("null_value_drift", null_value_drift )