In [1]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
import hdbscan
from itertools import product
from sklearn.cluster import Birch
from sklearn.cluster import OPTICS
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
    adjusted_rand_score,
    adjusted_mutual_info_score,
    homogeneity_score,
    completeness_score,
    v_measure_score
)
from sklearn.neighbors import NearestNeighbors
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score
)

In [None]:
def simulate_mouse_path(state_params, width, height):
    if "initial_bias" in state_params:
        bias = state_params["initial_bias"]
        bias_std = state_params.get("bias_std", 50)
        x = int(np.clip(np.random.normal(bias[0], bias_std), 0, width-1))
        y = int(np.clip(np.random.normal(bias[1], bias_std), 0, height-1))
    else:
        x = np.random.randint(0, width)
        y = np.random.randint(0, height)
    
    angle = np.random.uniform(0, 2 * np.pi)
    events = []
    
    for _ in range(state_params["path_length"]):
        events.append((x, y))
        angle += np.random.normal(0, state_params.get("angle_variance", 0.1))
        if np.random.rand() < state_params.get("drift_probability", 0.05):
            if "drift_target" in state_params:
                target_x, target_y = state_params["drift_target"]
                desired_angle = np.arctan2(target_y - y, target_x - x)
                angle = (angle + desired_angle) / 2
        step = np.random.uniform(state_params.get("step_min", 1), state_params.get("step_max", 10))
        x += int(step * np.cos(angle))
        y += int(step * np.sin(angle))
        
        if "confined_region" in state_params:
            region = state_params["confined_region"]
            x = max(region[0], min(x, region[2]))
            y = max(region[1], min(y, region[3]))
        else:
            x = max(0, min(x, width-1))
            y = max(0, min(y, height-1))
    return events
 
