### Init

In [None]:
# Using "content" in google colab, "work" in deepnote
!cp /content/msc-train.zip /tmp/msc-train.zip
!cp /content/msc-val.zip /tmp/msc-val.zip
!cp /content/msc-test.zip /tmp/msc-test.zip
!unzip -oqqq /content/msc-train.zip -d /tmp/msc-train
!unzip -oqqq /content/msc-val.zip -d /tmp/msc-val
!unzip -oqqq /content/msc-test.zip -d /tmp/msc-test

### Libraries

In [14]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import torchaudio.transforms as T
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
import copy
import onnx
import wave
import shutil
import time
import onnxruntime as ort
from onnxruntime.quantization import (
    CalibrationDataReader, CalibrationMethod, QuantFormat,
    QuantType, StaticQuantConfig, quantize
)

### Configuration & Reproducibility Setup

In [15]:
# Global settings shared by the dataset, feature frontend, and training cells.
CFG = {
    'seed': 42,              # default seed used by set_seed()
    'sampling_rate': 16000,  # every clip is resampled and cropped/padded to this many samples
    # Feature Extraction
    'n_fft': 512,
    'win_length': 400,   # 25ms window
    'hop_length': 320,   # 20 ms hop at 16 kHz
    'n_mels': 46,        # High frequency resolution to balance time resolution loss

    # Training
    'batch_size': 32,
    'lr': 0.005,         # AdamW learning rate, also used as OneCycleLR max_lr
    'epochs': 100,
    # Prefer the GPU when one is visible, otherwise fall back to the CPU.
    'device': torch.device("cuda" if torch.cuda.is_available() else "cpu")
}

def set_seed(seed=CFG['seed']):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy, and PyTorch. When CUDA is available,
    all CUDA devices are seeded as well and cuDNN is switched to
    deterministic mode.

    Args:
        seed: Integer seed; defaults to ``CFG['seed']`` as read when this
            function is defined.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

# Seed all RNGs once, before any dataset, model, or loader is created,
# then report which device the rest of the notebook will use.
set_seed()
print(f"[INFO] Device: {CFG['device']}")

[INFO] Device: cuda


### Dataset Management

