### LSTM model architecture


##### Input x 
Tensor of shape: [batch, seq_len, num_features=3] 
- Each timestep is a 3-dimensional input feature vector

##### Output x
Tensor of shape: [batch, 5] 
- Sigmoid probabilities for each finger (0-1)

The hyperparameters still need to be tuned, but the current values should be a solid starting point.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LSTM_model(nn.Module):
    """Stacked two-LSTM classifier returning one sigmoid probability per output.

    Architecture: LSTM -> LSTM -> last timestep -> Linear + ReLU -> Dropout
    -> Linear -> Sigmoid.

    Args:
        input_size: Number of features per timestep (default 3).
        hidden_size1: Hidden size of the first LSTM (default 64).
        hidden_size2: Hidden size of the second LSTM (default 128).
        fc_size: Width of the intermediate fully connected layer (default 64).
        num_outputs: Number of outputs, one per finger (default 5).
        dropout: Dropout probability applied after the first FC layer
            (default 0.5).

    The defaults reproduce the original fixed architecture, so
    ``LSTM_model()`` behaves exactly as before.
    """

    def __init__(self, input_size=3, hidden_size1=64, hidden_size2=128,
                 fc_size=64, num_outputs=5, dropout=0.5):
        super().__init__()
        # Attribute names are kept stable so state_dict keys stay unchanged.
        self.lstm1 = nn.LSTM(input_size=input_size, hidden_size=hidden_size1, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=hidden_size1, hidden_size=hidden_size2, batch_first=True)
        self.fc1 = nn.Linear(hidden_size2, fc_size)
        self.dropout_fc = nn.Dropout(dropout)
        self.fc2 = nn.Linear(fc_size, num_outputs)  # one output per finger

    def forward(self, x):
        """Map x of shape (batch, seq_len, input_size) to probabilities (batch, num_outputs)."""
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # keep only the last timestep: (batch, hidden_size2)
        x = F.relu(self.fc1(x))
        x = self.dropout_fc(x)
        x = self.fc2(x)
        # Independent sigmoid per output: each finger is its own binary decision.
        return torch.sigmoid(x)


In [None]:
# Imports for training cells
import torch
import numpy as np
from pathlib import Path


In [None]:
class ProcessedDataset(torch.utils.data.Dataset):
    """Dataset pairing each input sample ``X[i]`` with its target ``y[i]``.

    Both inputs are converted to float32 tensors once, at construction time.

    Args:
        X: Array-like of inputs, indexed by sample along the first axis.
        y: Array-like of targets with the same number of samples as ``X``.

    Raises:
        ValueError: If ``X`` and ``y`` do not contain the same number of samples.
    """

    def __init__(self, X, y):
        # as_tensor handles numpy arrays (like from_numpy) as well as
        # tensors and nested sequences.
        self.X = torch.as_tensor(X, dtype=torch.float32)
        self.y = torch.as_tensor(y, dtype=torch.float32)
        # Fail early: a mismatch would otherwise surface as an IndexError inside
        # the DataLoader, or silently ignore the extra targets.
        if self.X.shape[:1] != self.y.shape[:1]:
            raise ValueError(
                f"X and y must have the same number of samples; "
                f"got {tuple(self.X.shape)} and {tuple(self.y.shape)}"
            )

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


In [None]:
class LSTMModel(torch.nn.Module):
    """Multi-layer LSTM followed by a linear head; returns raw logits.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Hidden size of every LSTM layer (default 128).
        num_layers: Number of stacked LSTM layers (default 2).
        output_size: Number of output logits (default 5).
        dropout: Dropout probability between stacked LSTM layers (default 0.2).
            Ignored when ``num_layers`` is 1, because there is no inter-layer
            connection to apply it to.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, output_size=5, dropout=0.2):
        super().__init__()
        # nn.LSTM applies dropout only between layers and emits a UserWarning
        # when it is non-zero with a single layer, so pass it only when stacked.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=lstm_dropout)
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map x of shape (B, T, C) to logits of shape (B, output_size)."""
        out, _ = self.lstm(x)
        last = out[:, -1, :]  # top-layer output at the final timestep
        return self.fc(last)


