***Fundamentals of Artificial Intelligence***

> **Lab 6:** *Natural Language Processing and Chat Bots* <br>

> **Performed by:** *Corneliu Catlabuga*, group *FAF-213* <br>

> **Verified by:** Elena Graur, asist. univ.

#### Imports

In [101]:
import os
import re
from collections import Counter
from warnings import filterwarnings

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

In [102]:
filterwarnings('ignore')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Running on {device}")

Running on cpu


#### Task 1

Set up the Telegram Bot. Interact with BotFather on Telegram to obtain an API token. Create your Telegram Bot (its name should follow the pattern FIA_Surname_Name_FAF_21x). Make sure you are able to receive and send requests to it.

1. Bot link: [FIA_Catlabuga_Corneliu_FAF_213](https://t.me/FAFCatlabugaCorneliuFAF213bot)

2. Run `app.py` to start the bot.

#### Task 2

Create a dataset that will serve as a training set for your model. It should follow the rules:
- an entry consists of two parts: the question and the answer;
- there are at least 75 entries written by you in your dataset;
- questions should be something tourists or locals can ask about a new city.

You can increase your dataset by adding open-source data. However, you MUST clearly show the questions written by you. Split your dataset into train and validation.

*Hint: it is recommended to split it into 80% and 20%, but you can adjust it according to your needs.*
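
The 80/20 split from the hint can be sketched as follows. This is a minimal illustration, not the notebook's actual split: the helper name `split_pairs`, the seed, and the toy data are assumptions introduced here.

```python
import random

def split_pairs(questions, answers, val_fraction=0.2, seed=42):
    """Shuffle question/answer pairs together and split them into train and validation lists."""
    if len(questions) != len(answers):
        raise ValueError("questions and answers must have the same length")
    pairs = list(zip(questions, answers))
    random.Random(seed).shuffle(pairs)  # a fixed seed keeps the split reproducible
    n_val = int(len(pairs) * val_fraction)
    return pairs[n_val:], pairs[:n_val]

# Toy data standing in for the real dataset
toy_questions = [f"question {i}" for i in range(10)]
toy_answers = [f"answer {i}" for i in range(10)]
train_pairs, val_pairs = split_pairs(toy_questions, toy_answers)
print(len(train_pairs), len(val_pairs))
```

Shuffling the pairs as a unit keeps every question aligned with its answer, and the validation part never overlaps the training part.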

#### Dataset

In [103]:
dataset = pd.read_csv('dataset.csv')

questions = dataset['question'].astype(str).tolist()
answers = dataset['answer'].astype(str).tolist()


def clean_text(text: str) -> str:
    text = text.lower()
    text = re.sub(r"[^a-zA-Z0-9?.!,']", " ", text)
    return text.strip()

questions = [clean_text(question) for question in questions]
answers = [clean_text(answer) for answer in answers]
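
To make the effect of the cleaning step concrete, the snippet below applies a copy of `clean_text` to the question used later in this notebook and then splits it on whitespace.

```python
import re

def clean_text(text: str) -> str:
    # Same rule as the cell above: lowercase, then replace every character
    # other than letters, digits and ? . ! , ' with a space.
    text = text.lower()
    text = re.sub(r"[^a-zA-Z0-9?.!,']", " ", text)
    return text.strip()

cleaned = clean_text("Is Plutan restaurant kid-friendly?")
print(cleaned)          # is plutan restaurant kid friendly?
print(cleaned.split())  # ['is', 'plutan', 'restaurant', 'kid', 'friendly?']
```

Two consequences are worth keeping in mind: punctuation stays attached to the preceding word, so `friendly?` and `friendly` become different tokens, and characters outside ASCII (for example Romanian diacritics) are replaced by spaces.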

#### Task 3

Use TensorFlow or PyTorch to implement the architecture of the Neural Network you are planning to use. It is highly recommended to use a Seq2Seq model (implement an LSTM or GRU architecture). You are NOT allowed to use pre-built or existing solutions (yep, connecting to GPT will not work).

In [104]:
def tokenize(text: str) -> list:
    return text.split()

def build_vocab(texts: list) -> dict:
    tokens = [token for sentence in texts for token in tokenize(sentence)]
    freq = Counter(tokens)
    vocab = {word: idx+2 for idx, (word, _) in enumerate(freq.most_common())}
    vocab['<UNK>'] = 0
    vocab['<PAD>'] = 1
    return vocab
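
As a small illustration of the resulting mapping, the snippet below runs a copy of `build_vocab` on three toy sentences (the sentences are made up for this example). Ids 0 and 1 are reserved, and the remaining ids are assigned in order of decreasing frequency.

```python
from collections import Counter

def build_vocab(texts):
    # Copy of the function above, with tokenize inlined as str.split
    tokens = [token for sentence in texts for token in sentence.split()]
    freq = Counter(tokens)
    vocab = {word: idx + 2 for idx, (word, _) in enumerate(freq.most_common())}
    vocab['<UNK>'] = 0
    vocab['<PAD>'] = 1
    return vocab

toy_vocab = build_vocab(["where is the museum", "where is the park", "is the park open"])
print(toy_vocab)
```

The most frequent tokens (`is` and `the`, three occurrences each) receive the smallest ids after the two special tokens.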

In [105]:
def vectorize(sentences, vocab, max_len):
    vectors = []
    for sentence in sentences:
        tokens = tokenize(sentence)
        vector = [vocab.get(token, vocab['<UNK>']) for token in tokens]
        if len(vector) < max_len:
            vector += [vocab['<PAD>']] * (max_len - len(vector))
        else:
            vector = vector[:max_len]
        vectors.append(vector)
    return np.array(vectors)
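
The padding, truncation and unknown-word handling of `vectorize` can be checked on a toy vocabulary. The helper `vectorize_one` below is a list-based stand-in for a single iteration of the loop above (it is not part of the notebook and avoids NumPy for brevity).

```python
def vectorize_one(sentence, vocab, max_len):
    # Map tokens to ids, falling back to <UNK> for words outside the vocabulary
    ids = [vocab.get(token, vocab['<UNK>']) for token in sentence.split()]
    ids = ids[:max_len]                                   # truncate long sentences
    return ids + [vocab['<PAD>']] * (max_len - len(ids))  # pad short ones

toy_vocab = {'<UNK>': 0, '<PAD>': 1, 'is': 2, 'the': 3, 'park': 4, 'open': 5}
short = vectorize_one("is the zoo open", toy_vocab, 6)
long = vectorize_one("is the park open is the park open", toy_vocab, 6)
print(short)  # [2, 3, 0, 5, 1, 1]
print(long)   # [2, 3, 4, 5, 2, 3]
```

The unseen word `zoo` maps to id 0, the short sentence is filled up with id 1, and the long sentence loses everything after the sixth token.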

In [106]:
question_vocab = build_vocab(questions)
answer_vocab = build_vocab(answers)

idx_to_word = {idx: word for word, idx in question_vocab.items()}

MAX_LEN_Q = 20
MAX_LEN_A = 20

questions_vec = vectorize(questions, question_vocab, MAX_LEN_Q)
answers_vec = vectorize(answers, answer_vocab, MAX_LEN_A)

#### Task 4

Train your model and fine-tune it based on the chosen performance metrics.
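
The training loop below reports only the cross-entropy loss. One possible additional metric is token-level accuracy that skips padding positions. The following is a sketch under that assumption; the function name `token_accuracy` and the toy id sequences are illustrative and not taken from the notebook.

```python
def token_accuracy(predicted, target, pad_id=1):
    """Fraction of non-padding target positions where the predicted id matches."""
    correct = total = 0
    for pred_row, target_row in zip(predicted, target):
        for pred_id, target_id in zip(pred_row, target_row):
            if target_id == pad_id:
                continue  # padding positions carry no information
            total += 1
            correct += int(pred_id == target_id)
    return correct / total if total else 0.0

target = [[5, 6, 7, 1, 1], [8, 9, 1, 1, 1]]
predicted = [[5, 6, 2, 1, 4], [8, 3, 3, 3, 3]]
print(token_accuracy(predicted, target))  # 0.6
```

Computed on a held-out validation set, a metric like this could guide choices such as the number of epochs or the hidden size.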

In [107]:
class QADataset(Dataset):
    def __init__(self, questions, answers):
        self.questions = questions
        self.answers = answers

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        return torch.tensor(self.questions[idx], dtype=torch.long), torch.tensor(self.answers[idx], dtype=torch.long)

In [108]:
class Encoder(nn.Module):
    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, batch_first=True)
        self.hidden_dim = hidden_dim

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.rnn(embedded)
        return hidden, cell

In [109]:
class Decoder(nn.Module):
    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell):
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        predictions = self.fc(outputs.squeeze(1))
        return predictions, hidden, cell

In [110]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

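    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.fc.out_features

        # Logits for every target position; position 0 is never predicted and stays zero.
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(src.device)

        # The encoder's final hidden and cell states initialise the decoder.
        hidden, cell = self.encoder(src)

        # The first target token serves as the initial decoder input.
        decoder_input = trg[:, 0]
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[:, t, :] = output
            top1 = output.argmax(1)
            # Teacher forcing: feed the ground-truth token with probability
            # teacher_forcing_ratio, otherwise feed the model's own prediction.
            decoder_input = trg[:, t] if np.random.random() < teacher_forcing_ratio else top1

        return outputs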

In [111]:
train_dataset = QADataset(questions_vec, answers_vec)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

In [112]:
def train(epochs: int = 10, output_path: str = "./output.pth") -> None:
    INPUT_DIM = len(question_vocab)
    OUTPUT_DIM = len(answer_vocab)
    EMBED_DIM = 256
    HIDDEN_DIM = 512
    N_LAYERS = 2

    encoder = Encoder(INPUT_DIM, EMBED_DIM, HIDDEN_DIM, N_LAYERS)
    decoder = Decoder(OUTPUT_DIM, EMBED_DIM, HIDDEN_DIM, N_LAYERS)
    model = Seq2Seq(encoder, decoder).to(device)

    criterion = nn.CrossEntropyLoss(ignore_index=answer_vocab['<PAD>'])
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        loop = tqdm(train_loader)
        for questions, answers in loop:
            loop.set_description(f'Epoch {epoch+1}/{epochs}')
            questions, answers = questions.to(device), answers.to(device)

            optimizer.zero_grad()
            output = model(questions, answers)
            output_dim = output.shape[-1]

            output = output[:, 1:].reshape(-1, output_dim)
            answers = answers[:, 1:].reshape(-1)

            loss = criterion(output, answers)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

            loop.set_postfix(loss=epoch_loss/len(train_loader))

    os.makedirs('models', exist_ok=True)
    torch.save(model.state_dict(), f"./models/{output_path}")

In [113]:
train(50, "./output50.pth")

Epoch 1/50: 100%|██████████| 3/3 [00:02<00:00,  1.01it/s, loss=5.96]
Epoch 2/50: 100%|██████████| 3/3 [00:02<00:00,  1.11it/s, loss=5.53]
Epoch 3/50: 100%|██████████| 3/3 [00:02<00:00,  1.11it/s, loss=5.28]
Epoch 4/50: 100%|██████████| 3/3 [00:02<00:00,  1.11it/s, loss=5.16]
Epoch 5/50: 100%|██████████| 3/3 [00:02<00:00,  1.12it/s, loss=5.05]
Epoch 6/50: 100%|██████████| 3/3 [00:02<00:00,  1.11it/s, loss=4.94]
Epoch 7/50: 100%|██████████| 3/3 [00:02<00:00,  1.12it/s, loss=4.87]
Epoch 8/50: 100%|██████████| 3/3 [00:02<00:00,  1.17it/s, loss=4.79]
Epoch 9/50: 100%|██████████| 3/3 [00:02<00:00,  1.12it/s, loss=4.73]
Epoch 10/50: 100%|██████████| 3/3 [00:02<00:00,  1.15it/s, loss=4.63]
Epoch 11/50: 100%|██████████| 3/3 [00:02<00:00,  1.12it/s, loss=4.53]
Epoch 12/50: 100%|██████████| 3/3 [00:02<00:00,  1.16it/s, loss=4.47]
Epoch 13/50: 100%|██████████| 3/3 [00:02<00:00,  1.14it/s, loss=4.37]
Epoch 14/50: 100%|██████████| 3/3 [00:02<00:00,  1.15it/s, loss=4.33]
Epoch 15/50: 100%|██████████|

#### Task 5

Integrate your model into your Telegram ChatBot, so that the sent messages are taken as input by the model and its output is sent back as a reply.
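
The contents of `app.py` are not shown in this notebook. The sketch below illustrates one way the integration could look, assuming plain HTTP calls to the Telegram Bot API `sendMessage` method. The helpers `build_reply` and `send_reply` are hypothetical names, and `answer_fn` stands for the `answer` function defined in the next cell.

```python
import json
from urllib import parse, request

API_ROOT = "https://api.telegram.org"

def build_reply(update: dict, answer_fn):
    """Turn one Telegram update into sendMessage parameters, or None if it carries no text."""
    message = update.get("message") or {}
    text = message.get("text")
    chat_id = (message.get("chat") or {}).get("id")
    if not text or chat_id is None:
        return None
    return {"chat_id": chat_id, "text": answer_fn(text)}

def send_reply(token: str, params: dict) -> dict:
    """POST the parameters to the Bot API sendMessage method (needs network access)."""
    url = f"{API_ROOT}/bot{token}/sendMessage"
    data = parse.urlencode(params).encode()
    with request.urlopen(request.Request(url, data=data)) as response:
        return json.load(response)

# Offline check with a hand-written update and a stand-in for the model
fake_update = {"update_id": 1, "message": {"chat": {"id": 42}, "text": "Where is the park?"}}
reply = build_reply(fake_update, lambda question: f"echo: {question}")
print(reply)
```

Keeping `build_reply` free of network calls makes the message-handling logic testable without a token, while `send_reply` isolates the only part that talks to Telegram.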

In [129]:
model_path = "./models/output50.pth"

encoder = Encoder(len(question_vocab), 256, 512, 2).to(device)
decoder = Decoder(len(answer_vocab), 256, 512, 2).to(device)
model = Seq2Seq(encoder, decoder).to(device)

model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

def decode_output(output, idx_to_word):
    tokens = [idx_to_word[idx] for idx in output if idx != answer_vocab['<PAD>']]
    return ' '.join(tokens)

def answer(question: str) -> str:
    question = clean_text(question)
    question_vec = vectorize([question], question_vocab, MAX_LEN_Q)
    question_tensor = torch.tensor(question_vec, dtype=torch.long).to(device)

    with torch.no_grad():
        hidden, cell = model.encoder(question_tensor)
        input = torch.tensor([answer_vocab['<PAD>']], dtype=torch.long).to(device)
        answer = []
        for _ in range(MAX_LEN_A):
            output, hidden, cell = model.decoder(input, hidden, cell)
            top1 = output.argmax(1)
            answer.append(top1.item())
            input = top1

            if top1.item() == answer_vocab['<PAD>']:
                break

    predicted_answer = decode_output(answer, idx_to_word)
    return predicted_answer
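
Note that `answer` starts decoding from `<PAD>`, whereas `Seq2Seq.forward` starts from the first answer token `trg[:, 0]`, so the decoder sees a different first input at inference time than during training. A common way to align the two is to wrap every answer in explicit start and end markers. The sketch below assumes two hypothetical tokens, `<SOS>` and `<EOS>`, that are not part of the original vocabulary; for brevity, word ids follow first appearance rather than frequency.

```python
SPECIALS = ['<UNK>', '<PAD>', '<SOS>', '<EOS>']  # <SOS>/<EOS> are hypothetical additions

def build_vocab_with_markers(texts):
    words = []
    for sentence in texts:
        for token in sentence.split():
            if token not in words:
                words.append(token)
    return {token: idx for idx, token in enumerate(SPECIALS + words)}

def encode_answer(sentence, vocab, max_len):
    # <SOS> + tokens (truncated to leave room for both markers) + <EOS> + padding
    ids = [vocab['<SOS>']]
    ids += [vocab.get(t, vocab['<UNK>']) for t in sentence.split()][:max_len - 2]
    ids.append(vocab['<EOS>'])
    return ids + [vocab['<PAD>']] * (max_len - len(ids))

vocab = build_vocab_with_markers(["yes it is", "no it is closed"])
encoded = encode_answer("yes it is", vocab, 7)
print(encoded)  # [2, 4, 5, 6, 3, 1, 1]
```

With answers encoded this way, `trg[:, 0]` is always `<SOS>`, so inference could start from `<SOS>` and stop once `<EOS>` is produced.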

In [131]:
print(answer("Is Plutan restaurant kid-friendly?"))

KeyError: 282
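
The `KeyError` most likely comes from the lookup table: `idx_to_word` is built from `question_vocab`, while the ids passed to `decode_output` are produced by the decoder and therefore belong to `answer_vocab`. Any predicted id that has no entry in the question vocabulary fails the lookup. A minimal sketch of the fix, using toy vocabularies and the hypothetical helpers `invert_vocab` and `decode_ids`:

```python
def invert_vocab(vocab):
    """Map ids back to tokens for the vocabulary the ids were produced with."""
    return {idx: word for word, idx in vocab.items()}

def decode_ids(ids, id_to_word, pad_id=1):
    # .get avoids a KeyError if an id is missing from the table
    return ' '.join(id_to_word.get(i, '<UNK>') for i in ids if i != pad_id)

toy_question_vocab = {'<UNK>': 0, '<PAD>': 1, 'is': 2, 'it': 3}
toy_answer_vocab = {'<UNK>': 0, '<PAD>': 1, 'yes': 2, 'it': 3, 'is': 4, 'open': 5}

predicted_ids = [2, 3, 4, 5, 1]  # ids as the decoder would emit them (answer vocabulary)
print(5 in invert_vocab(toy_question_vocab))  # False: this lookup would raise KeyError
decoded = decode_ids(predicted_ids, invert_vocab(toy_answer_vocab))
print(decoded)  # yes it is open
```

In the notebook this corresponds to building `idx_to_word` from `answer_vocab.items()` instead of `question_vocab.items()`.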

#### Task 6

Handle potential errors that may occur, such as model errors or invalid inputs.
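
One way to approach this task is a thin wrapper around the model call that validates the input and converts any exception into a friendly reply. The sketch below is an assumption about how this could be done, not the notebook's implementation: `safe_answer`, `max_chars`, the reply texts and the stand-in `broken_model` are all illustrative.

```python
FALLBACK = "Sorry, I could not understand that. Please try rephrasing."

def safe_answer(question, answer_fn, max_chars=500):
    """Validate the input and shield the caller from model errors."""
    if not isinstance(question, str) or not question.strip():
        return "Please send a text question."
    if len(question) > max_chars:
        return f"Please keep your question under {max_chars} characters."
    try:
        reply = answer_fn(question)
    except Exception:  # e.g. the KeyError seen above; a real bot would also log it
        return FALLBACK
    # An empty or non-text prediction is treated like a failure
    return reply if isinstance(reply, str) and reply.strip() else FALLBACK

def broken_model(question):
    # Stand-in that fails the same way as the cell above
    raise KeyError(282)

print(safe_answer("Is Plutan restaurant kid-friendly?", broken_model))
print(safe_answer("   ", broken_model))
```

The bot can then call `safe_answer(text, answer)` so that a failing model still results in a reply instead of a crash.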