# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count as cnt, lit, udf
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructType, StructField
import math

def get_bin_edges(df, column: str, bins: int = 10, relative_error: float = 0.001):
    """
    Derive up to `bins` quantile-based bin boundaries for `column`.

    - Null rows are excluded from boundary derivation.
    - Returns a sorted list of unique float boundaries, including the
      column's min and max (at most ``bins + 1`` values; ties collapse).
    - Returns ``[value]`` when every non-null value is identical.
    - Returns ``[]`` when the column has no non-null values.

    Args:
        df: Spark DataFrame containing `column`.
        column: Name of the numeric column to bin.
        bins: Target number of bins.
        relative_error: Relative error passed to ``approxQuantile``.
    """
    non_null = df.filter(col(column).isNotNull())
    # Count, min and max in one Spark job instead of a separate count().
    stats = non_null.agg(
        F.count(F.lit(1)).alias("n"),
        F.min(col(column)).alias("min_val"),
        F.max(col(column)).alias("max_val"),
    ).collect()[0]
    if stats["n"] == 0:
        # All values are null
        return []

    # float() normalises Decimal/int extremes so every edge has the same type
    # (and stays JSON-serialisable).
    min_val, max_val = float(stats["min_val"]), float(stats["max_val"])
    if min_val == max_val:
        # Single unique value
        return [min_val]

    fractiles = [i / bins for i in range(1, bins)]
    quantiles = (
        non_null.approxQuantile(column, fractiles, relative_error) if fractiles else []
    )
    return sorted({min_val, max_val, *(float(q) for q in quantiles)})

def build_bin_assigner(bin_edges: list):
    """
    Return a string-valued UDF that assigns each numeric value to a labeled bin.

    Labels:
    - ``"null_bin"`` if the value is None or NaN.
    - ``"oor"`` (out of range) if the value lies outside
      ``[bin_edges[0], bin_edges[-1]]``. With a single edge, this is any value
      not equal to it. With no edges, it is any non-null value.
    - ``"bin_i"`` for the half-open interval ``[bin_edges[i], bin_edges[i + 1])``.
      The final bin is closed on the right so the maximum value is included.
    - ``"bin_0"`` for a value equal to the only edge when exactly one edge exists.

    Args:
        bin_edges: Sorted, unique bin boundaries (e.g. from ``get_bin_edges``).
    """
    from bisect import bisect_right

    # Private copy: later mutation of the caller's list cannot change the UDF.
    edges = list(bin_edges or [])
    last_bin = len(edges) - 2

    def _is_missing(value):
        # NaN reaches Python UDFs as float('nan'). It compares False against
        # every edge, so treat it like a null instead of mis-binning it.
        return value is None or (isinstance(value, float) and math.isnan(value))

    if len(edges) <= 1:
        # Single-value or no edges scenario
        def single_bin(value):
            if _is_missing(value):
                return "null_bin"
            if edges and value == edges[0]:
                return "bin_0"
            return "oor"

        return udf(single_bin, StringType())

    def assign_bin(value):
        if _is_missing(value):
            return "null_bin"
        if value < edges[0] or value > edges[-1]:
            return "oor"
        # bisect_right(...) - 1 is the i with edges[i] <= value < edges[i + 1].
        # value == edges[-1] lands one past the end, so clamp it into the last
        # bin, which is closed on the right.
        return f"bin_{min(bisect_right(edges, value) - 1, last_bin)}"

    return udf(assign_bin, StringType())

