In [1]:
%%capture
# %%capture prevents this cell from printing a ton of STDERR stuff to the screen

## First, check to see if lightning is installed, if not, install it.
##
## NOTE: If you **do** need to install something, just know that you may need to
##       restart your session for python to find the new module(s).
##
##       To restart your session:
##       - In Google Colab, click on the "Runtime" menu and select
##         "Restart Session" from the pulldown menu
##       - In a local jupyter notebook, click on the "Kernel" menu and select
##         "Restart Kernel" from the pulldown menu
import importlib.util
import subprocess
import sys

## Install lightning only when it cannot be found.
##
## NOTE: `pip.main()` is not a supported API (it was removed in pip 10), so calling
##       it raises AttributeError on any modern pip. Running pip as a subprocess of
##       `sys.executable` is the supported approach, and it guarantees the package
##       is installed into the same environment this kernel is running in.
if importlib.util.find_spec("lightning") is None:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "lightning"])

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset,DataLoader
import lightning as L
from torch.optim import Adam

In [2]:
## Vocabulary lookup tables: token -> id for encoding text, and id -> token for
## turning the model's numeric predictions back into words.
## <EOS> = end of sequence
english_token_to_id = {"lets": 0, "to": 1, "go": 2, "<EOS>": 3}
## Invert the mapping by swapping each (token, id) pair.
english_id_to_token = {
    token_id: token for token, token_id in english_token_to_id.items()
}

spanish_token_to_id = {"ir": 0, "vamos": 1, "y": 2, "<EOS>": 3}
spanish_id_to_token = {
    token_id: token for token, token_id in spanish_token_to_id.items()
}

## One row per training phrase, one column per token: "lets go" and "to go".
inputs = torch.tensor(
    [
        [english_token_to_id[token] for token in phrase]
        for phrase in (("lets", "go"), ("to", "go"))
    ]
)

## The matching Spanish translations: "vamos <EOS>" and "ir <EOS>".
labels = torch.tensor(
    [
        [spanish_token_to_id[token] for token in phrase]
        for phrase in (("vamos", "<EOS>"), ("ir", "<EOS>"))
    ]
)

In [3]:
labels

tensor([[1, 3],
        [0, 3]])

In [4]:
## Pair each English phrase with its Spanish translation...
dataset = TensorDataset(inputs, labels)
## ...and serve the pairs one example per batch (batch_size=1 is the DataLoader default).
dataloader = DataLoader(dataset=dataset, batch_size=1)

