In [3]:
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
import tensorflow as tf

np.random.seed(42)
tf.random.set_seed(42)

# Importing libraries
from keras import backend as K
from keras.models import Sequential
from keras.layers import LSTM, TimeDistributed, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.layers import Dense, Dropout, Activation

In [5]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')


Mounted at /content/drive


In [6]:
import pandas as pd
import numpy as np
import os
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, clone_model
from tensorflow.keras.layers import TimeDistributed, Conv1D, MaxPooling1D, Dropout, Flatten, LSTM, Dense, InputLayer
from sklearn.metrics import accuracy_score

# ---------------------------
# Config
# ---------------------------
dataset_path = "/content/drive/MyDrive/UCI_HAR_Dataset"
test_subjects = [14]  # hold-out subjects for testing
batch_size = 32
epochs_general = 30   # epochs for general model
epochs_finetune = 5   # epochs for per-subject fine-tuning
n_steps, n_length = 4, 32
n_hidden = 16

SIGNALS = [
    "body_acc_x", "body_acc_y", "body_acc_z",
    "body_gyro_x", "body_gyro_y", "body_gyro_z",
    "total_acc_x", "total_acc_y", "total_acc_z"
]

from sklearn.model_selection import train_test_split

def load_data(dataset_path, test_size=0.2, seed=42):
    def _read_csv(filename):
        return pd.read_csv(filename, delim_whitespace=True, header=None)

    def load_signals(subset):
        signals_data = []
        for signal in SIGNALS:
            filename = f"{dataset_path}/{subset}/Inertial Signals/{signal}_{subset}.txt"
            signals_data.append(_read_csv(filename).to_numpy())
        return np.transpose(signals_data, (1, 2, 0))  # (samples, timesteps=128, 9 signals)

    def load_y(subset):
        filename = f"{dataset_path}/{subset}/y_{subset}.txt"
        return _read_csv(filename)[0].to_numpy()

    def load_subjects(subset):
        filename = f"{dataset_path}/{subset}/subject_{subset}.txt"
        return _read_csv(filename)[0].to_numpy()

    # Load full dataset (train + test)
    X_train, y_train, subj_train = load_signals("train"), load_y("train"), load_subjects("train")
    X_test,  y_test,  subj_test  = load_signals("test"),  load_y("test"),  load_subjects("test")

    # Merge all
    X_all = np.vstack([X_train, X_test])
    y_all = np.concatenate([y_train, y_test])
    subjects = np.concatenate([subj_train, subj_test])  # not used anymore

    print(f"✅ Loaded all subjects together: {X_all.shape[0]} samples")

    # Shuffle + split (ignore subject IDs)
    train_X, test_X, train_y, test_y = train_test_split(
        X_all, y_all, test_size=test_size, random_state=seed, shuffle=True, stratify=y_all
    )

    print(f"📊 Final split -> Train: {train_X.shape}, Test: {test_X.shape}")

    return train_X, train_y, test_X, test_y


CNN-1D

In [8]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.preprocessing import LabelEncoder

# ---------------------------
# Load dataset (merged subjects, 80/20 split)
# ---------------------------
X_train, y_train, X_test, y_test = load_data(dataset_path, test_size=0.2, seed=42)

# ---------------------------
# Encode labels
# ---------------------------
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded  = le.transform(y_test)

n_classes = len(le.classes_)
print("Classes found:", le.classes_)

# reshape into subsequences for CNN-GRU
X_train = X_train.reshape((X_train.shape[0], n_steps, n_length, X_train.shape[2]))
X_test  = X_test.reshape((X_test.shape[0],  n_steps, n_length, X_test.shape[2]))

# convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)



# ---------------------------
# Define pure CNN model
# ---------------------------
class CNN1D(nn.Module):
    def __init__(self, n_steps, n_length, input_dim, n_classes):
        super(CNN1D, self).__init__()

        # Conv layers
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=5, padding=2)
        self.bn1   = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.bn2   = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn3   = nn.BatchNorm1d(256)

        self.dropout = nn.Dropout(0.5)
        self.pool    = nn.AdaptiveMaxPool1d(1)  # global pooling

        # FC layer
        self.fc = nn.Linear(256, n_classes)

    def forward(self, x):
        # x: (batch, n_steps, n_length, input_dim)
        # Merge steps and time
        x = x.reshape(x.size(0), x.size(1) * x.size(2), x.size(3))  # (B, steps*len, channels)
        x = x.permute(0, 2, 1)  # (B, channels=input_dim, seq_len)

        # Conv stack
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.dropout(x)

        # Global pooling
        x = self.pool(x).squeeze(-1)  # (B, 256)

        # Classifier
        logits = self.fc(x)
        return logits




