In [18]:
import os

import numpy as np

from sklearn.preprocessing import LabelBinarizer, StandardScaler
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from script.dataset import get_data, cols_emg, cols_emg_cal

In [2]:
X, y = get_data()

In [3]:
# extract center 100 data points from each set & one-hot encode labels

window_size = 100
data = list()
labels = list()

for i, count in X.value_counts('id').items():
    start = int(count / 2 - window_size / 2)
    idx_range = X[X['id'] == i].index[start:start + window_size]
    data.append(np.array(X.iloc[idx_range][cols_emg + cols_emg_cal]))
    labels.append(y[i])
    
data = np.array(data)
data_dim = data.shape
print(f'data shape: {data_dim}')

scaler = StandardScaler()
data = data.reshape(-1, 8)  # reshape to 2D for scaling
data = scaler.fit_transform(data)
data = data.reshape(*data_dim)  # reshape back to original shape

encoder = LabelBinarizer()
labels = encoder.fit_transform(labels)
print(f'labels shape: {labels.shape}, classes: {encoder.classes_}')

data shape: (312, 100, 16)
labels shape: (312, 4), classes: ['crimp_20' 'crimp_45' 'jug' 'sloper_30']


In [4]:
# train/test split

data_train, data_test, labels_train, labels_test = train_test_split(data, labels, test_size=0.4, random_state=42)
print(f'training data shape: {data_train.shape}')

training data shape: (187, 100, 16)


In [5]:
# pytorch device & tensors

# use gpu > apple silicon > cpu
device = torch.device("cuda:0" if torch.cuda.is_available() else (torch.device("mps") if torch.backends.mps.is_available() else "cpu"))

def dataset_to_tensors(data, labels):   
    data = torch.tensor(data, dtype=torch.float32, device=device)
    # change dimensions to (batch_size, num_features, seq_length)
    data = data.transpose(1, 2)
    labels = torch.tensor(labels, dtype=torch.float32, device=device)
    return (data, labels)

data_train, labels_train = dataset_to_tensors(data_train, labels_train)
data_test, labels_test = dataset_to_tensors(data_test, labels_test)

In [6]:
# network

class CNN(nn.Module):
    def __init__(self, seq_length, kernel_size = 5, num_features = 32, in_channels = 16):
        super(CNN, self).__init__()

        conv_diff = kernel_size - 1
        lin_in = int(((seq_length - conv_diff) / 2 - conv_diff) / 2 * num_features)
        conv1_feats = int(in_channels + (num_features - in_channels) / 2)
        
        self.stack = nn.Sequential(
            nn.Conv1d(in_channels=in_channels, out_channels=conv1_feats, kernel_size=kernel_size),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            
            nn.Conv1d(in_channels=conv1_feats, out_channels=num_features, kernel_size=kernel_size),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Flatten(),
            nn.Linear(lin_in, 4),
            nn.Dropout(0.5),
            nn.Softmax(dim=1)
        )
    
    def forward(self, x):
        i = torch.nn.Identity()
        c = i(x)
        c = self.stack(c)
        return c

In [7]:
# instantiate the model, define loss function and optimizer

model = CNN(window_size).to(device)
print(f'number of parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

number of parameters: 8636


In [8]:
# training

train = True

if not train:
    print('Skipping training')
else:
    # data loading
    dataset = TensorDataset(data_train, labels_train)
    dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
    
    # model save
    model_dir = 'models'
    os.makedirs('models', exist_ok=True)
    model_path = os.path.join('models', 'best_model.pth')
    
    # training loop
    num_epochs = 100
    best_loss = float('inf')
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
    
            running_loss += loss.item() * inputs.size(0)
        
        epoch_loss = running_loss / len(dataloader.dataset)
        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}')
        
        # save best model
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            torch.save(model.state_dict(), model_path)
    
    print('Training complete')

Epoch 1/100, Loss: 1.3591
Epoch 2/100, Loss: 1.2637
Epoch 3/100, Loss: 1.2683
Epoch 4/100, Loss: 1.2560
Epoch 5/100, Loss: 1.2215
Epoch 6/100, Loss: 1.2146
Epoch 7/100, Loss: 1.1843
Epoch 8/100, Loss: 1.2002
Epoch 9/100, Loss: 1.1595
Epoch 10/100, Loss: 1.1433
Epoch 11/100, Loss: 1.1234
Epoch 12/100, Loss: 1.1294
Epoch 13/100, Loss: 1.1502
Epoch 14/100, Loss: 1.1543
Epoch 15/100, Loss: 1.1589
Epoch 16/100, Loss: 1.1995
Epoch 17/100, Loss: 1.1281
Epoch 18/100, Loss: 1.1232
Epoch 19/100, Loss: 1.1483
Epoch 20/100, Loss: 1.1095
Epoch 21/100, Loss: 1.0747
Epoch 22/100, Loss: 1.1032
Epoch 23/100, Loss: 1.0694
Epoch 24/100, Loss: 1.0870
Epoch 25/100, Loss: 1.0802
Epoch 26/100, Loss: 1.0823
Epoch 27/100, Loss: 1.0870
Epoch 28/100, Loss: 1.0446
Epoch 29/100, Loss: 1.0821
Epoch 30/100, Loss: 1.0816
Epoch 31/100, Loss: 1.0771
Epoch 32/100, Loss: 1.0625
Epoch 33/100, Loss: 1.0835
Epoch 34/100, Loss: 1.0702
Epoch 35/100, Loss: 1.0568
Epoch 36/100, Loss: 1.0405
Epoch 37/100, Loss: 1.1110
Epoch 38/1

In [17]:
# load best model
model.load_state_dict(torch.load(model_path))
model.eval()

with torch.no_grad():
    outputs = model(data_test)
    _, predicted = torch.max(outputs, 1)
    _, true_labels = torch.max(labels_test, 1)
    accuracy = (predicted == true_labels).sum().item() / true_labels.size(0)
    print(f'Test Accuracy: {accuracy:.4f}')

Test Accuracy: 0.8080
