In [22]:
from collections import Counter
from typing import List, Callable, Optional

import spacy
import torch
import torchtext
import pandas as pd
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader



In [7]:
english_language = spacy.load('en_core_web_sm')  # Utilizaremos spaCy para la tokenización

# 1. Introducción

En este cuaderno abordarás la tarea de inversión de oraciones. Aunque no posee una aplicación práctica inmediata, este ejercicio te permitirá explorar y familiarizarte con la arquitectura transformer. El objetivo consiste en invertir el orden de las palabras en una oración; por ejemplo, ante una entrada tokenizada ["hola", "mundo", "!"], se espera que el modelo genere ["!", "mundo", "hola"] como resultado.

Para el conjunto de datos usaremos el archivo `open_subtitles_english.txt` el cual ya habíamos utilizado en el cuaderno de `machine translation` utilizando la red LSTM.

# 2. Creación del Vocabulario

En esta sección tienes que crear un vocabulario utilizando torchtext. El vocabulario tiene que tener 10,000 + 4 palabras. El +4 es debido a los tokens especiales `<sos>`, `<eos>`, `<unk>` y `<pad>`. El 10,000 significa que tomaremos las 10,000 palabras más frecuentes en el conjunto de datos.

In [24]:
def read_file_into_lines(filename: str) -> List[str]:
    with open(filename, "rt") as f:
        lines = f.readlines()
    lines = [l.strip() for l in lines]
    return lines


