# Stage 1: Unseen Dataset Evaluation
## CNN + FP (1D-CNN + FP_AMPL Late Fusion) — LOS/NLOS Classification

Evaluates the trained `stage1_cnn_fp_best.pt` checkpoint on 4 **completely unseen** scenarios (2400 samples).

In [None]:
# Settings shared by the preprocessing, model-construction and seeding cells.
CONFIG = dict(
    # ROI crop: samples kept before the detected leading edge and the total
    # crop length. (post_crop is not referenced in the visible cells.)
    pre_crop=10,
    post_crop=50,
    total_len=60,
    # Index window in which get_roi_alignment looks for the CIR peak.
    search_start=740,
    search_end=890,
    # Architecture hyper-parameters forwarded to CNN_FP_Classifier.
    embedding_size=128,
    input_channels=1,
    fp_size=3,
    fp_embed_size=16,
    dropout=0.4,
    # batch_size is not referenced in the visible cells; seed feeds the
    # torch and NumPy RNGs.
    batch_size=64,
    seed=42,
)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay,
    classification_report, roc_curve, auc
)
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")

# Seed the PyTorch and NumPy random generators from the shared config value.
torch.manual_seed(CONFIG["seed"])
np.random.seed(CONFIG["seed"])

## 1 · Model Architecture

