In [None]:
import numpy as np
import pandas as pd
import ast
from typing import List, Dict, Any

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import GroupKFold
from sklearn.metrics import accuracy_score, precision_recall_fscore_support


# -----------------------------
# Dataset & DataLoader helpers
# -----------------------------

class FoGWindowDataset(Dataset):
    """
    Dataset for window-level FoG data.

    Expects a DataFrame with columns:
        - 'sequence'     : (T, 3) accel window (list/array/string)
        - 'window_label' : 0/1
        - 'subject'      : patient ID (not used in __getitem__, but kept in df)
    """
    def __init__(self, df: pd.DataFrame):
        self.df = df.reset_index(drop=True)

    def _parse_sequence(self, seq_obj):
        # Handles numpy array, python list, or string from CSV
        if isinstance(seq_obj, np.ndarray):
            arr = seq_obj.astype(np.float32)
        elif isinstance(seq_obj, list):
            arr = np.asarray(seq_obj, dtype=np.float32)
        else:
            # assume string like "[[...], [...], ...]"
            arr = np.array(ast.literal_eval(seq_obj), dtype=np.float32)

        # Ensure shape (T, 3)
        if arr.ndim == 1:
            arr = arr.reshape(-1, 1)
        return arr

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        seq = self._parse_sequence(row['sequence'])  # (T, 3)
        label = float(row['window_label'])

        x = torch.from_numpy(seq)               # (T, 3)
        y = torch.tensor(label, dtype=torch.float32)  # scalar 0/1
        return x, y


def make_dataloader(df: pd.DataFrame,
                    batch_size: int = 64,
                    shuffle: bool = True,
                    num_workers: int = 0) -> DataLoader:
    dataset = FoGWindowDataset(df)
    loader = DataLoader(dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        num_workers=num_workers)
    return loader


# -----------------------------
# Patient-independent folds
# -----------------------------

def create_group_kfold_splits(df: pd.DataFrame,
                              n_splits: int = 5,
                              random_state: int = 42):
    """
    Patient-independent K-fold splits using GroupKFold on 'subject'.

    Returns a list of (train_idx, val_idx) index arrays.
    """
    groups = df['subject'].values
    y = df['window_label'].values
    X = np.arange(len(df))

    gkf = GroupKFold(n_splits=n_splits)

    # GroupKFold is deterministic; we can shuffle subjects before if needed:
    # but simplest is to just use GroupKFold directly.
    splits = list(gkf.split(X, y, groups))
    return splits




# -----------------------------
# Metrics helper
# -----------------------------

def compute_metrics(y_true: np.ndarray,
                    y_pred_probs: np.ndarray,
                    threshold: float = 0.5) -> Dict[str, float]:
    """
    Compute accuracy, precision, recall, F1 for binary classification.
    """
    y_pred = (y_pred_probs >= threshold).astype(int)

    acc = accuracy_score(y_true, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='binary', zero_division=0
    )

    return {
        'accuracy': float(acc),
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
    }


# -----------------------------
# One epoch train / eval
# -----------------------------

def run_epoch(model: nn.Module,
              loader: DataLoader,
              criterion,
              device: torch.device,
              optimizer=None) -> Dict[str, float]:
    """
    If optimizer is provided: training mode.
    Otherwise: evaluation mode.

    Returns dict: loss, accuracy, precision, recall, f1
    """
    if optimizer is None:
        model.eval()
        torch.set_grad_enabled(False)
    else:
        model.train()
        torch.set_grad_enabled(True)

    all_losses = []
    all_labels = []
    all_probs = []

    for x, y in loader:
        x = x.to(device)              # (batch, T, 3)
        y = y.to(device)              # (batch,)

        logits = model(x)             # (batch,)
        loss = criterion(logits, y)

        if optimizer is not None:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        all_losses.append(loss.item())

        probs = torch.sigmoid(logits)
        all_probs.append(probs.detach().cpu().numpy())
        all_labels.append(y.detach().cpu().numpy())

    all_probs = np.concatenate(all_probs, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)

    metrics = compute_metrics(all_labels, all_probs)
    metrics['loss'] = float(np.mean(all_losses))

    return metrics


# -----------------------------
# Training one fold (with early stopping & scheduler)
# -----------------------------