In [16]:
class MSCDataset(Dataset):
    """In-memory keyword dataset built from ``<label>_*.wav`` files.

    The label of a file is the part of its filename before the first
    underscore. All matching files under ``data_dir`` (searched recursively)
    are decoded once at construction time and cached as mono waveforms of
    exactly ``CFG["sampling_rate"]`` samples.

    Args:
        data_dir: Directory to scan for ``.wav`` files.
        subset: Name of the split; only used in log messages.
        wanted_words: Optional collection of labels to keep. When given, the
            class indices are derived from it, so every split built with the
            same ``wanted_words`` shares one label -> index mapping even if a
            split is missing some class. When ``None``, the classes are the
            labels actually found.

    Attributes:
        file_paths: Paths of the kept files, in sorted order.
        labels_str: String label per file.
        classes: Sorted list of class names.
        class_to_idx: Mapping from class name to integer index.
        labels: Integer label per file.
        cached_wavs: Decoded ``(1, sampling_rate)`` tensor per file.
    """

    def __init__(self, data_dir, subset, wanted_words=None):
        self.subset = subset
        self.data_path = Path(data_dir)
        self.wanted_words = wanted_words
        self.file_paths = []

        if not self.data_path.exists():
            # Deliberately non-fatal: the notebook can still be opened and
            # partially run without the data being present.
            print(f"[WARNING] Directory not found: {self.data_path}")
        else:
            print(f"[INFO] Scanning {self.subset} set in {self.data_path}...")
            for p in sorted(self.data_path.glob("**/*.wav")):
                if p.name.startswith("._"):
                    continue
                if self.wanted_words is None or self._label_of(p) in self.wanted_words:
                    self.file_paths.append(p)

        # Always define the label metadata, even for an empty dataset, so
        # attribute access never fails downstream.
        self.labels_str = [self._label_of(p) for p in self.file_paths]
        if self.wanted_words is not None:
            self.classes = sorted(set(self.wanted_words))
        else:
            self.classes = sorted(set(self.labels_str))
        self.class_to_idx = {c: i for i, c in enumerate(self.classes)}
        self.labels = [self.class_to_idx[lbl] for lbl in self.labels_str]

        if self.file_paths:
            print(f"[INFO] Loaded {self.subset}: {len(self.file_paths)} files. Caching...")
            self.cached_wavs = [self.load_wav(str(p)) for p in self.file_paths]
        else:
            self.cached_wavs = []
            print(f"[WARNING] No files found for {self.subset}")

    @staticmethod
    def _label_of(path):
        """Return the label encoded in a filename (text before the first '_')."""
        return path.name.split('_')[0]

    def load_wav(self, wav_path):
        """Decode one PCM WAV file into a ``(1, CFG["sampling_rate"])`` float tensor.

        8-, 16- and 32-bit integer PCM are scaled to roughly [-1, 1).
        Multi-channel audio is averaged down to mono, the signal is resampled
        to ``CFG["sampling_rate"]`` if needed, and then cropped or
        right-padded with zeros to exactly that many samples.

        Any decoding problem (including an unsupported sample width) is
        reported on stdout and replaced by a clip of silence, so one bad file
        does not abort dataset construction.
        """
        target_len = CFG["sampling_rate"]
        try:
            with wave.open(wav_path, 'rb') as wf:
                sr, n_frames = wf.getframerate(), wf.getnframes()
                n_channels, width = wf.getnchannels(), wf.getsampwidth()
                raw_bytes = wf.readframes(n_frames)

            if width == 2:
                data = np.frombuffer(raw_bytes, dtype=np.int16).astype(np.float32) / 32768.0
            elif width == 1:
                # 8-bit WAV is unsigned; centre it around zero.
                data = (np.frombuffer(raw_bytes, dtype=np.uint8).astype(np.float32) - 128.0) / 128.0
            elif width == 4:
                data = np.frombuffer(raw_bytes, dtype=np.int32).astype(np.float32) / 2147483648.0
            else:
                # Previously any other width was decoded as int32, which
                # yields garbage (or a size error) for e.g. 24-bit files.
                raise ValueError(f"unsupported sample width: {width} bytes")

            # Frames are interleaved: (n_frames, n_channels) -> (n_channels, n_frames).
            waveform = torch.from_numpy(data.reshape(-1, n_channels)).t().contiguous()
        except Exception as e:
            print(f"Error loading {wav_path}: {e}")
            return torch.zeros(1, target_len)

        # Downmix to mono so every cached item has the same (1, T) shape.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample logic
        if sr != target_len:
            waveform = T.Resample(sr, target_len)(waveform)

        if waveform.shape[1] > target_len:
            waveform = waveform[:, :target_len]
        elif waveform.shape[1] < target_len:
            waveform = torch.nn.functional.pad(waveform, (0, target_len - waveform.shape[1]))
        return waveform

    def __len__(self):
        """Number of cached clips."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return ``(waveform, integer_label)`` for the clip at ``idx``."""
        return self.cached_wavs[idx], self.labels[idx]

### Data Loading

In [17]:
# Keywords to keep; the dataset filters files by the label prefix of each filename.
CLASSES = ['stop', 'up']
# Paths
# Each dataset decodes and caches all of its waveforms in memory when it is constructed.
train_ds = MSCDataset('/tmp/msc-train', subset='train', wanted_words=CLASSES)
val_ds   = MSCDataset('/tmp/msc-val',   subset='val',   wanted_words=CLASSES)
test_ds  = MSCDataset('/tmp/msc-test',  subset='test',  wanted_words=CLASSES)

# Only the training loader shuffles; validation and test keep a fixed order.
# num_workers=0 loads batches in the main process.
train_loader = DataLoader(train_ds, batch_size=CFG['batch_size'], shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds, batch_size=CFG['batch_size'], shuffle=False, num_workers=0)
test_loader  = DataLoader(test_ds, batch_size=CFG['batch_size'], shuffle=False, num_workers=0)