In [None]:
class CNN_FP_Classifier(nn.Module):
    """
    1D-CNN with FP_AMPL late fusion.

    CIR passes through 3 conv blocks with BatchNorm + GAP to produce an
    ``embedding_size``-dim embedding (128 by default). FP_AMPL1/2/3 is
    projected to ``fp_embed_size`` dims (16 by default) via a linear layer.
    Both are concatenated (144-dim with the defaults) and fed to the MLP
    classifier, whose final Sigmoid yields one value in (0, 1) per sample.

    Args:
        input_channels: Number of channels of the CIR input.
        embedding_size: Output channels of the last conv block.
        dropout: Dropout probability inside the classifier head.
        fp_size: Number of raw first-path features per sample.
        fp_embed_size: Width of the projected first-path embedding.
    """
    def __init__(self, input_channels=1, embedding_size=128, dropout=0.4,
                 fp_size=3, fp_embed_size=16):
        super().__init__()
        self.embedding_size = embedding_size
        self.fp_embed_size = fp_embed_size

        # 1D-CNN encoder (identical to CIR-only CNN)
        self.encoder = nn.Sequential(
            nn.Conv1d(input_channels, 16, kernel_size=5, padding=2),
            nn.BatchNorm1d(16), nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, padding=2, stride=2),
            nn.BatchNorm1d(32), nn.ReLU(),
            nn.Conv1d(32, embedding_size, kernel_size=3, padding=1, stride=2),
            nn.BatchNorm1d(embedding_size), nn.ReLU(),
        )
        self.gap = nn.AdaptiveAvgPool1d(1)

        # FP projection: (fp_size,) -> (fp_embed_size,)
        self.fp_proj = nn.Linear(fp_size, fp_embed_size)

        # Classifier: (embedding_size + fp_embed_size) -> 1
        self.classifier = nn.Sequential(
            nn.Linear(embedding_size + fp_embed_size, 32),
            nn.SiLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def _encode_cir(self, x):
        """CNN encoder: (B, 1, 60) -> (B, embedding_size)."""
        features = self.encoder(x)
        return self.gap(features).squeeze(-1)

    def forward(self, x, fp_features=None, return_dynamics=False):
        """Classify a batch of CIR crops, optionally fused with FP features.

        Args:
            x: CIR input in channels-first layout, e.g. (B, 1, 60).
            fp_features: Optional (B, fp_size) first-path features. When
                omitted, an all-zero FP embedding is fused instead so the
                classifier input width stays the same.
            return_dynamics: When True, also return the encoder feature map
                taken before global average pooling.

        Returns:
            ``pred`` of shape (B, 1), or ``(pred, features)`` when
            ``return_dynamics`` is True.
        """
        # Run the encoder exactly once and reuse its output for both the
        # pooled embedding and the optional feature-map return value. A second
        # pass would waste compute and, in training mode, update the BatchNorm
        # running statistics twice per step.
        features = self.encoder(x)  # (B, 128, 15) for a (B, 1, 60) input
        cnn_embed = self.gap(features).squeeze(-1)  # (B, 128)

        if fp_features is not None:
            fp_embed = self.fp_proj(fp_features)  # (B, 16)
        else:
            # new_zeros keeps the placeholder on the embedding's dtype/device.
            fp_embed = cnn_embed.new_zeros(x.size(0), self.fp_embed_size)

        fused = torch.cat([cnn_embed, fp_embed], dim=-1)  # (B, 144)
        pred = self.classifier(fused)

        if return_dynamics:
            return pred, features
        return pred

    def embed(self, x, fp_features=None):
        """Return fused embedding for Stage 2/3 compatibility.

        With ``fp_features`` the result is the concatenated CNN + FP embedding
        (144-dim with the defaults); without it only the CNN embedding
        (128-dim) is returned, i.e. no zero padding is applied here.
        """
        cnn_embed = self._encode_cir(x)
        if fp_features is not None:
            fp_embed = self.fp_proj(fp_features)
            return torch.cat([cnn_embed, fp_embed], dim=-1)  # (B, 144)
        return cnn_embed  # (B, 128)

## 2 · Load Unseen Dataset

Same preprocessing as training. **Includes FP_AMPL1/2/3 extraction.**

**Note**: CNN uses channels-first format `(B, 1, 60)` for Conv1d.

In [None]:
def get_roi_alignment(sig, search_start=CONFIG["search_start"],
                      search_end=CONFIG["search_end"]):
    """Locate the leading edge of the strongest peak inside a search window.

    The peak is the maximum of ``sig[search_start:search_end]``. A detection
    threshold is set to 5% of the peak value, raised to mean + 3*std of the
    samples before ``search_start`` when more than 10 such samples exist.
    The signal is then scanned backwards from the peak (stopping 20 samples
    before ``search_start``, or at index 0); the index just after the first
    sample below the threshold is returned.

    Returns the global argmax of ``sig`` when the search window is empty, and
    the peak index itself when no sample in the scan falls below the
    threshold.
    """
    window = sig[search_start:search_end]
    if len(window) == 0:
        return np.argmax(sig)

    peak_idx = search_start + np.argmax(window)
    relative_floor = 0.05 * sig[peak_idx]

    # Samples ahead of the search window serve as the noise estimate.
    baseline = sig[:search_start]
    if len(baseline) > 10:
        threshold = max(np.mean(baseline) + 3 * np.std(baseline), relative_floor)
    else:
        threshold = relative_floor

    # Walk back from the peak; the first sub-threshold sample marks the edge.
    stop = max(search_start - 20, 0)
    return next(
        (i + 1 for i in range(peak_idx, stop, -1) if sig[i] < threshold),
        peak_idx,
    )


def load_unseen_fp_dataset(filepath):
    """Load and preprocess the unseen dataset — CIR (channels-first) + FP_AMPL features.

    Per row: the CIR taps are divided by RXPACC, a ``total_len``-sample crop
    starting ``pre_crop`` samples before the detected leading edge is taken,
    and the crop is min-max scaled to [0, 1] (all zeros when it is flat).
    FP_AMPL1/2/3 are divided by ``max(RXPACC, 1)`` and by 64.

    Args:
        filepath: CSV path. The code reads columns named ``CIR<index>``,
            ``Label``, ``Source_File``, optionally ``RXPACC``/``RX_PACC``
            (128.0 is used when neither exists) and ``FP_AMPL1..3``
            (0 when missing).

    Returns:
        Tuple ``(X, y, F, source_files)``: ``X`` float32 of shape
        (N, 1, total_len), ``y`` float32 of shape (N,), ``F`` float32 of shape
        (N, 3), and the list of ``Source_File`` values of the kept rows.

    Raises:
        ValueError: If the CSV has no ``CIR<index>`` columns.
    """
    PRE = CONFIG["pre_crop"]
    TOTAL = CONFIG["total_len"]

    df = pd.read_csv(filepath)
    print(f"Loaded {len(df)} rows from {filepath}")

    def _tap_index(col):
        # Columns that merely start with "CIR" but carry no numeric index
        # (e.g. a summary column) are not taps and must not crash the sort.
        try:
            return int(col.replace("CIR", ""))
        except ValueError:
            return None

    tap_index = {c: _tap_index(c) for c in df.columns if c.startswith("CIR")}
    cir_cols = sorted(
        (c for c, idx in tap_index.items() if idx is not None),
        key=tap_index.get,
    )
    if not cir_cols:
        raise ValueError(f"No CIR<index> columns found in {filepath}")

    # Coerce the whole CIR matrix once instead of once per row.
    cir = (
        df[cir_cols]
        .apply(pd.to_numeric, errors='coerce')
        .fillna(0)
        .to_numpy(dtype=float)
    )

    # The accumulator column name is the same for every row.
    rxpacc_col = 'RXPACC' if 'RXPACC' in df.columns else 'RX_PACC'

    X_list, y_list, fp_list = [], [], []
    source_files = []
    skipped = 0

    for pos, (_, row) in enumerate(df.iterrows()):
        rxpacc = float(row.get(rxpacc_col, 128.0))
        # "not > 0" also rejects NaN, which would otherwise turn the FP
        # features (and hence the prediction) into NaN.
        if not rxpacc > 0:
            skipped += 1
            continue
        sig = cir[pos] / rxpacc

        fp_denominator = max(rxpacc, 1)
        fp_list.append([
            float(row.get(name, 0)) / fp_denominator / 64.0
            for name in ('FP_AMPL1', 'FP_AMPL2', 'FP_AMPL3')
        ])

        leading_edge = get_roi_alignment(sig)
        start = max(0, leading_edge - PRE)
        end = start + TOTAL
        if end > len(sig):
            # Slide the window back so it still ends inside the signal.
            end = len(sig)
            start = max(0, end - TOTAL)
        crop = sig[start:end]
        if len(crop) < TOTAL:
            crop = np.pad(crop, (0, TOTAL - len(crop)), mode='constant')
        local_min = np.min(crop)
        local_max = np.max(crop)
        rng = local_max - local_min
        if rng > 0:
            crop = (crop - local_min) / rng
        else:
            crop = np.zeros(TOTAL)

        X_list.append(crop)
        y_list.append(float(row["Label"]))
        source_files.append(row["Source_File"])

    if skipped:
        print(f"Skipped {skipped} rows (bad RXPACC)")

    # CNN channels-first: (N, 1, 60)
    X = np.array(X_list, dtype=np.float32).reshape(-1, 1, TOTAL)
    y = np.array(y_list, dtype=np.float32)
    # Named fp_features rather than F so the torch.nn.functional alias is not shadowed.
    fp_features = np.array(fp_list, dtype=np.float32)

    print(f"Preprocessed: {len(y)} samples  |  LOS: {int((y==0).sum())}  |  NLOS: {int((y==1).sum())}")
    print(f"X shape: {X.shape} (channels-first)  |  F shape: {fp_features.shape}")
    return X, y, fp_features, source_files

In [None]:
# Preprocess the held-out CSV into CIR crops, labels, FP features and the
# per-sample source-file names.
X_unseen, y_unseen, F_unseen, sources = load_unseen_fp_dataset("../dataset/channels/unseen_dataset.csv")

## 3 · Load Trained Model

In [None]:
# Build the network from the architecture entries of CONFIG, forwarded by name.
model = CNN_FP_Classifier(
    **{
        key: CONFIG[key]
        for key in ("input_channels", "embedding_size", "dropout",
                    "fp_size", "fp_embed_size")
    }
).to(device)

# Restore the checkpointed weights onto the selected device.
state_dict = torch.load("stage1_cnn_fp_best.pt", map_location=device, weights_only=True)
model.load_state_dict(state_dict)

# Switch to inference mode before evaluating.
model.eval()
print(f"Model loaded: {sum(p.numel() for p in model.parameters()):,} parameters")

## 4 · Inference on Unseen Data

In [None]:
# Move the whole evaluation set to the model's device; labels get a trailing
# axis so they line up with the (N, 1) model output.
X_te = torch.tensor(X_unseen).to(device)
y_te = torch.tensor(y_unseen).unsqueeze(1).to(device)
F_te = torch.tensor(F_unseen).to(device)

# Single forward pass without autograd. The model ends in a Sigmoid, so the
# output is thresholded at 0.5 to obtain the hard label (1 = NLOS, 0 = LOS).
with torch.no_grad():
    pred = model(X_te, fp_features=F_te)
    pred_binary = (pred > 0.5).float()
    accuracy = (pred_binary == y_te).float().mean().item()

# Flat NumPy copies used by the reporting and plotting cells below.
pred_np = pred.cpu().numpy().flatten()
pred_label_np = (pred_np > 0.5).astype(float)
true_np = y_unseen.flatten()

print(f"Unseen Dataset Accuracy: {100 * accuracy:.2f}%")
print(f"{'='*50}")
# Pin both class labels explicitly: without `labels`, the report raises when
# only one class occurs in the data/predictions because target_names would no
# longer match the number of detected classes.
print(classification_report(true_np, pred_label_np, labels=[0.0, 1.0],
                            target_names=["LOS", "NLOS"]))

## 5 · Per-Scenario Breakdown

In [None]:
# A scenario is identified by the first two underscore-separated tokens of the
# source-file name.
sources_arr = np.array(sources)
scenario_names = np.array(["_".join(name.split("_", 2)[:2]) for name in sources_arr])
unique_scenarios = sorted({*scenario_names})

print(f"{'Scenario':<16} {'Type':<6} {'Samples':>7} {'Correct':>8} {'Accuracy':>9}")
print("-" * 52)

for scenario in unique_scenarios:
    mask = scenario_names == scenario
    hits = true_np[mask] == pred_label_np[mask]
    s_correct = hits.sum()
    s_total = len(hits)
    s_acc = s_correct / s_total * 100
    # The scenario type is inferred from the scenario name, not from the labels.
    if "nlos" in scenario:
        s_type = "NLOS"
    else:
        s_type = "LOS"
    print(f"{scenario:<16} {s_type:<6} {s_total:>7} {s_correct:>8} {s_acc:>8.2f}%")

print("-" * 52)
print(f"{'TOTAL':<16} {'':6} {len(true_np):>7} {int((true_np == pred_label_np).sum()):>8} {100*accuracy:>8.2f}%")

## 6 · Confusion Matrix

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: overall confusion matrix. `labels` is pinned so the matrix is
# always 2x2 and matches the two display labels, even when only one class
# occurs in the labels/predictions.
cm = confusion_matrix(true_np, pred_label_np, labels=[0.0, 1.0])
disp = ConfusionMatrixDisplay(cm, display_labels=["LOS", "NLOS"])
disp.plot(ax=axes[0], cmap="Blues", values_format="d")
axes[0].set_title(f"Overall — Unseen Dataset\nAccuracy: {100*accuracy:.2f}%")

# Right panel: accuracy per scenario. Bars are green for scenarios whose name
# contains "los" but not "nlos", red otherwise.
scenario_labels, scenario_accs, scenario_colors = [], [], []
for scenario in unique_scenarios:
    mask = scenario_names == scenario
    s_acc = (true_np[mask] == pred_label_np[mask]).mean() * 100
    scenario_labels.append(scenario)
    scenario_accs.append(s_acc)
    scenario_colors.append("#2ecc71" if "los" in scenario and "nlos" not in scenario else "#e74c3c")

bars = axes[1].barh(scenario_labels, scenario_accs, color=scenario_colors, edgecolor="white", height=0.6)
axes[1].set_xlim(0, 105)  # head-room for the value annotations
axes[1].set_xlabel("Accuracy (%)")
axes[1].set_title("Per-Scenario Accuracy")
# Annotate each bar with its accuracy just past the bar end.
for bar, acc in zip(bars, scenario_accs):
    axes[1].text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2,
                 f"{acc:.1f}%", va="center", fontweight="bold")

