In [None]:
# Install portalocker before anything else.
# NOTE(review): presumably needed by torchdata/torchtext at import time — confirm.
!pip install portalocker

In [None]:
from IPython.display import clear_output

# Install/upgrade the NLP tooling used below and fetch the spaCy language models.
# NOTE(review): only torchtext is version-pinned; confirm that the unpinned
# torchdata/spacy upgrades resolve to versions compatible with torchtext 0.15.1.
!pip install -U torchdata
!pip install -U spacy
!python -m spacy download en_core_web_sm
# NOTE(review): no visible cell loads the German model — confirm it is needed.
!python -m spacy download de_core_news_sm
!pip install torchtext==0.15.1
# Wipe the installer logs from the cell output.
clear_output()

In [None]:
import requests
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import random
import math
import time

# Reproducibility: seed Python's RNG and both torch generators with one value,
# and ask cuDNN for deterministic kernels.
SEED = 1234
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# Run on the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Download the plain-text corpus from Project Gutenberg.
url = "https://www.gutenberg.org/files/4300/4300-0.txt"
# A timeout keeps the cell from hanging forever on a stalled connection.
response = requests.get(url, timeout=30)
# Fail fast on an HTTP error. Silently substituting a placeholder string would
# turn the error message itself into the training corpus and only surface much
# later as an empty dataset.
response.raise_for_status()
full_text = response.text

# Keep only the first `num_pages` "pages" of the book, where a page is taken to
# be `words_per_page` whitespace-separated words, and re-join them into a string.
num_pages = 25
words_per_page = 1000
words = full_text.split()
subset_words = words[:words_per_page * num_pages]
ulysses_subset = " ".join(subset_words)
text = ulysses_subset

# Tokenise the corpus with the spaCy English model.
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')
tokenized_text = tokenizer(text)

# Slide a fixed-size window over the tokens: each input is `window_size`
# consecutive tokens and its target is the same window shifted right by one.
window_size = 4
num_windows = len(tokenized_text) - window_size
inputs = [tokenized_text[start:start + window_size] for start in range(num_windows)]
targets = [tokenized_text[start + 1:start + window_size + 1] for start in range(num_windows)]

# Reserved vocabulary entries (unknown, padding, sequence start, sequence end).
special_tokens = ['<unk>', '<pad>', '<bos>', '<eos>']

def yield_tokens(data):
    """Lazily yield each token sequence contained in ``data``, in order."""
    yield from data

# Build the vocabulary from the tokenised corpus, with the special tokens
# inserted ahead of the corpus tokens.
vocab = build_vocab_from_iterator(
    yield_tokens([tokenized_text]),
    specials=special_tokens,
    special_first=True,
)
# Tokens missing from the vocabulary are looked up as '<unk>'.
unk_index = vocab['<unk>']
vocab.set_default_index(unk_index)

def tensor_transform(tokens):
    """Convert tokens to vocabulary ids framed by the <bos> and <eos> markers.

    Returns a 1-D ``torch.long`` tensor holding ``len(tokens) + 2`` ids.
    """
    ids = [vocab['<bos>']]
    ids.extend(vocab[token] for token in tokens)
    ids.append(vocab['<eos>'])
    return torch.tensor(ids, dtype=torch.long)

class NextTokenDataset(Dataset):
    """Dataset of (input, target) pairs with a transform applied on access."""

    def __init__(self, inputs, targets, transform):
        # `transform` is applied to both members of a pair in __getitem__.
        self.transform = transform
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        # The size is driven by the inputs collection.
        return len(self.inputs)

    def __getitem__(self, idx):
        source = self.transform(self.inputs[idx])
        target = self.transform(self.targets[idx])
        return source, target

def collate_fn(batch):
    """Pad the sources and the targets of a batch with the '<pad>' id.

    ``batch`` is a sequence of (source, target) tensor pairs; the two padded
    tensors are returned as a (sources, targets) tuple.
    """
    pad_index = vocab['<pad>']
    sources, labels = zip(*batch)
    return (
        pad_sequence(sources, padding_value=pad_index),
        pad_sequence(labels, padding_value=pad_index),
    )

# Wrap the sliding windows in a Dataset and serve them in shuffled batches of two.
dataset = NextTokenDataset(inputs=inputs, targets=targets, transform=tensor_transform)
dataloader = DataLoader(
    dataset,
    batch_size=2,
    collate_fn=collate_fn,
    shuffle=True,
)