# ---------------------------
# Train general model
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
general_model = CNN1D(n_steps, n_length, X_train.shape[3], n_classes).to(device)


# Label smoothing loss
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

optimizer = torch.optim.Adam(general_model.parameters(), lr=0.0005, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

best_val_loss = float("inf")
patience_counter = 0
early_stop_patience = 10

for epoch in range(epochs_general):
    # --- Train ---
    general_model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = general_model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(general_model.parameters(), max_norm=5.0)
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_loss = total_loss / len(train_loader)
    train_acc = correct / total

    # --- Validation ---
    general_model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = general_model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_loss = val_loss / len(test_loader)
    val_acc = correct / total

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}/{epochs_general} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_weights = general_model.state_dict()
    else:
        patience_counter += 1
        if patience_counter >= early_stop_patience:
            print("Early stopping triggered!")
            break


# restore best weights
general_model.load_state_dict(best_weights)

# Save weights of general model
general_weights = general_model.state_dict()


✅ Loaded all subjects together: 10299 samples
📊 Final split -> Train: (8239, 128, 9), Test: (2060, 128, 9)
Classes found: [1 2 3 4 5 6]
Epoch 1/30 Train Loss: 0.6233, Train Acc: 0.9198 | Val Loss: 0.6900, Val Acc: 0.9621
Epoch 2/30 Train Loss: 0.5270, Train Acc: 0.9567 | Val Loss: 0.6795, Val Acc: 0.9665
Epoch 3/30 Train Loss: 0.5109, Train Acc: 0.9608 | Val Loss: 0.6744, Val Acc: 0.9733
Epoch 4/30 Train Loss: 0.5049, Train Acc: 0.9646 | Val Loss: 0.6762, Val Acc: 0.9757
Epoch 5/30 Train Loss: 0.4976, Train Acc: 0.9681 | Val Loss: 0.6672, Val Acc: 0.9684
Epoch 6/30 Train Loss: 0.4929, Train Acc: 0.9703 | Val Loss: 0.6731, Val Acc: 0.9816
Epoch 7/30 Train Loss: 0.4914, Train Acc: 0.9707 | Val Loss: 0.6683, Val Acc: 0.9767
Epoch 8/30 Train Loss: 0.4845, Train Acc: 0.9746 | Val Loss: 0.6490, Val Acc: 0.9801
Epoch 9/30 Train Loss: 0.4861, Train Acc: 0.9709 | Val Loss: 0.6669, Val Acc: 0.9786
Epoch 10/30 Train Loss: 0.4789, Train Acc: 0.9765 | Val Loss: 0.6521, Val Acc: 0.9791
Epoch 11/30 T

Transformer

In [None]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.preprocessing import LabelEncoder

# ---------------------------
# Configuration
# ---------------------------
batch_size = 32
epochs = 30
learning_rate = 0.0005
d_model = 128          # transformer embedding size
nhead = 8             # number of attention heads
num_layers = 4        # number of transformer encoder layers
dropout = 0.5

# ---------------------------
# Load dataset (merged subjects, 80/20 split)
# ---------------------------
# X_train, y_train, X_test, y_test = load_data(dataset_path, test_size=0.2, seed=42)
# Ensure X_train.shape = (num_samples, seq_len, n_features)

# ---------------------------
# Encode labels
# ---------------------------
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded  = le.transform(y_test)
n_classes = len(le.classes_)
print("Classes found:", le.classes_)

# Convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# Datasets & loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# ---------------------------
# Positional Encoding
# ---------------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x