lines = read_file_into_lines("open_subtitles_english.txt")
single_line = " ".join(lines)
english_language.max_length = len(single_line)
document = english_language(single_line, disable=['tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'ner'])
tokenized_single_line = [token.text for token in document]

vocabulary = build_vocab_from_iterator([tokenized_single_line], max_tokens=10_000)
vocabulary.append_token("<unk>")
vocabulary.append_token("<sos>")
vocabulary.append_token("<eos>")
vocabulary.append_token("<pad>")

In [25]:
print(f"len(vocabulary): {len(vocabulary):,}")  # len(vocabulary): 10,004

len(vocabulary): 10,004


# 3. Definición de Dataset y Dataloader

En esta sección definirás el `torch.utils.data.Dataset` y `torch.utils.data.Dataloader`. El `Dataset` debe de entregar la i-ésima oración del conjunto de datos como un arreglo de `PyTorch`. Se espera recibir una función opcional que recibe un conjunto de oraciones de tipo List[str] y realiza las siguientes transformaciones:
1. Convierte las oraciones a minúsculas.
2. Tokeniza las oraciones utilizando spaCy.
3. Agrega tokens espaciales `<sos>`, `<eos>`, `<unk>`.
4. Convierte los símbolos de cada oración en índices.

In [8]:
class SentenceInversionDataset(torch.utils.data.Dataset):
    def __init__(self, filename: str, transform: Optional[Callable]):
        """Dataset para la tarea de inversión de oraciones.
        :param filename: Path to the txt file with all data.
        """
        self._lines: List[str] = read_file_into_lines(filename)
        if transform:
            self._lines = transform(self._lines)

    def __len__(self):
        return len(self._lines)

    def __getitem__(self, idx: int) -> torch.tensor:
        return torch.tensor(self._lines[idx])

In [32]:
class Compose:
    def __init__(self, transforms: List[Callable]):
        self._transforms = transforms

    def __call__(self, sentences: List[str]) -> None:
        for t in self._transforms:
            sentences = t(sentences)
        return sentences


class ToLowerCase:
    def __call__(self, sentences: List[str]) -> List[str]:
        new_sentences = []
        for s in sentences:
            new_sentences.append(s.lower())
        return new_sentences


class Tokenize:
    def __init__(self, language: spacy.Language) -> None:
        self._language = language

    def __call__(self, sentences: List[str]) -> List[List[str]]:
        tokenized_lines = []
        for i, document in enumerate(self._language.pipe(sentences, disable=['tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'ner'])):
            tokenized_lines.append([token.text for token in document])
        return tokenized_lines


class InsertSpecialTokens:
    def __init__(self, vocabulary) -> None:
        self._vocabulary = vocabulary

    def __call__(self, tokenized_sentences: List[List[str]]) -> List[List[str]]:
        new_sentences = []
        for ts in tokenized_sentences:
            new_sentences.append(["<sos>"] + [token if token in self._vocabulary else "<unk>" for token in ts] + ["<eos>"])
        return new_sentences


class SymbolToIndex:
    def __init__(self, vocabulary) -> None:
        self._vocabulary = vocabulary

    def __call__(self, tokenized_sentences: List[List[str]]) -> List[List[int]]:
        new_sentences = []
        for ts in tokenized_sentences:
            new_sentences.append([self._vocabulary[token] for token in ts])
        return new_sentences


test = ["Hello World !", "this is another test sentence"]
transform = Compose(transforms=[ToLowerCase(), Tokenize(english_language), InsertSpecialTokens(vocabulary), SymbolToIndex(vocabulary)])
print(transform(test))

[[10001, 837, 286, 11, 10002], [10001, 31, 21, 296, 2180, 2443, 10002]]


In [None]:
# Initialize dataset
transform = Compose(transforms=[ToLowerCase(), Tokenize(english_language), InsertSpecialTokens(vocabulary), SymbolToIndex(vocabulary)])
dataset = SentenceInversionDataset("open_subtitles_english.txt", transform)


# Inicializar el DataLoader
def collate_fn(batch: List[torch.tensor]):
    # idea, create inverted batch and then pad the target
    inverted_batch = [torch.flip(x, [0]) for x in batch]
    input = torch.nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=vocabulary["<pad>"])
    target = torch.nn.utils.rnn.pad_sequence(inverted_batch, batch_first=True, padding_value=vocabulary["<pad>"])
    return {'input': input, 'target': target}


dataloader = DataLoader(dataset, batch_size=256, shuffle=True, collate_fn=collate_fn, drop_last=True)

In [None]:
print(f"next(iter(dataloader))['input'].dtype: {next(iter(dataloader))['input'].dtype}")

next(iter(dataloader))['input'].dtype: torch.int64


In [None]:
print(f"next(iter(dataloader))['target'].dtype: {next(iter(dataloader))['target'].dtype}")

next(iter(dataloader))['target'].dtype: torch.int64


# 4. Definición del Modelo

In [None]:
class TransformerModel(torch.nn.Module):
    def __init__(
        self,
        vocab_size: int,
        d_model: int,
        nhead: int,
        num_encoder_layers: int,
        num_decoder_layers: int,
        max_seq_length: int,
        dropout=0.1) -> None:
        super().__init__()
        self.d_model = d_model
        self.embedding = torch.nn.Embedding(vocab_size, d_model)
        self.pos_encoder = torch.nn.Embedding(max_seq_length, d_model)
        self.transformer = torch.nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout,
            batch_first=True
        )
        # Output linear layer to project back to vocab size
        self.fc_out = torch.nn.Linear(d_model, vocab_size)

        # Positions for positional encoding
        self.positions = torch.arange(max_seq_length).unsqueeze(0)  # [1, max_seq_length]  [[0, 1, 2, 3, 4, 5]]

    def forward(self, src, tgt):
        # src and tgt shapes: [batch_size, seq_length]
        src_seq_length = src.size(1)
        tgt_seq_length = tgt.size(1)

        src_key_padding_mask = (src == vocabulary["<pad>"]).to(torch.bool)
        tgt_key_padding_mask = (tgt == vocabulary["<pad>"]).to(torch.bool)

        # Generate positional indices for src and tgt
        src_positions = self.positions[:, :src_seq_length].to(src.device)  # [1, src_seq_length]
        tgt_positions = self.positions[:, :tgt_seq_length].to(tgt.device)  # [1, tgt_seq_length]

        # Add embedding and positional encoding
        src = self.embedding(src) + self.pos_encoder(src_positions)  # [batch_size, src_seq_length, d_model]
        tgt = self.embedding(tgt) + self.pos_encoder(tgt_positions)  # [batch_size, tgt_seq_length, d_model]

        # Forward pass through transformer
        tgt_mask = torch.nn.Transformer.generate_square_subsequent_mask(tgt_seq_length, device=tgt.device)

        output = self.transformer(
            src=src,
            tgt=tgt,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,  # [N, S]
            tgt_key_padding_mask=tgt_key_padding_mask,  # [N, T]
            # memory_key_padding_mask=src_key_padding_mask,
            # tgt_is_causal=True
        )  # [batch_size, tgt_seq_length, d_model]

        output = self.fc_out(output)  # [batch_size, tgt_seq_length, vocab_size]

        return output


In [None]:
# Example model initialization
vocab_size = len(vocabulary)  # Size of your vocabulary
d_model = 512  # Embedding dimension
nhead = 8  # Number of attention heads
num_encoder_layers = 3  # Number of encoder layers
num_decoder_layers = 3  # Number of decoder layers
max_seq_length = 200  # Maximum sequence length
dropout = 0.1  # Dropout rate

# Model definition
model = TransformerModel(vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, max_seq_length, dropout)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# Loss Function
criterion = torch.nn.CrossEntropyLoss(ignore_index=vocabulary["<pad>"])

In [None]:
def predict(
    model: torch.nn.Module,
    sentence: torch.tensor,  # [1, L]
    vocabulary: torchtext.vocab.Vocab,
    device,
    max_seq_length: int = 50,
) -> str:
    sentence.to(device)
    model.eval()
    with torch.no_grad():
        # Generate target sequence
        tgt_indices = [vocabulary["<eos>"]]  # Target sequence starts with <eos>
        for _ in range(max_seq_length - 1):
            tgt_tensor = torch.tensor(tgt_indices, dtype=torch.long).unsqueeze(0).to(device)
            output = model(sentence, tgt_tensor)

            # Take the most likely word from the last position of the tgt sequence
            next_word_logits = output[0, -1, :]  # Shape: [vocab_size]
            next_word_index = next_word_logits.argmax().item()
            tgt_indices.append(next_word_index)

            if next_word_index == vocabulary["<sos>"]:
                break

    model.train()
    input_sentence_string =" ".join([vocabulary.lookup_token(index) for index in sentence[0] if index != vocabulary["<pad>"]])
    output_sentence_string = " ".join([vocabulary.lookup_token(index) for index in tgt_indices])
    return input_sentence_string, output_sentence_string

In [None]:
def train(model, dataloader, optimizer, criterion, num_epochs=10):
    model.train()  # set model to training mode

    for epoch in range(num_epochs):
        epoch_loss = 0

        for i, batch in enumerate(dataloader):
            src = batch["input"].to(device)  # [batch_size, src_seq_length]
            tgt = batch["target"].to(device)  # [batch_size, tgt_seq_length]

            # Forward pass
            # The target sequence `tgt` is both what we compare against (labels) and input as part of the decoder input
            # We need to shift the decoder's input to include the <sos> token and exclude the <eos> token from the inputs
            tgt_input = tgt[:, :-1]
            labels = tgt[:, 1:]  # Exclude <sos> token for labels [batch_size, tgt_seq_length]

            optimizer.zero_grad()
            output = model(src, tgt_input)  # [batch_size, tgt_seq_length - 1, vocab_size]

            # Reshape output to fit into CrossEntropyLoss
            output_dim = output.shape[-1]
            output = output.view(-1, output_dim)  # [batch_size * (tgt_seq_length - 1), vocab_size]
            labels = labels.view(-1)  # [batch_size * (tgt_seq_length - 1)]

            # import pdb; pdb.set_trace()
            loss = criterion(output, labels)
            loss.backward()  # Backpropagation
            optimizer.step()  # Update the optimizer

            if i % 50 == 0:
                print(f"\tloss: {loss.item()}")
                input_sentence, output_sentence = predict(model, src[[0], :], vocabulary, device)
                print(f"\tinput_sentence: {input_sentence}")
                print(f"\toutput_sentence: {output_sentence}")

            epoch_loss += loss.item() * labels.size(0)  # Multiply by the number of entries

        epoch_loss /= len(dataloader.dataset)  # Average the loss

        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# Call the training function
train(model, dataloader, optimizer, criterion, num_epochs=10)




	loss: 0.028363779187202454
	input_sentence: <sos> - <unk> ... ah , hey <unk> , <eos>
	output_sentence: <eos> , <unk> hey , ah ... <unk> - , ah ... <unk> - , <unk> - , <unk> - , <unk> - <sos>
	loss: 0.027767186984419823
	input_sentence: <sos> why did i ever get myself mixed up in this mess ? <eos>
	output_sentence: <eos> ? mess this in up mixed myself get ever i did why ? mess this did why did why did why did why <sos>
	loss: 0.03434755653142929
	input_sentence: <sos> there it is . <eos>
	output_sentence: <eos> . is it there . is it there there there there there there there there there there there there there there <sos>
	loss: 0.016844915226101875
	input_sentence: <sos> for our country will never make war we have a reason that 's worth marching for <eos>
	output_sentence: <eos> for marching worth 's that reason a have we war make never will country our for country our for country our for <sos>
	loss: 0.028306610882282257
	input_sentence: <sos> <unk> <eos>
	output_sentence: <eos> <unk>

KeyboardInterrupt: 

In [None]:
# Print input sentence without pad tokens
# Print the iteration of the loss
# Print accruacy (succesfully inverted sentences) per X batchs
# Create train, test sets. Evaluate accuracy on test set

In [None]:
model = model.cpu()
model.eval()

TransformerModel(
  (embedding): Embedding(10004, 512)
  (pos_encoder): Embedding(200, 512)
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-2): 3 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
          )
          (linear1): Linear(in_features=512, out_features=2048, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=2048, out_features=512, bias=True)
          (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
      )
      (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    )
    (decoder): TransformerDecoder(
      (layers): ModuleList(
        

In [None]:
torch.save(model.state_dict(), "model_state_dict.pt")