In [1]:
# !pip uninstall -y datasets

In [2]:
!pip install datasets==2.18.0



In [3]:
!pip install torchaudio



In [4]:
!pip install torchcodec



In [5]:
!pip install --no-build-isolation torchcodec




In [6]:
!pip install librosa soundfile



In [7]:
# ================================================
# Lightweight Vanilla RNN Speech Recognition System
# (CPU-Optimized, Beam Search Decoding)
# ================================================

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from datasets import load_dataset
import torchaudio
import torch.nn.functional as F
import random, string, gc
import numpy as np

In [8]:
# ------------------------------------------------
# 1. Configuration
# ------------------------------------------------
DEVICE = torch.device("cpu")  # M1 CPU (MPS not used due to instability with RNNs)
SAMPLE_RATE = 16000
MAX_AUDIO_SEC = 5.0  # 2 seconds for memory efficiency
# MAX_SAMPLES = 1000    # subset for 1.5-hour training
BATCH_SIZE = 4
HIDDEN_SIZE = 64
NUM_EPOCHS = 10
LEARNING_RATE = 1e-3
BEAM_WIDTH = 3       # Beam search width

torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


In [9]:
# ------------------------------------------------
# 2. Data Loading (LibriSpeech subset)
# ------------------------------------------------
import torch
import numpy as np
from itertools import islice
from datasets import load_dataset
import torchaudio

print("Loading LibriSpeech dataset...")

# Target sample counts
train_target, val_target, test_target = 1000, 500, 100

# Load small streaming portions
streamed_train = load_dataset("librispeech_asr", "clean", split="train.100", streaming=True)
streamed_val   = load_dataset("librispeech_asr", "clean", split="validation", streaming=True)
streamed_test  = load_dataset("librispeech_asr", "clean", split="test", streaming=True)

# Take only first few samples from each stream
train_subset = list(islice(streamed_train, train_target))
val_subset   = list(islice(streamed_val,   val_target))
test_subset  = list(islice(streamed_test,  test_target))

print(f"Loaded subset -> train: {len(train_subset)}, val: {len(val_subset)}, test: {len(test_subset)}")


def preprocess(batch):
    audio = batch["audio"]["array"]
    text = batch["text"].lower().strip()

    max_len = int(SAMPLE_RATE * MAX_AUDIO_SEC)
    if len(audio) > max_len:
        audio = audio[:max_len]
    else:
        audio = np.pad(audio, (0, max_len - len(audio)))

    batch["audio"] = torch.tensor(audio, dtype=torch.float32)
    batch["text"] = text
    return batch

print("⚙️ Preprocessing...")
train_subset = [preprocess(b) for b in train_subset]
val_subset   = [preprocess(b) for b in val_subset]
test_subset  = [preprocess(b) for b in test_subset]

print("✅ Preprocessing complete.")

print("\n Samples from the datasets:")
print(train_subset[0])
print(val_subset[0])
print(test_subset[0])



Loading LibriSpeech dataset...


Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).


Resolving data files:   0%|          | 0/48 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/48 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/48 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/48 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/48 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/48 [00:00<?, ?it/s]

Loaded subset -> train: 1000, val: 500, test: 100
⚙️ Preprocessing...
✅ Preprocessing complete.

 Samples from the datasets:
{'file': '/home/albert/.cache/huggingface/datasets/downloads/extracted/bc0d9a6ef85c2d487c9c6efbc91f8892df927c69d3f80545a668cc058d5f677e/374-180298-0000.flac', 'audio': tensor([ 0.0007,  0.0007,  0.0007,  ..., -0.0270, -0.0223, -0.0300]), 'text': 'chapter sixteen i might have told you of the beginning of this liaison in a few lines but i wanted you to see every step by which we came i to agree to whatever marguerite wished', 'speaker_id': 374, 'chapter_id': 180298, 'id': '374-180298-0000'}
{'file': '/home/albert/.cache/huggingface/datasets/downloads/extracted/234bf103726646f6948bab918be3a226316832f62b2ca6861100cff043922b4a/2277-149896-0000.flac', 'audio': tensor([0.0019, 0.0005, 0.0002,  ..., 0.0149, 0.0109, 0.0109]), 'text': "he was in a fevered state of mind owing to the blight his wife's action threatened to cast upon his entire future", 'speaker_id': 2277, 'ch

In [10]:
# ------------------------------------------------
# 3. Feature Extraction
# ------------------------------------------------
mfcc_transform = torchaudio.transforms.MFCC(
    sample_rate=SAMPLE_RATE, n_mfcc=40, log_mels=True
)

def extract_features(audio_tensor):
    with torch.no_grad():
        feats = mfcc_transform(audio_tensor).transpose(0, 1)  # [T, 40]
    return feats

def collate_fn(batch):
    xs, ys = [], []
    for b in batch:
        feats = extract_features(b["audio"])
        xs.append(feats)
        ys.append(b["text"])
    max_len = max(x.shape[0] for x in xs)
    xs_pad = torch.zeros(len(xs), max_len, 40)
    for i, x in enumerate(xs):
        xs_pad[i, :x.shape[0]] = x
    return xs_pad, ys

train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True,
                          collate_fn=collate_fn, num_workers=0)