plt.tight_layout()
plt.show()

## 7 · ROC Curve

In [None]:
# ROC from the raw sigmoid scores; the threshold array is not needed here.
fpr, tpr = roc_curve(true_np, pred_np)[:2]
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(6, 6))
ax.plot(fpr, tpr, color="#3498db", lw=2, label=f"Unseen AUC = {roc_auc:.4f}")
# Diagonal reference line.
ax.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5)
ax.set(
    xlabel="False Positive Rate",
    ylabel="True Positive Rate",
    title="ROC Curve — Unseen Dataset (CNN+FP)",
    aspect="equal",
)
ax.legend(loc="lower right")
plt.tight_layout()
plt.show()

## 8 · Embedding Visualization

In [None]:
# Ground-truth class masks (label 0 = LOS, 1 = NLOS).
los_mask = true_np == 0
nlos_mask = true_np == 1

# Fused CNN + FP embeddings for every sample, computed without autograd.
with torch.no_grad():
    embeddings = model.embed(X_te, fp_features=F_te).cpu().numpy()

# Standardise each embedding dimension, then project onto the first two
# principal components for plotting.
scaler = StandardScaler()
emb_scaled = scaler.fit_transform(embeddings)
pca = PCA(n_components=2)
emb_2d = pca.fit_transform(emb_scaled)