def train_one_fold(
    train_df: pd.DataFrame,
    val_df: pd.DataFrame,
    input_dim: int = 3,
    hidden_dim: int = 128,
    num_layers: int = 2,
    bidirectional: bool = False,
    dropout: float = 0.0,
    batch_size: int = 16,
    num_epochs: int = 50,
    lr: float = 1e-4,
    weight_decay: float = 1e-4,
    early_stopping_patience: int = 7,
    device: str = None
) -> Dict[str, Any]:
    """
    Train LSTM on one fold with early stopping on val F1 and
    ReduceLROnPlateau scheduler on val loss.

    Returns:
        dict with:
            - 'best_state_dict'
            - 'history' : list of per-epoch metrics
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)

    # Dataloaders
    train_loader = make_dataloader(train_df, batch_size=batch_size, shuffle=True)
    val_loader = make_dataloader(val_df, batch_size=batch_size, shuffle=False)

    # Model, loss, optimizer, scheduler
    #model =  ParallelCNNLSTMTransformer().to(device)
    # ---------------------------
    # Quick test
    # ---------------------------

    B = 4
    T = 256
    C = 3
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model = CNN_model().to(device)


    criterion = nn.BCEWithLogitsLoss()
    #criterion = WeightedBCEWithLogitsLoss()
    #criterion = FocalTverskyLoss(gamma=0.75, alpha=0.7)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5
    )

    best_val_f1 = -np.inf
    best_state_dict = None
    epochs_without_improvement = 0

    history = []  # list of dicts per epoch

    for epoch in range(1, num_epochs + 1):
        # --- Train ---
        train_metrics = run_epoch(model, train_loader, criterion, device, optimizer)

        # --- Validation ---
        val_metrics = run_epoch(model, val_loader, criterion, device, optimizer=None)

        # Step scheduler on validation loss
        scheduler.step(val_metrics['loss'])

        # Early stopping on val F1
        val_f1 = val_metrics['f1']
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            best_state_dict = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1

        # Record metrics
        epoch_record = {
            'epoch': epoch,
            'train_loss': train_metrics['loss'],
            'train_accuracy': train_metrics['accuracy'],
            'train_precision': train_metrics['precision'],
            'train_recall': train_metrics['recall'],
            'train_f1': train_metrics['f1'],
            'val_loss': val_metrics['loss'],
            'val_accuracy': val_metrics['accuracy'],
            'val_precision': val_metrics['precision'],
            'val_recall': val_metrics['recall'],
            'val_f1': val_metrics['f1'],
            'lr': optimizer.param_groups[0]['lr'],
        }
        history.append(epoch_record)

        print(
            f"[Epoch {epoch:02d}] "
            f"Train Loss={train_metrics['loss']:.4f}, Acc: {train_metrics['accuracy']:.4f}, F1={train_metrics['f1']:.4f} | "

            f"Val Loss={val_metrics['loss']:.4f}, Acc: {val_metrics['accuracy']:.4f} F1={val_metrics['f1']:.4f}"
        )

        if epochs_without_improvement >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch} (no val F1 improvement for "
                  f"{early_stopping_patience} epochs).")
            break

    return {
        'best_state_dict': best_state_dict,
        'history': history,
    }

def summarize_and_save_cv_results(
        results: Dict[str, Any],
        output_prefix: str = "cv_results",
        save_best_model_path: str = "best_model_overall.pt",
        save_all_folds_dir: str = "all_best_models"):
    """
    - Extract best metrics from each fold
    - Compute mean and std
    - Save fold-wise results and summary to Excel
    - Save ALL best model weights for all folds
    - Save the global best model across folds
    """

    import os
    os.makedirs(save_all_folds_dir, exist_ok=True)

    fold_best_rows = []
    global_best_f1 = -np.inf
    global_best_state_dict = None
    global_best_fold = None

    # -----------------------------
    # Extract best epoch per fold
    # -----------------------------
    for fold_data in results['folds']:
        fold_idx = fold_data['fold_idx']
        history = fold_data['history']

        # Best epoch based on val_f1
        best_epoch_record = max(history, key=lambda x: x['val_f1'])

        row = {
            'fold': fold_idx,
            'val_accuracy': best_epoch_record['val_accuracy'],
            'val_precision': best_epoch_record['val_precision'],
            'val_recall': best_epoch_record['val_recall'],
            'val_f1': best_epoch_record['val_f1'],
            'val_loss': best_epoch_record['val_loss'],
        }

        fold_best_rows.append(row)

        # ---------------------------------------
        # Save THIS FOLD'S best model separately
        # ---------------------------------------
        fold_model_path = os.path.join(
            save_all_folds_dir, f"best_model_fold_{fold_idx}.pt"
        )
        torch.save(fold_data['best_state_dict'], fold_model_path)
        print(f" Saved best model for Fold {fold_idx} ‚Üí {fold_model_path}")

        # ---------------------------------------
        # Track the GLOBAL best model
        # ---------------------------------------
        if best_epoch_record['val_f1'] > global_best_f1:
            global_best_f1 = best_epoch_record['val_f1']
            global_best_state_dict = fold_data['best_state_dict']
            global_best_fold = fold_idx

    df_folds = pd.DataFrame(fold_best_rows)

    # -----------------------------
    # Mean & Std
    # -----------------------------
    summary = {}
    for col in ['val_accuracy', 'val_precision', 'val_recall', 'val_f1', 'val_loss']:
        summary[f'{col}_mean'] = df_folds[col].mean()
        summary[f'{col}_std'] = df_folds[col].std()

    df_summary = pd.DataFrame([summary])

    # -----------------------------
    # Save Excel files
    # -----------------------------
    folds_excel = f"{output_prefix}_folds.xlsx"
    summary_excel = f"{output_prefix}_summary.xlsx"

    df_folds.to_excel(folds_excel, index=False)
    df_summary.to_excel(summary_excel, index=False)

    print(f" Fold metrics saved to Excel: {folds_excel}")
    print(f" Summary metrics saved to Excel: {summary_excel}")

    # -----------------------------
    # Save global best model
    # -----------------------------
    if global_best_state_dict is not None:
        torch.save(global_best_state_dict, save_best_model_path)
        print(f"üèÜ Global best model (Fold {global_best_fold}) saved ‚Üí {save_best_model_path}")

    return {
        'fold_metrics_df': df_folds,
        'summary_df': df_summary,
        'best_fold': global_best_fold,
        'best_f1': global_best_f1
    }



# -----------------------------
# Full K-fold cross-validation
# -----------------------------

def cross_validate_patient_independent(
    df: pd.DataFrame,
    n_splits: int = 5,
    random_state: int = 42,
    **train_kwargs
) -> Dict[str, Any]:
    """
    Run patient-independent K-fold cross-validation.

    Returns:
        {
          'folds': [
            {
              'fold_idx': 0,
              'train_subjects': [...],
              'val_subjects': [...],
              'history': [...],           # list of per-epoch dicts
              'best_state_dict': {...},
            },
            ...
          ]
        }
    """
    splits = create_group_kfold_splits(df, n_splits=n_splits, random_state=random_state)

    results = {'folds': []}

    for fold_idx, (train_idx, val_idx) in enumerate(splits):
        print("=" * 60)
        print(f"Fold {fold_idx + 1}/{n_splits}")

        train_df = df.iloc[train_idx].reset_index(drop=True)
        val_df = df.iloc[val_idx].reset_index(drop=True)

        train_subjects = sorted(train_df['subject'].unique())
        val_subjects = sorted(val_df['subject'].unique())

        print(f"Train subjects (n={len(train_subjects)}): {train_subjects}")
        print(f"Val subjects   (n={len(val_subjects)}): {val_subjects}")
        print(f"Train windows: {len(train_df)}, Val windows: {len(val_df)}")

        fold_result = train_one_fold(
            train_df=train_df,
            val_df=val_df,
            **train_kwargs
        )

        results['folds'].append({
            'fold_idx': fold_idx,
            'train_subjects': train_subjects,
            'val_subjects': val_subjects,
            'history': fold_result['history'],
            'best_state_dict': fold_result['best_state_dict'],
        })
    #save the results:
    summary = summarize_and_save_cv_results(
        results,
        output_prefix="fog_cv",
        save_best_model_path="best_fog_model.pt"
    )

    results['summary'] = summary


    return results


In [3]:
import torch
import torch.nn as nn


# ===========================
#   CNN Branch (unchanged order)
# ===========================
class CNNBranch(nn.Module):
    def __init__(self, in_ch=3, channels=[64, 128, 128], kernel_size=5, skip=True):
        super().__init__()
        self.skip = skip
        layers = []
        ch = in_ch

        for out_ch in channels:
            layers.append(nn.Sequential(
                nn.Conv1d(ch, out_ch, kernel_size, padding=kernel_size // 2),
                nn.BatchNorm1d(out_ch),
                nn.ReLU()
            ))
            ch = out_ch

        self.blocks = nn.ModuleList(layers)
        self.out_channels = ch

        if skip and in_ch != ch:
            self.skip_proj = nn.Conv1d(in_ch, ch, kernel_size=1)
        else:
            self.skip_proj = None

    def forward(self, x):
        # x: (B, C, T)
        residual_input = x
        out = x

        for block in self.blocks:
            prev = out
            out = block(out)
            if self.skip and prev.shape == out.shape:
                out = out + prev

        if self.skip and self.skip_proj is not None:
            residual_input = self.skip_proj(residual_input)
            out = out + residual_input

        return out  # (B, C, T)


# ===========================
#   Simple Attention Layer
# ===========================
class AttentionLayer(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.score_fc = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        # x: (B, T, F)
        scores = self.score_fc(x)       # (B, T, 1)
        weights = torch.softmax(scores, dim=1)  # (B, T, 1)
        context = (weights * x).sum(dim=1)      # (B, F)
        return context



class CNN_model(nn.Module):
    def __init__(self,
                 seq_len=256,
                 in_channels=3,
                 cnn_channels=[32, 64, 128, 128],
                 tcn_kernel=3,
                 dropout=0.05):
        super().__init__()

        # CNN branch (1D CNN style)
        self.cnn = CNNBranch(
            in_ch=in_channels,
            channels=cnn_channels,
            kernel_size=tcn_kernel,
            skip=True
        )

        # Attention pooling needs feature dim from CNN output
        self.attention = AttentionLayer(hidden_dim=self.cnn.out_channels)

        # Classifier MLP
        self.pool_dropout = nn.Dropout(0.2)
        self.mlp = nn.Sequential(
            nn.Linear(self.cnn.out_channels, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        # Expecting x: (B, C, T)
        if x.dim() != 3:
            raise ValueError(f"Expected 3D input (B,C,T), got {x.shape}")

        # CNN needs channels-first ‚Üí already (B, C, T)
        x = x.permute(0, 2, 1)
        c = self.cnn(x)  # (B, C, T)

        # Attention expects (B, T, F)
        c = c.permute(0, 2, 1)  # (B, T, C_feat)

        context = self.attention(c)
        context = self.pool_dropout(context)

        out = self.mlp(context).squeeze(-1)
        return out


In [4]:
path = 'C:\\Users\\Student\\Desktop\\Abouhashem\\DeepLearningProject\\'
train_df = pd.read_pickle(path+ "FoG_windows_train.pkl")
test_df  = pd.read_pickle(path+"FoG_windows_test.pkl")

In [5]:

cv_results = cross_validate_patient_independent(
    train_df,
    n_splits=5,
    random_state=42,
    input_dim=3,
    hidden_dim=64,
    num_layers=1,
    bidirectional=False,
    dropout=0.2,
    batch_size=16,
    num_epochs=50,
    lr=1e-4,
    weight_decay=1e-4,
    early_stopping_patience=7,
    device=None,  # auto: cuda if available else cpu
)


Fold 1/5
Train subjects (n=32): ['07285e', '194d1d', '220a17', '231c3b', '24a59d', '251738', '2a39f8', '2c98f7', '31d269', '364459', '3b2403', '48fd62', '4b39ac', '4ca9b3', '4f13b4', '516a67', '54ee6e', '66341b', '7688c1', '79011a', '7eb666', '7fcee9', '8db7dd', '93f49f', 'a03db7', 'bc3908', 'c85fdf', 'c8e721', 'd8836b', 'd9312a', 'e8919c', 'f2c8aa']
Val subjects   (n=2): ['2d57c2', '87174c']
Train windows: 15896, Val windows: 3984
[Epoch 01] Train Loss=0.5113, Acc: 0.7408, F1=0.1806 | Val Loss=0.8286, Acc: 0.5063 F1=0.6224
[Epoch 02] Train Loss=0.4387, Acc: 0.7948, F1=0.5394 | Val Loss=1.2104, Acc: 0.3993 F1=0.5009
[Epoch 03] Train Loss=0.3964, Acc: 0.8246, F1=0.6359 | Val Loss=0.8167, Acc: 0.5670 F1=0.6847
[Epoch 04] Train Loss=0.3765, Acc: 0.8349, F1=0.6624 | Val Loss=1.0997, Acc: 0.3963 F1=0.4976
[Epoch 05] Train Loss=0.3585, Acc: 0.8441, F1=0.6854 | Val Loss=0.8748, Acc: 0.5304 F1=0.6492
[Epoch 06] Train Loss=0.3445, Acc: 0.8508, F1=0.7030 | Val Loss=0.6514, Acc: 0.6767 F1=0.7824


In [None]:
import torch
import os
import numpy as np
import torch.nn as nn
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score, average_precision_score,
    confusion_matrix
)
import matplotlib.pyplot as plt


def evaluate_models_on_test_ensemble(
    test_df: pd.DataFrame,
    model_paths: List[str],
    batch_size: int = 32,
    device: str = None,
    fold_weights: List[float] = None   # OPTIONAL for weighted voting
) -> Dict[str, Any]:
    """
    Evaluate an ensemble of models on the test set using:
        - Soft voting (default)
        - Hard majority voting
        - Optional weighted voting

    Args:
        test_df        : Test dataframe
        model_paths    : List of paths to saved fold models
        batch_size     : Test batch size
        device         : 'cpu' or 'cuda'
        fold_weights   : Optional weights per fold (e.g. fold F1)

    Returns:
        Dictionary with all ensemble metrics and voting predictions
    """

    # -----------------------------
    # Device Setup
    # -----------------------------
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)

    # -----------------------------
    # DataLoader
    # -----------------------------
    test_loader = make_dataloader(test_df, batch_size=batch_size, shuffle=False)

    criterion = nn.BCEWithLogitsLoss()

    # collect predictions from each model
    prob_list = []     # soft voting
    hard_list = []     # hard voting
    targets_list = []

    # -----------------------------
    # Load each model and predict
    # -----------------------------
    for idx, model_path in enumerate(model_paths):
        if not os.path.exists(model_path):
            raise FileNotFoundError(f" Model not found: {model_path}")

        print(f"üì• Loading model: {model_path}")

        model = CNN_model().to(device)
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.eval()

        fold_probs = []
        fold_hard = []
        fold_targets = []

        with torch.no_grad():
            for X, y in test_loader:
                X = X.to(device)
                y = y.to(device)

                logits = model(X)
                probs = torch.sigmoid(logits).cpu().numpy().flatten()

                preds = (probs >= 0.5).astype(int)

                fold_probs.extend(probs)
                fold_hard.extend(preds)
                fold_targets.extend(y.cpu().numpy().astype(int))

        prob_list.append(np.array(fold_probs))
        hard_list.append(np.array(fold_hard))
        targets_list = fold_targets     # same for all folds

    prob_matrix = np.vstack(prob_list)     # shape: (num_models, N)
    hard_matrix = np.vstack(hard_list)     # shape: (num_models, N)
    ground_truth = np.array(targets_list)

    # -----------------------------
    # Voting Methods
    # -----------------------------

    # SOFT VOTING (best default)
    soft_probs = prob_matrix.mean(axis=0)
    soft_preds = (soft_probs >= 0.5).astype(int)

    # HARD VOTING
    hard_preds = np.round(hard_matrix.mean(axis=0)).astype(int)

    # WEIGHTED VOTING (if provided)
    if fold_weights is not None:
        w = np.array(fold_weights).reshape(-1, 1)
        weighted_probs = (prob_matrix * w).sum(axis=0) / w.sum()
        weighted_preds = (weighted_probs >= 0.5).astype(int)
    else:
        weighted_preds = None

    # -----------------------------
    # Metric Function
    # -----------------------------
    def compute_metrics(preds, probs=None):
        return {
            "loss": criterion(
                torch.tensor(preds, dtype=torch.float32),
                torch.tensor(ground_truth, dtype=torch.float32)
            ).item(),
            "accuracy": accuracy_score(ground_truth, preds),
            "precision": precision_score(ground_truth, preds, zero_division=0),
            "recall": recall_score(ground_truth, preds, zero_division=0),
            "f1": f1_score(ground_truth, preds, zero_division=0),
            "roc_auc": roc_auc_score(ground_truth, probs) if probs is not None else None,
            "pr_auc": average_precision_score(ground_truth, probs) if probs is not None else None,
            "confusion_matrix": confusion_matrix(ground_truth, preds)
        }

    # -----------------------------
    # Compute Metrics
    # -----------------------------
    metrics_soft = compute_metrics(soft_preds, soft_probs)
    metrics_hard = compute_metrics(hard_preds, soft_probs)
    metrics_weighted = compute_metrics(weighted_preds, weighted_probs) if weighted_preds is not None else None

    # -----------------------------
    # Print Results
    # -----------------------------
    print("\n\n **SOFT VOTING RESULTS**")
    for k, v in metrics_soft.items():
        if k != "confusion_matrix":
            print(f"{k}: {v}")

    print("\n **HARD VOTING RESULTS**")
    for k, v in metrics_hard.items():
        if k != "confusion_matrix":
            print(f"{k}: {v}")

    if metrics_weighted is not None:
        print("\n **WEIGHTED VOTING RESULTS**")
        for k, v in metrics_weighted.items():
            if k != "confusion_matrix":
                print(f"{k}: {v}")

    return {
        "metrics_soft": metrics_soft,
        "metrics_hard": metrics_hard,
        "metrics_weighted": metrics_weighted,
        "soft_preds": soft_preds,
        "soft_probs": soft_probs,
        "hard_preds": hard_preds,
        "weighted_preds": weighted_preds
    }


In [10]:
directory_path = '.\\all_best_models\\'
file_paths = []
for root, _, files in os.walk(directory_path):
    for file in files:
        file_paths.append(os.path.join(root, file))
file_paths

['.\\all_best_models\\best_model_fold_0.pt',
 '.\\all_best_models\\best_model_fold_1.pt',
 '.\\all_best_models\\best_model_fold_2.pt',
 '.\\all_best_models\\best_model_fold_3.pt',
 '.\\all_best_models\\best_model_fold_4.pt']

In [11]:
results = evaluate_models_on_test_ensemble(
    test_df=test_df,
    model_paths= file_paths
)

üì• Loading model: .\all_best_models\best_model_fold_0.pt
üì• Loading model: .\all_best_models\best_model_fold_1.pt
üì• Loading model: .\all_best_models\best_model_fold_2.pt
üì• Loading model: .\all_best_models\best_model_fold_3.pt
üì• Loading model: .\all_best_models\best_model_fold_4.pt


üéØ **SOFT VOTING RESULTS**
loss: 0.7099300622940063
accuracy: 0.8619817997977756
precision: 0.5157232704402516
recall: 0.5795053003533569
f1: 0.5457570715474209
roc_auc: 0.8675213942483088
pr_auc: 0.5526740578325077

üó≥Ô∏è **HARD VOTING RESULTS**
loss: 0.7107999920845032
accuracy: 0.8609706774519716
precision: 0.5123456790123457
recall: 0.5865724381625441
f1: 0.5469522240527183
roc_auc: 0.8675213942483088
pr_auc: 0.5526740578325077


In [None]:
# üó≥Ô∏è **HARD VOTING RESULTS**
# loss: 0.7022083401679993
# accuracy: 0.8842264914054601
# precision: 0.5721925133689839
# recall: 0.7561837455830389
# f1: 0.6514459665144596
# roc_auc: 0.9207417367647519
# pr_auc: 0.708372352847281

In [None]:
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, roc_curve, precision_recall_curve


def save_and_plot_ensemble_results(
    eval_results: Dict[str, Any],
    ground_truth: np.ndarray,
    output_folder: str = "ensemble_results/",
    model_output_path: str = "ensemble_final_model.pt"
):
    """
    Creates plots (confusion matrix, ROC, PR curve),
    saves predictions, and exports the ensemble model.

    Args:
        eval_results      : Output of evaluate_models_on_test_ensemble()
        ground_truth      : Numpy array of true labels
        output_folder     : Directory to save images & CSV
        model_output_path : File to save final ensemble soft-voting model weights
    """

    import os
    os.makedirs(output_folder, exist_ok=True)

    # -----------------------------
    # Extract predictions
    # -----------------------------
    soft_probs = eval_results["soft_probs"]
    soft_preds = eval_results["soft_preds"]
    hard_preds = eval_results["hard_preds"]
    weighted_preds = eval_results["weighted_preds"]

    # =============================
    #  1. Save predictions to CSV
    # =============================
    pred_df = pd.DataFrame({
        "y_true": ground_truth,
        "soft_prob": soft_probs,
        "soft_pred": soft_preds,
        "hard_pred": hard_preds,
        "weighted_pred": weighted_preds if weighted_preds is not None else np.nan
    })

    csv_path = os.path.join(output_folder, "ensemble_predictions.csv")
    pred_df.to_csv(csv_path, index=False)
    print(f" Predictions saved to: {csv_path}")

    # =============================
    #  2. Confusion Matrix Plot
    # =============================
    cm = confusion_matrix(ground_truth, soft_preds)
    plt.figure(figsize=(5, 4))
    plt.imshow(cm, cmap="Blues")
    plt.title("Confusion Matrix")
    plt.colorbar()
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.xticks([0, 1])
    plt.yticks([0, 1])

    for i in range(2):
        for j in range(2):
            plt.text(j, i, str(cm[i, j]), ha='center', va='center', color='red')

    cm_path = os.path.join(output_folder, "confusion_matrix.png")
    plt.savefig(cm_path, dpi=300)
    plt.close()
    print(f" Confusion Matrix saved to: {cm_path}")

    # =============================
    #  3. ROC Curve Plot
    # =============================
    fpr, tpr, _ = roc_curve(ground_truth, soft_probs)
    plt.figure(figsize=(6, 5))
    plt.plot(fpr, tpr, label="ROC")
    plt.plot([0, 1], [0, 1], linestyle="--")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve (Soft Voting)")
    plt.legend()

    roc_path = os.path.join(output_folder, "roc_curve.png")
    plt.savefig(roc_path, dpi=300)
    plt.close()
    print(f" ROC Curve saved to: {roc_path}")

    # =============================
    #  4. Precision‚ÄìRecall Curve
    # =============================
    precision, recall, _ = precision_recall_curve(ground_truth, soft_probs)
    plt.figure(figsize=(6, 5))
    plt.plot(recall, precision, label="Precision‚ÄìRecall Curve")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision‚ÄìRecall Curve (Soft Voting)")
    plt.legend()

    pr_path = os.path.join(output_folder, "pr_curve.png")
    plt.savefig(pr_path, dpi=300)
    plt.close()
    print(f" Precision‚ÄìRecall Curve saved to: {pr_path}")

    # =============================
    #  5. Export Final Ensemble Model
    # =============================

    """
    Ensemble model: soft-voting means averaging probabilities.
    You cannot save a single PyTorch state dict unless we create
    a small wrapper module below.
    """

    class SoftVotingEnsemble(torch.nn.Module):
        def __init__(self):
            super().__init__()

        def forward(self, prob_list):
            """
            prob_list: tensor shape (num_models, batch_size)
            """
            return prob_list.mean(dim=0)

    ensemble_model = SoftVotingEnsemble()
    torch.save(ensemble_model.state_dict(), model_output_path)

    print(f"üß† Final Ensemble Model saved to: {model_output_path}")

    print("\nüéâ ALL RESULTS SAVED SUCCESSFULLY!\n")


In [13]:
test_df['window_label']

0       0
1       0
2       0
3       0
4       0
       ..
1973    0
1974    0
1975    0
1976    0
1977    0
Name: window_label, Length: 1978, dtype: int64

In [14]:
ground_truth = test_df['window_label'].values  # or however your labels stored

save_and_plot_ensemble_results(
    eval_results=results,
    ground_truth=ground_truth,
    output_folder="ensemble_results/",
    model_output_path="ensemble_final_soft_voting.pt"
)

üìÑ Predictions saved to: ensemble_results/ensemble_predictions.csv
üìä Confusion Matrix saved to: ensemble_results/confusion_matrix.png
üìà ROC Curve saved to: ensemble_results/roc_curve.png
üìâ Precision‚ÄìRecall Curve saved to: ensemble_results/pr_curve.png
üß† Final Ensemble Model saved to: ensemble_final_soft_voting.pt

üéâ ALL RESULTS SAVED SUCCESSFULLY!