def build_histogram(df, column: str, bins: int = 10):
    """
    Create a quantile-based histogram (up to `bins` bins) for a numeric column.

    Output columns, one row per bin, ordered bin_0..bin_k, then "oor", then "null_bin":
    - column_name: `column`
    - bin_label: "bin_i", "oor" (out of range) or "null_bin"
    - bin_range: human-readable interval, e.g. "[1.0, 2.0)"; the last is right-closed
    - count_in_bin: rows in the bin (0 for empty bins)
    - proportion: count_in_bin / total row count (nulls included in the total)

    An empty `df` yields an empty DataFrame with the same columns.
    """
    from pyspark.sql.types import LongType

    # Resolve the session here. The notebook-global `spark` may not be defined
    # yet when this cell runs.
    session = SparkSession.builder.getOrCreate()

    edges = get_bin_edges(df, column, bins)
    assigner = build_bin_assigner(edges)

    # Assign each row to a bin
    binned = df.withColumn("bin_label", assigner(col(column)))
    total_rows = binned.count()
    if total_rows == 0:
        # No data
        empty_schema = StructType([
            StructField("column_name", StringType(), True),
            StructField("bin_label", StringType(), True),
            StructField("bin_range", StringType(), True),
            # count() produces a long; keep the empty schema consistent with it
            StructField("count_in_bin", LongType(), True),
            StructField("proportion", DoubleType(), True),
        ])
        return session.createDataFrame([], empty_schema)

    # Count how many rows per bin
    bin_counts = binned.groupBy("bin_label").agg(cnt("*").alias("count_in_bin"))

    # Human-readable bin ranges; the last interval includes its right edge.
    if len(edges) <= 1:
        bin_meta = [("bin_0", "[SingleValue]")]
    else:
        last = len(edges) - 2
        bin_meta = [
            (f"bin_{i}", f"[{left}, {right}]" if i == last else f"[{left}, {right})")
            for i, (left, right) in enumerate(zip(edges, edges[1:]))
        ]
    bin_meta += [("oor", "Out of Range"), ("null_bin", "NULL")]

    # "_order" keeps the output in bin order; a join alone does not guarantee it.
    meta_df = session.createDataFrame(
        [(column, label, rng, order) for order, (label, rng) in enumerate(bin_meta)],
        ["column_name", "bin_label", "bin_range", "_order"],
    )

    return (
        meta_df
        .join(bin_counts, on="bin_label", how="left")
        .fillna({"count_in_bin": 0})
        .withColumn("proportion", col("count_in_bin") / lit(total_rows))
        .orderBy("_order")
        .select("column_name", "bin_label", "bin_range", "count_in_bin", "proportion")
    )

# In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, lit, count as _count, sum as _sum
from pyspark.sql.types import IntegerType
import math

# Split the scored data 50/50 into a baseline and a comparison sample.
# The seed is fixed so the split is reproducible between runs.
baseline_df, comparison_df = df_res.randomSplit(weights=[0.5, 0.5], seed=42)

print("Baseline Count:", baseline_df.count())
print("Comparison Count:", comparison_df.count())

#######################################################
# 4. Build bins + baseline proportions
#######################################################

def get_bins_proportions_for_baseline(df_baseline, col_name, num_bins=10):
    """
    Derive equal-width bin edges and baseline bin proportions for `col_name`.

    Args:
        df_baseline: Spark DataFrame holding the baseline sample.
        col_name: Numeric column to bin.
        num_bins: Number of equal-width bins between the column's min and max.

    Returns:
        ``(edges, proportions)``:
        - ``edges`` holds ``num_bins + 1`` float boundaries. The top edge is
          nudged up by 1e-9.
        - ``proportions[i]`` is the share of ALL baseline rows (nulls included
          in the denominator) that fall in bin ``i``.
        - A constant column yields ``([v, v + 1e-9], [non_null_share])``.
        - An all-null (or empty) column yields ``(None, None)``.
    """
    from bisect import bisect_right

    # One pass for the extremes and both row counts.
    stats = df_baseline.agg(
        F.min(col_name).alias("minVal"),
        F.max(col_name).alias("maxVal"),
        F.count(col(col_name)).alias("nonNull"),
        F.count(F.lit(1)).alias("total"),
    ).collect()[0]
    min_val = stats["minVal"]
    max_val = stats["maxVal"]

    # Handle edge cases
    if min_val is None or max_val is None:
        return None, None

    # float() so Decimal columns work: Decimal + float (the 1e-9 nudge) raises TypeError.
    min_val, max_val = float(min_val), float(max_val)
    total_count = stats["total"]
    if min_val == max_val:
        # Everything is the same -> single bin. Use the same denominator
        # (all rows, nulls included) as the multi-bin case.
        return [min_val, min_val + 1e-9], [stats["nonNull"] / total_count]

    # Evenly spaced bins
    step = (max_val - min_val) / num_bins
    edges = [min_val + i * step for i in range(num_bins)] + [max_val + 1e-9]

    def binning_func(x):
        # None, NaN (x != x) and out-of-range values get no bin.
        if x is None or x != x or x < edges[0] or x > edges[-1]:
            return None
        # The last bin includes its top edge, so clamp x == edges[-1] into it.
        return min(bisect_right(edges, x) - 1, num_bins - 1)

    bin_udf = F.udf(binning_func, IntegerType())
    bin_counts = (
        df_baseline.withColumn("bin_index", bin_udf(col(col_name)))
        .groupBy("bin_index")
        .agg(_count("*").alias("cnt"))
        .collect()
    )
    count_dict = {r["bin_index"]: r["cnt"] for r in bin_counts}

    proportions = [count_dict.get(i, 0) / total_count for i in range(num_bins)]
    return edges, proportions

