# üß™ Digital Gait Laboratory: Hybrid Architecture (Raw + Advanced Features)

**Course:** Wearable Computing & Gait Analysis  
**Dataset:** UCI HAPT (Activity Recognition + Postural Transitions)

### üéØ Objective
We are upgrading the pipeline to improve accuracy. Instead of relying solely on Deep Learning to find patterns in raw data, we will explicitely calculate **Statistical Features** (Mean, Variance, Energy) and feed them into the network alongside the raw signal.

**The Hybrid Pipeline:**
1.  **Input:** Raw Sensor Window (128 time steps).
2.  **Branch A (CNN):** Reads the raw signal to find temporal patterns (e.g., heel strike shape).
3.  **Branch B (MLP):** Reads calculated statistics. **Updated:** Now includes Frequency Domain (FFT), Correlation, **Jerk**, **Entropy**, and **IQR** features.
4.  **Fusion:** Combines both to predict the activity.
5.  **Phyphox Inference:** Applies the exact same feature math to your smartphone data.

---

### üõ†Ô∏è 1. Environment Setup

In [2]:
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
from scipy.fft import fft
from scipy.stats import iqr, entropy
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Configuration
FS = 50              # Sampling Frequency (Hz)
WINDOW_SIZE = 128    # 2.56 seconds per window
OVERLAP = 64         # 50% overlap
BATCH_SIZE = 32
EPOCHS = 40
LR = 0.0005

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


### üì• 2. Download and Prepare Dataset

In [3]:
dataset_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00341/HAPT%20Data%20Set.zip"
zip_path = "HAPT_Data_Set.zip"
extract_folder = "HAPT_Dataset"

if not os.path.exists(extract_folder):
    print("Downloading dataset...")
    !wget -q $dataset_url -O $zip_path
    print("Extracting...")
    !unzip -q $zip_path -d $extract_folder
    print("Done!")
else:
    print("Dataset already exists.")

Dataset already exists.


### üìê 3. Feature Engineering (The "Math" Department)
This is the most critical upgrade. We define a function `extract_features` that runs on **every window** of data (both training and Phyphox).

**New Features Added (based on features_info.txt):**
1.  **Jerk Signals:** The derivative of acceleration ($dA/dt$). Critical for measuring "smoothness" of movement.
2.  **Spectral Entropy:** Measures the "randomness" of the frequency distribution. (Walking = Low Entropy, Random Shaking = High Entropy).
3.  **IQR:** Interquartile Range, a robust measure of spread.
4.  **Magnitudes:** Euclidean norm of Jerk signals.

In [4]:
def separate_gravity(data, fs=50):
    # Low-pass filter to isolate gravity
    nyq = 0.5 * fs
    normal_cutoff = 0.3 / nyq
    b, a = butter(3, normal_cutoff, btype='low', analog=False)
    gravity = lfilter(b, a, data, axis=0)
    body_acc = data - gravity
    return body_acc, gravity

