# In [None]:
import glob
import os
import re

import h5py
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# =========================
# CONFIGURATION
# =========================
# Root folder of the dataset and the excitation frequency to load.
DATA_DIR = "SHM-Data"
FREQUENCY = "40kHz"              # <-- change here (e.g. "60kHz")
# Name of the per-measurement file, derived from the frequency above.
FILENAME = f"pc_f{FREQUENCY}.h5"

# Measurement ids that are searched for in the folder names of each class.
BASELINE_IDS = [10, 20, 30, 40, 50]
DAMAGE_IDS = [1, 6, 11, 16, 21]

# Optimisation hyper-parameters.
BATCH_SIZE = 32
EPOCHS = 10
LEARNING_RATE = 0.001

# Use the GPU when torch reports one as available, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# =========================
# DATA LOADING
# =========================
def load_h5_file(filepath):
    """Read the catch signals stored in one OGW h5 file.

    Opens ``filepath`` read-only, reads the whole ``pitchcatch/catch``
    dataset into memory and returns it. The file is closed again before
    the function returns.

    NOTE(review): the original author noted the expected shape as
    (66, signal_length); this is not checked here.
    """
    with h5py.File(filepath, "r") as handle:
        catch_dataset = handle["pitchcatch/catch"]
        # Slicing with [:] copies the data out while the file is still open.
        return catch_dataset[:]

def _find_measurement_files(label_prefix, ident):
    """Return the sorted FILENAME paths in folders named for exactly this id.

    Folders are searched directly under DATA_DIR with the glob pattern
    ``*<label_prefix><ident>*``. Because the trailing wildcard also matches
    longer ids (``D1`` would match ``D11`` or ``D16``), every hit is
    re-checked so that the id is not immediately followed by another digit.
    """
    pattern = os.path.join(DATA_DIR, f"*{label_prefix}{ident}*", FILENAME)
    exact_id = re.compile(rf"{re.escape(label_prefix)}{ident}(?!\d)")
    # glob returns paths in arbitrary order; sorting keeps the row order
    # (and therefore any seeded split of the data) reproducible.
    return sorted(
        path
        for path in glob.glob(pattern)
        if exact_id.search(os.path.basename(os.path.dirname(path)))
    )


def collect_data():
    """Load all configured baseline and damage signals with their labels.

    For every id in BASELINE_IDS (label 0) and DAMAGE_IDS (label 1) the
    matching measurement files are loaded with ``load_h5_file``; each row
    of a loaded array becomes one sample.

    Returns:
        (X, y): ``X`` is all loaded arrays stacked row-wise with
        ``np.vstack``; ``y`` is a 1-D array holding one label per row.

    Raises:
        ValueError: if no measurement file is found at all.
    """
    X, y = [], []

    # (folder-name prefix, ids to load, class label)
    groups = (
        ("baseline_", BASELINE_IDS, 0),   # healthy = 0
        ("D", DAMAGE_IDS, 1),             # damaged = 1
    )
    for label_prefix, idents, label in groups:
        for ident in idents:
            for filepath in _find_measurement_files(label_prefix, ident):
                signals = load_h5_file(filepath)
                X.append(signals)
                y.extend([label] * signals.shape[0])

    if not X:
        # np.vstack([]) would fail with an unrelated-looking message.
        raise ValueError(
            f"No '{FILENAME}' files found under '{DATA_DIR}' "
            "for the configured baseline/damage ids"
        )

    return np.vstack(X), np.array(y)

X, y = collect_data()
print(f"Loaded data shape: {X.shape}, Labels: {y.shape}")

# Normalize each signal (row) by its own peak absolute amplitude (important!)
peak = np.max(np.abs(X), axis=1, keepdims=True)
# An all-zero signal has a peak of 0, and 0/0 would produce NaN values that
# then propagate into the training loss. Dividing by 1 instead leaves such
# rows as zeros.
peak[peak == 0] = 1
X = X / peak