# Model inputs measured with VSI; model_score is appended for PSI.
input_cols = [
    "tran_amt",
    "sec_since_dvc_use_last_ts",
    "sec_since_add_payee_rqst_in_1",
    "sec_since_prty_birth_inc_dt",
]

all_cols_for_bins = [*input_cols, "model_score"]

# Five equal-width baseline bins per column plus their baseline proportions.
baseline_bins_info = {}
for c in all_cols_for_bins:
    bins, base_props = get_bins_proportions_for_baseline(baseline_df, c, num_bins=5)
    baseline_bins_info[c] = dict(bins=bins, base_props=base_props)



# Compute VSI for inputs, PSI for model_score
def psi(df_comp, column, bins, baseline_props, epsilon=1e-10):
    """
    Compute a stability index (PSI/VSI) of `column` in `df_comp` against a baseline.

    Comparison rows are binned with the baseline `bins`: left-closed intervals,
    with the last bin closed on the right. Their proportions are compared with
    `baseline_props` via ``sum((obs - base) * ln(obs / base))``.

    Rows that land in no bin (null, NaN or outside the baseline range) are
    pooled into one extra "unbinned" bucket. Its baseline share is
    ``1 - sum(baseline_props)``. Without this bucket, a shift entirely outside
    the baseline range would score 0. `epsilon` is added to every proportion,
    so empty bins contribute a large finite term instead of being skipped.

    Args:
        df_comp: Spark DataFrame with the comparison sample.
        column: Column to bin.
        bins: Baseline bin edges (``len(baseline_props) + 1`` values).
        baseline_props: Baseline share of all rows per bin.
        epsilon: Smoothing constant added to each proportion.

    Returns:
        The index as a float; 0.0 for degenerate inputs or an empty `df_comp`.
    """
    if not bins or not baseline_props or len(bins) == 1:
        return 0.0  # degenerate case or empty

    from bisect import bisect_right

    num_bins = len(baseline_props)
    # Only the first num_bins + 1 edges define bins.
    edges = list(bins[: num_bins + 1])

    def binning_func(x):
        # None, NaN (x != x) and out-of-range values get no bin.
        if x is None or x != x or x < edges[0] or x > edges[-1]:
            return None
        # Index i with edges[i] <= x < edges[i + 1]; x == edges[-1] is clamped
        # into the last bin, which is closed on the right.
        return min(bisect_right(edges, x) - 1, num_bins - 1)

    bin_udf = F.udf(binning_func, IntegerType())
    bin_counts = (
        df_comp.withColumn("bin_index", bin_udf(col(column)))
        .groupBy("bin_index")
        .agg(_count("*").alias("cnt"))
        .collect()
    )
    count_dict = {r["bin_index"]: r["cnt"] for r in bin_counts}
    # Every row falls into exactly one group (the None group included),
    # so this equals the row count without a separate count() job.
    total_count = sum(count_dict.values())
    if total_count == 0:
        return 0.0

    obs_props = [count_dict.get(i, 0) / total_count for i in range(num_bins)]
    obs_props.append(count_dict.get(None, 0) / total_count)  # unbinned bucket
    base_props = list(baseline_props) + [max(0.0, 1.0 - sum(baseline_props))]

    si_val = 0.0
    for obs, base in zip(obs_props, base_props):
        obs, base = obs + epsilon, base + epsilon
        si_val += (obs - base) * math.log(obs / base)
    return si_val

