<a href="https://colab.research.google.com/github/MatchLab-Imperial/deep-learning-course/blob/master/05_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Coursework



### **Task 1: RNN Regression**

In this task, you are asked to estimate the next value of a time series. Specifically, we use the popular airline passenger dataset, which records the number of passengers travelling with a certain airline company. The data contains 144 entries, each corresponding to the number of passengers who travelled in a given month. The dataset starts in 1949 and ends in 1960.

Similarly to the previous example, we import the data and plot it to see the structure.

In [None]:
!wget https://raw.githubusercontent.com/jbrownlee/Datasets/master/airline-passengers.csv

In [None]:
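import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn.preprocessing

# Load dataset (only 2nd column)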
data = pd.read_csv("airline-passengers.csv", usecols=[1], engine="python")

# Plot passengers over time
data.plot(title="Airline Passengers (1949–1960)")
plt.xlabel("Months")
plt.ylabel("Passengers")
plt.show()

# Convert to float32 numpy
data_np = data.to_numpy(dtype="float32")

# Train/test split (70/30)
split_idx = int(len(data_np) * 0.7)
train_np, test_np = data_np[:split_idx], data_np[split_idx:]

print(f"Training samples: {len(train_np)} | Test samples: {len(test_np)}")

# Scale using MinMaxScaler (fit on train, transform both)
scaler = sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1))
train_np_norm, test_np_norm = scaler.fit_transform(train_np), scaler.transform(test_np)
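
Because the scaler is fit on the training split only, test values that fall outside the training range are mapped outside [0, 1]. The sketch below reproduces the same linear transform with NumPy on made-up numbers (not the airline data); `minmax_fit` and `minmax_transform` are hypothetical helpers written for illustration, not part of scikit-learn.

```python
import numpy as np

def minmax_fit(train):
    """Return the (min, max) of the training data only."""
    return float(train.min()), float(train.max())

def minmax_transform(x, lo, hi):
    """Map x linearly so that lo -> 0 and hi -> 1."""
    return (x - lo) / (hi - lo)

# Toy upward-trending series (made-up numbers, not the airline data)
train = np.array([100.0, 120.0, 150.0, 200.0])
test = np.array([180.0, 250.0])

lo, hi = minmax_fit(train)
train_scaled = minmax_transform(train, lo, hi)
test_scaled = minmax_transform(test, lo, hi)

print(train_scaled)  # training values lie in [0, 1]
print(test_scaled)   # 250 is above the training max, so it maps above 1
```

For a series with an upward trend, it is therefore expected that part of the normalized test split exceeds 1.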

First of all, you need to train an RNN on the airline passenger dataset. This exercise expects you to study the impact of the `window_size` variable when defining the `train` and `test` dataset splits. Remember that the `window_size` variable indicates the number of past observations used for predicting the current value. Here, we treat the `test` split as a validation set.

In [None]:
def create_dataset(dataset, window_size = 1):
    data_x, data_y = [], []
    for i in range(len(dataset) - window_size):
        sample = dataset[i:(i + window_size), 0]
        data_x.append(sample)
        data_y.append(dataset[i + window_size, 0])
    return np.array(data_x), np.array(data_y)


window_size = 1 # Use this variable to build the dataset with different number of inputs

# Create test and training sets for regression with different window sizes.
train_X, train_Y = create_dataset(train_np_norm, window_size)
test_X, test_Y = create_dataset(test_np_norm, window_size)

train_X = np.reshape(train_X, (train_X.shape[0], train_X.shape[1], 1))
test_X = np.reshape(test_X, (test_X.shape[0], test_X.shape[1], 1))

# Convert to tensors
train_X_tensor = torch.tensor(train_X, dtype=torch.float32)
train_Y_tensor = torch.tensor(train_Y, dtype=torch.float32).unsqueeze(1)
test_X_tensor = torch.tensor(test_X, dtype=torch.float32)
test_Y_tensor = torch.tensor(test_Y, dtype=torch.float32).unsqueeze(1)

print("Shape of training inputs: " + str((train_X_tensor.shape)))
print("Shape of training labels: " + str((train_Y_tensor.shape)))
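
To see what `window_size` changes, it can help to run the same windowing logic on a tiny series. The sketch below uses a hypothetical `make_windows` helper that mirrors `create_dataset` for a 1-D array; the series values are made up.

```python
import numpy as np

def make_windows(series, window_size):
    """Hypothetical stand-in for create_dataset on a 1-D array."""
    xs, ys = [], []
    for i in range(len(series) - window_size):
        xs.append(series[i:i + window_size])   # window_size past values
        ys.append(series[i + window_size])     # the value right after them
    return np.array(xs), np.array(ys)

series = np.arange(6, dtype=np.float32)  # [0, 1, 2, 3, 4, 5]
for w in (1, 3):
    X, y = make_windows(series, w)
    print(f"window_size={w}: X{X.shape}, y{y.shape}")
    print("  first input:", X[0], "-> target:", y[0])
```

Note that a larger window leaves fewer samples, since only `len(series) - window_size` windows fit in the series.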


**Report**:

*   Create a plot showing the test curves of models trained with different `window_size` values. Report the plot and discuss the main differences you observe between the predicted curves. You can use the style proposed in the Many to One RNNs - Regression section to plot your curves.

### **Task 2: Text Embeddings**
For this task, we tackle a classification problem using the IMDB sentiment dataset, as in the example in the notebook. Labels in IMDB are 0 for negative reviews and 1 for positive reviews. The definitions of the models you will use for this task are given in the code below. This task is similar to the transfer learning/finetuning task in the CNN Architectures notebook; however, we now test the effect of transfer learning on the embeddings. In this task we use train, validation and test splits with Early Stopping. That means we take the model that performs best on the validation set and evaluate it on the test set to get the final performance.

**Report**
* Using embeddings of dimensionality 1, train a model without any LSTM, using only an average pooling of the input embeddings (called `embeddings_model` in the code given below). Then train another model with an LSTM and trainable embeddings initialized at random (called `lstm_model`). Finally, train a model with an LSTM and non-trainable embeddings initialized with GloVe embeddings (called `lstm_glove_model`). The code to train the three models is given below. Report in a table the test accuracy obtained for the three models after training with the given code. Also attach in the Appendix the training and validation accuracy curves for the different models. You can report the curves after using EarlyStopping with patience 10 (already given in the code), so you don't have to train the three models for the full 50 epochs. Discuss the results.

* Predict the sentiment of the two example reviews given in the code below with the model trained without an LSTM (`embeddings_model`) and with the model trained with an LSTM and GloVe embeddings (`lstm_glove_model`). Report the predictions (you can use the same table as when reporting test accuracies). Discuss the results. Also discuss the differences you can observe between the GloVe embeddings and the embeddings learnt in `embeddings_model` (e.g. what kind of properties the embeddings encode, or differences in the closest words).


We provide the training code you need to use for this exercise below. First we load the dataset as we did in the tutorial. In this exercise, we will use train, validation and test splits, which are defined in the next cell.


In [None]:
# Parameters
nb_words = 5000
maxlen = 100

# Load IMDb dataset
dataset = load_dataset("imdb")

# Tokenizer
def tokenize(text):
    return nltk.word_tokenize(text.lower())

# Build vocabulary from training set
counter = Counter()
for text in dataset['train']['text']:
    counter.update(tokenize(text))

# Create word2idx with special tokens
most_common = counter.most_common(nb_words - 3)
word2idx = {'<PAD>': 0, '<START>': 1, '<UNK>': 2}
for idx, (word, _) in enumerate(most_common, start=3):
    word2idx[word] = idx

# Reverse index
idx2word = {idx: word for word, idx in word2idx.items()}

# Encode and pad a single text
def encode(text, maxlen=maxlen):
    tokens = tokenize(text)
    indices = [word2idx['<START>']] + [word2idx.get(t, word2idx['<UNK>']) for t in tokens]
    indices = indices[:maxlen]  # truncate if too long
    indices += [word2idx['<PAD>']] * (maxlen - len(indices))  # pad if too short
    return torch.tensor(indices, dtype=torch.long)

# Preprocess list of texts
def preprocess(text_list):
    return torch.stack([encode(text) for text in text_list])

# Process datasets
x_train_full = preprocess(dataset['train']['text'])
y_train_full = torch.tensor(dataset['train']['label'], dtype=torch.long)
x_test = preprocess(dataset['test']['text'])
y_test = torch.tensor(dataset['test']['label'], dtype=torch.long)

# Train/Val split
x_val = x_train_full[20000:]
y_val = y_train_full[20000:]
x_train = x_train_full[:20000]
y_train = y_train_full[:20000]

# Print shapes
print('x_train shape:', x_train.shape)
print('x_val shape:', x_val.shape)
print('x_test shape:', x_test.shape)

# Wrap in DataLoaders
train_loader = DataLoader(TensorDataset(x_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(TensorDataset(x_val, y_val), batch_size=32)
test_loader = DataLoader(TensorDataset(x_test, y_test), batch_size=32)
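
The encoding step above turns every review into exactly `maxlen` indices. The sketch below shows the same truncate-or-pad behaviour on a toy vocabulary; `encode_toy` and `toy_vocab` are made up for illustration, and whitespace splitting is used here instead of `nltk.word_tokenize` to keep the example dependency-free.

```python
PAD, START, UNK = 0, 1, 2

def encode_toy(text, word2idx, maxlen):
    """Encode with whitespace tokenization (the cell above uses nltk instead)."""
    ids = [START] + [word2idx.get(tok, UNK) for tok in text.lower().split()]
    ids = ids[:maxlen]                       # truncate long inputs
    ids += [PAD] * (maxlen - len(ids))       # right-pad short inputs
    return ids

toy_vocab = {"<PAD>": PAD, "<START>": START, "<UNK>": UNK,
             "the": 3, "movie": 4, "good": 5}

short = encode_toy("The movie", toy_vocab, maxlen=6)
long_ = encode_toy("the movie was good the movie was good", toy_vocab, maxlen=6)
print(short)  # padded on the right with <PAD>
print(long_)  # cut to maxlen; "was" is out of vocabulary and becomes <UNK>
```

The `<START>` token counts towards `maxlen`, so at most `maxlen - 1` words of a review are kept.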

In [None]:
# Function implementing early stopping logic

def train_with_early_stopping(
    model,
    train_loader,
    val_loader,
    test_loader,
    optimizer,
    criterion,
    device,
    max_epochs=50,
    patience=10,
    reshape=False
):
    best_val_acc = 0.0
    best_model_state = None
    epochs_no_improve = 0
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }

    for epoch in range(max_epochs):
        # --- Training ---
        model.train()
        train_loss, train_correct, total = 0.0, 0, 0
        for x_batch, y_batch in train_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device).float()

            optimizer.zero_grad()
            outputs = model(x_batch)
            if reshape:
                loss = criterion(outputs, y_batch.unsqueeze(1).float())
            else:
                loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * x_batch.size(0)
            preds = (torch.sigmoid(outputs) >= 0.5).long()
            train_correct += (preds.view(-1) == y_batch.long()).sum().item()
            total += x_batch.size(0)

        avg_train_loss = train_loss / total
        train_acc = train_correct / total

        # --- Validation ---
        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0
        with torch.no_grad():
            for x_val, y_val in val_loader:
                x_val, y_val = x_val.to(device), y_val.to(device).float()
                outputs = model(x_val)
                if reshape:
                    loss = criterion(outputs, y_val.unsqueeze(1).float())
                else:
                    loss = criterion(outputs, y_val)

                val_loss += loss.item() * x_val.size(0)
                preds = (torch.sigmoid(outputs) >= 0.5).long()
                val_correct += (preds.view(-1) == y_val.long()).sum().item()
                val_total += x_val.size(0)

        avg_val_loss = val_loss / val_total
        val_acc = val_correct / val_total

        # Save history
        history['train_loss'].append(avg_train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_acc)

        print(f"Epoch {epoch+1}: Train Loss={avg_train_loss:.4f}, Train Acc={train_acc:.4f}, Val Loss={avg_val_loss:.4f}, Val Acc={val_acc:.4f}")

        # --- Early stopping logic ---
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_state = copy.deepcopy(model.state_dict())
            epochs_no_improve = 0
            print("  ↳ New best model saved")
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Load best model
    model.load_state_dict(best_model_state)

    # --- Final test evaluation ---
    model.eval()
    test_loss, test_correct, test_total = 0.0, 0, 0
    with torch.no_grad():
        for x_test, y_test in test_loader:
            x_test, y_test = x_test.to(device), y_test.to(device).float()
            outputs = model(x_test)
            if reshape:
                loss = criterion(outputs, y_test.unsqueeze(1).float())
            else:
                loss = criterion(outputs, y_test)

            test_loss += loss.item() * x_test.size(0)
            preds = (torch.sigmoid(outputs) >= 0.5).long()
            test_correct += (preds.view(-1) == y_test.long()).sum().item()
            test_total += x_test.size(0)

    final_test_loss = test_loss / test_total
    final_test_acc = test_correct / test_total

    print(f"Final Test Loss: {final_test_loss:.4f}")
    print(f"Final Test Accuracy: {final_test_acc:.4f}")

    return history, final_test_loss, final_test_acc, best_model_state

The following code includes the model that uses embeddings of size 1 (so each word is represented by a single number) and averages them.

In [None]:
# Define the model
class EmbeddingsModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim=1):
        super(EmbeddingsModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.global_avg_pool = lambda x: x.mean(dim=1)
        self.fc = nn.Linear(embedding_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)                     # [batch_size, maxlen, 1]
        x = self.global_avg_pool(x)               # [batch_size, 1]
        x = self.fc(x)                            # [batch_size, 1]
        return x
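
As a rough illustration of what this forward pass computes, the NumPy sketch below looks up one scalar per token, averages over the sequence and applies a linear layer. The embedding values and the weights `w` and `b` are made up, and `forward` is a hypothetical stand-in for the PyTorch model, not a trained network.

```python
import numpy as np

rng = np.random.default_rng(0)
vocab_size, maxlen = 10, 6
embedding = rng.normal(size=(vocab_size, 1))  # one scalar per token, as with embedding_dim=1
w, b = 1.5, -0.2                              # made-up weights of the final linear layer

def forward(token_ids):
    vectors = embedding[token_ids]     # [maxlen, 1]
    pooled = vectors.mean(axis=0)      # [1]; padding tokens are averaged in too
    return float(w * pooled[0] + b)    # a logit, not a probability

ids = np.array([1, 4, 7, 5, 0, 0])        # trailing zeros stand for <PAD>
reordered = np.array([1, 5, 7, 4, 0, 0])  # same tokens in a different order
print(forward(ids), forward(reordered))
```

Since the mean does not depend on token order, both calls return the same logit up to floating-point rounding.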

In [None]:
set_seed(42)

# Instantiate model
embeddings_model = EmbeddingsModel(
    vocab_size=len(word2idx)
).to(DEVICE)

# Loss and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(embeddings_model.parameters(), lr=1e-3)

# Call the training function
history, test_loss, test_acc, best_embeddings_model_state = train_with_early_stopping(
    model=embeddings_model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    optimizer=optimizer,
    criterion=criterion,
    device=DEVICE,
    reshape=True
)

We use Early Stopping, so the model with the best validation accuracy is the one used to compute the result on the test set.
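
The stopping rule in `train_with_early_stopping` can be traced on a plain list of validation accuracies. The sketch below uses a hypothetical helper `early_stopping_trace` and made-up accuracy values; it follows the same rule (strict improvement resets the counter, training stops once `patience` epochs pass without improvement).

```python
def early_stopping_trace(val_accs, patience):
    """Return (best_epoch, stop_epoch), both 1-indexed, for a list of validation accuracies."""
    best_acc, best_epoch, no_improve = 0.0, None, 0
    for epoch, acc in enumerate(val_accs, start=1):
        if acc > best_acc:               # strict improvement resets the counter
            best_acc, best_epoch, no_improve = acc, epoch, 0
        else:
            no_improve += 1
            if no_improve >= patience:   # same rule as train_with_early_stopping
                return best_epoch, epoch
    return best_epoch, len(val_accs)     # ran out of epochs without triggering

val_accs = [0.60, 0.70, 0.69, 0.71, 0.70, 0.70, 0.71, 0.69]  # made-up values
best_epoch, stop_epoch = early_stopping_trace(val_accs, patience=3)
print(best_epoch, stop_epoch)  # 4 7
```

Epoch 7 ties the best accuracy but does not exceed it, so it counts as no improvement and the weights from epoch 4 are the ones restored.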

Now that `embeddings_model` is trained, the code below prints the embedding of any `query_word`, which in this case is a single number. We also give you the code to compute the `top_k` closest embeddings to `query_word`. The ranking is based on the L2 distance (the code compares squared distances, which gives the same ordering).

In [None]:
def get_most_similar_words(best_model_state, model_class, parameters,
                           query, word2idx, idx2word, top_k=10, is_index=False):
    """
    query: can be a word (like 'cat') or an index (like 1 if is_index=True)
    """
    # Load model and set to eval mode
    model = model_class(**parameters)
    model.load_state_dict(best_model_state)
    model.eval()

    # Get embeddings
    embeddings = model.embedding.weight.data.cpu().numpy()

    # Handle query input
    if is_index:
        query_idx = int(query)
        query_word = idx2word.get(query_idx, "<UNK>")
    else:
        query_word = query
        if query_word not in word2idx:
            print(f"Word '{query_word}' not found in vocab.")
            return
        query_idx = word2idx[query_word]

    query_vector = embeddings[query_idx]

    # Compute L2 distances
    distances = ((embeddings - query_vector) ** 2).sum(axis=1)
    nearest_indices = distances.argsort()[1:top_k+1]

    print(f"Query index: {query_idx}")
    print(f"Query word: '{query_word}'")
    print(f"Embedding value of '{query_word}' is {query_vector[0]:.6f}")
    print(f"Most {top_k} similar words to '{query_word}':")
    for rank, idx in enumerate(nearest_indices, start=1):
        print(f"{rank}: {idx2word.get(idx, '<UNK>')}")

# Example usage
parameters = {
    "vocab_size": len(word2idx)
}

get_most_similar_words(
    best_model_state=best_embeddings_model_state,
    model_class=EmbeddingsModel,
    parameters=parameters,
    query='8',            # Query parameter
    word2idx=word2idx,
    idx2word=idx2word,
    top_k=10,
    is_index=False
)
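
The nearest-neighbour search can be checked by hand on a handful of words. The sketch below uses made-up 1-D embeddings and a hypothetical `nearest` helper. Unlike the cell above, which drops the first entry of the sorted list, it excludes the query by setting its own distance to infinity, which stays correct even if another word has exactly the same embedding.

```python
import numpy as np

words = ["bad", "awful", "fine", "good", "great"]
# Made-up 1-D embeddings, one scalar per word
emb = np.array([[-1.0], [-1.2], [0.1], [0.9], [1.3]])

def nearest(query, top_k=2):
    q = words.index(query)
    dist = ((emb - emb[q]) ** 2).sum(axis=1)   # squared L2 distance to every word
    dist[q] = np.inf                           # exclude the query itself explicitly
    order = np.argsort(dist)[:top_k]
    return [words[i] for i in order]

print(nearest("good"))  # ['great', 'fine']
print(nearest("bad"))   # ['awful', 'fine']
```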

The code below gives the prediction for two example reviews we input. Remember that predictions close to 0 refer to a negative review, and predictions close to 1 refer to a positive review.

In [None]:
def predict_sentiment(text, model_class, best_model_state, word2idx, parameters, maxlen, device='cpu'):
    # Re-instantiate and load model
    model = model_class(**parameters)
    model.load_state_dict(best_model_state)
    model.to(device)
    model.eval()

    # Tokenize and encode
    tokens = text.lower().split()
    encoded = [word2idx.get('<START>', 1)] + [word2idx.get(w, word2idx['<UNK>']) for w in tokens]
    tensor = torch.tensor(encoded, dtype=torch.long).unsqueeze(0)  # batch of 1

    # Pad or truncate
    if tensor.size(1) < maxlen:
        pad_len = maxlen - tensor.size(1)
        tensor = F.pad(tensor, (0, pad_len), value=word2idx['<PAD>'])
    else:
        tensor = tensor[:, :maxlen]

    tensor = tensor.to(device)

    # Predict
    with torch.no_grad():
        output = model(tensor)
        prob = torch.sigmoid(output).item()
    return prob
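
The models in this task return raw logits; `predict_sentiment` applies the sigmoid to obtain a probability, and `nn.BCEWithLogitsLoss` applies it internally during training. The pure-Python sketch below illustrates that relationship on made-up logits. `sigmoid` and `bce_with_logits` are hypothetical helpers that use a standard numerically stable formulation, not PyTorch's actual implementation.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def bce_with_logits(z, y):
    """Binary cross-entropy computed from a raw logit z and a label y in {0, 1}."""
    # Numerically stable form: max(z, 0) - z*y + log(1 + exp(-|z|))
    return max(z, 0.0) - z * y + math.log1p(math.exp(-abs(z)))

for z in (-2.0, 0.0, 3.0):
    p = sigmoid(z)
    label = int(p >= 0.5)          # same decision as z >= 0
    print(f"logit={z:+.1f}  prob={p:.3f}  predicted label={label}  "
          f"loss if y=1: {bce_with_logits(z, 1):.3f}")
```

Thresholding the probability at 0.5 is equivalent to thresholding the logit at 0, which is why the accuracy computation in the training loop can work directly from the model outputs.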

In [None]:
neg_review = "the movie is boring and not good"
pos_review = "the movie is good and not boring"

neg_score = predict_sentiment(
    text=neg_review,
    model_class=EmbeddingsModel,
    best_model_state=best_embeddings_model_state,
    word2idx=word2idx,
    parameters=parameters,
    maxlen=100
)

pos_score = predict_sentiment(
    text=pos_review,
    model_class=EmbeddingsModel,
    best_model_state=best_embeddings_model_state,
    word2idx=word2idx,
    parameters=parameters,
    maxlen=100
)

print(f"The score for the negative review is: {neg_score}")
print(f"The score for the positive review is: {pos_score}")

With the above code, we trained a model that classifies the sentiment of a sentence using the average of all its embeddings, which were only of size 1. Now we will increase the embedding size to 300 and add an LSTM to process the embeddings. Hence, the model has a much higher capacity.

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim=300, lstm_units=50, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        ### Do not modify the layers below
        self.dropout1 = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embedding_dim, lstm_units, batch_first=True, num_layers=2, dropout=dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.fc = nn.Linear(lstm_units, 1)

    def forward(self, x):
        x = self.embedding(x)                     # [batch_size, seq_len, embedding_dim]
        x = self.dropout1(x)
        output, (hidden, _) = self.lstm(x)        # hidden: [num_layers, batch_size, lstm_units]
        x = self.dropout2(hidden[-1])             # Final hidden state of the last LSTM layer
        x = self.fc(x)                            # [batch_size, 1]
        return x

Similarly, we use EarlyStopping for this model.

In [None]:
set_seed(42)

# Instantiate model
lstm_model = LSTMModel(
    vocab_size=len(word2idx),
    embedding_dim=300,
    lstm_units=50
).to(DEVICE)

# Loss and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(lstm_model.parameters(), lr=1e-3)

# Call the training function
history, test_loss, test_acc, best_lstm_state = train_with_early_stopping(
    model=lstm_model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    optimizer=optimizer,
    criterion=criterion,
    device=DEVICE,
    reshape=True
)

In [None]:
parameters = {
    "vocab_size": len(word2idx)
}

neg_review = "the movie is boring and not good"
pos_review = "the movie is good and not boring"

neg_score = predict_sentiment(
    text=neg_review,
    model_class=LSTMModel,
    best_model_state=best_lstm_state,
    word2idx=word2idx,
    parameters=parameters,
    maxlen=100
)

pos_score = predict_sentiment(
    text=pos_review,
    model_class=LSTMModel,
    best_model_state=best_lstm_state,
    word2idx=word2idx,
    parameters=parameters,
    maxlen=100
)

print(f"The score for the negative review is: {neg_score}")
print(f"The score for the positive review is: {pos_score}")

We just trained a model with a large number of parameters on IMDB, which is a small dataset relative to the size of the model.
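
To make "a large number of parameters" concrete, the counts can be worked out by hand. The sketch below assumes a vocabulary of 5000 entries (`nb_words`) and PyTorch's LSTM parameterization, in which each layer holds input weights, recurrent weights and two bias vectors for each of its 4 gates; `lstm_layer_params` is a hypothetical helper.

```python
def lstm_layer_params(input_size, hidden_size):
    # 4 gates, each with input weights, recurrent weights and two bias vectors
    return 4 * (input_size * hidden_size + hidden_size * hidden_size + 2 * hidden_size)

vocab_size, lstm_units = 5000, 50

# EmbeddingsModel: embedding_dim=1 plus a 1 -> 1 linear layer
small = vocab_size * 1 + (1 * 1 + 1)

# LSTMModel: embedding_dim=300, two stacked LSTM layers, 50 -> 1 linear layer
emb = vocab_size * 300
lstm = lstm_layer_params(300, lstm_units) + lstm_layer_params(lstm_units, lstm_units)
fc = lstm_units * 1 + 1
large = emb + lstm + fc

print(f"EmbeddingsModel: {small:,} parameters")
print(f"LSTMModel:       {large:,} parameters ({emb:,} in the embedding table)")
```

Under these assumptions the embedding table accounts for most of the parameters of `LSTMModel`, while the training split contains 20,000 reviews.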

The last model we train is the same model as the `lstm_model` above, but in this case we use the embeddings from the GloVe method (which were introduced in this notebook) without any finetuning. First, we download them.

In [None]:
!wget https://imperialcollegelondon.box.com/shared/static/c9trfhhwl9ohje5g3sapu3xk2zoywp3c.txt -O glove_vectors.txt

Then we load the GloVe embeddings with dimensionality 300 we just downloaded. This takes some time.

In [None]:
# Load GloVe vectors and build vocab and weight matrix
def load_glove(glove_path, embedding_dim):
    vocab = {}
    vectors = []
    skipped = 0

    with open(glove_path, "r", encoding="utf-8") as f:
        first_line = f.readline()
        if len(first_line.strip().split()) == 2:
            print(f"Skipping header: {first_line.strip()}")
        else:
            f.seek(0)

        for line in f:
            tokens = line.rstrip().split()
            word, values = tokens[0], tokens[1:]

            if len(values) != embedding_dim:
                skipped += 1
                continue

            try:
                vector = np.array(values, dtype=np.float32)
            except ValueError:
                skipped += 1
                continue

            vocab[word] = len(vectors)
            vectors.append(torch.from_numpy(vector))

    if not vectors:
        raise RuntimeError("No valid embeddings loaded. Check file format.")

    weight_matrix = torch.stack(vectors)
    print(f"Loaded {len(vectors)} word vectors. Skipped {skipped} lines.")
    return vocab, weight_matrix

# Example
glove_path = "glove_vectors.txt"
vocab, weight_matrix = load_glove(glove_path, embedding_dim=300)

In [None]:
embedding_dim = 300
embedding_matrix = torch.zeros((nb_words, embedding_dim))

# Align GloVe vectors to our vocabulary
for word, idx in word2idx.items():
    if idx >= nb_words:
        continue  # Skip words beyond vocab limit
    if word in vocab:
        embedding_matrix[idx] = weight_matrix[vocab[word]]
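
Words in our vocabulary that do not appear in the pretrained file keep an all-zero row after this alignment. The NumPy sketch below shows the effect on a toy vocabulary; all names and vectors are made up for illustration.

```python
import numpy as np

# Made-up pretrained vectors (stand-ins for GloVe rows)
pretrained_vocab = {"good": 0, "bad": 1}
pretrained = np.array([[0.1, 0.2, 0.3],
                       [-0.1, -0.2, -0.3]], dtype=np.float32)

# Model vocabulary with the special tokens used in this notebook
toy_word2idx = {"<PAD>": 0, "<START>": 1, "<UNK>": 2, "good": 3, "bad": 4, "meh": 5}

matrix = np.zeros((len(toy_word2idx), pretrained.shape[1]), dtype=np.float32)
missing = []
for word, idx in toy_word2idx.items():
    if word in pretrained_vocab:
        matrix[idx] = pretrained[pretrained_vocab[word]]
    else:
        missing.append(word)   # row stays all-zero

coverage = 1 - len(missing) / len(toy_word2idx)
print(f"coverage: {coverage:.0%}, missing: {missing}")
```

In this toy example the special tokens are among the missing entries and share the same zero vector. Running a similar count on the real vocabulary is one way to check how many words are actually covered by GloVe.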

To initialize the PyTorch Embedding layer with the embeddings we loaded, we can use the function `nn.Embedding.from_pretrained`. To keep the embeddings fixed during training, we pass `freeze=True`, which is the default value of the `freeze` argument in the class below.
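
A small sketch of the effect of `freeze`, using a made-up 3-word embedding table rather than the GloVe matrix:

```python
import torch
import torch.nn as nn

weights = torch.tensor([[0.0, 0.0], [1.0, 2.0], [3.0, 4.0]])  # made-up 3-word table

frozen = nn.Embedding.from_pretrained(weights, freeze=True)
trainable = nn.Embedding.from_pretrained(weights, freeze=False)

print(frozen.weight.requires_grad)     # False: no gradient is computed for this table
print(trainable.weight.requires_grad)  # True
print(frozen(torch.tensor([1, 2])))    # looks up rows 1 and 2
```

With `freeze=True` the embedding weights receive no gradient, so the optimizer leaves them unchanged even though they appear in `model.parameters()`.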

In [None]:
class LSTMWithGloVe(nn.Module):
    def __init__(self, embedding_tensor, lstm_units=50, dropout=0.2, freeze=True):
        super(LSTMWithGloVe, self).__init__()

        # Define embedding layer from pretrained tensor
        self.embedding = nn.Embedding.from_pretrained(embedding_tensor, freeze=freeze)

        self.dropout1 = nn.Dropout(dropout)
        self.lstm = nn.LSTM(
            input_size=embedding_tensor.shape[1],  # Embedding dim
            hidden_size=lstm_units,
            batch_first=True,
            dropout=dropout,
            num_layers=2
        )
        self.dropout2 = nn.Dropout(dropout)
        self.fc = nn.Linear(lstm_units, 1)

    def forward(self, x):
        x = self.embedding(x)                     # [batch_size, seq_len, embedding_dim]
        x = self.dropout1(x)
        output, (hidden, _) = self.lstm(x)        # hidden: [num_layers, batch_size, lstm_units]
        x = self.dropout2(hidden[-1])             # Final hidden state of the last LSTM layer
        x = self.fc(x)                            # [batch_size, 1]
        return x

In [None]:
set_seed(42)

# Instantiate model
lstm_glove_model = LSTMWithGloVe(
    embedding_tensor=embedding_matrix,
    lstm_units=50
).to(DEVICE)

# Loss and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(lstm_glove_model.parameters(), lr=1e-3)

# Call the training function
history, test_loss, test_acc, best_lstm_glove_state = train_with_early_stopping(
    model=lstm_glove_model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    optimizer=optimizer,
    criterion=criterion,
    device=DEVICE,
    reshape=True
)

We can also compute the closest words in the GloVe embeddings to any `query_word` using the code below.

In [None]:
parameters = {
    "embedding_tensor": embedding_matrix
}

get_most_similar_words(
    best_model_state=best_lstm_glove_state,
    model_class=LSTMWithGloVe,
    parameters=parameters,
    query='8',              # Query parameter
    word2idx=word2idx,
    idx2word=idx2word,
    top_k=10,
    is_index=False
)

We use the same example reviews as for the `EmbeddingsModel` case and we compute the predictions using the `LSTMWithGloVe`.

In [None]:
neg_review = "the movie is boring and not good"
pos_review = "the movie is good and not boring"

neg_score = predict_sentiment(
    text=neg_review,
    model_class=LSTMWithGloVe,
    best_model_state=best_lstm_glove_state,
    word2idx=word2idx,
    parameters=parameters,
    maxlen=100
)

pos_score = predict_sentiment(
    text=pos_review,
    model_class=LSTMWithGloVe,
    best_model_state=best_lstm_glove_state,
    word2idx=word2idx,
    parameters=parameters,
    maxlen=100
)

print(f"The score for the negative review is: {neg_score}")
print(f"The score for the positive review is: {pos_score}")

### **Task 3: Text Generation**
In this task we focus on the text generation problem. For this purpose, we will download the scripts of the TV show Game of Thrones and try to generate some text resembling the style of the scripts.


**Report**
* Plot the BLEU score obtained for different temperature values (from 0 to 2 on the x-axis) for both the character-level model and the word-level model. To compute the BLEU score, use at least 20 generated samples per temperature to reduce variability (you can use more for lower variability, at the cost of longer computation time). Each sample should contain 100 characters for the char-level model or 30 words for the word-level model (the code given uses these parameters by default). Do you see any relationship between the BLEU score and the temperature used? If you generate sentences at different temperatures, what differences can you observe? Are the generated sentences grammatically correct? Do they make sense?
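
As a reminder of what the temperature does, the sketch below applies temperature scaling to a made-up logit vector before the softmax. `softmax_with_temperature` is a hypothetical helper, not the sampling code provided with this coursework. A temperature of exactly 0 cannot be plugged into this formula; it is commonly treated as greedy (argmax) decoding, and how the provided code handles that case is not shown here.

```python
import numpy as np

def softmax_with_temperature(logits, temperature):
    """Turn logits into sampling probabilities; temperature must be > 0."""
    scaled = np.asarray(logits, dtype=np.float64) / temperature
    scaled -= scaled.max()              # subtract the max for numerical stability
    probs = np.exp(scaled)
    return probs / probs.sum()

logits = [2.0, 1.0, 0.1]                # made-up scores for a 3-token vocabulary
for t in (0.2, 1.0, 2.0):
    print(t, np.round(softmax_with_temperature(logits, t), 3))

# Sampling one token index at temperature 1.0
rng = np.random.default_rng(0)
token = rng.choice(len(logits), p=softmax_with_temperature(logits, 1.0))
print("sampled token index:", token)
```

Low temperatures concentrate the probability mass on the highest-scoring token, while high temperatures flatten the distribution towards uniform.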

We give below the code needed to download the dataset and to compute the results.

We first download and read the dataset.

In [None]:
!git clone https://github.com/shekharkoirala/Game_of_Thrones

# Read the whole script file; the context manager closes the file afterwards
with open('./Game_of_Thrones/Data/final_data.txt', 'r') as f:
    data = f.read()

**Character-level model**

We first include the code to build the character-level dataset.

In [None]:
# Vocabulary
characters = sorted(set(data))
n_to_char = {i: ch for i, ch in enumerate(characters)}
char_to_n = {ch: i for i, ch in enumerate(characters)}

# Sliding windows
seq_char_length = 100
x_char = np.array([
    [char_to_n[ch] for ch in data[i:i+seq_char_length]]
    for i in range(len(data) - seq_char_length)
], dtype=np.int64)

y_char = np.array([
    char_to_n[data[i+seq_char_length]]
    for i in range(len(data) - seq_char_length)
], dtype=np.int64)

print("Total Samples:", len(x_char))
print("x_char shape:", x_char.shape, "y_char shape:", y_char.shape)
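
The same sliding-window construction is easier to inspect on a short string. The sketch below uses a made-up text and a window of 4 characters instead of 100; the variable names are chosen for illustration only.

```python
text = "hello world"
seq_len = 4

chars = sorted(set(text))
char_to_id = {ch: i for i, ch in enumerate(chars)}

# Each input is seq_len consecutive characters; the target is the character that follows
pairs = [(text[i:i + seq_len], text[i + seq_len]) for i in range(len(text) - seq_len)]
for context, target in pairs[:3]:
    ids = [char_to_id[ch] for ch in context]
    print(repr(context), ids, "->", repr(target), char_to_id[target])

print("number of samples:", len(pairs))
```

Consecutive samples overlap in all but one character, which is why the number of samples is almost as large as the number of characters in the text.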

The splits used for training are given below, although we already provide the trained model.

In [None]:
# Sizes
n_samples = len(x_char)
n_samples_train = int(n_samples * 0.7)
n_samples_test  = int(n_samples * 0.2)
n_samples_val   = n_samples - n_samples_train - n_samples_test

# Train/val/test splits
x_train_char = x_char[:n_samples_train]
y_train_char = y_char[:n_samples_train]

x_val_char   = x_char[n_samples_train:n_samples_train + n_samples_val]
y_val_char   = y_char[n_samples_train:n_samples_train + n_samples_val]

x_test_char  = x_char[n_samples_train + n_samples_val:]
y_test_char  = y_char[n_samples_train + n_samples_val:]

# Convert all to torch tensors in one go
to_tensor = lambda arr: torch.tensor(arr, dtype=torch.long)

x_train_char, y_train_char = map(to_tensor, (x_train_char, y_train_char))
x_val_char,   y_val_char   = map(to_tensor, (x_val_char, y_val_char))
x_test_char,  y_test_char  = map(to_tensor, (x_test_char, y_test_char))

print(f"x_train_char: {x_train_char.shape}, y_train_char: {y_train_char.shape}")
print(f"x_val_char:   {x_val_char.shape},   y_val_char:   {y_val_char.shape}")
print(f"x_test_char:  {x_test_char.shape},  y_test_char:  {y_test_char.shape}")

The definition of the model is the one given below. You will not train the model, so this piece of code is only for you to know what kind of model we trained for this task.

In [None]:
# define the LSTM model
class CharLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_size=300, lstm_units=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.lstm = nn.LSTM(embedding_size, lstm_units, batch_first=True)
        self.fc = nn.Linear(lstm_units, vocab_size)

    def forward(self, x):
        emb = self.embedding(x)                 # (B, T, E)
        out, _ = self.lstm(emb)                 # (B, T, H)
        last = out[:, -1, :]                    # (B, H)  last time step
        logits = self.fc(last)                  # (B, V)
        return logits                           # raw logits

As the training takes a while, we include a saved model that you can load to skip the training step. Use this model to compute your results.

In [None]:
'''
CODE USED FOR TRAINING (DO NOT RUN IT!)

from google.colab import files

# ---- config ----
batch_size = 128
epochs = 100
patience = 10
lr = 1e-3
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

vocab_size = len(characters)
model = CharLSTM(vocab_size=vocab_size, embedding_size=300, lstm_units=256).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# ---- data loaders (labels are integer class ids) ----
train_ds = TensorDataset(x_train_char, y_train_char)
val_ds   = TensorDataset(x_val_char,   y_val_char)
test_ds  = TensorDataset(x_test_char,  y_test_char)

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size)
test_loader  = DataLoader(test_ds,  batch_size=batch_size)

# ---- early stopping ----
class EarlyStopping:
    def __init__(self, patience=10, mode='max', min_delta=0.0, restore_best=True):
        self.patience = patience
        self.mode = mode  # 'max' for accuracy
        self.min_delta = min_delta
        self.restore_best = restore_best
        self.best_score = None
        self.counter = 0
        self.best_state = None
        self.best_epoch = 0

    def step(self, score, model, epoch):
        improve = False
        if self.best_score is None:
            improve = True
        else:
            if self.mode == 'max':
                improve = score > (self.best_score + self.min_delta)
            else:
                improve = score < (self.best_score - self.min_delta)

        if improve:
            self.best_score = score
            self.counter = 0
            if self.restore_best:
                self.best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            self.best_epoch = epoch
            return False
        else:
            self.counter += 1
            return self.counter > self.patience  # stop if patience exceeded

    def restore(self, model):
        if self.restore_best and self.best_state is not None:
            model.load_state_dict(self.best_state)

early_stop = EarlyStopping(patience=patience, mode='max', min_delta=0.0, restore_best=True)

# ---- history ----
history = {
    'loss': [],
    'val_loss': [],
    'accuracy': [],
    'val_accuracy': []
}

def accuracy_from_logits(logits, targets):
    preds = logits.argmax(dim=1)
    return (preds == targets).float().mean().item()

# ---- training loop ----
best_path = 'char_gen_model.pth'
for epoch in range(1, epochs + 1):
    # train
    model.train()
    epoch_loss = 0.0
    epoch_acc = 0.0
    n_batches = 0

    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb)            # [B, V]
        loss = criterion(logits, yb)  # yb: [B] integer targets
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc  += accuracy_from_logits(logits, yb)
        n_batches  += 1

    train_loss = epoch_loss / n_batches
    train_acc  = epoch_acc  / n_batches

    # validate
    model.eval()
    val_loss = 0.0
    val_acc  = 0.0
    n_val    = 0
    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item()
            val_acc  += accuracy_from_logits(logits, yb)
            n_val    += 1

    val_loss /= max(1, n_val)
    val_acc  /= max(1, n_val)

    # record history
    history['loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['accuracy'].append(train_acc)
    history['val_accuracy'].append(val_acc)

    print(f"Epoch {epoch:3d}/{epochs}  "
          f"loss={train_loss:.4f}  acc={train_acc:.4f}  "
          f"val_loss={val_loss:.4f}  val_acc={val_acc:.4f}")

    # early stopping on val_accuracy
    stop = early_stop.step(val_acc, model, epoch)

    # save best
    if early_stop.best_state is not None and early_stop.best_epoch == epoch:
        torch.save(early_stop.best_state, best_path)

    if stop:
        print(f"Early stopping at epoch {epoch}. Restoring best epoch {early_stop.best_epoch}...")
        break

# restore best weights
early_stop.restore(model)
torch.save(model.state_dict(), best_path)

# ---- test evaluation ----
model.eval()
test_loss = 0.0
test_acc = 0.0
n_test = 0
with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = criterion(logits, yb)
        test_loss += loss.item()
        test_acc  += accuracy_from_logits(logits, yb)
        n_test    += 1

test_loss /= max(1, n_test)
test_acc  /= max(1, n_test)

print(f"\nFinal test loss is: {test_loss:.4f}")
print(f"Final test accuracy is: {test_acc:.4f}")
print(f"Best model saved to: {best_path}")
files.download(best_path)
'''
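
To make the stopping rule of the `EarlyStopping` class above concrete, the sketch below replays its counter logic on a made-up list of validation accuracies. The helper `stopping_epoch` and the scores are hypothetical and only serve as an illustration. Because the check is `counter > patience`, training stops after `patience + 1` consecutive epochs without improvement.

```python
def stopping_epoch(scores, patience, min_delta=0.0):
    """Replay the 'max'-mode early-stopping rule on a list of scores.

    Returns (stop_epoch, best_epoch) with 1-based epochs; stop_epoch is
    None if the rule never triggers. Hypothetical helper for illustration.
    """
    best_score, best_epoch, counter = None, 0, 0
    for epoch, score in enumerate(scores, start=1):
        if best_score is None or score > best_score + min_delta:
            # Improvement: remember it and reset the counter
            best_score, best_epoch, counter = score, epoch, 0
        else:
            counter += 1
            if counter > patience:  # same strict comparison as the class
                return epoch, best_epoch
    return None, best_epoch


# Invented validation accuracies, one per epoch
val_acc = [0.40, 0.45, 0.47, 0.46, 0.47, 0.46, 0.45]
print(stopping_epoch(val_acc, patience=2))
```

A tie with the best score (epoch 5 here) does not count as an improvement when `min_delta=0.0`, so the counter keeps increasing.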

In [None]:
# Load the model
!wget 'https://raw.githubusercontent.com/MatchLab-Imperial/deep-learning-course/master/asset/05_RNN/char_gen_model.pth' -O char_gen_model.pth

vocab_size = len(characters)
model = CharLSTM(vocab_size=vocab_size, embedding_size=300, lstm_units=256).to(DEVICE)
model.load_state_dict(torch.load("char_gen_model.pth", map_location=DEVICE))

The code you need to compute the BLEU score is given below. Set `temperature` to each of the values you need to evaluate. With `n_eval` set to 20, each temperature takes around 1 minute on average.

In [None]:
characters = sorted(list(set(data)))
n_to_char = {n: char for n, char in enumerate(characters)}
char_to_n = {char: n for n, char in enumerate(characters)}

# Vary the temperature here
temperature = 0.5
n_eval = 20
seq_char_length = 100
smoother = SmoothingFunction().method1
bleu_score = 0.0

model.eval()

with torch.no_grad():
  for _ in range(n_eval):
      # Randomly select a starting point in the test data
      start = np.random.randint(0, len(x_test_char) - seq_char_length - 1)
      pattern = x_test_char[start].tolist()
      reference = x_test_char[start + seq_char_length].tolist()

      # Convert reference from numbers to characters
      reference = ''.join([n_to_char[value] for value in reference])

      # Generate characters using the model
      output_sent = ''
      for _n in range(seq_char_length):
          x = torch.tensor(pattern, dtype=torch.long, device=DEVICE).unsqueeze(0)
          logits = model(x)
          temp = float(temperature) + 0.01
          probs = F.softmax((logits + 1e-7) / temp, dim=-1)
          idx = torch.multinomial(probs[0], num_samples=1).item()
          output_sent += n_to_char[idx]
          pattern.append(idx)
          pattern = pattern[1:]

      # Preprocess reference and candidate text
      reference = word_tokenize(reference.lower())
      candidate = word_tokenize(output_sent.lower())
      reference = list(filter(lambda x: x != '', reference))
      candidate = list(filter(lambda x: x != '', candidate))

      # Remove incomplete words at the beginning and end of both lists
      if len(reference) > 2:
          reference = reference[1:-1]
      if len(candidate) > 2:
          candidate = candidate[1:-1]

      # Compute BLEU score
      bleu_score += sentence_bleu([reference], candidate, smoothing_function=smoother)

bleu_score /= n_eval
print("BLEU Score:", bleu_score)
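
`sentence_bleu` combines clipped n-gram precisions with a brevity penalty. As a rough illustration of the precision part only, here is a minimal sketch on two made-up sentences. `ngrams` and `clipped_precision` are hypothetical helpers, not part of NLTK, and the sketch leaves out the brevity penalty, the smoothing, and the averaging over n-gram orders.

```python
from collections import Counter


def ngrams(tokens, n):
    """Return all contiguous n-grams of a token list as tuples."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


def clipped_precision(reference, candidate, n):
    """Fraction of candidate n-grams found in the reference, where each
    n-gram is credited at most as often as it occurs in the reference."""
    cand = Counter(ngrams(candidate, n))
    ref = Counter(ngrams(reference, n))
    total = sum(cand.values())
    if total == 0:
        return 0.0
    overlap = sum(min(count, ref[gram]) for gram, count in cand.items())
    return overlap / total


# Invented example sentences
reference = "he pours another glass of wine".split()
candidate = "he pours the wine the wine".split()

print("1-gram precision:", clipped_precision(reference, candidate, 1))
print("2-gram precision:", clipped_precision(reference, candidate, 2))
```

Clipping is what stops a candidate from scoring well by repeating a single correct word: "wine" appears twice in the candidate but is only credited once.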

The code below allows you to generate sentences for different input patterns and different temperature values. You can test how the temperature values affect the quality of the output sentences for the character-level model by generating a few examples.

In [None]:
# Change the temperature here
temperature = 0.7
seed_text = "TYRION pours himself some wine and drinks it down. He pours another glass, and walks back to CERSEI "
pattern = [char_to_n[ch] for ch in seed_text[:seq_char_length]]

print("\nPredicted:")
model.eval()

with torch.no_grad():
    for _ in range(300):
        x = torch.tensor(pattern, dtype=torch.long, device=DEVICE).unsqueeze(0)
        logits = model(x)

        if temperature == 0:
            idx = torch.argmax(logits, dim=-1).item()
        else:
            probs = F.softmax(logits / temperature, dim=-1)
            idx = torch.multinomial(probs, num_samples=1).item()

        ch = n_to_char[idx]
        sys.stdout.write(ch)
        sys.stdout.flush()

        pattern = pattern[1:] + [idx]
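
Dividing the logits by the temperature before the softmax controls how peaked the sampling distribution is. The sketch below applies this to three made-up logits in plain Python; `softmax_with_temperature` is a hypothetical helper written for this illustration. Lower temperatures concentrate the probability on the top token, and higher ones flatten the distribution.

```python
import math


def softmax_with_temperature(logits, temperature):
    """Softmax of logits / temperature (temperature must be > 0)."""
    scaled = [z / temperature for z in logits]
    m = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.0, 0.0]  # invented values
for t in (0.2, 1.0, 2.0):
    probs = softmax_with_temperature(logits, t)
    print(f"temperature={t}: {[round(p, 3) for p in probs]}")
```

A temperature of exactly 0 would divide by zero. The BLEU cell avoids this by adding 0.01 to the temperature, while the generation cell above switches to `argmax` instead.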

**Word-level model**

We now give the code to run the word-level model. The code is similar to that of the char-level model. The main difference is that we only try to predict the 2000 most commonly used words in the dataset. This limits the size of the output layer and the number of input embeddings, which keeps memory usage manageable.

In [None]:
n_words = 2000
seq_length = 30

# Preprocess text: lowercase + space around punctuation
data_p = (
    data.replace('.', ' . ').replace(',', ' , ').replace(':', ' : ')
        .replace('?', ' ? ').replace('!', ' ! ')
        .replace('\n', ' \n ').replace('[', ' [ ').replace(']', ' ] ')
        .replace(')', ' ) ').replace('(', ' ( ').lower().split()
)
data_p = [tok for tok in data_p if tok.strip()]

# Build vocab (most common words)
common = [w for w, _ in Counter(data_p).most_common(n_words)]
word_to_n = {w: i for i, w in enumerate(common)}
n_to_word = {i: w for i, w in enumerate(common)}
OOV_IDX = len(word_to_n)

# Build dataset
x_word, y_word = [], []
for i in range(len(data_p) - seq_length):
    seq = [word_to_n.get(w, OOV_IDX) for w in data_p[i:i+seq_length]]
    label = data_p[i + seq_length]
    if label in word_to_n:  # only predict in-vocab words
        x_word.append(seq)
        y_word.append(word_to_n[label])

n_samples = len(x_word)
print("Total Samples:", n_samples)
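
To see what the vocabulary cut-off does, the toy example below applies the same steps to a made-up token list with a vocabulary of only 2 words. Input windows may contain the out-of-vocabulary index, but windows whose next word is out of vocabulary are skipped, so the model is never trained to predict that index. All `toy_` names are illustrative and not part of the notebook.

```python
from collections import Counter

toy_tokens = "the cat sat on the mat the cat ran".split()
toy_n_words, toy_seq_length = 2, 2

# Keep only the most common words; every other word maps to one OOV index
toy_common = [w for w, _ in Counter(toy_tokens).most_common(toy_n_words)]
toy_word_to_n = {w: i for i, w in enumerate(toy_common)}
toy_oov = len(toy_word_to_n)

toy_x, toy_y = [], []
n_windows = len(toy_tokens) - toy_seq_length
for i in range(n_windows):
    window = toy_tokens[i:i + toy_seq_length]
    seq = [toy_word_to_n.get(w, toy_oov) for w in window]
    label = toy_tokens[i + toy_seq_length]
    if label in toy_word_to_n:  # skip windows with an OOV target
        toy_x.append(seq)
        toy_y.append(toy_word_to_n[label])

print("vocabulary:", toy_word_to_n, "| OOV index:", toy_oov)
print(f"kept {len(toy_x)} of {n_windows} windows")
print("inputs:", toy_x)
print("labels:", toy_y)
```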

In [None]:
n_samples = len(x_word)
n_samples_train = int(n_samples * 0.7)
n_samples_test  = int(n_samples * 0.2)
n_samples_val   = n_samples - n_samples_train - n_samples_test

# Train / val / test splits
x_train_word = x_word[:n_samples_train]
y_train_word = y_word[:n_samples_train]

x_val_word   = x_word[n_samples_train:n_samples_train+n_samples_val]
y_val_word   = y_word[n_samples_train:n_samples_train+n_samples_val]

x_test_word  = x_word[n_samples_train+n_samples_val:]
y_test_word  = y_word[n_samples_train+n_samples_val:]

# Convert labels to numpy first
y_train_word = np.array(y_train_word)
y_val_word   = np.array(y_val_word)
y_test_word  = np.array(y_test_word)

# Convert all to tensors
x_train_word = torch.tensor(x_train_word, dtype=torch.long)
x_val_word   = torch.tensor(x_val_word,   dtype=torch.long)
x_test_word  = torch.tensor(x_test_word,  dtype=torch.long)

y_train_word = torch.tensor(y_train_word, dtype=torch.long)
y_val_word   = torch.tensor(y_val_word,   dtype=torch.long)
y_test_word  = torch.tensor(y_test_word,  dtype=torch.long)

# Shapes
print(f"x_train_word: {x_train_word.shape}, y_train_word: {y_train_word.shape}")
print(f"x_val_word:   {x_val_word.shape},   y_val_word:   {y_val_word.shape}")
print(f"x_test_word:  {x_test_word.shape},  y_test_word:  {y_test_word.shape}")
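
The split above is contiguous: the first 70% of the samples go to training, the last 20% to testing, and the validation set takes whatever remains in between, including any rounding leftovers. A small sketch of the same arithmetic follows; `split_bounds` is a hypothetical helper and the sample count is made up.

```python
def split_bounds(n_samples, train_frac=0.7, test_frac=0.2):
    """Return (start, end) index pairs for train, val and test,
    following the same arithmetic as the cell above."""
    n_train = int(n_samples * train_frac)
    n_test = int(n_samples * test_frac)
    n_val = n_samples - n_train - n_test  # remainder goes to validation
    train = (0, n_train)
    val = (n_train, n_train + n_val)
    test = (n_train + n_val, n_samples)
    return train, val, test


for name, (start, end) in zip(("train", "val", "test"), split_bounds(1003)):
    print(f"{name:5s} [{start}:{end}] -> {end - start} samples")
```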

The definition of the word-level model we train is given below. The model is the same as in the char-level case; the only differences are the size of the output vector and the number of input embeddings.

In [None]:
# define the LSTM model
class WordLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_size=300, lstm_units=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.lstm = nn.LSTM(embedding_size, lstm_units, batch_first=True)
        self.fc = nn.Linear(lstm_units, vocab_size)

    def forward(self, x):
        emb = self.embedding(x)           # (B, T, E)
        out, _ = self.lstm(emb)           # (B, T, H)
        last = out[:, -1, :]              # (B, H) last timestep
        logits = self.fc(last)            # (B, V)
        return logits                     # raw logits
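
As a rough check on how the vocabulary size drives the model size, the sketch below counts the trainable parameters of the architecture above by hand. It assumes PyTorch's single-layer LSTM layout: four gates, each with input weights, recurrent weights and two bias vectors. `count_params` is a hypothetical helper, and the vocabulary size of 60 is a made-up stand-in for a character-level vocabulary.

```python
def count_params(vocab_size, embedding_size=300, lstm_units=256):
    """Hand-computed parameter counts for Embedding -> LSTM -> Linear."""
    embedding = vocab_size * embedding_size
    # 4 gates x (input weights + recurrent weights + 2 bias vectors)
    lstm = 4 * lstm_units * (embedding_size + lstm_units + 2)
    fc = lstm_units * vocab_size + vocab_size  # weights + bias
    return {"embedding": embedding, "lstm": lstm, "fc": fc,
            "total": embedding + lstm + fc}


print("word-level (n_words + 1 = 2001):", count_params(2001))
print("hypothetical char-level (60):   ", count_params(60))
```

The LSTM block is identical in both cases. Only the embedding and output layers grow with the vocabulary, which is why capping the vocabulary keeps the word-level model small.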

As with the character-level model, training the word-level model takes a while. Use the saved model we included to compute your results.

In [None]:
'''
CODE USED FOR TRAINING (DO NOT RUN IT!)

from google.colab import files

# ---- config ----
batch_size = 128
epochs = 100
patience = 10
lr = 1e-3
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

vocab_size = n_words + 1
model = WordLSTM(vocab_size=vocab_size, embedding_size=300, lstm_units=256).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# ---- data loaders (labels are integer class ids) ----
train_ds = TensorDataset(x_train_word, y_train_word)
val_ds   = TensorDataset(x_val_word,   y_val_word)
test_ds  = TensorDataset(x_test_word,  y_test_word)

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size)
test_loader  = DataLoader(test_ds,  batch_size=batch_size)

# ---- early stopping ----
class EarlyStopping:
    def __init__(self, patience=10, mode='max', min_delta=0.0, restore_best=True):
        self.patience = patience
        self.mode = mode  # 'max' for accuracy
        self.min_delta = min_delta
        self.restore_best = restore_best
        self.best_score = None
        self.counter = 0
        self.best_state = None
        self.best_epoch = 0

    def step(self, score, model, epoch):
        improve = False
        if self.best_score is None:
            improve = True
        else:
            if self.mode == 'max':
                improve = score > (self.best_score + self.min_delta)
            else:
                improve = score < (self.best_score - self.min_delta)

        if improve:
            self.best_score = score
            self.counter = 0
            if self.restore_best:
                self.best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            self.best_epoch = epoch
            return False
        else:
            self.counter += 1
            return self.counter > self.patience  # stop if patience exceeded

    def restore(self, model):
        if self.restore_best and self.best_state is not None:
            model.load_state_dict(self.best_state)

early_stop = EarlyStopping(patience=patience, mode='max', min_delta=0.0, restore_best=True)

# ---- history ----
history = {
    'loss': [],
    'val_loss': [],
    'accuracy': [],
    'val_accuracy': []
}

def accuracy_from_logits(logits, targets):
    preds = logits.argmax(dim=1)
    return (preds == targets).float().mean().item()

# ---- training loop ----
best_path = 'word_gen_model.pth'
for epoch in range(1, epochs + 1):
    # train
    model.train()
    epoch_loss = 0.0
    epoch_acc = 0.0
    n_batches = 0

    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb)            # [B, V]
        loss = criterion(logits, yb)  # yb: [B] integer targets
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc  += accuracy_from_logits(logits, yb)
        n_batches  += 1

    train_loss = epoch_loss / n_batches
    train_acc  = epoch_acc  / n_batches

    # validate
    model.eval()
    val_loss = 0.0
    val_acc  = 0.0
    n_val    = 0
    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item()
            val_acc  += accuracy_from_logits(logits, yb)
            n_val    += 1

    val_loss /= max(1, n_val)
    val_acc  /= max(1, n_val)

    # record history
    history['loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['accuracy'].append(train_acc)
    history['val_accuracy'].append(val_acc)

    print(f"Epoch {epoch:3d}/{epochs}  "
          f"loss={train_loss:.4f}  acc={train_acc:.4f}  "
          f"val_loss={val_loss:.4f}  val_acc={val_acc:.4f}")

    # early stopping on val_accuracy
    stop = early_stop.step(val_acc, model, epoch)
    if early_stop.best_state is not None and early_stop.best_epoch == epoch:
        torch.save(early_stop.best_state, best_path)

    if stop:
        print(f"Early stopping at epoch {epoch}. Restoring best epoch {early_stop.best_epoch}...")
        break

# restore best weights
early_stop.restore(model)
torch.save(model.state_dict(), best_path)

# ---- test evaluation ----
model.eval()
test_loss = 0.0
test_acc = 0.0
n_test = 0
with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = criterion(logits, yb)
        test_loss += loss.item()
        test_acc  += accuracy_from_logits(logits, yb)
        n_test    += 1

test_loss /= max(1, n_test)
test_acc  /= max(1, n_test)

print(f"\nFinal test loss is: {test_loss:.4f}")
print(f"Final test accuracy is: {test_acc:.4f}")
print(f"Best model saved to: {best_path}")
files.download(best_path)
'''
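
The loops above report the mean of per-batch accuracies (`epoch_acc / n_batches`), which gives a smaller final batch the same weight as a full one. The sketch below uses made-up batch sizes and correct counts to compare this with a sample-weighted average. With many batches the gap is usually small, but it is worth keeping in mind when comparing numbers obtained with different batch sizes.

```python
# (n_correct, batch_size) per batch; the numbers are invented for illustration
batches = [(96, 128), (100, 128), (10, 16)]

# What the loops above compute: mean of per-batch accuracies
batch_mean = sum(c / n for c, n in batches) / len(batches)

# Sample-weighted accuracy: total correct over total samples
weighted = sum(c for c, _ in batches) / sum(n for _, n in batches)

print(f"mean of batch means: {batch_mean:.4f}")
print(f"sample-weighted:     {weighted:.4f}")
```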

In [None]:
# Load the model
!wget 'https://raw.githubusercontent.com/MatchLab-Imperial/deep-learning-course/master/asset/05_RNN/word_gen_model.pth' -O word_gen_model.pth

vocab_size = n_words + 1
model = WordLSTM(vocab_size=vocab_size, embedding_size=300, lstm_units=256).to(DEVICE)
model.load_state_dict(torch.load("word_gen_model.pth", map_location=DEVICE))

In [None]:
# Vary the temperature here
temperature = 0.7
n_eval = 20
smoother = SmoothingFunction().method1
bleu_score = 0.0

model.eval()

with torch.no_grad():
  for _ in range(n_eval):
      # Resample until the reference contains no out-of-vocabulary token, since the
      # model was only trained to predict the 2000 most common words
      while True:
          start = np.random.randint(0, len(x_test_word)-seq_length-1)
          pattern = x_test_word[start].tolist()
          reference = x_test_word[start+seq_length].tolist()
          if n_words not in reference:
              break
      reference = ' '.join([n_to_word[value] for value in reference])

      # generate words
      output_sent = ''
      for i in range(seq_length):
          x = torch.tensor(pattern, dtype=torch.long, device=DEVICE).unsqueeze(0)
          logits = model(x)
          probs = F.softmax((logits + 1e-7) / (float(temperature) + 0.01), dim=-1)
          idx = torch.multinomial(probs[0], num_samples=1).item()

          word = n_to_word.get(idx, '')
          output_sent += word + ' '

          pattern.append(idx)
          pattern = pattern[1:]

      # Preprocess reference and candidate text
      reference = word_tokenize(reference.lower())
      candidate = word_tokenize(output_sent.lower())

      # Remove empty strings (if any) after tokenization
      reference = list(filter(lambda x: x != '', reference))
      candidate = list(filter(lambda x: x != '', candidate))

      # Drop the first and last tokens of both lists (mirrors the char-level
      # evaluation, where they can be incomplete words)
      if len(reference) > 2:
          reference = reference[1:-1]
      if len(candidate) > 2:
          candidate = candidate[1:-1]

      # Compute BLEU score for the candidate and reference
      bleu_score += sentence_bleu([reference], candidate, smoothing_function=smoother)

bleu_score /= n_eval
print("BLEU Score:", bleu_score)

In [None]:
# Vary the temperature here
temperature = 0.7
seed_text = (
    "TYRION pours himself some wine and drinks it down. He pours another glass, "
    "and walks back to CERSEI placing his cup on her desk. He takes another glass.\n"
    "TYRION: "
)

# Preprocess seed
pattern = (
    seed_text.replace('.', ' . ').replace(',', ' , ').replace(':', ' : ')
    .replace('?', ' ? ').replace('!', ' ! ')
    .replace('\n', ' \n ').replace('[', ' [ ').replace(']', ' ] ')
    .replace(')', ' ) ').replace('(', ' ( ')
    .lower()
    .split()
)
pattern = [w for w in pattern if w.strip()][:seq_length]
print("\nInput Pattern:\n", " ".join(pattern))

# Map to indices (OOV → n_words)
pattern = [word_to_n.get(w, n_words) for w in pattern]

print("\nPredicted:")
model.eval()
with torch.no_grad():
    for _ in range(100):
        x = torch.tensor(pattern, dtype=torch.long, device=DEVICE).unsqueeze(0)
        logits = model(x)

        if temperature == 0:
            idx = torch.argmax(logits, dim=-1).item()
        else:
            probs = F.softmax(logits / temperature, dim=-1)
            idx = torch.multinomial(probs, num_samples=1).item()

        word = n_to_word.get(idx, "<UNK>")
        sys.stdout.write(word + " ")
        sys.stdout.flush()

        # Update pattern
        pattern = pattern[1:] + [idx]
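
The generation loop above is autoregressive: each sampled index is appended to the context and the oldest one is dropped, so the window length stays fixed. The minimal sketch below shows how the window evolves. It replaces the model and the sampling step with `toy_next`, a made-up rule that returns the last index plus one, modulo a vocabulary of 5.

```python
def toy_next(pattern, vocab_size=5):
    """Made-up stand-in for model + sampling: next index = last index + 1."""
    return (pattern[-1] + 1) % vocab_size


pattern = [0, 1, 2]  # initial context window
generated = []
for _ in range(4):
    idx = toy_next(pattern)
    generated.append(idx)
    pattern = pattern[1:] + [idx]  # drop the oldest index, append the newest
    print("window:", pattern)

print("generated:", generated)
```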