# ---------------------------
# Transformer-based Model
# ---------------------------
class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_layers, n_classes, dropout=0.3):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=d_model*2, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, n_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x: (batch, seq_len, input_dim)
        x = self.input_proj(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = x[:, -1, :]           # take the last time step
        x = self.dropout(x)
        logits = self.fc(x)
        return logits

# ---------------------------
# Initialize model
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerClassifier(input_dim=X_train_tensor.shape[2],
                              d_model=d_model, nhead=nhead, num_layers=num_layers,
                              n_classes=n_classes, dropout=dropout).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# ---------------------------
# Training loop
# ---------------------------
for epoch in range(epochs):
    model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_loss = total_loss / len(train_loader)
    train_acc = correct / total

    # Validation
    model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_loss = val_loss / len(test_loader)
    val_acc = correct / total

    print(f"Epoch {epoch+1}/{epochs} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


Classes found: [1 2 3 4 5 6]
Epoch 1/30 Train Loss: 1.4435, Train Acc: 0.3957 | Val Loss: 1.1733, Val Acc: 0.5184
Epoch 2/30 Train Loss: 0.9770, Train Acc: 0.6002 | Val Loss: 0.6660, Val Acc: 0.7471
Epoch 3/30 Train Loss: 0.6116, Train Acc: 0.7670 | Val Loss: 0.5198, Val Acc: 0.8204
Epoch 4/30 Train Loss: 0.4696, Train Acc: 0.8270 | Val Loss: 0.3798, Val Acc: 0.8553
Epoch 5/30 Train Loss: 0.4283, Train Acc: 0.8422 | Val Loss: 0.4326, Val Acc: 0.8476
Epoch 6/30 Train Loss: 0.4012, Train Acc: 0.8544 | Val Loss: 0.3464, Val Acc: 0.8767
Epoch 7/30 Train Loss: 0.3785, Train Acc: 0.8635 | Val Loss: 0.3621, Val Acc: 0.8723
Epoch 8/30 Train Loss: 0.3533, Train Acc: 0.8730 | Val Loss: 0.3271, Val Acc: 0.8825
Epoch 9/30 Train Loss: 0.3513, Train Acc: 0.8752 | Val Loss: 0.3544, Val Acc: 0.8777
Epoch 10/30 Train Loss: 0.3284, Train Acc: 0.8792 | Val Loss: 0.3653, Val Acc: 0.8762
Epoch 11/30 Train Loss: 0.3234, Train Acc: 0.8838 | Val Loss: 0.3648, Val Acc: 0.8796
Epoch 12/30 Train Loss: 0.3302, Tr

CNN-GRU

In [9]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.preprocessing import LabelEncoder

# ---------------------------
# Load dataset (merged subjects, 80/20 split)
# ---------------------------
X_train, y_train, X_test, y_test = load_data(dataset_path, test_size=0.2, seed=42)

# ---------------------------
# Encode labels
# ---------------------------
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded  = le.transform(y_test)

n_classes = len(le.classes_)
print("Classes found:", le.classes_)

# reshape into subsequences for CNN-GRU
X_train = X_train.reshape((X_train.shape[0], n_steps, n_length, X_train.shape[2]))
X_test  = X_test.reshape((X_test.shape[0],  n_steps, n_length, X_test.shape[2]))

# convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)



# ---------------------------
# Define CNN-GRU with residuals + norm
# ---------------------------
class CNNGRU(nn.Module):
    def __init__(self, n_steps, n_length, input_dim, n_hidden, n_classes):
        super(CNNGRU, self).__init__()

        # --- CNN layers (from the pure CNN above) ---
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=5, padding=2)
        self.bn1   = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.bn2   = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn3   = nn.BatchNorm1d(256)

        self.dropout_cnn = nn.Dropout(0.5)
        self.pool        = nn.AdaptiveMaxPool1d(1)  # global pooling

        # --- GRU ---
        self.gru = nn.GRU(input_size=256, hidden_size=n_hidden, batch_first=True)
        self.layer_norm = nn.LayerNorm(n_hidden)

        # --- FC layer (single, like pure CNN) ---
        self.dropout_fc = nn.Dropout(0.5)
        self.fc = nn.Linear(n_hidden, n_classes)

    def forward(self, x):
        # x: (batch, n_steps, n_length, input_dim)
        x = x.permute(0, 1, 3, 2)   # (B, steps, in_ch, len)
        batch, n_steps, in_ch, seq_len = x.size()
        x = x.reshape(batch * n_steps, in_ch, seq_len)

        # CNN stack
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.dropout_cnn(x)

        # Global pooling → (batch*steps, 256)
        x = self.pool(x).squeeze(-1)

        # Reshape for GRU → (batch, steps, features)
        x = x.reshape(batch, n_steps, -1)

        # GRU
        out, _ = self.gru(x)       # (batch, steps, hidden)
        out = out[:, -1, :]        # take last hidden state
        out = self.layer_norm(out)

        # FC classifier
        out = self.dropout_fc(out)
        logits = self.fc(out)
        return logits




# ---------------------------
# Train general model
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
general_model = CNNGRU(n_steps, n_length, X_train.shape[3], n_hidden, n_classes).to(device)

# Label smoothing loss
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

optimizer = torch.optim.Adam(general_model.parameters(), lr=0.0005, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

best_val_loss = float("inf")
patience_counter = 0
early_stop_patience = 10

for epoch in range(epochs_general):
    # --- Train ---
    general_model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = general_model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(general_model.parameters(), max_norm=5.0)
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_loss = total_loss / len(train_loader)
    train_acc = correct / total

    # --- Validation ---
    general_model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = general_model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_loss = val_loss / len(test_loader)
    val_acc = correct / total

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}/{epochs_general} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_weights = general_model.state_dict()
    else:
        patience_counter += 1
        if patience_counter >= early_stop_patience:
            print("Early stopping triggered!")
            break


# restore best weights
general_model.load_state_dict(best_weights)

# Save weights of general model
general_weights = general_model.state_dict()


✅ Loaded all subjects together: 10299 samples
📊 Final split -> Train: (8239, 128, 9), Test: (2060, 128, 9)
Classes found: [1 2 3 4 5 6]
Epoch 1/30 Train Loss: 0.9705, Train Acc: 0.7684 | Val Loss: 0.6503, Val Acc: 0.9398
Epoch 2/30 Train Loss: 0.7450, Train Acc: 0.8968 | Val Loss: 0.5692, Val Acc: 0.9485
Epoch 3/30 Train Loss: 0.6935, Train Acc: 0.9133 | Val Loss: 0.5312, Val Acc: 0.9481
Epoch 4/30 Train Loss: 0.6803, Train Acc: 0.9201 | Val Loss: 0.5229, Val Acc: 0.9583
Epoch 5/30 Train Loss: 0.6632, Train Acc: 0.9235 | Val Loss: 0.5308, Val Acc: 0.9485
Epoch 6/30 Train Loss: 0.6612, Train Acc: 0.9267 | Val Loss: 0.5183, Val Acc: 0.9573
Epoch 7/30 Train Loss: 0.6556, Train Acc: 0.9280 | Val Loss: 0.5134, Val Acc: 0.9583
Epoch 8/30 Train Loss: 0.6495, Train Acc: 0.9290 | Val Loss: 0.5123, Val Acc: 0.9631
Epoch 9/30 Train Loss: 0.6430, Train Acc: 0.9379 | Val Loss: 0.5089, Val Acc: 0.9617
Epoch 10/30 Train Loss: 0.6344, Train Acc: 0.9392 | Val Loss: 0.5100, Val Acc: 0.9631
Epoch 11/30 T

CNN-LSTM

In [10]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.preprocessing import LabelEncoder

# ---------------------------
# Load dataset (merged subjects, 80/20 split)
# ---------------------------
X_train, y_train, X_test, y_test = load_data(dataset_path, test_size=0.2, seed=42)

# ---------------------------
# Encode labels
# ---------------------------
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded  = le.transform(y_test)

n_classes = len(le.classes_)
print("Classes found:", le.classes_)

# reshape into subsequences for CNN-GRU
X_train = X_train.reshape((X_train.shape[0], n_steps, n_length, X_train.shape[2]))
X_test  = X_test.reshape((X_test.shape[0],  n_steps, n_length, X_test.shape[2]))

# convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)



class CNNLSTM(nn.Module):
    def __init__(self, n_steps, n_length, input_dim, n_hidden, n_classes):
        super(CNNLSTM, self).__init__()

        # --- CNN layers (from the pure CNN above) ---
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=5, padding=2)
        self.bn1   = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.bn2   = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn3   = nn.BatchNorm1d(256)

        self.dropout_cnn = nn.Dropout(0.5)
        self.pool        = nn.AdaptiveMaxPool1d(1)  # global pooling

        # --- LSTM instead of GRU ---
        self.lstm = nn.LSTM(input_size=256, hidden_size=n_hidden, batch_first=True)
        self.layer_norm = nn.LayerNorm(n_hidden)

        # --- Single FC layer ---
        self.dropout_fc = nn.Dropout(0.5)
        self.fc = nn.Linear(n_hidden, n_classes)

    def forward(self, x):
        # x: (batch, n_steps, n_length, input_dim)
        x = x.permute(0, 1, 3, 2)   # (B, steps, in_ch, len)
        batch, n_steps, in_ch, seq_len = x.size()
        x = x.reshape(batch * n_steps, in_ch, seq_len)

        # CNN stack
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.dropout_cnn(x)

        # Global pooling → (batch*steps, 256)
        x = self.pool(x).squeeze(-1)

        # Reshape for LSTM → (batch, steps, features)
        x = x.reshape(batch, n_steps, -1)

        # LSTM (ignore cell state)
        out, (h_n, c_n) = self.lstm(x)   # out: (batch, steps, hidden)
        out = out[:, -1, :]              # take last hidden state
        out = self.layer_norm(out)

        # FC classifier
        out = self.dropout_fc(out)
        logits = self.fc(out)
        return logits





