In [None]:
import orjson
import pandas as pd
import glob
import os
import numpy as np
import json
from data_processor import DataProcessor
from prob1 import config as cfg1
from prob2 import config as cfg2

# Determine drift data

In [None]:
def get_unique_not_nan_values_list_from_series(current_data: pd.Series, reference_data: pd.Series) -> list:
    """Return the distinct non-NaN values that occur in either series.

    Args:
        current_data: current data
        reference_data: reference data
    Returns:
        list holding the union of both series' unique values with NaNs
        dropped; the order is whatever the underlying set yields (not sorted)
    """
    reference_values = set(reference_data.dropna().unique())
    current_values = set(current_data.dropna().unique())
    return list(reference_values | current_values)

def get_binned_data(
    reference_data: pd.Series, current_data: pd.Series, feature_type: str, n: int, feel_zeroes: bool = True
):
    """Convert two samples into per-bucket shares over one common set of buckets.

    A "num" feature whose reference sample has more than 20 distinct values is
    histogrammed over bin edges computed with the "sturges" rule on the pooled
    reference + current values. Every other feature is treated as categorical:
    one bucket per distinct non-NaN value seen in either sample.

    Args:
        reference_data: reference data
        current_data: current data
        feature_type: feature type; "num" enables the histogram branch
        n: requested number of buckets (accepted, but not read by this function)
        feel_zeroes: when True, shares equal to 0 are overwritten in place with
            0.0001, or with (smallest non-zero share) / 10**6 when that smallest
            share is itself <= 0.0001
    Returns:
        reference_percents: share of reference records in each bucket
        current_percents: share of current records in each bucket
    """
    distinct_reference_values = reference_data.nunique()
    is_categorical = feature_type != "num" or distinct_reference_values <= 20

    if is_categorical:
        keys = get_unique_not_nan_values_list_from_series(current_data=current_data, reference_data=reference_data)

        # Start every bucket at 0 so values missing from one sample still get an entry.
        reference_counts = dict.fromkeys(keys, 0)
        reference_counts.update(dict(reference_data.value_counts()))
        current_counts = dict.fromkeys(keys, 0)
        current_counts.update(dict(current_data.value_counts()))

        reference_percents = np.array([reference_counts[key] / len(reference_data) for key in keys])
        current_percents = np.array([current_counts[key] / len(current_data) for key in keys])
    else:
        # Shared edges so both histograms are directly comparable bucket by bucket.
        pooled_values = pd.concat([reference_data, current_data], axis=0).values
        bins = np.histogram_bin_edges(pooled_values, bins="sturges")
        reference_percents = np.histogram(reference_data, bins)[0] / len(reference_data)
        current_percents = np.histogram(current_data, bins)[0] / len(current_data)

    if feel_zeroes:
        for percents in (reference_percents, current_percents):
            smallest_share = min(percents[percents != 0])
            if smallest_share <= 0.0001:
                filler = smallest_share / 10**6
            else:
                filler = 0.0001
            np.place(percents, percents == 0, filler)

    return reference_percents, current_percents

def _psi(
    reference_data: pd.Series, current_data: pd.Series, feature_type: str, threshold: float, n_bins: int = 30
):
    """Compute the Population Stability Index between two samples.

    Both samples are bucketed by get_binned_data; the PSI is the sum over
    buckets of (ref_share - cur_share) * ln(ref_share / cur_share).

    Args:
        reference_data: reference data
        current_data: current data
        feature_type: feature type, forwarded to get_binned_data
        threshold: PSI level at or above which drift is reported
        n_bins: number of bins, forwarded to get_binned_data
    Returns:
        psi_value: calculated PSI
        test_result: True when psi_value >= threshold
    """
    ref_share, cur_share = get_binned_data(reference_data, current_data, feature_type, n_bins)

    share_gap = ref_share - cur_share
    log_ratio = np.log(ref_share / cur_share)
    psi_value = np.sum(share_gap * log_ratio)

    drift_detected = psi_value >= threshold
    return psi_value, drift_detected