# VSI for every model input column
vsi_psi_results = {}
for c in input_cols:
    bins, base_props = (baseline_bins_info[c][key] for key in ("bins", "base_props"))
    psi_val = psi(comparison_df, c, bins, base_props)
    vsi_psi_results["VSI_" + c] = psi_val

# PSI for the model output
score_bins, score_props = (
    baseline_bins_info["model_score"][key] for key in ("bins", "base_props")
)
psi_val = psi(comparison_df, "model_score", score_bins, score_props)
vsi_psi_results["PSI_model_score"] = psi_val

# In [None]:
# Compute VSI for inputs, PSI for model_score
def psi(df_comp, column, bins, baseline_props, epsilon=1e-10):
    """
    Compute a stability index (PSI/VSI) of `column` in `df_comp` against a baseline.

    Comparison rows are binned with the baseline `bins`: left-closed intervals,
    with the last bin closed on the right. Their proportions are compared with
    `baseline_props` via ``sum((obs - base) * ln(obs / base))``.

    Rows that land in no bin (null, NaN or outside the baseline range) are
    pooled into one extra "unbinned" bucket. Its baseline share is
    ``1 - sum(baseline_props)``. Without this bucket, a shift entirely outside
    the baseline range would score 0. `epsilon` is added to every proportion,
    so empty bins contribute a large finite term instead of being skipped.

    Args:
        df_comp: Spark DataFrame with the comparison sample.
        column: Column to bin.
        bins: Baseline bin edges (``len(baseline_props) + 1`` values).
        baseline_props: Baseline share of all rows per bin.
        epsilon: Smoothing constant added to each proportion.

    Returns:
        The index as a float; 0.0 for degenerate inputs or an empty `df_comp`.
    """
    if not bins or not baseline_props or len(bins) == 1:
        return 0.0  # degenerate case or empty

    from bisect import bisect_right

    num_bins = len(baseline_props)
    # Only the first num_bins + 1 edges define bins.
    edges = list(bins[: num_bins + 1])

    def binning_func(x):
        # None, NaN (x != x) and out-of-range values get no bin.
        if x is None or x != x or x < edges[0] or x > edges[-1]:
            return None
        # Index i with edges[i] <= x < edges[i + 1]; x == edges[-1] is clamped
        # into the last bin, which is closed on the right.
        return min(bisect_right(edges, x) - 1, num_bins - 1)

    bin_udf = F.udf(binning_func, IntegerType())
    bin_counts = (
        df_comp.withColumn("bin_index", bin_udf(col(column)))
        .groupBy("bin_index")
        .agg(_count("*").alias("cnt"))
        .collect()
    )
    count_dict = {r["bin_index"]: r["cnt"] for r in bin_counts}
    # Every row falls into exactly one group (the None group included),
    # so this equals the row count without a separate count() job.
    total_count = sum(count_dict.values())
    if total_count == 0:
        return 0.0

    obs_props = [count_dict.get(i, 0) / total_count for i in range(num_bins)]
    obs_props.append(count_dict.get(None, 0) / total_count)  # unbinned bucket
    base_props = list(baseline_props) + [max(0.0, 1.0 - sum(baseline_props))]

    si_val = 0.0
    for obs, base in zip(obs_props, base_props):
        obs, base = obs + epsilon, base + epsilon
        si_val += (obs - base) * math.log(obs / base)
    return si_val

# VSI for every model input column
vsi_psi_results = {}
for c in input_cols:
    bins, base_props = (baseline_bins_info[c][key] for key in ("bins", "base_props"))
    psi_val = psi(comparison_df, c, bins, base_props)
    vsi_psi_results["VSI_" + c] = psi_val

