Audio Embeddings Tutorial with Hugging Face
===========================================
  
[View on Google Colab](https://colab.research.google.com/drive/1ExzturxhZDXktxqSiMWxv0cSvwQ8-mVt?usp=sharing)
  
This tutorial demonstrates how to create embeddings for audio using:
- Pre-trained audio models from Hugging Face
- Speech Commands dataset for demonstration
- Visualization of latent space embeddings
- Audio similarity analysis

Includes detailed explanations and working examples.

### Import the necessary libraries

In [1]:
# !pip install torch transformers datasets matplotlib seaborn librosa scikit-learn ipython
# !pip install "numpy<2.0.0"
# !pip install --upgrade scikit-learn

import numpy as np
import torch
from transformers import AutoFeatureExtractor, AutoModel, Wav2Vec2Processor, Wav2Vec2Model
from datasets import load_dataset
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import seaborn as sns
import librosa
import librosa.display
from typing import List, Union, Optional

from IPython.display import display, Audio

import warnings
warnings.filterwarnings('ignore')

---

### Load the Speech Commands Dataset from HuggingFace Datasets

In [2]:
def load_speech_commands_dataset(num_samples: int = 1000) -> tuple:
    """
    Load Speech Commands dataset from Hugging Face datasets.
    
    Args:
        num_samples (int, optional): Number of samples to load from the dataset.
            Default is 1000 for faster processing.
    
    Returns:
        tuple: (audio_arrays, labels, sampling_rates) where audio_arrays is a list of 
               numpy arrays, labels is a list of corresponding command labels, and
               sampling_rates is a list of sampling rates.
    """
    print(f"Loading Speech Commands dataset from Hugging Face...")
    
    # Load the Speech Commands dataset from Hugging Face
    dataset = load_dataset("speech_commands", "v0.01", split="train", trust_remote_code=True)
    
    # Take a subset for demonstration
    dataset = dataset.select(range(min(num_samples, len(dataset))))
    
    # Extract audio data, labels, and sampling rates
    audio_arrays = []
    labels = []
    sampling_rates = []
    
    for sample in dataset:
        audio_arrays.append(sample["audio"]["array"])
        labels.append(sample["label"])
        sampling_rates.append(sample["audio"]["sampling_rate"])
    
    # Get label names
    label_names = dataset.features["label"].names
    
    print(f"Loaded {len(audio_arrays)} audio samples")
    print(f"Sampling rate: {sampling_rates[0]}Hz")
    print(f"Available commands: {label_names}")
    
    return audio_arrays, labels, sampling_rates, label_names

In [None]:
# Step 1: Load Speech Commands dataset
print("\n1. Loading Speech Commands Dataset:")
print("-" * 40)

audio_arrays, labels, sampling_rates, label_names = load_speech_commands_dataset(num_samples=1000)
print("Unique labels after filtering:", set(labels))

In [4]:
def load_and_filter_two_commands(command_names=['bed', 'cat'], min_samples_per_class=50):
    """
    Load the Speech Commands dataset and filter for two commands, ensuring both are present.

    Args:
        command_names (list): List of two command names to filter.
        min_samples_per_class (int): Minimum number of samples per class to collect.

    Returns:
        Tuple: (audio_arrays, labels, sampling_rates, label_names)
    """
   

    # Load a large chunk of the dataset
    dataset = load_dataset("speech_commands", "v0.01", split="train")
    label_names = dataset.features["label"].names
    command_indices = [label_names.index(cmd) for cmd in command_names]

    audio_arrays, labels, sampling_rates = [], [], []
    class_counts = [0, 0]

    for sample in dataset:
        label = sample["label"]
        if label in command_indices:
            idx = command_indices.index(label)
            if class_counts[idx] < min_samples_per_class:
                audio_arrays.append(sample["audio"]["array"])
                labels.append(idx)  # Relabel as 0 or 1
                sampling_rates.append(sample["audio"]["sampling_rate"])
                class_counts[idx] += 1
            if all(count >= min_samples_per_class for count in class_counts):
                break

    filtered_label_names = command_names
    print(f"Loaded {class_counts[0]} samples for '{command_names[0]}' and {class_counts[1]} samples for '{command_names[1]}'")
    return audio_arrays, labels, sampling_rates, filtered_label_names

In [None]:
audio_arrays, labels, sampling_rates, label_names = load_and_filter_two_commands(['bed', 'stop'], min_samples_per_class=100)

---

### Visualize the Audio Samples

In [6]:
def visualize_audio_samples(audio_arrays, labels, label_names, sampling_rates, num_samples=4):
    """
    Visualize waveforms and spectrograms of sample audio files, with play buttons.
    Ensures both classes are represented in the plot.

    Args:
        audio_arrays (List[np.ndarray]): List of audio waveforms.
        labels (List[int]): List of corresponding labels.
        label_names (List[str]): List of label names.
        sampling_rates (List[int]): List of sampling rates.
        num_samples (int, optional): Total number of samples to visualize (default: 4).
    """

    unique_labels = sorted(set(labels))
    samples_per_class = max(1, num_samples // len(unique_labels))
    indices = []

    for label in unique_labels:
        label_indices = np.where(np.array(labels) == label)[0]
        chosen = np.random.choice(label_indices, min(samples_per_class, len(label_indices)), replace=False)
        indices.extend(chosen)

    # If not enough, fill up to num_samples
    if len(indices) < num_samples:
        remaining = list(set(range(len(labels))) - set(indices))
        np.random.shuffle(remaining)
        indices.extend(remaining[:num_samples - len(indices)])

    # Plotting
    cols = 2
    rows = (num_samples + cols - 1) // cols
    fig, axes = plt.subplots(rows, cols, figsize=(12, 3 * rows))
    axes = axes.flatten() if num_samples > 1 else [axes]

    for i, idx in enumerate(indices[:num_samples]):
        ax = axes[i]
        time = np.linspace(0, len(audio_arrays[idx]) / sampling_rates[idx], len(audio_arrays[idx]))
        ax.plot(time, audio_arrays[idx])
        ax.set_title(f'Command: {label_names[labels[idx]]}')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Amplitude')
        ax.grid(True, alpha=0.3)
    for i in range(num_samples, len(axes)):
        axes[i].set_visible(False)
    plt.suptitle('Sample Audio Waveforms from Speech Commands Dataset', fontsize=16)
    plt.tight_layout()
    plt.show()

    # Display audio players below the plots
    for idx in indices[:num_samples]:
        display(Audio(audio_arrays[idx], rate=sampling_rates[idx], autoplay=False))

In [None]:
# Step 2: Visualize sample audio waveforms
print("\n2. Sample Audio Waveforms:")
print("-" * 30)

visualize_audio_samples(audio_arrays, labels, label_names, sampling_rates, num_samples=4)

---

### Generate the audio embeddings

In [8]:
def create_audio_embeddings(
    audio_arrays: List[np.ndarray],
    sampling_rates: List[int],
    model_name: str = "facebook/wav2vec2-base",
    normalize: bool = True,
    device: Optional[str] = None
) -> np.ndarray:
    """
    Create dense vector embeddings for audio using pre-trained transformer models.
    
    This function uses Hugging Face audio models to convert audio waveforms into numerical
    representations that capture acoustic and semantic features. These embeddings can be used
    for audio similarity search, clustering, classification, and retrieval tasks.
    
    Args:
        audio_arrays (List[np.ndarray]): List of audio waveforms as numpy arrays.
        sampling_rates (List[int]): List of corresponding sampling rates for each audio.
        model_name (str, optional): Name of the pre-trained audio model from Hugging Face.
            Default is "facebook/wav2vec2-base". Other options include:
            - "facebook/wav2vec2-large" (larger model, better quality)
            - "facebook/hubert-base-ls960" (HuBERT model)
            - "microsoft/wavlm-base" (WavLM model)
        normalize (bool, optional): Whether to normalize embeddings to unit vectors.
            Normalized embeddings work better for cosine similarity. Default is True.
        device (Optional[str], optional): Device to run the model on ('cpu', 'cuda').
            If None, automatically detects available device.
    
    Returns:
        np.ndarray: Array of embeddings with shape (n_audio, embedding_dim).
            Each row represents the embedding vector for one input audio.
    
    Example:
        >>> # Single audio embedding
        >>> audio, sr = librosa.load("path/to/audio.wav", sr=16000)
        >>> embedding = create_audio_embeddings([audio], [sr])
        >>> print(f"Embedding shape: {embedding.shape}")
        
        >>> # Multiple audio files
        >>> embeddings = create_audio_embeddings(audio_list, sr_list)
        >>> print(f"Embeddings shape: {embeddings.shape}")
    """
    
    # Step 1: Determine compute device (GPU if available, else CPU)
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    
    print(f"Using device: {device}")
    print(f"Loading audio model: {model_name}")
    
    # Step 2: Load pre-trained processor and model
    # The processor handles audio preprocessing (resampling, normalization, etc.)
    try:
        if "wav2vec2" in model_name.lower():
            processor = Wav2Vec2Processor.from_pretrained(model_name)
            model = Wav2Vec2Model.from_pretrained(model_name)
        else:
            # Fallback to generic feature extractor
            processor = AutoFeatureExtractor.from_pretrained(model_name)
            model = AutoModel.from_pretrained(model_name)
    except Exception as e:
        print(f"Error loading model: {e}")
        print("Falling back to wav2vec2-base...")
        processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
        model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
    
    model.to(device)
    model.eval()  # Set to evaluation mode
    
    # Step 3: Process audio data
    print(f"Processing {len(audio_arrays)} audio sample(s)...")
    
    # Resample all audio to the model's expected sampling rate (usually 16kHz)
    target_sr = processor.sampling_rate if hasattr(processor, 'sampling_rate') else 16000
    processed_audios = []
    
    for i, (audio, sr) in enumerate(zip(audio_arrays, sampling_rates)):
        # Resample if necessary
        if sr != target_sr:
            print(f"  Resampling audio {i+1} from {sr}Hz to {target_sr}Hz...")
            audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
        
        # Ensure audio is 1D
        if len(audio.shape) > 1:
            audio = audio.mean(axis=0)  # Convert stereo to mono
        
        processed_audios.append(audio)
    
    # Step 4: Extract features using the processor
    print("Extracting audio features...")
    
    # Process audio in batches for efficiency
    batch_size = 8
    all_embeddings = []
    
    for i in range(0, len(processed_audios), batch_size):
        batch_audios = processed_audios[i:i+batch_size]
        
        # Process the batch
        inputs = processor(
            batch_audios,
            sampling_rate=target_sr,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=100000  # Limit to ~6 seconds at 16kHz
        )
        
        # Move inputs to device
        input_values = inputs.input_values.to(device)
        attention_mask = inputs.attention_mask.to(device) if 'attention_mask' in inputs else None
        
        # Step 5: Generate embeddings using the model
        with torch.no_grad():  # Disable gradient computation for efficiency
            if attention_mask is not None:
                outputs = model(input_values, attention_mask=attention_mask)
            else:
                outputs = model(input_values)
            
            # Extract embeddings based on model architecture
            if hasattr(outputs, 'last_hidden_state'):
                # For Wav2Vec2 and similar models
                # Use mean pooling over the sequence dimension
                hidden_states = outputs.last_hidden_state
                
                if attention_mask is not None:
                    # Apply attention mask for proper mean pooling
                    mask_expanded = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
                    sum_embeddings = torch.sum(hidden_states * mask_expanded, 1)
                    sum_mask = torch.sum(mask_expanded, 1)
                    embeddings = sum_embeddings / torch.clamp(sum_mask, min=1e-9)
                else:
                    # Simple mean pooling
                    embeddings = hidden_states.mean(dim=1)
            
            elif hasattr(outputs, 'pooler_output'):
                # For models with pooler output
                embeddings = outputs.pooler_output
            
            else:
                raise ValueError("Could not extract embeddings from model outputs")
        
        # Move to CPU and store
        batch_embeddings = embeddings.cpu().numpy()
        all_embeddings.append(batch_embeddings)
    
    # Step 6: Concatenate all batches
    embeddings = np.concatenate(all_embeddings, axis=0)
    
    # Step 7: Optional normalization for better similarity computation
    if normalize:
        # L2 normalization: each embedding becomes a unit vector
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        embeddings = embeddings / (norms + 1e-9)  # Avoid division by zero
    
    print(f"Generated audio embeddings with shape: {embeddings.shape}")
    
    return embeddings


In [None]:
# Step 3: Create audio embeddings
print("\n3. Creating Audio Embeddings:")
print("-" * 35)

# Use a smaller subset for faster processing
subset_size = 200
subset_audios = audio_arrays[:subset_size]
subset_labels = labels[:subset_size]
subset_srs = sampling_rates[:subset_size]

embeddings = create_audio_embeddings(subset_audios, subset_srs)

print(f"Created embeddings for {len(subset_audios)} audio samples")
print(f"Embedding dimension: {embeddings.shape[1]}")


---

### Visualize the PCA and t-SNE plots

In [10]:
def visualize_audio_embeddings_2d(embeddings, labels, label_names, method="tsne", title="Audio Embeddings"):
    """
    Visualize high-dimensional audio embeddings in 2D space using dimensionality reduction.
    Uses blue for 'bed' and red for 'cat' for clear distinction.

    Args:
        embeddings (np.ndarray): High-dimensional embeddings with shape (n_samples, embedding_dim).
        labels (List[int]): List of labels corresponding to each embedding.
        label_names (List[str]): List of label names.
        method (str, optional): Dimensionality reduction method ('pca' or 'tsne'). Default is 'tsne'.
        title (str, optional): Title for the plot.
    """

    print(f"Reducing dimensionality using {method.upper()}...")

    if method.lower() == "pca":
        reducer = PCA(n_components=2, random_state=42)
        embeddings_2d = reducer.fit_transform(embeddings)
    elif method.lower() == "tsne":
        reducer = TSNE(n_components=2, random_state=42, perplexity=30, max_iter=1000)
        embeddings_2d = reducer.fit_transform(embeddings)
    else:
        raise ValueError("Method must be 'pca' or 'tsne'")

    # Manual color map for two classes
    color_map = {0: 'blue', 1: 'red'}
    label_color_map = {name: color_map[i] for i, name in enumerate(label_names)}

    plt.figure(figsize=(12, 8))
    for i, label in enumerate(sorted(set(labels))):
        mask = np.array(labels) == label
        plt.scatter(
            embeddings_2d[mask, 0], embeddings_2d[mask, 1],
            c=label_color_map[label_names[label]],
            label=label_names[label],
            alpha=0.7, s=20
        )

    plt.title(f'{title} - {method.upper()} Visualization')
    plt.xlabel(f'{method.upper()} Component 1')
    plt.ylabel(f'{method.upper()} Component 2')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return embeddings_2d

In [None]:
# Step 4: Visualize embeddings in 2D
print("\n4. Visualizing Embeddings in 2D Space:")
print("-" * 45)

# PCA visualization
print("Creating PCA visualization...")
pca_embeddings = visualize_audio_embeddings_2d(embeddings, subset_labels, label_names,
                                                method="pca", 
                                                title="Speech Commands Embeddings")

# t-SNE visualization
print("Creating t-SNE visualization...")
tsne_embeddings = visualize_audio_embeddings_2d(embeddings, subset_labels, label_names,
                                                method="tsne", 
                                                title="Speech Commands Embeddings")

---

### Create the Cosine Similarity Matrix

In [12]:
def compute_audio_similarity_matrix(embeddings: np.ndarray, 
                                  labels: List[int], 
                                  label_names: List[str],
                                  max_samples: int = 100) -> np.ndarray:
    """
    Compute and visualize similarity matrix for a subset of audio embeddings.
    
    Args:
        embeddings (np.ndarray): Array of embeddings.
        labels (List[int]): List of corresponding labels.
        label_names (List[str]): List of label names.
        max_samples (int, optional): Maximum number of samples to include in similarity matrix.
    
    Returns:
        np.ndarray: Similarity matrix.
    """
    # Take a subset for visualization
    if len(embeddings) > max_samples:
        indices = np.random.choice(len(embeddings), max_samples, replace=False)
        subset_embeddings = embeddings[indices]
        subset_labels = [labels[i] for i in indices]
    else:
        subset_embeddings = embeddings
        subset_labels = labels
    
    # Compute cosine similarity matrix
    similarity_matrix = np.dot(subset_embeddings, subset_embeddings.T)
    
    # Create visualization
    plt.figure(figsize=(10, 8))
    
    # Sort by labels for better visualization
    sorted_indices = np.argsort(subset_labels)
    sorted_similarity = similarity_matrix[sorted_indices][:, sorted_indices]
    sorted_labels = [label_names[subset_labels[i]] for i in sorted_indices]
    
    # Create heatmap
    sns.heatmap(sorted_similarity, 
                xticklabels=sorted_labels, 
                yticklabels=sorted_labels,
                cmap='coolwarm', 
                center=0,
                square=True,
                cbar_kws={'label': 'Cosine Similarity'})
    
    plt.title('Cosine Similarity Matrix of Audio Embeddings\n(sorted by command labels)')
    plt.xlabel('Audio Sample (by command)')
    plt.ylabel('Audio Sample (by command)')
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()
    
    return similarity_matrix

In [None]:
# Step 5: Compute similarity matrix
print("\n5. Computing Similarity Matrix:")
print("-" * 35)

similarity_matrix = compute_audio_similarity_matrix(embeddings, subset_labels, label_names,
                                                    max_samples=50)


---

### Analyze the Quality of the Embeddings

In [None]:
# Step 6: Analyze embedding quality
print("\n6. Embedding Quality Analysis:")
print("-" * 35)

# Compute average intra-class vs inter-class similarity
intra_similarities = []
inter_similarities = []

for i in range(len(subset_labels)):
    for j in range(i+1, len(subset_labels)):
        similarity = np.dot(embeddings[i], embeddings[j])
        
        if subset_labels[i] == subset_labels[j]:
            intra_similarities.append(similarity)
        else:
            inter_similarities.append(similarity)

print(f"Average intra-class similarity: {np.mean(intra_similarities):.4f}")
print(f"Average inter-class similarity: {np.mean(inter_similarities):.4f}")
print(f"Separation ratio: {np.mean(intra_similarities) / np.mean(inter_similarities):.4f}")

# Plot similarity distributions
plt.figure(figsize=(10, 6))
plt.hist(intra_similarities, bins=50, alpha=0.7, label='Intra-class (Same Command)', density=True)
plt.hist(inter_similarities, bins=50, alpha=0.7, label='Inter-class (Different Commands)', density=True)
plt.xlabel('Cosine Similarity')
plt.ylabel('Density')
plt.title('Distribution of Intra-class vs Inter-class Similarities')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()


### Command Wise Analysis

In [None]:
# Step 7: Command-wise analysis
print("\n7. Command-wise Embedding Analysis:")
print("-" * 40)

# Analyze average embeddings per command
unique_labels = sorted(set(subset_labels))
command_embeddings = []

for label in unique_labels:
    mask = np.array(subset_labels) == label
    if np.any(mask):
        avg_embedding = embeddings[mask].mean(axis=0)
        command_embeddings.append(avg_embedding)
        print(f"Command '{label_names[label]}': {np.sum(mask)} samples, "
                f"avg embedding norm: {np.linalg.norm(avg_embedding):.4f}")

# Compute command-to-command similarity
command_embeddings = np.array(command_embeddings)
command_similarity = np.dot(command_embeddings, command_embeddings.T)

plt.figure(figsize=(8, 6))
command_names = [label_names[label] for label in unique_labels]
sns.heatmap(command_similarity,
            xticklabels=command_names,
            yticklabels=command_names,
            annot=True,
            fmt='.3f',
            cmap='coolwarm',
            center=0,
            square=True)
plt.title('Average Command-to-Command Similarity')
plt.xticks(rotation=45)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()


---