In [None]:
from pydantic import BaseModel
class Data(BaseModel):
    """Validated shape of one saved request payload loaded from a JSON file.

    process_json_data builds this model from the parsed JSON and turns
    `rows` / `columns` into a DataFrame.
    """
    # Payload identifier; not read anywhere in this notebook
    # (presumably the request id -- TODO confirm against the producer).
    id: str
    # Record values, passed as the data argument of pd.DataFrame.
    rows: list
    # Column labels, passed as `columns=` of pd.DataFrame.
    columns: list

def process_json_data(json_path, config=None):
    """Load one saved request JSON file and preprocess it into a DataFrame.

    Args:
        json_path: path to a JSON file with the fields of `Data`
            (`id`, `rows`, `columns`)
        config: problem config providing `category_index_path` and accepted by
            DataProcessor.apply_process_data (e.g. cfg1 or cfg2). When omitted,
            the notebook-level name `cfg` is used, as in earlier versions of
            this function.
    Returns:
        the DataFrame returned by DataProcessor.apply_process_data
    Raises:
        NameError: if `config` is not given and no notebook-level `cfg` exists
    """
    if config is None:
        try:
            config = cfg  # backward-compatible fallback to the notebook-level config
        except NameError as exc:
            raise NameError(
                "process_json_data needs a config: pass config=... or define a notebook-level `cfg`"
            ) from exc

    # Context manager so the file handle is closed even if parsing fails.
    with open(json_path, "rb") as json_file:
        payload = orjson.loads(json_file.read())

    data = Data(**payload)
    df = pd.DataFrame(data.rows, columns=data.columns)
    category_encoder = DataProcessor.load_category_encoder(config.category_index_path)
    return DataProcessor.apply_process_data(df, config, category_encoder)

# Problem 1

In [None]:
# Load the Problem 1 reference data and preprocess it with the Problem 1 config
# and category encoder; the processed frame is the PSI reference below.
reference_raw = pd.read_parquet(cfg1.original_data_path)
category_encoder = DataProcessor.load_category_encoder(cfg1.category_index_path)
ref_df = DataProcessor.apply_process_data(reference_raw, cfg1, category_encoder)
ref_df.head()

In [None]:
# Problem 1 settings used by the cells below.
# process_json_data reads the notebook-level name `cfg`; bind it here so request
# files are preprocessed with the same config as ref_df. Without this binding a
# fresh "Restart & Run All" stops with NameError on the first call.
cfg = cfg1
numeric_features = cfg1.feature_config["numeric_columns"]
category_features = cfg1.feature_config["category_columns"]

In [None]:
# Rank the prob1 request files by PSI against the reference data, per numeric feature.
# Each JSON file is parsed and preprocessed once and then scored for every
# feature; re-processing the same file once per feature multiplied the I/O and
# preprocessing cost without changing any result.
json_paths = sorted(glob.glob("./save_request_data/prob1/*.json"))
scores_per_feature = [[] for _feature in numeric_features]  # one [(request_name, psi), ...] list per feature
for json_path in json_paths:
    test_df = process_json_data(json_path)
    request_name = os.path.basename(json_path).split('.')[0]
    for feature, feature_scores in zip(numeric_features, scores_per_feature):
        # Threshold 0.0 is a placeholder: only the PSI value is used here, not the drift flag.
        psi_value, is_drift = _psi(ref_df[feature], test_df[feature], "num", 0.0)
        feature_scores.append((request_name, psi_value))

for feature, feature_scores in zip(numeric_features, scores_per_feature):
    # Distinct PSI values for this feature, largest first.
    print(feature, sorted({psi for name, psi in feature_scores}, reverse=True))
    # Names of the ten request files with the highest PSI
    # (if two files share a name stem, the later one wins, as with a plain dict).
    psi_dict = dict(feature_scores)
    sorted_by_psi = sorted(psi_dict.items(), key=lambda item: item[1], reverse=True)
    print([name for name, psi in sorted_by_psi][:10])

