## Name: Anomaly Detection using mean estimator
### Date: 7/11/2025
### Status: Done.
### Idea: 
- Simply use the mean of the data as the representative of the normal class.
- Calculate the distance from this mean, this is the anomaly score.
- Added a variant where we have a global distance and a local-distance scoring scheme


### Results:
- Performs worse than ISO forest in 2/3 datasets.
- Very fast though (the global variants).
- Local is better in two datasets (the ones we are worse from IF) and global is better in the other.


In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


data = np.load('./2_annthyroid.npz', allow_pickle=True)
#data = np.load('./5_campaign(1).npz', allow_pickle=True)
X, y = data['X'], data['y']
X_train, X_test, y_train, y_test = train_test_split(X,y,stratify=y, test_size=0.2, random_state=42)
X.shape, pd.Series(y).value_counts()

((7200, 6),
 0    6666
 1     534
 Name: count, dtype: int64)

In [27]:
from sklearn.datasets import load_digits


# NORMAL CLASSES BOTH 0 and 8
# ANOMALY CLASS IS 4


X, y = load_digits(return_X_y=True)


normal_inds = (y ==0) | (y==8)
anomaly_fraction = 0.01
anomaly_inds = np.random.choice(np.where(y==4)[0], size=int(anomaly_fraction*len(normal_inds)))
X = np.vstack((X[normal_inds], X[anomaly_inds]))
y = np.array([0]*sum(normal_inds) + [1]*len(anomaly_inds))

X_train, X_test, y_train, y_test = train_test_split(X,y,stratify=y, test_size=0.2, random_state=42)
X.shape, pd.Series(y).value_counts()

((369, 64),
 0    352
 1     17
 Name: count, dtype: int64)

In [25]:
from sklearn.ensemble import IsolationForest
from sklearn.metrics import classification_report,f1_score,average_precision_score
clf = IsolationForest(random_state=42)
clf.fit(X_train)
y_pred = clf.predict(X_test)
y_pred_proba = -clf.decision_function(X_test)
y_pred[y_pred == 1] = 0
y_pred[y_pred == -1] = 1
print(classification_report(y_test, y_pred))
print(f"F1 pos: {f1_score(y_test, y_pred, average='binary'):.4f}")
print(f"AP: {average_precision_score(y_test, y_pred_proba):.4f}")


              precision    recall  f1-score   support

           0       1.00      0.82      0.90        71
           1       0.19      1.00      0.32         3

    accuracy                           0.82        74
   macro avg       0.59      0.91      0.61        74
weighted avg       0.97      0.82      0.88        74

F1 pos: 0.3158
AP: 0.8333


In [28]:
from typing import Any
from numpy import floating
from sklearn.base import ClassifierMixin
from sklearn.preprocessing import StandardScaler

class DiffFromEstimator(ClassifierMixin):
    
    def __init__(self, strategy='mean', percentile_dist=0.5):
        self.strategy = strategy
        self.percentile_dist = percentile_dist
        if strategy == 'mean':
            self.aggr_ = np.mean
        elif strategy == 'max':
            self.aggr_ = np.max
        elif strategy == 'min':
            self.aggr_ = np.min
        else:
            raise NotImplementedError(f"{strategy} not understood")
        
        
    def fit(self, X, y=None):
        self.sc = StandardScaler()
        X = self.sc.fit_transform(X)
        center = self.aggr_(X, axis=0)
        self.center = center
        diffs = ((X-self.center)**2).sum(axis=1)
        self.max_dist = np.max(diffs)
        diffs = self.normalize_dists(diffs)
        self.threshold = self.percentile_dist * np.max(diffs)
        return self
    
    def decision_function(self, X):
        X = self.sc.transform(X)
        # Hihger scores more abnormal
        dists: floating[Any] = ((X-self.center)**2).sum(axis=1)
        dists = self.normalize_dists(dists)
        return dists
    
    def predict(self, X):
        dists = self.decision_function(X)
        pred = (dists > self.threshold).astype(int)
        return pred
    
    def normalize_dists(self, dists):
        return dists / self.max_dist

clf = DiffFromEstimator(strategy='mean')
clf.fit(X_train)
y_pred = clf.predict(X_test)
y_pred_proba = clf.decision_function(X_test)
print(classification_report(y_test, y_pred))
print(f"F1 pos: {f1_score(y_test, y_pred, average='binary'):.4f}")
print(f"AP: {average_precision_score(y_test, y_pred_proba):.4f}")


              precision    recall  f1-score   support

           0       0.97      1.00      0.99        71
           1       1.00      0.33      0.50         3

    accuracy                           0.97        74
   macro avg       0.99      0.67      0.74        74
weighted avg       0.97      0.97      0.97        74

F1 pos: 0.5000
AP: 0.5119


In [29]:
from sklearn.neighbors import NearestNeighbors

