In [None]:
import sys
import os
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.metrics import accuracy_score

# Add project root to sys.path.
# The root is taken as the parent of the kernel's working directory. The
# membership check keeps the cell idempotent: re-running it no longer appends
# the same entry to sys.path again.
PROJECT_ROOT = str(Path("..").resolve())
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

from src import visualization, data_loader, filters, features, config

visualization.set_plot_style()

# --- Configuration ---
# Window sizes to test (10ms to 200ms, in 10ms steps)
WINDOW_SIZES_MS = range(10, 201, 10)

# Paths (all relative to the kernel's working directory)
TRAINING_DIR = Path("../data/labeled_training_data")
TEST_DIR = Path("../data/test_data")
RESULTS_DIR = Path("../results/thresholds")
# Create the output folder up front so later cells can write CSVs into it.
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

print(f"Training Data: {TRAINING_DIR.resolve()}")
print(f"Test Data:     {TEST_DIR.resolve()}")

In [None]:
def find_best_threshold(feature_values, true_labels):
    """Grid search to find the threshold with maximum accuracy."""
    if len(feature_values) == 0: return 0.0
    
    # Scan range between min and max feature values
    min_v, max_v = np.min(feature_values), np.max(feature_values)
    candidates = np.linspace(min_v, max_v, 100)
    
    best_acc = 0
    best_thresh = candidates[0]
    
    for thresh in candidates:
        preds = (feature_values >= thresh).astype(int)
        acc = accuracy_score(true_labels, preds)
        if acc > best_acc:
            best_acc = acc
            best_thresh = thresh
    return best_thresh

def calculate_detection_delay(y_true, y_pred, fs):
    """Average delay (ms) from each label onset (0->1) to its first detection.

    An event is a run of label 1 that starts with a 0->1 transition, so a
    recording that already starts inside an event has no onset for that run.
    A detection only counts while the event is still active: if the label
    returns to 0 before any prediction of 1, the event is treated as missed
    and contributes no delay (a later false positive is not credited to it).

    Args:
        y_true: 1-D array-like of 0/1 ground-truth labels.
        y_pred: 1-D array-like of 0/1 predictions, same length as ``y_true``.
        fs: sampling rate in Hz, used to convert sample offsets to ms.

    Returns:
        Mean delay in milliseconds over all detected events, or NaN when no
        event was detected.

    Raises:
        ValueError: if ``y_true`` and ``y_pred`` differ in length.
    """
    # Positional arrays: avoids label-based lookups if a pandas Series is passed.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if len(y_true) != len(y_pred):
        raise ValueError(
            f"y_true and y_pred must have the same length, got {len(y_true)} and {len(y_pred)}"
        )

    delays = []
    pending_onset = None  # index of the current, not yet detected onset

    for i in range(1, len(y_true)):
        if y_true[i] != 1:
            # Outside an event: an onset still pending at this point was missed.
            pending_onset = None
            continue

        # Label Onset
        if y_true[i - 1] == 0:
            pending_onset = i

        # First Detection (only the first hit per event is recorded)
        if pending_onset is not None and y_pred[i] == 1:
            delays.append((i - pending_onset) / fs * 1000.0)
            pending_onset = None

    return np.mean(delays) if delays else np.nan

In [None]:
# One row per (dataset, feature, window size) with that file's best threshold.
training_results = []
# Feature name -> function called as func(signal, window_length_in_samples).
feature_funcs = {'RMS': features.calculate_rms, 'VAR': features.calculate_var, 'WL': features.calculate_wl}

# 1. Iterate over all training files (sorted for a reproducible processing order)
train_files = sorted(TRAINING_DIR.glob("*.csv"))
if not train_files:
    print("ERROR: No training files found! Please populate data/labeled_training_data/")
else:
    print(f"Training on {len(train_files)} datasets...")

    for file_path in train_files:
        # Load & Preprocess
        df = data_loader.load_labeled_csv(file_path)
        if df is None:
            # The loader signals an unusable file with None; report it so an
            # empty result set further down can be traced back to its cause.
            print(f"  Skipping {file_path.name}: file could not be loaded.")
            continue

        # Convert to Volts & Center
        # NOTE(review): 1023.0 presumably is the full scale of a 10-bit ADC - confirm.
        raw = (df['RawValue'] / 1023.0 * config.V_REF).values
        sig = raw - np.mean(raw)
        labels = df['LabelNumeric'].values

        # Filter
        clean_sig = filters.apply_butterworth_sos(sig, order=4, fs=config.FS)

        # 2. Test all Window Sizes & Features
        for win_ms in WINDOW_SIZES_MS:
            samples = int(win_ms * config.FS / 1000)
            # samples < 1 would turn the slice below into labels[-1:];
            # a signal shorter than one window cannot produce any feature.
            if samples < 1 or len(clean_sig) < samples:
                continue

            # Drop the first samples-1 labels so labels line up with window ends.
            # Assumes each feature function returns one value per full window
            # (len(signal) - samples + 1 values) - TODO confirm in src.features.
            target_labels = labels[samples-1:]

            for fname, ffunc in feature_funcs.items():
                feats = ffunc(clean_sig, samples)

                # Find optimal threshold for this specific file/window/feature
                optimal_thresh = find_best_threshold(feats, target_labels)

                training_results.append({
                    "Dataset": file_path.name,
                    "Feature": fname,
                    "WindowSize(ms)": win_ms,
                    "Threshold": optimal_thresh
                })

    # 3. Aggregate Results (Median across datasets)
    if not training_results:
        # Without this guard the groupby below raises KeyError on an empty frame.
        print("ERROR: No training file produced any result; thresholds were not saved.")
    else:
        df_train = pd.DataFrame(training_results)

        # Group by Feature + WindowSize and take the Median threshold
        trained_thresholds = df_train.groupby(["Feature", "WindowSize(ms)"])["Threshold"].median().reset_index()

        # Save to CSV
        out_path = RESULTS_DIR / "trained_thresholds.csv"
        trained_thresholds.to_csv(out_path, index=False)
        print(f"Training Complete. Optimized thresholds saved to:\n{out_path}")

        # Show preview
        print(trained_thresholds.head())