val_loader = DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=False,
                        collate_fn=collate_fn, num_workers=0)
test_loader  = DataLoader(test_subset,  batch_size=1, shuffle=False,
                        collate_fn=collate_fn, num_workers=0)




In [11]:
# ------------------------------------------------
# 4. Vocabulary
# ------------------------------------------------
CHARS = list(string.ascii_lowercase + " '")
char2idx = {c: i + 1 for i, c in enumerate(CHARS)}
char2idx["<pad>"] = 0
idx2char = {v: k for k, v in char2idx.items()}

In [12]:
# ------------------------------------------------
# 5. Model Definitions
# ------------------------------------------------
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        # self.rnn = nn.GRU(input_size, hidden_size, batch_first=True)  # (for later)
        # self.rnn = nn.LSTM(input_size, hidden_size, batch_first=True) # (for later)
    def forward(self, x):
        outputs, hidden = self.rnn(x)
        return outputs, hidden

class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.rnn = nn.RNN(output_size, hidden_size, batch_first=True)
        # self.rnn = nn.GRU(output_size, hidden_size, batch_first=True)  # (for later)
        # self.rnn = nn.LSTM(output_size, hidden_size, batch_first=True) # (for later)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, hidden, seq_len, beam_search=False):
        # Greedy or beam decoding
        if beam_search:
            return self.beam_search_decode(hidden)
        else:
            return self.greedy_decode(hidden, seq_len)

    def greedy_decode(self, hidden, seq_len):
        batch_size = hidden.shape[1]
        input_char = torch.zeros((batch_size, 1, len(char2idx))).to(DEVICE)
        outputs = []
        h = hidden
        for _ in range(seq_len):
            out, h = self.rnn(input_char, h)
            logits = self.fc(out.squeeze(1))
            pred = logits.argmax(dim=-1)
            outputs.append(pred)
            input_char = F.one_hot(pred, num_classes=len(char2idx)).float().unsqueeze(1)
        return torch.stack(outputs, dim=1)

    def beam_search_decode(self, hidden):
        beam_width = BEAM_WIDTH
        batch_size = hidden.shape[1]
        sequences = [[[], 0.0, hidden]]  # (tokens, score, hidden)
        final_seq = []

        for _ in range(50):  # max decoding steps
            all_candidates = []
            for seq, score, h in sequences:
                if len(seq) > 0 and seq[-1] == char2idx[" "]:  # early stop token
                    all_candidates.append((seq, score, h))
                    continue
                inp = torch.zeros((1, 1, len(char2idx))).to(DEVICE)
                if len(seq) > 0:
                    inp[0, 0, seq[-1]] = 1
                out, new_h = self.rnn(inp, h)
                logits = self.fc(out.squeeze(1))
                probs = F.log_softmax(logits, dim=-1)
                topk = torch.topk(probs, beam_width)
                for k in range(beam_width):
                    token = topk.indices[0, k].item()
                    new_score = score + topk.values[0, k].item()
                    all_candidates.append((seq + [token], new_score, new_h))
            ordered = sorted(all_candidates, key=lambda x: x[1], reverse=True)
            sequences = ordered[:beam_width]
        final_seq = sequences[0][0]
        return torch.tensor(final_seq).unsqueeze(0)