In [None]:
# Count the prob1 request files whose PSI on feature19 reaches 0.02,
# printing each drifting file as it is found.
cnt = 0
for feature in numeric_features:
    if feature == "feature19":
        a = []
        request_files = glob.glob("./save_request_data/prob1/*.json")
        request_files.sort()
        for json_path in request_files:
            test_df = process_json_data(json_path)
            psi_value, is_drift = _psi(ref_df[feature], test_df[feature], "num", 0.02)
            a.append(psi_value)
            if is_drift:
                cnt += 1
                print(json_path, psi_value)
        print(cnt)
        # Distinct PSI values for the feature, largest first.
        print(feature, sorted(set(a), reverse=True))


# Problem 2

In [None]:
# Load the Problem 2 reference data and preprocess it with the Problem 2 config
# and category encoder; the processed frame is the PSI reference below.
reference_raw = pd.read_parquet(cfg2.original_data_path)
category_encoder = DataProcessor.load_category_encoder(cfg2.category_index_path)
ref_df = DataProcessor.apply_process_data(reference_raw, cfg2, category_encoder)
ref_df.head()

In [None]:
# Problem 2 settings used by the cells below.
# process_json_data reads the notebook-level name `cfg`; bind it to the
# Problem 2 config here so request files are preprocessed consistently with
# ref_df. Without this binding the name is either undefined (NameError on a
# fresh run) or left over from an earlier section.
cfg = cfg2
numeric_features = cfg2.feature_config["numeric_columns"]
category_features = cfg2.feature_config["category_columns"]

In [None]:
# Rank the prob2 request files by PSI against the reference data, per numeric feature.
# Each JSON file is parsed and preprocessed once and then scored for every
# feature; re-processing the same file once per feature multiplied the I/O and
# preprocessing cost without changing any result.
json_paths = sorted(glob.glob("./save_request_data/prob2/*.json"))
scores_per_feature = [[] for _feature in numeric_features]  # one [(request_name, psi), ...] list per feature
for json_path in json_paths:
    test_df = process_json_data(json_path)
    request_name = os.path.basename(json_path).split('.')[0]
    for feature, feature_scores in zip(numeric_features, scores_per_feature):
        # Threshold 0.0 is a placeholder: only the PSI value is used here, not the drift flag.
        psi_value, is_drift = _psi(ref_df[feature], test_df[feature], "num", 0.0)
        feature_scores.append((request_name, psi_value))

for feature, feature_scores in zip(numeric_features, scores_per_feature):
    # Distinct PSI values for this feature, largest first.
    print(feature, sorted({psi for name, psi in feature_scores}, reverse=True))
    # Names of the ten request files with the highest PSI
    # (if two files share a name stem, the later one wins, as with a plain dict).
    psi_dict = dict(feature_scores)
    sorted_by_psi = sorted(psi_dict.items(), key=lambda item: item[1], reverse=True)
    print([name for name, psi in sorted_by_psi][:10])

In [None]:
# Count the prob2 request files whose PSI on feature20 reaches 0.026,
# printing each drifting file as it is found.
cnt = 0
for feature in numeric_features:
    if feature == "feature20":
        a = []
        request_files = glob.glob("./save_request_data/prob2/*.json")
        request_files.sort()
        for json_path in request_files:
            test_df = process_json_data(json_path)
            psi_value, is_drift = _psi(ref_df[feature], test_df[feature], "num", 0.026)
            a.append(psi_value)
            if is_drift:
                print(json_path, psi_value)
                cnt += 1
        print(cnt)
        # Distinct PSI values for the feature, largest first.
        print(feature, sorted(set(a), reverse=True))