In [1]:
import os
import glob
import numpy as np
import librosa
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from datasets import load_dataset
from torch.utils.data import Dataset, DataLoader
from IPython.display import Audio

# Step 1: Preprocessing and Saving Audio Files

# Create directory for saving preprocessed audio files
os.makedirs('preprocessed_audios', exist_ok=True)

# Define the preprocessing function
def preprocess_and_save_audio(example, idx):
    # Load the audio file using librosa from the 'filepath'
    y, sr = librosa.load(example['filepath'], sr=None)
    
    # Convert to mono if it's stereo
    y_mono = librosa.to_mono(y)
    
    # Resample the audio to exactly 22,000 Hz
    target_sr = 22000
    y_resampled = librosa.resample(y_mono, orig_sr=sr, target_sr=target_sr)
    
    # Calculate the number of samples for 5 seconds
    target_samples = target_sr * 5  # 5 seconds = 110,000 samples

    # Trim or pad the audio to exactly 5 seconds (110,000 samples)
    y_trimmed = librosa.util.fix_length(y_resampled, size=target_samples)
    
    # Save the processed audio to a WAV file
    filename = f"preprocessed_audios/{idx}.wav"
    sf.write(filename, y_trimmed, samplerate=target_sr)
    
    # Optionally, add the path to the saved file to the example dict
    example['processed_filepath'] = filename
    example['sampling_rate'] = target_sr
    
    return example

# Load dataset
dataset = load_dataset('DBD-research-group/BirdSet', 'HSN', trust_remote_code=True)

# Apply the preprocessing function to the dataset
preprocessed_dataset = dataset['train'].select(range(2000)).map(preprocess_and_save_audio, with_indices=True)

print("Preprocessing complete. Preprocessed audio files are saved in 'preprocessed_audios/' directory.")

# Step 2: Creating a PyTorch Dataset

def mu_law_encoding(x, quantization_channels=256):
    mu = quantization_channels - 1
    fx = np.sign(x) * np.log1p(mu * np.abs(x)) / np.log1p(mu)
    # Map to quantization bins
    x_mu = ((fx + 1) / 2 * mu + 0.5).astype(np.int32)
    return x_mu

class BirdAudioDataset(Dataset):
    def __init__(self, file_list, quantization_channels=256):
        self.file_list = file_list
        self.quantization_channels = quantization_channels
        
    def __len__(self):
        return len(self.file_list)
    
    def __getitem__(self, idx):
        filename = self.file_list[idx]
        # Load the audio file
        y, sr = librosa.load(filename, sr=None)
        # Normalize audio to -1 to 1
        y = y / np.abs(y).max()
        # Apply mu-law encoding
        y_mu = mu_law_encoding(y, self.quantization_channels)
        # Convert to torch tensor
        y_tensor = torch.from_numpy(y_mu).long()
        # Reshape to (1, seq_length)
        y_tensor = y_tensor.unsqueeze(0)
        return y_tensor

# Get the list of preprocessed audio files
file_list = glob.glob('preprocessed_audios/*.wav')

# Create dataset and dataloader
dataset = BirdAudioDataset(file_list)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

print("DataLoader is ready.")

# Step 3: Defining the WaveNet Model

class ResidualBlock(nn.Module):
    def __init__(self, dilation, residual_channels, skip_channels):
        super(ResidualBlock, self).__init__()
        self.filter_conv = nn.Conv1d(residual_channels, residual_channels, kernel_size=2, dilation=dilation)
        self.gate_conv = nn.Conv1d(residual_channels, residual_channels, kernel_size=2, dilation=dilation)
        self.residual_conv = nn.Conv1d(residual_channels, residual_channels, kernel_size=1)
        self.skip_conv = nn.Conv1d(residual_channels, skip_channels, kernel_size=1)

    def forward(self, x):
        residual = x
        # Remove padding from input if necessary
        padding = (self.filter_conv.kernel_size[0] - 1) * self.filter_conv.dilation[0]
        if padding > 0:
            x = F.pad(x, (padding, 0))
        filter = torch.tanh(self.filter_conv(x))
        gate = torch.sigmoid(self.gate_conv(x))
        x = filter * gate
        skip = self.skip_conv(x)
        x = self.residual_conv(x)
        # Adjust for size mismatch
        x = x[:, :, :residual.size(2)]
        x = x + residual
        skip = skip[:, :, :residual.size(2)]
        return x, skip