# ---------------------------
# Train general model
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
general_model = CNNGRU(n_steps, n_length, X_train.shape[3], n_hidden, n_classes).to(device)

# Label smoothing loss
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

optimizer = torch.optim.Adam(general_model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

best_val_loss = float("inf")
patience_counter = 0
early_stop_patience = 10

for epoch in range(epochs_general):
    # --- Train ---
    general_model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = general_model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(general_model.parameters(), max_norm=5.0)
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_loss = total_loss / len(train_loader)
    train_acc = correct / total

    # --- Validation ---
    general_model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = general_model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_loss = val_loss / len(test_loader)
    val_acc = correct / total

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}/{epochs_general} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_weights = general_model.state_dict()
    else:
        patience_counter += 1
        if patience_counter >= early_stop_patience:
            print("Early stopping triggered!")
            break


# restore best weights
general_model.load_state_dict(best_weights)

# Save weights of general model
general_weights = general_model.state_dict()


✅ Loaded all subjects together: 10299 samples
📊 Final split -> Train: (8239, 128, 9), Test: (2060, 128, 9)
Classes found: [1 2 3 4 5 6]
Epoch 1/30 Train Loss: 0.9329, Train Acc: 0.7893 | Val Loss: 0.6706, Val Acc: 0.8903
Epoch 2/30 Train Loss: 0.7353, Train Acc: 0.8864 | Val Loss: 0.5577, Val Acc: 0.9398
Epoch 3/30 Train Loss: 0.6975, Train Acc: 0.9050 | Val Loss: 0.5300, Val Acc: 0.9485
Epoch 4/30 Train Loss: 0.6819, Train Acc: 0.9124 | Val Loss: 0.5287, Val Acc: 0.9515
Epoch 5/30 Train Loss: 0.6640, Train Acc: 0.9149 | Val Loss: 0.5198, Val Acc: 0.9602
Epoch 6/30 Train Loss: 0.6524, Train Acc: 0.9273 | Val Loss: 0.5305, Val Acc: 0.9519
Epoch 7/30 Train Loss: 0.6525, Train Acc: 0.9254 | Val Loss: 0.5182, Val Acc: 0.9583
Epoch 8/30 Train Loss: 0.6414, Train Acc: 0.9309 | Val Loss: 0.5167, Val Acc: 0.9578
Epoch 9/30 Train Loss: 0.6362, Train Acc: 0.9345 | Val Loss: 0.5116, Val Acc: 0.9563
Epoch 10/30 Train Loss: 0.6436, Train Acc: 0.9294 | Val Loss: 0.5309, Val Acc: 0.9534
Epoch 11/30 T