[INFO] Scanning train set in /tmp/msc-train...
[INFO] Loaded train: 1600 files. Caching...
[INFO] Scanning val set in /tmp/msc-val...
[INFO] Loaded val: 200 files. Caching...
[INFO] Scanning test set in /tmp/msc-test...
[INFO] Loaded test: 200 files. Caching...


### Model Architecture

In [18]:
# Frontend:
class LogMelSpec(nn.Module):
    """Feature frontend: mel spectrogram followed by log compression.

    The mel spectrogram is configured from ``CFG`` (sampling rate, FFT size,
    window length, hop length, and number of mel bands).
    """

    def __init__(self):
        super().__init__()
        mel_settings = {
            'sample_rate': CFG['sampling_rate'],
            'n_fft': CFG['n_fft'],
            'win_length': CFG['win_length'],
            'hop_length': CFG['hop_length'],
            'n_mels': CFG['n_mels'],
        }
        self.melspec = T.MelSpectrogram(**mel_settings)

    def forward(self, x):
        """Return ``log(melspec(x) + 1e-6)``.

        Args:
            x: Waveform batch, documented by the original author as (B, 1, T).
        """
        eps = 1e-6  # keeps the logarithm finite where the mel energy is zero
        return (self.melspec(x) + eps).log()

# Augmentation:
class AddGaussianNoise(nn.Module):
    """Training-only augmentation that adds scaled Gaussian noise.

    With probability ``p`` (and only in training mode) the input gets
    standard-normal noise multiplied by a single amplitude drawn uniformly
    from ``[min_amp, max_amp]``. One draw is made per call, so the decision
    and amplitude are shared by the whole tensor passed in.
    """

    def __init__(self, p=0.5, min_amp=0.001, max_amp=0.015):
        super().__init__()
        self.p, self.min_amp, self.max_amp = p, min_amp, max_amp

    def forward(self, x):
        """Return ``x`` plus noise, or ``x`` itself when the augmentation is skipped."""
        # Eval mode short-circuits before touching the Python RNG.
        if not self.training or random.random() >= self.p:
            return x
        gaussian = torch.randn_like(x)
        amplitude = random.uniform(self.min_amp, self.max_amp)
        return x + gaussian * amplitude

class RandomGain(nn.Module):
    """Training-only augmentation that rescales the input amplitude.

    With probability ``pb`` (and only in training mode) the input is
    multiplied by one gain factor drawn uniformly from
    ``[min_gain, max_gain]``. The same factor applies to the whole tensor
    passed in.
    """

    def __init__(self, pb=0.5, min_gain=0.8, max_gain=1.2):
        super().__init__()
        self.pb, self.min_gain, self.max_gain = pb, min_gain, max_gain

    def forward(self, x):
        """Return the rescaled input, or ``x`` itself when the augmentation is skipped."""
        # Eval mode short-circuits before touching the Python RNG.
        if not self.training or random.random() >= self.pb:
            return x
        gain = random.uniform(self.min_gain, self.max_gain)
        return x * gain

class RandomShift(nn.Module):
    """Training-only augmentation that circularly shifts the last axis.

    With probability ``p`` (and only in training mode) the input is rolled
    along its last dimension by an integer offset drawn uniformly from
    ``[-max_shift, max_shift]``. The roll wraps around: samples pushed past
    one end reappear at the other.
    """

    def __init__(self, p=0.5, max_shift=1600):
        super().__init__()
        self.p, self.max_shift = p, max_shift

    def forward(self, x):
        """Return the rolled input, or ``x`` itself when the augmentation is skipped."""
        # Eval mode short-circuits before touching the Python RNG.
        if not self.training or random.random() >= self.p:
            return x
        offset = random.randint(-self.max_shift, self.max_shift)
        return x.roll(shifts=offset, dims=-1)

# Waveform-level augmentation pipeline, applied in order: shift, gain, noise.
# Each stage is a no-op unless its module is in training mode and its own
# random draw falls below the stage's probability.
augment = nn.Sequential(
    RandomShift(p=0.6),
    RandomGain(pb=0.5),
    AddGaussianNoise(p=0.4)
)

