### Proof of concept using PyTorch before implementing the model on the microcontroller

In [63]:
from pathlib import Path
import os
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.nn.functional as F

import librosa
import librosa.display
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torchaudio


In [64]:
DATA_DIR = "processed_data/raw/"
SAMPLE_RATE = 16000
NUM_CLASSES = 2  # change this based on your keywords
BATCH_SIZE = 64
EPOCHS = 20
LEARNING_RATE = 0.001
USE_MFCC = True  # Set to False to use spectrogram instead
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [65]:
# Dataset and Preprocessing

class KWSDataset(Dataset):
    def __init__(self, file_paths, labels, use_mfcc=True):
        self.file_paths = file_paths
        self.labels = labels
        self.use_mfcc = use_mfcc

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        label = self.labels[idx]
        y, sr = librosa.load(file_path, sr=SAMPLE_RATE)

        # Optional: Padding/truncation
        if len(y) < SAMPLE_RATE:
            y = np.pad(y, (0, SAMPLE_RATE - len(y)))
        else:
            y = y[:SAMPLE_RATE]

        # Feature extraction
        if self.use_mfcc:
            features = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=32) 
            features = (features - np.mean(features)) / (np.std(features) + 1e-6) # Normalize
        else:
            features = librosa.stft(y)
            features = np.abs(features)

        features = features[np.newaxis, ...]  # Add channel dimension
        return torch.tensor(features, dtype=torch.float32), torch.tensor(label)


In [66]:
# Data loading

# Example assumes structure: DATA_DIR/class_name/*.wav
def load_data(data_dir):
    all_paths = []
    all_labels = []
    label_names = sorted([d for d in os.listdir(data_dir) if not d.startswith('.') and os.path.isdir(os.path.join(data_dir, d))])
    print(f"Label names: {label_names}")
    label_map = {name: i for i, name in enumerate(label_names)}
    print(f"Label map: {label_map}")

    for label in label_names:
        class_dir = os.path.join(data_dir, label)
        wavs = list(Path(class_dir).rglob("*.wav"))
        print(f"Found {len(wavs)} files for label '{label}'")
        for wav_path in wavs:
            all_paths.append(str(wav_path))
            all_labels.append(label_map[label]) # contain 0 or 1 depending on class

    return all_paths, all_labels, label_map

file_paths, labels, label_map = load_data(DATA_DIR)
X_train, X_test, y_train, y_test = train_test_split(file_paths, labels, test_size=0.1, stratify=labels)

train_dataset = KWSDataset(X_train, y_train, use_mfcc=USE_MFCC)
test_dataset = KWSDataset(X_test, y_test, use_mfcc=USE_MFCC)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


Label names: ['cough', 'non_cough']
Label map: {'cough': 0, 'non_cough': 1}
Found 8588 files for label 'cough'
Found 25742 files for label 'non_cough'


In [67]:
print(f"Size of test data: {len(test_dataset)}")

Size of test data: 3433