def extract_features(window):
    """
    Input: window shape (128, 9) 
           [BodyAccXYZ (0-3), GyroXYZ (3-6), TotalAccXYZ (6-9)]
    Output: Feature vector (numpy array)
    """
    features = []
    
    # --- PREPARE SIGNALS ---
    body_acc = window[:, 0:3]
    gyro = window[:, 3:6]
    total_acc = window[:, 6:9]
    
    # 1. Jerk Signals (Derivative over time)
    # We pad with 0 at the start to keep length 128
    acc_jerk = np.diff(body_acc, axis=0, prepend=body_acc[0:1,:])
    gyro_jerk = np.diff(gyro, axis=0, prepend=gyro[0:1,:])
    
    # 2. Magnitudes (Euclidean Norm)
    acc_mag = np.linalg.norm(body_acc, axis=1)
    gyro_mag = np.linalg.norm(gyro, axis=1)
    acc_jerk_mag = np.linalg.norm(acc_jerk, axis=1)
    gyro_jerk_mag = np.linalg.norm(gyro_jerk, axis=1)
    
    # Collect all temporal signals to loop over
    # 3 Axes Acc + 3 Axes Gyro + 3 Axes AccJerk + 3 Axes GyroJerk + 4 Mags = 16 signals
    signals = [body_acc, gyro, acc_jerk, gyro_jerk]
    mags = [acc_mag, gyro_mag, acc_jerk_mag, gyro_jerk_mag]
    
    # --- TIME DOMAIN FEATURES ---
    for signal_group in signals:
        # signal_group shape: (128, 3)
        features.append(np.mean(signal_group, axis=0))      # Mean
        features.append(np.std(signal_group, axis=0))       # Std
        features.append(np.max(signal_group, axis=0))       # Max
        features.append(np.min(signal_group, axis=0))       # Min
        features.append(iqr(signal_group, axis=0))          # IQR (Robust Spread)
        features.append(np.sum(signal_group**2, axis=0) / len(window)) # Energy
        
    for mag_signal in mags:
        # mag_signal shape: (128,)
        features.append([np.mean(mag_signal)])
        features.append([np.std(mag_signal)])
        features.append([np.max(mag_signal)])
        features.append([iqr(mag_signal)])
        
    # --- FREQUENCY DOMAIN FEATURES ---
    # FFT on BodyAcc and Gyro (common practice)
    for signal_group in [body_acc, gyro]:
        for i in range(3): # X, Y, Z
            col = signal_group[:, i]
            fft_vals = np.abs(fft(col))[:len(window)//2]
            
            # Spectral Energy
            features.append([np.mean(fft_vals)])
            
            # Spectral Entropy (Treat power spectrum as probability distribution)
            psd = fft_vals / (np.sum(fft_vals) + 1e-9) # Normalize
            features.append([entropy(psd)])
            
            # Dominant Frequency
            features.append([np.argmax(fft_vals)])

    # --- CORRELATION (Axis Synchronization) ---
    # Acc X-Y, X-Z, Y-Z
    features.append([np.corrcoef(body_acc[:,0], body_acc[:,1])[0,1]])
    features.append([np.corrcoef(body_acc[:,0], body_acc[:,2])[0,1]])
    features.append([np.corrcoef(body_acc[:,1], body_acc[:,2])[0,1]])
    
    # --- ANGLE (Posture detection) ---
    # Angle between Mean Body Acc vector and Mean Gravity vector
    # Gravity is approx equal to the mean of the Total Acc signal (since body acc avg is 0)
    mean_total_acc = np.mean(total_acc, axis=0) 
    mean_body_acc = np.mean(body_acc, axis=0)
    
    # Cosine similarity
    num = np.dot(mean_total_acc, mean_body_acc)
    denom = np.linalg.norm(mean_total_acc) * np.linalg.norm(mean_body_acc) + 1e-9
    angle = np.arccos(np.clip(num/denom, -1.0, 1.0))
    features.append([angle])

    # Flatten
    return np.nan_to_num(np.concatenate([np.atleast_1d(f) for f in features]))

### üèóÔ∏è 4. Data Pipeline
We load the raw data, segment it, AND run our feature extractor.

In [5]:
def load_hapt_hybrid(base_path):
    raw_path = os.path.join(base_path, "RawData")
    labels_df = pd.read_csv(os.path.join(raw_path, "labels.txt"), sep='\\s+', header=None)
    labels_df.columns = ["exp", "user", "act", "start", "end"]
    
    raw_windows = []
    handcrafted_features = []
    labels = []

    for exp_id in labels_df['exp'].unique():
        user_id = labels_df[labels_df['exp'] == exp_id]['user'].iloc[0]
        acc_file = f"acc_exp{exp_id:02d}_user{user_id:02d}.txt"
        gyro_file = f"gyro_exp{exp_id:02d}_user{user_id:02d}.txt"
        
        # Load
        acc_raw = pd.read_csv(os.path.join(raw_path, acc_file), sep='\\s+', header=None).values
        gyro_raw = pd.read_csv(os.path.join(raw_path, gyro_file), sep='\\s+', header=None).values
        
        # Preprocess
        body_acc, gravity = separate_gravity(acc_raw, FS)
        full_data = np.hstack([body_acc, gyro_raw, acc_raw])
        
        # Segment
        exp_labels = labels_df[labels_df['exp'] == exp_id]
        for _, row in exp_labels.iterrows():
            start, end, act_id = row['start'], row['end'], row['act']
            segment = full_data[start:end]
            
            for i in range(0, len(segment) - WINDOW_SIZE, OVERLAP):
                window = segment[i : i + WINDOW_SIZE]
                
                # 1. Store Raw Window
                raw_windows.append(window)
                
                # 2. Calculate Features
                feats = extract_features(window)
                handcrafted_features.append(feats)
                
                # 3. Store Label
                labels.append(act_id - 1)

    return np.array(raw_windows), np.array(handcrafted_features), np.array(labels)

print("Processing Hybrid Dataset (Raw + Features)...")
X_raw, X_feat, y_all = load_hapt_hybrid("HAPT_Dataset")

# Normalize Features (Critical for MLP)
scaler = StandardScaler()
X_feat = scaler.fit_transform(X_feat)

# Transpose Raw for PyTorch (N, Channels, Time)
X_raw = np.transpose(X_raw, (0, 2, 1))

print(f"Raw Data Shape: {X_raw.shape}")
print(f"Feature Data Shape: {X_feat.shape}")
print(f"Labels Shape: {y_all.shape}")

Processing Hybrid Dataset (Raw + Features)...
Raw Data Shape: (10890, 9, 128)
Feature Data Shape: (10890, 110)
Labels Shape: (10890,)


### üß† 5. Hybrid Neural Network
This network has two "heads":
1.  **ConvNet Head:** Processes the raw 128-step time series.
2.  **Dense Head:** Processes the feature vector.
3.  **Fusion Layer:** Concatenates both and makes the final decision.

In [6]:
class HybridGaitNet(nn.Module):
    def __init__(self, n_raw_channels=9, n_features=63, n_classes=12):
        super(HybridGaitNet, self).__init__()
        
        # --- Branch A: Raw Signal (CNN) ---
        self.cnn = nn.Sequential(
            nn.Conv1d(n_raw_channels, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64), 
            nn.ReLU(),
            nn.MaxPool1d(2),
            
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1), # Squeeze time -> (Batch, 128, 1)
            nn.Flatten()
        )
        
        # --- Branch B: Handcrafted Features (MLP) ---
        self.mlp = nn.Sequential(
            nn.Linear(n_features, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, 64),
            nn.ReLU()
        )
        
        # --- Fusion ---
        # CNN out (128) + MLP out (64) = 192
        self.fusion = nn.Sequential(
            nn.Linear(128 + 64, 64),
            nn.ReLU(),
            nn.Linear(64, n_classes)
        )

    def forward(self, x_raw, x_feat):
        # Pass through separate branches
        out_cnn = self.cnn(x_raw)
        out_mlp = self.mlp(x_feat)
        
        # Concatenate
        combined = torch.cat((out_cnn, out_mlp), dim=1)
        
        # Final prediction
        return self.fusion(combined)