# Backend: DSCNN
class DSCNN(nn.Module):
    """Depthwise-separable CNN classifier over single-channel feature maps.

    Layout: a 3x3 stem convolution (stride 2 on the first spatial axis),
    five depthwise-separable blocks (the second one doubles the channel
    count and downsamples by 2), global average pooling, dropout, and a
    linear classification head.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # Base=28 for lower latency on RPi
        narrow = 28
        wide = narrow * 2

        # Stem
        self.conv1 = nn.Conv2d(1, narrow, kernel_size=3, stride=(2, 1), padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(narrow)
        self.relu = nn.ReLU(inplace=True)

        # (in_channels, out_channels, stride) for each block, in order.
        block_specs = (
            (narrow, narrow, 1),
            (narrow, wide, 2),
            (wide, wide, 1),
            (wide, wide, 1),
            (wide, wide, 1),
        )
        self.blocks = nn.Sequential(
            *[self._ds_block(c_in, c_out, step) for c_in, c_out, step in block_specs]
        )

        # Head
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(wide, num_classes)

    @property
    def backbone(self):
        """The network itself, exposed under the name ``backbone``."""
        return self

    def _ds_block(self, in_ch, out_ch, stride):
        """Build one depthwise (3x3, per-channel) + pointwise (1x1) block."""
        depthwise = [
            nn.Conv2d(in_ch, in_ch, 3, stride=stride, padding=1, groups=in_ch, bias=False),
            nn.BatchNorm2d(in_ch),
            nn.ReLU(inplace=True),
        ]
        pointwise = [
            nn.Conv2d(in_ch, out_ch, 1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*depthwise, *pointwise)

    def forward(self, x):
        """Map a (B, 1, H, W) feature batch to (B, num_classes) logits."""
        stem = self.relu(self.bn1(self.conv1(x)))
        pooled = self.pool(self.blocks(stem)).flatten(1)
        return self.fc(self.dropout(pooled))

# Instantiate the feature frontend and the classifier on the configured device.
frontend = LogMelSpec().to(CFG['device'])
model = DSCNN(num_classes=len(CLASSES)).to(CFG['device'])

# Report the classifier's parameter count (only `model.parameters()` is summed).
print(f"Total Number of Model Parameters: {sum(p.numel() for p in model.parameters()):,}")

Total Number of Model Parameters: 15,150


### Training

In [19]:
def evaluate_accuracy(loader):
    """Return the top-1 accuracy (in percent) of ``model`` on ``loader``.

    Puts ``model`` in eval mode and runs ``frontend`` + ``model`` under
    ``torch.no_grad()``. Returns ``None`` when the loader yields no samples,
    so callers never divide by zero on an empty split.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for wavs, labels in loader:
            wavs, labels = wavs.to(CFG['device']), labels.to(CFG['device'])
            predicted = model(frontend(wavs)).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total if total > 0 else None


criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=CFG['lr'], weight_decay=1e-3)

best_acc = 0.0
best_model_wts = copy.deepcopy(model.state_dict())

if len(train_ds) > 0: # Only train if data is present
    # OneCycleLR for convergence. It is created inside the guard because it
    # rejects a non-positive steps_per_epoch, which an empty training set
    # would produce.
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer, max_lr=CFG['lr'],
        steps_per_epoch=len(train_loader),
        epochs=CFG['epochs']
    )
    # Built once and reused; applied to roughly 30% of the batches below.
    freq_mask = T.FrequencyMasking(freq_mask_param=10)

    print("[INFO] Starting Training...")
    for epoch in range(CFG['epochs']):
        # Train
        model.train()
        for wavs, labels in train_loader:
            wavs, labels = wavs.to(CFG['device']), labels.to(CFG['device'])

            # Augmentation and feature extraction have no trainable
            # parameters, so they are kept out of the autograd graph.
            with torch.no_grad():
                # Apply Augmentations in Time Domain
                wavs = augment(wavs)
                features = frontend(wavs)

            # Freq Masking
            if random.random() < 0.3:
                features = freq_mask(features)

            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step() # Step per batch for OneCycleLR

        # Validation
        val_acc = evaluate_accuracy(val_loader)
        if val_acc is None:
            # No validation data: there is nothing to select on, so keep the
            # most recent weights instead of the untrained initial ones.
            best_model_wts = copy.deepcopy(model.state_dict())
        elif val_acc > best_acc:
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        if (epoch+1) % 5 == 0:
            if val_acc is None:
                print(f"Epoch {epoch+1:2d}/{CFG['epochs']} | Val Acc: n/a (no validation data)")
            else:
                print(f"Epoch {epoch+1:2d}/{CFG['epochs']} | Val Acc: {val_acc:.2f}% (Best: {best_acc:.2f}%)")

