<a href="https://colab.research.google.com/github/shubham13596/Stanford-CS224S/blob/main/Homework3_Deep_neural_network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CS224S Assignment 3: Deep Learning for End-to-End Speech Recognition

---


This notebook is worth 100 of the total 160 possible points for homework 3. You should be able to train all models in Colab. We encourage you to read general PyTorch / Lightning tutorials as necessary as you work. You might need to purchase more Colab GPU credits for training to work reasonably fast.

We provide a target error rate for each model training exercise; you should be able to obtain an error rate at least this good using the code setup and data provided. Start early: model training may take 30 minutes or more per training run.

**Note:** You will need to make a copy of this Colab notebook in your Google Drive before you can edit it.




In [None]:
!pip install numpy



In [None]:


# Do not modify.

import os
from google.colab import drive
drive.mount('/content/gdrive')

DRIVE_PATH = '/content/gdrive/MyDrive/cs224s_spring2025'
DRIVE_PYTHON_PATH = DRIVE_PATH.replace('\\', '')
if not os.path.exists(DRIVE_PYTHON_PATH):
  %mkdir $DRIVE_PATH

SYM_PATH = '/content/cs224s_spring2025'
if not os.path.exists(SYM_PATH):
  !ln -s $DRIVE_PATH $SYM_PATH

DATA_PATH = '{}/data'.format(SYM_PATH)
if not os.path.exists(DATA_PATH):
  %mkdir $DATA_PATH
%cd $DATA_PATH
if not os.path.exists(os.path.join(DATA_PATH, 'harper_valley_bank_minified')):
  !wget -q http://web.stanford.edu/class/cs224s/download/harper_valley_bank_minified.zip
  !unzip -q harper_valley_bank_minified.zip
  %rm harper_valley_bank_minified.zip

MODEL_PATH = '{}/trained_models'.format(SYM_PATH)
if not os.path.exists(MODEL_PATH):
  %mkdir $MODEL_PATH

%cd $SYM_PATH
if not os.path.exists(os.path.join(SYM_PATH, 'utils.py')):
  !wget -q http://web.stanford.edu/class/cs224s/download/utils.py

!pip -q install pytorch_lightning
!pip install wandb -qqq

from collections import OrderedDict
from itertools import chain

import h5py
import math
import json
import torch
import wandb
import numpy as np
import pytorch_lightning as pl
from glob import glob
import librosa
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import random
from sklearn.metrics import f1_score
from typing import *
from IPython.display import Audio
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import WandbLogger


Mounted at /content/gdrive
/content/gdrive/MyDrive/cs224s_spring2025/data
/content/gdrive/MyDrive/cs224s_spring2025

# Part 1: ML Speech Data Pipeline

HarperValleyBank consists of 23 hours of audio from 1,446 human-human conversations between 59 unique speakers. For your convenience, we store in `harper_valley_bank_minified` all utterance audio waveforms as `numpy` arrays in `data.h5` and all transcripts and labels as `numpy` arrays in `labels.npz`.

Our custom dataset class `HarperValleyBank` should inherit from `torch.utils.data.Dataset` and override the following methods:
- `__len__` so that `len(dataset)` returns the size of the dataset.
- `__getitem__` to support indexing, so that `dataset[i]` returns the `i`th dataset sample.

There are a few special features that the `HarperValleyBank` class should exhibit.
- **Fixed-length data.** Both the extracted audio features and the character labels are inherently sequences of different lengths. However, in order to store data in a minibatch during training, we need to make the lengths uniform. To do so, we first enforce a maximum length for audio features and a maximum length for labels (note that these two maximum lengths are not necessarily the same). The data has already been segmented into single utterances rather than whole conversations. Next, we crop and pad each sequence with a pad token (e.g. `3`) so that all audio sequences and all label sequences have their respective maximum lengths. We also store the actual length of each sequence so that the model does not learn from the padded indices.
- **Sequence representation.** We are training a character-level model, so the ASR model is responsible for predicting each spoken character. Therefore, we must convert our transcript text to a list of indices representing the 35 characters in the global variable `VOCAB` and a few domain-specific tokens (see the global variable `SILENT_VOCAB`, e.g. `[laughter]`). Think of each character as its own class. Indices are shifted by 4 to make room for the special tokens described next, so the space character `' '` maps to index `4`.
```
Raw utterance:  hi this is an example .
List of characters: ['h', 'i', ' ', 't', 'h', 'i', 's', ' ', 'i', 's', ' ', 'a', 'n', ' ', 'e', 'x', 'a', 'm', 'p', 'l', 'e', ' ', '.']
List of indices: [20, 21, 4, 32, 20, 21, 31, 4, 21, 31, 4, 13, 26, 4, 17, 36, 13, 25, 28, 24, 17, 4, 8]
```
- **Special tokens.** Although this next part is provided, it is worth pointing out. Aside from the padding index (`3`), there are three special tokens in our vocabulary:
    - A blank token (`epsilon`, represented by index `0`) which plays a special role in CTC.
    - A start-of-sentence token (`SOS`, represented by index `1`) which designates the start of a sentence.
    - An end-of-sentence token (`EOS`, represented by index `2`) which designates the end of a sentence.
```
Example label sequence: [20, 21, 4, 32, 20]
Add an END token: [20, 21, 4, 32, 20, 2]
```
Suppose the maximum label sequence has length 10.
```
Padded label sequence: [20, 21, 4, 32, 20, 2, 3, 3, 3, 3]
Label sequence length: 6
```
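
The encode, append-EOS, and pad steps can be sketched in a few lines of plain Python. This is only an illustration: `TOY_VOCAB` and `encode_and_pad` are hypothetical names, the vocabulary is deliberately tiny, and the real pipeline relies on `get_transcript_labels` and `pad_transcript_label` from `utils.py`, whose exact behavior may differ. The special-token indices follow the starter code (0 for epsilon, 1 for SOS, 2 for EOS, 3 for padding).

```python
# Special-token indices as reserved in the starter code.
EPS, SOS, EOS, PAD = 0, 1, 2, 3
NUM_SPECIAL = 4

# Toy vocabulary for illustration only (the notebook uses `VOCAB`).
TOY_VOCAB = [' ', 'h', 'i', 't']


def encode_and_pad(text, max_length, append_eos=True):
    """Hypothetical helper: characters -> indices -> fixed-length labels."""
    # Shift every character index past the reserved special tokens.
    labels = [TOY_VOCAB.index(ch) + NUM_SPECIAL for ch in text]
    if append_eos:
        labels.append(EOS)
    labels = labels[:max_length]   # crop if too long
    length = len(labels)           # true length, before padding
    labels = labels + [PAD] * (max_length - length)
    return labels, length


padded, length = encode_and_pad('hi th', max_length=10)
print(padded)  # [5, 6, 4, 7, 5, 2, 3, 3, 3, 3]
print(length)  # 6
```

Returning the true length alongside the padded list is what later lets the loss ignore the padded positions.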

**It may be helpful to first read through the `HarperValleyBank` starter code and `utils.py` to get familiar with the data pipeline.**

Below, we provide a cell for you to index into the raw data and listen to randomly chosen samples.

In [None]:
root = os.path.join(DATA_PATH, 'harper_valley_bank_minified')
waveform_h5 = h5py.File(os.path.join(root, 'data.h5'), 'r')
waveform_data = waveform_h5.get('waveforms')
label_data = np.load(os.path.join(root, 'labels.npz'))
assert len(waveform_data) == len(label_data['human_transcripts'])
index = random.randint(0, len(waveform_data) - 1)
w = waveform_data[f'{index}'][:]
t = label_data['human_transcripts'][index]

print('index {}: "{}"\n'.format(index, t))
Audio(w, rate=8000)

index 20242: "electric"



## **Task 1.1: Set up primary task data (5 Points)**

To train speech recognition models, we need a consistent input format. However, raw audio clips and transcript labels vary in length, so they cannot be stacked directly into a batch tensor.

Thus, for every dataset sample, we must generate four objects:

- `inputs`: the log-Mel spectrogram features, padded to a fixed maximum length.
- `input_lengths`: the true (unpadded) length of the spectrogram (in frames).
- `labels`: the character-level transcript labels, padded to a fixed maximum length.
- `label_lengths`: the true (unpadded) number of label tokens.

These objects will be used for what we call our *primary* task: speech recognition. In later parts, we will use *auxiliary* tasks to perform multi-task learning toward boosting speech recognition.

**→ Implement the `get_primary_task_data` method.** This will be used in the `__getitem__` method of `HarperValleyBank` and later its subclass for multi-task learning, and it is responsible for extracting log-Mel spectrogram features from the raw audio clips. Do not modify other methods. You should pass the sanity check at the end.
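
As a rough sketch of how `inputs` and `input_lengths` relate, the NumPy snippet below crops or zero-pads a `(T, n_mels)` feature array to a fixed length. `pad_or_crop_features` is a hypothetical helper written for this illustration, not the `pad_wav` function from `utils.py`, and a random array stands in for the real log-Mel features. The frame count assumes librosa's default centered framing, where a clip of `num_samples` samples yields `1 + num_samples // hop_length` frames.

```python
import numpy as np


def pad_or_crop_features(features, max_length, pad_value=0.0):
    """Hypothetical helper: force a (T, n_mels) array to (max_length, n_mels)."""
    true_length = min(features.shape[0], max_length)
    out = np.full((max_length, features.shape[1]), pad_value, dtype=np.float32)
    out[:true_length] = features[:true_length]
    return out, true_length


sr, hop_length, n_mels = 8000, 128, 128
num_samples = sr  # one second of audio at 8 kHz
# Assumption: centered framing, so T = 1 + num_samples // hop_length.
num_frames = 1 + num_samples // hop_length

# Random stand-in for a log-Mel spectrogram of shape (T, n_mels).
features = np.random.rand(num_frames, n_mels).astype(np.float32)
inputs, input_length = pad_or_crop_features(features, max_length=200)
print(inputs.shape, input_length)  # (200, 128) 63
```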


In [None]:
from utils import (
  prune_transcripts, pad_wav, pad_transcript_label, get_transcript_labels,
  get_cer_per_sample)


# HarperValleyBank character vocabulary
VOCAB = [' ', "'", '~', '-', '.', '<', '>', '[', ']', 'a', 'b', 'c', 'd', 'e',
         'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
         't', 'u', 'v', 'w', 'x', 'y', 'z']

SILENT_VOCAB = ['[baby]', '[ringing]', '[laughter]', '[kids]', '[music]',
                '[noise]', '[unintelligible]', '[dogs]', '[cough]']


class HarperValleyBank(Dataset):
  """Dataset to be used to train CTC, LAS, and MTL.

  Args:
    root: string
          path to the data files.
    split: string (default: train)
            choices: train | val | test
            which split of data to load
    n_mels: integer (default: 128)
            number of mel frequencies
    n_fft: integer (default: 256)
            number of fourier components
    win_length: integer (default: 256)
                should be <= n_fft
    hop_length: integer (default: 128)
                number of samples between successive frames
    wav_max_length: integer (default: 200)
                    maximum number of spectrogram frames per utterance
    transcript_max_length: integer (default: 200)
                            maximum number of label tokens per transcript
    append_eos_token: boolean (default: False)
                      add EOS token to the end of every transcription
                      this is used for LAS (and LAS+CTC models)
  """
  def __init__(
      self, root, split='train', n_mels=128, n_fft=256, win_length=256,
      hop_length=128, wav_max_length=200, transcript_max_length=200,
      append_eos_token=False):
    super().__init__()
    print(f'> Constructing HarperValleyBank {split} dataset...')

    self.label_data = np.load(os.path.join(root, 'labels.npz'))
    self.root = root
    self.wav_max_length = wav_max_length
    self.transcript_max_length = transcript_max_length

    self.input_dim = n_mels
    self.n_mels = n_mels
    self.n_fft = n_fft
    self.win_length = win_length
    self.hop_length = hop_length

    # Prune away very short examples.
    # This returns a list of indices of examples longer than 3 words.
    valid_indices = prune_transcripts(self.label_data['human_transcripts'])

    # Decides which indices belong to which split.
    train_indices, val_indices, test_indices = self.split_data(valid_indices)

    if split == 'train':
      indices = train_indices
    elif split == 'val':
      indices = val_indices
    elif split == 'test':
      indices = test_indices
    else:
      raise Exception(f'Split {split} not supported.')

    raw_human_transcripts = self.label_data['human_transcripts'].tolist()
    human_transcript_labels = get_transcript_labels(
      raw_human_transcripts, VOCAB, SILENT_VOCAB)

    # Increment all indices by 4 to reserve the following special tokens:
    #   0 for epsilon
    #   1 for start-of-sentence (SOS)
    #   2 for end-of-sentence (EOS)
    #   3 for padding
    num_special_tokens = 4
    human_transcript_labels = [list(np.array(lab) + num_special_tokens)
                                for lab in human_transcript_labels]
    # CTC doesn't use SOS nor EOS; LAS doesn't use EPS but add anyway.
    eps_index, sos_index, eos_index, pad_index = 0, 1, 2, 3

    if append_eos_token:
      # Insert an EOS token at the end of all the labels.
      # This is important for the LAS objective.
      human_transcript_labels_ = []
      for i in range(len(human_transcript_labels)):
        new_label_i = human_transcript_labels[i] + [eos_index]
        human_transcript_labels_.append(new_label_i)
      human_transcript_labels = human_transcript_labels_
    self.human_transcript_labels = human_transcript_labels

    # Include epsilon, SOS, and EOS tokens.
    self.num_class = len(VOCAB) + len(SILENT_VOCAB) + num_special_tokens
    self.num_labels = self.num_class  # These are interchangeable.
    self.eps_index = eps_index
    self.sos_index = sos_index
    self.eos_index = eos_index
    self.pad_index = pad_index # Use this index for padding.

    self.indices = indices

  def indices_to_chars(self, indices):
    # indices: list of integers in vocab
    # add special characters in front (since we did this above)
    full_vocab = ['<eps>', '<sos>', '<eos>', '<pad>'] + VOCAB + SILENT_VOCAB
    chars = [full_vocab[ind] for ind in indices]
    return chars

  def split_data(self, valid_indices, train_ratio = 0.8, val_ratio = 0.1):
    """Splits data into train, val, and test sets based on speaker. When
    evaluating methods on the test split, we measure how well they generalize
    to new (unseen) speakers.

    Concretely, this stores and returns indices belonging to each split.
    """
    # Fix seed so everyone reproduces the same splits.
    rs = np.random.RandomState(42)

    speaker_ids = self.label_data['speaker_ids']
    unique_speaker_ids = sorted(list(set(speaker_ids)))
    unique_speaker_ids = np.array(unique_speaker_ids)

    # Shuffle so the speaker IDs are distributed.
    rs.shuffle(unique_speaker_ids)

    num_speaker = len(unique_speaker_ids)
    num_train = int(train_ratio * num_speaker)
    num_val = int(val_ratio * num_speaker)
    num_test = num_speaker - num_train - num_val

    train_speaker_ids = unique_speaker_ids[:num_train]
    val_speaker_ids = unique_speaker_ids[num_train:num_train+num_val]
    test_speaker_ids = unique_speaker_ids[num_train+num_val:]

    train_speaker_dict = dict(zip(train_speaker_ids, ['train'] * num_train))
    val_speaker_dict = dict(zip(val_speaker_ids, ['val'] * num_val))
    test_speaker_dict = dict(zip(test_speaker_ids, ['test'] * num_test))
    speaker_dict = {**train_speaker_dict, **val_speaker_dict,
                    **test_speaker_dict}

    train_indices, val_indices, test_indices = [], [], []
    for i in range(len(speaker_ids)):
      speaker_id = speaker_ids[i]
      if speaker_dict[speaker_id] == 'train':
          train_indices.append(i)
      elif speaker_dict[speaker_id] == 'val':
          val_indices.append(i)
      elif speaker_dict[speaker_id] == 'test':
          test_indices.append(i)
      else:
          raise Exception('split not recognized.')

    train_indices = np.array(train_indices)
    val_indices = np.array(val_indices)
    test_indices = np.array(test_indices)

    # Make sure to only keep "valid indices" i.e. those with more than 4
    # words in the transcription.
    train_indices = np.intersect1d(train_indices, valid_indices)
    val_indices = np.intersect1d(val_indices, valid_indices)
    test_indices = np.intersect1d(test_indices, valid_indices)

    return train_indices, val_indices, test_indices

  def get_primary_task_data(self, index):
    """Returns audio and transcript information for a single utterance.

    Args:
      index: Index of an utterance.

    Returns:
      log melspectrogram, wav length, transcript label, transcript length
    """
    input_feature = None
    input_length = None
    human_transcript_label = None
    human_transcript_length = None

    wav = self.waveform_data[f'{index}'][:] # An h5py file uses string keys.
    sr = 8000 # We fix the sample rate for you.

    ############################ START OF YOUR CODE ############################
    # TODO(1.1)
    # - Compute the mel spectrogram of the audio crop.
    # - Convert the mel spectrogram to log space and normalize it.
    # - This is your primary task feature. Note that models will expect feature
    #   inputs of shape (T, n_mels).
    # - Pad the feature so that all features are fixed-length and
    #   convert it into a tensor.
    # - Likewise, retrieve and pad the corresponding transcript label sequence.
    #
    # Hint:
    # - Refer to https://librosa.org/doc/latest/index.html.
    # - Use `librosa.feature.melspectrogram` and `librosa.util.normalize`.
    # - Make sure to use our provided sr, n_mels, n_fft, win_length,
    #   and hop_length.
    # - utils.py has helpful padding functions.


    # Compute mel spectrogram
    wav_mel = librosa.feature.melspectrogram(
        y=wav,
        sr=sr,
        n_mels=self.n_mels,
        n_fft=self.n_fft,
        win_length=self.win_length,
        hop_length=self.hop_length)

    # Convert to log scale and normalize
    wav_mel_log = librosa.power_to_db(wav_mel)
    wav_mel_log_normalized = librosa.util.normalize(wav_mel_log)

    # Transpose to get (T, n_mels) shape
    input_feature = wav_mel_log_normalized.transpose(1,0)

    # Convert to PyTorch tensor
    input_feature = torch.tensor(input_feature, dtype=torch.float32)

    # Calculate input length (before padding)
    original_length = input_feature.size(0)

    if original_length > self.wav_max_length:
      input_feature = input_feature[:self.wav_max_length]
      input_length = self.wav_max_length
    else:
      padding_shape = (self.wav_max_length - original_length, self.n_mels)
      padding = torch.zeros(padding_shape, dtype=torch.float32)
      input_feature = torch.cat([input_feature, padding], dim=0)
      input_length = original_length  # Keep the original length for CTC loss

    # Get transcript label from the pre-processed labels
    human_transcript_label = self.human_transcript_labels[index]

    # Store the original transcript length before padding
    human_transcript_length = min(len(human_transcript_label), self.transcript_max_length)

    # padding the transcript labels
    human_transcript_label, _ = pad_transcript_label(
        human_transcript_label,
        self.transcript_max_length,
        pad = self.pad_index)

    # Convert the padded label list to a PyTorch tensor
    human_transcript_label = torch.tensor(human_transcript_label, dtype=torch.long)

    ############################# END OF YOUR CODE #############################

    return input_feature, input_length, human_transcript_label, human_transcript_length

  def load_waveforms(self):
    # Make a file pointer to waveforms file.
    waveform_h5 = h5py.File(os.path.join(self.root, 'data.h5'), 'r')
    self.waveform_data = waveform_h5.get('waveforms')

  def __getitem__(self, index):
    """Serves primary task data for a single utterance."""
    if not hasattr(self, 'waveform_data'):
      # Do this in __getitem__ function so we enable multiprocessing.
      self.load_waveforms()
    index = int(self.indices[index])
    return self.get_primary_task_data(index)

  def __len__(self):
    """Returns total number of utterances in the dataset."""
    return len(self.indices)


**Sanity check.** Let's check that your dataset implementation is correct. This will be important to properly run our experiments in later parts. In particular, make sure your `__getitem__` and `__len__` are implemented correctly.

In [None]:
# Do not modify.
root = os.path.join(DATA_PATH, 'harper_valley_bank_minified')
train_dataset = HarperValleyBank(root, split='train')
val_dataset = HarperValleyBank(root, split='val')
test_dataset = HarperValleyBank(root, split='test')

assert len(train_dataset) == 10402
assert len(val_dataset) == 679
assert len(test_dataset) == 2854

input, input_length, label, label_length = train_dataset.__getitem__(224)
assert input.size() == torch.Size([train_dataset.wav_max_length, train_dataset.n_mels])
assert input_length == 92
assert label_length == 26
print('\nValidated dataset class implementation!')


> Constructing HarperValleyBank train dataset...
> Constructing HarperValleyBank val dataset...
> Constructing HarperValleyBank test dataset...

Validated dataset class implementation!


# Part 2: Connectionist Temporal Classification (CTC) Neural Network

Our first experiment will be a [Connectionist Temporal Classification](https://www.cs.toronto.edu/~graves/icml_2006.pdf) (Graves et al.) model trained on our primary task of speech recognition.

As an overview, given an input tensor of shape `batch_size x sequence_length x feature_dim`, the network encodes the input speech features with an LSTM, producing a tensor of shape `batch_size x sequence_length x hidden_dim` (the last dimension is `2 x hidden_dim` for the bidirectional LSTM used by default). Using an additional linear layer, we transform this to `batch_size x sequence_length x vocab_size`, and a log-softmax turns these scores into the log-probability of each character in the vocabulary at each time step. This is directly given to the CTC loss function.
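
The shape flow can be checked on random data with a few lines of PyTorch. This is a minimal sketch rather than the assignment's model: the sizes are arbitrary, `classifier` is a hypothetical name for the linear layer, and sequence packing and dropout are left out. Note that a bidirectional LSTM doubles the width of its output.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# Sizes chosen only for this sketch.
batch_size, seq_len, feature_dim = 4, 50, 128
hidden_dim, vocab_size = 64, 48

encoder = nn.LSTM(feature_dim, hidden_dim, bidirectional=True, batch_first=True)
classifier = nn.Linear(2 * hidden_dim, vocab_size)  # 2x for both directions

inputs = torch.randn(batch_size, seq_len, feature_dim)
encoded, _ = encoder(inputs)                            # (4, 50, 128)
log_probs = F.log_softmax(classifier(encoded), dim=-1)  # (4, 50, 48)

print(encoded.shape, log_probs.shape)
```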

We will use [Weights & Biases](https://wandb.ai) to log loss curves and character error rates (CER) in the cloud. You can create a free account [here](https://wandb.ai/site).
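
For intuition about the metric, CER is commonly computed as the edit distance between the predicted and reference character sequences, divided by the reference length. The sketch below uses that common definition; `edit_distance` and `character_error_rate` are hypothetical helpers, and `get_cer_per_sample` in `utils.py` may normalize or handle edge cases differently.

```python
def edit_distance(hyp, ref):
    """Levenshtein distance between two sequences."""
    prev = list(range(len(ref) + 1))
    for i, h in enumerate(hyp, start=1):
        curr = [i]
        for j, r in enumerate(ref, start=1):
            cost = 0 if h == r else 1
            curr.append(min(prev[j] + 1,          # drop a hypothesis symbol
                            curr[j - 1] + 1,      # add a reference symbol
                            prev[j - 1] + cost))  # substitute or match
        prev = curr
    return prev[-1]


def character_error_rate(hyp, ref):
    """Hypothetical helper: edit distance normalized by reference length."""
    return edit_distance(hyp, ref) / max(len(ref), 1)


cer = character_error_rate('helo wrld', 'hello world')
print(cer)  # 2 edits over 11 reference characters, about 0.18
```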

## **CTC Network**

**Implementation**

You will use the CTC objective to train your network. Previously, you implemented the CTC loss function from scratch. For this assignment, you may use PyTorch's implementation. Filling out this section will be necessary to carry out later experiments.

**→ Fill out `get_ctc_loss` using `F.ctc_loss`.**

**→ Read through the starter code and fill out the `forward` pass of `CTCEncoderDecoder`.**
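
Before wiring `F.ctc_loss` into the model, it can help to call it once on random tensors. The sketch below is a standalone illustration with made-up sizes and targets: it starts from batch-first log-probabilities, permutes them to the `(T, N, C)` layout that `F.ctc_loss` expects, and passes padded targets together with their true lengths.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
batch_size, max_input_len, num_classes = 2, 12, 6

# Batch-first log-probabilities of shape (N, T, C).
log_probs = F.log_softmax(
    torch.randn(batch_size, max_input_len, num_classes), dim=-1)

# Padded targets; only the first target_lengths[i] entries of row i count.
# Index 0 is reserved for the blank, so real targets use indices >= 1.
targets = torch.tensor([[3, 4, 5, 2], [2, 3, 1, 1]])
input_lengths = torch.tensor([12, 9])
target_lengths = torch.tensor([4, 2])

loss = F.ctc_loss(
    log_probs.permute(1, 0, 2),  # (T, N, C)
    targets, input_lengths, target_lengths,
    blank=0, zero_infinity=True)
print(loss.item())
```

With the default `reduction='mean'`, the result is a single scalar averaged over the batch.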

In [None]:
def get_ctc_loss(
    log_probs, targets, input_lengths, target_lengths, blank=0):
  """Connectionist Temporal Classification objective function."""
  ctc_loss = None
  log_probs = log_probs.contiguous()
  targets = targets.long()
  input_lengths = input_lengths.long()
  target_lengths = target_lengths.long()
  ############################ START OF YOUR CODE ############################
  # TODO(2.1)
  # Hint:
  # - `F.ctc_loss`: https://pytorch.org/docs/stable/nn.functional.html#ctc-loss
  # - log_probs is passed in with shape (batch_size, input_length, num_classes).
  # - Notice that `F.ctc_loss` expects log_probs of shape
  #   (input_length, batch_size, num_classes)
  # - Turn on zero_infinity.

  # reshaping log_probs to meet input shape for F.ctc_loss
  log_probs_reshaped = log_probs.permute(1,0,2)

  ctc_loss = F.ctc_loss(
      log_probs_reshaped, targets, input_lengths, target_lengths,
      blank=blank, zero_infinity=True)

  ############################# END OF YOUR CODE #############################
  return ctc_loss

In [None]:
# this declares the CTC network architecture
class CTCEncoderDecoder(nn.Module):
  """
  Encoder-Decoder model trained with CTC objective.

  Args:
    input_dim: integer
                number of input features
    num_class: integer
                size of transcription vocabulary
    num_layers: integer (default: 2)
                number of layers in encoder LSTM
    hidden_dim: integer (default: 128)
                number of hidden dimensions for encoder LSTM
    bidirectional: boolean (default: True)
                    is the encoder LSTM bidirectional?
  """
  def __init__(
      self, input_dim, num_class, num_layers=2, hidden_dim=128,
      bidirectional=True):
    super().__init__()
    # Note: `batch_first=True` argument implies the inputs to the LSTM should
    # be of shape (batch_size x T x D) instead of (T x batch_size x D).
    self.encoder = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers,
                            bidirectional=bidirectional, batch_first=True)
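    # The encoder output is twice as wide only when the LSTM is bidirectional.
    self.decoder = nn.Linear(
        hidden_dim * (2 if bidirectional else 1), num_class)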
    self.input_dim = input_dim
    self.num_class = num_class
    self.num_layers = num_layers
    self.hidden_dim = hidden_dim
    self.embedding_dim = hidden_dim * num_layers * 2 * \
                          (2 if bidirectional else 1)

  def combine_h_and_c(self, h, c):
    """Combine the signals from RNN hidden and cell states."""
    batch_size = h.size(1)
    h = h.permute(1, 0, 2).contiguous()
    c = c.permute(1, 0, 2).contiguous()
    h = h.view(batch_size, -1)
    c = c.view(batch_size, -1)
    return torch.cat([h, c], dim=1)  # just concatenate

  def forward(self, inputs, input_lengths):
    batch_size, max_length, _ = inputs.size()
    # `torch.nn.utils.rnn.pack_padded_sequence` collapses padded sequences
    # to a contiguous chunk
    inputs = torch.nn.utils.rnn.pack_padded_sequence(
        inputs, input_lengths.cpu(), batch_first=True, enforce_sorted=False)
    log_probs = None
    h, c = None, None
    ############################ START OF YOUR CODE ############################
    # TODO(2.1)
    # Hint:
    # - Refer to https://pytorch.org/docs/stable/nn.html
    # - Use `self.encoder` on the packed inputs. It returns a packed output
    #   plus the hidden states and cell states, which are both of shape
    #   (num_layers*num_directions, batch_size, hidden_dim).
    # - Pad outputs with `0.` using `torch.nn.utils.rnn.pad_packed_sequence`
    #   (turn on batch_first and set total_length as max_length). The padded
    #   encodings are of shape
    #   (batch_size, max_length, num_directions*hidden_dim).
    # - Apply 50% dropout.
    # - Use `self.decoder` to map the encodings sequence to logits for each
    #   character.
    # - Make sure to then convert to log probabilities.

    # Pass packed sequence through the encoder
    # Note: LSTM returns (output, (h, c))
    output, (h, c) = self.encoder(inputs)


    # Unpack the packed sequence
    # pad_packed_sequence returns (padded_seq, original_lengths)
    encoding_padded, _ = torch.nn.utils.rnn.pad_packed_sequence(
        output,
        total_length = max_length,
        batch_first = True
        )

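    # Apply 50% dropout to the padded output. The functional form respects
    # `self.training`, so dropout is disabled in eval mode; a module created
    # inside `forward` would always be in training mode.
    x = F.dropout(encoding_padded, p=0.5, training=self.training)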

    logits = self.decoder(x)

    # Apply log_softmax to get log probabilities
    log_probs = F.log_softmax(logits, dim=2)

    ############################# END OF YOUR CODE #############################

    # The extracted embedding is not used for the ASR task but will be
    # needed for other auxiliary tasks.
    embedding = self.combine_h_and_c(h, c)
    return log_probs, embedding

  def get_loss(
      self, log_probs, targets, input_lengths, target_lengths, blank=0):
    return get_ctc_loss(
        log_probs, targets, input_lengths, target_lengths, blank)

  def decode(self, log_probs, input_lengths, labels, label_lengths,
             sos_index, eos_index, pad_index, eps_index):
    # Use greedy decoding.
    decoded = torch.argmax(log_probs, dim=2)
    batch_size = decoded.size(0)
    # Collapse each decoded sequence using CTC rules.
    hypotheses = []
    for i in range(batch_size):
      hypotheses_i = self.ctc_collapse(decoded[i], input_lengths[i].item(),
                                       blank_index=eps_index)
      hypotheses.append(hypotheses_i)

    hypothesis_lengths = input_lengths.cpu().numpy().tolist()
    if labels is None: # Run at inference time.
      references, reference_lengths = None, None
    else:
      references = labels.cpu().numpy().tolist()
      reference_lengths = label_lengths.cpu().numpy().tolist()

    return hypotheses, hypothesis_lengths, references, reference_lengths

  def ctc_collapse(self, seq, seq_len, blank_index=0):
    result = []
    for i, tok in enumerate(seq[:seq_len]):
      if tok.item() != blank_index:  # remove blanks
        if i != 0 and tok.item() == seq[i-1].item():  # remove dups
          pass
        else:
          result.append(tok.item())
    return result
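
To see what the greedy decoding in `ctc_collapse` does to a sequence of per-frame predictions, here is a standalone sketch on plain Python lists. `collapse` is a hypothetical reimplementation for illustration, and blank index 0 is assumed.

```python
from itertools import groupby


def collapse(frame_ids, blank_index=0):
    """Merge runs of repeated ids, then drop blanks (CTC collapse rule)."""
    merged = [tok for tok, _ in groupby(frame_ids)]
    return [tok for tok in merged if tok != blank_index]


# Per-frame argmax ids; the blank between the two 5s keeps them distinct.
print(collapse([0, 5, 5, 0, 5, 6, 6, 0]))  # [5, 5, 6]
```

The blank is what allows a genuinely doubled character (as in "hello") to survive the merge step.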


## **Introduction to PyTorch Lightning**

**Walkthrough**

*This section is a walkthrough and will not require any code or answers.* We will use [PyTorch Lightning](https://www.pytorchlightning.ai/), a lightweight wrapper framework for PyTorch, to run our experiments. You can learn more about the Lightning toolkit [here](https://github.com/PyTorchLightning/pytorch-lightning). As a short introduction, PyTorch Lightning is a scaffold for training deep learning models. It handles much of the usual pipeline for you (e.g. looping over the training set and calling your optimizer). You specify your model by overriding hook methods such as `training_step`, `validation_step`, and `configure_optimizers`.
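
To make the division of labor concrete, the sketch below writes out by hand a heavily simplified version of the loop that a trainer runs for you. It uses plain PyTorch only: `TinyModule` and `fit` are hypothetical stand-ins that borrow Lightning's hook names, and the real `Trainer` also handles validation, logging, checkpointing, and device placement.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class TinyModule(nn.Module):
    """Plain nn.Module that mimics two LightningModule hook names."""

    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(3, 1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        return F.mse_loss(self.linear(x), y)

    def configure_optimizers(self):
        return torch.optim.SGD(self.parameters(), lr=0.1)


def fit(module, batches, max_epochs=1):
    """Simplified stand-in for the training loop a trainer automates."""
    optimizer = module.configure_optimizers()
    losses = []
    for _ in range(max_epochs):
        for batch_idx, batch in enumerate(batches):
            loss = module.training_step(batch, batch_idx)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
    return losses


torch.manual_seed(0)
x = torch.randn(32, 3)
y = x.sum(dim=1, keepdim=True)  # toy regression target
losses = fit(TinyModule(), [(x, y)] * 20)
print(losses[0], losses[-1])
```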

In [None]:
# Do not modify.

class LightningCTC(pl.LightningModule):
  """PyTorch Lightning class for training a CTC model.

  Args:
    n_mels: number of mel frequencies. (default: 128)
    n_fft: FFT size used for the spectrogram. (default: 256)
    win_length: number of samples in each analysis window. (default: 256)
    hop_length: number of samples between successive frames. (default: 128)
    wav_max_length: max number of frames in a waveform spectrogram. (default: 200)
    transcript_max_length: max number of characters in decoded transcription. (default: 200)
    learning_rate: learning rate for the AdamW optimizer. (default: 1e-3)
    batch_size: batch size used in optimization and evaluation. (default: 256)
    weight_decay: weight decay for the AdamW optimizer. (default: 1e-5)
    encoder_num_layers: number of layers in LSTM encoder. (default: 2)
    encoder_hidden_dim: number of hidden dimensions in LSTM encoder. (default: 256)
    encoder_bidirectional: directionality of LSTM encoder. (default: True)
  """
  def __init__(self, n_mels=128, n_fft=256, win_length=256, hop_length=128,
               wav_max_length=200, transcript_max_length=200,
               learning_rate=1e-3, batch_size=256, weight_decay=1e-5,
               encoder_num_layers=2, encoder_hidden_dim=256,
               encoder_bidirectional=True):
    super().__init__()
    self.save_hyperparameters()
    self.n_mels = n_mels
    self.n_fft = n_fft
    self.win_length = win_length
    self.hop_length = hop_length
    self.lr = learning_rate
    self.batch_size = batch_size
    self.weight_decay = weight_decay
    self.wav_max_length = wav_max_length
    self.transcript_max_length = transcript_max_length
    self.train_dataset, self.val_dataset, self.test_dataset = \
      self.create_datasets()
    self.encoder_num_layers = encoder_num_layers
    self.encoder_hidden_dim = encoder_hidden_dim
    self.encoder_bidirectional = encoder_bidirectional

    # Instantiate the CTC encoder/decoder.
    self.model = self.create_model()

  def create_model(self):
    model = CTCEncoderDecoder(
      self.train_dataset.input_dim,
      self.train_dataset.num_class,
      num_layers=self.encoder_num_layers,
      hidden_dim=self.encoder_hidden_dim,
      bidirectional=self.encoder_bidirectional)
    return model

  def create_datasets(self):
    root = os.path.join(DATA_PATH, 'harper_valley_bank_minified')
    train_dataset = HarperValleyBank(
        root, split='train', n_mels=self.n_mels, n_fft=self.n_fft,
        win_length=self.win_length, hop_length=self.hop_length,
        wav_max_length=self.wav_max_length,
        transcript_max_length=self.transcript_max_length,
        append_eos_token=False)
    val_dataset = HarperValleyBank(
        root, split='val', n_mels=self.n_mels, n_fft=self.n_fft,
        win_length=self.win_length, hop_length=self.hop_length,
        wav_max_length=self.wav_max_length,
        transcript_max_length=self.transcript_max_length,
        append_eos_token=False)
    test_dataset = HarperValleyBank(
        root, split='test', n_mels=self.n_mels, n_fft=self.n_fft,
        win_length=self.win_length, hop_length=self.hop_length,
        wav_max_length=self.wav_max_length,
        transcript_max_length=self.transcript_max_length,
        append_eos_token=False)
    return train_dataset, val_dataset, test_dataset

  def configure_optimizers(self):
    optim = torch.optim.AdamW(self.model.parameters(),
                              lr=self.lr, weight_decay=self.weight_decay)
    return [optim], [] # <-- put scheduler in here if you want to use one

  def get_loss(self, log_probs, input_lengths, labels, label_lengths):
    loss = self.model.get_loss(log_probs, labels, input_lengths, label_lengths,
                                blank=self.train_dataset.eps_index)
    return loss

  def forward(self, inputs, input_lengths, labels, label_lengths):
    log_probs, embedding = self.model(inputs, input_lengths)
    return log_probs, embedding

  def get_primary_task_loss(self, batch, split='train'):
    """Returns ASR model losses, metrics, and embeddings for a batch."""
    inputs, input_lengths = batch[0], batch[1]
    labels, label_lengths = batch[2], batch[3]

    if split == 'train':
      log_probs, embedding = self.forward(
          inputs, input_lengths, labels, label_lengths)
    else:
      # Do not pass labels at evaluation time, so there is no teacher forcing.
      log_probs, embedding = self.forward(
          inputs, input_lengths, None, None)

    loss = self.get_loss(log_probs, input_lengths, labels, label_lengths)

    # Compute CER (no gradient necessary).
    with torch.no_grad():
      hypotheses, hypothesis_lengths, references, reference_lengths = \
        self.model.decode(
            log_probs, input_lengths, labels, label_lengths,
            self.train_dataset.sos_index,
            self.train_dataset.eos_index,
            self.train_dataset.pad_index,
            self.train_dataset.eps_index)
      cer_per_sample = get_cer_per_sample(
          hypotheses, hypothesis_lengths, references, reference_lengths)
      cer = cer_per_sample.mean()
      metrics = {f'{split}_loss': loss, f'{split}_cer': cer}

    return loss, metrics, embedding

  # Overwrite TRAIN
  def training_step(self, batch, batch_idx):
    loss, metrics, _ = self.get_primary_task_loss(batch, split='train')
    self.log_dict(metrics)
    # self.log('train_loss', loss, prog_bar=True, on_step=True)
    # self.log('train_cer', metrics['train_cer'], prog_bar=True, on_step=True)
    return loss

  # Overwrite VALIDATION: get next minibatch
  def validation_step(self, batch, batch_idx):
    loss, metrics, _ = self.get_primary_task_loss(batch, split='val')
    self.log("val_loss", metrics["val_loss"],
             prog_bar=True, on_step=False, on_epoch=True, sync_dist=True)
    self.log("val_cer",  metrics["val_cer"],
             prog_bar=True, on_step=False, on_epoch=True, sync_dist=True)
    return metrics

  def test_step(self, batch, batch_idx):
    loss, metrics, _ = self.get_primary_task_loss(batch, split='test')
    self.log("test_loss", metrics["test_loss"],
             prog_bar=True, on_step=False, on_epoch=True, sync_dist=True)
    self.log("test_cer",  metrics["test_cer"],
             prog_bar=True, on_step=False, on_epoch=True, sync_dist=True)
    return metrics

  def train_dataloader(self):
    # - important to shuffle to not overfit!
    # - drop the last batch to preserve consistent batch sizes
    loader = DataLoader(self.train_dataset, batch_size=self.batch_size,
                        shuffle=True, pin_memory=True, drop_last=True)
    return loader

  def val_dataloader(self):
    loader = DataLoader(self.val_dataset, batch_size=self.batch_size,
                        shuffle=False, pin_memory=True)
    return loader

  def test_dataloader(self):
    loader = DataLoader(self.test_dataset, batch_size=self.batch_size,
                        shuffle=False, pin_memory=True)
    return loader


## **Task 2.1: Train a network with CTC [20 Points]**

**Training & Written Response**

Go to **Runtime** > **Change runtime type** and set **Hardware accelerator** to **GPU**.

This section will be graded on 1) your model's performance, as shown in your loss and CER plots, and 2) your written qualitative assessment of those plots.

**→ Train the CTC network with the default hyperparameters we provide.**

With batch size 128, one epoch of optimizing CTC takes roughly 3 minutes. We recommend training for at least 15-20 epochs, although we do not guarantee this is enough to converge. If your notebook resets, you can continue training from an old checkpoint.
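
Continuing from a checkpoint first requires choosing which file to load. The sketch below assumes the `epoch=E-step=S.ckpt` naming (with an optional `-vN` suffix) that appears in the training logs further down, and picks the file with the largest step. `latest_checkpoint` is a hypothetical helper, not part of the assignment code.

```python
import re

# Hypothetical helper: choose the checkpoint with the highest global step.
# Assumes names like 'epoch=4-step=405.ckpt' or 'epoch=0-step=81-v1.ckpt'.
CKPT_PATTERN = re.compile(r'epoch=(\d+)-step=(\d+)(?:-v(\d+))?\.ckpt$')

def latest_checkpoint(filenames):
  best_name, best_key = None, None
  for name in filenames:
    match = CKPT_PATTERN.search(name)
    if match is None:
      continue  # skip files that do not follow the assumed pattern
    _, step, version = match.groups()
    key = (int(step), int(version or 0))
    if best_key is None or key > best_key:
      best_name, best_key = name, key
  return best_name

names = ['epoch=0-step=81.ckpt', 'epoch=2-step=243.ckpt',
         'epoch=4-step=405.ckpt']
print(latest_checkpoint(names))  # prints epoch=4-step=405.ckpt
```

In recent PyTorch Lightning versions, the chosen path can be passed as `trainer.fit(system, ckpt_path=...)` to restore model and optimizer state before training continues. Note that the most recent checkpoint is not necessarily the one with the best validation loss.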

**CER target:
You should obtain a test CER of at most 0.35 for this model. You will obtain full points for demonstrating a model with test CER below this threshold.**
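
To make the CER target concrete, here is a minimal sketch of character error rate as edit distance divided by reference length. This is an assumption about the usual definition; the provided `get_cer_per_sample` in `utils.py` may differ in details such as normalization. The names `edit_distance` and `cer` are illustrative only.

```python
def edit_distance(hyp, ref):
  """Levenshtein distance between two sequences."""
  prev = list(range(len(ref) + 1))
  for i, h in enumerate(hyp, start=1):
    curr = [i]
    for j, r in enumerate(ref, start=1):
      cost = 0 if h == r else 1
      curr.append(min(prev[j] + 1,          # drop a hypothesis character
                      curr[j - 1] + 1,      # add a missing reference character
                      prev[j - 1] + cost))  # substitute or match
    prev = curr
  return prev[-1]

def cer(hyp, ref):
  """Character error rate: edit distance normalized by reference length."""
  return edit_distance(hyp, ref) / max(len(ref), 1)

print(cer('helo wrld', 'hello world'))  # two missing characters out of 11
print(cer('', 'hello world'))           # an empty hypothesis scores 1.0
```

Under this definition an empty transcription scores exactly 1.0, which is one way a CER of 1.0 can arise early in training when the model emits mostly blanks.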

**→ Paste screenshots from your Weights & Biases dashboard of your loss curve and CER curve in the cell marked "Plots".**

In [None]:
WANDB_NAME = 'shubham13596-self' # Fill in your Weights & Biases ID here.

def run(system, config, ckpt_dir, epochs=1, monitor_key='val_loss',
        use_gpu=False, seed=1337):
  random.seed(seed)
  torch.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)
  np.random.seed(seed)
  os.environ['PYTHONHASHSEED'] = str(seed)

  SystemClass = globals()[system]
  system = SystemClass(**config)

  checkpoint_callback = ModelCheckpoint(
    dirpath=os.path.join(MODEL_PATH, ckpt_dir),
    save_top_k=1,
    verbose=True,
    monitor=monitor_key,
    mode='min')

  wandb.init(project='cs224s', entity=WANDB_NAME, name=ckpt_dir,
             config=config, sync_tensorboard=True)
  wandb_logger = WandbLogger()

  if use_gpu:

    trainer = pl.Trainer(
      accelerator="gpu",
      devices=1,
      max_epochs=epochs,
      min_epochs=epochs,
      enable_checkpointing=True,
      callbacks=[checkpoint_callback],
      logger=wandb_logger
    )
  else:
    trainer = pl.Trainer(
      accelerator="cpu",
      max_epochs=epochs,
      min_epochs=epochs,
      enable_checkpointing=True,
      callbacks=[checkpoint_callback],  # Note: must be a list now
      logger=wandb_logger
    )

  trainer.fit(system)
  result = trainer.test()

In [None]:
config = {
    'n_mels': 128,
    'n_fft': 256,
    'win_length': 256,
    'wav_max_length': 512,
    'hop_length': 128,
    'transcript_max_length': 200,
    'learning_rate': 1e-3,
    'batch_size': 128,
    'weight_decay': 0,
    'encoder_num_layers': 2,
    'encoder_hidden_dim': 256,
    'encoder_bidirectional': True,
}

# NOTES:
# -----
# - PyTorch Lightning will run 2 steps of validation prior to the first
#   epoch to sanity check that validation works (otherwise you
#   might waste an epoch training and error).
# - The progress bar updates very slowly, the model is likely
#   training even if it doesn't look like it is.
# - Wandb will generate a URL for you where all the metrics will be logged.
# - Every validation loop, the best performing model is saved.
# - After training, the system will evaluate performance on the test set.
run(system="LightningCTC", config=config, ckpt_dir='ctc', epochs=20, use_gpu=True)


> Constructing HarperValleyBank train dataset...
> Constructing HarperValleyBank val dataset...
> Constructing HarperValleyBank test dataset...


0,1
epoch,▁▁
train_cer,▁
train_loss,▁
trainer/global_step,▁█
val_cer,▁
val_loss,▁

0,1
epoch,0.0
train_cer,1.0
train_loss,3.01625
trainer/global_step,80.0
val_cer,1.0
val_loss,3.01708


INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
/usr/local/lib/python3.11/dist-packages/pytorch_lightning/loggers/wandb.py:397: There is a wandb run already in progress and newly created instances of `WandbLogger` will reuse this run. If this is not desired, call `wandb.finish()` before instantiating `WandbLogger`.
/usr/local/lib/python3.11/dist-packages/pytorch_lightning/callbacks/model_checkpoint.py:654: Checkpoint directory /content/gdrive/MyDrive/cs224s_spring2025/trained_models/ctc exists and is not empty.
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name  | Type              | Params | Mode 
----------------------------------------------------
0 | model | CTCEncoderDecoder | 2.4 M  | train
--------

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/usr/local/lib/python3.11/dist-packages/pytorch_lightning/trainer/connectors/data_connector.py:425: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
/usr/local/lib/python3.11/dist-packages/pytorch_lightning/trainer/connectors/data_connector.py:425: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 0, global step 81: 'val_loss' reached 3.01708 (best 3.01708), saving model to '/content/gdrive/MyDrive/cs224s_spring2025/trained_models/ctc/epoch=0-step=81-v1.ckpt' as top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 1, global step 162: 'val_loss' reached 3.00081 (best 3.00081), saving model to '/content/gdrive/MyDrive/cs224s_spring2025/trained_models/ctc/epoch=1-step=162.ckpt' as top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 2, global step 243: 'val_loss' reached 2.97444 (best 2.97444), saving model to '/content/gdrive/MyDrive/cs224s_spring2025/trained_models/ctc/epoch=2-step=243-v1.ckpt' as top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 3, global step 324: 'val_loss' reached 2.95476 (best 2.95476), saving model to '/content/gdrive/MyDrive/cs224s_spring2025/trained_models/ctc/epoch=3-step=324.ckpt' as top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 4, global step 405: 'val_loss' reached 2.93068 (best 2.93068), saving model to '/content/gdrive/MyDrive/cs224s_spring2025/trained_models/ctc/epoch=4-step=405.ckpt' as top 1
INFO:pytorch_lightning.utilities.rank_zero:
Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined

In [None]:
# You can find the saved checkpoint here:
!ls /content/cs224s_spring2025/trained_models/ctc

'epoch=0-step=81.ckpt'	'epoch=2-step=243.ckpt'  'epoch=4-step=405.ckpt'


In [None]:
ctc_checkpoint_file ='epoch=4-step=405.ckpt' # Fill in your checkpoint file
ctc_checkpoint_path = os.path.join(MODEL_PATH, 'ctc', ctc_checkpoint_file)

LightningCTC.load_from_checkpoint(ctc_checkpoint_path)

> Constructing HarperValleyBank train dataset...
> Constructing HarperValleyBank val dataset...
> Constructing HarperValleyBank test dataset...


LightningCTC(
  (model): CTCEncoderDecoder(
    (encoder): LSTM(128, 256, num_layers=2, batch_first=True, bidirectional=True)
    (decoder): Linear(in_features=512, out_features=48, bias=True)
  )
)

---

**Plots:**



---


**→ Using your plots as evidence in your description, answer the following questions:**




a) What is the model's best test CER?

b) Does the model learn and converge? What do you notice about CTC loss early in training?

c) Does the model overfit? Despite the small dataset size, why might CTC not overfit?



---

**Answer:**
(Your answer here)

---

# Part 3: Analysis

While looking at validation and test CER is a good way to judge how a model is performing, it is also important to look at specific examples it does well on or fails on, in order to build an intuition for why it fails.

## **Task 3.1: Lowest and Highest CER Examples [5 Points]**

**Implementation & Written Response**

**→ Now we will find and examine a test utterance your model transcribes well and a test utterance it transcribes poorly.** Fill out `get_low_high_cer_wav` to get the lowest and highest CERs and their corresponding utterances in your test set.
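
The bookkeeping for this task can be sketched without any model: keep a running count of samples seen so far, so that each per-batch position maps to a position in the whole test set even when the final batch is shorter. The function `track_extremes` and its toy inputs are hypothetical.

```python
def track_extremes(batches_of_cer):
  """Returns (low_cer, low_pos, high_cer, high_pos) over all batches.

  `batches_of_cer` is a list of per-batch CER lists. Positions count samples
  in iteration order, so a shorter final batch does not shift the offset.
  """
  low_cer, low_pos = float('inf'), None
  high_cer, high_pos = float('-inf'), None
  seen = 0  # number of samples consumed before the current batch
  for batch in batches_of_cer:
    for j, value in enumerate(batch):
      pos = seen + j
      if value < low_cer:
        low_cer, low_pos = value, pos
      if value > high_cer:
        high_cer, high_pos = value, pos
    seen += len(batch)
  return low_cer, low_pos, high_cer, high_pos

print(track_extremes([[0.5, 0.2, 0.9], [0.7, 1.0, 0.1], [0.3]]))
# prints (0.1, 5, 1.0, 4)
```

Because the test loader does not shuffle, such a position can then be mapped through `index_lookup` to an index in the full dataset.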

In [None]:
from tqdm import tqdm

def get_low_high_cer_wav(system, device=None):
  """Gets the test set sample with lowest CER and the sample with highest CER.

  Args:
    system: Subclassed LightningModule for your model.
    device: Instance of torch.device(...) [default: None]

  Returns:
    lowest CER (float), audio of the lowest CER utterance (ndarray),
    highest CER (float), audio of the highest CER utterance (ndarray)
  """
  # Init values.
  low_cer = float('inf')
  low_idx = 0
  high_cer = float('-inf')
  high_idx = 0

  test_dataloader = system.test_dataloader()
  index_lookup = system.test_dataset.indices

  pbar = tqdm(total=len(test_dataloader))
  for i, batch in enumerate(test_dataloader):
    input_features, input_lengths = batch[0], batch[1]
    labels, label_lengths = batch[2], batch[3]
    batch_size = input_features.size(0)
    if device is not None:
      input_features = input_features.to(device)
    ############################ START OF YOUR CODE ############################
    # TODO(3.1)
    # Hint:
    # - Use `get_cer_per_sample`, which gets a numpy array of
    #   CERs for each sample in a batch
    # - Use `index_lookup` to map a sample's test set index to
    #   its index in the full dataset.

    log_probs, _ = system.model(input_features, input_lengths)
    hypotheses, hypothesis_lengths, references, reference_lengths = system.model.decode(
        log_probs, input_lengths, labels, label_lengths,
        system.test_dataset.sos_index, system.test_dataset.eos_index,
        system.test_dataset.pad_index, system.test_dataset.eps_index)

    cer_per_sample = get_cer_per_sample(hypotheses, hypothesis_lengths, references, reference_lengths)

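    for j in range(batch_size):
      # Offset by the loader's nominal batch size: `batch_size` above is the
      # size of the current batch, which is smaller for the final batch.
      sample_idx = i * test_dataloader.batch_size + j
      if sample_idx >= len(index_lookup):
        continue
      # Get the original index in the full dataset
      full_idx = int(index_lookup[sample_idx])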

      # Check if this sample has the lowest CER so far
      if cer_per_sample[j] < low_cer:
          low_cer = cer_per_sample[j]
          low_idx = full_idx

      # Check if this sample has the highest CER so far
      if cer_per_sample[j] > high_cer:
          high_cer = cer_per_sample[j]
          high_idx = full_idx

    ############################# END OF YOUR CODE #############################
    pbar.update()
  pbar.close()

  # Retrieve ndarray wav data from the original h5py file.
  system.test_dataset.load_waveforms()
  waveform_data = system.test_dataset.waveform_data
  low_wav = waveform_data[f'{low_idx}'][:]
  high_wav = waveform_data[f'{high_idx}'][:]

  return low_cer, low_wav, high_cer, high_wav

In [None]:
checkpoint_path = ctc_checkpoint_path

device = torch.device('cuda')
system = LightningCTC.load_from_checkpoint(checkpoint_path)
system = system.to(device)
system.eval()
low_cer, low_wav, high_cer, high_wav = get_low_high_cer_wav(system, device)


> Constructing HarperValleyBank train dataset...
> Constructing HarperValleyBank val dataset...
> Constructing HarperValleyBank test dataset...





In [None]:
low_cer

np.float64(0.9285714285714286)

In [None]:
high_cer

np.float64(1.0)

In [None]:
print('Utterance with lowest CER: {}\n'.format(low_cer))
Audio(low_wav, rate=8000)

Utterance with lowest CER: 0.9285714285714286



In [None]:
print('Utterance with highest CER: {}\n'.format(high_cer))
Audio(high_wav, rate=8000)

Utterance with highest CER: 1.0



**→ What are the lowest and highest CERs? Why do you think CTC got these CERs for these utterances?**



---

**Answer:**
Clear voice versus muffled voice. The utterance with the higher CER has very unclear audio, so CTC is unsure which character to assign at each timestep, which raises the CER. The first example has clear audio, so CTC can assign a character with high probability at each time frame, which reduces the CER.


---



## **Task 3.2: Run inference using your model [5 points]**

**Implementation**

**→ Similar to `get_low_high_cer_wav`, we'll run inference on a single audio file and see what the model transcribes.** Fill in `run_inference` to have your system decode test utterances from a `.WAV` file. We will later run this function in Parts 5 and 7 to qualitatively evaluate systems.
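
Turning per-frame scores into text can be illustrated with greedy (best-path) CTC decoding: take the highest-scoring symbol per frame, merge consecutive repeats, then drop blanks. This is a sketch under the assumption that greedy decoding is used; the provided `system.model.decode` may work differently. The vocabulary and scores below are made up.

```python
def ctc_greedy_decode(frame_scores, blank_index, vocab):
  """Best-path decoding: argmax per frame, merge repeats, drop blanks."""
  best_path = [max(range(len(frame)), key=frame.__getitem__)
               for frame in frame_scores]
  chars = []
  previous = None
  for index in best_path:
    # Emit a symbol only when it differs from the previous frame's symbol
    # and is not the blank.
    if index != previous and index != blank_index:
      chars.append(vocab[index])
    previous = index
  return ''.join(chars)

vocab = ['<eps>', 'h', 'i']  # hypothetical vocabulary; index 0 is the blank
scores = [
    [-3.0, -0.1, -4.0],  # h
    [-2.5, -0.2, -3.0],  # h (repeat, merged)
    [-0.1, -3.0, -3.0],  # blank
    [-3.0, -4.0, -0.1],  # i
    [-3.0, -4.0, -0.2],  # i (repeat, merged)
    [-0.3, -2.0, -2.0],  # blank separates the two i's
    [-3.0, -3.0, -0.1],  # i
]
print(ctc_greedy_decode(scores, blank_index=0, vocab=vocab))  # prints hii
```

The blank between the last two `i` frames is what allows a doubled character to survive the merge step.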

In [None]:
def run_inference(
    system, wav, device=None, sr=8000, n_mels=128, n_fft=256, win_length=256,
    hop_length=128, wav_max_length=512, labels=None, label_lengths=None):
  """Run your system on a .WAV file and returns a string utterance.

  Args:
    system: a pl.LightningModule for your chosen model.
    wav: a .WAV file of an utterance
    device: GPU -> torch.device('cuda')

  Returns:
    A string for the utterance transcribed by your model.
  """
  input_feature = None

  mels = librosa.feature.melspectrogram(y=wav, sr=sr,
                                        n_mels=n_mels,
                                        n_fft=n_fft,
                                        win_length=win_length,
                                        hop_length=hop_length)

  mels = librosa.power_to_db(mels, ref=np.max).T
  mels = librosa.util.normalize(mels, axis=1)
  input_feature, input_length = pad_wav(mels, wav_max_length)
  input_feature = torch.tensor([input_feature], dtype=torch.float)

  input_lengths = torch.LongTensor([input_length])
  # Whether or not to use GPU.
  if device is not None:
    input_feature = input_feature.to(device)
    input_lengths = input_lengths.to(device)
    if labels is not None:  # to test teacher-forcing
      labels = labels.to(device)
      label_lengths = label_lengths.to(device)

  utterance = None
  ############################# START OF YOUR CODE #############################
  # TODO(3.2)
  # Run your system on the utterance input feature to get log probabilities
  # and decode the log probabilities into indices. Then turn those indices into
  # characters.
  log_probs, _ = system.model(input_feature, input_lengths)
  hypotheses, hypothesis_lengths, references, reference_lengths = system.model.decode(
        log_probs, input_lengths, labels, label_lengths,
        system.train_dataset.sos_index, system.train_dataset.eos_index,
        system.train_dataset.pad_index, system.train_dataset.eps_index)

  # Get the first hypothesis (since we only have one audio sample)
  hypothesis = hypotheses[0]

  # Convert indices to characters
  chars = system.train_dataset.indices_to_chars(hypothesis)

  # Join the characters to form the utterance
  utterance = ''.join(chars)

  ############################## END OF YOUR CODE ##############################
  return utterance


# Part 4: Leveraging Auxiliary Tasks for Multi-Task Learning

When designing a speech system, we might care about more than just the transcription. As a bank, we might want to know the intent of the caller, for example.

Our dataset provides labels for these auxiliary tasks, including dialog action, the intent of the caller, and the sentiment of the caller. In the spirit of an end-to-end system, we will expand the CTC model to make predictions for these auxiliary tasks.


## **Task 4.1 Working with auxiliary task data [5 Points]**

**Implementation**

**→ Fill in `__getitem__`. Add one or more auxiliary tasks to your training.** We include `get_auxiliary_labels` for you.
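
To make the three label formats concrete, the sketch below encodes one toy utterance as a class index, a multi-hot vector, and a probability vector. All vocabularies and label strings are made up for illustration, and `encode_item` is a hypothetical helper.

```python
# Toy vocabularies; the real ones are built from the dataset.
task_type_vocab = ['check balance', 'order checks', 'replace card']
dialog_acts_vocab = ['greeting', 'request', 'thanks']

def encode_item(task_type, dialog_acts, sentiment):
  """Returns (class index, multi-hot list, probability list)."""
  task_type_label = task_type_vocab.index(task_type)
  active = dialog_acts.split(',')
  dialog_acts_label = [1.0 if act in active else 0.0
                       for act in dialog_acts_vocab]
  sentiment_label = [float(p) for p in sentiment]
  return task_type_label, dialog_acts_label, sentiment_label

item = encode_item('order checks', 'greeting,request', (0.1, 0.7, 0.2))
print(item)  # prints (1, [1.0, 1.0, 0.0], [0.1, 0.7, 0.2])
```

The integer index suits a long tensor, while the multi-hot and probability vectors suit float tensors.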

In [None]:
class HarperValleyBankMTL(HarperValleyBank):
  """Like the HarperValleyBank dataset but returns labels for task type,
  dialog actions, and sentiment: our three auxiliary tasks.

  See `HarperValleyBank` class for description.
  """
  def __init__(
    self, root, split='train', n_mels=128, n_fft=128, win_length=256,
    hop_length=128, wav_max_length=200, transcript_max_length=200,
    append_eos_token=False):
    super().__init__(
      root, split=split, n_mels=n_mels, n_fft=n_fft,
      win_length=win_length, hop_length=hop_length,
      wav_max_length=wav_max_length,
      transcript_max_length=transcript_max_length,
      append_eos_token=append_eos_token)
    self.auxiliary_labels = self.get_auxiliary_labels()

  def get_auxiliary_labels(self):
    """Returns auxiliary task labels.

    This function takes the raw auxiliary task labels and converts them to
    integer labels (for neural networks).

    These include: `task_type`, `dialogue_acts`, and `sentiment`.
    """
    # task_types: each element is a string representing a conversation-level
    #             label. So all utterances in the same conversation share
    #             the same label.
    task_types = self.label_data['task_types']

    # dialog_acts: each element is a comma-separated string of dialog actions
    #              that describe the current utterance
    dialog_acts = self.label_data['dialog_acts']
    dialog_acts = [acts.split(',') for acts in dialog_acts]


    # sentiments: each element is a 3 dimensional vector that sums to 1
    #             representing the probabilities for
    #             "negative", "neutral", and "positive"
    sentiment_labels = self.label_data['sentiments']

    # Get label vocabularies.
    task_type_vocab = sorted(set(task_types))
    dialog_acts_vocab = sorted(set([item for sublist in dialog_acts
                                    for item in sublist]))

    task_type_labels = [task_type_vocab.index(t) for t in task_types]

    # dialog_acts_labels: list of 1-hot vectors
    dialog_acts_labels = []
    for acts in dialog_acts:
      onehot = [0 for _ in range(len(dialog_acts_vocab))]
      for act in acts:
        onehot[dialog_acts_vocab.index(act)] = 1
      dialog_acts_labels.append(onehot)

    # Store number of classes for each auxiliary task.
    # Note:
    #   - task_type is a N-way classification problem.
    #   - dialog_acts is a set of binary classification problems.
    #       (more than one dialog action may be "on" for an utterance)
    #   - sentiment is a regression problem (match given probabilities).
    self.task_type_num_class = len(task_type_vocab)
    self.dialog_acts_num_class = len(dialog_acts_vocab)
    self.sentiment_num_class = 3

    return task_type_labels, dialog_acts_labels, sentiment_labels

  def __getitem__(self, index):
    """Serves multi-task data for a single utterance."""
    if not hasattr(self, 'waveform_data'):
      self.load_waveforms()

    index = int(self.indices[index])

    primary_task_data = self.get_primary_task_data(index)
    auxiliary_task_data = None

    ############################ START OF YOUR CODE ############################
    # TODO(4.1)
    # Get auxiliary task label(s) for this index using
    # `self.auxiliary_labels`. Populate the object `auxiliary_task_data`
    # as a tuple of auxiliary task labels. Make sure to cast appropriate
    # torch tensor types for the different labels.

    # Unpack the auxiliary labels
    task_type_labels, dialog_acts_labels, sentiment_labels = self.auxiliary_labels

    # Get the specific labels for this index
    task_type = task_type_labels[index]
    dialog_acts = dialog_acts_labels[index]
    sentiment = sentiment_labels[index]

    # Convert to appropriate tensor types:
    # - task_type: integer classification, use LongTensor
    # - dialog_acts: multi-label binary classification, use FloatTensor
    # - sentiment: 3-dimensional probability vector, use FloatTensor
    task_type_tensor = torch.tensor(task_type, dtype=torch.long)
    dialog_acts_tensor = torch.tensor(dialog_acts, dtype=torch.float)
    sentiment_tensor = torch.tensor(sentiment, dtype=torch.float)

    # Combine into a tuple
    auxiliary_task_data = (task_type_tensor, dialog_acts_tensor, sentiment_tensor)

    ############################# END OF YOUR CODE #############################
    if not isinstance(auxiliary_task_data, tuple):
      auxiliary_task_data = (auxiliary_task_data,)

    return primary_task_data + auxiliary_task_data


## **Task 4.2: Implement auxiliary task heads [5 points]**

**Implementation**

**→ Fill out the `Classifier` classes.** You will use these in `LightningCTCMTL`.

Each classifier should be a simple one-layer model.

Use a linear layer to map input features to the number of output classes.

Choose the activation function carefully depending on the task:

- The `TaskTypeClassifier` involves selecting one class from N classes (e.g., intent prediction).

- The `DialogActsClassifier` involves making independent binary predictions for multiple labels.

- The `SentimentClassifier` requires predicting a probability distribution over sentiments.
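
The difference between the two kinds of output can be seen on plain numbers. The following sketch, written with only the standard library, contrasts a softmax (one distribution over mutually exclusive classes) with element-wise sigmoids (independent yes/no probabilities per label). The helper names are illustrative.

```python
import math

def log_softmax(logits):
  """Log-probabilities over mutually exclusive classes."""
  peak = max(logits)  # subtract the max for numerical stability
  log_norm = peak + math.log(sum(math.exp(x - peak) for x in logits))
  return [x - log_norm for x in logits]

def sigmoid(logits):
  """Independent per-label probabilities; they need not sum to 1."""
  return [1.0 / (1.0 + math.exp(-x)) for x in logits]

logits = [2.0, 0.5, -1.0]
softmax_total = sum(math.exp(lp) for lp in log_softmax(logits))
sigmoid_total = sum(sigmoid(logits))
print(softmax_total)  # approximately 1: exactly one class is chosen
print(sigmoid_total)  # greater than 1 here: several labels can be "on"
```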

In [None]:
class TaskTypeClassifier(nn.Module):
  def __init__(self, input_dim, n_classes):
    super().__init__()
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)

    self.linear = nn.Linear(input_dim, n_classes)


    ############################# END OF YOUR CODE #############################

  def forward(self, inputs):
    log_probs = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    # Hint: This is an N-way classification problem.
    logits = self.linear(inputs)
    log_probs = F.log_softmax(logits, dim=-1)

    ############################# END OF YOUR CODE #############################
    return log_probs

  def get_loss(self, probs, targets):
    loss = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    loss = F.nll_loss(probs, targets)

    ############################# END OF YOUR CODE #############################
    return loss


class DialogActsClassifier(nn.Module):
  def __init__(self, input_dim, n_classes):
    super().__init__()
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    self.linear = nn.Linear(input_dim, n_classes)

    ############################# END OF YOUR CODE #############################

  def forward(self, inputs):
    probs = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    # Hint: One person can have multiple dialog actions.
    logits = self.linear(inputs)
    probs = torch.sigmoid(logits)  # Multi-label classification

    ############################# END OF YOUR CODE #############################
    return probs

  def get_loss(self, probs, targets):
    loss = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    # Hint:
    # - probs shape: (batch_size, num_dialog_acts)
    # - targets shape: (batch_size, num_dialog_acts)
    loss = F.binary_cross_entropy(probs, targets)

    ############################# END OF YOUR CODE #############################
    return loss


class SentimentClassifier(nn.Module):
  def __init__(self, input_dim, n_classes):
    super().__init__()
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    self.linear = nn.Linear(input_dim, n_classes)
    ############################# END OF YOUR CODE #############################

  def forward(self, inputs):
    probs = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    # Hint:
    # - Sentiment is measured as a log probability distribution among multiple
    #   possible sentiments.
    logits = self.linear(inputs)
    probs = F.log_softmax(logits, dim=-1)
    ############################# END OF YOUR CODE #############################
    return probs

  def get_loss(self, pred_probs, target_probs):
    loss = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.2)
    # Hint:
    # - As usual, the predictions are probabilities. But the labels for
    #   sentiment are themselves probabilities. Since the targets are not
    #   single numbers, we cannot just use `F.cross_entropy`.
    # - Therefore, you will need to implement cross entropy manually.
    #     Refer to wikipedia: https://en.wikipedia.org/wiki/Cross_entropy
    # - pred_probs shape (log probabilities): (batch_size, num_sentiment_class)
    # - target_probs shape: (batch_size, num_sentiment_class)

    loss = -torch.sum(target_probs * pred_probs, dim=-1).mean()

    ############################# END OF YOUR CODE #############################
    return loss
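

The manual cross entropy used for sentiment can be checked by hand on small numbers. The sketch below computes the batch mean of $-\sum_k t_k \log p_k$ in plain Python; `soft_cross_entropy` is a hypothetical stand-in for the tensor expression, and the inputs are made up.

```python
import math

def soft_cross_entropy(pred_log_probs, target_probs):
  """Mean over the batch of -sum_k target[k] * log_pred[k]."""
  per_sample = [-sum(t * lp for t, lp in zip(targets, log_preds))
                for log_preds, targets in zip(pred_log_probs, target_probs)]
  return sum(per_sample) / len(per_sample)

# A uniform prediction costs log(3) whatever the target distribution is.
uniform = [math.log(1 / 3)] * 3
uniform_loss = soft_cross_entropy([uniform], [[0.2, 0.5, 0.3]])
print(uniform_loss)

# With a one-hot target this reduces to the usual negative log-likelihood.
peaked = [math.log(0.5), math.log(0.25), math.log(0.25)]
one_hot_loss = soft_cross_entropy([peaked], [[1.0, 0.0, 0.0]])
print(one_hot_loss)
```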


## **Task 4.3: Implement multi-task learning loss [5 points]**

**Implementation**

**Metrics.** We provide code for computing the metrics for each possible auxiliary task.
- For Task Type and Sentiment classification, use accuracy.
- For Dialog Acts classification, use F1 score (since the labels are unbalanced).

→ Instantiate all three auxiliary classifiers: `TaskTypeClassifier`, `DialogActsClassifier`, and `SentimentClassifier` in `__init__` for `LightningCTCMTL`

Example:

```python
self.task_type_model = TaskTypeClassifier(...)
self.dialogue_type_model = DialogActsClassifier(...)
self.sentiment_type_model = SentimentClassifier(...)
```

**→ Implement/modify `get_multi_task_loss`.**

In `LightningCTCMTL`, the available weights are:

- `asr_weight`
- `task_type_weight`
- `dialog_acts_weight`
- `sentiment_weight`

You must:
- Combine the primary ASR loss and the auxiliary task losses.
- Use weighting parameters to balance the importance of different tasks during training.
- Your weights should sum to 1 across the tasks you are training.

Reminder:
Every classifier you implemented above already includes a `get_loss` method.
Use the `get_loss` method from each classifier to compute the corresponding auxiliary task losses in `get_multi_task_loss` below.

**Note:**

- Choosing your loss weights is an important design decision.
- You can adjust weights to make some tasks more important than others.
- In some cases, you might even use negative weights to turn an auxiliary task into an adversarial task — meaning the model is encouraged not to perform well on that task.
(This can sometimes improve the primary task's robustness on certain edge cases)
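
One way to satisfy the sum-to-1 requirement is to normalize whatever weights are configured over the tasks actually in use. The sketch below shows this on plain floats. Normalizing is an assumption about how to meet the requirement, not the required design, and `combine_losses` is a hypothetical helper.

```python
def combine_losses(losses, weights):
  """Weighted sum of task losses after normalizing weights to sum to 1.

  Tasks whose loss is None are skipped, so unused heads do not contribute.
  """
  active = {name: loss for name, loss in losses.items() if loss is not None}
  total_weight = sum(weights[name] for name in active)
  if total_weight <= 0:
    raise ValueError('active weights must have a positive sum')
  return sum(weights[name] / total_weight * loss
             for name, loss in active.items())

losses = {'asr': 2.0, 'task_type': 1.0, 'dialog_acts': 0.5, 'sentiment': None}
weights = {'asr': 0.6, 'task_type': 0.2, 'dialog_acts': 0.2, 'sentiment': 0.0}
combined = combine_losses(losses, weights)
print(combined)  # 0.6 * 2.0 + 0.2 * 1.0 + 0.2 * 0.5, approximately 1.5
```

Because the function only uses addition, multiplication, and division, the same arithmetic applies when the losses are scalar tensors.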

In [None]:
class LightningCTCMTL(LightningCTC):
  """PyTorch Lightning class for training CTC with multi-task learning."""
  def __init__(self, n_mels=128, n_fft=256, win_length=256, hop_length=128,
               wav_max_length=200, transcript_max_length=200,
               learning_rate=1e-3, batch_size=256, weight_decay=1e-5,
               encoder_num_layers=2, encoder_hidden_dim=256,
               encoder_bidirectional=True, asr_weight=1.0, task_type_weight=1.0,
               dialog_acts_weight=1.0, sentiment_weight=1.0):
    super().__init__(
      n_mels=n_mels, n_fft=n_fft, win_length=win_length, hop_length=hop_length,
      wav_max_length=wav_max_length,
      transcript_max_length=transcript_max_length,
      learning_rate=learning_rate,
      batch_size=batch_size,
      weight_decay=weight_decay,
      encoder_num_layers=encoder_num_layers,
      encoder_hidden_dim=encoder_hidden_dim,
      encoder_bidirectional=encoder_bidirectional)
    self.save_hyperparameters()
    self.asr_weight = asr_weight
    self.task_type_weight = task_type_weight
    self.dialog_acts_weight = dialog_acts_weight
    self.sentiment_weight = sentiment_weight

    ############################ START OF YOUR CODE ############################
    # TODO(4.3)
    # Instantiate your auxiliary task models here.

    # Get the encoder output dimension
    encoder_output_dim = encoder_hidden_dim * (2 if encoder_bidirectional else 1)

    # The number of classes for each task comes from the MTL dataset, which
    # `super().__init__` has already constructed via `create_datasets`.
    self.task_type_model = TaskTypeClassifier(
        encoder_output_dim, self.train_dataset.task_type_num_class)
    self.dialog_acts_model = DialogActsClassifier(
        encoder_output_dim, self.train_dataset.dialog_acts_num_class)
    self.sentiment_model = SentimentClassifier(
        encoder_output_dim, self.train_dataset.sentiment_num_class)

    ############################# END OF YOUR CODE #############################

  def create_datasets(self):
    root = os.path.join(DATA_PATH, 'harper_valley_bank_minified')
    train_dataset = HarperValleyBankMTL(
      root, split='train', n_mels=self.n_mels, n_fft=self.n_fft,
      win_length=self.win_length, hop_length=self.hop_length,
      wav_max_length=self.wav_max_length,
      transcript_max_length=self.transcript_max_length,
      append_eos_token=False)
    val_dataset = HarperValleyBankMTL(
      root, split='val', n_mels=self.n_mels, n_fft=self.n_fft,
      win_length=self.win_length, hop_length=self.hop_length,
      wav_max_length=self.wav_max_length,
      transcript_max_length=self.transcript_max_length,
      append_eos_token=False)
    test_dataset = HarperValleyBankMTL(
      root, split='test', n_mels=self.n_mels, n_fft=self.n_fft,
      win_length=self.win_length, hop_length=self.hop_length,
      wav_max_length=self.wav_max_length,
      transcript_max_length=self.transcript_max_length,
      append_eos_token=False)
    return train_dataset, val_dataset, test_dataset

  def get_multi_task_loss(self, batch, split='train'):
    """Gets losses and metrics for all task heads."""
    # Compute loss on the primary ASR task.
    asr_loss, asr_metrics, embedding = self.get_primary_task_loss(batch, split)

    # Note: Not all of these have to be used (it is up to your design)
    task_type_labels = None
    dialog_acts_labels = None
    sentiment_labels = None
    task_type_log_probs = None
    dialog_acts_probs = None
    sentiment_log_probs = None
    task_type_loss = None
    dialog_acts_loss = None
    sentiment_loss = None
    combined_loss = None
    ############################ START OF YOUR CODE ############################
    # TODO(4.3)
    # Implement multi-task learning by combining multiple objectives.
    # Define `combined_loss` here.

    # Extract auxiliary labels from batch
    # Batch format: (input_features, input_lengths, labels, label_lengths,
    #                task_type_labels, dialog_acts_labels, sentiment_labels)
    task_type_labels = batch[4]
    dialog_acts_labels = batch[5]
    sentiment_labels = batch[6]

    # Get predictions from auxiliary models
    # Use mean pooling over sequence dimension for embedding
    pooled_embedding = torch.mean(embedding, dim=1)  # (batch_size, hidden_dim)

    task_type_log_probs = self.task_type_model(pooled_embedding)
    dialog_acts_probs = self.dialog_acts_model(pooled_embedding)
    sentiment_log_probs = self.sentiment_model(pooled_embedding)

    # Compute auxiliary losses
    task_type_loss = self.task_type_model.get_loss(task_type_log_probs, task_type_labels)
    dialog_acts_loss = self.dialog_acts_model.get_loss(dialog_acts_probs, dialog_acts_labels)
    sentiment_loss = self.sentiment_model.get_loss(sentiment_log_probs, sentiment_labels)

    # Combine all losses with weights
    combined_loss = (self.asr_weight * asr_loss +
                    self.task_type_weight * task_type_loss +
                    self.dialog_acts_weight * dialog_acts_loss +
                    self.sentiment_weight * sentiment_loss)

    ############################ END OF YOUR CODE ##############################

    with torch.no_grad():
      ############################ START OF YOUR CODE ##########################
      # TODO(4.3)
      # No additional code is required here. :)
      # We provide how to compute metrics for all possible auxiliary tasks and
      # store them in your metrics dictionary. Comment out the metrics for tasks
      # you do not plan to use.

      # TASK_TYPE: Compare predicted task type to true task type.
      task_type_preds = torch.argmax(task_type_log_probs, dim=1)
      task_type_acc = \
        (task_type_preds == task_type_labels).float().mean().item()

      # DIALOG_ACTS: Compare predicted dialog actions to true dialog actions.
      dialog_acts_preds = torch.round(dialog_acts_probs)
      dialog_acts_f1 = f1_score(dialog_acts_labels.cpu().numpy().reshape(-1),
                                dialog_acts_preds.cpu().numpy().reshape(-1))


      # SENTIMENT: Compare largest predicted sentiment to largest true sentiment.
      sentiment_preds = torch.argmax(sentiment_log_probs, dim=1)
      sentiment_labels = torch.argmax(sentiment_labels, dim=1)
      sentiment_acc = \
        (sentiment_preds == sentiment_labels).float().mean().item()

      metrics = {
        # Task losses.
        f'{split}_asr_loss': asr_metrics[f'{split}_loss'],
        f'{split}_task_type_loss': task_type_loss,
        f'{split}_dialog_acts_loss': dialog_acts_loss,
        f'{split}_sentiment_loss': sentiment_loss,
        # CER as ASR metric.
        f'{split}_asr_cer': asr_metrics[f'{split}_cer'],
        # Accuracy as task_type metric.
        f'{split}_task_type_acc': task_type_acc,
        # F1 score as dialog_acts metric.
        f'{split}_dialog_acts_f1': dialog_acts_f1,
        # Accuracy as sentiment metric.
        f'{split}_sentiment_acc': sentiment_acc
      }
      ############################ END OF YOUR CODE ############################
    return combined_loss, metrics

  def configure_optimizers(self):
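    # Optimize the encoder together with every auxiliary head whose loss is
    # part of `combined_loss`; a head left out here would never be updated.
    parameters = chain(self.model.parameters(),
                       self.task_type_model.parameters(),
                       self.dialog_acts_model.parameters(),
                       self.sentiment_model.parameters())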
    optim = torch.optim.AdamW(parameters, lr=self.lr,
                              weight_decay=self.weight_decay)
    return [optim], []

  def training_step(self, batch, batch_idx):
    loss, metrics = self.get_multi_task_loss(batch, split='train')
    # Log every train metric; Lightning averages across steps and epochs.
    for k, v in metrics.items():
      self.log(k, v, on_step=True, on_epoch=True,
               prog_bar=('asr' in k), sync_dist=True)
    return loss

  def validation_step(self, batch, batch_idx):
    loss, metrics = self.get_multi_task_loss(batch, split='val')
    # Log every val metric; Lightning averages across all val batches.
    for k, v in metrics.items():
      self.log(k, v, on_step=False, on_epoch=True,
               prog_bar=('cer' in k), sync_dist=True)
    return loss

  def test_step(self, batch, batch_idx):
    loss, metrics = self.get_multi_task_loss(batch, split='test')
    # Log every test metric; Lightning averages across all test batches.
    for k, v in metrics.items():
      self.log(k, v, on_step=False, on_epoch=True,
               prog_bar=('cer' in k), sync_dist=True)
    return loss

## **Task 4.4: Training CTC-MTL [20 points]**

**Training & Written Response**

**→ Train the CTC-MTL network with the default hyperparameters we provide.**

One epoch of training CTC-MTL takes about 3 minutes. We recommend training for at least 15 to 20 epochs, although we do not guarantee this is enough to converge. If your notebook resets, you can continue training from an old checkpoint.
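
To continue from an old checkpoint you first need its path. The sketch below picks the most recently modified `.ckpt` file in a directory; `latest_checkpoint` is a hypothetical helper, not part of the provided code.

```python
import glob
import os


def latest_checkpoint(ckpt_dir):
  """Returns the most recently modified `.ckpt` file in `ckpt_dir`, or None."""
  paths = glob.glob(os.path.join(ckpt_dir, '*.ckpt'))
  if not paths:
    return None
  return max(paths, key=os.path.getmtime)
```

With `MODEL_PATH` from the setup cell, `latest_checkpoint(os.path.join(MODEL_PATH, 'ctc_mtl'))` would return the newest file saved for this run. How to hand that path to `run` depends on its implementation, which is defined elsewhere in the notebook; in recent PyTorch Lightning versions, `Trainer.fit` accepts a `ckpt_path` argument for resuming.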

**CER target:
You should obtain a test CER of at most 0.30 for this model. You will obtain full points for demonstrating a model with test CER below this threshold.**
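
For reference, character error rate is conventionally the character-level edit distance between the predicted and reference transcripts, divided by the reference length. The sketch below is a plain-Python illustration; `edit_distance` and `char_error_rate` are hypothetical helpers, and the provided `utils.py` may compute CER differently (for example, by averaging per utterance).

```python
def edit_distance(ref, hyp):
  """Levenshtein distance between two sequences (insert/delete/substitute)."""
  prev = list(range(len(hyp) + 1))
  for i, r in enumerate(ref, start=1):
    curr = [i]
    for j, h in enumerate(hyp, start=1):
      cost = 0 if r == h else 1
      curr.append(min(prev[j] + 1,          # deletion
                      curr[j - 1] + 1,      # insertion
                      prev[j - 1] + cost))  # substitution or match
    prev = curr
  return prev[-1]


def char_error_rate(ref, hyp):
  """Edit distance normalized by the reference length."""
  if not ref:
    raise ValueError('Reference transcript must be non-empty.')
  return edit_distance(ref, hyp) / len(ref)


cer = char_error_rate('check my balance', 'chek my balanse')
```

Here two character edits against a 16-character reference give a CER of 0.125.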

**→ Paste screenshots from your Weights & Biases dashboard of your loss curves and CER curve in the cell marked "Plots". Remember to include learning curves for the auxiliary tasks!**

In [None]:
# Run CTC-MTL

config = {
  'n_mels': 128,
  'n_fft': 256,
  'win_length': 256,
  'hop_length': 128,
  'wav_max_length': 512,
  'transcript_max_length': 200,
  'learning_rate': 1e-3,
  'batch_size': 128,
  'weight_decay': 0,
  'encoder_num_layers': 2,
  'encoder_hidden_dim': 256,
  'encoder_bidirectional': True,
  # you may wish to play with these weights; try to keep the sum
  # of them equal to one.
  'asr_weight': 0.25,
  'task_type_weight': 0.25,
  'dialog_acts_weight': 0.25,
  'sentiment_weight': 0.25,
}

# `val_asr_loss` is one of the keys logged by `validation_step` above.
run(system="LightningCTCMTL", config=config, ckpt_dir='ctc_mtl', epochs=20,
    monitor_key='val_asr_loss', use_gpu=True)

In [None]:
# You can find the saved checkpoint here:
!ls /content/cs224s_spring2025/trained_models/ctc_mtl

**→ Using your plots as evidence in your description, answer the following questions:**

a) Report performance metrics on each of the auxiliary tasks and the CER of your jointly trained model.

b) Under the same configuration of hyperparameters, does CTC-MTL perform better than CTC? Why or why not? (Hint: Have the loss plots converged? How does multi-tasking affect the speed of learning the primary task?)

c) Which tasks seem to be more difficult than others? Why might that be?




---

**Plots:**


---

---

**Answer:**


---



# Part 5: One Model to Hear Them All
Congratulations! By this point you have trained multiple end-to-end deep neural networks for automatic speech recognition.


## **Task 5.1: Train and summarize your best model [30 Points]**

**Training & Written Response**

**Note:** You are welcome to conduct additional experiments in this part. Please copy cells from above into Part 7 of the notebook if you wish to utilize them.

**→ Alter any model of your choice or training procedure to improve performance! Describe what you tried, and report performance of your best model.** Include in your answer your design choices of:
- Type of loss functions (CTC or Joint CTC-MTL)
- Any auxiliary task(s) and weighting of tasks you may have used
- Training hyperparameters (e.g. learning rate)

**Hints for choosing auxiliary tasks:**
You must experiment with different combinations of auxiliary tasks to find the best-performing model.
Out of the three auxiliary tasks (`TaskTypeClassifier`, `DialogActsClassifier`, and `SentimentClassifier`), you are expected to explore and determine which combination improves your ASR model the most.

To experiment with different task combinations:
- Revisit `LightningCTCMTL`
  - In `__init__`, only instantiate the classifiers for the auxiliary tasks you want to use.
  - In `get_multi_task_loss`, always include ASR loss (since that's the primary task), and add only the auxiliary task losses you want to use.
- In Part 4.4: when you change the active tasks, you must update your weights so that the weights for all active tasks, including ASR, sum to 1.
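
One way to organize these experiments is to enumerate the auxiliary-task subsets and give every active task an equal share of the total weight. This is only a sketch of a starting point: `AUX_TASKS` and `uniform_weights` are hypothetical names, and an equal split is an assumption, not a recommended setting. The dictionary keys match the weight entries in the Part 4.4 `config`.

```python
from itertools import combinations

AUX_TASKS = ('task_type', 'dialog_acts', 'sentiment')


def uniform_weights(active_aux_tasks):
  """Gives ASR and each active auxiliary task an equal share summing to 1.

  Inactive auxiliary tasks get weight 0.
  """
  unknown = set(active_aux_tasks) - set(AUX_TASKS)
  if unknown:
    raise ValueError(f'Unknown auxiliary tasks: {sorted(unknown)}')
  share = 1.0 / (1 + len(active_aux_tasks))
  weights = {'asr_weight': share}
  for task in AUX_TASKS:
    weights[f'{task}_weight'] = share if task in active_aux_tasks else 0.0
  return weights


# Every subset of auxiliary tasks, from none (plain CTC) to all three.
task_subsets = [subset
                for k in range(len(AUX_TASKS) + 1)
                for subset in combinations(AUX_TASKS, k)]

for subset in task_subsets:
  print(subset, uniform_weights(subset))
```

With all three tasks active this reproduces the 0.25 weights used in Part 4.4. Note that a zero weight only removes a task from the combined loss; the classifier itself still has to be left out of `__init__` as described above.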

You should attempt to improve your model performance in a reasonable way given what you have observed so far. You do not need to exhaustively optimize performance, though; train at least 2 new models, each with improvements over the previous model.


**CER target:
You should obtain a test CER of at most 0.25 for this model. You will earn a majority of the points for demonstrating a model with test CER below this threshold. Teaching staff will vote on top-performing systems and give full credit to only the top ~10 systems in the course.**


In [None]:
#### TODO YOUR CODE HERE ####


# Submission

**Great work!** You have completed the third assignment of the course, and in doing so you have 1) trained deep acoustic models from scratch on a speech dataset, 2) gained a practical intuition for different architectures and design choices for ASR, and 3) assessed a speech recognition system on your own voice!

**Gradescope Submission**
- Download your Colab notebook **with all cells fully executed** as a `.ipynb` file. Zip together your `.ipynb` with any supporting files. Submit this zipped file under `Assignment 3: Code Submission`.
- Open your `.ipynb` file locally and save it as a PDF. Submit this PDF under `Assignment 3: PDF Submission` and **tag all pages corresponding to each task**.

# Optional: Testing Your System!

**You can test your system with your own voice samples! Make 2 short recordings of yourself: 1) one on any topic and 2) another on a topic that more closely matches utterances in the HarperValleyBank dataset.** Save/convert them as `.WAV` files and upload them to your `DATA_PATH` directory. Then use `run_inference` to get your best model's transcriptions on them.

- How does it do? Are the transcripts accurate?
- Does the model generalize? If not, why do you think that is? What changes could be made to help the model generalize better?
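
Before running inference, it can help to confirm that a recording really is a WAV file and to check its sample rate and duration. The sketch below uses only Python's standard `wave` module, which reads uncompressed PCM WAV files; `describe_wav` is a hypothetical helper.

```python
import wave


def describe_wav(path):
  """Returns (sample_rate_hz, num_channels, duration_seconds) for a PCM WAV."""
  with wave.open(path, 'rb') as f:
    sample_rate = f.getframerate()
    num_channels = f.getnchannels()
    duration = f.getnframes() / sample_rate
  return sample_rate, num_channels, duration
```

A recording at a different sample rate does not need to be converted by hand, since `librosa.load(path, sr=8000)` resamples on load.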

---

**Answer:**


---

In [None]:
# Load your saved model weights.

ctc_checkpoint_file = 'epoch=19-step=1620-v1.ckpt'  # Fill in your checkpoint file
ctc_checkpoint_path = os.path.join(MODEL_PATH, 'ctc', ctc_checkpoint_file)

system = None
wav = None
device = torch.device('cuda')
sr = 8000

# Use system.eval() after you load your PyTorch Lightning system weights.
# Use `librosa.load` (refer to https://librosa.org/doc/latest/index.html).
# Use the default target sample rate of 8000.
# Use `librosa.effects.trim` to remove leading and trailing silences.

system = LightningCTC.load_from_checkpoint(ctc_checkpoint_path)
system = system.to(device)
system.eval()
wav_bank, sr = librosa.load(f'{DATA_PATH}/wav_bank.wav', sr=8000)
wav_normal, sr = librosa.load(f'{DATA_PATH}/wav_normal.wav', sr=8000)

wav_bank, _   = librosa.effects.trim(wav_bank)
wav_normal, _ = librosa.effects.trim(wav_normal)

predicted_bank = run_inference(system, wav_bank, device=device, sr=sr)
predicted_normal = run_inference(system, wav_normal, device=device, sr=sr)

print("Bank transcript:  ", predicted_bank)
print("Normal transcript:", predicted_normal)