class DiffFromEstimator_LocalGlobal(ClassifierMixin):
    
    def __init__(self, strategy='mean', percentile_dist=0.5, n_neighbors=5, lamda_global=0.5):
        self.strategy = strategy
        self.percentile_dist = percentile_dist
        self.n_neighbors = n_neighbors
        self.lamda_global = lamda_global
        if strategy == 'mean':
            self.aggr_ = np.mean
        elif strategy == 'max':
            self.aggr_ = np.max
        elif strategy == 'min':
            self.aggr_ = np.min
        else:
            raise NotImplementedError(f"{strategy} not understood")
    
    def fit(self, X, y=None):
        self.global_clf = DiffFromEstimator(strategy=self.strategy, percentile_dist=self.percentile_dist)
        self.global_clf.fit(X)
        X = self.global_clf.sc.transform(X)
        self.nn = NearestNeighbors(n_neighbors=self.n_neighbors, metric='euclidean')
        self.nn.fit(X)
        self.X_train = X
        
    def decision_function(self, X):
        dists_global = self.global_clf.decision_function(X)
        dists_global = self.global_clf.normalize_dists(dists_global)
        
        X = self.global_clf.sc.transform(X)
        neighbors_dist, neighbors_ind = self.nn.kneighbors(X, self.n_neighbors)
        
        # THis is N_SAMPLES X N_NEIGHBORS X N_FEATS
        X_neigh = self.X_train[neighbors_ind]
        # THis is N_SAMPLES X N_FEATS
        centers = self.aggr_(X_neigh, axis=1)
        dists_local = ((X - centers)**2).sum(axis=1)
        dists_local /= np.max(dists_local)
        dists = self.lamda_global*dists_global + (1-self.lamda_global)*dists_local
        return dists
    

clf = DiffFromEstimator_LocalGlobal(strategy='mean', lamda_global=0.25, n_neighbors=31)
clf.fit(X_train)

y_pred_proba = clf.decision_function(X_test)
# y_pred = clf.predict(X_test)
# print(classification_report(y_test, y_pred))
# print(f"F1 pos: {f1_score(y_test, y_pred, average='binary'):.4f}")
print(f"AP: {average_precision_score(y_test, y_pred_proba):.4f}")


AP: 0.5167


In [31]:
import time
num_runs = 10
datasets = [
    './2_annthyroid.npz', 
    './5_campaign(1).npz',
    'digits'
    ]
clf_names = [
    'IF', 
    'DB_Global',
    'DB_Mix_0', 
    'DB_Mix_0.5'
    ]
res = []
for dataset_name in datasets:
    
    print(dataset_name)
    if dataset_name == 'digits':
        X, y = load_digits(return_X_y=True)
        normal_inds = (y ==0) | (y==8)
        anomaly_fraction = 0.01
        anomaly_inds = np.random.choice(np.where(y==4)[0], size=int(anomaly_fraction*len(normal_inds)))
        X = np.vstack((X[normal_inds], X[anomaly_inds]))
        y = np.array([0]*sum(normal_inds) + [1]*len(anomaly_inds))
    else:
            
        data = np.load(dataset_name, allow_pickle=True)
        X, y = data['X'], data['y']
    

    for run_id in range(num_runs):
        X_train, X_test, y_train, y_test = train_test_split(X,y,stratify=y, test_size=0.2, random_state=run_id)
        for clf_name in clf_names:
            if clf_name == 'IF':
                clf = IsolationForest(random_state=run_id)
            elif clf_name == 'DB_Global':
                clf = DiffFromEstimator(strategy='mean')
            elif clf_name.startswith('DB_Mix'):
                lamda_global = float(clf_name.split("_")[-1])
                clf = DiffFromEstimator_LocalGlobal(strategy='mean', lamda_global=lamda_global, n_neighbors=31)
            else:
                raise NotImplementedError(f"{clf_name} not known...")
            time_s = time.time()
            clf.fit(X_train)
            probas = clf.decision_function(X_test)
            if clf_name == 'IF':
                probas = -probas
            time_took = time.time() - time_s
            ap = average_precision_score(y_test, probas)
            num_anomalies = y_test.sum()
            num_found = y_test[np.argsort(probas)[::-1]][:num_anomalies].sum()
            res.append((dataset_name, run_id, clf_name, ap, num_found/num_anomalies, time_took))
                
    #break
res = pd.DataFrame(res, columns=["dataset", 'run', 'clf', 'ap', 'rec_prec', 'time'])
res.groupby(['dataset', 'clf'])[['ap', 'rec_prec', 'time']].agg("mean").sort_values(["dataset", "rec_prec"], ascending=False)

./2_annthyroid.npz
./5_campaign(1).npz
digits


Unnamed: 0_level_0,Unnamed: 1_level_0,ap,rec_prec,time
dataset,clf,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
digits,IF,0.58881,0.5,0.093366
digits,DB_Mix_0,0.43297,0.3,0.002916
digits,DB_Mix_0.5,0.43797,0.3,0.002858
digits,DB_Global,0.404842,0.233333,0.000614
./5_campaign(1).npz,DB_Global,0.285035,0.366918,0.022727
./5_campaign(1).npz,DB_Mix_0.5,0.298875,0.353125,0.286706
./5_campaign(1).npz,DB_Mix_0,0.29887,0.353017,0.288104
./5_campaign(1).npz,IF,0.29569,0.32597,0.18483
./2_annthyroid.npz,IF,0.294395,0.316822,0.103265
./2_annthyroid.npz,DB_Mix_0.5,0.227459,0.276636,0.038352