class WaveNet(nn.Module):
    def __init__(self, residual_channels=32, skip_channels=256, quantization_channels=256, dilations=[1, 2, 4, 8, 16, 32]):
        super(WaveNet, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=quantization_channels, embedding_dim=residual_channels)
        self.residual_blocks = nn.ModuleList(
            [ResidualBlock(dilation, residual_channels, skip_channels) for dilation in dilations]
        )
        self.output_conv1 = nn.Conv1d(skip_channels, skip_channels, kernel_size=1)
        self.output_conv2 = nn.Conv1d(skip_channels, quantization_channels, kernel_size=1)

    def forward(self, x):
        x = x.squeeze(1)
        x = self.embedding(x)
        x = x.transpose(1, 2)
        skip_connections = []
        for block in self.residual_blocks:
            x, skip = block(x)
            skip_connections.append(skip)
        # Make sure all skip connections are the same size
        min_length = min([s.size(2) for s in skip_connections])
        skip_connections = [s[:, :, :min_length] for s in skip_connections]
        x = sum(skip_connections)
        x = F.relu(x)
        x = F.relu(self.output_conv1(x))
        x = self.output_conv2(x)
        return x

print("WaveNet model is defined.")

# Step 4: Training the Model

# Initialize model, optimizer, loss function
model = WaveNet()
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Training loop
num_epochs = 1  # Increase for better results
for epoch in range(num_epochs):
    for batch in dataloader:
        optimizer.zero_grad()
        x = batch[:, :, :-1]  # Input sequence
        y = batch[:, 0, 1:]   # Target sequence
        output = model(x)
        # Adjust output and target sizes
        min_length = min(output.size(2), y.size(1))
        output = output[:, :, :min_length]
        y = y[:, :min_length]
        output = output.permute(0, 2, 1)  # (batch_size, seq_length, quantization_channels)
        output = output.reshape(-1, 256)
        y = y.reshape(-1)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

print("Training complete.")

# Step 5: Generating Synthetic Audio

def mu_law_expansion(y_mu, quantization_channels=256):
    mu = quantization_channels - 1
    y = y_mu.astype(np.float32)
    y = y / mu * 2 - 1  # Map values back to [-1, 1]
    y = np.sign(y) * (np.exp(np.abs(y) * np.log1p(mu)) - 1) / mu
    return y

def generate(model, initial_input, num_samples, quantization_channels=256):
    model.eval()
    generated = []
    input_sequence = initial_input  # Should be a tensor of shape (1, 1, seq_length)
    for _ in range(num_samples):
        with torch.no_grad():
            output = model(input_sequence)
            output = output[:, :, -1]  # Get the last time step
            output = F.softmax(output, dim=1)
            # Sample from the distribution
            distrib = torch.distributions.Categorical(output)
            sample = distrib.sample()
            generated.append(sample.item())
            # Append the new sample to input_sequence
            sample = sample.unsqueeze(0).unsqueeze(0)
            input_sequence = torch.cat([input_sequence, sample], dim=2)
            # Keep the last N samples to avoid memory issues
            if input_sequence.size(2) > 1000:
                input_sequence = input_sequence[:, :, -1000:]
    # Decode mu-law encoding
    generated = np.array(generated)
    generated = mu_law_expansion(generated, quantization_channels)
    return generated

# Generate audio
initial_input = torch.zeros(1, 1, 1).long()  # Start with silence
num_samples = 22000 * 5  # Generate 5 seconds at 22kHz
print("Generating audio...")
generated_audio = generate(model, initial_input, num_samples)

# Save generated audio
sf.write('generated_audio.wav', generated_audio, samplerate=22000)
print("Generated audio saved as 'generated_audio.wav'.")

# Play the generated audio
display(Audio(generated_audio, rate=22000))


Preprocessing complete. Preprocessed audio files are saved in 'preprocessed_audios/' directory.
DataLoader is ready.
WaveNet model is defined.
Epoch 1/1, Loss: 3.8348135948181152
Training complete.
Generating audio...
Generated audio saved as 'generated_audio.wav'.


# Why μ-law Encoding

μ-law encoding is a nonlinear companding algorithm used in digital telecommunication systems in North America and Japan. It compresses the dynamic range of audio signals before digitization, optimizing signal representation within limited bandwidth.

## Some more key points which I found

- **Dynamic Range Compression**: Improves the representation of quieter sounds by reducing quantization error at lower amplitudes.
- **Logarithmic Companding**: Uses a logarithmic formula to allocate more quantization levels to lower amplitude signals.
- **Standard Use**: Commonly used in digital telephony and audio compression systems in specific regions.

## Mathematical Formula

The μ-law encoding formula is:

$$
y = \operatorname{sgn}(x) \cdot \frac{\ln\left(1 + \mu |x|\right)}{\ln\left(1 + \mu\right)}
$$


- **\( x \)**: Normalized input signal (from \(-1\) to \(1\))
- **\( y \)**: Compressed output signal
- **\( $\mu$ \)**: Compression parameter (typically \( $\mu$ = 255 \))
- **\( $\operatorname{sgn}(x)$ \)**: Sign function of \( x \)

