# 1. Install libraries

In [None]:
%%capture
!pip install datasets evaluate --upgrade
!python -m spacy download en_core_web_sm

In [None]:
!pip install --upgrade pyarrow

In [None]:
!pip install datasets 
!pip install torchtext
!pip install spacy

In [1]:
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import spacy
import datasets
from datasets import load_dataset, Dataset

import torchtext
import tqdm

# 2. Processing data

In [2]:
# Read the tab-separated dialog file; it has no header row, so the two columns
# are named explicitly.
df = pd.read_csv(
    'dialogs.txt',
    sep='\t',
    names=['question', 'answer'],
)

df.head()

Unnamed: 0,question,answer
0,"hi, how are you doing?",i'm fine. how about yourself?
1,i'm fine. how about yourself?,i'm pretty good. thanks for asking.
2,i'm pretty good. thanks for asking.,no problem. so how have you been?
3,no problem. so how have you been?,i've been great. what about you?
4,i've been great. what about you?,i've been good. i'm in school right now.


In [3]:
dataset = Dataset.from_pandas(df)

In [4]:
dataset

Dataset({
    features: ['question', 'answer'],
    num_rows: 3725
})

In [5]:
# Split by row position (no shuffling): rows 0-2999 for training,
# rows 3000-3299 for validation, and everything from row 3300 on for testing.
train_data = dataset[:3000]
valid_data = dataset[3000:3300]
test_data = dataset[3300:]

In [None]:
from datasets import Dataset

# Turn each split back into a Dataset so that .map() can be used on it below.
# The splits are handed to Dataset.from_dict, i.e. they are treated as
# column-name -> values mappings at this point.
# NOTE(review): this cell rebinds its own inputs, so re-running it alone would
# pass a Dataset (not a dict) to from_dict - re-run the split cell first.
train_data = Dataset.from_dict(train_data)
test_data = Dataset.from_dict(test_data)
valid_data = Dataset.from_dict(valid_data)

In [None]:
# Seed every random number generator the notebook relies on so runs are repeatable.
seed = 1234

random.seed(seed)  # Python's `random` (used for the teacher-forcing draw in Seq2Seq.forward)
np.random.seed(seed)  # NumPy global RNG
torch.manual_seed(seed)  # PyTorch RNG
torch.cuda.manual_seed(seed)  # PyTorch CUDA RNG
torch.backends.cudnn.deterministic = True  # ask cuDNN for deterministic kernels

In [None]:
# Load model spacy for tokenizer
en_nlp = spacy.load("en_core_web_sm")

In [None]:
# Example: run the spaCy tokenizer on a sample sentence and show its tokens
string = "Who does mental illness affect"

[tok.text for tok in en_nlp.tokenizer(string)]

In [None]:
import re
def tokenize_example(example, en_nlp, max_length, lower, sos_token, eos_token):
    """
    Turn one question/answer pair into two token lists.

    Every character that is not an ASCII letter is first replaced by a space,
    then the text is split with the spaCy tokenizer, truncated, optionally
    lowercased, and wrapped in the start/end markers.

    Args:
        example (dict): Row with the raw strings under "question" and "answer".
        en_nlp: Object exposing a spaCy-style ``tokenizer`` callable.
        max_length (int): Maximum number of tokens kept per sentence
            (applied before the start/end markers are added).
        lower (bool): Lowercase the tokens when True.
        sos_token (str): Marker placed at the front of each token list.
        eos_token (str): Marker placed at the end of each token list.

    Returns:
        dict: ``{"ques_tokens": [...], "ans_tokens": [...]}``.
    """
    # Strip everything except letters from both fields up front
    letters_only = {
        field: re.sub('[^a-zA-Z]', ' ', example[field])
        for field in ("question", "answer")
    }

    def _to_tokens(text):
        # Tokenize, truncate, optionally lowercase, then add the markers
        pieces = [tok.text for tok in en_nlp.tokenizer(text)][:max_length]
        if lower:
            pieces = [piece.lower() for piece in pieces]
        return [sos_token, *pieces, eos_token]

    return {
        "ques_tokens": _to_tokens(letters_only["question"]),
        "ans_tokens": _to_tokens(letters_only["answer"]),
    }


