# EMG Encoder - Temporal CNN for Phoneme Prediction
Mapping MyoWare muscle sensor sequences to phonemes using a Temporal Convolutional Neural Network

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import json
from pathlib import Path

# Select the compute device: use a CUDA GPU when one is available, else the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Seed the NumPy and PyTorch random number generators so repeated runs
# draw the same random numbers.
np.random.seed(42)
torch.manual_seed(42)

## 1. Phoneme Mapping & Configuration

In [None]:
# Phoneme inventory and muscle-sensor channel layout.
# A phoneme's position in PHONEMES is the integer class label used by the
# rest of the notebook.
PHONEMES = (
    ['a', 'e', 'i', 'o', 'u']          # Vowels
    + ['p', 'b', 'm', 'f', 'v']        # Labial consonants (lips)
    + ['t', 'd', 'n', 'l']             # Alveolar consonants (tongue tip)
    + ['k', 'g', 'ng']                 # Velar consonants (tongue back)
    + ['s', 'z', 'sh', 'zh']           # Sibilants (tongue/teeth)
    + ['ch', 'j', 'th', 'dh']          # Other consonants
    + ['r', 'y', 'w']                  # Approximants
)

# MyoWare sensor site -> channel (row) index in every EMG array.
MUSCLE_SENSORS = {
    site: channel
    for channel, site in enumerate((
        'biceps',            # Arm movement (general movements, 'r', 'w')
        'forearm',           # Forearm (precise movements)
        'masseter',          # Jaw/chewing muscle (labial consonants: p, b, m, f, v)
        'orbicularis_oris',  # Lips (vowels, labial consonants)
        'temporalis',        # Temple/jaw (velar consonants: k, g)
        'tongue',            # Tongue (sibilants, alveolars)
        'larynx',            # Throat (voicing, fricatives)
    ))
}

# Derived sizes: model input channels and output classes.
NUM_PHONEMES = len(PHONEMES)
NUM_SENSORS = len(MUSCLE_SENSORS)

print(f"Number of EMG Sensors: {NUM_SENSORS}")
print(f"Number of Phonemes: {NUM_PHONEMES}")
print(f"Phoneme Mapping: {PHONEMES}")
print(f"Muscle Sensors: {list(MUSCLE_SENSORS.keys())}")

## 2. EMG Dataset Class

In [None]:
# Real-data loader (currently disabled)

# The code below is wrapped in a triple-quoted string so that it does not run.
# Remove the surrounding triple quotes to load real MyoWare sensor data.
"""
# Load real EMG data from disk
def load_real_emg_data(data_dir='./data/processed'):
    '''
    Load real EMG sequences and labels from preprocessed data files
    Expected files:
    - data/processed/train_sequences.npy: shape (num_samples, num_sensors, seq_length)
    - data/processed/train_labels.npy: shape (num_samples,)
    - data/processed/test_sequences.npy
    - data/processed/test_labels.npy
    '''
    data_path = Path(data_dir)
    
    if not data_path.exists():
        raise FileNotFoundError(f"Data directory not found: {data_dir}")
    
    # Load training data
    train_sequences = np.load(data_path / 'train_sequences.npy')
    train_labels = np.load(data_path / 'train_labels.npy')
    
    # Load test data
    test_sequences = np.load(data_path / 'test_sequences.npy')
    test_labels = np.load(data_path / 'test_labels.npy')
    
    print(f"Loaded {len(train_sequences)} training sequences")
    print(f"Loaded {len(test_sequences)} test sequences")
    
    return train_sequences, train_labels, test_sequences, test_labels

# Uncomment to use real data instead of synthetic:
# train_sequences, train_labels, test_sequences, test_labels = load_real_emg_data()
# sequences = np.concatenate([train_sequences, test_sequences])
# labels = np.concatenate([train_labels, test_labels])
"""

