# Audio Chunking Demo

This notebook demonstrates how to slice WAV files into smaller audio chunks.

**Purpose:**
- Split long audio files into fixed-duration chunks
- Support overlapping chunks for better coverage
- Validate audio properties (sample rate, channels)
- Export chunks with systematic naming

**Use Cases:**
- Preparing training data for ML models (fixed-size inputs)
- Creating paired x/y training examples from before/after recordings
- Windowed audio analysis

## 1. Imports and Configuration

In [None]:
import os
import sys
from pathlib import Path

# Add project root to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(".")))

import IPython.display as ipd
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
from pydub import AudioSegment

from src.utils import Config

In [None]:
# Load the project configuration. `cfg` is reused by later cells to resolve
# the audio-assets and playground directories.
cfg = Config()
# Show the configured paths (Config comes from src.utils and is not visible
# here; presumably this prints the directories it resolved — verify there).
cfg.print_paths()

## 2. Chunking Parameters

Configure the chunk duration and the spacing between chunk starts; the overlap between neighbouring chunks is derived from these two values.

In [None]:
# Chunking parameters (all values in milliseconds)
CHUNK_DURATION_MS = 50      # Length of every chunk
CHUNK_SPACING_MS = 25       # Distance between consecutive chunk starts (25 of 50 ms -> 50% overlap)

# Derive how much two neighbouring chunks share, in ms and as a percentage
overlap_ms = CHUNK_DURATION_MS - CHUNK_SPACING_MS
overlap_pct = (overlap_ms / CHUNK_DURATION_MS) * 100

# Emit the three summary lines with a single print call
print(
    f"Chunk duration: {CHUNK_DURATION_MS} ms",
    f"Chunk spacing:  {CHUNK_SPACING_MS} ms",
    f"Overlap:        {overlap_ms} ms ({overlap_pct:.0f}%)",
    sep="\n",
)

## 3. Utility Functions

In [None]:
def find_wav_files(directory: Path, recursive: bool = True) -> list[Path]:
    """Find all WAV files in a directory.

    The extension is matched case-insensitively, so ``take.WAV`` is found as
    well as ``take.wav``. Only regular files are returned; a directory whose
    name happens to end in ``.wav`` is ignored.

    Args:
        directory: Directory to search
        recursive: Whether to search subdirectories

    Returns:
        Sorted list of WAV file paths
    """
    directory = Path(directory)
    # Enumerate every entry and filter on the lower-cased name: a '*.wav'
    # glob pattern is case-sensitive on POSIX and would skip '.WAV' files.
    candidates = directory.rglob('*') if recursive else directory.glob('*')
    return sorted(
        path for path in candidates
        if path.name.lower().endswith('.wav') and path.is_file()
    )


def get_audio_info(audio: AudioSegment) -> dict:
    """Summarise the basic properties of a loaded audio segment.

    Args:
        audio: PyDub AudioSegment to inspect

    Returns:
        Dictionary with the keys 'duration_ms' (length of the segment),
        'frame_rate', 'channels', 'sample_width' (bytes per sample; callers
        multiply by 8 for the bit depth) and 'frame_count'
    """
    properties = dict(
        duration_ms=len(audio),
        frame_rate=audio.frame_rate,
        channels=audio.channels,
        sample_width=audio.sample_width,
        frame_count=audio.frame_count(),
    )
    return properties


def calculate_chunks(duration_ms: int, chunk_duration_ms: int, chunk_spacing_ms: int) -> list[tuple[int, int]]:
    """Calculate chunk boundaries for an audio file.

    Chunks start every ``chunk_spacing_ms`` milliseconds. The first chunk that
    would reach or pass the end of the audio is the last one: it is shifted
    back so that it ends exactly at ``duration_ms`` while keeping the full
    chunk length. If the audio is shorter than one chunk, a single shorter
    chunk ``(0, duration_ms)`` is returned.

    Args:
        duration_ms: Total audio duration in milliseconds
        chunk_duration_ms: Duration of each chunk
        chunk_spacing_ms: Spacing between chunk starts

    Returns:
        List of (start_ms, end_ms) tuples for each chunk; empty when
        ``duration_ms`` is not positive

    Raises:
        ValueError: If ``chunk_duration_ms`` or ``chunk_spacing_ms`` is not
            positive
    """
    # Reject parameters that would otherwise yield silently wrong output:
    # a negative spacing makes the range below empty, and a non-positive
    # duration produces zero-length or inverted windows.
    if chunk_duration_ms <= 0:
        raise ValueError(f"chunk_duration_ms must be positive, got {chunk_duration_ms}")
    if chunk_spacing_ms <= 0:
        raise ValueError(f"chunk_spacing_ms must be positive, got {chunk_spacing_ms}")

    chunks = []
    for start in range(0, duration_ms, chunk_spacing_ms):
        end = start + chunk_duration_ms

        if end >= duration_ms:
            # Final window: align it to the end of the audio, clamping the
            # start at 0 when the audio is shorter than one chunk. Every
            # earlier window ends before duration_ms, so this cannot
            # duplicate a window that was already emitted.
            chunks.append((max(0, duration_ms - chunk_duration_ms), duration_ms))
            break

        chunks.append((start, end))

    return chunks

