Feature Stability Monitoring System

In [8]:
import numpy as np
import pandas as pd   

In [14]:
def psi_categorical(base, new):
    """
    Compute PSI for categorical variable
    """
    base_dist = base.value_counts(normalize=True)
    new_dist = new.value_counts(normalize=True)

    all_categories = set(base_dist.index).union(set(new_dist.index))

    psi = 0
    for cat in all_categories:
        p = base_dist.get(cat, 1e-6)
        q = new_dist.get(cat, 1e-6)

        psi += (p - q) * np.log(p / q)

    return psi

In [15]:
def psi_numeric(base, new, bins=10):
    """
    Compute PSI for numeric variable
    """
    base_perc, edges = np.histogram(base, bins=bins)
    new_perc, _ = np.histogram(new, bins=edges)

    base_perc = base_perc / len(base)
    new_perc = new_perc / len(new)

    base_perc = np.where(base_perc == 0, 1e-6, base_perc)
    new_perc = np.where(new_perc == 0, 1e-6, new_perc)

    psi_values = (base_perc - new_perc) * np.log(base_perc / new_perc)

    return np.sum(psi_values)


In [16]:
def compute_feature_psi(base_df, new_df, categorical_cols=None, bins=10):

    if categorical_cols is None:
        categorical_cols = []

    psi_results = []

    for col in base_df.columns:

        base_col = base_df[col].dropna()
        new_col = new_df[col].dropna()

        if col in categorical_cols:
            psi = psi_categorical(base_col, new_col)
            feature_type = "categorical"
        else:
            psi = psi_numeric(base_col, new_col, bins=bins)
            feature_type = "numeric"

        if psi < 0.1:
            stability = "Stable"
        elif psi < 0.25:
            stability = "Moderate Shift"
        else:
            stability = "Unstable"

        psi_results.append({
            "feature": col,
            "psi": round(float(psi), 4),
            "status": stability,
            "type": feature_type
        })

    return pd.DataFrame(psi_results).sort_values(by="psi", ascending=False)


In [17]:
if __name__ == "__main__":

    np.random.seed(42)

    # historical data
    base = pd.DataFrame({
        "age": np.random.normal(40, 5, 1000),
        "income": np.random.normal(50000, 5000, 1000),
        "gender": np.random.choice(["M", "F"], 1000)
    })

    # new data with drift
    new = pd.DataFrame({
        "age": np.random.normal(45, 6, 1000),
        "income": np.random.normal(52000, 5500, 1000),
        "gender": np.random.choice(["M", "F"], 1000, p=[0.7, 0.3])
    })

    report = compute_feature_psi(
        base,
        new,
        categorical_cols=["gender"],
        bins=10
    )

In [18]:
 print("\n====== Feature Stability Report ======\n")
 print(report)




  feature     psi          status         type
0     age  0.8081        Unstable      numeric
2  gender  0.1450  Moderate Shift  categorical
1  income  0.1029  Moderate Shift      numeric