### üèãÔ∏è 6. Training the Hybrid Model

In [7]:
# Custom Dataset that returns TWO inputs
class HybridDataset(Dataset):
    def __init__(self, x_raw, x_feat, y):
        self.x_raw = torch.FloatTensor(x_raw)
        self.x_feat = torch.FloatTensor(x_feat)
        self.y = torch.LongTensor(y)
    def __len__(self): return len(self.y)
    def __getitem__(self, idx): return self.x_raw[idx], self.x_feat[idx], self.y[idx]

# Split
Xr_train, Xr_test, Xf_train, Xf_test, y_train, y_test = train_test_split(
    X_raw, X_feat, y_all, test_size=0.2, random_state=42, stratify=y_all
)

train_loader = DataLoader(HybridDataset(Xr_train, Xf_train, y_train), batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(HybridDataset(Xr_test, Xf_test, y_test), batch_size=BATCH_SIZE, shuffle=False)

# Setup
n_feats = X_feat.shape[1]
print(f"Detected {n_feats} features per window.")
model = HybridGaitNet(n_features=n_feats).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)

# Train
train_losses, test_accs = [], []
print("Starting Hybrid Training...")

for epoch in range(EPOCHS):
    model.train()
    running_loss = 0.0
    for r_in, f_in, labels in train_loader:
        r_in, f_in, labels = r_in.to(device), f_in.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(r_in, f_in)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        
    # Test
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for r_in, f_in, labels in test_loader:
            r_in, f_in, labels = r_in.to(device), f_in.to(device), labels.to(device)
            outputs = model(r_in, f_in)
            _, preds = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (preds == labels).sum().item()
            
    acc = 100 * correct / total
    test_accs.append(acc)
    train_losses.append(running_loss/len(train_loader))
    print(f"Epoch [{epoch+1}/{EPOCHS}] Loss: {train_losses[-1]:.4f} | Test Acc: {acc:.2f}%")

torch.save(model.state_dict(), "hybrid_gait_model.pth")

Detected 110 features per window.
Starting Hybrid Training...
Epoch [1/40] Loss: 0.6133 | Test Acc: 90.77%
Epoch [2/40] Loss: 0.2118 | Test Acc: 93.34%
Epoch [3/40] Loss: 0.1660 | Test Acc: 93.89%
Epoch [4/40] Loss: 0.1516 | Test Acc: 93.57%
Epoch [5/40] Loss: 0.1416 | Test Acc: 94.12%
Epoch [6/40] Loss: 0.1298 | Test Acc: 93.76%
Epoch [7/40] Loss: 0.1264 | Test Acc: 95.18%
Epoch [8/40] Loss: 0.1203 | Test Acc: 94.90%
Epoch [9/40] Loss: 0.1137 | Test Acc: 94.95%
Epoch [10/40] Loss: 0.1132 | Test Acc: 95.32%
Epoch [11/40] Loss: 0.1118 | Test Acc: 95.32%
Epoch [12/40] Loss: 0.1062 | Test Acc: 95.00%
Epoch [13/40] Loss: 0.1011 | Test Acc: 95.55%
Epoch [14/40] Loss: 0.1093 | Test Acc: 94.49%
Epoch [15/40] Loss: 0.1097 | Test Acc: 95.41%
Epoch [16/40] Loss: 0.1010 | Test Acc: 95.73%
Epoch [17/40] Loss: 0.1015 | Test Acc: 95.41%
Epoch [18/40] Loss: 0.0969 | Test Acc: 95.91%
Epoch [19/40] Loss: 0.0931 | Test Acc: 95.91%
Epoch [20/40] Loss: 0.0986 | Test Acc: 96.33%
Epoch [21/40] Loss: 0.0909 