## 4. Load a Sample Audio File

In [None]:
# Locate the sample WAV files (top level of the samples directory only)
SAMPLES_DIR = cfg.get_audio_assets_dir() / "samples"
sample_files = find_wav_files(SAMPLES_DIR, recursive=False)

print(f"Found {len(sample_files)} WAV files in {SAMPLES_DIR}")

if sample_files:
    # The list is sorted, so the demo always uses the first path in sort order
    demo_file = sample_files[0]
    print(f"\nDemo file: {demo_file.name}")

    # Decode the file with pydub and collect its basic properties
    audio = AudioSegment.from_file(str(demo_file))
    info = get_audio_info(audio)

    print(
        "\nAudio properties:",
        f"  Duration:    {info['duration_ms']} ms ({info['duration_ms']/1000:.2f} s)",
        f"  Sample rate: {info['frame_rate']} Hz",
        f"  Channels:    {info['channels']}",
        f"  Bit depth:   {info['sample_width'] * 8} bit",
        sep="\n",
    )
else:
    # Without a sample file, `demo_file` and `audio` stay undefined and the
    # following demo cells cannot run.
    print("No sample files found. Please add WAV files to the samples directory.")

## 5. Visualize Chunk Boundaries

In [None]:
# Split the demo file's duration into chunk windows
chunks = calculate_chunks(
    duration_ms=len(audio),
    chunk_duration_ms=CHUNK_DURATION_MS,
    chunk_spacing_ms=CHUNK_SPACING_MS,
)

# Report the totals and a peek at both ends of the chunk list
print(
    f"Audio duration: {len(audio)} ms",
    f"Number of chunks: {len(chunks)}",
    f"\nFirst 5 chunks: {chunks[:5]}",
    f"Last 5 chunks:  {chunks[-5:]}",
    sep="\n",
)

In [None]:
# Decode the demo file with librosa to get samples for plotting
y, sr = librosa.load(str(demo_file))

# One figure, one axis: the waveform with the chunk windows drawn on top
fig, ax = plt.subplots(figsize=(14, 5))
librosa.display.waveshow(y, sr=sr, ax=ax, color='cyan', alpha=0.7)

# Mark only the first 10 chunks to keep the plot readable; each chunk gets
# its own colour for both its start line and its shaded span
colors = plt.cm.tab10(np.linspace(0, 1, 10))
for color, (start_ms, end_ms) in zip(colors, chunks[:10]):
    # The plot's time axis is in seconds, chunk boundaries are in milliseconds
    start_s, end_s = start_ms / 1000, end_ms / 1000
    ax.axvline(x=start_s, color=color, linestyle='--', alpha=0.7, linewidth=1)
    ax.axvspan(start_s, end_s, alpha=0.1, color=color)

ax.set_title(f"Waveform with Chunk Boundaries (first 10 of {len(chunks)} chunks)")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Amplitude")
plt.tight_layout()
plt.show()

## 6. Extract and Play Individual Chunks

In [None]:
# Preview at most five chunks
num_chunks_to_show = min(5, len(chunks))

print(f"Extracting first {num_chunks_to_show} chunks:\n")

# Scratch directory under the playground for the exported chunk files
temp_dir = cfg.get_playground_dir() / "demo-audio-chunking" / "temp"
os.makedirs(temp_dir, exist_ok=True)

for i, (start_ms, end_ms) in enumerate(chunks[:num_chunks_to_show]):
    # File name carries the chunk index and its start time in milliseconds
    chunk_file = temp_dir / f"chunk_{i:03d}_{start_ms:06d}ms.wav"

    # Slice the loaded audio by millisecond range and write it out as WAV
    audio[start_ms:end_ms].export(str(chunk_file), format="wav")

    print(f"Chunk {i}: {start_ms}-{end_ms} ms (duration: {end_ms - start_ms} ms)")
    # Embed an audio player for the exported file in the notebook output
    ipd.display(ipd.Audio(str(chunk_file)))

In [None]:
# Clean up temp files
# shutil is imported here rather than with the other imports; in the visible
# notebook only this cleanup cell uses it.
import shutil
# Delete the scratch directory, including the chunk files exported into it.
# The existence check makes re-running this cell harmless.
if temp_dir.exists():
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temp directory: {temp_dir}")

## 7. Batch Chunking Function

Reusable functions for chunking a single audio file (`chunk_audio_file`) and a whole list of files (`batch_chunk_files`).

