In [1]:
# -*- coding: utf-8 -*-
"""RNN-based Speech Recognition with Noise Augmentation (Using nn.RNN)"""


'RNN-based Speech Recognition with Noise Augmentation (Using nn.RNN)'

In [2]:

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import numpy as np
import string
from torch.utils.data import DataLoader


In [3]:
# Create the dataset root before anything tries to write into it (avoids a
# FileNotFoundError). "./data" is the same path passed as `root` to the
# LIBRISPEECH loaders in the main cell; exist_ok=True makes re-running this
# cell harmless when the directory is already there.
os.makedirs("./data", exist_ok=True)

In [4]:
# -------------------- Noise Augmentation Module --------------------
class AddGaussianNoise(nn.Module):
    """Waveform augmentation that injects zero-mean Gaussian noise.

    The perturbation is applied only while the module is in training mode;
    in eval mode the input object is handed back untouched.

    Args:
        noise_level: Factor multiplied onto standard-normal samples before
            they are added to the waveform.
    """

    def __init__(self, noise_level=0.005):
        super().__init__()
        self.noise_level = noise_level

    def forward(self, waveform):
        """Return the noisy waveform (training) or `waveform` itself (eval)."""
        # Guard clause: evaluation must see clean audio.
        if not self.training:
            return waveform
        return waveform + torch.randn_like(waveform) * self.noise_level

In [5]:

# -------------------- Audio Transforms --------------------
# Both pipelines must produce spectrograms with the same settings, so the
# Mel configuration is written once and shared.
_MEL_KWARGS = dict(sample_rate=16000, n_mels=128)

# Training pipeline: waveform noise -> Mel spectrogram -> frequency/time masking.
train_audio_transforms = nn.Sequential(
    AddGaussianNoise(noise_level=0.01),
    torchaudio.transforms.MelSpectrogram(**_MEL_KWARGS),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100),
)

# Validation pipeline: plain Mel spectrogram, no augmentation.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram(**_MEL_KWARGS)



In [6]:

# -------------------- Text Processing --------------------
class TextTransform:
    """Bidirectional mapping between transcript characters and integer ids.

    The vocabulary is the apostrophe (id 0), a space token '<SPACE>' (id 1)
    and the 26 lowercase ASCII letters (ids 2-27).
    """

    def __init__(self):
        vocabulary = ["'", '<SPACE>']
        vocabulary.extend(string.ascii_lowercase)
        self.chars = vocabulary
        self.char_map = dict(zip(vocabulary, range(len(vocabulary))))
        self.index_map = dict(enumerate(vocabulary))
        # Decode the space token straight to a literal blank character.
        self.index_map[self.char_map['<SPACE>']] = ' '

    def text_to_int(self, text):
        """Encode `text` (lower-cased first) as a list of ids.

        Any character that is not a vocabulary key -- including the literal
        space -- is encoded with the '<SPACE>' id.
        """
        encoded = []
        for ch in text.lower():
            encoded.append(self.char_map.get(ch, self.char_map['<SPACE>']))
        return encoded

    def int_to_text(self, labels):
        """Decode an iterable of ids back into a string."""
        pieces = (self.index_map[idx] for idx in labels)
        return ''.join(pieces).replace('<SPACE>', ' ')


text_transform = TextTransform()

In [7]:

# -------------------- Data Processing --------------------
def data_processing(data, data_type="train"):
    """Collate dataset samples into padded spectrograms and CTC targets.

    Args:
        data: Iterable of samples. The first field of each sample is the
            waveform and the third is the transcript; the remaining fields
            are ignored. (Assumes a mono waveform with a leading channel
            dimension of size 1 -- TODO confirm against the dataset.)
        data_type: 'train' selects the augmenting pipeline; any other value
            selects the plain validation transform.

    Returns:
        Tuple ``(specs, labels, input_lengths, label_lengths)``:
        zero-padded spectrogram batch, zero-padded label batch, and two
        Python lists with the per-sample frame count halved and the
        per-sample transcript length.
    """
    specs, labels = [], []
    input_lengths, label_lengths = [], []
    transform = train_audio_transforms if data_type == 'train' else valid_audio_transforms
    for (waveform, _, utterance, *_) in data:
        # Time becomes the leading axis so pad_sequence pads along time.
        spec = transform(waveform).squeeze(0).transpose(0, 1)
        specs.append(spec)
        # Explicit integer dtype: torch.tensor([]) would otherwise infer
        # float32 for an empty transcript and produce a float label tensor.
        label = torch.tensor(text_transform.text_to_int(utterance), dtype=torch.long)
        labels.append(label)
        # Halved because the model's first convolution is built with stride 2.
        input_lengths.append(spec.shape[0] // 2)
        label_lengths.append(len(label))

    specs = nn.utils.rnn.pad_sequence(specs, batch_first=True)
    specs = specs.unsqueeze(1).transpose(2, 3)  # (B,1,Feats,Time)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)
    return specs, labels, input_lengths, label_lengths

In [8]:
# -------------------- Model Components --------------------
class CNNLayerNorm(nn.Module):
    """LayerNorm over dimension 2 (size `n_feats`) of a 4-D tensor.

    nn.LayerNorm normalises the trailing dimension, so the last two axes are
    swapped before the normalisation and swapped back afterwards.
    """

    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        """Normalise `x` along dim 2 and return a contiguous tensor of the same shape."""
        feats_last = x.permute(0, 1, 3, 2).contiguous()
        normalised = self.layer_norm(feats_last)
        return normalised.permute(0, 1, 3, 2).contiguous()

class ResidualCNN(nn.Module):
    """Two (LayerNorm -> GELU -> Dropout -> Conv2d) stages with a skip connection.

    The block input is added to the output of the second convolution, so the
    convolutions must preserve the input shape for the sum to be valid.

    Args:
        in_ch: Input channels of the first convolution.
        out_ch: Output channels of both convolutions.
        kernel: Square kernel size; padding is ``kernel // 2``.
        stride: Stride used by both convolutions.
        dropout: Dropout probability applied before each convolution.
        n_feats: Size of dim 2 of the input, normalised by CNNLayerNorm.
    """

    def __init__(self, in_ch, out_ch, kernel, stride, dropout, n_feats):
        super().__init__()
        same_pad = kernel // 2
        self.cnn1 = nn.Conv2d(in_ch, out_ch, kernel, stride, padding=same_pad)
        self.cnn2 = nn.Conv2d(out_ch, out_ch, kernel, stride, padding=same_pad)
        self.dropout = nn.Dropout(dropout)
        self.ln1 = CNNLayerNorm(n_feats)
        self.ln2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        """Run both pre-activation conv stages and add the original input."""
        out = x
        for norm, conv in ((self.ln1, self.cnn1), (self.ln2, self.cnn2)):
            out = conv(self.dropout(F.gelu(norm(out))))
        return out + x

class BidirectionalRNN(nn.Module):
    """LayerNorm -> GELU -> bidirectional nn.RNN -> Dropout.

    Args:
        input_dim: Size of the last dimension of the input.
        hidden_dim: Hidden size per direction; the output's last dimension
            is ``2 * hidden_dim`` because the RNN is bidirectional.
        dropout: Dropout probability applied to the RNN output.
    """

    def __init__(self, input_dim, hidden_dim, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(input_dim)
        self.rnn = nn.RNN(input_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Process a batch-first sequence and return the per-step outputs."""
        activated = F.gelu(self.norm(x))
        # The final hidden state is not needed, only the per-step outputs.
        sequence_out = self.rnn(activated)[0]
        return self.dropout(sequence_out)

In [9]:
# -------------------- Main Model --------------------
class SpeechRecognitionModel(nn.Module):
    """CNN front-end + stacked bidirectional RNNs + per-frame classifier.

    Args:
        n_cnn_layers: Number of ResidualCNN blocks after the stem convolution.
        n_rnn_layers: Number of stacked BidirectionalRNN layers.
        rnn_dim: Hidden size per RNN direction and width of the projection.
        n_class: Number of output classes per frame.
        n_feats: Size of the feature axis (dim 2) of the input spectrogram.
        stride: Stride of the stem convolution.
        dropout: Dropout probability used throughout the model.
    """

    def __init__(self, n_cnn_layers=3, n_rnn_layers=5, rnn_dim=512,
                 n_class=29, n_feats=128, stride=2, dropout=0.2):
        super().__init__()
        # Feature-axis size after the stem conv (kernel 3, padding 1):
        # floor((n_feats + 2*1 - 3) / stride) + 1. This equals n_feats // 2
        # for even n_feats with stride 2, and is also correct for odd
        # n_feats or other strides, where a hard-coded n_feats // 2 is not.
        conv_feats = (n_feats - 1) // stride + 1

        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=1)
        self.rescnn = nn.Sequential(*[
            ResidualCNN(32, 32, 3, 1, dropout, conv_feats)
            for _ in range(n_cnn_layers)
        ])
        # Flattened (channels x features) per frame -> RNN input width.
        self.linear = nn.Linear(32 * conv_feats, rnn_dim)
        self.rnns = nn.Sequential(*[
            BidirectionalRNN(
                # Every layer after the first consumes the concatenated
                # forward/backward outputs of the previous one.
                input_dim=rnn_dim if i == 0 else rnn_dim * 2,
                hidden_dim=rnn_dim,
                dropout=dropout
            ) for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim * 2, rnn_dim), nn.GELU(), nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        """Map a 4-D spectrogram batch to per-frame class scores (unnormalised)."""
        x = self.cnn(x)
        x = self.rescnn(x)
        batch, ch, feat_dim, seq_len = x.size()
        # Move the last axis forward and merge channels with features so each
        # step along that axis becomes one flat vector for the linear layer.
        x = x.permute(0, 3, 1, 2).contiguous()
        x = x.view(batch, seq_len, ch * feat_dim)
        x = self.linear(x)
        x = self.rnns(x)
        return self.classifier(x)

In [10]:

# -------------------- Training Setup --------------------
# Prefer the GPU when one is visible to PyTorch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Single source of truth for optimisation and architecture hyper-parameters.
params = {
    "batch_size": 16,
    "epochs": 20,
    "lr": 3e-4,
    "n_cnn_layers": 3,
    "n_rnn_layers": 5,
    "rnn_dim": 512,
    "n_class": 29,
    "n_feats": 128,
    "stride": 2,
    "dropout": 0.2,
}

# Only the architecture entries are forwarded to the model constructor.
_model_kwargs = {
    name: params[name]
    for name in ('n_cnn_layers', 'n_rnn_layers', 'rnn_dim', 'n_class',
                 'n_feats', 'stride', 'dropout')
}
model = SpeechRecognitionModel(**_model_kwargs).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=params['lr'])
# Index 28 is reserved for the CTC blank; the text vocabulary uses ids 0-27.
criterion = nn.CTCLoss(blank=28).to(device)

In [11]:

# -------------------- Training Utilities --------------------
def decode(outputs, blank_label=28, collapse_repeated=True):
    """Greedy CTC decoder.

    Takes the most likely class per frame, optionally merges consecutive
    repeats, and drops the blank label before mapping ids to text. Without
    the blank removal the blank id (28, the value given to nn.CTCLoss) would
    reach `text_transform.int_to_text`, whose index map only covers ids 0-27,
    and raise a KeyError.

    Args:
        outputs: Per-frame class scores with the class axis at dim 2;
            dim 0 indexes the utterances in the batch.
        blank_label: Id of the CTC blank symbol.
        collapse_repeated: If True, a run of identical consecutive ids is
            emitted once (standard CTC best-path decoding).

    Returns:
        List with one decoded string per utterance.
    """
    _, preds = torch.max(outputs, dim=2)
    texts = []
    for path in preds.tolist():
        tokens = []
        previous = None
        for idx in path:
            is_repeat = collapse_repeated and idx == previous
            if idx != blank_label and not is_repeat:
                tokens.append(idx)
            # Track blanks too: a blank between two equal ids separates them.
            previous = idx
        texts.append(text_transform.int_to_text(tokens))
    return texts

def wer(ref, hyp):
    """Word Error Rate: word-level edit distance divided by reference length.

    Both strings are split on whitespace. An empty reference uses a
    denominator of 1 so the function never divides by zero.
    """
    reference_tokens, hypothesis_tokens = ref.split(), hyp.split()
    edits = levenshtein_distance(reference_tokens, hypothesis_tokens)
    denominator = len(reference_tokens) or 1
    return edits / denominator

def cer(ref, hyp):
    """Character Error Rate: character-level edit distance over reference length.

    An empty reference uses a denominator of 1 so the function never divides
    by zero.
    """
    edits = levenshtein_distance(ref, hyp)
    denominator = len(ref) or 1
    return edits / denominator

def levenshtein_distance(a, b):
    """Edit distance between two sequences (insert / delete / substitute, cost 1 each).

    Works on any pair of sequences whose items support ``==`` (strings or
    lists of words). Implemented as a row-by-row dynamic programme that keeps
    only the previous row of the distance table.
    """
    # Row 0: turning an empty prefix of `a` into b[:j] takes j insertions.
    previous_row = list(range(len(b) + 1))
    for row, item_a in enumerate(a, start=1):
        # Column 0: turning a[:row] into an empty sequence takes `row` deletions.
        current_row = [row]
        for col, item_b in enumerate(b, start=1):
            cost = 0 if item_a == item_b else 1
            current_row.append(min(
                previous_row[col] + 1,         # deletion
                current_row[col - 1] + 1,      # insertion
                previous_row[col - 1] + cost,  # match / substitution
            ))
        previous_row = current_row
    return previous_row[-1]


In [12]:

# -------------------- Training Execution --------------------
def train(model, loader, optimizer, criterion, device):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Network that maps a spectrogram batch to per-frame class
            scores with the class axis at dim 2.
        loader: Sized iterable yielding
            ``(spectrograms, labels, input_lens, label_lens)`` batches.
        optimizer: Optimizer stepping the model parameters.
        criterion: Loss called as ``criterion(log_probs, labels, input_lens,
            label_lens)`` with the first two output axes swapped.
        device: Device the tensors are moved to before the forward pass.

    Returns:
        Sum of the batch losses divided by ``len(loader)``.
    """
    model.train()
    running_loss = 0
    for step, batch in enumerate(loader):
        spectrograms, labels, input_lens, label_lens = batch
        spectrograms = spectrograms.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        # Normalise over classes, then swap the first two axes for the loss.
        log_probs = F.log_softmax(model(spectrograms), dim=2).transpose(0, 1)
        loss = criterion(log_probs, labels, input_lens, label_lens)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Progress line every 100 batches.
        if step % 100 == 0:
            print(f"Batch {step}/{len(loader)} Loss: {loss.item():.4f}")

    return running_loss / len(loader)

def validate(model, loader, criterion, device):
    """Evaluate the model and report mean loss, CER and WER.

    Each utterance is scored on its real content only: the reference text is
    decoded from the first ``label_len`` ids of its padded label row (the pad
    id 0 would otherwise decode to an apostrophe), and the hypothesis is
    decoded from the first ``input_len`` output frames, the same span the
    loss is computed over.

    Args:
        model: Network that maps a spectrogram batch to per-frame class
            scores with the class axis at dim 2.
        loader: Sized iterable yielding
            ``(spectrograms, labels, input_lens, label_lens)`` batches.
        criterion: Loss called as ``criterion(log_probs, labels, input_lens,
            label_lens)`` with the first two output axes swapped.
        device: Device the tensors are moved to before the forward pass.

    Returns:
        Tuple ``(avg_loss, avg_cer, avg_wer)``; the loss is averaged over
        batches, the error rates over the utterances actually scored.
    """
    model.eval()
    total_loss = total_cer = total_wer = 0
    n_utterances = 0
    with torch.no_grad():
        for spectrograms, labels, input_lens, label_lens in loader:
            spectrograms, labels = spectrograms.to(device), labels.to(device)
            outputs = model(spectrograms)
            outputs = F.log_softmax(outputs, dim=2).transpose(0, 1)

            loss = criterion(outputs, labels, input_lens, label_lens)
            total_loss += loss.item()

            # Swap the axes back so iterating yields one utterance at a time.
            per_utterance = outputs.transpose(0, 1)
            for log_probs, n_frames, target, n_chars in zip(
                    per_utterance, input_lens, labels, label_lens):
                # Drop padded frames / padded label ids before scoring.
                hyp = decode(log_probs[:n_frames].unsqueeze(0))[0]
                ref = text_transform.int_to_text(target[:n_chars].tolist())
                total_cer += cer(ref, hyp)
                total_wer += wer(ref, hyp)
                n_utterances += 1

    avg_loss = total_loss / len(loader)
    # Divide by the utterances actually scored rather than len(loader.dataset).
    avg_cer = total_cer / n_utterances
    avg_wer = total_wer / n_utterances
    print(f"Validation Loss: {avg_loss:.4f} | CER: {avg_cer:.4f} | WER: {avg_wer:.4f}")
    return avg_loss, avg_cer, avg_wer

In [13]:
# -------------------- Main Execution --------------------
if __name__ == "__main__":
    # Download (if needed) and open the LibriSpeech splits under ./data.
    train_dataset = torchaudio.datasets.LIBRISPEECH(
        root="./data",
        url="train-clean-100",
        download=True
    )

    test_dataset = torchaudio.datasets.LIBRISPEECH(
        root="./data",
        url="test-clean",
        download=True
    )

    # Create data loaders. The hyper-parameter dict defined in the training
    # setup cell is named `params`; the previous `pipeline_params` name was
    # never defined and raised a NameError here.
    train_loader = DataLoader(
        train_dataset,
        batch_size=params["batch_size"],
        shuffle=True,
        collate_fn=lambda x: data_processing(x, "train")
    )

    test_loader = DataLoader(
        test_dataset,
        batch_size=params["batch_size"],
        collate_fn=lambda x: data_processing(x, "valid")
    )

    # Training loop: one training pass and one validation pass per epoch.
    best_wer = float('inf')
    for epoch in range(params["epochs"]):
        print(f"\nEpoch {epoch+1}/{params['epochs']}")
        train_loss = train(model, train_loader, optimizer, criterion, device)
        val_loss, val_cer, val_wer = validate(model, test_loader, criterion, device)

        # Checkpoint only when validation WER improves.
        if val_wer < best_wer:
            best_wer = val_wer
            torch.save(model.state_dict(), "best_model.pth")
            print("Saved new best model!")


  0%|          | 23.5M/5.95G [01:10<5:04:19, 348kB/s]  


KeyboardInterrupt: 