In [None]:
class EMGDataset(Dataset):
    """
    EMG Dataset for phoneme prediction.

    Each sample is one EMG recording paired with a phoneme index. Recordings
    are returned as float tensors of shape (NUM_SENSORS, seq_length).
    """
    def __init__(self, sequences, labels, seq_length=100, transform=None):
        """
        Args:
            sequences: Indexable collection of 2-D EMG recordings, each either
                (num_sensors, time_steps) or (time_steps, num_sensors).
                NumPy arrays and nested lists are both accepted.
            labels: Indexable collection of integer phoneme indices, one per
                recording.
            seq_length: Fixed number of time steps every sample is padded or
                truncated to.
            transform: Optional callable applied to the fixed-length array
                before it is converted to a tensor.
        """
        self.sequences = sequences
        self.labels = labels
        self.seq_length = seq_length
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """
        Return sample ``idx`` as ``(emg, label)``.

        Returns:
            emg: float32 tensor of shape (NUM_SENSORS, seq_length).
            label: 0-dimensional int64 tensor holding the phoneme index.

        Raises:
            ValueError: If the recording is not 2-D or neither of its axes
                has NUM_SENSORS entries.
        """
        # Accept plain nested lists as well as arrays.
        seq = np.asarray(self.sequences[idx])
        if seq.ndim != 2:
            raise ValueError(
                f"EMG sequence {idx} must be 2-D, got shape {seq.shape}"
            )

        # Ensure sensor-major layout (num_sensors, time_steps).
        if seq.shape[0] != NUM_SENSORS:
            if seq.shape[1] != NUM_SENSORS:
                # Fail loudly instead of returning a tensor whose channel
                # count does not match NUM_SENSORS.
                raise ValueError(
                    f"EMG sequence {idx} has shape {seq.shape}; expected one "
                    f"axis of length {NUM_SENSORS}"
                )
            seq = seq.T

        # Zero-pad at the end, or truncate, to exactly seq_length steps.
        missing = self.seq_length - seq.shape[1]
        if missing > 0:
            padding = np.zeros((NUM_SENSORS, missing))
            seq = np.concatenate([seq, padding], axis=1)
        else:
            seq = seq[:, :self.seq_length]

        if self.transform:
            seq = self.transform(seq)

        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return torch.FloatTensor(seq), label


def generate_synthetic_emg_data(num_samples=200, seq_length=100, num_sensors=NUM_SENSORS):
    """
    Create random EMG-like recordings for testing the pipeline.

    Each sample starts as low-amplitude Gaussian noise on every sensor. For
    some phoneme groups an extra activation is added on specific sensor rows,
    and the result is rectified with an absolute value. Real recordings would
    come from the MyoWare sensors instead.

    Args:
        num_samples: Number of recordings to generate.
        seq_length: Time steps per recording.
        num_sensors: Sensor rows per recording.

    Returns:
        (sequences, labels): a list of (num_sensors, seq_length) arrays and
        the matching list of phoneme indices.
    """
    def activation(mean):
        # One extra per-time-step activation trace for a single sensor row.
        return np.random.normal(mean, 0.1, seq_length)

    sequences, labels = [], []

    for _sample in range(num_samples):
        # Draw the class first, then the baseline noise for all sensors.
        phoneme_idx = np.random.randint(0, NUM_PHONEMES)
        signal = np.random.normal(0, 0.1, (num_sensors, seq_length))

        # The index ranges are disjoint, so at most one branch applies.
        if 0 <= phoneme_idx <= 4:        # Vowels
            signal[3] += activation(0.5)  # orbicularis_oris
        elif 5 <= phoneme_idx <= 9:      # Labial consonants (p, b, m, f, v)
            signal[2] += activation(0.7)  # masseter
            signal[3] += activation(0.6)  # orbicularis_oris
        elif 10 <= phoneme_idx <= 13:    # Alveolar consonants
            signal[5] += activation(0.6)  # tongue
        elif 14 <= phoneme_idx <= 16:    # Velar consonants
            signal[4] += activation(0.7)  # temporalis
            signal[5] += activation(0.5)  # tongue
        # Remaining phoneme indices keep the plain noise baseline.

        # Rectify: EMG envelopes are non-negative.
        sequences.append(np.abs(signal))
        labels.append(phoneme_idx)

    return sequences, labels