# Split the samples into the four confusion-matrix cells (NLOS is the
# positive class).
misclassified = true_np != pred_label_np
los_correct = los_mask & ~misclassified
los_wrong = los_mask & misclassified
nlos_correct = nlos_mask & ~misclassified
nlos_wrong = nlos_mask & misclassified

tn = int(los_correct.sum())
fp = int(los_wrong.sum())
tp = int(nlos_correct.sum())
fn = int(nlos_wrong.sum())

fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# `labels` is pinned so the matrix stays 2x2 and matches the two display
# labels even when only one class occurs in the labels/predictions.
cm = confusion_matrix(true_np, pred_label_np, labels=[0.0, 1.0])
disp = ConfusionMatrixDisplay(cm, display_labels=["LOS", "NLOS"])
disp.plot(ax=axes[0], cmap="Blues", values_format="d")
axes[0].set_title(f"Confusion Matrix\nAccuracy: {100*accuracy:.2f}%", fontsize=13)

ax = axes[1]
ax.scatter(emb_2d[los_correct, 0], emb_2d[los_correct, 1],
           c="#2ecc71", alpha=0.4, s=15, label=f"TN — LOS correct ({tn})", zorder=3)
ax.scatter(emb_2d[nlos_correct, 0], emb_2d[nlos_correct, 1],
           c="#e74c3c", alpha=0.4, s=15, label=f"TP — NLOS correct ({tp})", zorder=3)