In [None]:
# Collects one result row per (dataset, feature, window size) during evaluation.
evaluation_results = []

# 1. Load the Trained Thresholds
# Reuse the table already present in the kernel (e.g. left by the training
# cell); only when that name is undefined is the saved CSV read from disk.
try:
    trained_thresholds
except NameError:
    trained_thresholds = pd.read_csv(RESULTS_DIR / "trained_thresholds.csv")

# Helper to look up threshold
def get_trained_threshold(feat, win):
    """Look up the trained threshold for one feature / window-size pair.

    Reads the module-level ``trained_thresholds`` table and returns the
    ``Threshold`` of the first row whose ``Feature`` equals ``feat`` and whose
    ``WindowSize(ms)`` equals ``win``; returns None when no row matches.
    """
    feature_matches = trained_thresholds["Feature"] == feat
    window_matches = trained_thresholds["WindowSize(ms)"] == win
    matching_rows = trained_thresholds[feature_matches & window_matches]

    if matching_rows.empty:
        return None
    return matching_rows["Threshold"].values[0]

# 2. Iterate over Test Files
# The thresholds may have been loaded from CSV without running the training
# cell, in which case feature_funcs does not exist yet. Define it here only
# when missing so an existing definition stays the single source of truth.
if 'feature_funcs' not in globals():
    feature_funcs = {'RMS': features.calculate_rms, 'VAR': features.calculate_var, 'WL': features.calculate_wl}

test_files = sorted(TEST_DIR.glob("*.csv"))
if not test_files:
    print("ERROR: No test files found! Please populate data/test_data/")
else:
    print(f"Evaluating on {len(test_files)} datasets...")

    for file_path in test_files:
        # Load & Preprocess
        df = data_loader.load_labeled_csv(file_path)
        if df is None:
            # The loader signals an unusable file with None; report and move on.
            print(f"  Skipping {file_path.name}: file could not be loaded.")
            continue

        # Same preprocessing as in training: scale, remove DC offset, filter.
        # NOTE(review): 1023.0 presumably is the full scale of a 10-bit ADC - confirm.
        raw = (df['RawValue'] / 1023.0 * config.V_REF).values
        sig = raw - np.mean(raw)
        labels = df['LabelNumeric'].values
        clean_sig = filters.apply_butterworth_sos(sig, order=4, fs=config.FS)

        # 3. Apply Learned Thresholds
        for win_ms in WINDOW_SIZES_MS:
            samples = int(win_ms * config.FS / 1000)
            # samples < 1 would turn the slice below into labels[-1:];
            # a signal shorter than one window cannot produce any feature.
            if samples < 1 or len(clean_sig) < samples:
                continue

            # Drop the first samples-1 labels so labels line up with window ends.
            # Assumes one feature value per full window - TODO confirm in src.features.
            target_labels = labels[samples-1:]

            for fname, ffunc in feature_funcs.items():
                # Retrieve the generic threshold learned in Phase 1
                thresh = get_trained_threshold(fname, win_ms)
                # Skip missing entries and NaN thresholds: comparing against NaN
                # would silently predict 0 for every sample.
                if thresh is None or pd.isna(thresh):
                    continue

                # Calculate Feature
                feats = ffunc(clean_sig, samples)

                # Make Prediction
                preds = (feats >= thresh).astype(int)

                # Score Performance
                acc = accuracy_score(target_labels, preds)
                delay = calculate_detection_delay(target_labels, preds, config.FS)

                evaluation_results.append({
                    "Dataset": file_path.name,
                    "Feature": fname,
                    "WindowSize(ms)": win_ms,
                    "Threshold": thresh,
                    "Accuracy": acc * 100, # Convert to %
                    "DetectionDelay(ms)": delay
                })

    # 4. Save Final Results
    if not evaluation_results:
        # Do not write an empty CSV (no header row) and report it as a success.
        print("ERROR: No test file produced any result; nothing was saved.")
    else:
        df_eval = pd.DataFrame(evaluation_results)
        out_path = RESULTS_DIR / "evaluation_from_training.csv"
        df_eval.to_csv(out_path, index=False)

        print(f"Evaluation Complete. Full results saved to:\n{out_path}")
        print("You can now run Notebook 06 to visualize these statistics.")