In [None]:
# Tokenization settings shared by all three splits
max_length = 1000  # max tokens kept per sentence (before <sos>/<eos> are added)
lower = True  # lowercase every token
sos_token = "<sos>"  # start-of-sequence marker
eos_token = "<eos>"  # end-of-sequence marker

# Keyword arguments that Dataset.map forwards to tokenize_example
fn_kwargs = {
    "en_nlp": en_nlp,
    "max_length": max_length,
    "lower": lower,
    "sos_token": sos_token,
    "eos_token": eos_token,
}

# Add the "ques_tokens" and "ans_tokens" columns to every split
train_data = train_data.map(tokenize_example, fn_kwargs=fn_kwargs)
valid_data = valid_data.map(tokenize_example, fn_kwargs=fn_kwargs)
test_data = test_data.map(tokenize_example, fn_kwargs=fn_kwargs)

In [None]:
train_data[0]

In [None]:
def prepare_vocab(row_data):
    """
    Attach the distinct tokens of one row under the "build_vocab" key.

    Args:
        row_data (dict): Row holding the "ques_tokens" and "ans_tokens" lists.

    Returns:
        dict: The same ``row_data`` object, with "build_vocab" set to a list of
        the unique tokens found in the question and the answer (the order of
        that list is whatever set iteration yields).
    """
    combined = row_data["ques_tokens"] + row_data["ans_tokens"]
    row_data["build_vocab"] = list(set(combined))
    return row_data


In [None]:
train_data = train_data.map(prepare_vocab)

In [None]:
min_freq = 2
unk_token = "<unk>"
pad_token = "<pad>"

# Special tokens, passed to the vocabulary builder as `specials`
special_tokens = [unk_token, pad_token, sos_token, eos_token]

# Build vocab for english language from the per-row token lists of the training split
en_vocab = torchtext.vocab.build_vocab_from_iterator(
    train_data["build_vocab"],
    specials=special_tokens,
    min_freq=min_freq,
)


In [None]:
# Get the index of the unknown token in the English vocabulary
unk_index = en_vocab[unk_token]

# Get the index of the padding token in the English vocabulary
pad_index = en_vocab[pad_token]

In [None]:
# Set the default index for the English vocabulary to the index of the unknown token
en_vocab.set_default_index(unk_index)


In [None]:
tokens = ['do', 'you', 'like', 'it']
en_vocab.lookup_indices(tokens)

In [None]:
en_vocab.lookup_tokens(en_vocab.lookup_indices(tokens))

In [None]:
def numericalize_example(example, en_vocab):
    """
    Map the token lists of one row to vocabulary indices.

    Args:
        example (dict): Row holding the "ques_tokens" and "ans_tokens" lists.
        en_vocab: Vocabulary object providing ``lookup_indices(tokens)``.

    Returns:
        dict: ``{"ques_ids": [...], "ans_ids": [...]}`` with one index per token.
    """
    to_ids = en_vocab.lookup_indices
    return {
        "ques_ids": to_ids(example["ques_tokens"]),
        "ans_ids": to_ids(example["ans_tokens"]),
    }

In [None]:
# Keyword arguments that Dataset.map forwards to numericalize_example
fn_kwargs = {"en_vocab": en_vocab}

# Add the "ques_ids" and "ans_ids" columns to every split
train_data = train_data.map(numericalize_example, fn_kwargs=fn_kwargs)
valid_data = valid_data.map(numericalize_example, fn_kwargs=fn_kwargs)
test_data = test_data.map(numericalize_example, fn_kwargs=fn_kwargs)

In [None]:
train_data[0]

In [None]:
# Output format requested from the datasets: "torch" (PyTorch tensors)
data_type = "torch"

# Only these columns are converted to the requested format
format_columns = ["ques_ids", "ans_ids"]

# Format the training data; output_all_columns=True keeps the remaining
# columns available alongside the formatted ones
train_data = train_data.with_format(
    type=data_type, columns=format_columns, output_all_columns=True
)

# Format the validation data
valid_data = valid_data.with_format(
    type=data_type,
    columns=format_columns,
    output_all_columns=True,
)

# Format the test data
test_data = test_data.with_format(
    type=data_type,
    columns=format_columns,
    output_all_columns=True,
)