else:
    print("ERROR: No training data found. Skipping training loop.")


# Test Accuracy
if len(test_ds) > 0:
    print("\nEvaluating on Test Set...")
    # Evaluate the checkpoint selected on the validation set, not the last epoch.
    model.load_state_dict(best_model_wts)
    test_acc = evaluate_accuracy(test_loader)
    print(f"Final Test Accuracy: {test_acc:.2f}%")

    if test_acc > 99.4:
        print("Accuracy requirement met (> 99.4%)")
    else:
        print(f"Accuracy is below target ({test_acc:.2f}% vs 99.4%). Consider increasing epochs or model capacity.")
else:
    print("No test data present.")

[INFO] Starting Training...
Epoch  5/100 | Val Acc: 78.50% (Best: 78.50%)
Epoch 10/100 | Val Acc: 94.50% (Best: 94.50%)
Epoch 15/100 | Val Acc: 85.00% (Best: 95.50%)
Epoch 20/100 | Val Acc: 97.00% (Best: 97.00%)
Epoch 25/100 | Val Acc: 96.50% (Best: 97.00%)
Epoch 30/100 | Val Acc: 97.50% (Best: 97.50%)
Epoch 35/100 | Val Acc: 96.00% (Best: 98.50%)
Epoch 40/100 | Val Acc: 96.50% (Best: 98.50%)
Epoch 45/100 | Val Acc: 97.00% (Best: 98.50%)
Epoch 50/100 | Val Acc: 96.50% (Best: 98.50%)
Epoch 55/100 | Val Acc: 98.00% (Best: 98.50%)
Epoch 60/100 | Val Acc: 97.50% (Best: 98.50%)
Epoch 65/100 | Val Acc: 98.00% (Best: 98.50%)
Epoch 70/100 | Val Acc: 98.00% (Best: 98.50%)
Epoch 75/100 | Val Acc: 98.50% (Best: 98.50%)
Epoch 80/100 | Val Acc: 98.00% (Best: 99.00%)
Epoch 85/100 | Val Acc: 98.00% (Best: 99.00%)
Epoch 90/100 | Val Acc: 98.00% (Best: 99.00%)
Epoch 95/100 | Val Acc: 98.00% (Best: 99.00%)
Epoch 100/100 | Val Acc: 98.00% (Best: 99.00%)

Evaluating on Test Set...
Final Test Accuracy: 100

### Export, Quantization, Evaluation

In [None]:
# Bind frontend and backbone for export
# NOTE(review): assigning an nn.Module as an attribute registers it as a
# submodule, so `model.to(...)` below also moves the frontend, and
# `model.state_dict()` may now contain frontend entries. Re-running the
# `model.load_state_dict(best_model_wts)` step of the training cell after this
# cell could therefore fail on key mismatch -- confirm before re-running out of order.
model.frontend = frontend

print(f"\nExporting : ")

OUTPUT_DIR = "/tmp"
os.makedirs(OUTPUT_DIR, exist_ok=True)

frontend_fname = os.path.join(OUTPUT_DIR, "Group1_frontend.onnx")
model_fp32_fname = os.path.join(OUTPUT_DIR, "Group1_model_float32.onnx")
model_int8_fname = os.path.join(OUTPUT_DIR, "Group1_model.onnx")