Inception-Time

In [11]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.preprocessing import LabelEncoder

# ---------------------------
# Load dataset (merged subjects, 80/20 split)
# ---------------------------
X_train, y_train, X_test, y_test = load_data(dataset_path, test_size=0.2, seed=42)

# ---------------------------
# Encode labels
# ---------------------------
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded  = le.transform(y_test)

n_classes = len(le.classes_)
print("Classes found:", le.classes_)

# reshape into subsequences for CNN-GRU
X_train = X_train.reshape((X_train.shape[0], n_steps, n_length, X_train.shape[2]))
X_test  = X_test.reshape((X_test.shape[0],  n_steps, n_length, X_test.shape[2]))

# convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)



# ---------------------------
# Inception Block
# ---------------------------
class InceptionBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_sizes=[9, 19, 39], bottleneck_channels=32):
        super(InceptionBlock, self).__init__()

        self.bottleneck = (
            nn.Conv1d(in_channels, bottleneck_channels, kernel_size=1, bias=False)
            if in_channels > 1 else nn.Identity()
        )

        self.convs = nn.ModuleList([
            nn.Conv1d(
                bottleneck_channels if in_channels > 1 else in_channels,
                out_channels,
                kernel_size=k,
                padding=k // 2,
                bias=False,
            )
            for k in kernel_sizes
        ])

        self.maxpool_conv = nn.Sequential(
            nn.MaxPool1d(kernel_size=3, stride=1, padding=1),
            nn.Conv1d(in_channels, out_channels, kernel_size=1, bias=False)
        )

        self.bn = nn.BatchNorm1d(out_channels * (len(kernel_sizes) + 1))
        self.relu = nn.ReLU()

    def forward(self, x):
        x_bottleneck = self.bottleneck(x)
        out = [conv(x_bottleneck) for conv in self.convs]
        out.append(self.maxpool_conv(x))
        out = torch.cat(out, dim=1)
        out = self.bn(out)
        return self.relu(out)


