## Setup and imports

In [1]:
import whisper
import torch
import numpy as np
import sounddevice as sd
from sentence_transformers import SentenceTransformer
import serial
import time
from IPython.display import clear_output
import warnings
import ast
import math
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from tqdm import tqdm
from typing import List, Optional, Tuple
from torch.utils.data import Dataset, DataLoader
from torch.optim.swa_utils import AveragedModel, get_ema_multi_avg_fn
from sklearn.model_selection import train_test_split

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')




### InmoovPoseNet

In [2]:
class InmoovPoseNet(nn.Module):
    def __init__(
            self,
            text_embeddings_dim: int,
            control_dim: int,
            model_config
    ):
        super(InmoovPoseNet, self).__init__()
        
        self._use_controls = model_config.use_controls
        self._use_lstm = model_config.use_lstm
        self._use_layer_norm = model_config.layer_norm
    
        # Building embeddings for the current control inputs.
        if self._use_controls:
            self.control_embeddings = nn.Linear(in_features=control_dim, out_features=model_config.control_embeddings_dim)
            fc_input_dim = text_embeddings_dim + model_config.control_embeddings_dim
        else:
            self.control_embeddings = None
            fc_input_dim = text_embeddings_dim

        # Build FC encoder for pre-processing.
        if len(model_config.fc_encoder_layers) > 0:
            self._use_fc_preprocessor = True
            fc_encoder_units = [fc_input_dim] + model_config.fc_encoder_layers
            encoder_layers = []
            for i in range(len(fc_encoder_units) - 1):
                encoder_layers.append(nn.Linear(fc_encoder_units[i], fc_encoder_units[i+1]))
                encoder_layers.append(nn.GELU())

                if model_config.dropout_rate > 0.0:
                    encoder_layers.append(nn.Dropout1d(p=model_config.dropout_rate))
            self.fc_encoder = nn.Sequential(*encoder_layers)
            fc_output_dim = fc_encoder_units[-1]
        else:
            self._use_fc_preprocessor = False
            self.fc_encoder = None
            fc_output_dim = fc_input_dim
            
        # Building the Memory Network Encoder (LSTM, Transformer, etc.).
        if self._use_lstm:
            self.memory_encoder = nn.LSTM(
                input_size=fc_output_dim,
                hidden_size=model_config.lstm_units,
                num_layers=model_config.num_lstm_layers,
                batch_first=True
            )
            encoder_output_dim = model_config.lstm_units
        else:
            self.memory_encoder = None
            encoder_output_dim = fc_output_dim
            
        # Build projection layer to apply a skip-connection.
        if encoder_output_dim == text_embeddings_dim:
            self._project_encoder_out = False
            self.projection_layer = None
        else:
            self._project_encoder_out = True
            self.projection_layer = None if encoder_output_dim == text_embeddings_dim else nn.Linear(
                in_features=encoder_output_dim,
                out_features=text_embeddings_dim
            )
        
        # Build Layer Normalization layer, which is applied after the skip connection.
        if self._use_layer_norm:
            self.layer_norm = nn.LayerNorm(normalized_shape=text_embeddings_dim)
        else:
            self.layer_norm = None

        # Build FC decoder for post-processing.
        fc_decoder_units = [text_embeddings_dim] + model_config.fc_decoder_layers + [control_dim]
        num_decoder_layers = len(fc_decoder_units)
        layers = []
        for i in range(len(fc_decoder_units) - 1):
            layers.append(torch.nn.Linear(in_features=fc_decoder_units[i], out_features=fc_decoder_units[i+1]))

            if i != num_decoder_layers - 2:
                layers.append(torch.nn.GELU())
        self.fc_decoder = torch.nn.Sequential(*layers)

    def forward(
            self, 
            inputs: Tuple[torch.Tensor, torch.Tensor], 
            state: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """ Computes the target controls. 
            :param inputs: a tuple of text embeddings and current controls.
            :param state: the hidden state of the memory network.
            :return: the predicted controls and the hidden state of the memory network.
        """
        # Fetch & Validate inputs.
        text_embeddings, controls = inputs      # Batch x Features

        if not (text_embeddings.dim() == 2 and controls.dim() == 2):
            raise RuntimeError(f'Expected Text & Controls to be Batch x Features, got {text_embeddings.shape} and {controls.shape}')

        # Generate control embeddings.
        if self._use_controls:
            control_embeddings = self.control_embeddings(controls)
            encoder_inputs = torch.cat(tensors=[text_embeddings, control_embeddings], dim=1)
        else:
            encoder_inputs = text_embeddings

        # FC encoder
        if self._use_fc_preprocessor:
            encoder_inputs = self.fc_encoder(encoder_inputs)

        if self._use_lstm:
            x = torch.unsqueeze(encoder_inputs, dim=1)                      # Batch x 1 x Features
            memory_out, state = self.memory_encoder(x, state)               # (Batch x 1 x Features), (h0, c0)
            encoder_out = torch.squeeze(memory_out, dim=1)                  # Batch x Features
        else:
            state = None
            encoder_out = encoder_inputs

        # Projecting LSTM out to Text Embedding Dim
        if self._project_encoder_out:
            encoder_out = self.projection_layer(encoder_out)                # Batch x Text Dim

        # Adding Text Embeddings
        encoder_outputs = text_embeddings + encoder_out
        
        if self._use_layer_norm:
            encoder_outputs = self.layer_norm(encoder_outputs)

        # FC decoder
        outputs = self.fc_decoder(encoder_outputs)                          # Batch x Control Dim
        return outputs, state

    def get_initial_state(self, batch_size: int, device: Optional[torch.device]):
        """ Initializes and returns the initial memory state uniformly in range -0.001 to 0.001. 
            :param batch_size: The desired batch size of the hidden state.
            :param device: The device of the model.
        """
        
        if not self._use_lstm:
            return None
        
        return (
            torch.rand(self.memory_encoder.num_layers, batch_size, self.memory_encoder.hidden_size, device=device) * 0.002 - 0.001,
            torch.rand(self.memory_encoder.num_layers, batch_size, self.memory_encoder.hidden_size, device=device) * 0.002 - 0.001
        )


## Configuration

In [11]:
# 2. Configuration
class Config:
    # Audio settings
    SAMPLE_RATE = 16000  # Whisper uses 16kHz
    RECORD_DURATION = 3  # seconds
    
    # Model paths
    WHISPER_MODEL = "tiny.en"
    SENTENCE_TRANSFORMER = "sentence-transformers/all-MiniLM-L12-v2"
    CONTROL_MODEL_PATH = "checkpoints/final_model/ckp.pt"
    
    # Serial settings
    SERIAL_PORT = "COM12"
    BAUD_RATE = 115200
    SERIAL_TIMEOUT = 2  # seconds
    
    # Control settings
    INITIAL_POSITION = [[0.0, 0.0, 0.0, 0.0, 0.0]]  # All motors at 0
    MOTOR_LIMITS = [
        (0, 160),  # Motor 0
        (0, 160),  # Motor 1
        (0, 160),  # Motor 2
        (0, 160),  # Motor 3
        (100, 160)   # Motor 4
    ]
    
    # Display settings
    PRINT_PREDICTIONS = True

    COMMAND_PRIMITIVES = {
        0: [160, 0, 160, 0, 160],
        1: [0, 160, 0, 0, 100],
        2: [0, 160, 160, 0, 100],
        3: [160, 160, 160, 0, 100],
        4: [160, 0, 160, 0, 100],
        5: [160, 0, 160, 0, 160],
        6: [0, 160, 0, 160, 100],
        7: [0, 160, 160, 0, 100],
        8: [0, 0, 0, 160, 160],
        9: [0, 0, 0, 160, 100],
        10: [0, 160, 0, 0, 160],
        11: [0, 160, 0, 160, 160],
        12: [160, 0, 160, 160, 100],
        13: [0, 0, 0, 0, 0],
    }

    NATURAL_LANGUAGE_COMMANDS = {
        0: 'Nothing',
        1: 'One',
        2: "Two",
        3: "Three",
        4: 'Four',
        5: "Five",
        6: 'Fist',
        7: 'Victory sign',
        8: 'Call me',
        9: 'Pinky promise',
        10: 'Loser',
        11: 'Good',
        12: 'Okay',
        13: 'Zero Padding'
    }

    
config = Config()

## Initialize components

In [None]:
# 3. Initialize Components
def initialize_models():
    """Load all required models"""

    print("Loading Sentence Transformer...")
    embedding_model = SentenceTransformer(config.SENTENCE_TRANSFORMER)
    
    print("Loading Control Prediction Model...")
    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    class Config_Model(dict):
        """ Config class that utilizes dict keywords as object attributes for easy access. """
        
        def __init__(self, *args, **kwargs):
            super(Config_Model, self).__init__(*args, **kwargs)
            self.__dict__ = self

    config_model = Config_Model(
        model=Config_Model(
            fc_encoder_layers=[256],                                # Fully-Connected encoder layers (before memory network).
            fc_decoder_layers = [256],                              # Fully-Connected decoder layers (after memory network).
            use_controls = True,                                    # Whether to utilize current control inputs to predict the target controls.
            control_embeddings_dim = 384,                           # Control embeddings size (if use_controls is True).
            use_lstm = True,                                        # Whether to use LSTM as memory network.
            num_lstm_layers = 1,                                    # Number of lstm layers (if use_lstm is True).
            lstm_units = 512,                                       # Number of lstm units per layer.
            dropout_rate = 0.2,                                     # Dropout rate for the encoder (set 0.0 to deactivate).
            layer_norm = False,                                     # Whether to apply layer normalization.
            checkpoint_directory = "checkpoints/final_model/ckp.pt" # Model checkpoint directory.
        )
    )
    # Instantiate the model
    control_model = InmoovPoseNet(
        text_embeddings_dim=384,
        control_dim=5,
        model_config=config_model.model
    ).to(device)

    control_model.load_state_dict(torch.load(config.CONTROL_MODEL_PATH, map_location=torch.device('cpu')))
    control_model.eval()

    print("Loading Whisper model...")
    whisper_model = whisper.load_model(config.WHISPER_MODEL)
    
    return whisper_model, embedding_model, control_model

def initialize_serial():
    """Initialize serial connection to Arduino"""
    try:
        ser = serial.Serial(
            port=config.SERIAL_PORT,
            baudrate=config.BAUD_RATE,
            timeout=config.SERIAL_TIMEOUT
        )
        print(f"Serial connection established on {config.SERIAL_PORT}")
        return ser
    except Exception as e:
        print(f"Failed to initialize serial connection: {e}")
        print("Running in offline mode (predictions will be printed only)")
        return None

# Initialize all components
whisper_model, embedding_model, control_model = initialize_models()
ser = initialize_serial()

Loading Sentence Transformer...
Loading Control Prediction Model...
Loading Whisper model...
Failed to initialize serial connection: could not open port 'COM12': FileNotFoundError(2, 'The system cannot find the file specified.', None, 2)
Running in offline mode (predictions will be printed only)


In [None]:
import serial
import threading
import time
from IPython.display import display
import ipywidgets as widgets

# Create an output widget
output_widget = widgets.Output()
display(output_widget)

def serial_monitor():
    """ Continuously reads from Arduino and updates output widget in Jupyter. """
    while True:
        if ser.in_waiting > 0:
            line = ser.readline().decode('utf-8').strip()
            with output_widget:
                output_widget.clear_output(wait=True)  # Clears old messages dynamically
                print("Arduino says:", line)
        time.sleep(0.1)  # Prevent excessive CPU usage

# Start the serial monitor in a background thread
thread = threading.Thread(target=serial_monitor, daemon=True)
thread.start()

## Core functions

In [13]:
def record_audio(duration=config.RECORD_DURATION):
    """Record audio from microphone"""
    print(f"Recording for {duration} seconds... (Press Ctrl+C to stop early)")
    try:
        audio = sd.rec(
            int(duration * config.SAMPLE_RATE),
            samplerate=config.SAMPLE_RATE,
            channels=1,
            dtype='float32'
        )
        sd.wait()
        print("Recording complete")
        return audio.flatten()
    except KeyboardInterrupt:
        print("\nRecording stopped by user")
        return None
    except Exception as e:
        print(f"Error during recording: {e}")
        return None

def transcribe_audio(audio, model=whisper_model):
    """Transcribe audio to text using Whisper"""
    try:
        audio = whisper.pad_or_trim(audio)
        mel = whisper.log_mel_spectrogram(audio).to(model.device)
        options = whisper.DecodingOptions(language="en", fp16=False)
        result = whisper.decode(model, mel, options)
        return result.text
    except Exception as e:
        print(f"Transcription error: {e}")
        return None

def text_to_embedding(text, model=embedding_model):
    """Convert text to embedding vector"""
    try:
        embedding = model.encode(text, convert_to_tensor=True).unsqueeze(0)
        return embedding
    except Exception as e:
        print(f"Embedding generation error: {e}")
        return None

def validate_angles(angles_list):
    """Validate motor angles are within safe limits for a list of angle sets"""
    for angles in angles_list:
        if len(angles) != 5:
            print(f"Invalid number of angles: {len(angles)} (must be 5)")
            return False
            
        for i, (angle, (min_val, max_val)) in enumerate(zip(angles, config.MOTOR_LIMITS)):
            if not (min_val <= angle <= max_val):
                print(f"Invalid angle for motor {i}: {angle} (must be between {min_val}-{max_val})")
                return False
    return True

def send_to_arduino(angles_list, serial_conn=ser):
    """Send multiple sets of motor angles to Arduino via serial"""
    if serial_conn is None:
        print("No serial connection - angles would be:", angles_list)
        return True
    
    try:
        # Format all angle sets as a single string
        # Each set is semicolon-separated, angles within a set are comma-separated
        angle_str = ";".join([",".join([f"{a:.1f}" for a in angles]) for angles in angles_list])
        serial_conn.write(f"{angle_str}\n".encode())
        
        # Wait for acknowledgment
        response = serial_conn.readline().decode().strip()
        if "Servos updated" in response:
            print(f"Successfully sent {len(angles_list)} angle sets to Arduino")
            return True
        else:
            print(f"Unexpected response from Arduino: {response}")
            return False
    except Exception as e:
        print(f"Serial communication error: {e}")
        return False

def reset_hand_position(serial_conn=ser):
    """Send reset command to Arduino"""
    if serial_conn is None:
        print("No serial connection - would send reset command")
        return True
    
    try:
        serial_conn.write(b"reset fingers\n")
        response = serial_conn.readline().decode().strip()
        if "Servos reset" in response:
            print("Hand reset to default position")
            return True
        else:
            print(f"Unexpected reset response: {response}")
            return False
    except Exception as e:
        print(f"Reset command error: {e}")
        return False
    
def find_closest_command(prediction):
    """Find the closest command primitive to the given prediction"""
    min_distance = float('inf')
    closest_cmd = None
    closest_idx = None
    
    # Convert prediction to a numpy array for easy calculation
    pred_array = np.array(prediction)
    
    for idx, primitive in Config.COMMAND_PRIMITIVES.items():
        # Calculate Euclidean distance between prediction and primitive
        distance = np.linalg.norm(pred_array - np.array(primitive))
        
        if distance < min_distance:
            min_distance = distance
            closest_cmd = Config.NATURAL_LANGUAGE_COMMANDS[idx]
            closest_idx = idx
            
    return closest_cmd, closest_idx, min_distance

def display_predictions(predictions):
    """Display predictions with closest natural language command and distance"""
    print("Generated Control Sequence:")
    print("Step | Thumb | Index | Middle | Ring | Pinky | Command (Distance)")
    print("------------------------------------------------------------------")
    for i, step in enumerate(predictions):
        closest_cmd, idx, distance = find_closest_command(step)
        print(f"{i+1:4} | {step[0]:5} | {step[1]:5} | {step[2]:6} | {step[3]:4} | {step[4]:5} | {idx}: {closest_cmd} ({distance:.1f})")

## Control Prediction

In [17]:
class HandControlSystem:
    def __init__(self, control_model):
        self.control_model = control_model
        self.current_controls = torch.tensor(config.INITIAL_POSITION, dtype=torch.float32)
        self.state = None
        self.sequence_history = []
        self.num_steps = 16
    
    def predict_controls(self, embedding):
        """Predict control sequence from embedding"""
        try:
            # Convert inputs to tensors
            embedding_tensor = torch.tensor(embedding, dtype=torch.float32)
            
            # Get initial state if needed
            if self.state is None:
                batch_size = 1  # We're processing one sample at a time
                self.state = self.control_model.get_initial_state(batch_size, device='cpu')
            
            # Make prediction
            with torch.no_grad():
                predicted_controls, self.state = self.control_model(
                    (embedding_tensor.unsqueeze(0),  # Add batch dimension
                     self.current_controls.unsqueeze(0)),
                    self.state
                )
            
            # Update current controls
            predicted_controls = predicted_controls.squeeze(0)  # Remove batch dimension
            self.current_controls = predicted_controls
            
            # Convert to numpy array and denormalize
            angles = predicted_controls.numpy()

            # Multiply first 4 by 160
            angles[:4] = angles[:4] * 160.0

            # Denormalize last one from [0, 1] → [100, 160]
            angles[4] = angles[4] * (160 - 100) + 100
            # Store in history
            self.sequence_history.append(angles.tolist())
            
            return angles
        except Exception as e:
            print(f"Prediction error: {e}")
            return None

# Initialize control system
hand_control = HandControlSystem(control_model)

## Main Pipeline

In [18]:
def run_pipeline():
    """Main pipeline execution"""
    while True:
        print("\n" + "="*50)
        print("1. Record new audio")
        print("2. Reset hand position")
        print("3. Exit")
        choice = input("Select an option: ").strip()
        
        if choice == "1":
            # Record audio
            audio = record_audio()
            if audio is None:
                continue
                
            # Transcribe to text
            text = transcribe_audio(audio)
            if text is None:
                continue
            text = "Show five, then four loser"
            print(f"\nTranscribed Text: {text}")
            
            # Generate embeddings
            embedding = text_to_embedding(text)
            if embedding is None:
                continue
                
            # Initialize controls and state
            current_controls = hand_control.current_controls
            state = hand_control.control_model.get_initial_state(batch_size=1, device='cpu')
            
            predictions = []
            for _ in range(hand_control.num_steps):
                with torch.no_grad():
                    # Get next control prediction
                    outputs, state = hand_control.control_model((embedding, current_controls), state)
                    
                    predictions.append(outputs.cpu().numpy())
                    
                    # Update current controls with the prediction
                    current_controls = outputs
                    hand_control.current_controls = current_controls
                
            # Stack predictions and denormalize
            predictions = np.vstack(predictions)
            
            # Denormalize the predictions (reverse the normalization done during training)
            # First 4 controls (0-3) were normalized by dividing by 160
            predictions[:, :4] *= 160.0
            
            # 5th control (index 4) was normalized as (value - 100)/60
            predictions[:, 4] = predictions[:, 4] * 60.0 + 100.0
            
            # Round to nearest integer (since motor commands are integers)
            predictions = np.round(predictions).astype(int)
            
            # Clip to valid ranges (assuming 0-160 for first 4, 100-160 for last)
            predictions[:, :4] = np.clip(predictions[:, :4], 0, 160)
            predictions[:, 4] = np.clip(predictions[:, 4], 100, 160)

            display_predictions(predictions)

            # Validate and send to Arduino
            if validate_angles(predictions):
                if not send_to_arduino(predictions):
                    print("Failed to send angles to Arduino")
            else:
                print("Invalid angles predicted - not sending to Arduino")
            
        elif choice == "2":
            # Reset hand position
            if reset_hand_position():
                # Also reset our control system state
                hand_control.current_controls = torch.tensor(config.INITIAL_POSITION, dtype=torch.float32)
                hand_control.state = None
                print("Control system reset to initial state")
        
        elif choice == "3":
            print("Exiting...")
            break
            
        else:
            print("Invalid option, please try again")

## Run the Pipeline

In [19]:
print("Robotic Hand Control System Initialized")
print(f"Initial hand position: {config.INITIAL_POSITION}")
run_pipeline()

# Close serial connection if it exists
if ser is not None:
    ser.close()
    print("Serial connection closed")

Robotic Hand Control System Initialized
Initial hand position: [[0.0, 0.0, 0.0, 0.0, 0.0]]

1. Record new audio
2. Reset hand position
3. Exit
Recording for 3 seconds... (Press Ctrl+C to stop early)
Recording complete

Transcribed Text: Show five, then four loser
Generated Control Sequence:
Step | Thumb | Index | Middle | Ring | Pinky | Command (Distance)
------------------------------------------------------------------
   1 |    11 |   130 |     35 |    0 |   160 | 10: Loser (47.4)
   2 |    38 |    80 |     57 |    0 |   160 | 10: Loser (105.3)
   3 |   160 |     1 |    160 |   43 |   100 | 4: Four (43.0)
   4 |   160 |     1 |    160 |   12 |   100 | 4: Four (12.0)
   5 |   136 |    68 |    153 |    0 |   100 | 4: Four (72.4)
   6 |   124 |   143 |    147 |    0 |   100 | 3: Three (41.9)
   7 |   159 |     9 |    160 |    2 |   100 | 4: Four (9.3)
   8 |   160 |     0 |    160 |    0 |   100 | 4: Four (0.0)
   9 |   142 |   129 |    154 |    0 |   122 | 3: Three (42.5)
  10 |   151