# DD2424 - Project 3 - NLP

> Group 12: Tristan Perrot, Paul Mauduit, Adrien Jouanny, Arthur Depret


## Imports

In [11]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import math
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import nltk
from tqdm.notebook import trange, tqdm
from nltk.corpus import gutenberg
nltk.download('gutenberg')
nltk.download('punkt')

# Pick the compute device by preference: CUDA, then Apple MPS, then plain CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print('device:', device)
print('current directory:', os.getcwd())

device: cuda
current directory: /home/jovyan


[nltk_data] Downloading package gutenberg to /home/jovyan/nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!
[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


## Data

In [12]:
# Load the raw text of 'carroll-alice.txt' from the gutenberg corpus
alice = gutenberg.raw('carroll-alice.txt')
# Remove the header: keep everything from the first chapter heading onwards
header = alice.find("CHAPTER I.")
if header == -1:
    # str.find returns -1 when the marker is missing; slicing with -1 would
    # silently keep only the last character of the text.
    raise ValueError("Could not find 'CHAPTER I.' in carroll-alice.txt")
alice = alice[header:]

print(alice[:100])

CHAPTER I. Down the Rabbit-Hole

Alice was beginning to get very tired of sitting by her sister on t


In [13]:
# Tokenize on whitespace only, so punctuation stays attached to the words.
words = alice.split()
word_counts = Counter(words)
# Counter keeps insertion order, so ids follow the order of first appearance.
vocab = list(word_counts.keys())
vocab_size = len(vocab)
word_to_int = {word: i for i, word in enumerate(vocab)}
int_to_word = {i: word for word, i in word_to_int.items()}
SEQUENCE_LENGTH = 64
# Every sample is a sliding window of SEQUENCE_LENGTH + 1 consecutive words.
samples = [words[i:i+SEQUENCE_LENGTH+1] for i in range(len(words)-SEQUENCE_LENGTH)]
# Print a summary and a short preview instead of dumping the whole vocabulary
# and both lookup tables into the notebook output.
print(f"{len(words):,} words, {vocab_size:,} distinct tokens, {len(samples):,} samples")
print(vocab[:20])



In [14]:
class TextDataset(Dataset):
    """Next-word prediction dataset over pre-cut word windows.

    Each item is a pair of LongTensors built from one window: the ids of the
    window without its last word (input) and the ids of the window without its
    first word (target).
    """

    def __init__(self, samples, word_to_int):
        # Only references are stored; words are converted to ids per item.
        self.word_to_int = word_to_int
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        lookup = self.word_to_int
        # Encode the window once, then slice it into the shifted pair.
        encoded = [lookup[token] for token in self.samples[idx]]
        return torch.LongTensor(encoded[:-1]), torch.LongTensor(encoded[1:])

In [15]:
# Wrap the word windows in a Dataset and serve them as shuffled mini-batches.
BATCH_SIZE = 32
dataset = TextDataset(samples, word_to_int)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
# Sanity check: the second tensor is the first one shifted by one word.
print(dataset[1])

(tensor([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,  3,
        18, 19, 12, 20, 21,  8, 22, 23, 24, 25, 26, 27, 28, 29,  3, 30, 15, 16,
         6, 31, 32, 33, 27, 34, 35, 24, 36, 37, 38, 39, 40, 41,  3, 42, 12, 43,
        44, 45,  5, 46, 35, 24, 47, 48, 26,  6]), tensor([ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,  3, 18,
        19, 12, 20, 21,  8, 22, 23, 24, 25, 26, 27, 28, 29,  3, 30, 15, 16,  6,
        31, 32, 33, 27, 34, 35, 24, 36, 37, 38, 39, 40, 41,  3, 42, 12, 43, 44,
        45,  5, 46, 35, 24, 47, 48, 26,  6, 49]))


In [16]:
def generate_square_subsequent_mask(sz):
    """
    Build the additive causal attention mask of shape (sz, sz).

    Entries strictly above the main diagonal (future positions) are
    float('-inf'); the diagonal and everything below it are 0.0.
    """
    blocked = torch.full((sz, sz), float('-inf'), dtype=torch.float32)
    # diagonal=1 keeps only the strictly-upper triangle and zeroes the rest.
    return torch.triu(blocked, diagonal=1)

In [17]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for batch-first inputs, followed by dropout."""

    def __init__(self, max_len, d_model, dropout=0.1):
        """
        :param max_len: Number of positions for which encodings are precomputed.
        :param d_model: Embedding dimension (even or odd).
        :param dropout: Dropout value applied after adding the encoding (default=0.1)
        """
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(
            0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Even columns carry the sine, odd columns the cosine of the same angles.
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the frequency vector is trimmed to fit (no-op for even d_model).
        pe[:, 1::2] = torch.cos(position * div_term[:d_model // 2])
        # Leading singleton axis broadcasts over the batch: [1, max_len, d_model].
        pe = pe.unsqueeze(0)
        # Buffer, not parameter: saved with the module but not trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Add the positional encoding to the input and apply dropout.
        :param x: the sequence fed to the positional encoder model (required).
        Shape:
            x: [batch size, sequence length, embed dim]
            output: [batch size, sequence length, embed dim]
        """
        # Slice the table on the sequence axis (dim 1) to the input's length.
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [18]:
class TextGen(nn.Module):
    """Word-level causal language model built on ``nn.TransformerDecoder``.

    Token ids are embedded, combined with positional encodings, passed through
    the decoder stack (which uses its own input as memory under the same causal
    mask) and projected to vocabulary logits.
    """

    def __init__(self, vocab_size, embed_dim, num_layers, num_heads):
        super().__init__()
        self.pos_encoder = PositionalEncoding(
            max_len=SEQUENCE_LENGTH, d_model=embed_dim)
        self.emb = nn.Embedding(vocab_size, embed_dim)
        # The template layer is kept as an attribute, so it is registered as a
        # sub-module of its own next to the decoder stack built from it.
        self.decoder_layer = nn.TransformerDecoderLayer(
            d_model=embed_dim, nhead=num_heads, batch_first=True)
        self.decoder = nn.TransformerDecoder(
            decoder_layer=self.decoder_layer, num_layers=num_layers)
        self.linear = nn.Linear(embed_dim, vocab_size)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        # Causal mask sized by the sequence axis (dim 1) of the id tensor.
        seq_len = x.size(1)
        causal_mask = generate_square_subsequent_mask(seq_len).to(x.device)

        # Positional encoding is required. Else the model does not learn.
        hidden = self.pos_encoder(self.emb(x))
        # The same tensor serves as target and memory; both attentions are masked.
        hidden = self.decoder(
            hidden, memory=hidden, tgt_mask=causal_mask, memory_mask=causal_mask)
        return self.linear(self.dropout(hidden))

In [19]:
epochs = 100
learning_rate = 0.001
# Reuse the `device` chosen in the setup cell (CUDA, then MPS, then CPU).
# Re-deriving it here without the MPS branch would move the model to the CPU
# on machines where only MPS is available.
model = TextGen(
    vocab_size=vocab_size,
    embed_dim=100,
    num_layers=2,
    num_heads=2,
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model)
# Total parameters and trainable parameters.
total_params = sum(p.numel() for p in model.parameters())
print(f"{total_params:,} total parameters.")
total_trainable_params = sum(
    p.numel() for p in model.parameters() if p.requires_grad)
print(f"{total_trainable_params:,} training parameters.\n")

TextGen(
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (emb): Embedding(5289, 100)
  (decoder_layer): TransformerDecoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=100, out_features=100, bias=True)
    )
    (multihead_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=100, out_features=100, bias=True)
    )
    (linear1): Linear(in_features=100, out_features=2048, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=2048, out_features=100, bias=True)
    (norm1): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
    (norm3): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
    (dropout3): Dropout(p=0.1, inplace=False)
  )
  (decoder): Transform