# ---------------------------
# InceptionTime Model
# ---------------------------
class InceptionTime(nn.Module):
    def __init__(self, n_steps, n_length, input_dim, n_classes, num_blocks=3, out_channels=32):
        super(InceptionTime, self).__init__()

        in_channels = input_dim
        blocks = []
        for _ in range(num_blocks):
            blocks.append(InceptionBlock(in_channels, out_channels))
            in_channels = out_channels * 4  # since 3 convs + 1 maxpool branch
        self.inception_blocks = nn.Sequential(*blocks)

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(in_channels, n_classes)

    def forward(self, x):
        # x: (batch, n_steps, n_length, input_dim)
        x = x.permute(0, 3, 1, 2)   # (B, in_ch, steps, len)
        batch, in_ch, steps, seq_len = x.size()
        x = x.reshape(batch * steps, in_ch, seq_len)  # treat each step separately

        # Inception blocks
        x = self.inception_blocks(x)

        # Global average pooling
        x = self.gap(x).squeeze(-1)

        # Reshape back to (batch, steps, features)
        x = x.reshape(batch, steps, -1)

        # Take last step (or could use mean over steps)
        x = x[:, -1, :]

        # FC layer
        return self.fc(x)





# ---------------------------
# Train general model
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
general_model = InceptionTime(
    n_steps=n_steps,
    n_length=n_length,
    input_dim=X_train.shape[3],
    n_classes=n_classes,
    num_blocks=3,
    out_channels=32
).to(device)


# Label smoothing loss
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

optimizer = torch.optim.Adam(general_model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

best_val_loss = float("inf")
patience_counter = 0
early_stop_patience = 10

for epoch in range(epochs_general):
    # --- Train ---
    general_model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = general_model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(general_model.parameters(), max_norm=5.0)
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_loss = total_loss / len(train_loader)
    train_acc = correct / total

    # --- Validation ---
    general_model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = general_model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_loss = val_loss / len(test_loader)
    val_acc = correct / total

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}/{epochs_general} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_weights = general_model.state_dict()
    else:
        patience_counter += 1
        if patience_counter >= early_stop_patience:
            print("Early stopping triggered!")
            break


# restore best weights
general_model.load_state_dict(best_weights)

# Save weights of general model
general_weights = general_model.state_dict()


✅ Loaded all subjects together: 10299 samples
📊 Final split -> Train: (8239, 128, 9), Test: (2060, 128, 9)
Classes found: [1 2 3 4 5 6]
Epoch 1/30 Train Loss: 0.8657, Train Acc: 0.7725 | Val Loss: 0.7018, Val Acc: 0.8583
Epoch 2/30 Train Loss: 0.6409, Train Acc: 0.8972 | Val Loss: 0.6072, Val Acc: 0.9146
Epoch 3/30 Train Loss: 0.5946, Train Acc: 0.9175 | Val Loss: 0.5892, Val Acc: 0.9117
Epoch 4/30 Train Loss: 0.5793, Train Acc: 0.9245 | Val Loss: 0.5766, Val Acc: 0.9335
Epoch 5/30 Train Loss: 0.5692, Train Acc: 0.9302 | Val Loss: 0.5579, Val Acc: 0.9350
Epoch 6/30 Train Loss: 0.5541, Train Acc: 0.9363 | Val Loss: 0.5545, Val Acc: 0.9398
Epoch 7/30 Train Loss: 0.5468, Train Acc: 0.9391 | Val Loss: 0.5546, Val Acc: 0.9350
Epoch 8/30 Train Loss: 0.5396, Train Acc: 0.9415 | Val Loss: 0.5327, Val Acc: 0.9485
Epoch 9/30 Train Loss: 0.5317, Train Acc: 0.9437 | Val Loss: 0.5327, Val Acc: 0.9490
Epoch 10/30 Train Loss: 0.5353, Train Acc: 0.9448 | Val Loss: 0.5384, Val Acc: 0.9398
Epoch 11/30 T

MLSTM-FCN

In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import LabelEncoder
import numpy as np