# PSI for the model output
score_bins, score_props = (
    baseline_bins_info["model_score"][key] for key in ("bins", "base_props")
)
psi_val = psi(comparison_df, "model_score", score_bins, score_props)
vsi_psi_results["PSI_model_score"] = psi_val

##################################################
# Compute Performance KPIs (VDR, TDR, TFPR, KS)
##################################################
def compute_performance_kpis(df_comp, threshold=0.5):
    """
    Compute VDR, TDR and TFPR for rows flagged by ``model_score >= threshold``.

    Definitions, as implemented:
    - VDR:  sum(tran_amt) of flagged frd_tag == 1 rows / sum(tran_amt) of all frd_tag == 1 rows
    - TDR:  count of flagged frd_tag == 1 rows / count of frd_tag == 1 rows
    - TFPR: count of flagged frd_tag == 0 rows / count of all flagged rows

    Rows with a null model_score count as not flagged. Each ratio is 0.0 when
    its denominator is zero or null. All aggregates come from a single Spark job.

    Returns:
        dict with float values under the keys "VDR", "TDR" and "TFPR".
    """
    is_fraud = col("frd_tag") == 1
    is_legit = col("frd_tag") == 0
    # A null score makes this comparison null, which when() treats as false,
    # so the row counts as not flagged.
    is_flagged = col("model_score") >= threshold

    def _n(cond):
        # Number of rows where `cond` is true (null counts as false).
        return _sum(when(cond, 1).otherwise(0))

    row = df_comp.agg(
        _sum(when(is_fraud, col("tran_amt"))).alias("fraud_amt"),
        _sum(when(is_fraud & is_flagged, col("tran_amt"))).alias("detected_amt"),
        _n(is_fraud).alias("n_fraud"),
        _n(is_fraud & is_flagged).alias("n_detected"),
        _n(is_flagged).alias("n_flagged"),
        _n(is_legit & is_flagged).alias("n_false_pos"),
    ).collect()[0]

    # Value Detection Rate (VDR); sums are null when no row qualifies.
    fraud_amt = row["fraud_amt"]
    vdr = float(row["detected_amt"] or 0) / float(fraud_amt) if fraud_amt else 0.0

    # TDR
    n_fraud = row["n_fraud"] or 0
    tdr = float(row["n_detected"] or 0) / float(n_fraud) if n_fraud > 0 else 0.0

    # TFPR
    n_flagged = row["n_flagged"] or 0
    tfpr = float(row["n_false_pos"] or 0) / float(n_flagged) if n_flagged > 0 else 0.0

    return {"VDR": vdr, "TDR": tdr, "TFPR": tfpr}

def compute_ks(df_comp):
    """
    Compute the two-sample KS statistic between the model_score distributions
    of frd_tag == 1 and frd_tag == 0 rows.

    KS = max over scores s of |F_fraud(s) - F_legit(s)|, where F is the
    empirical CDF. Rows with a null/NaN model_score or a frd_tag other than
    0/1 are ignored. Returns 0.0 if either class is empty.

    Per-score class counts are aggregated in Spark, so only one row per
    distinct score reaches the driver, not the whole DataFrame.
    """
    score = col("model_score")
    score_counts = (
        df_comp
        .filter(score.isNotNull() & ~F.isnan(score) & col("frd_tag").isin(0, 1))
        .groupBy("model_score")
        .agg(
            _sum(when(col("frd_tag") == 1, 1).otherwise(0)).alias("n_fraud"),
            _sum(when(col("frd_tag") == 0, 1).otherwise(0)).alias("n_legit"),
        )
        .orderBy("model_score")
        .collect()
    )

    n_fraud = sum(r["n_fraud"] for r in score_counts)
    n_non_fraud = sum(r["n_legit"] for r in score_counts)
    if n_fraud == 0 or n_non_fraud == 0:
        return 0.0

    # Walk the distinct scores in ascending order, accumulating both CDFs.
    cdf_diff = 0.0
    cum_fraud = cum_non_fraud = 0
    for r in score_counts:
        cum_fraud += r["n_fraud"]
        cum_non_fraud += r["n_legit"]
        diff = abs(cum_fraud / n_fraud - cum_non_fraud / n_non_fraud)
        cdf_diff = max(cdf_diff, diff)

    return cdf_diff

