# CAM++ Speaker Verification Deep Tutorial

**Author**: 3D-Speaker Team  
**Target Audience**: Deep learning beginners, speaker recognition researchers  
**Estimated Time**: 4-6 hours

---

## Table of Contents

1. [Fundamentals](#1-fundamentals)
   - Speaker Verification Introduction
   - Common Features (MFCC, Fbank)
   - Loss Functions (Softmax, ArcFace / AAM-Softmax)
2. [CAM++ Architecture Deep Dive](#2-cam-architecture-deep-dive)
   - Architecture Overview
   - FCM (Frequency Convolution Module)
   - CAM Layer (Context-Aware Masking)
   - Complete CAM++ Model
3. [Implementation](#3-implementation)
   - Data Preprocessing
   - Model Definition
   - Training Loop and Validation
4. [Model Export and Optimization](#4-model-export-and-optimization)
   - ONNX Conversion
   - INT8 Quantization
5. [Embedded Deployment Guide](#5-embedded-deployment-guide)
   - RK3568 Development Board
   - Real-time Inference Script
6. [Extensions](#6-extensions)
   - Industrial Anomaly Detection

---

# 1. Fundamentals

## 1.1 Speaker Verification Introduction

**Speaker recognition** is a biometric technology that uses the acoustic characteristics of a speech signal either to confirm a claimed identity (verification) or to determine who is speaking (identification).

### Main Task Types

| Task Type | Description | Application |
|-----------|-------------|-------------|
| **Speaker Verification** | 1:1 verification | Identity authentication, security systems |
| **Speaker Identification** | 1:N identification | Meeting transcription, smart assistants |
| **Speaker Diarization** | Segment audio by speaker | Conference transcription, call analysis |
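
To make the 1:1 versus 1:N distinction concrete, here is a minimal NumPy sketch. It assumes speaker embeddings have already been extracted. The helper names `cosine_score`, `verify`, and `identify`, the toy 3-dimensional embeddings, and the 0.5 threshold are illustrative assumptions, not part of the project.

```python
import numpy as np

def cosine_score(a, b):
    """Cosine similarity between two 1-D embedding vectors."""
    a = np.asarray(a, dtype=np.float64)
    b = np.asarray(b, dtype=np.float64)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def verify(test_emb, claimed_emb, threshold=0.5):
    """1:1 task: accept or reject a single claimed identity."""
    return cosine_score(test_emb, claimed_emb) > threshold

def identify(test_emb, enrolled):
    """1:N task: return the best-matching enrolled speaker and its score."""
    scores = {name: cosine_score(test_emb, emb) for name, emb in enrolled.items()}
    best = max(scores, key=scores.get)
    return best, scores[best]

# Toy embeddings (hypothetical values; real embeddings are much higher-dimensional)
enrolled = {
    "alice": np.array([1.0, 0.1, 0.0]),
    "bob": np.array([0.0, 1.0, 0.2]),
}
test = np.array([0.9, 0.2, 0.1])

print(verify(test, enrolled["alice"]))  # 1:1 check against the claimed speaker
print(identify(test, enrolled))         # 1:N search over all enrolled speakers
```

Verification needs a threshold because it must be able to reject; identification over a closed set simply picks the highest score.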

### Technical Pipeline

```
Audio Input -> Preprocessing -> Feature Extraction -> Speaker Embedding -> Similarity -> Decision
     |              |                |                      |                |           |
     +-- Denoise ---+--- Fbank ------+------ CAM++ ---------+--- Cosine -----+-- Threshold
```

In [None]:
# Environment Check and Dependencies
import sys
import os

# Add project root to path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

print(f"Python Version: {sys.version}")
print(f"Project Root: {project_root}")

# Check required dependencies
required_packages = ['torch', 'torchaudio', 'numpy', 'matplotlib']
missing_packages = []

for package in required_packages:
    try:
        __import__(package)
        print(f"[OK] {package} installed")
    except ImportError:
        missing_packages.append(package)
        print(f"[X] {package} not installed")

if missing_packages:
    print(f"\nPlease install missing packages: pip install {' '.join(missing_packages)}")

## 1.2 Common Acoustic Features

### 1.2.1 MFCC (Mel-Frequency Cepstral Coefficients)

MFCC is a classic speech feature that approximates human auditory perception by analyzing the spectrum on the Mel scale:

**Extraction Pipeline**:
1. Pre-emphasis
2. Framing & Windowing
3. FFT (Fast Fourier Transform)
4. Mel Filter Bank
5. Log operation
6. DCT (Discrete Cosine Transform)
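
The six steps above can be sketched in plain NumPy. This is a textbook-style illustration, not the Kaldi-compatible extractor used by the project: the function names, the 512-point FFT, the Hamming window, the 0.97 pre-emphasis coefficient, and the choice of 13 cepstral coefficients are all assumptions made for the example.

```python
import numpy as np

def hz_to_mel(hz):
    return 2595.0 * np.log10(1.0 + hz / 700.0)

def mel_to_hz(mel):
    return 700.0 * (10.0 ** (mel / 2595.0) - 1.0)

def mel_filterbank(n_mels, n_fft, sample_rate):
    """Triangular filters spaced evenly on the Mel scale: (n_mels, n_fft//2 + 1)."""
    mel_points = np.linspace(hz_to_mel(0.0), hz_to_mel(sample_rate / 2), n_mels + 2)
    hz_points = mel_to_hz(mel_points)
    fft_freqs = np.linspace(0.0, sample_rate / 2, n_fft // 2 + 1)
    fb = np.zeros((n_mels, n_fft // 2 + 1))
    for i in range(n_mels):
        left, center, right = hz_points[i], hz_points[i + 1], hz_points[i + 2]
        rising = (fft_freqs - left) / (center - left)
        falling = (right - fft_freqs) / (right - center)
        fb[i] = np.maximum(0.0, np.minimum(rising, falling))
    return fb

def fbank_and_mfcc(signal, sample_rate=16000, frame_len=400, frame_shift=160,
                   n_fft=512, n_mels=80, n_mfcc=13, preemph=0.97):
    # 1. Pre-emphasis: boost high frequencies
    emphasized = np.append(signal[0], signal[1:] - preemph * signal[:-1])
    # 2. Framing and windowing (25 ms frames, 10 ms shift at 16 kHz)
    n_frames = 1 + (len(emphasized) - frame_len) // frame_shift
    idx = np.arange(frame_len)[None, :] + frame_shift * np.arange(n_frames)[:, None]
    frames = emphasized[idx] * np.hamming(frame_len)
    # 3. FFT -> power spectrum
    power = np.abs(np.fft.rfft(frames, n=n_fft, axis=1)) ** 2
    # 4. Mel filter bank and 5. log (the result of these two steps is the Fbank feature)
    fbank = np.log(power @ mel_filterbank(n_mels, n_fft, sample_rate).T + 1e-10)
    # 6. Unnormalized DCT-II over the Mel axis, keeping the first n_mfcc coefficients
    n = np.arange(n_mels)
    dct_basis = np.cos(np.pi * np.arange(n_mfcc)[:, None] * (2 * n[None, :] + 1) / (2 * n_mels))
    mfcc = fbank @ dct_basis.T
    return fbank, mfcc

# One second of a 440 Hz tone as a stand-in for speech
t = np.arange(16000) / 16000.0
demo_fbank, demo_mfcc = fbank_and_mfcc(np.sin(2 * np.pi * 440.0 * t))
print(demo_fbank.shape, demo_mfcc.shape)
```

The function returns the intermediate log Mel energies as well as the MFCCs, which makes it easy to see that the two features differ only by the final DCT.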

### 1.2.2 Fbank (Filter Bank Features)

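Fbank features are the log Mel filter-bank energies, that is, the MFCC pipeline stopped before the final DCT step. Skipping the DCT preserves more spectral information:

**Advantages**:
- Preserves more raw spectral information
- Suitable for deep learning models, which can learn their own decorrelation
- Cheaper to compute, since the DCT is omitted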

**This project uses**: 80-dimensional Fbank features

In [None]:
# Feature Extraction Demonstration
import torch
import numpy as np
import matplotlib.pyplot as plt

# Simulate audio signal (use real audio in practice)
sample_rate = 16000
duration = 3.0  # seconds
t = np.linspace(0, duration, int(sample_rate * duration))

# Generate simulated speech signal (sum of sinusoids)
frequencies = [200, 400, 800, 1200]  # fundamental and harmonics
signal = np.zeros_like(t)
for i, f in enumerate(frequencies):
    signal += (1.0 / (i + 1)) * np.sin(2 * np.pi * f * t)
signal = signal / np.max(np.abs(signal))  # normalize

print(f"Sample Rate: {sample_rate} Hz")
print(f"Duration: {duration} s")
print(f"Number of Samples: {len(signal)}")

# Visualize waveform
plt.figure(figsize=(14, 4))
plt.subplot(1, 2, 1)
plt.plot(t[:1600], signal[:1600])  # show first 100ms
plt.title('Audio Waveform (First 100ms)')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.grid(True)

plt.subplot(1, 2, 2)
# Simple spectrogram
try:
    from scipy import signal as scipy_signal
    frequencies_spec, times_spec, Sxx = scipy_signal.spectrogram(
        signal, sample_rate, nperseg=400, noverlap=240)
    plt.pcolormesh(times_spec, frequencies_spec[:50], 
                   10 * np.log10(Sxx[:50] + 1e-10))
    plt.title('Spectrogram')
    plt.xlabel('Time (s)')
    plt.ylabel('Frequency (Hz)')
    plt.colorbar(label='Power (dB)')
except ImportError:
    plt.text(0.5, 0.5, 'scipy not installed', ha='center', va='center')
plt.tight_layout()
plt.show()

In [None]:
# Fbank Feature Extraction (using project's FBank class)
try:
    from speakerlab.process.processor import FBank
    
    # Create Fbank feature extractor
    fbank_extractor = FBank(
        n_mels=80,           # Number of Mel filters
        sample_rate=16000,   # Sample rate
        mean_nor=True        # Mean normalization
    )
    
    # Convert to tensor and extract features
    wav_tensor = torch.from_numpy(signal).float()
    fbank_features = fbank_extractor(wav_tensor)
    
    print(f"Fbank Feature Shape: {fbank_features.shape}")
    print(f"Time Frames: {fbank_features.shape[0]}")
    print(f"Mel Bins: {fbank_features.shape[1]}")
    
    # Visualize Fbank features
    plt.figure(figsize=(12, 4))
    plt.imshow(fbank_features.T.numpy(), aspect='auto', origin='lower', cmap='viridis')
    plt.colorbar(label='Magnitude')
    plt.title('Fbank Features (80-dim)')
    plt.xlabel('Time Frame')
    plt.ylabel('Mel Frequency Bin')
    plt.show()
    
except Exception as e:
    print(f"Feature extraction demo requires complete project environment: {e}")
    print("Please run this notebook from the project root directory")

## 1.3 Loss Functions

### 1.3.1 Softmax Loss (Baseline)

Standard cross-entropy loss for classification.

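**Drawback**: Softmax only pushes the classes to be separable. It does not explicitly enforce compact clusters, so intra-class distances can stay large and inter-class distances small, which hurts verification on unseen speakers.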

### 1.3.2 ArcFace Loss (Additive Angular Margin Loss)

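Adds a margin `m` to the angle between an embedding and the weight vector of its ground-truth class. The target class must then win by at least that angular margin, which tightens each class and improves inter-class separability.

Key quantities in the implementation:
- cosine: cos(theta), the cosine similarity between the feature and each class weight
- sine: sin(theta), computed from cosine as sqrt(1 - cosine^2)
- phi: cos(theta + m), which replaces cosine for the ground-truth class to apply the margin penalty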
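
Written out, the quantities above combine as follows. This is the standard additive-angular-margin formulation; the symbols are chosen here for illustration, with $s$ the scale, $m$ the margin, $y$ the ground-truth class, and $\theta_j$ the angle between the embedding and the weight of class $j$.

```latex
\phi = \cos(\theta_y + m) = \cos\theta_y \cos m - \sin\theta_y \sin m,
\qquad \sin\theta_y = \sqrt{1 - \cos^2\theta_y}

L = -\log \frac{e^{s\,\phi}}{e^{s\,\phi} + \sum_{j \neq y} e^{s \cos\theta_j}}
```

With $m = 0$ this reduces to a scaled softmax over cosine similarities.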

### 1.3.3 Implementation in this Project

See [`speakerlab/loss/margin_loss.py`](../speakerlab/loss/margin_loss.py) for the full implementation.

You can import it in your code:
```python
from speakerlab.loss.margin_loss import ArcMarginLoss
```

In [None]:
# ArcFace Loss Implementation Analysis
# Simplified demo based on speakerlab/loss/margin_loss.py (see source for full implementation)

import math
import torch
import torch.nn as nn

class ArcMarginLossDemo(nn.Module):
    """
    Implement Additive Angular Margin Loss (ArcFace)
    
    Args:
        scale: Scale factor, controls decision boundary temperature
        margin: Angular margin, increases positive sample difficulty
        easy_margin: Whether to use simplified margin
    """
    def __init__(self, scale=32.0, margin=0.2, easy_margin=False):
        super(ArcMarginLossDemo, self).__init__()
        self.scale = scale
        self.easy_margin = easy_margin
        self.criterion = nn.CrossEntropyLoss()
        self.update(margin)

    def forward(self, cosine, label):
        # cosine: [batch, num_classes] - normalized cosine similarity
        # label: [batch] - ground truth labels
        
        # Calculate sin(theta) for angular transformation
        # Clamp guards against tiny negative values from floating-point rounding
        sine = torch.sqrt(torch.clamp(1.0 - torch.pow(cosine, 2), min=0.0))
        
        # cos(theta + m) = cos(theta)*cos(m) - sin(theta)*sin(m)
        phi = cosine * self.cos_m - sine * self.sin_m
        
        # Handle boundary conditions
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mmm)
        
        # Only apply margin to correct class
        one_hot = torch.zeros(cosine.size()).type_as(cosine)
        one_hot.scatter_(1, label.unsqueeze(1).long(), 1)
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        
        # Apply scale factor
        output *= self.scale
        
        loss = self.criterion(output, label)
        return loss

    def update(self, margin=0.2):
        self.margin = margin
        self.cos_m = math.cos(margin)
        self.sin_m = math.sin(margin)
        self.th = math.cos(math.pi - margin)
        # Note: self.mm kept for compatibility with source code structure
        self.mm = math.sin(math.pi - margin) * margin
        self.mmm = 1.0 + math.cos(math.pi - margin)
        self.m = self.margin  # For compatibility with source

# Test the loss function
loss_fn = ArcMarginLossDemo(scale=32.0, margin=0.2)
print("ArcMarginLoss created successfully!")
print(f"Scale: {loss_fn.scale}, Margin: {loss_fn.margin}")

In [None]:
# ArcFace Loss Visualization
import numpy as np
import matplotlib.pyplot as plt

# Create angle range
theta = np.linspace(0, np.pi, 200)

# Different margin values
margins = [0.0, 0.2, 0.4, 0.5]

plt.figure(figsize=(14, 5))

# Subplot 1: Angular transformation with different margins
plt.subplot(1, 2, 1)
for m in margins:
    cos_theta = np.cos(theta)
    cos_m = np.cos(m)
    sin_m = np.sin(m)
    sin_theta = np.sin(theta)
    
    # cos(theta + m)
    cos_theta_m = cos_theta * cos_m - sin_theta * sin_m
    
    plt.plot(np.degrees(theta), cos_theta_m, label=f'm={m}', linewidth=2)

plt.axhline(y=0, color='gray', linestyle='--', alpha=0.5)
plt.xlabel('Theta (degrees)')
plt.ylabel('cos(theta + m)')
plt.title('Angular Margin Effect')
plt.legend()
plt.grid(True, alpha=0.3)

# Subplot 2: normalized feature space
plt.subplot(1, 2, 2)
angles = np.linspace(0, 2*np.pi, 100)
# Embeddings and class weights are L2-normalized, so they all lie on the unit
# circle. The margin does not move this circle; it narrows the angular region
# around each class center that counts as a confident match.
plt.plot(np.cos(angles), np.sin(angles), '--',
         label='Unit circle (normalized features)', alpha=0.7)

# Draw class centers on the unit circle
centers = [(0.707, 0.707), (-0.707, 0.707), (0.0, -1.0)]
colors = ['red', 'blue', 'green']
for (cx, cy), c in zip(centers, colors):
    plt.scatter(cx, cy, c=c, s=100, marker='*', edgecolors='black', zorder=5)
    
plt.xlim(-1.5, 1.5)
plt.ylim(-1.5, 1.5)
plt.xlabel('Feature Dim 1')
plt.ylabel('Feature Dim 2')
plt.title('Feature Space with Angular Margin')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axis('equal')

plt.tight_layout()
plt.show()

print("Key Parameters:")
print("- scale (s=32): Controls logits range, affects convergence speed")
print("- margin (m=0.2): Angular margin, increases inter-class distance")
print("- Training: margin increases from 0 to 0.2 (margin scheduling)")

---

# 2. CAM++ Architecture Deep Dive

## 2.1 Architecture Overview

CAM++ is a speaker embedding network built around Context-Aware Masking (CAM). It reaches strong verification accuracy with a relatively small model (about 7.2M parameters).

### Key Components

```
Input (B, T, 80)
       |
       v
+------------------+
|   FCM Module     |  <- ResNet-based 2D CNN for frequency modeling
|  (Freq Conv)     |
+------------------+
       |
       v
+------------------+
|   TDNN Layer     |  <- Initial temporal modeling
+------------------+
       |
       v
+------------------+
| CAMDenseTDNN x3  |  <- Dense TDNN blocks with Context-Aware Masking
|   Block 1        |     12 layers, kernel=3, dilation=1
|   Block 2        |     24 layers, kernel=3, dilation=2
|   Block 3        |     16 layers, kernel=3, dilation=2
+------------------+
       |
       v
+------------------+
|   Stats Pool     |  <- Mean + Std pooling
+------------------+
       |
       v
+------------------+
|   Dense Layer    |  <- Final embedding (512-dim)
+------------------+
```
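
The channel counts implied by the three dense blocks can be checked with a few lines of arithmetic. The sketch below assumes 128 channels after the initial TDNN layer, a growth rate of 32 channels per dense layer, and transition layers that halve the channel count, which are the defaults in the model definition shown later in this tutorial. The function name `dense_block_channels` is hypothetical.

```python
def dense_block_channels(init_channels=128, growth_rate=32,
                         layers_per_block=(12, 24, 16)):
    """Track channel counts through each dense block and its transition layer."""
    channels = init_channels
    trace = []
    for num_layers in layers_per_block:
        after_block = channels + num_layers * growth_rate  # each layer appends growth_rate channels
        after_transit = after_block // 2                    # transition layer halves the channels
        trace.append((after_block, after_transit))
        channels = after_transit
    return trace

trace = dense_block_channels()
for i, (after_block, after_transit) in enumerate(trace, start=1):
    print(f"Block {i}: {after_block} channels -> transit: {after_transit} channels")
print(f"Stats pooling output: {2 * trace[-1][1]} (mean + std)")
```

Because every dense layer concatenates its output to its input, the channel count grows linearly with depth, and the transition layers keep it from growing without bound.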

## 2.2 FCM (Frequency Convolution Module)

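The FCM module treats the Fbank matrix as a single-channel image and applies 2D convolutions with residual blocks. It downsamples the frequency axis by a factor of 8 and leaves the time axis unchanged.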
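
The FCM code that follows applies three stride-2 stages along the frequency axis (two residual layers and a final convolution), each with kernel size 3 and padding 1. The sketch below works out the resulting sizes for 80 Mel bins and 32 channels; the helper names are hypothetical and the arithmetic is the standard convolution output-size formula.

```python
def conv_out_size(size, kernel=3, stride=1, padding=1):
    """Output length of a convolution along one axis (dilation 1)."""
    return (size + 2 * padding - kernel) // stride + 1

def fcm_output_channels(feat_dim=80, m_channels=32):
    """Frequency bins left after the FCM and the flattened channel count."""
    freq = conv_out_size(feat_dim, stride=1)  # initial conv keeps the size
    for _ in range(3):                        # layer1, layer2, final conv
        freq = conv_out_size(freq, stride=2)
    return freq, m_channels * freq

freq_bins, flat_channels = fcm_output_channels()
print(f"Frequency bins after FCM: {freq_bins}")
print(f"Flattened channels (C * F/8): {flat_channels}")
```

The flattened channel count is what the first TDNN layer receives as its input dimension.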

In [None]:
# FCM Module Analysis
# From speakerlab/models/campplus/DTDNN.py

import torch
import torch.nn as nn
import torch.nn.functional as F

class BasicResBlock(nn.Module):
    """Basic ResNet block for 2D convolution"""
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicResBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3,
                               stride=(stride, 1), padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes,
                          kernel_size=1, stride=(stride, 1), bias=False),
                nn.BatchNorm2d(self.expansion * planes))

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)  # Residual connection
        out = F.relu(out)
        return out

class FCM(nn.Module):
    """
    Frequency Convolution Module
    Uses 2D CNN to process mel-spectrogram as image
    
    Args:
        block: ResNet block type
        num_blocks: Number of blocks per layer
        m_channels: Channel dimension
        feat_dim: Input feature dimension (mel bins)
    """
    def __init__(self, block=BasicResBlock, num_blocks=[2, 2], 
                 m_channels=32, feat_dim=80):
        super(FCM, self).__init__()
        self.in_planes = m_channels
        
        # Initial convolution
        self.conv1 = nn.Conv2d(1, m_channels, kernel_size=3, 
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(m_channels)

        # ResNet layers (downsample frequency dimension)
        self.layer1 = self._make_layer(block, m_channels, num_blocks[0], stride=2)
        self.layer2 = self._make_layer(block, m_channels, num_blocks[1], stride=2)

        # Final convolution
        self.conv2 = nn.Conv2d(m_channels, m_channels, kernel_size=3, 
                               stride=(2, 1), padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(m_channels)
        
        # Output channels: m_channels * (feat_dim // 8)
        self.out_channels = m_channels * (feat_dim // 8)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        # x: (B, F, T) -> (B, 1, F, T)
        x = x.unsqueeze(1)
        
        # Initial conv
        out = F.relu(self.bn1(self.conv1(x)))
        
        # ResNet blocks (downsample F by 4x)
        out = self.layer1(out)  # F/2
        out = self.layer2(out)  # F/4
        
        # Final conv (downsample F by 2x more)
        out = F.relu(self.bn2(self.conv2(out)))  # F/8

        # Reshape: (B, C, F/8, T) -> (B, C*F/8, T)
        shape = out.shape
        out = out.reshape(shape[0], shape[1]*shape[2], shape[3])
        return out

# Test FCM
fcm = FCM(feat_dim=80, m_channels=32)
dummy_input = torch.randn(2, 80, 200)  # (batch, freq, time)
output = fcm(dummy_input)
print(f"FCM Input Shape: {dummy_input.shape}")
print(f"FCM Output Shape: {output.shape}")
print(f"FCM Output Channels: {fcm.out_channels}")

## 2.3 CAM Layer (Context-Aware Masking)

The core innovation of CAM++ is the **Context-Aware Masking** mechanism that combines:
1. **Local features**: Standard 1D convolution
2. **Global context**: Segment-level pooling + channel attention

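The global branch produces a sigmoid mask with values in (0, 1), and this mask is multiplied element-wise with the local features. Each output channel is therefore scaled according to utterance-level and segment-level context.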
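
Segment-level pooling is the less familiar half of the global context, so here is a small NumPy analogue. It is a sketch under assumptions: the function names are hypothetical, a shorter final segment is averaged over its valid frames only, and the bottleneck layers and sigmoid that turn the context into a mask are omitted.

```python
import numpy as np

def seg_avg_pool(x, seg_len):
    """Average each length-seg_len segment of the last axis, then repeat the
    segment mean so the output has the same length as the input."""
    num_frames = x.shape[-1]
    out = np.empty_like(x, dtype=np.float64)
    for start in range(0, num_frames, seg_len):
        stop = min(start + seg_len, num_frames)  # last segment may be shorter
        out[..., start:stop] = x[..., start:stop].mean(axis=-1, keepdims=True)
    return out

def cam_context(x, seg_len):
    """Context fed to the mask branch: utterance-level mean + segment-level mean."""
    return x.mean(axis=-1, keepdims=True) + seg_avg_pool(x, seg_len)

x = np.array([[0.0, 1.0, 2.0, 3.0, 4.0]])  # (channels=1, frames=5)
print(seg_avg_pool(x, seg_len=2))           # segment means, repeated per frame
print(cam_context(x, seg_len=2))            # utterance mean added to each frame
```

The utterance-level mean is the same for every frame, while the segment-level term changes from one segment to the next, so the resulting mask can vary over time.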

In [None]:
# CAM Layer Analysis
# From speakerlab/models/campplus/layers.py
# Note: This is an educational demonstration of the CAMLayer structure.
# The seg_pooling method below is part of the CAMLayer class.

import torch.nn as nn
import torch.nn.functional as F

class CAMLayer(nn.Module):
    """
    Context-Aware Masking Layer
    
    Combines local convolution with global context attention.
    
    Key idea:
    - Local: Standard convolution captures local patterns
    - Global: Segment pooling + FC layers create attention mask
    - Output: Local features * Global attention mask
    """
    def __init__(self, bn_channels, out_channels, kernel_size,
                 stride, padding, dilation, bias, reduction=2):
        super(CAMLayer, self).__init__()
        
        # Local convolution branch
        self.linear_local = nn.Conv1d(bn_channels, out_channels, kernel_size,
                                      stride=stride, padding=padding,
                                      dilation=dilation, bias=bias)
        
        # Global context branch (channel attention)
        self.linear1 = nn.Conv1d(bn_channels, bn_channels // reduction, 1)
        self.relu = nn.ReLU(inplace=True)
        self.linear2 = nn.Conv1d(bn_channels // reduction, out_channels, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Local features
        y = self.linear_local(x)
        
        # Global context: mean pooling + segment pooling
        context = x.mean(-1, keepdim=True) + self.seg_pooling(x)
        
        # Attention mask through bottleneck
        context = self.relu(self.linear1(context))
        m = self.sigmoid(self.linear2(context))
        
        # Apply attention mask to local features
        return y * m

    def seg_pooling(self, x, seg_len=100, stype='avg'):
        """
        Segment-level pooling for multi-scale context
        
        Divides input into segments and pools each segment,
        then broadcasts back to original length.
        """
        if stype == 'avg':
            seg = F.avg_pool1d(x, kernel_size=seg_len, stride=seg_len, ceil_mode=True)
        elif stype == 'max':
            seg = F.max_pool1d(x, kernel_size=seg_len, stride=seg_len, ceil_mode=True)
        else:
            raise ValueError('Wrong segment pooling type.')
        
        # Broadcast pooled values back to original length
        shape = seg.shape
        seg = seg.unsqueeze(-1).expand(*shape, seg_len).reshape(*shape[:-1], -1)
        seg = seg[..., :x.shape[-1]]
        return seg

# Visualize CAM mechanism
print("CAM Layer combines:")
print("1. Local Conv: Captures local temporal patterns")
print("2. Global Context: Mean + Segment pooling")
print("3. Attention: Sigmoid gate on global context")
print("4. Output: Local * Attention (element-wise)")

## 2.4 Complete CAM++ Model

In [None]:
# Complete CAM++ Model
# From speakerlab/models/campplus/DTDNN.py
#
# IMPORTANT: This code demonstrates the architecture structure.
# To run this, you need to import the helper classes from the project:
#   from speakerlab.models.campplus.layers import (
#       TDNNLayer, CAMDenseTDNNBlock, TransitLayer, 
#       StatsPool, DenseLayer, get_nonlinear
#   )
# The FCM class is defined in the previous code cell - ensure it is run first.

from collections import OrderedDict
import torch
from torch import nn
import torch.nn.functional as F

# Import required components from speakerlab
try:
    from speakerlab.models.campplus.layers import (
        TDNNLayer, CAMDenseTDNNBlock, TransitLayer,
        StatsPool, DenseLayer, get_nonlinear
    )
    LAYERS_AVAILABLE = True
except ImportError:
    print("Note: speakerlab.models.campplus.layers not available.")
    print("The CAMPPlus class below shows the architecture structure.")
    LAYERS_AVAILABLE = False

class CAMPPlus(nn.Module):
    """
    CAM++ Speaker Embedding Network
    
    Architecture:
    1. FCM: 2D CNN for frequency modeling
    2. TDNN: Initial temporal convolution
    3. CAMDenseTDNN Blocks x3: Multi-scale context-aware blocks
    4. Stats Pooling: Mean + Std aggregation
    5. Dense: Final embedding projection
    """
    def __init__(self, feat_dim=80, embedding_size=512, growth_rate=32,
                 bn_size=4, init_channels=128, config_str='batchnorm-relu',
                 memory_efficient=True):
        super(CAMPPlus, self).__init__()

        # FCM module for frequency processing
        self.head = FCM(feat_dim=feat_dim)
        channels = self.head.out_channels

        # Build x-vector style network with CAM blocks
        self.xvector = nn.Sequential(OrderedDict([
            ('tdnn', TDNNLayer(channels, init_channels, 5, stride=2,
                              dilation=1, padding=-1, config_str=config_str)),
        ]))
        channels = init_channels
        
        # Three CAMDenseTDNN blocks with different configurations
        block_configs = [
            (12, 3, 1),  # Block 1: 12 layers, kernel=3, dilation=1
            (24, 3, 2),  # Block 2: 24 layers, kernel=3, dilation=2
            (16, 3, 2),  # Block 3: 16 layers, kernel=3, dilation=2
        ]
        
        for i, (num_layers, kernel_size, dilation) in enumerate(block_configs):
            # Dense block with CAM layers
            block = CAMDenseTDNNBlock(
                num_layers=num_layers,
                in_channels=channels,
                out_channels=growth_rate,
                bn_channels=bn_size * growth_rate,
                kernel_size=kernel_size,
                dilation=dilation,
                config_str=config_str,
                memory_efficient=memory_efficient
            )
            self.xvector.add_module(f'block{i+1}', block)
            channels = channels + num_layers * growth_rate
            
            # Transition layer (channel reduction)
            self.xvector.add_module(
                f'transit{i+1}',
                TransitLayer(channels, channels // 2, bias=False, config_str=config_str)
            )
            channels //= 2

        # Output layers
        self.xvector.add_module('out_nonlinear', get_nonlinear(config_str, channels))
        self.xvector.add_module('stats', StatsPool())
        self.xvector.add_module('dense', DenseLayer(channels * 2, embedding_size, 
                                                     config_str='batchnorm_'))
        
        # Initialize weights
        for m in self.modules():
            if isinstance(m, (nn.Conv1d, nn.Linear)):
                nn.init.kaiming_normal_(m.weight.data)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def forward(self, x):
        # x: (B, T, F) -> (B, F, T)
        x = x.permute(0, 2, 1)
        x = self.head(x)
        x = self.xvector(x)
        return x

# Model summary
print("CAM++ Architecture Summary:")
print("- Input: (Batch, Time, 80) Fbank features")
print("- FCM: 2D CNN, outputs (B, 320, T)")
print("- TDNN: 1D Conv, outputs (B, 128, T/2)")
print("- Block1: 12 CAM layers, outputs (B, 512, T/2)")
print("- Transit1: Channel reduction to 256")
print("- Block2: 24 CAM layers, outputs (B, 1024, T/2)")
print("- Transit2: Channel reduction to 512")
print("- Block3: 16 CAM layers, outputs (B, 1024, T/2)")
print("- Transit3: Channel reduction to 512")
print("- Stats Pool: Mean + Std -> (B, 1024)")
print("- Dense: Final embedding (B, 512)")

---

# 3. Implementation

## 3.1 Data Preprocessing Pipeline

In [None]:
# Data Preprocessing Pipeline - Concept Demonstration
# Based on speakerlab/process/processor.py
#
# NOTE: These are simplified demonstration classes showing the structure.
# They return None as placeholders. For actual usage, import from speakerlab:
#   from speakerlab.process.processor import WavReader, FBank

import torch
import random

class WavReaderDemo:
    """
    Audio reader with speed perturbation and chunking
    """
    def __init__(self, sample_rate=16000, duration=3.0, speed_pertub=False):
        self.duration = duration
        self.sample_rate = sample_rate
        self.speed_pertub = speed_pertub

    def __call__(self, wav_path):
        # In practice, use torchaudio.load(wav_path)
        # wav, sr = torchaudio.load(wav_path)
        
        chunk_len = int(self.duration * self.sample_rate)
        
        # Speed perturbation (0.9x, 1.0x, 1.1x)
        speed_idx = 0
        if self.speed_pertub:
            speed_idx = random.randint(0, 2)
        
        # Random chunk extraction
        # if data_len >= chunk_len:
        #     start = random.randint(0, data_len - chunk_len)
        #     wav = wav[start:start + chunk_len]
        # else:
        #     wav = F.pad(wav, (0, chunk_len - data_len))
        
        return None, speed_idx

class FBankDemo:
    """
    Fbank feature extractor using Kaldi-compatible implementation
    """
    def __init__(self, n_mels=80, sample_rate=16000, mean_nor=True):
        self.n_mels = n_mels
        self.sample_rate = sample_rate
        self.mean_nor = mean_nor

    def __call__(self, wav, dither=0):
        # In practice, use torchaudio.compliance.kaldi.fbank
        # feat = Kaldi.fbank(wav, num_mel_bins=self.n_mels, 
        #                    sample_frequency=self.sample_rate, dither=dither)
        # if self.mean_nor:
        #     feat = feat - feat.mean(0, keepdim=True)
        return None

print("Preprocessing Pipeline:")
print("1. Load audio -> Resample to 16kHz")
print("2. Speed perturbation (optional): 0.9x, 1.0x, 1.1x")
print("3. Random chunk extraction (3 seconds default)")
print("4. Fbank extraction: 80-dim, 25ms window, 10ms shift")
print("5. Mean normalization (optional)")

## 3.2 Training Loop
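
The training loop in this section steps a learning-rate scheduler and a margin scheduler on every iteration. As a rough illustration of what such schedules compute, the sketch below implements a linear margin ramp and a cosine learning-rate decay using the values listed in the training configuration of this section. The function names are hypothetical, the schedules are evaluated per epoch rather than per iteration, warmup is omitted, and this is not the implementation of the project's scheduler classes.

```python
import math

def linear_margin(epoch, start_epoch=15, end_epoch=25, initial=0.0, final=0.2):
    """Hold `initial` until start_epoch, ramp linearly, then hold `final`."""
    if epoch <= start_epoch:
        return initial
    if epoch >= end_epoch:
        return final
    frac = (epoch - start_epoch) / (end_epoch - start_epoch)
    return initial + frac * (final - initial)

def cosine_lr(epoch, total_epochs=60, max_lr=0.1, min_lr=1e-4):
    """Cosine decay from max_lr to min_lr (warmup omitted in this sketch)."""
    frac = min(max(epoch / total_epochs, 0.0), 1.0)
    return min_lr + 0.5 * (max_lr - min_lr) * (1.0 + math.cos(math.pi * frac))

for epoch in (0, 15, 20, 25, 60):
    print(epoch, round(linear_margin(epoch), 3), round(cosine_lr(epoch), 5))
```

Starting with a zero margin lets the classifier find a reasonable solution first; the margin is then raised gradually so the task becomes harder without destabilizing early training.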

In [None]:
# Training Loop Example
# Based on speakerlab/bin/train.py (see ../speakerlab/bin/train.py for full implementation)
#
# Note: In practice, the model is wrapped as nn.Sequential(embedding_model, classifier)
# where classifier is a CosineClassifier that outputs logits for ArcFace loss.
# The lr_scheduler and margin_scheduler are created using:
#   - lr_scheduler: speakerlab.process.scheduler.WarmupCosineScheduler
#   - margin_scheduler: speakerlab.process.scheduler.MarginScheduler

def train_one_epoch(train_loader, model, criterion, optimizer, 
                    lr_scheduler, margin_scheduler, epoch):
    """
    Training loop for one epoch
    """
    model.train()
    total_loss = 0.0
    
    for i, (x, y) in enumerate(train_loader):
        # Update schedulers
        iter_num = (epoch - 1) * len(train_loader) + i
        lr_scheduler.step(iter_num)
        margin_scheduler.step(iter_num)
        
        # Move to GPU
        x = x.cuda(non_blocking=True)
        y = y.cuda(non_blocking=True)
        
        # Forward pass: model = nn.Sequential(embedding_model, classifier)
        # embedding_model outputs (B, 512) embeddings
        # classifier (CosineClassifier) outputs (B, num_classes) logits
        output = model(x)  # Returns logits for classification
        loss = criterion(output, y)
        
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Record metrics
        total_loss += loss.item()
        
    return total_loss / len(train_loader)

print("Training Configuration (VoxCeleb):")
print("- Dataset: VoxCeleb2 dev set (5,994 speakers)")
print("  Download: https://www.robots.ox.ac.uk/~vgg/data/voxceleb/")
print("  Preparation: See egs/voxceleb/sv-cam++/local/prepare_data.sh")
print("- Epochs: 60")
print("- Batch size: 256")
print("- Learning rate: 0.1 -> 1e-4 (cosine decay)")
print("- Margin: 0.0 -> 0.2 (linear increase from epoch 15-25)")
print("- Optimizer: SGD with momentum 0.9")
print("- Weight decay: 1e-4")

---

# 4. Model Export and Optimization

## 4.1 ONNX Export

In [None]:
# ONNX Export
# Implementation adapted from speakerlab's ONNX export logic
# See ../speakerlab/bin/export_speaker_embedding_onnx.py for full script

import torch

def export_to_onnx(model, output_path, feat_dim=80):
    """
    Export PyTorch model to ONNX format
    
    Args:
        model: CAM++ model in eval mode
        output_path: Path to save ONNX file
        feat_dim: Input feature dimension
    """
    model.eval()
    
    # Create dummy input: (batch=1, time=345, freq=80)
    dummy_input = torch.randn(1, 345, feat_dim)
    
    torch.onnx.export(
        model,
        dummy_input,
        output_path,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=['feature'],
        output_names=['embedding'],
        dynamic_axes={
            'feature': {0: 'batch_size', 1: 'frame_num'},
            'embedding': {0: 'batch_size'}
        }
    )
    print(f"Model exported to {output_path}")

# Usage example:
# export_to_onnx(model, "campplus.onnx")

print("ONNX Export Notes:")
print("- Dynamic batch size and frame number")
print("- opset_version=11 for compatibility")
print("- Constant folding enabled for optimization")

## 4.2 INT8 Quantization
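
Before calling a quantization tool, it helps to see the arithmetic behind INT8 storage. The sketch below uses symmetric per-tensor quantization on a random weight matrix. It illustrates the general idea only: the function names are hypothetical, and the exact scheme a given toolkit applies (zero points, per-channel scales) may differ.

```python
import numpy as np

def quantize_int8(weights):
    """Symmetric per-tensor quantization: float32 -> int8 plus one float scale."""
    max_abs = float(np.max(np.abs(weights)))
    scale = max_abs / 127.0 if max_abs > 0 else 1.0
    q = np.clip(np.round(weights / scale), -127, 127).astype(np.int8)
    return q, scale

def dequantize_int8(q, scale):
    """Map int8 codes back to approximate float values."""
    return q.astype(np.float32) * scale

rng = np.random.default_rng(0)
w = rng.standard_normal((64, 64)).astype(np.float32)  # stand-in for a weight matrix
q, scale = quantize_int8(w)
w_hat = dequantize_int8(q, scale)

print(f"storage: {w.nbytes} bytes -> {q.nbytes} bytes")
print(f"max abs error: {np.max(np.abs(w - w_hat)):.5f} (scale = {scale:.5f})")
```

Storage shrinks by a factor of four, and the rounding error per weight is bounded by half the scale, which is why tensors with a few large outliers quantize poorly.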

In [None]:
# INT8 Quantization for Edge Deployment

def quantize_onnx_model(onnx_path, output_path):
    """
    Quantize ONNX model to INT8 using dynamic quantization
    
    Requires: onnxruntime, onnx
    
    Args:
        onnx_path: Path to FP32 ONNX model
        output_path: Path to save INT8 model
    
    Note: For static quantization (better accuracy), you would need
    calibration data and use quantize_static() instead.
    """
    try:
        from onnxruntime.quantization import quantize_dynamic, QuantType
    except ImportError:
        print("Please install: pip install onnxruntime onnx")
        return

    # Dynamic quantization (no calibration data needed):
    # weights are stored as INT8, activations are quantized at run time
    quantize_dynamic(
        onnx_path,
        output_path,
        weight_type=QuantType.QInt8
    )
    print(f"Quantized model saved to {output_path}")

# For static quantization with calibration:
# from onnxruntime.quantization import quantize_static, CalibrationDataReader

print("Quantization Options:")
print("1. Dynamic Quantization: No calibration, slightly lower accuracy")
print("2. Static Quantization: Requires calibration data, better accuracy")
print("3. QAT (Quantization-Aware Training): Best accuracy, requires retraining")

---

# 5. Embedded Deployment Guide

## 5.1 RK3568 Development Board Setup

The RK3568 is an ARM-based SoC with NPU support, suitable for edge AI deployment.

### Hardware Requirements
- RK3568 Development Board
- USB Microphone or Audio Input
- Linux OS (Debian/Ubuntu based)

### Software Requirements
- RKNN-Toolkit2 for model conversion, with the RKNN runtime on the board for NPU inference
- ONNX Runtime for CPU inference
- PortAudio for microphone input

In [None]:
# RK3568 Inference Script
# Speaker verification on embedded device

import numpy as np

class SpeakerVerifier:
    """
    Speaker verification system for RK3568
    
    Supports:
    - ONNX Runtime (CPU)
    - RKNN Runtime (NPU) - for better performance
    """
    def __init__(self, model_path, use_npu=False):
        self.use_npu = use_npu
        
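        if use_npu:
            # RKNN for NPU acceleration (left as a template; fill in for your board)
            # from rknn.api import RKNN
            # self.rknn = RKNN()
            # self.rknn.load_rknn(model_path)
            # self.rknn.init_runtime()
            raise NotImplementedError(
                "NPU inference is not wired up in this demo; "
                "uncomment and adapt the RKNN calls above."
            )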
        else:
            # ONNX Runtime for CPU
            try:
                import onnxruntime as ort
            except ImportError:
                raise RuntimeError(
                    "onnxruntime is required for CPU inference. "
                    "Please install it with 'pip install onnxruntime'."
                )
            self.session = ort.InferenceSession(model_path)
    
    def extract_embedding(self, fbank_features):
        """
        Extract speaker embedding from Fbank features
        
        Args:
            fbank_features: numpy array (1, T, 80)
        Returns:
            embedding: numpy array (512,)
        """
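        if self.use_npu:
            # Stub, matching the commented-out RKNN setup in __init__:
            # outputs = self.rknn.inference(inputs=[fbank_features])
            # return outputs[0][0]  # drop the batch dimension, as on CPU
            raise NotImplementedError(
                "NPU inference is not enabled in this tutorial script."
            )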
        else:
            outputs = self.session.run(
                None, 
                {'feature': fbank_features.astype(np.float32)}
            )
            return outputs[0][0]
    
    def verify(self, emb1, emb2, threshold=0.5):
        """
        Verify if two embeddings are from same speaker
        
        Args:
            emb1, emb2: Speaker embeddings
            threshold: Cosine similarity threshold
        Returns:
            is_same: Boolean
            score: Cosine similarity score
        """
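        # Normalize embeddings (the epsilon guards against an all-zero vector)
        emb1 = emb1 / (np.linalg.norm(emb1) + 1e-12)
        emb2 = emb2 / (np.linalg.norm(emb2) + 1e-12)
        
        # Cosine similarity, converted to a plain Python float
        score = float(np.dot(emb1, emb2))
        
        return score > threshold, score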

print("RK3568 Deployment Notes (rough estimates; benchmark on your own board):")
print("- Use RKNN for NPU acceleration (3-5x faster)")
print("- Typical inference time: ~50ms (NPU), ~200ms (CPU)")
print("- Memory usage: ~100MB for model")
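
The figures printed above are rough and depend on the model, the input length, and the board configuration, so it is worth measuring them yourself. Below is a minimal latency-measurement sketch using only the standard library. `benchmark` is a hypothetical helper, and the workload shown is a stand-in: on the device you would pass something like `lambda: verifier.extract_embedding(fbank)` instead.

```python
import statistics
import time

def benchmark(fn, warmup=3, runs=20):
    """Return (median_ms, p95_ms) of fn() over `runs` timed calls."""
    for _ in range(warmup):
        fn()  # warm-up calls are not timed (caches, lazy initialization)
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000.0)
    samples.sort()
    # Simple nearest-rank estimate of the 95th percentile
    p95 = samples[min(len(samples) - 1, int(0.95 * len(samples)))]
    return statistics.median(samples), p95

# Stand-in workload so the cell runs anywhere; replace with the real call.
median_ms, p95_ms = benchmark(lambda: sum(i * i for i in range(10000)))
print(f"median {median_ms:.2f} ms, p95 {p95_ms:.2f} ms")
```

Reporting the median alongside a high percentile gives a better picture than a single run, since the first calls on an embedded board are often much slower than steady-state ones.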

In [None]:
# Real-time Microphone Recording and Verification
import numpy as np

class RealtimeSpeakerVerifier:
    """
    Real-time speaker verification from microphone
    
    Requirements:
    - pip install sounddevice numpy
    - Working microphone
    """
    def __init__(self, model_path, sample_rate=16000, duration=3.0):
        self.sample_rate = sample_rate
        self.duration = duration
        self.chunk_samples = int(sample_rate * duration)
        
        # Initialize verifier
        self.verifier = SpeakerVerifier(model_path)
        
        # Enrolled speaker embeddings
        self.enrolled_speakers = {}
    
    def record_audio(self):
        """Record audio from microphone"""
        try:
            import sounddevice as sd
            print(f"Recording for {self.duration} seconds...")
            audio = sd.rec(self.chunk_samples, samplerate=self.sample_rate,
                          channels=1, dtype='float32')
            sd.wait()
            return audio.flatten()
        except ImportError:
            print("Please install: pip install sounddevice")
            return None
    
    def extract_fbank(self, audio):
        """Extract Fbank features from audio"""
        # PLACEHOLDER: returns random data for demonstration only!
        # For real usage, replace with actual Fbank extraction, e.g.:
        #   import torch
        #   import torchaudio.compliance.kaldi as Kaldi
        #   wav = torch.from_numpy(audio).unsqueeze(0)  # shape (1, num_samples)
        #   feat = Kaldi.fbank(wav, num_mel_bins=80, sample_frequency=16000)
        #   return feat.unsqueeze(0).numpy()  # shape (1, T, 80)
        
        # WARNING: with random features, every verification score is meaningless.
        shift = int(self.sample_rate * 0.010)  # 10 ms frame shift
        n_frames = max(1, len(audio) // shift)
        return np.random.randn(1, n_frames, 80).astype(np.float32)
    
    def enroll(self, speaker_name):
        """Enroll a new speaker"""
        audio = self.record_audio()
        if audio is None:
            return False
        
        fbank = self.extract_fbank(audio)
        embedding = self.verifier.extract_embedding(fbank)
        
        self.enrolled_speakers[speaker_name] = embedding
        print(f"Enrolled speaker: {speaker_name}")
        return True
    
    def verify_speaker(self, claimed_name, threshold=0.5):
        """Verify if speaker matches claimed identity"""
        if claimed_name not in self.enrolled_speakers:
            print(f"Speaker {claimed_name} not enrolled")
            return False, 0.0
        
        audio = self.record_audio()
        if audio is None:
            return False, 0.0
        
        fbank = self.extract_fbank(audio)
        embedding = self.verifier.extract_embedding(fbank)
        
        enrolled_emb = self.enrolled_speakers[claimed_name]
        is_same, score = self.verifier.verify(enrolled_emb, embedding, threshold)
        
        print(f"Verification result: {'PASS' if is_same else 'FAIL'}")
        print(f"Similarity score: {score:.4f}")
        return is_same, score

print("Usage Example:")
print("verifier = RealtimeSpeakerVerifier('campplus.onnx')")
print("verifier.enroll('Alice')  # Record and enroll")
print("verifier.verify_speaker('Alice')  # Verify identity")
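
The `extract_fbank` method above is only a placeholder. If PyTorch is too heavy for the target board, a NumPy-only log-mel front end is one possible substitute. The sketch below is an approximation under stated assumptions (25 ms Hamming window, 10 ms shift, 0.97 pre-emphasis, per-utterance mean normalization). It is not bit-exact with Kaldi's `fbank`, so a model trained on Kaldi-style features may lose some accuracy with it. The function names are hypothetical.

```python
import numpy as np

def hz_to_mel(f):
    return 2595.0 * np.log10(1.0 + f / 700.0)

def mel_to_hz(m):
    return 700.0 * (10.0 ** (m / 2595.0) - 1.0)

def mel_filterbank(n_mels, n_fft, sample_rate, f_min=20.0):
    """Triangular mel filters, shape (n_mels, n_fft // 2 + 1)."""
    mel_pts = np.linspace(hz_to_mel(f_min), hz_to_mel(sample_rate / 2), n_mels + 2)
    hz_pts = mel_to_hz(mel_pts)
    fft_freqs = np.linspace(0.0, sample_rate / 2, n_fft // 2 + 1)
    lower, center, upper = hz_pts[:-2, None], hz_pts[1:-1, None], hz_pts[2:, None]
    rising = (fft_freqs - lower) / (center - lower)
    falling = (upper - fft_freqs) / (upper - center)
    return np.maximum(0.0, np.minimum(rising, falling))

def numpy_fbank(audio, sample_rate=16000, n_mels=80, frame_ms=25, shift_ms=10):
    """Mean-normalized log-mel features, shape (1, T, n_mels), float32."""
    audio = np.asarray(audio, dtype=np.float64)
    frame_len = sample_rate * frame_ms // 1000
    shift = sample_rate * shift_ms // 1000
    if len(audio) < frame_len:
        raise ValueError("audio is shorter than one analysis frame")

    # Slice the signal into overlapping frames, shape (T, frame_len)
    n_frames = 1 + (len(audio) - frame_len) // shift
    idx = np.arange(frame_len)[None, :] + shift * np.arange(n_frames)[:, None]
    frames = audio[idx]
    frames = frames - frames.mean(axis=1, keepdims=True)  # remove DC offset

    # Pre-emphasis within each frame, then windowing
    emphasized = frames.copy()
    emphasized[:, 1:] -= 0.97 * frames[:, :-1]
    windowed = emphasized * np.hamming(frame_len)

    # Power spectrum -> mel energies -> log
    n_fft = 1 << (frame_len - 1).bit_length()  # next power of two
    power = np.abs(np.fft.rfft(windowed, n=n_fft, axis=1)) ** 2
    mel_energy = power @ mel_filterbank(n_mels, n_fft, sample_rate).T
    log_mel = np.log(np.maximum(mel_energy, 1e-10))

    # Per-utterance mean normalization over time
    log_mel -= log_mel.mean(axis=0, keepdims=True)
    return log_mel[None].astype(np.float32)

# One second of a 440 Hz tone as a stand-in for recorded audio
t = np.arange(16000) / 16000.0
audio = (0.1 * np.sin(2 * np.pi * 440.0 * t)).astype(np.float32)
feats = numpy_fbank(audio)
print(feats.shape, feats.dtype)
```

The returned array has the `(1, T, 80)` layout that `SpeakerVerifier.extract_embedding` expects, so the body of `extract_fbank` could simply return `numpy_fbank(audio, self.sample_rate)`.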

---

# 6. Extensions

## 6.1 Industrial Anomaly Detection

The speaker verification framework can be adapted for industrial sound-based anomaly detection:

### Application Scenarios
1. **Equipment Monitoring**: Detect abnormal sounds from motors, pumps, fans
2. **Predictive Maintenance**: Identify wear patterns before failure
3. **Quality Control**: Detect defects in manufacturing through sound

### Adaptation Strategy

| Speaker Verification | Industrial Anomaly Detection |
|---------------------|------------------------------|
| Speaker embedding | Machine sound embedding |
| Speaker identity | Normal/Abnormal state |
| Cosine similarity | Anomaly score |
| Enrollment | Baseline recording |

In [None]:
# Industrial Anomaly Detection Adaptation
import numpy as np

class IndustrialAnomalyDetector:
    """
    Adapt speaker verification for industrial anomaly detection
    
    Approach:
    1. Collect normal operation sounds as baseline
    2. Extract embeddings using CAM++ (or similar)
    3. Compare new sounds against baseline
    4. Flag anomalies when similarity drops
    """
    def __init__(self, model_path, fbank_fn, threshold=0.7):
        # fbank_fn is a hypothetical, caller-supplied callable that maps one
        # input (audio file path or samples) to Fbank features of shape (1, T, 80)
        self.verifier = SpeakerVerifier(model_path)
        self.fbank_fn = fbank_fn
        self.threshold = threshold
        self.baseline_embeddings = []
        self.reference = None  # Set by train_baseline()
    
    def train_baseline(self, normal_audio_files):
        """
        Build baseline from normal operation recordings
        
        Args:
            normal_audio_files: List of inputs accepted by fbank_fn
                (for example, audio file paths)
        """
        for audio_file in normal_audio_files:
            # Extract features and embedding
            fbank = self.fbank_fn(audio_file)
            emb = self.verifier.extract_embedding(fbank)
            self.baseline_embeddings.append(emb)
        
        # Compute mean embedding as reference
        if self.baseline_embeddings:
            self.reference = np.mean(self.baseline_embeddings, axis=0)
            self.reference = self.reference / np.linalg.norm(self.reference)
    
    def detect_anomaly(self, audio):
        """
        Detect if audio contains anomaly
        
        Args:
            audio: Input accepted by fbank_fn
        Returns:
            is_anomaly: Boolean
            anomaly_score: 1 - similarity (higher = more anomalous)
        """
        # Check if baseline has been trained
        if self.reference is None:
            raise RuntimeError("Baseline not trained. Call train_baseline() first.")
        
        # Extract embedding
        fbank = self.fbank_fn(audio)
        emb = self.verifier.extract_embedding(fbank)
        emb = emb / np.linalg.norm(emb)
        
        # Compare with baseline
        similarity = float(np.dot(emb, self.reference))
        anomaly_score = 1 - similarity
        
        # Equivalent to: similarity < self.threshold
        return anomaly_score > (1 - self.threshold), anomaly_score

print("Industrial Applications:")
print("1. Motor bearing fault detection")
print("2. Compressor anomaly monitoring")
print("3. Production line quality inspection")
print("4. HVAC system health monitoring")
print("")
print("Privacy & Ethics Considerations:")
print("- Ensure data collection consent")
print("- Secure storage of voice/sound data")
print("- Clear data retention policies")
print("- Transparency in monitoring systems")
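
The detector above uses a fixed `threshold=0.7`, which is unlikely to suit every machine. One possible way to choose it, sketched below, is to look at how similar the normal recordings are to their own mean embedding and take a low quantile of that distribution. This is an assumption-laden illustration rather than a method from the 3D-Speaker toolkit: `threshold_from_baseline` is a hypothetical helper, and the embeddings are synthetic.

```python
import numpy as np

def threshold_from_baseline(baseline_embeddings, quantile=0.01):
    """Pick a similarity threshold from normal-only embeddings.

    Returns (threshold, reference), where about `quantile` of the baseline
    recordings fall below the threshold (the expected false-alarm rate).
    """
    embs = np.asarray(baseline_embeddings, dtype=np.float64)
    embs = embs / np.linalg.norm(embs, axis=1, keepdims=True)
    reference = embs.mean(axis=0)
    reference /= np.linalg.norm(reference)
    similarities = embs @ reference  # cosine similarity of each row
    return float(np.quantile(similarities, quantile)), reference

# Synthetic stand-ins: "normal" embeddings cluster around one direction
rng = np.random.default_rng(0)
direction = rng.normal(size=512)
normal_embs = direction + 0.3 * rng.normal(size=(200, 512))
threshold, reference = threshold_from_baseline(normal_embs)

# An unrelated random direction plays the role of an abnormal sound
outlier = rng.normal(size=512)
outlier /= np.linalg.norm(outlier)
outlier_similarity = float(outlier @ reference)
print(f"threshold={threshold:.3f}, outlier similarity={outlier_similarity:.3f}")
```

Because the threshold is estimated on the same recordings that define the reference, it is somewhat optimistic. Holding out part of the normal data for threshold selection would give a more honest false-alarm estimate.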

---

# Summary

This tutorial covered:

1. **Fundamentals**: Speaker verification concepts, Fbank features, ArcFace loss
2. **CAM++ Architecture**: FCM module, CAM attention mechanism, DenseNet-style blocks
3. **Implementation**: Data preprocessing, training loop, validation
4. **Export & Optimization**: ONNX conversion, INT8 quantization
5. **Embedded Deployment**: RK3568 setup, real-time inference
6. **Extensions**: Industrial anomaly detection adaptation

## Next Steps

- Train on VoxCeleb dataset using provided scripts
- Experiment with different model configurations
- Deploy to target hardware
- Explore multi-modal extensions

## References

- [3D-Speaker GitHub](https://github.com/modelscope/3D-Speaker)
- [CAM++ Paper](https://arxiv.org/abs/2303.00332)
- [VoxCeleb Dataset](https://www.robots.ox.ac.uk/~vgg/data/voxceleb/)
- [ONNX Runtime](https://onnxruntime.ai/)

---

**License**: Apache 2.0  
**Acknowledgments**: 3D-Speaker Team, Alibaba DAMO Academy