# ---------------------------
# MLSTM-FCN model
# ---------------------------
class MLSTM_FCN(nn.Module):
    def __init__(self, n_channels, n_classes, lstm_hidden=128):
        super(MLSTM_FCN, self).__init__()

        # LSTM branch
        self.lstm = nn.LSTM(
            input_size=n_channels,
            hidden_size=lstm_hidden,
            num_layers=1,
            batch_first=True
        )

        # FCN branch
        self.conv1 = nn.Conv1d(in_channels=n_channels, out_channels=128, kernel_size=8, padding=4)
        self.bn1   = nn.BatchNorm1d(128)
        self.conv2 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=5, padding=2)
        self.bn2   = nn.BatchNorm1d(256)
        self.conv3 = nn.Conv1d(in_channels=256, out_channels=128, kernel_size=3, padding=1)
        self.bn3   = nn.BatchNorm1d(128)
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # Fully connected classifier
        self.fc = nn.Linear(lstm_hidden + 128, n_classes)

    def forward(self, x):
        # x shape: (batch, timesteps, channels)
        batch_size = x.size(0)

        # LSTM branch
        lstm_out, _ = self.lstm(x)            # (batch, timesteps, lstm_hidden)
        lstm_out = lstm_out[:, -1, :]         # take last timestep

        # FCN branch
        fcn_x = x.permute(0, 2, 1)            # (batch, channels, timesteps)
        fcn_x = F.relu(self.bn1(self.conv1(fcn_x)))
        fcn_x = F.relu(self.bn2(self.conv2(fcn_x)))
        fcn_x = F.relu(self.bn3(self.conv3(fcn_x)))
        fcn_x = self.global_pool(fcn_x)       # (batch, channels, 1)
        fcn_x = fcn_x.squeeze(-1)             # (batch, channels)

        # Concatenate branches
        out = torch.cat([lstm_out, fcn_x], dim=1)

        # FC classifier
        logits = self.fc(out)
        return logits


# ---------------------------
# Example usage
# ---------------------------

# Reshape your dataset for MLSTM-FCN: (samples, timesteps, channels)
X_train_mlfcn = X_train.reshape(X_train.shape[0], X_train.shape[1]*X_train.shape[2], X_train.shape[3])
X_test_mlfcn  = X_test.reshape(X_test.shape[0],  X_test.shape[1]*X_test.shape[2],  X_test.shape[3])