def simulate_minute_heatmap(state_params, width, height, scale):
    all_events = []
    for _ in range(state_params["num_paths"]):
        events = simulate_mouse_path(state_params, width, height)
        all_events.extend(events)
    
    out_width = width // scale
    out_height = height // scale
    heatmap = np.zeros((out_height, out_width), dtype=float)
    
    for (x, y) in all_events:
        bin_x = int(x // scale)
        bin_y = int(y // scale)
        if 0 <= bin_x < out_width and 0 <= bin_y < out_height:
            heatmap[bin_y, bin_x] += 1
    
    max_val = heatmap.max()
    if max_val > 0:
        heatmap = heatmap / max_val
    return heatmap

def simulate_heatmap_series(total_minutes, timeline, width, height, scale):
    series = []
    for minute in range(total_minutes):
        state_label = None
        state_params = None
        for (start, end, label, params) in timeline:
            if start <= minute < end:
                state_label = label
                state_params = params
                break
        if state_params is None:
            state_params = {"num_paths": 10, "path_length": 500, "angle_variance": 0.1,
                            "step_min": 1, "step_max": 8, "drift_probability": 0.05}
            state_label = "default"
        heatmap = simulate_minute_heatmap(state_params, width, height, scale)
        series.append((minute, state_label, heatmap))
    return series

In [None]:
width, height = 2560, 1440
scale = 40


state_A = {
        "num_paths": 20,
        "path_length": 800,
        "angle_variance": 0.2,
        "step_min": 2,
        "step_max": 8,
        "drift_probability": 0.1,
        "drift_target": (width // 2, height // 2),
        "initial_bias": (width // 2, height // 2),
        "bias_std": 100
}

state_B = {
        "num_paths": 15,
        "path_length": 1000,
        "angle_variance": 0.1,
        "step_min": 1,
        "step_max": 5,
        "drift_probability": 0.05,
        "confined_region": (width // 4, height // 4, width // 2, height // 2),
        "initial_bias": (width // 3, height // 3),
        "bias_std": 50
}

state_C = {
        "num_paths": 25,
        "path_length": 600,
        "angle_variance": 0.5,
        "step_min": 3,
        "step_max": 12,
        "drift_probability": 0.2,
        "drift_target": (int(width * 0.75), int(height * 0.75)),
        "initial_bias": (width // 2, height // 2),
        "bias_std": 150
}

state_D = {
        "num_paths": 10,
        "path_length": 400,
        "angle_variance": 0.3,
        "step_min": 2,
        "step_max": 6,
        "drift_probability": 0.1,
        "drift_target": (width // 4, height // 4),
        "initial_bias": (width // 4, height // 4),
        "bias_std": 80
}

state_E = {
        "num_paths": 50,
        "path_length": 500,
        "angle_variance": 0.8,
        "step_min": 3,
        "step_max": 4,
        "drift_probability": 0.3,
        "drift_target": (int(width * 0.35), int(height * 0.25)),
        "initial_bias": (width // 2, height // 2),
        "bias_std": 50
}

state_F = {
        "num_paths": 30,
        "path_length": 700,
        "angle_variance": 0.4,
        "step_min": 1,
        "step_max": 2,
        "drift_probability": 0.15,
        "drift_target": (int(width * 0.8), int(height * 0.8)),
        "initial_bias": (width // 2, height // 2),
        "bias_std": 100
}

timeline = [
        (0, 30, "State A", state_A),
        (30, 90, "State B", state_B),
        (90, 110, "State C", state_C),
        (110, 125, "State B", state_B),
        (125, 160, "State E", state_A),
        (160, 190, "State C", state_C),
        (190, 210, "State B", state_B),
        (210, 240, "State E", state_B),
        # Anomaly detection states
        (240, 250, "State B", state_B),
        (250, 275, "State C", state_C),
        (275, 290, "State F", state_F), # anomaly
        (290, 310, "State A", state_A),
        (310, 330, "State E", state_E),
        (330, 360, "State D", state_D), # anomaly
]

total_minutes = 360
series = simulate_heatmap_series(total_minutes, timeline, width, height, scale)

for minute, state_label, _ in series:
        print(f"Minute {minute}: {state_label}")

Minute 0: State A
Minute 1: State A
Minute 2: State A
Minute 3: State A
Minute 4: State A
Minute 5: State A
Minute 6: State A
Minute 7: State A
Minute 8: State A
Minute 9: State A
Minute 10: State A
Minute 11: State A
Minute 12: State A
Minute 13: State A
Minute 14: State A
Minute 15: State A
Minute 16: State A
Minute 17: State A
Minute 18: State A
Minute 19: State A
Minute 20: State A
Minute 21: State A
Minute 22: State A
Minute 23: State A
Minute 24: State A
Minute 25: State A
Minute 26: State A
Minute 27: State A
Minute 28: State A
Minute 29: State A
Minute 30: State B
Minute 31: State B
Minute 32: State B
Minute 33: State B
Minute 34: State B
Minute 35: State B
Minute 36: State B
Minute 37: State B
Minute 38: State B
Minute 39: State B
Minute 40: State B
Minute 41: State B
Minute 42: State B
Minute 43: State B
Minute 44: State B
Minute 45: State B
Minute 46: State B
Minute 47: State B
Minute 48: State B
Minute 49: State B
Minute 50: State B
Minute 51: State B
Minute 52: State B
Min

In [None]:
heatmaps = [heatmap.flatten() for _, _, heatmap in series]
minutes = np.array([minute for minute, _, _ in series])
true_state_labels = [state_label for _, state_label, _ in series]
label_mapping = {label: idx for idx, label in enumerate(sorted(set(true_state_labels)))}
true_labels = np.array([label_mapping[s] for s in true_state_labels])
X = np.array(heatmaps)
pca = PCA(n_components=10)
X_reduced = pca.fit_transform(X)
train_mask = minutes < 240
test_mask = minutes >= 240

X_train = X_reduced[train_mask]
X_test = X_reduced[test_mask]
true_states_test = np.array(true_state_labels)[test_mask]
anomaly_states = {"State F", "State D"}
y_true = np.array([1 if s in anomaly_states else 0 for s in true_states_test])

from sklearn.cluster import DBSCAN
db = DBSCAN(eps=0.7, min_samples=5).fit(X_train)
core_samples = X_train[db.core_sample_indices_]

In [None]:
def detect_nn_core(core_samples, X_test, eps):
    nn = NearestNeighbors(radius=eps).fit(core_samples)
    neighbors = nn.radius_neighbors(X_test, return_distance=False)
    # 0 = normal, 1 = anomaly
    return np.array([0 if len(nbrs) > 0 else 1 for nbrs in neighbors])

def detect_ocsvm_core(core_samples, X_test, nu, kernel, gamma):
    ocsvm = OneClassSVM(nu=nu, kernel=kernel, gamma=gamma).fit(core_samples)
    preds = ocsvm.predict(X_test)  # +1 normal, -1 anomaly
    return np.array([0 if p == 1 else 1 for p in preds])

def detect_iforest_core(core_samples, X_test, n_estimators, contamination):
    iso = IsolationForest(n_estimators=n_estimators, contamination=contamination).fit(core_samples)
    preds = iso.predict(X_test)  # +1 normal, -1 anomaly
    return np.array([0 if p == 1 else 1 for p in preds])

def optimize_comparison_core(core_samples, X_test, y_true, methods, primary_metric='f1'):
    metric_fns = {
        'accuracy': accuracy_score,
        'precision': lambda yt, yp: precision_score(yt, yp, pos_label=1),
        'recall': lambda yt, yp: recall_score(yt, yp, pos_label=1),
        'f1': lambda yt, yp: f1_score(yt, yp, pos_label=1)
    }
    if primary_metric not in metric_fns:
        raise ValueError(f"Unsupported metric {primary_metric}")
    primary_fn = metric_fns[primary_metric]
    
    results = {}
    for name, cfg in methods.items():
        fn = cfg['func']
        hyper = cfg.get('hyperparams', {})
        param_names, param_values = zip(*hyper.items()) if hyper else ([], [])
        
        best = {'params': None, 'score': -np.inf, 'metrics': {}}
        for vals in product(*param_values):
            params = dict(zip(param_names, vals))
            try:
                y_pred = fn(core_samples, X_test, **params)
            except Exception:
                continue
            if len(y_pred) != len(y_true):
                continue
            score = primary_fn(y_true, y_pred)
            if score > best['score']:
                best['score'] = score
                best['params'] = params
                best['metrics'] = {
                    'accuracy': accuracy_score(y_true, y_pred),
                    'precision': precision_score(y_true, y_pred, pos_label=1),
                    'recall': recall_score(y_true, y_pred, pos_label=1),
                    'f1': f1_score(y_true, y_pred, pos_label=1)
                }
        results[name] = best
    return results


methods = {
    'NearestNeighbour': {
        'func': detect_nn_core,
        'hyperparams': {
            'eps': [0.2, 0.3, 0.4, 0.5, 0.7, 0.9],
        }
    },
    'OneClassSVM': {
        'func': detect_ocsvm_core,
        'hyperparams': {
            'nu': [0.001, 0.01, 0.05, 0.1, 0.5],
            'kernel': ['rbf', 'linear', 'poly', 'sigmoid'],
            'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
        }
    },
    'IsolationForest': {
        'func': detect_iforest_core,
        'hyperparams': {
            'n_estimators': [25, 50, 100, 150, 200],
            'contamination': [0.001, 0.01, 0.05, 0.1]
        }
    }
}

results = optimize_comparison_core(core_samples, X_test, y_true, methods, primary_metric='f1')

for name, res in results.items():
    print(f"{name}: Best F1={res['metrics']['f1']:.3f} with params {res['params']}")


NearestNeighbour: Best F1=0.818 with params {'eps': 0.4}
OneClassSVM: Best F1=0.818 with params {'nu': 0.001, 'kernel': 'linear', 'gamma': 'scale'}
IsolationForest: Best F1=0.531 with params {'n_estimators': 25, 'contamination': 0.05}


In [16]:
import json

print(json.dumps(results, indent=4, default=str))

{
    "NearestNeighbour": {
        "params": {
            "eps": 0.4
        },
        "score": 0.8181818181818182,
        "metrics": {
            "accuracy": 0.8333333333333334,
            "precision": 0.6923076923076923,
            "recall": 1.0,
            "f1": 0.8181818181818182
        }
    },
    "OneClassSVM": {
        "params": {
            "nu": 0.001,
            "kernel": "linear",
            "gamma": "scale"
        },
        "score": 0.8181818181818182,
        "metrics": {
            "accuracy": 0.8333333333333334,
            "precision": 0.6923076923076923,
            "recall": 1.0,
            "f1": 0.8181818181818182
        }
    },
    "IsolationForest": {
        "params": {
            "n_estimators": 25,
            "contamination": 0.05
        },
        "score": 0.53125,
        "metrics": {
            "accuracy": 0.75,
            "precision": 0.8947368421052632,
            "recall": 0.37777777777777777,
            "f1": 0.53125
        }
 