print(f"[INFO] Saving files to: {OUTPUT_DIR}")

# Remove exports left over from a previous run. Later steps decide whether to
# continue by checking that these files exist, so a stale file would hide a
# failed export and get quantized instead of the current model.
for stale_path in (frontend_fname, model_fp32_fname):
    if os.path.exists(stale_path):
        os.remove(stale_path)

device_cpu = torch.device("cpu")
model.to(device_cpu)
model.eval()

# Dummy input :
dummy_input = torch.randn(1, 1, CFG['sampling_rate']).to(device_cpu)


def _export_onnx(module, example_input, out_path, label):
    """Export ``module`` to ONNX at ``out_path``; return True on success.

    Failures are reported on stdout (prefixed with ``label``) rather than
    raised, so the rest of the cell can still run and report what is missing.
    """
    try:
        torch.onnx.export(
            module, example_input, out_path,
            input_names=['input'], output_names=['output'],
            dynamo=True, optimize=True, report=False, external_data=False
        )
        if not os.path.exists(out_path): raise FileNotFoundError(f"{label} not found at {out_path}")
        print(f"{label} Exported.")
        return True
    except Exception as e:
        print(f"{label} Export Error: {e}")
        return False


#1) Frontend Export
_export_onnx(model.frontend, dummy_input, frontend_fname, "Frontend")

#2) Backbone Export
try:
    with torch.no_grad():
        # Frontend to Backbone: the backbone is traced on real frontend output
        # so its input shape matches what the frontend produces.
        frontend_out = model.frontend(dummy_input)
        if frontend_out.dim() == 3: frontend_out = frontend_out.unsqueeze(1)
except Exception as e:
    print(f"Backbone Export Error: {e}")
else:
    _export_onnx(model.backbone, frontend_out, model_fp32_fname, "Backbone")

#3) Quantization
class DataReader(CalibrationDataReader):
    """Calibration data source for static quantization of the backbone.

    Iterates over ``dataset`` one item at a time, runs each waveform through
    the exported ONNX frontend, and yields the resulting features under the
    backbone's input name ``'input'``.

    Args:
        dataset: Iterable of ``(waveform, label)`` pairs; the label is ignored.
        frontend_path: Path to the exported frontend ONNX model.
    """

    def __init__(self, dataset, frontend_path):
        self.dataset = dataset
        self.ort_frontend = ort.InferenceSession(frontend_path, providers=['CPUExecutionProvider'])
        self.enum_data = None  # iterator over `dataset`, created on first use

    def get_next(self):
        """Return the next ``{'input': features}`` feed, or ``None`` when exhausted."""
        if self.enum_data is None:
            self.enum_data = iter(self.dataset)

        item = next(self.enum_data, None)
        if item is None:
            return None

        wav, _label = item
        if isinstance(wav, torch.Tensor):
            wav = wav.detach().cpu().numpy()

        # Promote 1-D (T,) and 2-D (C, T) inputs to rank 3 by adding leading axes.
        if wav.ndim in (1, 2):
            wav = wav.reshape((1,) * (3 - wav.ndim) + wav.shape)

        input_name = self.ort_frontend.get_inputs()[0].name
        feats = self.ort_frontend.run(None, {input_name: wav})[0]

        # Backbone size: insert a channel axis when the frontend output is rank 3.
        if feats.ndim == 3:
            feats = feats[:, np.newaxis]
        return {'input': feats}

    def rewind(self):
        """Restart iteration from the first dataset item on the next call."""
        self.enum_data = None

# Size budget, in KB, for frontend + quantized model combined.
SIZE_LIMIT_KB = 300