In [None]:
def get_collate_fn(pad_index):
    """
    Build a collate function that pads question/answer id sequences.

    Args:
        pad_index (int): Value used to fill the padded positions.

    Returns:
        function: ``collate_fn(batch)`` suitable for a DataLoader.
    """
    def collate_fn(batch):
        """
        Pad the id sequences of a batch to a common length.

        Args:
            batch (list): Examples, each a dict with 1-D tensors under
                "ques_ids" and "ans_ids".

        Returns:
            dict: "batch_ques_ids" and "batch_ans_ids", each the result of
            ``pad_sequence`` with its default layout (sequence dimension first).
        """
        # Collect both columns first, then pad each to its longest sequence
        question_seqs = [item["ques_ids"] for item in batch]
        answer_seqs = [item["ans_ids"] for item in batch]
        pad = nn.utils.rnn.pad_sequence
        return {
            "batch_ques_ids": pad(question_seqs, padding_value=pad_index),
            "batch_ans_ids": pad(answer_seqs, padding_value=pad_index),
        }

    return collate_fn


In [None]:
def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
    """
    Wrap a dataset in a DataLoader whose batches are padded with ``pad_index``.

    Args:
        dataset: Dataset to iterate over.
        batch_size (int): Number of examples per batch.
        pad_index (int): Padding value handed to the collate function.
        shuffle (bool, optional): Shuffle the examples when True. Default is False.

    Returns:
        torch.utils.data.DataLoader: Loader that collates with ``get_collate_fn(pad_index)``.
    """
    return torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=get_collate_fn(pad_index),
    )


In [None]:
# Number of examples per batch for all three loaders
batch_size = 64

# Only the training loader shuffles; validation and test keep dataset order
train_data_loader = get_data_loader(train_data, batch_size, pad_index, shuffle=True)
valid_data_loader = get_data_loader(valid_data, batch_size, pad_index)
test_data_loader = get_data_loader(test_data, batch_size, pad_index)

# 3. Define model

In [None]:
class Encoder(nn.Module):
    """Embedding + multi-layer LSTM that summarises a source sequence as (hidden, cell)."""

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        """
        Build the encoder layers.

        Args:
            input_dim (int): Number of entries in the embedding table (vocabulary size).
            embedding_dim (int): Size of each embedding vector.
            hidden_dim (int): Size of the LSTM hidden and cell states.
            n_layers (int): Number of stacked LSTM layers.
            dropout (float): Dropout probability, used on the embeddings and
                passed to the LSTM as its ``dropout`` argument.
        """
        super().__init__()
        # Kept as attributes because Seq2Seq compares them with the decoder's
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=embedding_dim)
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """
        Encode a source sequence.

        Args:
            src (torch.Tensor): Token indices; the LSTM is created without
                ``batch_first``, so the sequence dimension is expected first.

        Returns:
            torch.Tensor: Final hidden state returned by the LSTM.
            torch.Tensor: Final cell state returned by the LSTM.
        """
        # The per-step LSTM outputs are not needed; only the final states are returned
        _, (hidden, cell) = self.rnn(self.dropout(self.embedding(src)))
        return hidden, cell


In [None]:
class Decoder(nn.Module):
    """Embedding + multi-layer LSTM + linear layer that predicts one token per call."""

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        """
        Build the decoder layers.

        Args:
            output_dim (int): Vocabulary size; used for both the embedding table
                and the width of the output logits.
            embedding_dim (int): Size of each embedding vector.
            hidden_dim (int): Size of the LSTM hidden and cell states.
            n_layers (int): Number of stacked LSTM layers.
            dropout (float): Dropout probability, used on the embeddings and
                passed to the LSTM as its ``dropout`` argument.
        """
        super().__init__()
        # Read by Seq2Seq: output_dim sizes the output buffer, the other two are
        # checked against the encoder
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=embedding_dim)
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """
        Run a single decoding step.

        Args:
            input (torch.Tensor): Indices of the previous tokens, one per batch element.
            hidden (torch.Tensor): Hidden state carried over from the previous step.
            cell (torch.Tensor): Cell state carried over from the previous step.

        Returns:
            torch.Tensor: Logits over the vocabulary for the next token.
            torch.Tensor: Updated hidden state.
            torch.Tensor: Updated cell state.
        """
        # The LSTM wants a leading sequence dimension, so treat the step as length 1
        step = input.unsqueeze(0)
        rnn_out, (hidden, cell) = self.rnn(
            self.dropout(self.embedding(step)), (hidden, cell)
        )
        # Drop the length-1 sequence dimension again before projecting to logits
        return self.fc_out(rnn_out.squeeze(0)), hidden, cell