In [13]:
# ------------------------------------------------
# 6. Training Utilities
# ------------------------------------------------
def text_to_tensor(text):
    return torch.tensor([char2idx.get(c, 0) for c in text], dtype=torch.long)

def tensor_to_text(tensor):
    return "".join(idx2char.get(i, "") for i in tensor if i != 0)

def compute_loss(outputs, targets):
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
    return loss

# def train_epoch(encoder, decoder, loader, optimizers):
#     encoder.train(), decoder.train()
#     total_loss = 0
#     for xb, yb in loader:
#         xb = xb.to(DEVICE)
#         target_tensors = [text_to_tensor(y) for y in yb]
#         max_tgt_len = max(t.size(0) for t in target_tensors)
#         tgt_batch = torch.zeros(len(yb), max_tgt_len, dtype=torch.long)
#         for i, t in enumerate(target_tensors):
#             tgt_batch[i, :t.size(0)] = t
#         tgt_batch = tgt_batch.to(DEVICE)

#         enc_out, hidden = encoder(xb)
#         decoder_input = torch.zeros((xb.size(0), max_tgt_len, len(char2idx))).to(DEVICE)
#         dec_out, _ = decoder.rnn(decoder_input, hidden)
#         logits = decoder.fc(dec_out)
#         loss = compute_loss(logits, tgt_batch)

#         for opt in optimizers: opt.zero_grad()
#         loss.backward()
#         for opt in optimizers: opt.step()
#         total_loss += loss.item()
#         gc.collect()
#     return total_loss / len(loader)

def train_epoch(encoder, decoder, loader, optimizers):
    encoder.train(), decoder.train()
    total_loss = 0
    for xb, yb in loader:
        xb = xb.to(DEVICE)
        target_tensors = [text_to_tensor(y) for y in yb]
        max_tgt_len = max(t.size(0) for t in target_tensors)
        tgt_batch = torch.zeros(len(yb), max_tgt_len, dtype=torch.long)
        for i, t in enumerate(target_tensors):
            tgt_batch[i, :t.size(0)] = t
        tgt_batch = tgt_batch.to(DEVICE)

        enc_out, hidden = encoder(xb)

        batch_size = xb.size(0)
        sos_token = torch.zeros((batch_size, 1, len(char2idx))).to(DEVICE)


        teacher_forcing_inputs = F.one_hot(tgt_batch[:, :-1], num_classes=len(char2idx)).float()

        decoder_input = torch.cat([sos_token, teacher_forcing_inputs], dim=1)

        dec_out, _ = decoder.rnn(decoder_input, hidden)
        logits = decoder.fc(dec_out)

        loss = compute_loss(logits, tgt_batch)

        for opt in optimizers: opt.zero_grad()
        loss.backward()
        for opt in optimizers: opt.step()
        total_loss += loss.item()
        gc.collect()
    return total_loss / len(loader)

# def evaluate(encoder, decoder, loader):
#     encoder.eval(), decoder.eval()
#     total_loss = 0
#     with torch.no_grad():
#         for xb, yb in loader:
#             xb = xb.to(DEVICE)
#             target_tensors = [text_to_tensor(y) for y in yb]
#             max_tgt_len = max(t.size(0) for t in target_tensors)
#             tgt_batch = torch.zeros(len(yb), max_tgt_len, dtype=torch.long)
#             for i, t in enumerate(target_tensors):
#                 tgt_batch[i, :t.size(0)] = t
#             tgt_batch = tgt_batch.to(DEVICE)

#             enc_out, hidden = encoder(xb)
#             decoder_input = torch.zeros((xb.size(0), max_tgt_len, len(char2idx))).to(DEVICE)
#             dec_out, _ = decoder.rnn(decoder_input, hidden)
#             logits = decoder.fc(dec_out)
#             loss = compute_loss(logits, tgt_batch)
#             total_loss += loss.item()
#     return total_loss / len(loader)

