#  Transcript Generation


# Section 0: Setup/Importing

In [None]:
# TODO: Run this cell and follow instructions to connect this notebook to Google Drive
try:
    from google.colab import drive
    drive.mount('/content/drive')
except ImportError:
    print("Not on google drive")

In [None]:
# TODO: Replace path below to the folder containing your notebook and data folder
%cd path/to/folder/in/google/drive

In [None]:
# TODO Run this cell to download the data from Amazon AWS
# TODO If needed, replace your the local Google Drive path (/content/drive/MyDrive/pa1b/) with a path that works for you 

!wget -P /content/drive/MyDrive/pa3b/ https://cmu-dele-leaderboard-us-east-2-003014019879.s3.us-east-2.amazonaws.com/colab/pa3b/data3pb.zip

In [None]:
# TODO Run this cell to unzip the data from Amazon AWS to your local Drive
# TODO If needed, replace your the local Google Drive path (/content/drive/MyDrive/pa1b/data1pb.zip) with a path that works for you 


!unzip /content/drive/MyDrive/pa3b/data3pb.zip

In [None]:
# Install the levenshtein distance package
!pip install python-Levenshtein

In [1]:
# TODO: Run this cell to import packages
import os
import random
import numpy as np
from tqdm import tqdm
from Levenshtein import distance
import sys

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_packed_sequence, pack_padded_sequence, pack_sequence
print(torch.__version__, sys.version)

%load_ext autoreload
%autoreload 2

1.9.1 3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)]


In [2]:
# TODO: Run this cell to automatically detect if GPU is available.
DEVICE = torch.device('cuda' if not torch.cuda.is_available() else 'cpu')

print(DEVICE)

cpu


# Section 1: Dataset and DataLoaders

Let's first load in our data and preview it.

You can see the `TOKEN_LIST` in the `utils.py` file.

In [3]:
# Let's first load in the data and label files.
from utils import load_data, convert_str_to_idxs

# TODO: If necessary, change the strings below to be the paths of your data files. 
train_data_path = "data/train_data.npy"
train_labels_path = "data/train_labels.npy"
val_data_path = "data/val_data.npy"
val_labels_path = "data/val_labels.npy"
test_data_path = "data/test_data.npy"

In [4]:
# TODO: Run this cell to preview what your data and labels will look like. 
val_data, val_labels = load_data(val_data_path, val_labels_path)

print(f"Example data:\n{val_data[0]}")
print(f"Shape of this data (num_frames, num_channels):\n{val_data[0].shape}")
print(f"Label:\n{val_labels[0]}")
print(f"Label converted to list(int) with <SOS> and <EOS> indices added:\n{convert_str_to_idxs(val_labels[0])}")
print(f"Number of tokens in label:\n{len(convert_str_to_idxs(val_labels[0]))}")

Example data:
[[2.3062422e-03 2.0986800e-03 3.0386688e-03 ... 4.5379176e-05
  1.3997178e-05 1.1967877e-05]
 [3.1049701e-03 7.7780215e-03 2.2952498e-03 ... 9.4744173e-06
  4.0979262e-06 6.3597912e-07]
 [2.9091453e-03 8.3023859e-03 2.6321730e-03 ... 1.2208303e-05
  4.4432700e-06 5.5138651e-07]
 ...
 [2.5126212e-03 8.6815646e-03 3.5488086e-03 ... 1.7203306e-06
  3.8962827e-07 3.2171218e-07]
 [2.3398087e-03 7.0686312e-03 2.7115962e-03 ... 1.7138029e-06
  1.0856153e-06 6.4771206e-07]
 [2.9576111e-03 4.1935476e-03 7.5066503e-04 ... 2.4030285e-06
  1.0650157e-06 4.4126014e-07]]
Shape of this data (num_frames, num_channels):
(307, 40)
Label:
THIS IS THE AMUSING ADVENTURE WHICH CLOSED OUR EXPLOITS
Label converted to list(int) with <SOS> and <EOS> indices added:
[37, 20, 8, 9, 19, 36, 9, 19, 36, 20, 8, 5, 36, 1, 13, 21, 19, 9, 14, 7, 36, 1, 4, 22, 5, 14, 20, 21, 18, 5, 36, 23, 8, 9, 3, 8, 36, 3, 12, 15, 19, 5, 4, 36, 15, 21, 18, 36, 5, 24, 16, 12, 15, 9, 20, 19, 38]
Number of tokens in label:
57

## Dataset

Let's begin by writing our own custom `Dataset` object.

In [5]:
from utils import load_data