In [None]:
class Seq2Seq(nn.Module):
    """Ties an encoder and a decoder together and decodes the target step by step."""

    def __init__(self, encoder, decoder, device):
        """
        Store the sub-modules after checking that their state shapes are compatible.

        Args:
            encoder: Module returning ``(hidden, cell)`` for a source batch.
            decoder: Module performing one decoding step per call.
            device (torch.device): Device on which the output buffer is placed.

        Raises:
            AssertionError: If the encoder and decoder differ in ``hidden_dim``
                or in ``n_layers``.
        """
        super().__init__()
        # The encoder's final states seed the decoder, so both must agree in shape
        assert (
            encoder.hidden_dim == decoder.hidden_dim
        ), "Hidden dimensions of encoder and decoder must be equal!"
        assert (
            encoder.n_layers == decoder.n_layers
        ), "Encoder and decoder must have equal number of layers!"
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio):
        """
        Produce decoder logits for every target position after the first.

        Args:
            src (torch.Tensor): Source sequence batch.
            trg (torch.Tensor): Target sequence batch; dimension 0 is the sequence
                length and dimension 1 the batch size.
            teacher_forcing_ratio (float): Probability, per step, of feeding the
                ground-truth token instead of the model's own prediction.

        Returns:
            torch.Tensor: Tensor of shape (target length, batch size, vocabulary size).
            Position 0 is never written and stays all zeros.
        """
        trg_length, batch_size = trg.shape[:2]
        outputs = torch.zeros(trg_length, batch_size, self.decoder.output_dim).to(self.device)

        hidden, cell = self.encoder(src)

        # Decoding starts from the first target row
        decoder_input = trg[0, :]
        for step in range(1, trg_length):
            logits, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[step] = logits
            # One random draw per step decides between ground truth and prediction
            if random.random() < teacher_forcing_ratio:
                decoder_input = trg[step]
            else:
                decoder_input = logits.argmax(1)
        return outputs


In [None]:
# Model hyperparameters. Questions and answers share one vocabulary, so the
# encoder input size and the decoder output size are both len(en_vocab).
input_dim = len(en_vocab)
output_dim = len(en_vocab)
encoder_embedding_dim = 200
decoder_embedding_dim = 200
hidden_dim = 64  # must be equal for encoder and decoder (asserted in Seq2Seq)
n_layers = 4  # must be equal for encoder and decoder (asserted in Seq2Seq)
encoder_dropout = 0.5
decoder_dropout = 0.5
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(
    input_dim,
    encoder_embedding_dim,
    hidden_dim,
    n_layers,
    encoder_dropout,
)

decoder = Decoder(
    output_dim,
    decoder_embedding_dim,
    hidden_dim,
    n_layers,
    decoder_dropout,
)

# Assemble the full model and move its parameters to the chosen device
model = Seq2Seq(encoder, decoder, device).to(device)

In [None]:
def init_weights(m):
    """
    Re-draw every parameter of ``m`` from a uniform distribution on [-0.08, 0.08].

    Args:
        m (torch.nn.Module): Module whose parameters (including those of its
            submodules) are overwritten in place.
    """
    for param in m.parameters():
        nn.init.uniform_(param.data, a=-0.08, b=0.08)

# Apply the init_weights function to initialize the weights of the model.
# NOTE(review): Module.apply runs the function on the model and on each of its
# submodules, while init_weights walks named_parameters() of whatever module it
# is given - so nested parameters appear to be re-drawn more than once. The end
# result is still uniform in [-0.08, 0.08]; confirm whether the repetition is intended.
model.apply(init_weights)