In [21]:
class Seq2Seq(L.LightningModule):
    """Tiny LSTM encoder-decoder that maps English token ids to Spanish token logits.

    The encoder embeds the input tokens and runs them through a 2-layer LSTM; its
    final hidden and cell states initialize a 2-layer decoder LSTM. The decoder is
    started with the Spanish <EOS> token and produces one row of logits per step.

    NOTE: The decoder looks up the <EOS> id in the notebook-level
          `spanish_token_to_id` dictionary, so that dictionary must be defined
          before `forward()` is called.
    """

    def __init__(self, max_len=2):
        """Create the embedding, LSTM, output and loss layers.

        Args:
            max_len: maximum number of tokens the decoder is allowed to emit.
        """
        super().__init__()

        self.max_output_length = max_len

        ## Seed before the layers are created so the initial weights are reproducible.
        L.seed_everything(seed=420)

        #################################
        ##
        ## ENCODING
        ##
        #################################
        self.encoder_we = nn.Embedding(
            num_embeddings=4,  # num_embeddings = # of words in input vocabulary
            embedding_dim=2,  # embedding_dim = 2 numbers per embedding
        )

        self.encoder_lstm = nn.LSTM(
            input_size=2,  # input_size = number of inputs (2 numbers per word)
            hidden_size=2,  # hidden_size = number of outputs (2 per word per layer)
            num_layers=2,  # num_layers = how many lstm's to stack
            #          If there are 2 layers, then the short term memory from the
            #          first layer is used as input to the second layer
        )

        #################################
        ##
        ## DECODING
        ##
        #################################
        self.decoder_we = nn.Embedding(num_embeddings=4, embedding_dim=2)

        self.decoder_lstm = nn.LSTM(input_size=2, hidden_size=2, num_layers=2)

        self.output_fc = nn.Linear(
            in_features=2,  # in_features = # of outputs per LSTM
            out_features=4,  # out_features = # of words in the output vocabulary
        )

        #################################
        ##
        ## Training
        ##
        #################################
        self.loss = nn.CrossEntropyLoss()

    def _decode_step(self, token_id, state):
        """Feed one token through the decoder; return (logits, new (hidden, cell) state)."""
        embeddings = self.decoder_we(token_id)
        lstm_output, state = self.decoder_lstm(embeddings, state)
        return self.output_fc(lstm_output), state

    def forward(self, input, output=None):
        """Encode `input` and decode up to `max_output_length` tokens.

        Args:
            input: 1-D tensor of English token ids for a single phrase.
            output: optional sequence of the known Spanish token ids for that phrase.
                When given (training), the decoder is fed the known previous token
                at each step instead of its own prediction. When None (inference),
                the decoder is fed its own predictions and stops early once it
                predicts <EOS>.

        Returns:
            Tensor with one row of 4 logits per decoded token.
        """
        #################################
        ##
        ## ENCODING
        ##
        #################################
        ## first, use the encoder stage to create an intermediate encoding of the input text
        encoder_embeddings = self.encoder_we(input)
        _, encoder_state = self.encoder_lstm(encoder_embeddings)

        #################################
        ##
        ## DECODING
        ##
        #################################
        ## We start by initializing the decoder with the <EOS> token...
        ## (created on the same device as the input so the model is not tied to the CPU)
        eos_id = spanish_token_to_id["<EOS>"]
        decoder_token_id = torch.tensor([eos_id], device=input.device)

        ## ...and with the encoder's final hidden and cell states.
        output_values, decoder_state = self._decode_step(decoder_token_id, encoder_state)
        step_outputs = [output_values]
        predicted_id = torch.argmax(output_values).unsqueeze(0)

        for i in range(1, self.max_output_length):

            if output is None:  # using the model...
                if predicted_id.item() == eos_id:
                    break  # if the prediction is <EOS>, then we are done
                decoder_token_id = predicted_id
            else:
                ## run this when training the model: feed the known previous token
                decoder_token_id = torch.as_tensor(
                    output[i - 1], device=input.device
                ).reshape(1)

            output_values, decoder_state = self._decode_step(
                decoder_token_id, decoder_state
            )
            step_outputs.append(output_values)
            predicted_id = torch.argmax(output_values).unsqueeze(0)

        return torch.cat(step_outputs, 0)

    def configure_optimizers(self):
        """Return the optimizer used for backpropagation."""
        ## NOTE: Setting the learning rate to 0.1 trains way faster than
        ## using the default learning rate, lr=0.001
        return Adam(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        """Take one gradient-descent step; return the mean cross entropy over the batch.

        Every (input, label) pair in the batch is run through the network on its
        own, so this also works if the DataLoader batch size is larger than 1.
        """
        input_tokens, labels = batch  # collect input
        per_example_losses = [
            self.loss(self(tokens, target), target)  ## self.loss = cross entropy
            for tokens, target in zip(input_tokens, labels)
        ]
        loss = torch.stack(per_example_losses).mean()
        ###################
        ##
        ## Logging the loss
        ##
        ###################
        # self.log("train_loss", loss)

        return loss


## Keep the original lowercase class name available for any code that still uses it.
seq2seq = Seq2Seq

In [22]:
model = Seq2Seq()

## Translate "lets go" with the *untrained* model. The training data pairs this
## phrase with "vamos <EOS>", but before training the output is arbitrary.
with torch.no_grad():  # inference only, so no autograd graph is needed
    outputs = model(
        input=torch.tensor(
            [english_token_to_id["lets"], english_token_to_id["go"]]
        ),
        output=None,
    )

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
## NOTE: the loop variable is not called `id`, since that would shadow python's
##       builtin id() for the rest of the notebook.
for token_id in predicted_ids.tolist():
    print("\t", spanish_id_to_token[token_id])

Seed set to 42


Translated text:
	 y
	 y


In [23]:
## Train for 40 passes over the dataloader, explicitly on the CPU.
trainer = L.Trainer(
    accelerator="cpu",
    max_epochs=40,
)
trainer.fit(model, train_dataloaders=dataloader)

Using default `ModelCheckpoint`. Consider installing `litmodels` package to enable `LitModelCheckpoint` for automatic upload to the Lightning model registry.
GPU available: True (cuda), used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
/home/hadi/Documents/statquest/.venv/lib/python3.11/site-packages/lightning/pytorch/trainer/setup.py:177: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.

  | Name         | Type             | Params | Mode 
----------------------------------------------------------
0 | encode_we    | Embedding        | 8      | train
1 | encoder_lstm | LSTM             | 96     | train
2 | decoder_we   | Embedding        | 8      | train
3 | decoder_lstm | LSTM             | 96     | train
4 | output_fc    | Linear           | 12     | train
5 | loss         | CrossEntropyLoss | 0      | train
----------------------------------------------------------
220       Trainable params
0         Non-traina

Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=40` reached.


In [24]:
## Translate "lets go" with the trained model. The training data pairs this
## phrase with "vamos <EOS>".
with torch.no_grad():  # inference only, so no autograd graph is needed
    outputs = model(
        input=torch.tensor(
            [english_token_to_id["lets"], english_token_to_id["go"]]
        ),
        output=None,
    )

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
## NOTE: the loop variable is not called `id`, since that would shadow python's
##       builtin id() for the rest of the notebook.
for token_id in predicted_ids.tolist():
    print("\t", spanish_id_to_token[token_id])

Translated text:
	 ir
	 <EOS>


In [25]:
## Translate "to go" with the trained model. The training data pairs this
## phrase with "ir <EOS>".
with torch.no_grad():  # inference only, so no autograd graph is needed
    outputs = model(
        input=torch.tensor(
            [english_token_to_id["to"], english_token_to_id["go"]]
        ),
        output=None,
    )

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
## NOTE: the loop variable is not called `id`, since that would shadow python's
##       builtin id() for the rest of the notebook.
for token_id in predicted_ids.tolist():
    print("\t", spanish_id_to_token[token_id])

Translated text:
	 ir
	 <EOS>


In [26]:
## count the number of parameters...
total_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print("Total number of trainable parameters:", total_trainable_params)

Total number of trainable parameters: 220


In [27]:
## Write the trained model to a checkpoint file.
## NOTE: You can specify a path as part of the filename
trainer.save_checkpoint("seq2seq_en2es_220_trained.ckpt")

In [28]:
## Rebuild the model from the checkpoint file saved earlier...
new_model = Seq2Seq.load_from_checkpoint("seq2seq_en2es_220_trained.ckpt")

## ...and translate "lets go" with the restored weights. The training data pairs
## this phrase with "vamos <EOS>".
with torch.no_grad():  # inference only, so no autograd graph is needed
    outputs = new_model(
        input=torch.tensor([english_token_to_id["lets"], english_token_to_id["go"]]),
        output=None,
    )

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
## NOTE: the loop variable is not called `id`, since that would shadow python's
##       builtin id() for the rest of the notebook.
for token_id in predicted_ids.tolist():
    print("\t", spanish_id_to_token[token_id])

Seed set to 42


Translated text:
	 ir
	 <EOS>