perf_results = compute_performance_kpis(comparison_df, threshold=0.5)
ks_val = compute_ks(comparison_df)

##################################################
# Combine & Print Final KPIs
##################################################
# Key order: VSI_* / PSI_model_score, then VDR / TDR / TFPR, then KS.
final_metrics = {**vsi_psi_results, **perf_results, "KS": ks_val}

print("Final KPI Metrics:")
for k, v in final_metrics.items():
    print(f"{k}: {v}")

# spark.stop()


# BASELINE HIST

# In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, when, lit, count as cnt, udf
)
from pyspark.sql.types import (
    StructType, StructField, StringType,
    DoubleType, IntegerType
)
import math
import json

# Attach to the already-running SparkSession if there is one; otherwise start one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

#############################
# 1) get_edges: Decile Breakpoints
#############################
def get_edges(
    df: DataFrame,
    col_name: str,
    nbins: int = 10,
    relative_error: float = 0.001,
) -> list:
    """
    Derive quantile-based bin edges for `col_name` (deciles with the default nbins=10).

    Returns a sorted list of unique float edges
      [min_val, q1, q2, ..., max_val]
    with ties collapsed. Returns ``[value]`` if every non-null value is the
    same, and ``[]`` if the column has no non-null values. Nulls are ignored.

    Edges are plain Python floats, so the result stays JSON-serialisable even
    for Decimal columns (``json.dump`` rejects ``Decimal``).

    Args:
        df: Spark DataFrame containing `col_name`.
        col_name: Numeric column to bin.
        nbins: Target number of bins.
        relative_error: Relative error passed to ``approxQuantile``.
    """
    valid = df.filter(col(col_name).isNotNull())
    # Count, min and max in one Spark job instead of a separate count().
    stats = valid.agg(
        F.count(F.lit(1)).alias("n"),
        F.min(col(col_name)).alias("mn"),
        F.max(col(col_name)).alias("mx"),
    ).collect()[0]
    if stats["n"] == 0:
        return []

    mn, mx = float(stats["mn"]), float(stats["mx"])
    if mn == mx:
        # Single unique value
        return [mn]

    fractiles = [i / nbins for i in range(1, nbins)]
    quants = valid.approxQuantile(col_name, fractiles, relative_error) if fractiles else []
    return sorted({mn, mx, *(float(q) for q in quants)})

#############################
# 2) get_assigner: UDF for Binning
#############################
def get_assigner(edges: list):
    """
    Return a string-valued UDF that maps numeric values to bin labels:
      - 'null_bin' if the value is None or NaN
      - 'oor' if the value lies outside [edges[0], edges[-1]]. With a single
        edge, this is any value not equal to it. With no edges, it is any
        non-null value. This matters when baseline edges are reused on new data.
      - 'bin_X' for the interval [edges[X], edges[X + 1]); the last bin is
        closed on the right, and a single edge matches only 'bin_0'.

    Args:
        edges: Sorted, unique bin boundaries (e.g. from ``get_edges``).
    """
    from bisect import bisect_right

    # Private copy so later mutation of the caller's list cannot change the UDF.
    edges = list(edges or [])
    last_bin = len(edges) - 2

    def _is_missing(x):
        # NaN reaches Python UDFs as float('nan'). It compares False against
        # every edge, so treat it like a null instead of mis-binning it.
        return x is None or (isinstance(x, float) and math.isnan(x))

    if len(edges) <= 1:
        def single_bin(x):
            if _is_missing(x):
                return "null_bin"
            if edges and x == edges[0]:
                return "bin_0"
            return "oor"
        return udf(single_bin, StringType())

    def assign_bin(x):
        if _is_missing(x):
            return "null_bin"
        if x < edges[0] or x > edges[-1]:
            return "oor"
        # bisect_right(...) - 1 is the X with edges[X] <= x < edges[X + 1].
        # x == edges[-1] lands one past the end, so clamp it into the last bin.
        return f"bin_{min(bisect_right(edges, x) - 1, last_bin)}"

    return udf(assign_bin, StringType())