In [None]:
def train_model(X, y, epochs=5, batch_size=64, lr=1e-3, val_split=0.1, device='cpu'):
    """Train an LSTM classifier on processed arrays and return the trained model.

    The samples are randomly split into training and validation parts using
    numpy's global RNG. After every epoch the mean train/validation loss and
    the element-wise accuracy are printed; accuracy thresholds the predicted
    probabilities at 0.5 and averages over all samples and all outputs
    together (it is not broken down per finger).

    Args:
        X: numpy array of shape (N, T, C).
        y: numpy array of binary targets with N rows; its shape must match
            the model output.
        epochs: Number of passes over the training split.
        batch_size: Mini-batch size for both loaders.
        lr: Adam learning rate.
        val_split: Fraction of samples held out for validation.
        device: Anything accepted by ``torch.device``.

    Returns:
        The trained model: the base network wrapped so that inputs are
        linearly projected when their feature size differs from what the
        base network expects.

    Raises:
        ValueError: If ``X`` is not 3-dimensional or ``X`` and ``y`` disagree
            on the number of samples.
    """
    import torch
    from torch.utils.data import DataLoader
    import numpy as _np
    import torch.nn as _nn

    if X.ndim != 3:
        raise ValueError(f"X must have shape (N, T, C); got shape {X.shape}")
    if len(X) != len(y):
        raise ValueError(f"X and y must have the same number of samples; got {len(X)} and {len(y)}")

    N = len(X)
    idx = _np.random.permutation(N)
    nval = int(N * val_split)
    val_idx = idx[:nval]
    train_idx = idx[nval:]

    train_ds = ProcessedDataset(X[train_idx], y[train_idx])
    val_ds = ProcessedDataset(X[val_idx], y[val_idx])
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

    device = torch.device(device)

    # Prefer user's existing `LSTM_model` if present in the notebook
    if 'LSTM_model' in globals():
        base_model = LSTM_model()
        known_logits = False  # user-defined model: output range must be probed
    else:
        base_model = LSTMModel(input_size=X.shape[2])
        known_logits = True  # the fallback model ends in a plain Linear layer

    class WrappedModel(torch.nn.Module):
        """Adapts the input feature size to the base model via a linear projection."""

        def __init__(self, base, in_dim):
            super().__init__()
            self.base = base
            self.in_dim = in_dim
            expected = None
            if hasattr(base, 'lstm1') and hasattr(base.lstm1, 'input_size'):
                expected = base.lstm1.input_size
            elif hasattr(base, 'lstm') and hasattr(base.lstm, 'input_size'):
                expected = base.lstm.input_size
            self.expected = expected
            if expected is not None and expected != in_dim:
                self.project = _nn.Linear(in_dim, expected)
            else:
                self.project = None

        def forward(self, x):
            if self.project is not None:
                x = self.project(x)
            return self.base(x)

    model = WrappedModel(base_model, X.shape[2]).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)

    # Use BCEWithLogitsLoss if model output is unbounded, else BCELoss.
    # A freshly initialised logits model can emit values that happen to fall
    # inside [0, 1], so the range probe is only used when the output type is
    # not already known.
    if known_logits:
        outputs_probs = False
    else:
        model.eval()  # disable dropout so the probe is deterministic
        with torch.no_grad():
            sample = torch.zeros((1, X.shape[1], X.shape[2]), device=device)
            out = model(sample)
            out_min, out_max = float(out.min()), float(out.max())
        model.train()  # restore the mode the model had before probing
        outputs_probs = 0.0 <= out_min and out_max <= 1.0

    if outputs_probs:
        loss_fn = torch.nn.BCELoss()
        print('Using BCELoss (model returns probabilities).')
    else:
        loss_fn = torch.nn.BCEWithLogitsLoss()
        print('Using BCEWithLogitsLoss (model returns logits).')

    def _count_correct(outputs, targets):
        """Return (number of correct elements, number of elements) at threshold 0.5."""
        probs = outputs if outputs_probs else torch.sigmoid(outputs)
        preds = (probs > 0.5).float()
        return (preds == targets).float().sum().item(), preds.numel()

    for epoch in range(1, epochs+1):
        model.train()
        total_loss = 0.0
        train_correct = 0.0
        train_total = 0
        for xb, yb in train_loader:
            xb = xb.to(device)
            yb = yb.to(device)
            opt.zero_grad()
            outputs = model(xb)
            loss = loss_fn(outputs, yb)
            loss.backward()
            opt.step()
            # Weight by batch size so the epoch loss is a per-sample mean.
            total_loss += loss.item() * xb.size(0)

            with torch.no_grad():
                n_correct, n_elems = _count_correct(outputs, yb)
            train_correct += n_correct
            train_total += n_elems

        total_loss /= len(train_ds)
        train_acc = train_correct / train_total if train_total > 0 else 0.0

        model.eval()
        val_loss = 0.0
        val_correct = 0.0
        val_total = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.to(device)
                yb = yb.to(device)
                outputs = model(xb)
                val_loss += loss_fn(outputs, yb).item() * xb.size(0)
                n_correct, n_elems = _count_correct(outputs, yb)
                val_correct += n_correct
                val_total += n_elems
        # The validation split may be empty (e.g. val_split=0); report zeros then.
        val_loss = val_loss / len(val_ds) if len(val_ds) > 0 else 0.0
        val_acc = val_correct / val_total if val_total > 0 else 0.0

        print(f"Epoch {epoch}: train_loss={total_loss:.4f}, val_loss={val_loss:.4f}, train_acc={train_acc:.4f}, val_acc={val_acc:.4f}")

    return model


