In [None]:
# !pip install torch pandas scikit-learn numpy 
# !pip install kagglehub

In [None]:
# import kagglehub

# # Download latest version
# path = kagglehub.dataset_download("mansibmursalin/ninapro-db1-full-dataset")

# print("Path to dataset files:", path)

## Training time

In [9]:
import pandas as pd
path = "/Users/guanyulu/Documents/GitHub/BioMechDesignTeam_EMG/data_collection/data/Ninapro_DB1.csv"
data = pd.read_csv(path)
data = data.iloc[:, 1:]

In [10]:
print(data["exercise"].unique())
df = data[data["exercise"] == 3]
df = df[df['subject'] == 1]
df = df.drop(columns=["exercise", "restimulus", "repetition", "rerepetition"])
df.shape, df.columns

[1 2 3]


((227493, 34),
 Index(['emg_0', 'emg_1', 'emg_2', 'emg_3', 'emg_4', 'emg_5', 'emg_6', 'emg_7',
        'emg_8', 'emg_9', 'glove_0', 'glove_1', 'glove_2', 'glove_3', 'glove_4',
        'glove_5', 'glove_6', 'glove_7', 'glove_8', 'glove_9', 'glove_10',
        'glove_11', 'glove_12', 'glove_13', 'glove_14', 'glove_15', 'glove_16',
        'glove_17', 'glove_18', 'glove_19', 'glove_20', 'glove_21', 'stimulus',
        'subject'],
       dtype='object'))

In [11]:
n_classes = len(df['stimulus'].unique())
n_classes

24

In [12]:
import numpy as np

def make_windows(X, y, window_size=200, overlap=50, label_mode="center"):
    stride = window_size - overlap
    Xw, yw = [], []
    T = len(X)

    for start in range(0, T - window_size + 1, stride):
        end = start + window_size
        x_win = X[start:end]              
        y_win = y[start:end]              

        if label_mode == "center":
            label = y_win[window_size // 2]
        elif label_mode == "mode":
            label = np.bincount(y_win).argmax()

        Xw.append(x_win.T)      
        yw.append(label)

    return np.stack(Xw), np.array(yw)


y = df["stimulus"].to_numpy()
X = df.drop(columns=["stimulus"]).to_numpy()

Xw, yw = make_windows(X, y, window_size=200, overlap=50, label_mode="center")
print(Xw.shape, yw.shape, np.unique(yw).shape)

(1516, 33, 200) (1516,) (24,)


In [13]:
from sklearn.model_selection import train_test_split

X_, X_test, y_, y_test = train_test_split(Xw, yw, test_size=0.2, random_state=42, stratify=yw)
X_train, X_val, y_train, y_val = train_test_split(X_, y_, test_size=0.25, random_state=42, stratify=y_)
print(X_train.shape, X_val.shape, X_test.shape)
print(y_train.shape, y_val.shape, y_test.shape)

(909, 33, 200) (303, 33, 200) (304, 33, 200)
(909,) (303,) (304,)


In [14]:
import numpy as np

mean = X_train.mean(axis=(0, 2), keepdims=True)
std = X_train.std(axis=(0, 2), keepdims=True)

std[std < 1e-8] = 1.0  # Prevent division by zero

def standardize(X, mean, std):
    return (X - mean[None, :, None]) / std[None, :, None]

X_train = standardize(X_train, mean, std)
X_val   = standardize(X_val,   mean, std)
X_test  = standardize(X_test,  mean, std)

X_train = np.squeeze(X_train)   
X_val   = np.squeeze(X_val)     
X_test  = np.squeeze(X_test)

print(X_train.shape, X_val.shape, X_test.shape)

(909, 33, 200) (303, 33, 200) (304, 33, 200)


In [15]:
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)

print("X_val shape:", X_val.shape)
print("y_val shape:", y_val.shape)

X_train shape: (909, 33, 200)
y_train shape: (909,)
X_val shape: (303, 33, 200)
y_val shape: (303,)


In [16]:
import torch 
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import nn
from torch import optim
import torch.nn.functional as F

class EMGDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)  
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
train_set = EMGDataset(X_train, y_train)
val_set = EMGDataset(X_val, y_val)
test_set = EMGDataset(X_test, y_test)