In [None]:
def chunk_audio_file(
    source_file: Path,
    output_dir: Path,
    chunk_duration_ms: int,
    chunk_spacing_ms: int,
    dry_run: bool = True,
    required_frame_rate: int | None = None,
    required_channels: int | None = None
) -> dict:
    """Split an audio file into chunks.
    
    Args:
        source_file: Path to source audio file
        output_dir: Directory for output chunks
        chunk_duration_ms: Duration of each chunk in ms
        chunk_spacing_ms: Spacing between chunk starts in ms
        dry_run: If True, don't actually create files
        required_frame_rate: Optional required sample rate (skip if mismatch)
        required_channels: Optional required channel count (skip if mismatch)
    
    Returns:
        Dictionary with processing statistics
    """
    source_file = Path(source_file)
    output_dir = Path(output_dir)
    
    result = {
        'source': source_file.name,
        'chunks_created': 0,
        'chunks_skipped': 0,
        'error': None
    }
    
    try:
        # Load audio
        audio = AudioSegment.from_file(str(source_file))
        
        # Validate properties if required
        if required_frame_rate and audio.frame_rate != required_frame_rate:
            result['error'] = f"Frame rate {audio.frame_rate} != required {required_frame_rate}"
            return result
        
        if required_channels and audio.channels != required_channels:
            result['error'] = f"Channels {audio.channels} != required {required_channels}"
            return result
        
        # Calculate chunks
        chunks = calculate_chunks(len(audio), chunk_duration_ms, chunk_spacing_ms)
        
        # Create output directory
        if not dry_run:
            os.makedirs(output_dir, exist_ok=True)
        
        # Process each chunk
        for start_ms, end_ms in chunks:
            # Generate output filename: originalname_XXXXXX.wav (XXXXXX = start time in ms)
            chunk_name = f"{source_file.stem}_{start_ms:06d}.wav"
            chunk_path = output_dir / chunk_name
            
            if chunk_path.exists():
                result['chunks_skipped'] += 1
                continue
            
            if not dry_run:
                chunk_audio = audio[start_ms:end_ms]
                chunk_audio.export(str(chunk_path), format="wav")
            
            result['chunks_created'] += 1
        
    except Exception as e:
        result['error'] = str(e)
    
    return result


def batch_chunk_files(
    source_files: list[Path],
    output_dir: Path,
    chunk_duration_ms: int,
    chunk_spacing_ms: int,
    dry_run: bool = True,
    required_frame_rate: int | None = None,
    required_channels: int | None = None
) -> dict:
    """Batch process multiple audio files into chunks.
    
    Args:
        source_files: List of source audio file paths
        output_dir: Directory for output chunks
        chunk_duration_ms: Duration of each chunk in ms
        chunk_spacing_ms: Spacing between chunk starts in ms
        dry_run: If True, don't actually create files
        required_frame_rate: Optional required sample rate
        required_channels: Optional required channel count
    
    Returns:
        Dictionary with aggregate statistics
    """
    stats = {
        'files_processed': 0,
        'files_with_errors': 0,
        'total_chunks_created': 0,
        'total_chunks_skipped': 0
    }
    
    prefix = "[DRY RUN] " if dry_run else ""
    
    for i, source_file in enumerate(source_files):
        result = chunk_audio_file(
            source_file=source_file,
            output_dir=output_dir,
            chunk_duration_ms=chunk_duration_ms,
            chunk_spacing_ms=chunk_spacing_ms,
            dry_run=dry_run,
            required_frame_rate=required_frame_rate,
            required_channels=required_channels
        )
        
        if result['error']:
            print(f"{prefix}ERROR processing {source_file.name}: {result['error']}")
            stats['files_with_errors'] += 1
        else:
            stats['total_chunks_created'] += result['chunks_created']
            stats['total_chunks_skipped'] += result['chunks_skipped']
        
        stats['files_processed'] += 1
        
        # Progress update
        if (i + 1) % 10 == 0:
            print(f"{prefix}Processed {i + 1}/{len(source_files)} files...")
    
    return stats

## 8. Run Batch Chunking (Dry Run)

In [None]:
# Run settings
DRY_RUN = True  # Set to False to actually create files
OUTPUT_DIR = cfg.get_playground_dir() / "demo-audio-chunking" / "output"

# Echo the settings for this run, followed by a blank line
print(
    f"Source directory: {SAMPLES_DIR}",
    f"Output directory: {OUTPUT_DIR}",
    f"Chunk duration:   {CHUNK_DURATION_MS} ms",
    f"Chunk spacing:    {CHUNK_SPACING_MS} ms",
    f"Dry run:          {DRY_RUN}",
    "",
    sep="\n",
)

# Chunk every sample file found earlier with the notebook-wide settings
stats = batch_chunk_files(
    sample_files,
    OUTPUT_DIR,
    CHUNK_DURATION_MS,
    CHUNK_SPACING_MS,
    dry_run=DRY_RUN,
)

# Aggregate report: blank line, separator, then one line per statistic
print(
    "",
    "=" * 50,
    "Summary:",
    f"  Files processed:     {stats['files_processed']}",
    f"  Files with errors:   {stats['files_with_errors']}",
    f"  Chunks created:      {stats['total_chunks_created']}",
    f"  Chunks skipped:      {stats['total_chunks_skipped']}",
    sep="\n",
)

# Estimate of the total audio covered: assumes every chunk has the full
# configured duration
total_duration_ms = stats['total_chunks_created'] * CHUNK_DURATION_MS
print(f"\n  Total chunk duration: {total_duration_ms / 1000:.1f} seconds ({total_duration_ms / 60000:.1f} minutes)")

if DRY_RUN:
    print("\nThis was a dry run. Set DRY_RUN = False to actually create files.")