# =========================
# PYTORCH DATASET
# =========================
class LambWaveDataset(Dataset):
    """Dataset pairing each signal row of ``X`` with its label in ``y``.

    Both inputs are copied into tensors once at construction time:
    ``X`` as float32 and ``y`` as int64 (``torch.long``).
    """

    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        """Return the number of entries along the first axis of ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(signal, label)`` for ``idx``.

        The signal gets a new leading axis of size 1, which serves as the
        channel dimension expected by Conv1d.
        """
        signal = self.X[idx]
        label = self.y[idx]
        return signal.unsqueeze(0), label

# Train/test split: 20% of the samples are held out for testing, the split
# is stratified on the labels, and the seed is fixed so it is repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    stratify=y,
    random_state=42,
    test_size=0.2,
)

train_dataset = LambWaveDataset(X=X_train, y=y_train)
test_dataset = LambWaveDataset(X=X_test, y=y_test)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# =========================
# 1D CNN MODEL
# =========================
class SimpleCNN(nn.Module):
    """Two-stage 1D convolutional classifier with two output logits.

    ``features`` holds two Conv1d -> ReLU -> MaxPool1d(2) stages
    (1 -> 16 -> 32 channels, kernel 9, padding 4). ``classifier`` is a
    64-unit hidden layer followed by a 2-unit output layer.

    Args:
        input_length: number of samples per input signal; used to size the
            first linear layer.
    """

    def __init__(self, input_length):
        super().__init__()

        # Build the convolutional stages in order: 1 -> 16 -> 32 channels.
        stages = []
        for in_channels, out_channels in ((1, 16), (16, 32)):
            stages.append(
                nn.Conv1d(in_channels, out_channels, kernel_size=9, padding=4)
            )
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool1d(2))
        self.features = nn.Sequential(*stages)

        # The convolutions keep the length (kernel 9, padding 4); each of
        # the two pools halves it, rounding down.
        flat_features = 32 * (input_length // 4)
        self.classifier = nn.Sequential(
            nn.Linear(flat_features, 64),
            nn.ReLU(),
            nn.Linear(64, 2),
        )

    def forward(self, x):
        """Return the class logits for a batch ``x``.

        The feature maps are flattened to one vector per batch entry
        before being passed to the classifier.
        """
        feature_maps = self.features(x)
        flat = torch.flatten(feature_maps, start_dim=1)
        return self.classifier(flat)

# Size the network from the number of columns of X, then move it to DEVICE.
model = SimpleCNN(X.shape[1])
model = model.to(DEVICE)

# =========================
# TRAINING SETUP
# =========================
# Adam over all model parameters; cross-entropy on the two output logits.
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

# =========================
# TRAINING LOOP
# =========================
for epoch in range(EPOCHS):
    # Switch to training mode and restart the loss accumulator each epoch.
    model.train()
    running_loss = 0.0

    for x_batch, y_batch in train_loader:
        # Move the batch to DEVICE before it is fed to the model.
        x_batch = x_batch.to(DEVICE)
        y_batch = y_batch.to(DEVICE)

        # One optimisation step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        running_loss = running_loss + loss.item()

    # Report the mean of the per-batch losses of this epoch.
    print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {running_loss/len(train_loader):.4f}")

# =========================
# EVALUATION
# =========================
# Switch to evaluation mode and count correct predictions on the test set.
model.eval()
correct = 0
total = 0

# Gradients are not needed while scoring.
with torch.no_grad():
    for x_batch, y_batch in test_loader:
        x_batch = x_batch.to(DEVICE)
        y_batch = y_batch.to(DEVICE)
        outputs = model(x_batch)
        # The predicted class is the index of the largest logit.
        preds = outputs.argmax(dim=1)
        correct += (preds == y_batch).sum().item()
        total += y_batch.shape[0]

accuracy = correct / total
print(f"Test Accuracy: {accuracy:.3f}")