In [None]:
def count_parameters(model):
    """Return the total number of elements across the parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


print(f"The model has {count_parameters(model):,} trainable parameters")

# 4. Define training process

In [None]:
optimizer = optim.RMSprop(model.parameters(), lr=0.001)

In [None]:
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

In [None]:
def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device
):
    """
    Run one pass over the training data and update the model after every batch.

    Args:
        model (torch.nn.Module): Model called as ``model(src, trg, teacher_forcing_ratio)``.
        data_loader: Iterable of batches with "batch_ques_ids" and "batch_ans_ids"
            tensors; must support ``len()``.
        optimizer: Optimizer that updates the model parameters.
        criterion: Loss function applied to (flattened logits, flattened targets).
        clip (float): Maximum gradient norm passed to ``clip_grad_norm_``.
        teacher_forcing_ratio (float): Forwarded to the model unchanged.
        device: Device the batch tensors are moved to.

    Returns:
        float: Sum of the batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0
    for batch in data_loader:
        src = batch["batch_ques_ids"].to(device)
        trg = batch["batch_ans_ids"].to(device)

        optimizer.zero_grad()
        logits = model(src, trg, teacher_forcing_ratio)
        vocab_size = logits.shape[-1]
        # Position 0 holds the start token and is never predicted, so drop it
        # from both sides and flatten to (positions, vocab) / (positions,)
        loss = criterion(logits[1:].view(-1, vocab_size), trg[1:].view(-1))
        loss.backward()
        # Rescale gradients whose norm exceeds `clip` before stepping
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(data_loader)