# Generate synthetic data
# Builds the in-memory `sequences` / `labels` lists used by the later cells:
# 400 samples, with the generator's default sequence length and sensor count.
print("Generating synthetic EMG data...")
sequences, labels = generate_synthetic_emg_data(num_samples=400)
print(f"Generated {len(sequences)} sequences")

## 3. Temporal CNN Model Architecture

In [None]:
class TemporalCNN(nn.Module):
    """
    Temporal convolutional classifier for EMG sequences.

    Three Conv1d -> BatchNorm1d -> ReLU -> Dropout stages with dilations
    1, 2 and 4 are followed by an average over the time axis and a two-layer
    fully connected head that outputs one logit per phoneme.
    """
    def __init__(self, num_sensors, num_phonemes, num_channels=64, kernel_size=3, dropout=0.3):
        """
        Args:
            num_sensors: Number of input channels (EMG sensors).
            num_phonemes: Number of output classes.
            num_channels: Width of the first conv stage; later stages use
                2x and 4x this width.
            kernel_size: Kernel size shared by all conv stages.
            dropout: Dropout probability used after every stage and in the head.
        """
        super().__init__()

        self.num_sensors = num_sensors
        self.num_phonemes = num_phonemes

        # Channel widths at the boundary of each conv stage.
        widths = (num_sensors, num_channels, num_channels * 2, num_channels * 4)
        pad = kernel_size // 2

        # Register conv{n}/bn{n}/dropout{n} for n = 1..3 with dilation 1, 2, 4.
        # Every stage uses the same padding regardless of its dilation.
        for stage, dilation in enumerate((1, 2, 4), start=1):
            in_width, out_width = widths[stage - 1], widths[stage]
            setattr(self, f"conv{stage}",
                    nn.Conv1d(in_width, out_width, kernel_size,
                              padding=pad, dilation=dilation))
            setattr(self, f"bn{stage}", nn.BatchNorm1d(out_width))
            setattr(self, f"dropout{stage}", nn.Dropout(dropout))

        # Classification head applied after pooling over time.
        self.fc1 = nn.Linear(widths[3], widths[2])
        self.dropout_fc = nn.Dropout(dropout)
        self.fc2 = nn.Linear(widths[2], num_phonemes)

        self.relu = nn.ReLU()

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, num_sensors, seq_length).

        Returns:
            Logits of shape (batch_size, num_phonemes).
        """
        stages = (
            (self.conv1, self.bn1, self.dropout1),
            (self.conv2, self.bn2, self.dropout2),
            (self.conv3, self.bn3, self.dropout3),
        )
        for conv, norm, drop in stages:
            x = drop(self.relu(norm(conv(x))))

        # Global average pooling over time -> (batch_size, num_channels*4)
        pooled = x.mean(dim=2)

        hidden = self.dropout_fc(self.relu(self.fc1(pooled)))
        return self.fc2(hidden)


# Initialize model
# One input channel per EMG sensor and one output logit per phoneme; the
# instance is moved to the device selected at the top of the notebook.
model = TemporalCNN(
    num_sensors=NUM_SENSORS,
    num_phonemes=NUM_PHONEMES,
    num_channels=64,
    kernel_size=3,
    dropout=0.3
).to(device)

# Show the layer summary and the total number of parameter elements.
print(f"Model architecture:\n{model}")
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")

## 4. Data Preparation & Preprocessing

In [None]:
# Preprocessing: split first, then normalize EMG signals.
# The scaler is fitted on the TRAINING sequences only, so no statistics of
# the held-out test set leak into the normalization.
from sklearn.model_selection import train_test_split

# Split sample indices rather than the data itself. The call uses the same
# labels, test_size, random_state and stratification as a direct split of the
# sequences, so train/test membership is expected to be unchanged.
sample_indices = np.arange(len(sequences))
train_idx, test_idx = train_test_split(
    sample_indices, test_size=0.2, random_state=42, stratify=labels
)
train_idx, test_idx = train_idx.tolist(), test_idx.tolist()

# Fit one global mean/std (a single feature column) on training values only.
scaler = StandardScaler()
sequences_flat = np.concatenate(
    [np.asarray(sequences[i]).ravel() for i in train_idx]
).reshape(-1, 1)
scaler.fit(sequences_flat)

# Apply the training-set statistics to every sequence, keeping each shape.
sequences_normalized = [
    scaler.transform(np.asarray(seq).reshape(-1, 1)).reshape(np.shape(seq))
    for seq in sequences
]

train_sequences = [sequences_normalized[i] for i in train_idx]
test_sequences = [sequences_normalized[i] for i in test_idx]
train_labels = [labels[i] for i in train_idx]
test_labels = [labels[i] for i in test_idx]

print(f"Training samples: {len(train_sequences)}")
print(f"Test samples: {len(test_sequences)}")

# Create datasets
train_dataset = EMGDataset(train_sequences, train_labels, seq_length=100)
test_dataset = EMGDataset(test_sequences, test_labels, seq_length=100)

# Create dataloaders (only the training loader is shuffled)
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"DataLoader created with batch size: {batch_size}")

## 5. Training Loop

In [None]:
# Training configuration
num_epochs = 50
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Halve the learning rate when the test loss has not improved for 5 epochs.
# The scheduler's `verbose` flag is deprecated in recent PyTorch releases, so
# learning-rate changes are reported manually inside the loop instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

# Training history (one entry per epoch)
train_losses = []
test_losses = []
test_accuracies = []
best_accuracy = 0
best_model_state = None

print("Starting training...")
print("-" * 60)

for epoch in range(num_epochs):
    # Training phase
    model.train()
    train_loss = 0.0

    for batch_emg, batch_labels in train_loader:
        batch_emg = batch_emg.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass
        outputs = model(batch_emg)
        loss = criterion(outputs, batch_labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Mean of the per-batch losses
    train_loss /= len(train_loader)
    train_losses.append(train_loss)

    # Validation phase
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_emg, batch_labels in test_loader:
            batch_emg = batch_emg.to(device)
            batch_labels = batch_labels.to(device)

            outputs = model(batch_emg)
            loss = criterion(outputs, batch_labels)
            test_loss += loss.item()

            predicted = outputs.argmax(dim=1)
            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()

    test_loss /= len(test_loader)
    test_losses.append(test_loss)
    accuracy = correct / total
    test_accuracies.append(accuracy)

    # Learning rate scheduling (report any reduction explicitly)
    previous_lr = optimizer.param_groups[0]['lr']
    scheduler.step(test_loss)
    current_lr = optimizer.param_groups[0]['lr']
    if current_lr != previous_lr:
        print(f"Epoch {epoch+1}: reducing learning rate to {current_lr:.4e}")

    # Save best model.
    # state_dict() hands back references to the live parameter tensors, so a
    # shallow dict copy would keep changing as training continues. Clone each
    # tensor to freeze a real snapshot. Always keep at least the first epoch
    # so there is something to restore even if accuracy never exceeds 0.
    if best_model_state is None or accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}] | Train Loss: {train_loss:.4f} | "
              f"Test Loss: {test_loss:.4f} | Accuracy: {accuracy:.4f}")

print("-" * 60)
print(f"Training completed! Best Test Accuracy: {best_accuracy:.4f}")

# Load best model (skipped only if no epoch ran at all)
if best_model_state is not None:
    model.load_state_dict(best_model_state)
    print("Loaded best model")

## 6. Visualization & Results

In [None]:
# Plot training results: loss curves on the left, test accuracy on the right.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: per-epoch training and test loss
ax1.plot(train_losses, label='Train Loss', linewidth=2)
ax1.plot(test_losses, label='Test Loss', linewidth=2)
ax1.set(xlabel='Epoch', ylabel='Loss', title='Training and Test Loss')
ax1.grid(True, alpha=0.3)
ax1.legend()

# Right panel: per-epoch test accuracy
ax2.plot(test_accuracies, label='Test Accuracy', linewidth=2, color='green')
ax2.set(xlabel='Epoch', ylabel='Accuracy', title='Test Accuracy Over Epochs')
ax2.grid(True, alpha=0.3)
ax2.legend()

# Write the figure to disk before displaying it.
plt.tight_layout()
plt.savefig('training_results.png', dpi=100, bbox_inches='tight')
plt.show()

print(f"Best test accuracy achieved: {best_accuracy:.4f}")

## 7. Inference & Real-time Prediction

In [None]:
class EMGEncoder:
    """
    Real-time EMG to Phoneme encoder.

    Wraps a trained model and its fitted scaler for inference on raw
    (un-normalized) EMG windows, e.g. live MyoWare sensor data.
    """
    def __init__(self, model, scaler, device='cpu', seq_length=100):
        """
        Args:
            model: Trained network mapping (batch, NUM_SENSORS, seq_length)
                tensors to phoneme logits.
            scaler: Fitted scaler exposing ``transform`` on an (n, 1) column.
            device: Device on which inference runs; the model is moved there.
            seq_length: Window length every input is padded or truncated to.
                Defaults to 100.
        """
        self.model = model
        self.scaler = scaler
        self.device = device
        self.seq_length = seq_length
        # Keep the model on the same device as the input tensors built below.
        self.model.to(device)
        self.model.eval()

    def predict_phoneme(self, emg_sequence):
        """
        Predict phoneme from EMG sequence

        Args:
            emg_sequence: raw EMG values as an array of shape
                (num_sensors, time_steps), (time_steps, num_sensors), or
                (num_sensors,) for a single time step.

        Returns:
            phoneme: predicted phoneme
            confidence: confidence score (0-1)
            probabilities: softmax probabilities for all phonemes

        Raises:
            ValueError: if the input is not 1-D/2-D or no axis has
                NUM_SENSORS entries.
        """
        emg_sequence = np.asarray(emg_sequence, dtype=float)

        if emg_sequence.ndim == 1:
            # Single timestep: hold it constant over the whole window.
            emg_sequence = np.repeat(emg_sequence[:, np.newaxis], self.seq_length, axis=1)
        if emg_sequence.ndim != 2:
            raise ValueError(
                f"emg_sequence must be 1-D or 2-D, got shape {emg_sequence.shape}"
            )

        # Ensure sensor-major layout; reject shapes that cannot be fixed by a
        # transpose instead of failing later with an obscure error.
        if emg_sequence.shape[0] != NUM_SENSORS:
            if emg_sequence.shape[1] != NUM_SENSORS:
                raise ValueError(
                    f"emg_sequence has shape {emg_sequence.shape}; expected "
                    f"one axis of length {NUM_SENSORS}"
                )
            emg_sequence = emg_sequence.T

        # Zero-pad at the end, or truncate, to the configured window length.
        missing = self.seq_length - emg_sequence.shape[1]
        if missing > 0:
            padding = np.zeros((NUM_SENSORS, missing))
            emg_sequence = np.concatenate([emg_sequence, padding], axis=1)
        else:
            emg_sequence = emg_sequence[:, :self.seq_length]

        # Normalize with the single-feature scaler, then restore the 2-D shape.
        emg_flat = emg_sequence.reshape(-1, 1)
        emg_scaled = self.scaler.transform(emg_flat).reshape(emg_sequence.shape)

        # Add the batch dimension expected by the model.
        emg_tensor = torch.FloatTensor(emg_scaled).unsqueeze(0).to(self.device)

        with torch.no_grad():
            outputs = self.model(emg_tensor)
            probabilities = torch.softmax(outputs, dim=1)[0].cpu().numpy()

        predicted_idx = int(np.argmax(probabilities))
        phoneme = PHONEMES[predicted_idx]
        confidence = float(probabilities[predicted_idx])

        return phoneme, confidence, probabilities

    def predict_sequence(self, emg_sequences):
        """
        Predict sequence of phonemes from multiple EMG sequences

        Args:
            emg_sequences: iterable of EMG arrays, each accepted by
                ``predict_phoneme``

        Returns:
            phoneme_sequence: list of predicted phonemes
            confidence_scores: list of confidence scores
        """
        phoneme_sequence = []
        confidence_scores = []

        for seq in emg_sequences:
            phoneme, confidence, _ = self.predict_phoneme(seq)
            phoneme_sequence.append(phoneme)
            confidence_scores.append(confidence)

        return phoneme_sequence, confidence_scores


# Initialize encoder
encoder = EMGEncoder(model, scaler, device=device)

# Test with a few held-out samples
print("\n" + "="*60)
print("Testing Phoneme Prediction")
print("="*60)

for i in range(min(5, len(test_sequences))):
    # test_sequences were already standardized during data preparation, while
    # the encoder standardizes its input itself. Undo the scaling first so the
    # sample is not normalized twice.
    normalized_emg = np.asarray(test_sequences[i])
    test_emg = scaler.inverse_transform(
        normalized_emg.reshape(-1, 1)
    ).reshape(normalized_emg.shape)
    true_label = PHONEMES[test_labels[i]]

    predicted_phoneme, confidence, probs = encoder.predict_phoneme(test_emg)

    print(f"\nSample {i+1}:")
    print(f"  True phoneme: {true_label}")
    print(f"  Predicted phoneme: {predicted_phoneme}")
    print(f"  Confidence: {confidence:.4f}")
    print(f"  Match: {'✓' if true_label == predicted_phoneme else '✗'}")

## 8. Model Persistence & Deployment

In [None]:
# Save model
# The checkpoint bundles the weights with everything needed to rebuild the
# model and the input scaler. Scaler statistics are stored as plain Python
# lists rather than NumPy arrays so the file contains only tensors and basic
# containers, which the restricted (weights-only) unpickler used by default
# in recent PyTorch releases can load.
model_save_path = 'emg_encoder_model.pth'
torch.save({
    'model_state_dict': model.state_dict(),
    'model_config': {
        'num_sensors': NUM_SENSORS,
        'num_phonemes': NUM_PHONEMES,
        'num_channels': 64,
        'kernel_size': 3,
        'dropout': 0.3
    },
    'phonemes': PHONEMES,
    'muscle_sensors': MUSCLE_SENSORS,
    'scaler_mean': scaler.mean_.tolist(),
    'scaler_scale': scaler.scale_.tolist()
}, model_save_path)

print(f"Model saved to {model_save_path}")

# Save metadata
# NOTE(review): 'training_accuracy' is filled from best_accuracy, which the
# training loop computes on the test loader; 'created_date' is hard-coded.
metadata = {
    'model_name': 'TemporalCNN_EMG_Phoneme',
    'version': '1.0',
    'training_accuracy': float(best_accuracy),
    'num_sensors': NUM_SENSORS,
    'num_phonemes': NUM_PHONEMES,
    'phonemes': PHONEMES,
    'muscle_sensors': MUSCLE_SENSORS,
    'created_date': '2026-02-10'
}

with open('emg_encoder_metadata.json', 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2)

print("Metadata saved to emg_encoder_metadata.json")


# Function to load model for later use
def load_emg_encoder(model_path='emg_encoder_model.pth', device='cpu'):
    """
    Load a trained EMG encoder checkpoint.

    Args:
        model_path: Path to a checkpoint holding 'model_state_dict',
            'model_config', 'scaler_mean', 'scaler_scale' and 'phonemes'.
        device: Device the weights are mapped to and the model is moved to.

    Returns:
        (model, scaler, phonemes): the restored TemporalCNN in evaluation
        mode, a StandardScaler carrying the saved mean/scale, and the saved
        phoneme list.
    """
    # NOTE(review): checkpoints that embed NumPy arrays may be rejected by the
    # restricted unpickler torch.load uses by default in recent PyTorch
    # releases. Pass weights_only=False only for files you trust.
    checkpoint = torch.load(model_path, map_location=device)

    config = checkpoint['model_config']
    model = TemporalCNN(
        num_sensors=config['num_sensors'],
        num_phonemes=config['num_phonemes'],
        num_channels=config['num_channels'],
        kernel_size=config['kernel_size'],
        dropout=config['dropout']
    ).to(device)

    model.load_state_dict(checkpoint['model_state_dict'])
    # A freshly constructed module is in training mode; switch to evaluation
    # mode so dropout is disabled and batch norm uses its stored statistics.
    model.eval()

    # Recreate scaler. The statistics may have been saved as arrays or as
    # plain lists, so coerce them to float arrays either way.
    scaler = StandardScaler()
    scaler.mean_ = np.asarray(checkpoint['scaler_mean'], dtype=np.float64)
    scaler.scale_ = np.asarray(checkpoint['scaler_scale'], dtype=np.float64)

    return model, scaler, checkpoint['phonemes']


print("\n✓ Model saved successfully!")

## 9. Integration with MyoWare Sensors (Arduino/Serial)

In [None]:
class MyoWareEMGReader:
    """
    Interface for reading EMG data from MyoWare sensors via Serial/Arduino.

    Samples are stored in a fixed-size circular buffer. If the serial port
    cannot be opened, the reader runs in simulation mode and produces random
    samples instead.
    """
    def __init__(self, port='/dev/ttyUSB0', baudrate=9600, num_sensors=NUM_SENSORS, buffer_size=100):
        """
        Args:
            port: Serial port (e.g., '/dev/ttyUSB0' on Linux/Mac, 'COM3' on Windows)
            baudrate: Serial communication speed
            num_sensors: Number of MyoWare sensors connected
            buffer_size: Window size for EMG sequences
        """
        self.port = port
        self.baudrate = baudrate
        self.num_sensors = num_sensors
        self.buffer_size = buffer_size
        self.buffer = np.zeros((num_sensors, buffer_size))
        # Column that the next sample will be written to.
        self.read_index = 0

        try:
            # Imported lazily so the class still works without pyserial.
            import serial
            self.serial_connection = serial.Serial(port, baudrate, timeout=1)
            print(f"Connected to {port} at {baudrate} baud")
        except Exception as e:
            print(f"Warning: Could not connect to serial port: {e}")
            print("Running in simulation mode")
            self.serial_connection = None

    def read_sample(self):
        """
        Read one sample from all sensors.

        Returns:
            Array of ``num_sensors`` values. In simulation mode (no serial
            connection) these are random values in [0, 100). With a live
            connection, a line that cannot be read or parsed yields the most
            recently stored sample again rather than random noise.
        """
        if self.serial_connection is None:
            # Simulation mode: return random values
            return np.random.rand(self.num_sensors) * 100

        try:
            line = self.serial_connection.readline().decode('utf-8').strip()
            # Expected format: "sensor0,sensor1,sensor2,..."
            values = [float(x) for x in line.split(',')]
        except (ValueError, OSError):
            # ValueError covers undecodable bytes and non-numeric fields;
            # OSError covers I/O failures on the port.
            values = None

        if values is not None and len(values) == self.num_sensors:
            return np.array(values)

        # Bad or incomplete line: hold the last stored sample (all zeros
        # until the first valid sample arrives).
        return self.buffer[:, self.read_index - 1].copy()

    def get_sequence(self):
        """
        Get current buffer as EMG sequence in chronological order.

        The buffer is circular, so it is rotated to put the oldest sample
        first and the newest last. Columns not yet written are zeros.

        Returns shape: (num_sensors, buffer_size)
        """
        return np.roll(self.buffer, -self.read_index, axis=1)

    def update(self):
        """Read new sample, store it in the buffer, and return it."""
        sample = self.read_sample()
        self.buffer[:, self.read_index] = sample
        self.read_index = (self.read_index + 1) % self.buffer_size
        return sample

    def close(self):
        """Close serial connection (no-op in simulation mode)."""
        if self.serial_connection:
            self.serial_connection.close()


# Example: Real-time EMG to phoneme streaming
# The snippet is only printed, not executed. It now imports `time`, which it
# needs for the sleep call, so it runs when copied into a cell.
print("Example Real-time EMG Processing Code:")
print("-" * 60)
print("""
import time