class LSTMLayer(nn.Module):
    """One LSTM time step, written out gate by gate.

    Each gate owns an input projection ``W_*`` of shape
    ``(input_size, hidden_size)``, a recurrent projection ``U_*`` of shape
    ``(hidden_size, hidden_size)`` and a bias ``b_*`` of shape ``(hidden_size,)``.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # torch.empty only allocates storage; reset_parameters() below fills it,
        # so the layer never runs on uninitialised memory.
        # Forget gate
        self.W_f = nn.Parameter(torch.empty(input_size, hidden_size))
        self.U_f = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.b_f = nn.Parameter(torch.empty(hidden_size))
        # Input gate
        self.W_i = nn.Parameter(torch.empty(input_size, hidden_size))
        self.U_i = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.empty(hidden_size))
        # Candidate cell values
        self.W_g = nn.Parameter(torch.empty(input_size, hidden_size))
        self.U_g = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.b_g = nn.Parameter(torch.empty(hidden_size))
        # Output gate
        self.W_o = nn.Parameter(torch.empty(input_size, hidden_size))
        self.U_o = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.b_o = nn.Parameter(torch.empty(hidden_size))
        self.reset_parameters()

    def reset_parameters(self):
        """Draw every parameter from U(-1/sqrt(hidden_size), 1/sqrt(hidden_size))."""
        bound = 1.0 / math.sqrt(self.hidden_size) if self.hidden_size > 0 else 0.0
        for param in self.parameters():
            nn.init.uniform_(param, -bound, bound)

    def forward(self, input, h_prev, c_prev):
        """Advance the cell by one time step.

        Args:
            input: tensor whose last dimension is ``input_size``.
            h_prev: previous hidden state, last dimension ``hidden_size``.
            c_prev: previous cell state, same shape as ``h_prev``.

        Returns:
            ``(h_next, c_next)``: the new hidden and cell states.
        """
        # Forget gate: how much of the previous cell state survives.
        f = torch.sigmoid(input @ self.W_f + h_prev @ self.U_f + self.b_f)
        # Input gate and candidate values: what gets written to the cell.
        i = torch.sigmoid(input @ self.W_i + h_prev @ self.U_i + self.b_i)
        g = torch.tanh(input @ self.W_g + h_prev @ self.U_g + self.b_g)
        # Output gate: how much of the cell state is exposed as hidden state.
        o = torch.sigmoid(input @ self.W_o + h_prev @ self.U_o + self.b_o)
        c_next = f * c_prev + i * g
        h_next = o * torch.tanh(c_next)
        return h_next, c_next

class StackLSTMLayers(nn.Module):
    """A stack of ``LSTMLayer`` cells unrolled over a time-major sequence.

    The first layer consumes ``input_size`` features; every further layer
    consumes the ``hidden_size`` output of the layer beneath it.
    """

    def __init__(self, input_size, hidden_size, num_layers=1):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.layers = nn.ModuleList([LSTMLayer(input_size if i == 0 else hidden_size, hidden_size) for i in range(num_layers)])

    def forward(self, input, hidden=None):
        """Run the stack over every time step of ``input``.

        Args:
            input: sequence tensor iterated along dim 0 (time); dim 1 is used
                as the batch size when ``hidden`` has to be created.
            hidden: optional ``(hiddens, cells)`` pair of per-layer state
                lists. Zero states are used when omitted.

        Returns:
            ``(outputs, (hiddens, cells))``: the top layer's hidden state for
            every time step stacked along dim 0, and the final per-layer states.
        """
        if hidden is None:
            hidden = self.init_hidden(
                input.size(1),
                device=input.device,
                dtype=input.dtype if input.is_floating_point() else None,
            )
        prev_hiddens, prev_cells = hidden
        # Work on shallow copies so the per-layer updates below never overwrite
        # entries of the lists the caller handed in.
        hiddens, cells = list(prev_hiddens), list(prev_cells)
        outputs = []
        for input_t in input:
            layer_input = input_t
            for layer_idx, layer in enumerate(self.layers):
                hiddens[layer_idx], cells[layer_idx] = layer(layer_input, hiddens[layer_idx], cells[layer_idx])
                # The next layer up reads this layer's fresh hidden state.
                layer_input = hiddens[layer_idx]
            outputs.append(hiddens[-1])
        outputs = torch.stack(outputs, dim=0)
        return outputs, (hiddens, cells)

    def init_hidden(self, batch_size, device=None, dtype=None):
        """Return zero ``(hiddens, cells)`` lists, one entry per layer.

        ``device``/``dtype`` default to those of the module's own parameters
        (or torch's defaults when it has none), so the states always live
        next to the weights instead of depending on a notebook-level global.
        """
        if device is None or dtype is None:
            reference = next(self.parameters(), None)
            if reference is not None:
                device = reference.device if device is None else device
                dtype = reference.dtype if dtype is None else dtype
        hiddens = [torch.zeros(batch_size, self.hidden_size, device=device, dtype=dtype) for _ in range(self.num_layers)]
        cells = [torch.zeros(batch_size, self.hidden_size, device=device, dtype=dtype) for _ in range(self.num_layers)]
        return hiddens, cells

class Encoder(nn.Module):
    """Embeds a source sequence and returns the recurrent stack's final state."""

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        # Exposed so that Seq2Seq can check encoder/decoder compatibility.
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = StackLSTMLayers(emb_dim, hid_dim, n_layers)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Return ``(hidden, cell)`` produced by running the stack over ``src``."""
        token_vectors = self.embedding(src)
        rnn_input = self.dropout(token_vectors)
        # The per-step outputs are not needed, only the final state.
        final_state = self.rnn(rnn_input)[1]
        hidden, cell = final_state
        return hidden, cell

class Decoder(nn.Module):
    """Single-step decoder: embed one token per sequence, advance the stack, score the vocabulary."""

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        # Exposed so that Seq2Seq can size its output buffer and check compatibility.
        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = StackLSTMLayers(emb_dim, hid_dim, n_layers)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Decode one step.

        ``input`` holds one token id per sequence; a leading length-1 time
        axis is added before embedding and removed again before the output
        projection. Returns ``(prediction, hidden, cell)``.
        """
        step_tokens = input.unsqueeze(0)
        step_embedded = self.dropout(self.embedding(step_tokens))
        rnn_out, state = self.rnn(step_embedded, (hidden, cell))
        hidden, cell = state
        prediction = self.fc_out(rnn_out.squeeze(0))
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        # The encoder's final state seeds the decoder, so their shapes must agree.
        assert encoder.hid_dim == decoder.hid_dim, "Hidden dimensions must be equal!"
        assert encoder.n_layers == decoder.n_layers, "Encoder and decoder must have equal layers!"

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Return per-step vocabulary scores for ``trg``.

        ``trg`` is indexed as (time, batch). Row 0 of the result is left at
        zero because decoding starts from ``trg[0]``. At each step the next
        decoder input is the ground-truth token with probability
        ``teacher_forcing_ratio`` and the decoder's own argmax otherwise.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, vocab_size, device=self.device)
        hidden, cell = self.encoder(src)
        decoder_input = trg[0, :]
        for step in range(1, trg_len):
            step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[step] = step_scores
            # One coin flip per step decides the source of the next input for the whole batch.
            use_teacher = random.random() < teacher_forcing_ratio
            greedy_choice = step_scores.argmax(1)
            decoder_input = trg[step] if use_teacher else greedy_choice
        return outputs

def init_weights(m):
    """Re-draw every parameter of ``m`` (submodules included) from U(-0.08, 0.08)."""
    low, high = -0.08, 0.08
    for param in m.parameters():
        nn.init.uniform_(param.data, low, high)

def count_parameters(model):
    """Return the total number of scalar entries across ``model``'s trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

def train(model, dataloader, optimizer, criterion, clip):
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Gradients are clipped to a total norm of ``clip`` before every optimiser step.
    """
    model.train()
    running_loss = 0
    for src, trg in dataloader:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()
        scores = model(src, trg)
        vocab_dim = scores.shape[-1]
        # Time step 0 is dropped on both sides before flattening for the loss.
        flat_scores = scores[1:].view(-1, vocab_dim)
        flat_targets = trg[1:].view(-1)
        loss = criterion(flat_scores, flat_targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(dataloader)

def evaluate(model, dataloader, criterion):
    """Return the mean batch loss over ``dataloader`` without updating the model.

    The model is put in eval mode and called with a teacher-forcing ratio of 0.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for src, trg in dataloader:
            src, trg = src.to(device), trg.to(device)
            scores = model(src, trg, 0)
            vocab_dim = scores.shape[-1]
            # Time step 0 is dropped on both sides before flattening for the loss.
            flat_scores = scores[1:].view(-1, vocab_dim)
            flat_targets = trg[1:].view(-1)
            running_loss += criterion(flat_scores, flat_targets).item()
    return running_loss / len(dataloader)

# ---- Hyperparameters ----
# Source and target share one vocabulary, so both dimensions are its size.
INPUT_DIM = len(vocab)
OUTPUT_DIM = len(vocab)
ENC_EMB_DIM = 128
DEC_EMB_DIM = 128
HID_DIM = 256
N_LAYERS = 2
ENC_DROPOUT = 0.2
DEC_DROPOUT = 0.2
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)

# ---- Model, optimiser and loss ----
model = Seq2Seq(enc, dec, device).to(device)
model.apply(init_weights)
print(f'The model has {count_parameters(model):,} trainable parameters')
optimizer = optim.Adam(model.parameters())
# Padding positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab['<pad>'])
N_EPOCHS = 5
CLIP = 1.0

# ---- Train / validation split ----
# Validation must use windows the optimiser never sees, otherwise the "Val."
# figures below are just a second measurement of the training loss.
# The split is contiguous (the tail of the text is held out) rather than
# random: neighbouring windows overlap, so a random split would put
# near-duplicates of every validation window into the training set.
VALID_FRACTION = 0.1
n_valid = max(1, int(len(dataset) * VALID_FRACTION))
n_train = len(dataset) - n_valid
train_subset = torch.utils.data.Subset(dataset, range(n_train))
valid_subset = torch.utils.data.Subset(dataset, range(n_train, len(dataset)))
train_loader = DataLoader(train_subset, batch_size=dataloader.batch_size, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_subset, batch_size=dataloader.batch_size, shuffle=False, collate_fn=collate_fn)

# ---- Training loop ----
for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_loader, criterion)
    end_time = time.time()
    epoch_mins = int((end_time - start_time) // 60)
    epoch_secs = int((end_time - start_time) % 60)
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    # Perplexity is exp(mean cross-entropy).
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

In [None]:
def generate_next_word(model: Seq2Seq, prompt: str) -> str:
    """Predict the token that follows ``prompt``.

    Training pairs are an input window and the same window shifted right by
    one token, so the decoder's k-th output corresponds to the (k+1)-th prompt
    token; only the output produced after the decoder has been fed the whole
    known prefix is the genuinely new token. This function therefore feeds
    ``<bos>`` followed by the prompt tokens after the first one and returns
    the prediction of the final step.

    Args:
        model: trained Seq2Seq model (switched to eval mode here).
        prompt: raw text; only its last ``window_size`` tokens are used, since
            that is the only context length the training windows contain.

    Returns:
        The predicted next token as a string.
    """
    model.eval()
    tokens = tokenizer(prompt)[-window_size:]
    src_tensor = tensor_transform(tokens).unsqueeze(1).to(device)
    # Decoder inputs mirror the training targets: <bos>, then the known tokens.
    decoder_input_ids = [vocab['<bos>']] + [vocab[token] for token in tokens[1:]]
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)
        for token_id in decoder_input_ids:
            input_tok = torch.tensor([token_id], device=device)
            output, hidden, cell = model.decoder(input_tok, hidden, cell)
    token_idx = output.argmax(1).item()
    # lookup_token avoids materialising the whole index-to-token list per call.
    return vocab.lookup_token(token_idx)

# Spot-check next-word accuracy on a few randomly drawn windows.
# NOTE: the windows are drawn from `dataset`, the data the model was fitted
# on, so this is a sanity check rather than a generalisation estimate.
itos = vocab.get_itos()  # built once instead of once per looked-up token
marker_ids = (vocab['<bos>'], vocab['<eos>'])
# Never ask for more samples than exist (random.sample would raise).
test_indices = random.sample(range(len(dataset)), min(10, len(dataset)))
correct = 0
total = 0

for idx in test_indices:
    inp_tensor, tgt_tensor = dataset[idx]
    # The target ends with <eos>; the entry before it is the one token that
    # is not already part of the input window.
    gt_token = itos[tgt_tensor.tolist()[-2]]
    prompt_tokens = [itos[i] for i in inp_tensor.tolist() if i not in marker_ids]
    prompt_str = " ".join(prompt_tokens)
    pred_token = generate_next_word(model, prompt_str)
    total += 1
    if pred_token == gt_token:
        correct += 1
    print("Prompt:", prompt_str)
    print("Ground Truth:", gt_token)
    print("Predicted:", pred_token)
    print("-" * 50)

# Guard the division for an empty dataset.
accuracy: float = correct / total if total else 0.0
print(f"Accuracy over {total} examples:", accuracy)


In [None]:
import ipywidgets as widgets
from IPython.display import display

# Interactive demo: a text box for the prompt, a button, and an output area
# that shows the prompt followed by the predicted token.
prompt_box = widgets.Text(
    description='Prompt:',
    placeholder='Type something',
    value='Stately, plump Buck Mulligan',
    disabled=False,
)
output_box = widgets.Output()
generate_button = widgets.Button(description="Generate Text")


def on_generate_clicked(b):
    """Button callback: replace the output area with the prompt and its predicted token."""
    with output_box:
        output_box.clear_output()
        prompt_text = prompt_box.value
        print(prompt_text, generate_next_word(model, prompt_text))


generate_button.on_click(on_generate_clicked)
display(prompt_box, generate_button, output_box)

In [None]:
# Autoregressive generation: every predicted token is appended to the running
# context, and the whole context (not just the last generated token) is what
# the model is asked to continue on the next step.
prompt = "You are not really that interested in what you have to say "
NUM_GENERATED_WORDS = 100
context = prompt.strip()
generated_words = []
for _ in range(NUM_GENERATED_WORDS):
    next_word = generate_next_word(model, context)
    generated_words.append(next_word)
    context = f"{context} {next_word}"
complete = context
print(complete)