#############################
# 3) get_hist: Build a Decile-Based Histogram for One Column
#############################
def get_hist(
    df: DataFrame,
    col_name: str,
    nbins: int = 10,
    edges: list = None
) -> dict:
    """
    Creates a histogram for `df[col_name]`:
      - If edges is None, derive decile edges from df (baseline mode).
      - If edges is provided, reuse them (new data mode).

    Returns a dict with every bin present, even when `df` is empty:
    {
      "edges": [...],
      "bins": {
        "bin_0": {"range": ..., "cnt":..., "prop":..., "adj_prop":...},
        "bin_1": {...},
        "null_bin": {...},
        "oor": {...}
      }
    }
    - "prop" is cnt / total rows (0.0 when there are no rows).
    - For bin_X, "adj_prop" is the bin's share of valid rows scaled by the
      valid share of all rows. This is numerically the same as "prop", up to
      float rounding.
    - For null_bin and oor, "adj_prop" equals "prop".
    """
    # 1) Determine edges
    if edges is None:
        edges = get_edges(df, col_name, nbins)

    # 2) Assign bins and count per bin
    assigner = get_assigner(edges)
    bin_counts = (
        df.withColumn("bin", assigner(col(col_name)))
        .groupBy("bin")
        .agg(cnt("*").alias("cnt"))
        .collect()
    )
    cnt_map = {r["bin"]: r["cnt"] for r in bin_counts}
    # The assigner labels every row, so the group counts add up to the row
    # count. This saves a separate count() job.
    total_rows = sum(cnt_map.values())

    def share(n):
        return n / total_rows if total_rows else 0.0

    # 3) prop & adj_prop inputs
    null_cnt = cnt_map.get("null_bin", 0)
    oor_cnt = cnt_map.get("oor", 0)
    valid_cnt = sum(v for k, v in cnt_map.items() if k.startswith("bin_"))

    null_prop = share(null_cnt)
    oor_prop = share(oor_cnt)
    valid_prop = 1.0 - null_prop - oor_prop if total_rows else 0.0

    def interval_entry(rng_label, cnt_val):
        return {
            "range": rng_label,
            "cnt": cnt_val,
            "prop": share(cnt_val),
            "adj_prop": (cnt_val / valid_cnt * valid_prop) if valid_cnt else 0.0,
        }

    # 4) Build "bins" dictionary
    bins_data = {}
    if len(edges) <= 1:
        # Empty edges or a single value => only bin_0
        bins_data["bin_0"] = interval_entry("[SingleValue]", cnt_map.get("bin_0", 0))
    else:
        last = len(edges) - 2
        for i, (left, right) in enumerate(zip(edges, edges[1:])):
            # The last interval is closed on the right.
            closer = "]" if i == last else ")"
            label = f"bin_{i}"
            bins_data[label] = interval_entry(f"[{left}, {right}{closer}", cnt_map.get(label, 0))

    # Add null_bin & oor
    bins_data["null_bin"] = {
        "range": "NULL",
        "cnt": null_cnt,
        "prop": null_prop,
        "adj_prop": null_prop
    }
    bins_data["oor"] = {
        "range": "Out of Range",
        "cnt": oor_cnt,
        "prop": oor_prop,
        "adj_prop": oor_prop
    }

    return {"edges": edges, "bins": bins_data}

#############################
# 4) get_baseline_hists (Multiple Features)
#############################
def get_baseline_hists(
    df: DataFrame,
    features: list,
    nbins: int = 10
) -> dict:
    """
    Build one baseline histogram per feature by calling ``get_hist`` in
    baseline mode (edges derived from `df`).

    Returns a dict keyed by feature name, in the order of `features`:
    {
      "feat_1": {"edges": [...], "bins": {...}},
      "feat_2": {...},
      ...
    }
    """
    return {
        feature: get_hist(df, feature, nbins=nbins, edges=None)
        for feature in features
    }