def evaluate(encoder, decoder, loader):
    encoder.eval(), decoder.eval()
    total_loss = 0
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(DEVICE)
            target_tensors = [text_to_tensor(y) for y in yb]
            max_tgt_len = max(t.size(0) for t in target_tensors)
            tgt_batch = torch.zeros(len(yb), max_tgt_len, dtype=torch.long)
            for i, t in enumerate(target_tensors):
                tgt_batch[i, :t.size(0)] = t
            tgt_batch = tgt_batch.to(DEVICE)

            enc_out, hidden = encoder(xb)

            # --- START: FIX ---
            batch_size = xb.size(0)
            sos_token = torch.zeros((batch_size, 1, len(char2idx))).to(DEVICE)
            teacher_forcing_inputs = F.one_hot(tgt_batch[:, :-1], num_classes=len(char2idx)).float()
            decoder_input = torch.cat([sos_token, teacher_forcing_inputs], dim=1)
            # --- END: FIX ---

            dec_out, _ = decoder.rnn(decoder_input, hidden)
            logits = decoder.fc(dec_out)
            loss = compute_loss(logits, tgt_batch)
            total_loss += loss.item()
    return total_loss / len(loader)

In [14]:
# ------------------------------------------------
# 7. Initialize and Train
# ------------------------------------------------
encoder = EncoderRNN(40, HIDDEN_SIZE).to(DEVICE)
decoder = DecoderRNN(HIDDEN_SIZE, len(char2idx)).to(DEVICE)

opt_enc = optim.Adam(encoder.parameters(), lr=LEARNING_RATE)
opt_dec = optim.Adam(decoder.parameters(), lr=LEARNING_RATE)

print("\nStarting training...\n")
for epoch in range(NUM_EPOCHS):
    train_loss = train_epoch(encoder, decoder, train_loader, [opt_enc, opt_dec])
    val_loss = evaluate(encoder, decoder, val_loader)
    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.3f} | Val Loss: {val_loss:.3f}")
    torch.cuda.empty_cache()
    gc.collect()


Starting training...

Epoch 1/10 | Train Loss: 2.680 | Val Loss: 2.379
Epoch 2/10 | Train Loss: 2.296 | Val Loss: 2.214
Epoch 3/10 | Train Loss: 2.188 | Val Loss: 2.142
Epoch 4/10 | Train Loss: 2.127 | Val Loss: 2.093
Epoch 5/10 | Train Loss: 2.080 | Val Loss: 2.054
Epoch 6/10 | Train Loss: 2.041 | Val Loss: 2.023
Epoch 7/10 | Train Loss: 2.009 | Val Loss: 1.996
Epoch 8/10 | Train Loss: 1.982 | Val Loss: 1.976
Epoch 9/10 | Train Loss: 1.959 | Val Loss: 1.954
Epoch 10/10 | Train Loss: 1.939 | Val Loss: 1.937


In [15]:
# ------------------------------------------------
# 8. Test Decoding (with Beam Search)
# ------------------------------------------------
print("\nTesting beam search decoding on test samples:")
encoder.eval()
decoder.eval()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)

        # Encode the batch
        _, hidden = encoder(xb)

        # Loop through each sample in the batch (to keep beam search stable)
        for i in range(min(len(yb), 3)):  # test up to 3 samples for speed
            # Slice hidden state for one sample
            if isinstance(hidden, tuple):  # LSTM case
                h_i = hidden[0][:, i:i+1, :].contiguous()
                c_i = hidden[1][:, i:i+1, :].contiguous()
                hidden_i = (h_i, c_i)
            else:
                hidden_i = hidden[:, i:i+1, :].contiguous()

            # Decode one sequence at a time
            output_seq = decoder(hidden_i, seq_len=40, beam_search=True)

            # Display results
            print(f"\nSample {i+1}:")
            print("Target:", yb[i])
            print("Predicted:", tensor_to_text(output_seq[0]))

        break  # only test on one batch for now



Testing beam search decoding on validation samples:

Sample 1:
Target: he was in a fevered state of mind owing to the blight his wife's action threatened to cast upon his entire future
Predicted: 

Sample 2:
Target: he would have to pay her the money which she would now regularly demand or there would be trouble it did not matter what he did
Predicted: 

Sample 3:
Target: hurstwood walked the floor mentally arranging the chief points of his situation
Predicted: 


In [16]:
# import torch
# from tqdm import tqdm
# import evaluate

# # Load WER metric from Hugging Face
# wer_metric = evaluate.load("wer")

