In [1]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

X_train = np.load("../concated_embs_npy/X_train_aug_1v10.npy")
y_train = np.load("../concated_embs_npy/y_train_aug_1v10.npy")

X_test  = np.load("../concated_embs_npy/X_test.npy")
y_test  = np.load("../concated_embs_npy/y_test.npy")

X_val   = np.load("../concated_embs_npy/X_val.npy")
y_val   = np.load("../concated_embs_npy/y_val.npy")

# y_train: (N,) или (N,1)
y_train_ = y_train.ravel()

X_pos = X_train[y_train_ == 1]
X_neg = X_train[y_train_ == 0]  # если есть реальные / синтетические негативы

In [2]:
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import (
    average_precision_score, roc_auc_score, f1_score,
    balanced_accuracy_score, matthews_corrcoef, confusion_matrix
)

def fit_mahalanobis_pca(
    X_train: np.ndarray,
    y_train: np.ndarray,
    n_components: int = 64,
    use_pos_only: bool = True,
    eps: float = 1e-12,
    random_state: int = 42,
):
    """
    Fit scaler + PCA on (positive) train, then estimate mean and diagonal covariance in PCA space.
    Returns a dict with all params to score new samples.
    """
    y = y_train.ravel()
    if use_pos_only:
        X_fit = X_train[y == 1]
    else:
        X_fit = X_train

    scaler = StandardScaler()
    Xs = scaler.fit_transform(X_fit)

    pca = PCA(n_components=n_components, random_state=random_state)
    Z = pca.fit_transform(Xs)

    mu = Z.mean(axis=0)
    var = Z.var(axis=0) + eps  # diag covariance; eps for stability

    model = {
        "scaler": scaler,
        "pca": pca,
        "mu": mu,
        "var": var,
    }
    return model

def mahalanobis_distance(model, X: np.ndarray) -> np.ndarray:
    """
    Squared Mahalanobis distance with diagonal covariance in PCA space:
    d^2 = sum_k ((z_k - mu_k)^2 / var_k)
    """
    Xs = model["scaler"].transform(X)
    Z = model["pca"].transform(Xs)
    diff = Z - model["mu"]
    d2 = np.sum((diff * diff) / model["var"], axis=1)
    return d2

def d2_to_prob_neg(d2: np.ndarray) -> np.ndarray:
    """
    Convert distance^2 to a monotonic "probability of negative".
    Not calibrated; just a smooth mapping into (0,1).
    """
    # p_neg = 1 - exp(-d2/2)
    return 1.0 - np.exp(-0.5 * d2)

def choose_threshold(y_true: np.ndarray, p_neg: np.ndarray, mode: str = "mcc"):
    """
    Choose threshold on p_neg to classify negative/positive.
    mode: 'mcc' or 'f1_macro'
    Returns (best_t, best_score)
    """
    y = y_true.ravel().astype(int)

    # IMPORTANT:
    # Here y=1 means POSITIVE.
    # p_neg is probability of NEGATIVE.
    # So predict NEGATIVE when p_neg > t.
    # Hence y_pred_pos = (p_neg <= t)
    ts = np.linspace(0.0, 1.0, 1001)

    best_t = 0.5
    best_val = -1e9

    for t in ts:
        y_pred = (p_neg <= t).astype(int)  # 1=pos, 0=neg
        if mode == "mcc":
            val = matthews_corrcoef(y, y_pred)
        elif mode == "f1_macro":
            val = f1_score(y, y_pred, average="macro")
        else:
            raise ValueError("mode must be 'mcc' or 'f1_macro'")

        if val > best_val:
            best_val = val
            best_t = t

    return best_t, best_val

# -------------------- usage --------------------

# 1) fit model
model = fit_mahalanobis_pca(
    X_train=X_train,
    y_train=y_train,
    n_components=256,      # попробуй 32/64/128
    use_pos_only=True,    # если у тебя реально почти всё pos, всё равно лучше True
)

# 2) score val: distance -> p(neg)
d2_val = mahalanobis_distance(model, X_val)
p_neg  = d2_to_prob_neg(d2_val)

# 3) convert to "p(pos)" for AUC metrics that expect positive score:
# y_prob = P(y=1) = 1 - P(neg)
import numpy as np
from sklearn.metrics import (
    average_precision_score, roc_auc_score, f1_score,
    balanced_accuracy_score, matthews_corrcoef, confusion_matrix
)

# d2_val = mahalanobis_distance(model, X_val)

# 1) СКОР: выше = более позитивно
score = -d2_val

# 2) Для AUC можно напрямую (не "вероятность", а скор)
y_prob = score  # да, так можно

# 3) Подбор порога по d2 (или по score) на сетке квантилей
y_true = y_val.ravel().astype(int)

# будем предсказывать POS, если d2 <= thr  (то есть score >= -thr)
qs = np.linspace(0.0, 1.0, 2001)
thr_grid = np.quantile(d2_val, qs)

best_thr = thr_grid[0]
best_mcc = -1e9

for thr in thr_grid:
    y_pred = (d2_val <= thr).astype(int)  # 1=pos, 0=neg
    mcc = matthews_corrcoef(y_true, y_pred)
    if mcc > best_mcc:
        best_mcc = mcc
        best_thr = thr

y_pred = (d2_val <= best_thr).astype(int)

print("Chosen threshold on d2:", best_thr, "best_mcc:", best_mcc)
print("PR-AUC:", average_precision_score(y_true, y_prob))
print("ROC-AUC:", roc_auc_score(y_true, y_prob))
print("F1-macro:", f1_score(y_true, y_pred, average='macro'))
print("F1-pos:", f1_score(y_true, y_pred, pos_label=1))
print("BalancedAcc:", balanced_accuracy_score(y_true, y_pred))
print("MCC:", matthews_corrcoef(y_true, y_pred))
print("Confusion:\n", confusion_matrix(y_true, y_pred))


Chosen threshold on d2: 891221267557016.6 best_mcc: 0.0347947820550994
PR-AUC: 0.9999649069074893
ROC-AUC: 0.8495908346972176
F1-macro: 0.4606401619989276
F1-pos: 0.9184329435878325
BalancedAcc: 0.9245843741924369
MCC: 0.0347947820550994
Confusion:
 [[    5     0]
 [ 3502 19716]]


In [None]:
d2 = d2_val
y = y_val.ravel().astype(int)

print("d2 all: min/med/max", np.min(d2), np.median(d2), np.max(d2))
print("d2 pos: min/med/max", np.min(d2[y==1]), np.median(d2[y==1]), np.max(d2[y==1]))
print("d2 neg: min/med/max", np.min(d2[y==0]), np.median(d2[y==0]), np.max(d2[y==0]))
print("neg count:", (y==0).sum(), "pos count:", (y==1).sum())