# Errors are drawn on top (higher zorder) and only when present, so the
# legend has no empty entries.
if fp > 0:
    ax.scatter(emb_2d[los_wrong, 0], emb_2d[los_wrong, 1],
               c="gold", marker='X', s=60, edgecolors='black', linewidths=0.8,
               alpha=0.9, label=f"FP — LOS\u2192NLOS ({fp})", zorder=7)
if fn > 0:
    ax.scatter(emb_2d[nlos_wrong, 0], emb_2d[nlos_wrong, 1],
               c="darkorange", marker='X', s=60, edgecolors='black', linewidths=0.8,
               alpha=0.9, label=f"FN — NLOS\u2192LOS ({fn})", zorder=7)

ax.set_xlabel(f"PC1 ({pca.explained_variance_ratio_[0]*100:.1f}%)")
ax.set_ylabel(f"PC2 ({pca.explained_variance_ratio_[1]*100:.1f}%)")
ax.set_title("CNN+FP Embedding (PCA)", fontsize=13)
ax.legend(loc="upper left", fontsize=9, title="Confusion Matrix", title_fontsize=10)

plt.tight_layout()
plt.show()

print(f"TN: {tn} | FP: {fp} | TP: {tp} | FN: {fn} | Total: {tn+fp+tp+fn}")

## 9 · Prediction Confidence Distribution

In [None]:
fig, ax = plt.subplots(figsize=(8, 4))

# One density histogram of the predicted score per ground-truth class.
for class_value, color, legend_label in (
    (0, "#27ae60", "LOS samples"),
    (1, "#e74c3c", "NLOS samples"),
):
    ax.hist(pred_np[true_np == class_value], bins=40, alpha=0.6, color=color,
            label=legend_label, density=True)

# Decision threshold used for the hard labels.
ax.axvline(0.5, color="black", ls="--", lw=1.5, label="Threshold")
ax.set(
    xlabel="P(NLOS)",
    ylabel="Density",
    title="Prediction Confidence Distribution — Unseen Dataset (CNN+FP)",
)
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 10 · Misclassified Samples Analysis

In [None]:
# Boolean mask of samples whose hard prediction disagrees with the label.
misclassified = true_np != pred_label_np
n_wrong = misclassified.sum()

if n_wrong == 0:
    print("No misclassified samples — perfect accuracy on unseen data!")
else:
    print(f"Misclassified: {n_wrong} / {len(true_np)} ({100*n_wrong/len(true_np):.2f}%)")
    print(f"\n{'Source File':<28} {'True':>6} {'Pred':>6} {'P(NLOS)':>9}")
    print("-" * 55)

    # List at most the first 30 errors individually.
    class_names = ("LOS", "NLOS")
    mis_idx = np.flatnonzero(misclassified)
    for i in mis_idx[:30]:
        lbl = class_names[int(true_np[i] == 1)]
        plb = class_names[int(pred_label_np[i] == 1)]
        print(f"{sources[i]:<28} {lbl:>6} {plb:>6} {pred_np[i]:>9.4f}")

    if len(mis_idx) > 30:
        print(f"... and {len(mis_idx) - 30} more")

    # Error counts per scenario; scenarios without errors are omitted.
    print(f"\n{'Scenario':<16} {'Errors':>7} {'Total':>7} {'Error Rate':>11}")
    print("-" * 45)
    for scenario in unique_scenarios:
        mask = scenario_names == scenario
        errs = (true_np[mask] != pred_label_np[mask]).sum()
        total = mask.sum()
        if not errs:
            continue
        print(f"{scenario:<16} {errs:>7} {total:>7} {100*errs/total:>10.2f}%")

In [None]:
# Final marker so a full notebook run visibly reaches the end.
print("Unseen dataset evaluation complete.")