# Convert to torch tensors
X_train_tensor = torch.tensor(X_train_mlfcn, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
X_test_tensor  = torch.tensor(X_test_mlfcn, dtype=torch.float32)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)
train_loader  = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader   = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# ---------------------------
# Train MLSTM-FCN
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MLSTM_FCN(n_channels=X_train_tensor.shape[2], n_classes=len(le.classes_)).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(30):
    model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_acc = correct / total
    train_loss = total_loss / len(train_loader)

    # Validation
    model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_acc = correct / total
    val_loss /= len(test_loader)

    print(f"Epoch {epoch+1}/{epochs_general} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


Epoch 1/30 Train Loss: 0.2822, Train Acc: 0.9125 | Val Loss: 0.1710, Val Acc: 0.9311
Epoch 2/30 Train Loss: 0.1441, Train Acc: 0.9440 | Val Loss: 0.1345, Val Acc: 0.9432
Epoch 3/30 Train Loss: 0.1461, Train Acc: 0.9410 | Val Loss: 0.1149, Val Acc: 0.9544
Epoch 4/30 Train Loss: 0.1245, Train Acc: 0.9494 | Val Loss: 0.1267, Val Acc: 0.9417
Epoch 5/30 Train Loss: 0.1224, Train Acc: 0.9485 | Val Loss: 0.1205, Val Acc: 0.9490
Epoch 6/30 Train Loss: 0.1277, Train Acc: 0.9460 | Val Loss: 0.1061, Val Acc: 0.9578
Epoch 7/30 Train Loss: 0.1217, Train Acc: 0.9481 | Val Loss: 0.1014, Val Acc: 0.9583
Epoch 8/30 Train Loss: 0.1149, Train Acc: 0.9525 | Val Loss: 0.1017, Val Acc: 0.9568
Epoch 9/30 Train Loss: 0.1179, Train Acc: 0.9510 | Val Loss: 0.1033, Val Acc: 0.9636
Epoch 10/30 Train Loss: 0.1134, Train Acc: 0.9495 | Val Loss: 0.1130, Val Acc: 0.9578
Epoch 11/30 Train Loss: 0.1236, Train Acc: 0.9504 | Val Loss: 0.1113, Val Acc: 0.9510
Epoch 12/30 Train Loss: 0.1129, Train Acc: 0.9519 | Val Loss: 0

TCN

In [22]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.preprocessing import LabelEncoder

# ---------------------------
# Configuration
# ---------------------------
batch_size = 32
epochs = 30
learning_rate = 0.0005
dropout = 0.3
n_hidden = 64
kernel_size = 3
n_steps = 4
n_length = 32

# ---------------------------
# Load dataset (merged subjects, 80/20 split)
# ---------------------------
# X_train, y_train, X_test, y_test = load_data(dataset_path, test_size=0.2, seed=42)
# Example: make sure X_train has shape (num_samples, n_steps, n_length, n_channels)
# For demonstration, you must replace the above line with your actual data loading function

# ---------------------------
# Encode labels
# ---------------------------
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded  = le.transform(y_test)
n_classes = len(le.classes_)
print("Classes found:", le.classes_)

# Reshape into (batch, channels, sequence_length)
X_train = X_train.reshape((X_train.shape[0], n_steps * X_train.shape[2], X_train.shape[3]))
X_test  = X_test.reshape((X_test.shape[0], n_steps * X_test.shape[2], X_test.shape[3]))
X_train = X_train.transpose(0, 2, 1)  # (batch, channels, seq_len)
X_test  = X_test.transpose(0, 2, 1)

# Convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor  = torch.tensor(y_test_encoded, dtype=torch.long)

# Datasets & loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset  = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# ---------------------------
# Define TCN model
# ---------------------------
class TemporalBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, dilation, padding, dropout):
        super(TemporalBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.dropout1 = nn.Dropout(dropout)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.dropout2 = nn.Dropout(dropout)
        self.downsample = nn.Conv1d(in_channels, out_channels, 1) \
            if in_channels != out_channels else None
        self.relu = nn.ReLU()

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.dropout1(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.dropout2(out)

        # Crop output to match input length
        if out.size(2) > x.size(2):
            out = out[:, :, :x.size(2)]

        res = x if self.downsample is None else self.downsample(x)
        if res.size(2) != out.size(2):
            res = res[:, :, :out.size(2)]

        return self.relu(out + res)

class TCN(nn.Module):
    def __init__(self, num_inputs, num_channels, kernel_size, dropout, n_classes):
        super(TCN, self).__init__()
        layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):
            dilation_size = 2 ** i
            in_channels = num_inputs if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            padding = (kernel_size - 1) * dilation_size
            layers += [TemporalBlock(in_channels, out_channels, kernel_size,
                                     stride=1, dilation=dilation_size, padding=padding, dropout=dropout)]
        self.network = nn.Sequential(*layers)
        self.fc = nn.Linear(num_channels[-1], n_classes)

    def forward(self, x):
        y1 = self.network(x)
        y1 = y1[:, :, -1]  # take last time step
        return self.fc(y1)

# ---------------------------
# Initialize model
# ---------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_channels = [64, 64, 64]  # three TCN layers
model = TCN(num_inputs=X_train_tensor.shape[1], num_channels=num_channels,
            kernel_size=kernel_size, dropout=dropout, n_classes=n_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# ---------------------------
# Training loop
# ---------------------------
for epoch in range(epochs):
    model.train()
    total_loss, correct, total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    train_loss = total_loss / len(train_loader)
    train_acc = correct / total

    # Validation
    model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    val_loss = val_loss / len(test_loader)
    val_acc = correct / total

    print(f"Epoch {epoch+1}/{epochs} "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


Classes found: [1 2 3 4 5 6]
Epoch 1/30 Train Loss: 0.8867, Train Acc: 0.6438 | Val Loss: 0.5043, Val Acc: 0.8053
Epoch 2/30 Train Loss: 0.5187, Train Acc: 0.7912 | Val Loss: 0.2954, Val Acc: 0.9049
Epoch 3/30 Train Loss: 0.3792, Train Acc: 0.8582 | Val Loss: 0.2319, Val Acc: 0.9155
Epoch 4/30 Train Loss: 0.3046, Train Acc: 0.8835 | Val Loss: 0.1822, Val Acc: 0.9311
Epoch 5/30 Train Loss: 0.2672, Train Acc: 0.8967 | Val Loss: 0.1568, Val Acc: 0.9422
Epoch 6/30 Train Loss: 0.2454, Train Acc: 0.9038 | Val Loss: 0.1577, Val Acc: 0.9374
Epoch 7/30 Train Loss: 0.2217, Train Acc: 0.9143 | Val Loss: 0.1458, Val Acc: 0.9442
Epoch 8/30 Train Loss: 0.2085, Train Acc: 0.9195 | Val Loss: 0.1421, Val Acc: 0.9408
Epoch 9/30 Train Loss: 0.1994, Train Acc: 0.9240 | Val Loss: 0.1387, Val Acc: 0.9437
Epoch 10/30 Train Loss: 0.1993, Train Acc: 0.9244 | Val Loss: 0.1368, Val Acc: 0.9481
Epoch 11/30 Train Loss: 0.1849, Train Acc: 0.9271 | Val Loss: 0.1329, Val Acc: 0.9471
Epoch 12/30 Train Loss: 0.1775, Tr