if os.path.exists(model_fp32_fname) and os.path.exists(frontend_fname):
    print("Starting Quantization Process...")
    # Calibrate activation ranges on the validation set, then write the
    # int8 (QDQ-format, per-tensor) model next to the float32 one.
    dr = DataReader(val_ds, frontend_fname)
    q_config = StaticQuantConfig(
        dr,
        quant_format=QuantFormat.QDQ,
        calibrate_method=CalibrationMethod.MinMax,
        activation_type=QuantType.QInt8,
        weight_type=QuantType.QInt8,
        per_channel=False,
    )
    quantize(model_fp32_fname, model_int8_fname, q_config)

    # Sizes in KB (1 KB = 1024 bytes).
    f_size = os.path.getsize(frontend_fname) / 1024
    m_size = os.path.getsize(model_int8_fname) / 1024
    t_size = f_size + m_size

    print(f"\nFiles are ready at: {OUTPUT_DIR}:")
    print(f" - Frontend: {f_size:.2f} KB")
    print(f" - Model:    {m_size:.2f} KB")
    print(f"----------------------------------------------")
    print(f"Total Size:  {t_size:.2f} KB")

    # Best-effort convenience copy into the working directory. A failure here
    # is reported with its real cause but does not invalidate the export.
    try:
        current_dir = os.getcwd()
        copy_plan = (
            (frontend_fname, os.path.join(current_dir, "Group1_frontend.onnx")),
            (model_int8_fname, os.path.join(current_dir, "Group1_model.onnx")),
        )
        for src_path, dst_path in copy_plan:
            # Copying a file onto itself raises; skip when cwd is OUTPUT_DIR.
            if os.path.abspath(src_path) != os.path.abspath(dst_path):
                shutil.copy(src_path, dst_path)
        print(f"Files copied to current directory: {current_dir}")
    except OSError as e:
        print(f"Could not copy to current dir ({e}), but files are safe in {OUTPUT_DIR}")

    if t_size < SIZE_LIMIT_KB:
        print(f"Submission is under 300 KB.")
    else:
        print(f"Warning: Total size is {t_size:.2f} KB.")
else:
    print("Error: Export files missing.")

# File
source_dir = '/tmp'    # Source directory
target_dir = '/work'   # Target directory

# The files are copied, not moved: the originals stay in source_dir.
print(f"\nCopying files from {source_dir} to {target_dir}...")

# Ensure target directory exists
os.makedirs(target_dir, exist_ok=True)

# Copy ONNX files
files_found = 0  # number of .onnx files successfully copied
if os.path.isdir(source_dir):
    print(f"Checking {source_dir} Content")
    # Sorted so the log order is the same on every run.
    for file_name in sorted(os.listdir(source_dir)):
        # Filter for .onnx files
        if not file_name.endswith('.onnx'):
            continue

        full_source_path = os.path.join(source_dir, file_name)
        full_target_path = os.path.join(target_dir, file_name)

        try:
            shutil.copy(full_source_path, full_target_path)
            print(f"{file_name} -> Copied to workspace.")
            files_found += 1
        except OSError as e:
            print(f"Could not copy {file_name}. Reason: {e}")

    if files_found == 0:
        print(f"WARNING: No .onnx files were copied from {source_dir}. Export might have failed.")
    else:
        print(f"\n Process Complete! {files_found} model files should be visible in the file explorer.")

else:
    print(f"Error: Source directory ({source_dir}) not found.")


Exporting : 
[INFO] Saving files to: /tmp
[torch.onnx] Obtain model graph for `LogMelSpec([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `LogMelSpec([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...
[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
Applied 1 of general pattern rewrite rules.
Frontend Exported.
[torch.onnx] Obtain model graph for `DSCNN([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `DSCNN([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...
[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
Applied 22 of general pattern rewrite rules.




Backbone Exported.
Starting Quantization Process...





Files are ready at: /tmp:
 - Frontend: 62.04 KB
 - Model:    55.44 KB
----------------------------------------------
Total Size:  117.48 KB
Files copied to current directory: /content
Submission is under 300 KB.

Moving files from /tmp to /work...
Checking /tmp Content
Group1_model.onnx -> Copied to workspace.
Group1_frontend.onnx -> Copied to workspace.
Group1_model_float32.onnx -> Copied to workspace.

 Process Complete! 3 model files should be visible in the file explorer.


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=154c2188-6af2-4819-9ca5-4f23502d7a8f' target="_blank">
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>