# def evaluate_model(model, dataloader, device):
#     model.eval()
#     predictions, references = [], []

#     with torch.no_grad():
#         for batch in tqdm(dataloader, desc="Evaluating", unit="batch"):
#             # Move tensors to device
#             input_values = batch["input_values"].to(device)
#             labels = batch["labels"].to(device)

#             # Forward pass
#             outputs = model(input_values)
#             logits = outputs["logits"]

#             # Beam search decoding
#             decoded_preds = model.decode_batch(logits, method="beam", beam_width=3)

#             # Convert labels (token IDs) back to text
#             for label in labels:
#                 label = label[label != -100]  # ignore padding
#                 decoded_label = model.processor.decode(label.cpu().numpy(), group_tokens=False)
#                 references.append(decoded_label.lower())

#             predictions.extend([pred.lower() for pred in decoded_preds])

#     # Compute Word Error Rate
#     wer_score = wer_metric.compute(predictions=predictions, references=references)
#     print(f"\n✅ Word Error Rate (WER): {wer_score:.4f}")
#     return wer_score


In [17]:
# device = torch.device("cpu")
# wer_score = evaluate_model(model, test_dataloader, device)


In [19]:
!pip install jiwer

Collecting jiwer
  Downloading jiwer-4.0.0-py3-none-any.whl.metadata (3.3 kB)
Collecting rapidfuzz>=3.9.7 (from jiwer)
  Downloading rapidfuzz-3.14.3-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (12 kB)
Downloading jiwer-4.0.0-py3-none-any.whl (23 kB)
Downloading rapidfuzz-3.14.3-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m48.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-4.0.0 rapidfuzz-3.14.3


In [20]:
# ------------------------------------------------
# 9. Evaluate on Test Set (Compute WER)
# ------------------------------------------------
from jiwer import wer
import gc

encoder.eval()
decoder.eval()

references = []
hypotheses = []

print("\n Evaluating test set with beam search decoding...")

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)

        # Encode batch
        _, hidden = encoder(xb)

        for i in range(len(yb)):
            # Extract per-sample hidden state
            if isinstance(hidden, tuple):  # LSTM
                h_i = hidden[0][:, i:i+1, :].contiguous()
                c_i = hidden[1][:, i:i+1, :].contiguous()
                hidden_i = (h_i, c_i)
            else:
                hidden_i = hidden[:, i:i+1, :].contiguous()

            # Decode using beam search (or greedy as fallback)
            try:
                output_seq = decoder(hidden_i, seq_len=40, beam_search=True)
            except Exception as e:
                print(f"Beam search failed on sample {i}: {e}")
                output_seq = decoder(hidden_i, seq_len=40, beam_search=False)

            pred_text = tensor_to_text(output_seq[0])
            true_text = yb[i]

            hypotheses.append(pred_text)
            references.append(true_text)

        # Free memory periodically (for M1 stability)
        torch.cuda.empty_cache()
        gc.collect()

# ------------------------------------------------
# Compute WER
# ------------------------------------------------
wer_score = wer(references, hypotheses)
print("\n✅ Evaluation complete!")
print(f"Word Error Rate (WER): {wer_score * 100:.2f}%")

# Optional: show a few qualitative results
print("\nSample predictions:")
for i in range(min(5, len(hypotheses))):
    print(f"\nRef: {references[i]}")
    print(f"Hyp: {hypotheses[i]}")



 Evaluating test set with beam search decoding...

✅ Evaluation complete!
Word Error Rate (WER): 100.00%

Sample predictions:

Ref: concord returned to its place amidst the tents
Hyp: 

Ref: the english forwarded to the french baskets of flowers of which they had made a plentiful provision to greet the arrival of the young princess the french in return invited the english to a supper which was to be given the next day
Hyp: 

Ref: congratulations were poured in upon the princess everywhere during her journey
Hyp: 

Ref: from the respect paid her on all sides she seemed like a queen and from the adoration with which she was treated by two or three she appeared an object of worship the queen mother gave the french the most affectionate reception france was her native country and she had suffered too much unhappiness in england for england to have made her forget france
Hyp: 

Ref: she taught her daughter then by her own affection for it that love for a country where they had both been ho