In [68]:
# Simulating the KWSetwork implemented in the the MAXIM examples
class PyTorchAI85Net20(nn.Module): 
    def __init__(self, num_classes=21, num_channels=1, dimensions=(64, 64), 
                 fc_inputs=30, bias=False):
        super().__init__()
        self.dim = dimensions[0]

        # Layer 1: Conv + ReLU
        self.conv1 = nn.Conv2d(num_channels, 15, kernel_size=3, padding=1, bias=bias)
        
        # Layer 2: MaxPool + Conv + ReLU
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) # 16x16
        self.conv2 = nn.Conv2d(15, 30, kernel_size=3, padding=1, bias=bias)
        
        # Layer 3: MaxPool + Conv + ReLU
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) # 8x8
        self.conv3 = nn.Conv2d(30, 60, kernel_size=3, padding=1, bias=bias)
        
        # Layer 4: MaxPool + Conv + ReLU
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2) # 4x4
        self.conv4 = nn.Conv2d(60, 30, kernel_size=3, padding=1, bias=bias)
        
        # Layer 5: MaxPool + Conv + ReLU
        self.pool5 = nn.MaxPool2d(kernel_size=2, stride=2) # 2x2
        self.conv5 = nn.Conv2d(30, 30, kernel_size=3, padding=1, bias=bias)
        
        # Layer 6: Conv + ReLU
        self.conv6 = nn.Conv2d(30, fc_inputs, kernel_size=3, padding=1, bias=bias)
        
        # Final classification layer
        self.fc = nn.Linear(fc_inputs * (self.dim//16)**2, num_classes, bias=True)

        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        # Forward pass through all layers
        x = F.relu(self.conv1(x))
        
        x = self.pool2(x)
        x = F.relu(self.conv2(x))
        
        x = self.pool3(x)
        x = F.relu(self.conv3(x))
        
        x = self.pool4(x)
        x = F.relu(self.conv4(x))
        
        x = self.pool5(x)
        x = F.relu(self.conv5(x))
        
        x = F.relu(self.conv6(x))
        
        # Flatten and classify
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        
        return x

# Usage example:
#model = PyTorchAI85Net20(num_classes=2, dimensions=(32, 32))
#input_tensor = torch.randn(1, 1, 64, 64)  # Batch size 1, 1 channel, 64x64
#output = model(input_tensor)
#print(f"Output shape: {output.shape}")  # Should be torch.Size([1, 21])


In [69]:

#model = SimpleCNN(num_classes=NUM_CLASSES).to(DEVICE)
model = PyTorchAI85Net20(num_classes=NUM_CLASSES, dimensions=(32, 32)).to(DEVICE)

# ============================================
# ⚙️ 6. Training Loop
# ============================================

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

def train():
    model.train()
    for epoch in range(EPOCHS):
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, targets in tqdm(train_loader):
            inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
        
        print(f"Epoch {epoch+1} - Loss: {running_loss:.3f}, Acc: {100.*correct/total:.2f}%")

train()

100%|██████████| 483/483 [04:55<00:00,  1.63it/s]


Epoch 1 - Loss: 165.427, Acc: 85.29%


100%|██████████| 483/483 [03:52<00:00,  2.08it/s]


Epoch 2 - Loss: 88.882, Acc: 92.99%


100%|██████████| 483/483 [03:57<00:00,  2.03it/s]


Epoch 3 - Loss: 63.266, Acc: 95.27%


100%|██████████| 483/483 [04:23<00:00,  1.83it/s]


Epoch 4 - Loss: 49.516, Acc: 96.20%


100%|██████████| 483/483 [04:01<00:00,  2.00it/s]


Epoch 5 - Loss: 43.188, Acc: 96.68%


100%|██████████| 483/483 [04:14<00:00,  1.90it/s]


Epoch 6 - Loss: 37.366, Acc: 97.27%


100%|██████████| 483/483 [03:41<00:00,  2.18it/s]


Epoch 7 - Loss: 29.037, Acc: 97.84%


100%|██████████| 483/483 [03:41<00:00,  2.18it/s]


Epoch 8 - Loss: 27.030, Acc: 97.92%


100%|██████████| 483/483 [03:36<00:00,  2.23it/s]


Epoch 9 - Loss: 22.809, Acc: 98.25%


100%|██████████| 483/483 [03:31<00:00,  2.28it/s]


Epoch 10 - Loss: 18.780, Acc: 98.57%


100%|██████████| 483/483 [03:41<00:00,  2.18it/s]


Epoch 11 - Loss: 16.966, Acc: 98.71%


100%|██████████| 483/483 [03:38<00:00,  2.21it/s]


Epoch 12 - Loss: 14.665, Acc: 98.94%


100%|██████████| 483/483 [03:47<00:00,  2.12it/s]


Epoch 13 - Loss: 14.394, Acc: 98.93%


100%|██████████| 483/483 [04:04<00:00,  1.98it/s]


Epoch 14 - Loss: 12.683, Acc: 99.03%


100%|██████████| 483/483 [03:58<00:00,  2.02it/s]


Epoch 15 - Loss: 12.734, Acc: 99.03%


100%|██████████| 483/483 [04:23<00:00,  1.83it/s]


Epoch 16 - Loss: 9.325, Acc: 99.36%


100%|██████████| 483/483 [03:59<00:00,  2.01it/s]


Epoch 17 - Loss: 10.196, Acc: 99.23%


100%|██████████| 483/483 [04:07<00:00,  1.95it/s]


Epoch 18 - Loss: 10.569, Acc: 99.18%


100%|██████████| 483/483 [04:43<00:00,  1.70it/s]


Epoch 19 - Loss: 8.672, Acc: 99.31%


100%|██████████| 483/483 [04:03<00:00,  1.98it/s]

Epoch 20 - Loss: 7.527, Acc: 99.42%





In [71]:
# ============================================
# 📈 7. Evaluation
# ============================================

def evaluate():
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    print(f"Test Accuracy: {100. * correct / total:.2f}%")

evaluate()

Test Accuracy: 96.91%


In [74]:
torch.save(model.state_dict(), 'PyTorchAI85Net20_weights.pth')

In [76]:
loaded_model = PyTorchAI85Net20(num_classes=NUM_CLASSES, dimensions=(32, 32))
loaded_model.load_state_dict(torch.load('PyTorchAI85Net20_weights.pth', weights_only=True))
loaded_model.eval()

PyTorchAI85Net20(
  (conv1): Conv2d(1, 15, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(15, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(30, 60, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (pool4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv2d(60, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (pool5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv5): Conv2d(30, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (conv6): Conv2d(30, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (fc): Linear(in_features=120, out_features=2, bias=True)
)