# Initialize reader
emg_reader = MyoWareEMGReader(
    port='/dev/ttyUSB0',  # Change to your Arduino port
    num_sensors=7,
    buffer_size=100
)

# Initialize encoder
encoder = EMGEncoder(model, scaler, device=device)

# Real-time loop
try:
    while True:
        # Read new EMG sample
        emg_reader.update()

        # Get current sequence
        emg_sequence = emg_reader.get_sequence()

        # Predict phoneme
        phoneme, confidence, _ = encoder.predict_phoneme(emg_sequence)

        # Only output if confidence is high
        if confidence > 0.7:
            print(f"Predicted: {phoneme} (confidence: {confidence:.2f})")

        time.sleep(0.01)  # 10ms - adjust based on your sampling rate

except KeyboardInterrupt:
    emg_reader.close()
    print("Stopped")
""")
print("-" * 60)

## 10. Performance Analysis & Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import seaborn as sns

# Get predictions on test set
model.eval()
all_predictions = []
all_labels = []

with torch.no_grad():
    for batch_emg, batch_labels in test_loader:
        batch_emg = batch_emg.to(device)
        outputs = model(batch_emg)
        predicted = outputs.argmax(dim=1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(batch_labels.numpy())

all_predictions = np.array(all_predictions)
all_labels = np.array(all_labels)

# Pin the class set explicitly. Without it, a phoneme that is missing from
# both the test labels and the predictions would make the report's
# target_names and the heatmap's tick labels disagree with the computed
# classes.
class_indices = list(range(NUM_PHONEMES))

# Classification report
print("\n" + "="*60)
print("Classification Report")
print("="*60)
print(classification_report(
    all_labels, all_predictions,
    labels=class_indices, target_names=PHONEMES,
    zero_division=0,  # classes with no samples/predictions score 0 without warnings
))

# Compute confusion matrix (always NUM_PHONEMES x NUM_PHONEMES)
cm = confusion_matrix(all_labels, all_predictions, labels=class_indices)

# Plot the full confusion matrix (all phonemes)
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=PHONEMES, yticklabels=PHONEMES,
            cbar_kws={'label': 'Count'}, ax=ax)
ax.set_xlabel('Predicted Phoneme')
ax.set_ylabel('True Phoneme')
ax.set_title('Confusion Matrix - EMG to Phoneme Mapping')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('confusion_matrix.png', dpi=100, bbox_inches='tight')
plt.show()

# Per-phoneme accuracy (phonemes with no test samples are skipped)
print("\nPer-Phoneme Accuracy:")
print("-" * 40)
for i, phoneme in enumerate(PHONEMES):
    mask = all_labels == i
    if mask.sum() > 0:
        acc = (all_predictions[mask] == i).sum() / mask.sum()
        print(f"{phoneme:6s}: {acc:.4f} ({mask.sum()} samples)")