class Speech2TextDataset(torch.utils.data.Dataset):
    """Dataset for training a speech-to-text model."""
    def __init__(self, data_path, labels_path=None):
        """[Given] All data preprocessing (including converting to Tensors) should happen here.
        This method runs only once, when the object is instantialized.
        
        You technically could do more processing/conversion in the __getitem__() method,
        but it'd drastically slow down querying data.  

        Args:
            data_path (str): Path to *_data.npy file
            labels_path (str, optional): Path to *_labels.npy file. Defaults to None.
        """
        # Load in data (and labels, if given)
        if labels_path is not None:
            data, labels = load_data(data_path, labels_path)
        else:
            data = load_data(data_path)
            labels = None
        
        # Convert the data to FloatTensors
        self.data = [torch.FloatTensor(d) for d in data]
        
        # Convert the labels to index tensors
        if labels is not None:
            self.labels = [torch.LongTensor(convert_str_to_idxs(l)) for l in labels]
        else:
            self.labels = None

    def __len__(self):
        """ TODO: This method defines what happens when someone runs len() on this object.

        Returns:
            int: The number of observations in the dataset.
        """
        # TODO: Complete this method based on the docstring above (1-liner, don't overthink)
        ### BEGIN SOLUTION
        return len(self.data)
        ### END SOLUTION
        raise NotImplementedError


    def __getitem__(self, idx):
        """ TODO: This method defines what happens when someone tries to index this object, e.g. `train_dataset[3]`

        Args:
            idx (int): The idx of the desired observation from self.data and self.labels (if exists). Will be in [0, len(self))
                       After defining this method, multi-index querying such as `train_dataset[3:5]` will work too.

        Returns (depends on if labels are given):
            torch.FloatTensor, torch.LongTensor: If labels given, return data and labels
            or 
            torch.FloatTensor: If no labels given, return only data
        """
        # TODO: Complete this method based on the docstring above
        # Hint: check if `self.labels` exists
        ### BEGIN SOLUTION
        if self.labels is not None:
            return self.data[idx], self.labels[idx]
        else:
            return self.data[idx]
        ### END SOLUTION
        raise NotImplementedError

Now let's test out your implementation with the val and test datasets to make sure everything works.

In [6]:
# TODO: Run to test the __init__ method.
val_dataset = Speech2TextDataset(val_data_path, val_labels_path)
test_dataset = Speech2TextDataset(test_data_path)

In [7]:
# TODO: Run to test the __len__ method
assert len(val_dataset) == 2703, "__len__ method defined incorrectly, or paths to val files are incorrect"
assert len(test_dataset) == 2620, "__len__ method defined incorrectly, or paths to test file is incorrect"
print(f"len(val_dataset): {len(val_dataset)}")
print(f"len(test_dataset): {len(test_dataset)}")

len(val_dataset): 2703
len(test_dataset): 2620


In [8]:
# TODO: Run to test the __getitem__ method

# Test that querying works on the val dataset
data, label = val_dataset[0]
assert data is not None and label is not None, "__getitem__ defined incorrectly, val dataset shouldn't return None for labels"
assert isinstance(data, torch.Tensor) and isinstance(label, torch.Tensor), "Objects returned are not tensors."
assert data.shape == (307, 40), "Shape of queried data is incorrect, possibly queried wrong data"
assert label.shape == (57,), "Shape of queried label is incorrect, possibly queried wrong label"

# Test that querying works on the test dataset
data = test_dataset[0]
assert isinstance(data, torch.Tensor), f"Test dataset should return only a single data tensor"

print("Everything works correctly!")

Everything works correctly!


## `collate_and_pad`
Below, we give you the implementation of the collate function we described in the writeup.

In [5]:
def collate_and_pad(batch):
    """Instructions for the dataloader on how to form a batch given multiple observations
    
    Args:
        batch (list): list of observations. If labels are present, it will be a list of tuples of two tensors,
                      else it'll be a list of tensors
                      
    Returns (depends on if labels are present):
        torch.FloatTensor, torch.LongTensor, torch.FloatTensor: data, data_lens, labels
        or 
        torch.FloatTensor, torch.LongTensor: data, data_lens
    """
    # If each item in batch is a tuple, that means labels are present
    if isinstance(batch[0], tuple):
        # Convert the list of (data, label) into two separate lists
        data, labels = zip(*batch)
        
        # Pad the labels and make into a single tensor
        labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value=0)
    else:
        data, labels = batch, None
        
    # Create tensors for lengths and padded inputs, similar to above
    data_lens = torch.LongTensor([len(d) for d in data])
    data = torch.nn.utils.rnn.pad_sequence(data, batch_first=True, padding_value=0)
    
    if labels is not None:
        return data, data_lens, labels
    else:
        return data, data_lens

## Initialize `Dataset`s and `DataLoader`s 
Next we'll initialize the custom `Dataset`s for train/val/test and the default `DataLoader`s.

In [6]:
# TODO: Initialize Speech2TextDataset objects here
train_dataset = None
val_dataset = None
test_dataset = None

### BEGIN SOLUTION
train_dataset = Speech2TextDataset(train_data_path, train_labels_path)
val_dataset = Speech2TextDataset(val_data_path, val_labels_path)
test_dataset = Speech2TextDataset(test_data_path)
### END SOLUTION