In [None]:
# Probe whether `load_processed` is already defined and the processed data is readable.
# Any failure is reported instead of raised, so this cell is safe to run out of order.
subjects = [*range(1, 11)]
try:
    X, y = load_processed(subjects)
    print('Loaded X,y shapes for example:', X.shape, y.shape)
except Exception as err:
    print('load_processed not available or failed:', err)


In [None]:
# Load processed dataset saved by data/run_preprocess.py
import numpy as np
from pathlib import Path

def load_processed(subjects, data_dir=Path('..') / 'data' / 'processed'):
    """Load and concatenate processed X/y for a list of subject ids.

    Each subject ``sid`` is read from ``<data_dir>/S<sid>/X.npy`` and
    ``<data_dir>/S<sid>/y.npy``; the per-subject arrays are concatenated
    along the first (sample) axis in the order given.

    subjects: iterable of ints or strings (e.g. [1,2,3] or ['1','2'])
    data_dir: Path to the `data/processed` directory (defaults to ../data/processed from this notebook)
    Returns: X (N,T,C), y (N,5)
    Raises:
        FileNotFoundError: if X.npy or y.npy is missing for a subject.
        ValueError: if `subjects` is empty, or a subject's X and y hold a
            different number of samples.
    """
    data_dir = Path(data_dir)
    Xs = []
    ys = []
    for sid in subjects:
        s = str(sid)
        sdir = data_dir / f"S{s}"
        Xp = sdir / "X.npy"
        yp = sdir / "y.npy"
        if not Xp.exists() or not yp.exists():
            raise FileNotFoundError(f"Missing processed files for S{s} in {sdir}")
        X_s = np.load(Xp)
        y_s = np.load(yp)
        # A per-subject mismatch would silently shift every later label after
        # concatenation, so reject it here.
        if X_s.shape[:1] != y_s.shape[:1]:
            raise ValueError(
                f"Sample count mismatch for S{s}: X has shape {X_s.shape}, y has shape {y_s.shape}"
            )
        Xs.append(X_s)
        ys.append(y_s)
    if not Xs:
        # np.concatenate would raise a less helpful ValueError on an empty list.
        raise ValueError("No subjects given; cannot load an empty dataset")
    X = np.concatenate(Xs, axis=0)
    y = np.concatenate(ys, axis=0)
    return X, y

# Example usage: load subjects 1 through 10 (edit the range to pick other subjects)
subjects = [*range(1, 11)]
X, y = load_processed(subjects)
print('Loaded processed dataset:', X.shape, y.shape)

In [None]:
# Run training in-notebook
import torch
import numpy as _np

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using device:', device)

subjects = list(range(1, 11))
X, y = load_processed(subjects)
print('Loaded processed data:', X.shape, y.shape)

# `train_model` draws its train/validation split with numpy's global RNG
# (a permutation of len(X) is its first numpy random call). Seeding right
# before the call makes that split reproducible so it can be re-created below.
split_seed = 0
val_fraction = 0.1  # passed to train_model and reused below so both splits agree

# Run a short training session using the notebook `train_model` which will prefer `LSTM_model` if present
_np.random.seed(split_seed)
model = train_model(X, y, epochs=3, batch_size=128, val_split=val_fraction, device=device)

# Print a few sample predictions on validation split to inspect outputs.
# Re-seeding reproduces the exact permutation used inside `train_model`;
# without it these indices would be a fresh random draw made up mostly of
# training samples.
_np.random.seed(split_seed)
N = len(X)
idx = _np.random.permutation(N)
nval = int(N * val_fraction)
val_idx = idx[:nval]

model.eval()
with torch.no_grad():
    sample_X = torch.from_numpy(X[val_idx[:8]]).float().to(device)
    # Raw model outputs: probabilities or logits, depending on the base model.
    preds = model(sample_X).cpu().numpy()
    print('Sample predictions (first 8):')
    print(preds)
    print('Sample targets (first 8):')
    print(y[val_idx[:8]])