In [20]:
# Training
def train(model, epochs, dataloader, criterion, optimizer=None):
    """
    Train ``model`` on next-word prediction and print the mean loss of each epoch.

    :param model: Network mapping a batch of token ids to per-position logits.
    :param epochs: Number of passes over ``dataloader``.
    :param dataloader: Iterable yielding ``(input_seq, target_seq)`` id batches.
    :param criterion: Loss called with flattened logits and flattened targets.
    :param optimizer: Optimizer that updates ``model``'s parameters. When
        omitted, the notebook-level ``optimizer`` variable is used, which keeps
        existing four-argument calls working.
    """
    if optimizer is None:
        # Backward-compatible fallback to the module-level optimizer.
        optimizer = globals()["optimizer"]
    # Follow the model's own device instead of relying on a global.
    device = next(model.parameters()).device
    model.train()
    for epoch in trange(epochs):
        running_loss = 0.0
        num_batches = 0
        for input_seq, target_seq in dataloader:
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)
            outputs = model(input_seq)
            # Merge all leading axes; the class count is read from the logits
            # themselves rather than from a global vocabulary size.
            outputs = outputs.reshape(-1, outputs.size(-1))

            loss = criterion(outputs, target_seq.reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # .item() yields a plain Python float for the running total.
            running_loss += loss.item()
            num_batches += 1
        # max(..., 1) avoids a ZeroDivisionError on an empty dataloader.
        epoch_loss = running_loss / max(num_batches, 1)
        print(f"Epoch {epoch} loss: {epoch_loss:.3f}")


# Run the training loop; the mean loss of every epoch is printed as it finishes.
train(model, epochs, dataloader, criterion)

  0%|          | 0/100 [00:00<?, ?it/s]

Epoch 0 loss: 4.149
Epoch 1 loss: 1.757
Epoch 2 loss: 1.063
Epoch 3 loss: 0.799
Epoch 4 loss: 0.661
Epoch 5 loss: 0.572
Epoch 6 loss: 0.510
Epoch 7 loss: 0.463
Epoch 8 loss: 0.427
Epoch 9 loss: 0.395
Epoch 10 loss: 0.362
Epoch 11 loss: 0.342
Epoch 12 loss: 0.325
Epoch 13 loss: 0.309
Epoch 14 loss: 0.297
Epoch 15 loss: 0.285
Epoch 16 loss: 0.274
Epoch 17 loss: 0.264
Epoch 18 loss: 0.257
Epoch 19 loss: 0.249
Epoch 20 loss: 0.242
Epoch 21 loss: 0.236
Epoch 22 loss: 0.231
Epoch 23 loss: 0.226
Epoch 24 loss: 0.220
Epoch 25 loss: 0.217
Epoch 26 loss: 0.211
Epoch 27 loss: 0.208
Epoch 28 loss: 0.203
Epoch 29 loss: 0.200
Epoch 30 loss: 0.197
Epoch 31 loss: 0.194
Epoch 32 loss: 0.191
Epoch 33 loss: 0.189
Epoch 34 loss: 0.186
Epoch 35 loss: 0.184
Epoch 36 loss: 0.181
Epoch 37 loss: 0.179
Epoch 38 loss: 0.177
Epoch 39 loss: 0.176
Epoch 40 loss: 0.173
Epoch 41 loss: 0.172
Epoch 42 loss: 0.170
Epoch 43 loss: 0.169
Epoch 44 loss: 0.167
Epoch 45 loss: 0.166
Epoch 46 loss: 0.164
Epoch 47 loss: 0.161
Ep

In [24]:
# Write the model's state dict to 'transformer_model.pth'
torch.save(obj=model.state_dict(), f='transformer_model.pth')
print('Model saved')

Model saved


In [25]:
# Load the model: rebuild the architecture with the same hyper-parameters used
# for training, then restore the saved weights into it.
model = TextGen(
    vocab_size=vocab_size,
    embed_dim=100,
    num_layers=2,
    num_heads=2,
).to(device)

# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a CUDA machine also loads where CUDA is unavailable.
model.load_state_dict(torch.load('transformer_model.pth', map_location=device))
# Switch to inference mode (disables dropout).
model.eval()
print('Model loaded')

Model loaded


In [21]:
def return_int_vector(text):
    """
    Encode the last SEQUENCE_LENGTH whitespace-separated words of ``text``.

    Returns a LongTensor with a leading batch axis of size 1. A word missing
    from ``word_to_int`` raises KeyError.
    """
    context = text.split()[-SEQUENCE_LENGTH:]
    token_ids = [word_to_int[token] for token in context]
    return torch.LongTensor(token_ids).unsqueeze(0)


def sample_next(predictions):
    """
    Greedy sampling: return the index of the most probable entry at the last
    sequence position as a Python int.
    """
    last_step = predictions[:, -1, :]
    probabilities = F.softmax(last_step, dim=-1).cpu()
    # argmax without `dim` searches the flattened tensor.
    return probabilities.argmax().item()


def text_generator(sentence, generate_length):
    """
    Greedily extend ``sentence`` by ``generate_length`` words and print the result.

    :param sentence: Prompt text; its words are looked up in ``word_to_int``.
    :param generate_length: Number of words to append.
    :return: The prompt followed by the generated words (the printed text).
    """
    model.eval()
    sample = sentence
    for _ in range(generate_length):
        # return_int_vector keeps only the last SEQUENCE_LENGTH words, so the
        # context is a sliding window. The former `len(int_vector)` early exit
        # measured the batch axis (always 1) and never fired, so it is removed.
        input_tensor = return_int_vector(sample).to(device)
        with torch.no_grad():
            predictions = model(input_tensor)
        next_token = sample_next(predictions)
        sample += ' ' + int_to_word[next_token]
    print(sample)
    print('\n')
    return sample

In [22]:
# Prompts to complete and the number of words to append to each of them.
sentences = ["Alice was a"]
generate_length = 100
for sentence in sentences:
    print(f"PROMPT: {sentence}")
    text_generator(sentence, generate_length)

PROMPT: Alice was a
Alice was a good deal worse off than before, as the March Hare had just upset the milk-jug into his plate. Alice did not wish to offend the Dormouse again, so she began very cautiously: 'But I don't understand. Where did they draw the treacle from?' 'You can draw water out of a water-well,' said the Hatter; 'so I should think you could draw treacle out of a treacle-well--eh, stupid?' 'But they were IN the well,' Alice said to the Dormouse, not choosing to notice this last remark. 'Of course they were', said the Dormouse; '--well in.' This answer so confused poor Alice,