NameError: name 'Speech2TextDataset' is not defined

In [7]:
# Feel free to adjust based on guidelines we provided in homework 1B.
batch_size = 64

# TODO: Initialize dataloaders
num_workers = 0
# num_workers = os.cpu_count() # this will speed things up

train_dataloader = None
val_dataloader = None
test_dataloader = None

### BEGIN SOLUTION
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, collate_fn=collate_and_pad)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=collate_and_pad)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=collate_and_pad)
### END SOLUTION

TypeError: object of type 'NoneType' has no len()

# Section 2: Encoder

## `downsample()`
In preparation for implementing the `pBLSTM`, let's first implement the downsampling operation that each `pBLSTM` performs.


In [8]:
def downsample(x, lens):
    """Downsamples given input for pBLSTM.

    Args:
        x (torch.FloatTensor): (batch_size, seq_len, hidden_size) Data to downsample 
        lens (torch.LongTensor): (batch_size,) Length of each batch before padding

    Returns:
        torch.FloatTensor, torch.LongTensor: (batch_size, seq_len//2, hidden_size*2), (batch_size,)
                                             x and lens after downsampling
    """
    batch_size, seq_len, hidden_size = x.shape

    # TODO: Implement based on description in writeup
    ### BEGIN SOLUTION
    x = x[:, :seq_len//2*2, :]
    x = x.reshape(batch_size, seq_len//2, hidden_size*2)
    lens //= 2
    return x, lens
    ### END SOLUTION
    raise NotImplementedError

Let's test the implementation.

In [9]:
# TODO: Run this test

# Example input shaped (batch_size=2, max_len=5, hidden_size=4)
x = torch.FloatTensor([[[ 4.,  2.,  2.,  1.],  # First seq in the batch, a sequence of 5 frames, 
                        [ 2.,  2.,  1.,  -2.], # each with 4 frequency bands
                        [ 1.,  3.,  3.,  2.],
                        [ 3.,  2.,  2.,  4.],
                        [ -2.,  1.,  1.,  1.]],

                       [[ 2.,  1.,  -3., -1.], # Second seq in the batch, originally shaped (3, 4)
                        [-2.,  1.,  3.,  2.],  # but padded with 0's to shape (5, 4) 
                        [ -2., -1.,  -1.,  3.],
                        [ 0.,  0.,  0.,  0.],
                        [ 0.,  0.,  0.,  0.]]])

# Corresponding lengths tensor shaped (batch_size=2,)
lens = torch.LongTensor([5, 3])

# Run your downsampling method
downsampled_x, downsampled_lens = downsample(x, lens)

# Make sure input is correctly downsampled
assert torch.equal(downsampled_x, torch.FloatTensor([[[ 4.,  2.,  2.,  1.,  2.,  2.,  1., -2.],
                                                      [ 1.,  3.,  3.,  2.,  3.,  2.,  2.,  4.]],
                                                    
                                                     [[ 2.,  1., -3., -1., -2.,  1.,  3.,  2.],
                                                      [-2., -1., -1.,  3.,  0.,  0.,  0.,  0.]]]))
# Make sure lengths are correctly downsampled
assert torch.equal(downsampled_lens, torch.LongTensor([2, 1]))

print("Before downsampling:", x.shape)
print("After downsampling:", downsampled_x.shape)
print("Correct!")

Before downsampling: torch.Size([2, 5, 4])
After downsampling: torch.Size([2, 2, 8])
Correct!


To keep the current behavior, use torch.div(a, b, rounding_mode='trunc'), or for actual floor division, use torch.div(a, b, rounding_mode='floor'). (Triggered internally at  ..\aten\src\ATen\native\BinaryOps.cpp:450.)
  lens //= 2


## `pBLSTM`
Now let's implement the custom object itself.

Finish the `__init__` and `forward` methods.

See [this link](https://stackoverflow.com/questions/51030782/why-do-we-pack-the-sequences-in-pytorch) for an explanation of why we convert input tensors to `PackedSequence`s.

In [10]:
class pBLSTM(nn.Module):
    """The Pyramidal Bi-LSTM layer, as per LAS"""
    def __init__(self, hidden_size):
        super().__init__()
        # TODO: Initialize LSTM with appropriate parameters (see encoder diagram in writeup)
        self.lstm = None 
        ### BEGIN SOLUTION
        self.lstm = nn.LSTM(hidden_size*4, hidden_size, bidirectional=True, batch_first=True)
        ### END SOLUTION

    def forward(self, x):
        """Forward pass of the pBLSTM.

        Args:
            x (torch.nn.utils.rnn.PackedSequence): Packed input data.

        Returns:
            torch.nn.utils.rnn.PackedSequence: Packed output data.
        """
        # [Given] Unpack the input
        x, lens = pad_packed_sequence(x, batch_first=True)

        # TODO: Run downsampling
        ### BEGIN SOLUTION
        x, lens = downsample(x, lens)
        ### END SOLUTION
        
        # [Given] Pack the downsampled input
        x = pack_padded_sequence(x, lens, enforce_sorted=False, batch_first=True)

        # TODO: Run through the LSTM and return
        ### BEGIN SOLUTION
        x, _ = self.lstm(x)
        return x
        ### END SOLUTION
        raise NotImplementedError


Let's run a basic test for your implementation.

In [11]:
# TODO: Run this cell to test pBLSTM implementation
from utils import init_pblstm_for_testing

# Create layer
pblstm = pBLSTM(hidden_size=2) # Note the hidden_size
init_pblstm_for_testing(pblstm)

# Create input shaped (batch_size=2, max_len=5, hidden_size=4)
x = torch.FloatTensor([[[ 4.,  2.,  2.,  1.],
                        [ 2.,  2.,  1.,  -2.],
                        [ 1.,  3.,  3.,  2.],
                        [ 3.,  2.,  2.,  4.],
                        [ -2.,  1.,  1.,  1.]],

                       [[ 2.,  1.,  -3., -1.],
                        [-2.,  1.,  3.,  2.],
                        [ -2., -1.,  -1.,  3.],
                        [ 0.,  0.,  0.,  0.],
                        [ 0.,  0.,  0.,  0.]]])

# Create lengths tensor shaped (batch_size=2,)
lens = torch.LongTensor([5, 3])

# We need to pack this tensor before giving it to the layer
print("Shape before pBLSTM:", x.shape)
x = pack_padded_sequence(x, lens, batch_first=True, enforce_sorted=False)

# Run through layer, unpack
out = pblstm(x)
out, lens = pad_packed_sequence(out, batch_first=True)

print("Shape after pBLSTM:", out.shape)

out_expected = torch.tensor([
    [[7.6159e-01, 7.6159e-01, 9.6403e-01, 9.6403e-01],
     [9.6403e-01, 9.6403e-01, 7.6159e-01, 7.6159e-01]],
    [[1.7026e-02, 9.1105e-04, 7.5950e-01, 1.0450e-01],
     [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00]]])

assert out.shape == (2, 2, 4), "Shape of the output is incorrect; did you return the correct one?"
assert torch.equal(lens, torch.tensor([2, 1])), "Lens tensor is incorrect; did you return the correct downsampled one?"
assert torch.allclose(out, out_expected, atol=1e-4), "Output is incorrect; did you correctly instantiate your LSTM?"

print("Tests passed!")


Shape before pBLSTM: torch.Size([2, 5, 4])
Shape after pBLSTM: torch.Size([2, 2, 4])
Tests passed!


## `Encoder`

Now to implement the encoder

In [12]:
class Encoder(nn.Module):
    """The Encoder embeds input speech data by projecting them into a 'key' tensor and 'value' tensor."""
    def __init__(self, num_channels, hidden_size, attn_size):
        super(Encoder, self).__init__()
        
        # TODO: Initialize layers appropriately using the args given to __init__
        self.lstm = None
        self.pblstm1 = None
        self.pblstm2 = None
        self.pblstm3 = None
        self.key_network = None
        self.value_network = None
        ### BEGIN SOLUTION
        self.lstm = nn.LSTM(num_channels, hidden_size, bidirectional=True, batch_first=True)
        self.pblstm1 = pBLSTM(hidden_size)
        self.pblstm2 = pBLSTM(hidden_size)
        self.pblstm3 = pBLSTM(hidden_size)
        self.key_network = nn.Linear(hidden_size*2, attn_size)
        self.value_network = nn.Linear(hidden_size*2, attn_size)
        ### END SOLUTION

    def forward(self, x, lens):
        """Forward pass of the LAS encoder

        Args:
            x (torch.FloatTensor): Padded input tensor, before packing. Shaped (batch_size, num_frames, num_channels)
            lens (torch.LongTensor): Lengths of each seq before padding. Shaped (batch_size,)

        Returns:
            torch.FloatTensor, torch.FloatTensor, torch.LongTensor : keys, values, lens
        """
        # [Given] Pack sequence
        x = pack_padded_sequence(x, lengths=lens.cpu(), enforce_sorted=False, batch_first=True)
        
        # TODO: Pass through LSTM and pBLSTMs
        ### BEGIN SOLUTION
        x, _ = self.lstm(x)
        x = self.pblstm1(x)
        x = self.pblstm2(x)
        x = self.pblstm3(x)
        ### END SOLUTION
        
        # [Given] Unpack
        x, lens = pad_packed_sequence(x, batch_first=True)
        
        # TODO: Pass through final linear layers, return
        ### BEGIN SOLUTION
        keys = self.key_network(x)
        values = self.value_network(x)
        return keys, values, lens
        ### END SOLUTION
        raise NotImplementedError

Let's run a simple test to see if your encoder will initialize and pass an input through successfully.

In [13]:
from utils import init_encoder_for_testing

# Initialize (for the actual encoder, use input_size 40, hidden_size 256, attn_size 128!)
encoder = Encoder(num_channels=5, hidden_size=4, attn_size=2)
init_encoder_for_testing(encoder)

# Create some random data
data = torch.randint(5, (2, 18, 5)).float()
data_lens = torch.LongTensor([18, 16])
print(f"data.shape: {data.shape}, data_lens.shape: {data_lens.shape}")

# Pass through encoder
keys, values, lens = encoder(data, data_lens)
print(f"keys.shape: {keys.shape}, values.shape: {values.shape}, lens.shape: {lens.shape}")

# Check that keys and values are correctly shaped
assert keys.shape[1] == data.shape[1] // 8 and values.shape[1] == data.shape[1] // 8, "seq_len dimension of keys and values not correctly shortened by // 8"
assert keys.shape[2] == 2 and values.shape[2] == 2, "Keys and values should have last dimension size 4 (the attn_size we set), but it does not."

# Check that the lengths are shortened too
assert torch.equal(data_lens//8, lens), "Values in the lens tensor are not correctly shortened by // 8"

# Check values of keys and values
keys_expected = torch.tensor([
    [[21.2562, 17.0434], [21.2562, 16.8409]],
    [[21.2562, 17.0434], [21.2562, 16.8409]]])

values_expected = torch.tensor([
    [[14.6025, 14.6025], [15.0074, 15.0074]],
    [[14.6025, 14.6025], [15.0074, 15.0074]]])

assert torch.allclose(keys, keys_expected, atol = 1e-4), "Keys are incorrect, 2x check your encoder!"
assert torch.allclose(values, values_expected, atol = 1e-4), "Values are incorrect, 2x check your encoder!"

print("Seems good!")

data.shape: torch.Size([2, 18, 5]), data_lens.shape: torch.Size([2])
keys.shape: torch.Size([2, 2, 2]), values.shape: torch.Size([2, 2, 2]), lens.shape: torch.Size([2])
Seems good!


To keep the current behavior, use torch.div(a, b, rounding_mode='trunc'), or for actual floor division, use torch.div(a, b, rounding_mode='floor'). (Triggered internally at  ..\aten\src\ATen\native\BinaryOps.cpp:467.)
  return torch.floor_divide(self, other)


# Section 3: `Decoder`

## `Attention`

Let's first implement the attention mechanism, as it'll be needed in the decoder.

In [14]:
class Attention(nn.Module):
    def __init__(self):
        super().__init__()
        # [Optional] If desired, you init your own layers here to actively learn attention.

    def forward(self, query, keys, values, lens):
        """Forward pass of attention.

        Args:
            query (torch.FloatTensor): (batch_size, attn_size)
            keys (torch.FloatTensor): (batch_size, seq_len, attn_size)
            values (torch.FloatTensor): (batch_size, seq_len, attn_size)
            lens (torch.LongTensor): (batch_size,)

        Returns:
            torch.FloatTensor, torch.FloatTensor: context (batch_size, attn_size) and attention (batch_size, seq_len)
        """
        # TODO: Implement steps 1-3 of attention diagram in writeup
        scores = None
        ### BEGIN SOLUTION
        scores = torch.bmm(keys, query.unsqueeze(2)).squeeze(2)
        ### END SOLUTION
        
        # [Given] Step 4
        mask = torch.arange(values.size(1), device=DEVICE).unsqueeze(0) >= lens.to(DEVICE).unsqueeze(1)
        scores = scores.masked_fill(mask, float('-inf'))
        attention = F.softmax(scores, dim=1)
        
        # TODO: Complete remaining steps 
        ### BEGIN SOLUTION
        context = torch.bmm(attention.unsqueeze(1), values).squeeze(1)
        return context, attention
        ### END SOLUTION
        raise NotImplementedError


Let's test your implementation of attention!

In [15]:
# Initialize inputs
query = torch.FloatTensor([[1, 2],
                           [3, 4]]).to(DEVICE)
key = torch.FloatTensor([[[3, -2],
                          [1, 2]],
                         [[4, 2],
                          [2, 4]]]).to(DEVICE)
value = torch.FloatTensor([[[1, 2],
                          [2, 1]],
                         [[-2, 2],
                          [3, -2]]]).to(DEVICE)
lens = torch.FloatTensor([1, 2]).to(DEVICE)

print(f"query.shape: {query.shape}, key.shape: {key.shape}, value.shape: {value.shape}, lens.shape: {lens.shape}")

# Initialize attention module, pass inputs through
attention = Attention()
context, attention_mask = attention(query, key, value, lens)

print(f"context.shape: {context.shape}, attention_mask.shape: {attention_mask.shape}")

expected_context = torch.FloatTensor([[ 1.0000,  2.0000], [ 2.4040, -1.5232]]).to(DEVICE)
expected_attention_mask = torch.FloatTensor([[1.0000, 0.0000], [0.1192, 0.8808]]).to(DEVICE)

# Check context vector values are close enough to reference (within floating point tolerance)
assert torch.allclose(context, expected_context, atol=1e-4), \
        "Values or shape of context is incorrect."

# Check attention mask values
assert torch.allclose(attention_mask, expected_attention_mask, atol=1e-4), \
        "Values or shape of attention_mask is incorrect"

print("Correct!")

query.shape: torch.Size([2, 2]), key.shape: torch.Size([2, 2, 2]), value.shape: torch.Size([2, 2, 2]), lens.shape: torch.Size([2])
context.shape: torch.Size([2, 2]), attention_mask.shape: torch.Size([2, 2])
Correct!


## `Decoder`

Implement `forward` and (optional but recommended) `prepare_input`


In [16]:
from utils import token_to_idx

class Decoder(nn.Module):
    def __init__(self, vocab_size, hidden_size, attn_size):
        super(Decoder, self).__init__()
        
        self.vocab_size = vocab_size
        
        # [Given] Initialize modules
        self.embedding_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size, padding_idx=0)
        self.lstm1 = nn.LSTMCell(input_size=hidden_size + attn_size, hidden_size=hidden_size)
        self.lstm2 = nn.LSTMCell(input_size=hidden_size, hidden_size=attn_size)
        self.attention_layer = Attention()
        self.character_prob = nn.Linear(attn_size*2, vocab_size)

    def forward(self, keys, values, lens, labels, tf_prob):
        """Forward pass of decoder
        
        Args:
            keys (torch.FloatTensor): (batch_size, seq_len, attn_size)
            value (torch.FloatTensor): (batch_size, seq_len, attn_size)
            lens (torch.LongTensor): (batch_size,)
            labels (torch.LongTensor): Labels as indices, shaped (batch_size, max_label_len)
                                       only needed during training. During eval, this should be None.
            tf_prob (float): Teacher forcing probability, where 0 means we never give correct labels
                                       and 1 is we always give correct labels. During eval, this should be 0.
        Returns:
            torch.FloatTensor, torch.FloatTensor: Concatenated predictions (batch_size, vocab_size, max_len)
                                                  and stacked attentions (max_len, seq_len)
        """
        # [Given] Depending on if we're in train or eval, set max_len and pre-generate label embeddings 
        if labels is not None: # Train
            max_len = labels.shape[1] - 1
            label_embeddings = self.embedding_layer(labels)
        else:
            max_len = 600 # Eval
            label_embeddings = None
        
        # [Given] Initialize first prediction logit as having 100% probability of predicting <sos>
        prediction = torch.zeros((keys.shape[0], self.vocab_size), dtype=torch.float, device=DEVICE)
        prediction[:, token_to_idx["<sos>"]] = 1.0
        
        # [Given] Initialize context vector
        context = values[:, 0, :] # Normally this should store the attended values of attention,
                                  # but at t=0 we just use a slice of values shaped (batch_size, attn_size)
        
        # [Given] Other initializations
        predictions = [] # Append your predicted logit at each timestep here
                         # Note we don't store the above <sos> prediction, not needed for loss calculation
        hidden_states = [None, None] # Two sets of hidden states, one for each LSTMCell.
                                     # Each list will hold the h_0 and c_0 of that cell to pass between time steps  
        attentions = [] # To store the attention tensors produced at each time step

        # TODO: Follow for loop pseudocode in writeup
        ### BEGIN SOLUTION
        for t in range(max_len):
            x = self.prepare_input(prediction, label_embeddings, context, t, tf_prob)
            
            # Pass input through LSTMCells
            hidden_states[0] = self.lstm1(x, hidden_states[0])
            x = hidden_states[0][0]
            hidden_states[1] = self.lstm2(x, hidden_states[1])
            x = hidden_states[1][0] # (batch_size, hidden_size)
            
            # Attention
            context, attention = self.attention_layer(x, keys, values, lens)
            
            # Calculate prediction logit for next token
            prediction = self.character_prob(torch.cat([x, context], dim=1))
            
            # Append the attention mask and predictions
            attentions.append(attention[0, :])
            predictions.append(prediction)
        ### END SOLUTION

        # TODO: Return appropriate args
        ### BEGIN SOLUTION
        return torch.stack(predictions, dim=2), torch.stack(attentions)
        ### END SOLUTION
        raise NotImplementedError

    def prepare_input(self, prediction, label_embeddings, context, t, tf_prob):
        """[Optional] Method to prepare x at each timestep. Step 1 in for loop pseudocode. 
        
        We made a separate method for this to reduce clutter, but you can implement step 1 directly in the for loop.

        Args:
            prediction (torch.FloatTensor): (batch_size, vocab_size) Prediction logit of previous timestep  
            context (torch.FloatTensor): (batch_size, attn_size) Context from previous timestep
            label_embeddings (torch.FloatTensor): (batch_size, hidden_size) Pre-embedded labels.
                                                  During eval, this will be None.
            t (int): Index of current timestep, used to index label_embeddings if teacher forcing
            tf_prob (float): The probability of teacher forcing occurring

        Returns:
            torch.FloatTensor: x (batch_size, hidden_size+attn_size)
        """
        # TODO: Implement step 1 of the for loop pseudocode, with teacher forcing
        ### BEGIN SOLUTION
        # If we're teacher forcing this time:
        if random.random() < tf_prob:
            char_embed = label_embeddings[:, t, :]
        else:
            # Otherwise, use our previous prediction
            char_embed = self.embedding_layer(prediction.argmax(dim=-1))

        return torch.cat([char_embed, context], dim=1)
        ### END SOLUTION
        raise NotImplementedError

Below are some tests to validate.


In [17]:
from utils import init_decoder_for_testing, TOKEN_LIST

# Initialize weights of network with random seed
decoder = Decoder(vocab_size=len(TOKEN_LIST), hidden_size=256, attn_size=4).to(DEVICE)
init_decoder_for_testing(decoder)

# Create keys and values, both shaped (batch_size=2, max_len=5, attn_size=4)
keys = torch.FloatTensor([[[ 4.,  2.,  2.,  1.],
                        [ 2.,  2.,  1.,  -2.],
                        [ 1.,  3.,  3.,  2.],
                        [ 3.,  2.,  2.,  4.],
                        [ -2.,  1.,  1.,  1.]],

                       [[ 2.,  1.,  -3., -1.],
                        [-2.,  1.,  3.,  2.],
                        [ -2., -1.,  -1.,  3.],
                        [ 0.,  0.,  0.,  0.],
                        [ 0.,  0.,  0.,  0.]]]).to(DEVICE)

values = torch.FloatTensor([[[ 4.,  2.,  2.,  1.],
                        [ 2.,  2.,  1.,  -2.],
                        [ 1.,  3.,  3.,  2.],
                        [ 3.,  2.,  2.,  4.],
                        [ -2.,  1.,  1.,  1.]],

                       [[ 2.,  1.,  -3., -1.],
                        [-2.,  1.,  3.,  2.],
                        [ -2., -1.,  -1.,  3.],
                        [ 0.,  0.,  0.,  0.],
                        [ 0.,  0.,  0.,  0.]]]).to(DEVICE)

labels = torch.LongTensor([[10, 2, 5, 3, 5],
                           [10, 2, 5, 3, 5]]).to(DEVICE)

# Lengths tensor and tf probability of 0 (always use prev prediction)
lens = torch.LongTensor([2, 5])
tf_prob = 0.

# Run through decoder
predictions, attentions = decoder(keys, values, lens, labels, tf_prob)

# Compare a slice of your prediction tensor against a reference. We use a slice for visual clarity.
your_prediction_slice = predictions[-1, -1, : ]
answer_prediction_slice = torch.tensor([ 8.2803,  9.6490,  9.8506,  9.8782]).to(DEVICE)

# Reference attention matrix
answer_attentions = torch.tensor([[0.9897, 0.0103, 0.0000, 0.0000, 0.0000],
         [0.9969, 0.0031, 0.0000, 0.0000, 0.0000],
         [0.9975, 0.0025, 0.0000, 0.0000, 0.0000],
         [0.9975, 0.0025, 0.0000, 0.0000, 0.0000]]).to(DEVICE)


# Check that slice of prediction is correct
assert torch.allclose(your_prediction_slice, answer_prediction_slice, atol=1e-4), \
    "Slice of your predictions do not match our reference."

# Check that attention matrix is correct
assert torch.allclose(attentions, answer_attentions, atol=1e-4), \
    "Attention matrix does not match our reference."

print("All good!")

All good!


## Section 4: `LAS`


In [None]:
class LAS(nn.Module):
    """Listen, Attend, and Spell model (Chan, Jaitly, Le, Vinyals 2015)"""
    def __init__(self, num_channels, vocab_size, hidden_size, attn_size):
        """[Given]
        Args:
            num_channels (int): How many frequency bands each frame of each spectrogram has
            vocab_size (int): How many tokens are in your vocabulary
            hidden_size (int): Size of various components throughout network.
            attn_size (int): Number of dimensions your attention should work with.
        """
        super().__init__()
        self.encoder = Encoder(num_channels, hidden_size, attn_size)
        self.decoder = Decoder(vocab_size, hidden_size, attn_size)

    def forward(self, spectrograms, spectrogram_lens, labels=None, tf_prob=0.):
        """[Given]
        Args:
            spectrograms (torch.FloatTensor): (batch_size, num_frames, num_channels) Padded batch of spectrograms
            spectrogram_lens (torch.LongTensor): (batch_size,) Length of each spectrogram before padding
            labels (torch.LongTensor, optional): (batch_size, max_label_len) Padded batch of label indices. Defaults to None.
            tf_prob (float, optional): Teacher forcing probability. Defaults to 0. Must be 0 during eval

        Returns:
            torch.FloatTensor, torch.FloatTensor: Predictions (batch_size, vocab_size, max_len)
                                                  Attentions (max_len, seq_len)
        """
        key, value, lens = self.encoder(spectrograms, spectrogram_lens)
        predictions, attentions = self.decoder(key, value, lens, labels, tf_prob)
        return predictions, attentions

## Section 5: Train/Val/Test loops

### `train_epoch`

This will be a pretty typical training loop.

In [None]:
from utils import convert_idxs_to_str

def train_epoch(model, optimizer, dataloader, tf_prob=1.):
    """Runs a single training epoch.

    Args:
        model (nn.Module): Your initialized LAS model.
        optimizer (torch.optim.Optimizer): An initialized optimizer.
        dataloader (torch.utils.data.DataLoader): Your train dataloader
        tf_prob (float, optional): Teacher forcing rate. Defaults to 1 (100%).

    Returns:
        torch.FloatTensor: The final attention tensor of the epoch, shaped (max_len, seq_len)
    """
    ### BEGIN SOLUTION
    model.train()
    loss_per_batch = []
    loss_function = nn.CrossEntropyLoss(ignore_index=0)

    for i, (data, data_lens, labels) in tqdm(enumerate(dataloader), total=len(dataloader)):
        data, labels = data.to(DEVICE), labels.to(DEVICE)
        optimizer.zero_grad()
        predictions, attention = model(data, data_lens, labels, tf_prob)
        loss = loss_function(predictions, labels[:, 1:])
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 2)
        optimizer.step()
        loss_per_batch.append(loss.item())
    ### END SOLUTION
    return attention

Below, we provide validation code.

In [None]:
def validate(model, dataloader):
    """Runs a single validation epoch and prints results.

    Args:
        model (nn.Module): Your initialized LAS model.
        dataloader (torch.utils.data.DataLoader): Your val dataloader
    """
    model.eval()
    distances = []
    with torch.inference_mode():
        for (data, data_lens, labels) in tqdm(dataloader, total=len(dataloader)):
            data, labels = data.to(DEVICE), labels.to(DEVICE)
            predictions, _ = model(data, data_lens, labels=None, tf_prob=0.)
            pred_idxs = predictions.argmax(dim=1)
            prediction_strs = [convert_idxs_to_str(p.tolist(), remove_special_tokens=True) for p in pred_idxs]
            label_strs = [convert_idxs_to_str(l.tolist(), remove_special_tokens=True) for l in labels]
            batch_distances = [distance(p, l) for p, l in zip(prediction_strs, label_strs)]
            distances.extend(batch_distances)
    print(f"Example prediction: {prediction_strs[0]}")
    print(f"Label: {label_strs[0]}")
    print(f"Average Levenshtein distance: {np.mean(distances)}")

### `predict`


In [None]:
def predict(model, dataloader):
    """Generates list of predictions for a dataloader

    Args:
        model (nn.Module): Your initialized LAS model.
        dataloader (torch.utils.data.DataLoader): Your test dataloader
    
    Returns:
        list: All prediction strings of the given test dataloader, in original order. 
    """
    ### BEGIN SOLUTION
    model.eval()
    prediction_strs = []
    with torch.inference_mode():
        for (data, data_lens) in tqdm(dataloader, total=len(dataloader)):
            data = data.to(DEVICE)
            predictions, _ = model(data, data_lens, labels=None, tf_prob=0.)
            pred_idxs = predictions.argmax(dim=1)
            batch_prediction_strs = [convert_idxs_to_str(p.tolist(), remove_special_tokens=True) for p in pred_idxs]
            prediction_strs.extend(batch_prediction_strs)
    return prediction_strs
    ### END SOLUTION

## Section 6: Initialization and Running

### Initialization

First, initialize the objects.

In [None]:
# TODO: Initialize your model (put on GPU), optimizer, and (optional) scheduler
model = None
optimizer = None

### BEGIN SOLUTION
model = LAS(num_channels=40, vocab_size=len(TOKEN_LIST), hidden_size=256, attn_size=128).to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters())
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
### END SOLUTION

In [None]:
print(model)

Now we'll make sure that predict runs

In [None]:
results = predict(model, test_dataloader)

### Train

Now, write the full train loop.

In [None]:
print(model)

In [None]:
from utils import plot_attention

# TODO: Run for some number of epochs

### BEGIN SOLUTION
num_epochs = 2
for e in range(num_epochs):
    print(f"Epoch #{e}")
    attention = train_epoch(model, optimizer, train_dataloader, tf_prob=0.9)
    plot_attention(attention)
    validate(model, val_dataloader)
### END SOLUTION

# Section 7: Test Predictions

Now to generate predictions and export!

In [None]:
from utils import export_predictions_to_csv

predictions = predict(model, test_dataloader)
export_predictions_to_csv(predictions)