### üì≤ 7. Phyphox: Hybrid Inference
Here we replicate the entire pipeline for your files.

In [8]:
HAPT_LABELS = {0: "WALKING", 1: "WALK_UP", 2: "WALK_DOWN", 3: "SITTING", 4: "STANDING", 5: "LAYING", 
               6: "STAND_TO_SIT", 7: "SIT_TO_STAND", 8: "SIT_TO_LIE", 9: "LIE_TO_SIT", 10: "STAND_TO_LIE", 11: "LIE_TO_STAND"}

def classify_phyphox_hybrid(acc_path, gyro_path):
    print(f"Processing {acc_path}...")
    try:
        df_acc = pd.read_csv(acc_path)
        df_gyro = pd.read_csv(gyro_path)
    except:
        print("‚ö†Ô∏è Files not found.")
        return
    
    # 1. Clean & Resample
    def clean_cols(df):
        col_map = {}
        for c in df.columns:
            c_lower = c.lower()
            if 'time' in c_lower: col_map[c] = 'time'
            elif 'x' in c_lower: col_map[c] = 'x'
            elif 'y' in c_lower: col_map[c] = 'y'
            elif 'z' in c_lower: col_map[c] = 'z'
        return df.rename(columns=col_map)[['time', 'x', 'y', 'z']]

    df_acc = clean_cols(df_acc)
    df_gyro = clean_cols(df_gyro)

    df_acc[['x', 'y', 'z']] = df_acc[['x', 'y', 'z']] / 9.80665
    
    # Interpolate to 50Hz
    t_start = max(df_acc['time'].min(), df_gyro['time'].min())
    t_end = min(df_acc['time'].max(), df_gyro['time'].max())
    new_time = np.arange(t_start, t_end, 1/50.0)

    def resample(df, t_target):
        df = df.set_index('time')
        return df.reindex(df.index.union(t_target)).interpolate(method='linear').reindex(t_target).values

    acc_data = resample(df_acc, new_time)
    gyro_data = resample(df_gyro, new_time)

    # 2. Gravity Sep
    body_acc, gravity = separate_gravity(acc_data, fs=50)
    full_signal = np.hstack([body_acc, gyro_data, acc_data])
    
    # 3. Windowing & Feature Extraction
    raw_wins = []
    feat_wins = []
    
    for i in range(0, len(full_signal) - WINDOW_SIZE, OVERLAP):
        win = full_signal[i : i + WINDOW_SIZE]
        raw_wins.append(win)
        feat_wins.append(extract_features(win))
        
    if not raw_wins: return
    
    # 4. Prepare Tensors
    # Raw: (N, 9, 128)
    X_raw_phy = np.array(raw_wins).transpose(0, 2, 1)
    # Feat: Normalize using the SAME scaler from training
    X_feat_phy = scaler.transform(np.array(feat_wins))
    
    # 5. Inference
    model.eval()
    with torch.no_grad():
        r_in = torch.FloatTensor(X_raw_phy).to(device)
        f_in = torch.FloatTensor(X_feat_phy).to(device)
        outputs = model(r_in, f_in)
        _, preds = torch.max(outputs, 1)
        
    # 6. Report
    print("\n--- üìä Hybrid Classification Results ---")
    unique, counts = np.unique(preds.cpu().numpy(), return_counts=True)
    total = sum(counts)
    for cls_idx, count in zip(unique, counts):
        print(f"{HAPT_LABELS[cls_idx]}: {(count/total)*100:.1f}%")

classify_phyphox_hybrid("Accelerometer.csv", "Gyroscope.csv")

Processing Accelerometer.csv...

--- üìä Hybrid Classification Results ---
WALKING: 66.7%
WALK_UP: 5.6%
WALK_DOWN: 2.8%
SIT_TO_LIE: 5.6%
STAND_TO_LIE: 19.4%