In [None]:
def evaluate_fn(model, data_loader, criterion, device):
    """
    Compute the average loss over a data loader without updating the model.

    Args:
        model (torch.nn.Module): Model called as ``model(src, trg, 0)``, i.e.
            with teacher forcing switched off.
        data_loader: Iterable of batches with "batch_ques_ids" and "batch_ans_ids"
            tensors; must support ``len()``.
        criterion: Loss function applied to (flattened logits, flattened targets).
        device: Device the batch tensors are moved to.

    Returns:
        float: Sum of the batch losses divided by the number of batches.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for batch in data_loader:
            src = batch["batch_ques_ids"].to(device)
            trg = batch["batch_ans_ids"].to(device)
            # Ratio 0: the decoder always consumes its own previous prediction
            logits = model(src, trg, 0)
            vocab_size = logits.shape[-1]
            # Position 0 holds the start token and is never predicted, so drop it
            # from both sides and flatten before computing the loss
            loss = criterion(logits[1:].view(-1, vocab_size), trg[1:].view(-1))
            running_loss += loss.item()
    return running_loss / len(data_loader)


In [None]:
n_epochs = 300  # Number of epochs for training
clip = 1.0  # Gradient clipping threshold
teacher_forcing_ratio = 0.5  # Ratio of teacher forcing during training

best_valid_loss = float("inf")  # Initialize best validation loss to infinity

import numpy as np
import matplotlib.pyplot as plt

# Per-epoch loss history, consumed by the plotting cell
train_losses = []
valid_losses = []

# Iterate over epochs
for epoch in tqdm.tqdm(range(n_epochs)):
    # Perform one training epoch
    train_loss = train_fn(
        model,
        train_data_loader,
        optimizer,
        criterion,
        clip,
        teacher_forcing_ratio,
        device,
    )

    # Evaluate on the validation set
    valid_loss = evaluate_fn(
        model,
        valid_data_loader,
        criterion,
        device,
    )

    # Keep only the checkpoint with the lowest validation loss seen so far
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "tut1-model.pt")

    if (epoch + 1) % 5 == 0:
        # Report every 5th epoch; the epoch label makes each pair of loss lines
        # attributable when they interleave with the progress bar
        print(f"Epoch: {epoch + 1:3d}/{n_epochs}")
        print(f"\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss):7.3f}")
        print(f"\tValid Loss: {valid_loss:7.3f} | Valid PPL: {np.exp(valid_loss):7.3f}")

    # Append the training and validation losses to their respective lists
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)





In [None]:
# Plotting the training and validation losses.
# The x-axis is derived from the recorded histories rather than from n_epochs, so
# the plot still works when training was interrupted early or n_epochs was changed
# after the loop ran (matplotlib rejects x and y of different lengths).
plt.figure(figsize=(10, 5))
plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
plt.plot(range(1, len(valid_losses) + 1), valid_losses, label='Valid Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Losses')
plt.legend()
plt.grid(True)
plt.show()

# 5. Model inference

In [None]:
# Restore the checkpoint with the best validation loss. map_location remaps the
# stored tensors onto the current device, so a checkpoint written on a GPU can
# still be loaded in a CPU-only session.
model.load_state_dict(torch.load("tut1-model.pt", map_location=device))

# Average loss of the restored model on the held-out test split
test_loss = evaluate_fn(model, test_data_loader, criterion, device)

print(f"| Test Loss: {test_loss:.3f} | Test PPL: {np.exp(test_loss):7.3f} |")

In [None]:
def chat(
    sentence,
    model=model,
    en_nlp=en_nlp,
    en_vocab=en_vocab,
    lower=lower,
    sos_token=sos_token,
    eos_token=eos_token,
    device=device,
    max_output_length=25,
):
    """
    Generate a response to an input sentence with greedy decoding.

    String input goes through the same cleanup as the training data in
    tokenize_example (every non-letter character is replaced by a space before
    tokenizing), so that inference sees the tokens the vocabulary was built from.
    A pre-tokenized list is used as given.

    Args:
        sentence (str or List[str]): The input sentence, either as a string or a list of tokens.
        model: The trained Seq2Seq model.
        en_nlp: English spaCy tokenizer.
        en_vocab: English vocabulary.
        lower (bool): Whether to convert tokens to lowercase.
        sos_token (str): Start-of-sequence token.
        eos_token (str): End-of-sequence token.
        device: Device to perform computations on (e.g., "cpu" or "cuda").
        max_output_length (int, optional): Maximum number of decoding steps. Default is 25.

    Returns:
        List[str]: The response as a list of tokens. It starts with the
        start-of-sequence token and ends with the end-of-sequence token when the
        model emitted one within ``max_output_length`` steps.
    """
    # Set the model to evaluation mode
    model.eval()
    with torch.no_grad():
        # Tokenize the input sentence
        if isinstance(sentence, str):
            # Same letter-only cleanup that tokenize_example applies to training text;
            # without it "it's" or "unit." yield tokens that only map to <unk>
            cleaned = re.sub('[^a-zA-Z]', ' ', sentence)
            tokens = [token.text for token in en_nlp.tokenizer(cleaned)]
        else:
            tokens = list(sentence)

        # Optionally convert tokens to lowercase
        if lower:
            tokens = [token.lower() for token in tokens]

        # Add start-of-sequence and end-of-sequence tokens
        tokens = [sos_token] + tokens + [eos_token]

        # Convert tokens to indices using the English vocabulary
        ids = en_vocab.lookup_indices(tokens)

        # Shape the indices as a single-column batch and move them to the device
        tensor = torch.LongTensor(ids).unsqueeze(-1).to(device)

        # Encode the input sentence using the encoder of the model
        hidden, cell = model.encoder(tensor)

        # Initialize the input for the decoder with the start-of-sequence token
        inputs = en_vocab.lookup_indices([sos_token])

        # Cache the end-of-sequence index instead of looking it up on every step
        eos_index = en_vocab[eos_token]

        # Generate the response one token at a time
        for _ in range(max_output_length):
            # Feed the most recently generated token back into the decoder
            inputs_tensor = torch.LongTensor([inputs[-1]]).to(device)

            # Perform one step of decoding using the decoder of the model
            output, hidden, cell = model.decoder(inputs_tensor, hidden, cell)

            # Greedy choice: take the highest-scoring token
            predicted_token = output.argmax(-1).item()

            # Add the predicted token to the input for the next step
            inputs.append(predicted_token)

            # Stop once the end-of-sequence token is predicted
            if predicted_token == eos_index:
                break

        # Convert the generated indices back to tokens
        tokens = en_vocab.lookup_tokens(inputs)

    # Return the generated response
    return tokens


In [None]:
# Sample prompts for a quick qualitative check of the chatbot.
# They get their own name so the `test_data` Dataset built earlier is not
# replaced by a plain list of strings.
sample_questions = [
    "it's an upstairs unit.",
    "that's good, because i don't want to live under people with loud feet.",
    "and it's a corner unit.",
    "that's great. we won't have neighbors on both sides of us.",
    'no pets are allowed.',
    "perfect. we don't have to listen to barking dogs.",
    'and there are only six units in the whole building.',
    'did you call the manager?',
    "yes. he said he'd come over tomorrow.",
    'did he say what time?',
]

# Print the generated token list for every prompt
for ques in sample_questions:
    response = chat(ques)
    print(response)