In [None]:
import numpy as np
import pandas as pd
import torch
import scipy
import scipy.signal as signal
from scipy.signal import butter, lfilter
from scipy.fft import fft, ifft, fftfreq
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import time

Relevant importations: Preprocessing functions have their basis in scipy, and model architechture is done in Pytorch

In [None]:
def download_uci_lower_limb_dataset(save_dir='./data'):
    """
    Download the UCI Lower Limb Movement Dataset
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
        
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00526/LowerLimbMovements.zip"
    
    if not os.path.exists(os.path.join(save_dir, 'LowerLimbMovements')):
        print("Downloading UCI Lower Limb EMG dataset...")
        r = requests.get(url)
        z = zipfile.ZipFile(io.BytesIO(r.content))
        z.extractall(save_dir)
        print("Dataset downloaded and extracted successfully!")
    else:
        print("UCI Lower Limb EMG dataset already exists.")
    
    return os.path.join(save_dir, 'LowerLimbMovements')


def process_uci_lower_limb_dataset(data_dir, output_file='lower_limb_emg_gait_data.csv'):
    """
    Process the UCI Lower Limb Movement Dataset and extract gait-related data
    """
    all_data = []
    
    print(f"Processing data from directory: {data_dir}")
    
    # The dataset contains multiple subjects with files like 'subject1.csv', 'subject2.csv', etc.
    subject_files = [f for f in os.listdir(data_dir) if f.startswith('subject') and f.endswith('.csv')]
    
    for subject_file in subject_files:
        try:
            file_path = os.path.join(data_dir, subject_file)
            print(f"Processing file: {subject_file}")
            
            # Read the subject's data
            df = pd.read_csv(file_path)
            
            # Extract EMG columns
            emg_cols = [col for col in df.columns if col.startswith('emg')]
            
            # Extract joint angle columns
            angle_cols = ['ankle', 'knee', 'hip']
            
            # Check if required columns exist
            if not all(col in df.columns for col in emg_cols + angle_cols):
                print(f"Skipping {subject_file}: missing required columns")
                continue
                
            # Add subject identifier and extract needed columns
            df_subset = df[emg_cols + angle_cols].copy()
            df_subset['subject_id'] = int(subject_file.replace('subject', '').replace('.csv', ''))
            
            # Append to our collection
            all_data.append(df_subset)
            print(f"Added data from {subject_file}: {len(df_subset)} samples")
            
        except Exception as e:
            print(f"Error processing file {subject_file}: {e}")
    
    if len(all_data) > 0:
        # Combine all subject data
        df_combined = pd.concat(all_data, ignore_index=True)
        
        # Save the processed data
        df_combined.to_csv(output_file, index=False)
        print(f"Combined dataset saved to {output_file} with {len(df_combined)} samples")
        
        return output_file
    else:
        return None


def get_data(file):
    """
    Load the processed Lower Limb EMG dataset
    """
    data = pd.read_csv(file)
    
    # Extract EMG signals
    emg_columns = [col for col in data.columns if col.startswith('emg')]
    emg_signals = data[emg_columns].values.T
    
    # Extract joint angles (ankle, knee, hip)
    angle_columns = ['ankle', 'knee']  # Using ankle and knee as targets
    labels = data[angle_columns].values
    
    return emg_signals, labels

Dataset  to be used: UCI EMG Dataset Lower Limb. This dataset samples subjects in a controlled or predictable motion setting (walking, standing). This element of periodicity (or null case with standing/sitting) is necessary for a good prediction accuracy and established of the boundary conditions around motion.

In [None]:
#Badnapss filter (restricted signal between 20-450 mV, most of legitimate EMG signal)
def bandpass_filter(signal_data, crit_freq=[20, 450], sampling_freq=125, plot=False, channel=0):
    order = 4
    b, a = scipy.signal.butter(order, crit_freq, btype='bandpass', fs=sampling_freq)
    processed_signal = scipy.signal.filtfilt(b, a, signal_data)

    if plot:
        plt.figure(figsize=(12, 6))
        plt.xlabel('Time')
        plt.ylabel(f'Normalized amplitude of channel {channel}')
        plt.title(f'{crit_freq[0]}-{crit_freq[1]}Hz bandpass filter')

        signal_min = np.min(signal_data, axis=1, keepdims=True)
        signal_max = np.max(signal_data, axis=1, keepdims=True)
        normed_signal = (signal_data - signal_min) / (signal_max - signal_min)

        filtered_min = np.min(processed_signal, axis=1, keepdims=True)
        filtered_max = np.max(processed_signal, axis=1, keepdims=True)
        normed_filt = (processed_signal - filtered_min) / (filtered_max - filtered_min)

        plt.plot(np.arange(normed_signal[channel].size), normed_signal[channel], label='Input')
        plt.plot(np.arange(normed_filt[channel].size), normed_filt[channel], label='Transformed')
        plt.legend()
        plt.show()

    return processed_signal

#Notch filter to eliminate common sources of artifacts from electronics (50-60 mV)
def notch_filter(signal_data, notch_freqs=[50, 60], Q=30, sampling_freq=125):
    filtered_signal = signal_data.copy()

    for f0 in notch_freqs:
        b, a = signal.iirnotch(f0, Q, fs=sampling_freq)
        filtered_signal = signal.filtfilt(b, a, filtered_signal)

    return filtered_signal

#Getting the absolute value of the data -> interest lies mainly in signal magnitude
def rectify(signal_data):
    return np.abs(signal_data)

#FFT Filter: Given that any sharp signal deviation does not also occur periodically, eliminate it from the spectrum
#Justification: We assume that the provided gait data is periodic, 
def fft_analysis_and_filter(signal_data, sampling_freq=125, gait_freq_range=[0.5, 2.5], plot=False, channel=0):
    n_samples = signal_data.shape[1]
    n_channels = signal_data.shape[0]

    fft_result = fft(signal_data)
    freqs = fftfreq(n_samples, 1 / sampling_freq)

    mask = np.zeros((n_channels, n_samples), dtype=bool)
    for i in range(n_channels):
        mask[i] = (np.abs(freqs) < 0.1) | (
                    (np.abs(freqs) >= gait_freq_range[0]) & (np.abs(freqs) <= gait_freq_range[1]))

        for harmonic in range(2, 6):
            mask[i] |= ((np.abs(freqs) >= harmonic * gait_freq_range[0]) &
                        (np.abs(freqs) <= harmonic * gait_freq_range[1]))

    filtered_fft = fft_result.copy()
    for i in range(n_channels):
        filtered_fft[i, ~mask[i]] = 0

    filtered_signal = np.real(ifft(filtered_fft))

    if plot and channel < n_channels:
        plt.figure(figsize=(15, 10))

        plt.subplot(3, 1, 1)
        plt.title(f'Original EMG Signal - Channel {channel}')
        plt.plot(signal_data[channel]) 
        plt.xlabel('Sample')
        plt.ylabel('Amplitude')

        plt.subplot(3, 1, 2)
        plt.title(f'Frequency Components - Channel {channel}')
        pos_mask = freqs > 0
        pos_freqs = freqs[pos_mask][:int(n_samples / 2)]
        plt.plot(pos_freqs, 2.0 / n_samples * np.abs(fft_result[channel, pos_mask][:int(n_samples / 2)]))
        plt.axvspan(gait_freq_range[0], gait_freq_range[1], alpha=0.3, color='green', label='Gait Freq Range')
        plt.xlabel('Frequency (Hz)')
        plt.ylabel('Magnitude')
        plt.legend()

        plt.subplot(3, 1, 3)
        plt.title(f'Filtered EMG Signal - Channel {channel}')
        plt.plot(filtered_signal[channel])
        plt.xlabel('Sample')
        plt.ylabel('Amplitude')

        plt.tight_layout()
        plt.show()

    return filtered_signal

Preprocessing functions:
- 1. Bandpass filter: EMG range restricted between 20-450 mV (with <20 commonly being a motion artifact, and >450 being too high of a voltage for any muscle condition)
- 2. Notch Filter: Range further restricted by eliminating 50-60 mV (common source of artifacts from electronics)
- 3. Signal Rectification: Taking the absolute value of the EMG spectrum, as magnitude is the more important factor in identifying the exact boundary conditions of the surrounding anatomical structures
- 4. FFT Filter: We assume an element of periodicity within gait, and given the rectification performed, the magnitude of the EMG spectrum will reflect as such. This function checks for any spikes in the spectrum that are non-periodic and eliminate, coverting back to the time domain after operating in the frequency domain to filter. 

In [None]:
def segmentation(signal_data, sampling_freq=125, window_size=1, window_shift=0.016):
    w_size = int(sampling_freq * window_size)
    w_shift = int(sampling_freq * window_shift)

    segments = []
    i = 0
    while i + w_size <= signal_data.shape[1]:
        segments.append(signal_data[:, i:i + w_size])
        i += w_shift

    return segments


def channel_rearrangement(signal_data, channel_order):
    channel_order = [channel - 1 for channel in channel_order]
    reindexed = np.zeros_like(signal_data)
    for i, ind in enumerate(channel_order):
        reindexed[i] = signal_data[ind]
    return reindexed

Next, we divide the processed spectrum into windows, filling an initialized list (segments) with them. This is done to prepare the data as an iterable for the ML portion of the program. Then the channels are rearranged according to how the UCI dataset presents them. 

In [None]:
def prepare_dataset(file_path, ordered_channels, test_size=0.25):
    emg_data, labels = get_data(file_path)

    if ordered_channels:
        emg_data = channel_rearrangement(emg_data, ordered_channels)

    filtered_emg = bandpass_filter(emg_data, [20, 450], 125)
    notched_emg = notch_filter(filtered_emg, [50, 60], 30, 125)
    rectified_emg = rectify(notched_emg)
    clean_emg = fft_analysis_and_filter(rectified_emg, 125, [0.5, 2.5])

    X_train, X_test, y_train, y_test = train_test_split(
        clean_emg.T,
        labels,
        test_size=test_size,
        random_state=42
    )

    X_val, X_test = X_test[:len(X_test) // 2], X_test[len(X_test) // 2:]
    y_val, y_test = y_test[:len(y_test) // 2], y_test[len(y_test) // 2:]

    X_train, X_val, X_test = X_train.T, X_val.T, X_test.T

    train_emg = []
    train_labels = []
    valid_emg = []
    valid_labels = []
    test_emg = []
    test_labels = []

    train_segments = segmentation(X_train, 125, window_size=1.5, window_shift=0.0175)
    train_emg.extend(train_segments)
    train_labels.extend([y_train[0]] * len(train_segments))

    val_segments = segmentation(X_val, 125, window_size=1.5, window_shift=0.0175)
    valid_emg.extend(val_segments)
    valid_labels.extend([y_val[0]] * len(val_segments))

    test_segments = segmentation(X_test, 125, window_size=1.5, window_shift=0.0175)
    test_emg.extend(test_segments)
    test_labels.extend([y_test[0]] * len(test_segments))

    return (
        np.array(train_emg), np.array(train_labels),
        np.array(valid_emg), np.array(valid_labels),
        np.array(test_emg), np.array(test_labels)
    )


class EMGDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        x = self.features[idx]
        y = self.labels[idx]
        return x, y

Establish train-test splits and use Pytorch's CustomDataset construct (called later in the main function for program execution) for dataset preparation

In [None]:
class CNNRegression(nn.Module):
    def __init__(self, input_channels, seq_length):
        super(CNNRegression, self).__init__()

        self.conv1 = nn.Conv1d(input_channels, 64, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(2)
        self.dropout1 = nn.Dropout(0.3)

        self.conv2 = nn.Conv1d(64, 32, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(2)
        self.dropout2 = nn.Dropout(0.3)

        self.conv3 = nn.Conv1d(32, 16, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool1d(2)
        self.dropout3 = nn.Dropout(0.3)

        self.flat_features = 16 * (seq_length // 8)

        self.fc1 = nn.Linear(self.flat_features, 64)
        self.dropout4 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 2)

    def forward(self, x, t=None):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = self.dropout1(x)

        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = self.dropout2(x)

        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        x = self.dropout3(x)

        x = x.view(-1, self.flat_features)

        x = F.relu(self.fc1(x))
        x = self.dropout4(x)
        x = F.relu(self.fc2(x))
        output = self.fc3(x)

        return output



CNN Structure:
    
Justification: Sliding a convolution across the EMG spectrum input allows for the network to recognize patterns in the data, already augmented by the periodic nature of the dataset. For the establishment of numerical boundary conditions, pairing periodicity with the numerial output of a convolution will be very helpful. 

Structure: Compose the main body of Convolutional Layers as well as MaxPool and Dropout to prevent placebo patterns from arising. Finally, pass the transformed input across a few Linear Layers to flatten dimensions. 

In [None]:
def physics_loss(pred, x, t, w=4.0, z=3.0):
    dtheta_dt = torch.autograd.grad(
        outputs=pred,
        inputs=t,
        grad_outputs=torch.ones_like(pred),
        create_graph=True
    )[0]

    d2theta_dt2 = torch.autograd.grad(
        outputs=dtheta_dt,
        inputs=t,
        grad_outputs=torch.ones_like(dtheta_dt),
        create_graph=True
    )[0]

    residual = d2theta_dt2 + w * dtheta_dt + z * pred
    phys_loss = torch.mean(residual ** 2)

    return phys_loss

Physics-based loss: This forms the basis for the anatomical constraint of the model. Several key anatomical structures, including joints during gait, can be modeled with damper-driven oscillation. As such, this was the ODE used (represented in residual) to append to the typical MSE loss accrued during training. 

In [None]:
def train_cnn_model(model, train_loader, valid_loader, optimizer, num_epochs=100, device='cpu'):
    print("Starting CNN model training...")
    model.to(device)

    train_losses = []
    valid_losses = []

    for epoch in range(num_epochs):
        start_time = time.time()
        model.train()
        train_loss = 0.0
        data_loss_total = 0.0
        phys_loss_total = 0.0

        for batch_idx, (x_batch, y_batch) in enumerate(train_loader):
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            batch_size, channels, seq_len = x_batch.shape
            dt = 0.001
            t = torch.linspace(0, dt * (seq_len - 1), seq_len, device=device).view(1, -1, 1)
            t = t.repeat(batch_size, 1, 1).requires_grad_(True)

            x_batch.requires_grad_(True)

            optimizer.zero_grad()
            pred = model(x_batch, t)

            data_loss = F.mse_loss(pred, y_batch)
            phys_loss = physics_loss(pred, x_batch, t)
            total_loss = data_loss + 0.1 * phys_loss

            total_loss.backward()
            optimizer.step()

            train_loss += total_loss.item()
            data_loss_total += data_loss.item()
            phys_loss_total += phys_loss.item()

            if batch_idx % 10 == 0:
                print(f"  Batch {batch_idx}/{len(train_loader)}, "
                      f"Loss: {total_loss.item():.4f}, "
                      f"Data Loss: {data_loss.item():.4f}, "
                      f"Physics Loss: {phys_loss.item():.4f}")
            
        model.eval()
        valid_loss = 0.0
        with torch.no_grad():
            for x_batch, y_batch in valid_loader:
                x_batch = x_batch.to(device)
                y_batch = y_batch.to(device)

                pred = model(x_batch)
                loss = F.mse_loss(pred, y_batch)
                valid_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        avg_data_loss = data_loss_total / len(train_loader)
        avg_phys_loss = phys_loss_total / len(train_loader)
        avg_valid_loss = valid_loss / len(valid_loader)

        train_losses.append(avg_train_loss)
        valid_losses.append(avg_valid_loss)

        epoch_time = time.time() - start_time
        print(f"Epoch {epoch + 1}/{num_epochs} completed in {epoch_time:.2f} seconds.")
        print(f"  Train Loss: {avg_train_loss:.4f}")
        print(f"  Data Loss: {avg_data_loss:.4f}")
        print(f"  Physics Loss: {avg_phys_loss:.4f}")
        print(f"  Validation Loss: {avg_valid_loss:.4f}")

    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(valid_losses, label='Validation Loss')
    plt.title('Loss Over Time')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.show()

    return model

 

Model training - physics loss is combined with data loss in gradient descent to optimally align the returned boundary condition

In [None]:
  def evaluate_model(model, test_loader, device='cpu', boundary_model=None, is_lstm=False):
    print("Evaluating model on test data...")
    model.eval()
    if boundary_model:
        boundary_model.eval()

    test_loss = 0.0
    predictions = []
    labels = []

    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            if is_lstm and boundary_model:
                boundary_cond = boundary_model(x_batch)
                pred = model(x_batch, boundary_cond)
            else:
                pred = model(x_batch)

            loss = F.mse_loss(pred, y_batch)
            test_loss += loss.item()

            predictions.extend(pred.cpu().numpy())
            labels.extend(y_batch.cpu().numpy())

    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.4f}")


    predictions = np.array(predictions)
    labels = np.array(labels)

    plt.figure(figsize=(12, 6))

    if predictions.shape[1] == 2:
        plt.subplot(1, 2, 1)
        plt.scatter(labels[:, 0], predictions[:, 0], alpha=0.5)
        plt.plot([min(labels[:, 0]), max(labels[:, 0])], [min(labels[:, 0]), max(labels[:, 0])], 'r--')
        plt.xlabel('Actual Angle')
        plt.ylabel('Predicted Angle')
        plt.title('Angle Predictions')

        plt.subplot(1, 2, 2)
        plt.scatter(labels[:, 1], predictions[:, 1], alpha=0.5)
        plt.plot([min(labels[:, 1]), max(labels[:, 1])], [min(labels[:, 1]), max(labels[:, 1])], 'r--')
        plt.xlabel('Actual Position')
        plt.ylabel('Predicted Position')
        plt.title('Position Predictions')
    else:
        plt.scatter(labels, predictions, alpha=0.5)
        plt.plot([min(labels), max(labels)], [min(labels), max(labels)], 'r--')
        plt.xlabel('Actual Values')
        plt.ylabel('Predicted Values')
        plt.title('Predictions vs labels')

    plt.tight_layout()
    plt.show()

    return avg_test_loss, predictions, labels

Model evaluation using test set data previously designated

In [None]:
def main(emg_data_file, ordered_channels=None):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
       
        train_emg, train_labels, valid_emg, valid_labels, test_emg, test_labels = prepare_dataset(
        emg_data_file, ordered_channels
    )

    input_channels = train_emg.shape[1]
    seq_length = train_emg.shape[2]

    train_dataset = EMGDataset(train_emg, train_labels)
    valid_dataset = EMGDataset(valid_emg, valid_labels)
    test_dataset = EMGDataset(test_emg, test_labels)

    batch_size = 32
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    cnn_model = CNNRegression(input_channels, seq_length).to(device)
    cnn_optimizer = optim.Adam(cnn_model.parameters(), lr=0.001)

    trained_cnn = train_cnn_model(
        cnn_model,
        train_loader,
        valid_loader,
        cnn_optimizer,
        num_epochs=50,
        device=device
    )

    cnn_test_loss, cnn_predictions, cnn_labels = evaluate_model(
        trained_cnn,
        test_loader,
        device=device
    )

    torch.save(trained_cnn.state_dict(), 'cnn_boundary_model.pth')

    return trained_cnn, cnn_test_loss, cnn_predictions, cnn_labels, 

Defining main function with calls of the dataset preparation and proper batching. Also calling CNN architechture, training, and optimization

In [None]:
if __name__ == "__main__":
    emg_data_file = "emg_gait_data.csv"
    ordered_channels = [1, 2, 3, 4, 5, 6, 7, 8]
    trained_cnn = main(emg_data_file, ordered_channels)