#############################
# 5) save_baseline_json / load_baseline_json
#############################
def save_baseline_json(hist_dict: dict, file_path: str):
    """
    Saves the multi-feature dictionary as UTF-8 JSON:
    {
      "feat_1": { "edges": [...], "bins": {...} },
      "feat_2": {...}
    }

    ``decimal.Decimal`` values, such as edges from a DecimalType column, are
    written as floats. Any other non-JSON type raises ``TypeError``. The
    document is serialised before the file is opened, so a failure never
    truncates an existing file or leaves a partial one behind.
    """
    from decimal import Decimal

    def _encode(obj):
        # Deliberate, narrow fallback: only Decimal is converted.
        if isinstance(obj, Decimal):
            return float(obj)
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    payload = json.dumps(hist_dict, default=_encode)
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(payload)

def load_baseline_json(file_path: str) -> dict:
    """
    Loads the multi-feature histogram dictionary from JSON.

    The file is decoded as UTF-8, the JSON standard encoding, rather than the
    platform's locale default.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)

#############################
# 6) calc_psi: Re-bin new data & compare with baseline
#############################
def calc_psi(
    new_df: DataFrame,
    col_name: str,
    baseline_hist: dict,
    epsilon: float = 1e-10,
) -> float:
    """
    Compute the PSI of `col_name` in `new_df` against a stored baseline histogram.

    1) Extract edges from baseline_hist.
    2) Build new histogram with those edges (``get_hist`` new-data mode).
    3) Compare 'adj_prop' per bin (bin_X, null_bin, oor) over every label found
       in either histogram:
       sum( (p_new - p_base) * ln(p_new / p_base) ).
       `epsilon` is added to each proportion, so a bin that is missing or empty
       on one side gives a large finite term.

    Args:
        new_df: Spark DataFrame with the new data.
        col_name: Column to compare.
        baseline_hist: ``{"edges": [...], "bins": {...}}`` as built by ``get_hist``.
        epsilon: Smoothing constant (default 1e-10, the previously hard-coded value).
    """
    # 1) Get the baseline edges
    edges = baseline_hist["edges"]

    # 2) Build new distribution
    new_hist = get_hist(new_df, col_name, edges=edges)

    # 3) Compare
    base_bins = baseline_hist["bins"]
    new_bins = new_hist["bins"]

    psi_val = 0.0
    # Sorted: set iteration order varies between processes (hash seeds),
    # which would make the floating-point sum non-reproducible.
    for b in sorted(set(base_bins) | set(new_bins)):
        p_base = base_bins.get(b, {}).get("adj_prop", 0.0) + epsilon
        p_new = new_bins.get(b, {}).get("adj_prop", 0.0) + epsilon
        psi_val += (p_new - p_base) * math.log(p_new / p_base)

    return psi_val

##############################################
# EXAMPLE USAGE (call example_usage() to run)
##############################################
def example_usage():
    """
    End-to-end demo on a tiny in-memory DataFrame:
    1) split it into baseline and new samples,
    2) build baseline histograms and round-trip them through JSON,
    3) print the PSI of each feature on the new sample.

    Writes ``baseline_histograms.json`` to the current working directory.
    """
    # 1) Sample DataFrame with multiple features
    data = [
        (1.0, 10.0, 5.0),
        (2.5, 20.0, None),
        (None, 30.0, 2.5),
        (100.0, 40.0, 10.0),
        (9999.0, 50.0, 60.0),
        (2.0, None, None),
    ]
    df = spark.createDataFrame(data, ["feat_1", "feat_2", "feat_3"])

    # 2) Split => baseline & new
    df_baseline, df_new = df.randomSplit([0.5, 0.5], seed=42)

    # 3) Build baseline histogram for multiple features
    features = ["feat_1", "feat_2", "feat_3"]
    baseline_dict = get_baseline_hists(df_baseline, features, nbins=5)

    # 4) Save baseline to JSON
    save_baseline_json(baseline_dict, "baseline_histograms.json")

    # 5) Load baseline from JSON
    loaded_baseline = load_baseline_json("baseline_histograms.json")

    # 6) Calculate PSI for each feature
    for feat in features:
        psi_val = calc_psi(df_new, feat, loaded_baseline[feat])
        print(f"PSI for {feat}: {psi_val}")