In [17]:
class EMGCNN(nn.Module):
    def __init__(self, input_channels, num_classes):
        super(EMGCNN, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, 64, kernel_size=3, padding=1)
        self.maxpool1 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.maxpool2 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.maxpool3 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.bn = nn.BatchNorm1d(256)
        self.relu = nn.ReLU()
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.maxpool1(x)
        x = self.relu(self.conv2(x))
        x = self.maxpool2(x)
        x = self.conv3(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.maxpool3(x)
        x = self.fc(x.mean(dim=2))
        return x

device = "mps" if torch.cuda.is_available() else "cpu"

model = EMGCNN(input_channels=33, num_classes=n_classes).to(device)
print(model)

EMGCNN(
  (conv1): Conv1d(33, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (maxpool1): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(64, 128, kernel_size=(3,), stride=(1,), padding=(1,))
  (maxpool2): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(128, 256, kernel_size=(3,), stride=(1,), padding=(1,))
  (maxpool3): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (bn): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
  (fc): Linear(in_features=256, out_features=24, bias=True)
)


In [None]:
# from tqdm import tqdm

# train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
# val_loader = DataLoader(val_set, batch_size=32, shuffle=False)
# test_loader = DataLoader(test_set, batch_size=32, shuffle=False)

# epochs = 10
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.0005)
# for epoch in range(epochs):
#   model.train()
#   train_loss = 0

#   loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [Train]", leave=True)
#   for x, y in loop:
#     x, y = x.to(device), y.to(device)

#     optimizer.zero_grad()
#     yhat = model(x)
#     loss = criterion(yhat, y)
#     loss.backward()
#     optimizer.step()

#     train_loss += loss.item()
#     loop.set_postfix(train_loss=loss.item())

#   train_loss /= len(train_loader)

#   model.eval()
#   val_loss = 0
#   correct = 0
#   total = 0
#   with torch.no_grad():
#     loop = tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} [Val]", leave=True)
#     for x, y in loop:
#       x, y = x.to(device), y.to(device)

#       yhat = model(x)
#       loss = criterion(yhat, y)
#       val_loss += loss.item()

#       predicted = yhat.argmax(dim=1)
#       total += y.size(0)
#       correct += (predicted == y).sum().item()

#       loop.set_postfix(val_loss=loss.item())

#   val_loss /= len(val_loader)
#   val_accuracy = correct / total
#   print(f"Epoch {epoch+1}/{epochs} => Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

torch.save(model.state_dict(), "best_model.pt")

In [18]:
# to load: 
model.load_state_dict(torch.load("best_model.pt", map_location=device))
model.eval()        

EMGCNN(
  (conv1): Conv1d(33, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (maxpool1): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(64, 128, kernel_size=(3,), stride=(1,), padding=(1,))
  (maxpool2): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(128, 256, kernel_size=(3,), stride=(1,), padding=(1,))
  (maxpool3): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (bn): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
  (fc): Linear(in_features=256, out_features=24, bias=True)
)

In [22]:
from sklearn.metrics import accuracy_score, confusion_matrix

test_loader = DataLoader(test_set, batch_size=32, shuffle=False)
criterion = nn.CrossEntropyLoss()

model.eval()

test_loss = 0.0
correct = 0
total = 0

all_preds = []
all_labels = []

with torch.no_grad():
    for x, y in test_loader:
        x, y = x.to(device), y.to(device)

        yhat = model(x)
        loss = criterion(yhat, y)
        test_loss += loss.item()

        preds = yhat.argmax(dim=1)

        correct += (preds == y).sum().item()
        total += y.size(0)

        all_preds.append(preds.cpu())
        all_labels.append(y.cpu())

test_loss /= len(test_loader)
test_acc = correct / total

all_preds = torch.cat(all_preds)
all_labels = torch.cat(all_labels)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")

Test Loss: 0.5547
Test Accuracy: 0.8816


## Try Using on subject 2:

In [26]:
print(data["exercise"].unique())
df2 = data[data["exercise"] == 3]
df2 = df2[df2['subject'] == 2]
df2 = df2.drop(columns=["exercise", "restimulus", "repetition", "rerepetition"])
df2.shape, df2.columns

[1 2 3]


((229084, 34),
 Index(['emg_0', 'emg_1', 'emg_2', 'emg_3', 'emg_4', 'emg_5', 'emg_6', 'emg_7',
        'emg_8', 'emg_9', 'glove_0', 'glove_1', 'glove_2', 'glove_3', 'glove_4',
        'glove_5', 'glove_6', 'glove_7', 'glove_8', 'glove_9', 'glove_10',
        'glove_11', 'glove_12', 'glove_13', 'glove_14', 'glove_15', 'glove_16',
        'glove_17', 'glove_18', 'glove_19', 'glove_20', 'glove_21', 'stimulus',
        'subject'],
       dtype='object'))

In [38]:
y_2 = df2["stimulus"].to_numpy()
X_2 = df2.drop(columns=["stimulus"]).to_numpy()

Xw_2, yw_2 = make_windows(X_2, y_2, window_size=200, overlap=50, label_mode="center")
print(Xw_2.shape, yw_2.shape, np.unique(yw_2).shape)

(1526, 33, 200) (1526,) (24,)


In [39]:
mean_2 = Xw_2.mean(axis=(0, 2), keepdims=True)
std_2 = Xw_2.std(axis=(0, 2), keepdims=True)

std_2[std_2 < 1e-8] = 1.0  # Prevent division by zero

Xw_2 = standardize(Xw_2, mean_2, std_2)
Xw_2 = np.squeeze(Xw_2)

sub_2_dataset = EMGDataset(Xw_2, yw_2)

In [40]:
from sklearn.metrics import accuracy_score, confusion_matrix

test_loader = DataLoader(sub_2_dataset, batch_size=32, shuffle=False)
criterion = nn.CrossEntropyLoss()

model.eval()

test_loss = 0.0
correct = 0
total = 0

all_preds = []
all_labels = []

with torch.no_grad():
    for x, y in test_loader:
        x, y = x.to(device), y.to(device)

        yhat = model(x)
        loss = criterion(yhat, y)
        test_loss += loss.item()

        preds = yhat.argmax(dim=1)

        correct += (preds == y).sum().item()
        total += y.size(0)

        all_preds.append(preds.cpu())
        all_labels.append(y.cpu())

test_loss /= len(test_loader)
test_acc = correct / total

all_preds = torch.cat(all_preds)
all_labels = torch.cat(all_labels)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")

Test Loss: 3.9052
Test Accuracy: 0.4856


## see that test accuracy is significantly lower, meaning emg for gesture classification cannot be generalized across different individual. 