In [1]:
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import Vocab, build_vocab_from_iterator, vocab
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Subset
from collections import Counter
import os
import spacy
import csv
from sklearn.model_selection import train_test_split

In [2]:
! spacy download en_core_web_lg

Collecting en-core-web-lg==3.5.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.5.0/en_core_web_lg-3.5.0-py3-none-any.whl (587.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m587.7/587.7 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: en-core-web-lg
Successfully installed en-core-web-lg-3.5.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')


In [3]:
nlp = spacy.load("en_core_web_lg")

In [4]:
 ! pip install -q kaggle

In [5]:
os.environ['KAGGLE_USERNAME'] = "" # username from the json file
os.environ['KAGGLE_KEY'] = "" # key from the json file
!kaggle datasets download -d rtatman/ubuntu-dialogue-corpus

Downloading ubuntu-dialogue-corpus.zip to /content
100% 799M/799M [00:20<00:00, 43.7MB/s]
100% 799M/799M [00:20<00:00, 40.0MB/s]


In [6]:
! unzip ubuntu-dialogue-corpus.zip

Archive:  ubuntu-dialogue-corpus.zip
  inflating: Ubuntu-dialogue-corpus/dialogueText.csv  
  inflating: Ubuntu-dialogue-corpus/dialogueText_196.csv  
  inflating: Ubuntu-dialogue-corpus/dialogueText_301.csv  
  inflating: toc.csv                 


In [7]:

def question_tokenizer(text):
    return [token.text.strip().lower() for token in nlp.tokenizer(text) if not token.is_punct and token.text.strip() != '']

def answer_tokenizer(text):
    return [token.text.strip().lower() for token in nlp.tokenizer(text) if not token.is_punct and token.text.strip() != '']

In [8]:

question_answer_pairs = []
with open('Ubuntu-dialogue-corpus/dialogueText.csv', newline='') as csvfile:
    dialogue = csv.reader(csvfile)
    question = ''
    answer = ''
    previous_frm = 'Na'
    previous_to = 'Na'
    for idx, row in enumerate(dialogue):
      # skip the first row
      if idx ==0:
        continue


      frm = row[3]
      to = row[4]
      text = row[5]

      #TODO this logic doesn't account for multiple answers to the samae question

      # if the text is to the previous from it's an answer
      if to == previous_frm:
        answer += f' {text}'
      # if the from and to are still the same it's the sam question
      elif frm == previous_frm and to == previous_to:
        #question += f' {text}'
        question = text
      # if the from or to are different we assume it's a new question
      elif frm != previous_frm or to != previous_to:
        # if the previous question got an answer add it

        if answer and len(question.split(' ')) < 20:
          question_answer_pairs.append((question, answer))
        question = text
        answer = ''
      else:
        raise ValueError(f'unknown state: {frm} - {to} - {previous_frm} -- {previous_to}')

      previous_frm = frm
      previous_to = to



In [85]:
len(question_answer_pairs)
#question_answer_pairs = question_answer_pairs[:10]

In [9]:


def question_iter():
  for question, _ in question_answer_pairs:
    yield question_tokenizer(question)

def answer_iter():
  for _, answer in question_answer_pairs:
    yield answer_tokenizer(answer)

vocab_q = build_vocab_from_iterator(question_iter(), specials=['<pad>','<bos>','<eos>'])
vocab_a = build_vocab_from_iterator(answer_iter(), specials=['<pad>','<bos>','<eos>'])

In [87]:
# different way to build vocab.
def build_vocab():
  counter_q = Counter()
  counter_a = Counter()
  for question, answer in question_answer_pairs:
      counter_q.update(question_tokenizer(question))
      counter_a.update(answer_tokenizer(answer))
  return vocab(counter_q, specials=['<unk>', '<pad>', '<bos>', '<eos>']), vocab(counter_a, specials=['<unk>', '<pad>', '<bos>', '<eos>'])


vocab_q, vocab_a = build_vocab()

In [103]:
vocab_a['<eos>']

2

In [32]:
max(vocab_q.get_stoi().values())

3

In [10]:
data = []
for question, answer in question_answer_pairs:
  question_tensor_ = torch.tensor([vocab_q[token] for token in question_tokenizer(question)],
                            dtype=torch.long)
  answer_tensor_ = torch.tensor([vocab_a[token] for token in answer_tokenizer(answer)],
                            dtype=torch.long)
  data.append((question_tensor_, answer_tensor_))

In [137]:
torch.cuda.is_available()

True

In [11]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#device = torch.device('cpu')

BATCH_SIZE = 64
PAD_IDX = vocab_a['<pad>']
BOS_IDX = vocab_a['<bos>']
EOS_IDX = vocab_a['<eos>']

def generate_batch(data_batch):
  batch_question, batch_answer = [], []
  for question, answer in data_batch:
    batch_answer.append(torch.cat([torch.tensor([BOS_IDX]), answer, torch.tensor([EOS_IDX])], dim=0))
    batch_question.append(torch.cat([torch.tensor([BOS_IDX]), question, torch.tensor([EOS_IDX])], dim=0))
  batch_answer = pad_sequence(batch_answer, padding_value=PAD_IDX)
  batch_question = pad_sequence(batch_question, padding_value=PAD_IDX)
  return batch_question, batch_answer

train_idx, test_idx = train_test_split(list(range(len(data))), test_size=0.1)
train_idx, val_idx = train_test_split(train_idx, test_size=0.1)


train_iter = DataLoader(Subset(data, train_idx), batch_size=BATCH_SIZE, shuffle=False, collate_fn=generate_batch)
test_iter = DataLoader(Subset(data, test_idx), batch_size=BATCH_SIZE, shuffle=False, collate_fn=generate_batch)
val_iter = DataLoader(Subset(data, val_idx), batch_size=BATCH_SIZE, shuffle=False, collate_fn=generate_batch)

In [133]:
_, value = next(enumerate(train_iter))

In [134]:
vocab_q.lookup_tokens([i[0].item() for i in value[0]])

['<bos>',
 'how',
 'do',
 'i',
 'install',
 '.deb',
 'files',
 'too',
 '<eos>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>']

In [135]:
vocab_a.lookup_tokens([i[0].item() for i in value[1]])

['<bos>',
 'search',
 'automatrix',
 'on',
 'the',
 'forums',
 '<eos>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>']

In [12]:
from binascii import Error
import random
from typing import Tuple

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch import Tensor


class Encoder(nn.Module):
    def __init__(self,
                 input_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: float):
        super().__init__()

        self.input_dim = input_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.dropout = dropout

        self.embedding = nn.Embedding(input_dim, emb_dim)

        self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional = True)

        self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self,
                src: Tensor) -> Tuple[Tensor]:
        try:
          embedded = self.dropout(self.embedding(src))

          outputs, hidden = self.rnn(embedded)

          hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))

          return outputs, hidden
        except Error as e:
          print(e)
          raise


class Attention(nn.Module):
    def __init__(self,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 attn_dim: int):
        super().__init__()

        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim

        self.attn_in = (enc_hid_dim * 2) + dec_hid_dim

        self.attn = nn.Linear(self.attn_in, attn_dim)

    def forward(self,
                decoder_hidden: Tensor,
                encoder_outputs: Tensor) -> Tensor:

        src_len = encoder_outputs.shape[0]

        repeated_decoder_hidden = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)

        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        energy = torch.tanh(self.attn(torch.cat((
            repeated_decoder_hidden,
            encoder_outputs),
            dim = 2)))

        attention = torch.sum(energy, dim=2)

        return F.softmax(attention, dim=1)


class Decoder(nn.Module):
    def __init__(self,
                 output_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: int,
                 attention: nn.Module):
        super().__init__()

        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.output_dim = output_dim
        self.dropout = dropout
        self.attention = attention

        self.embedding = nn.Embedding(output_dim, emb_dim)

        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)

        self.out = nn.Linear(self.attention.attn_in + emb_dim, output_dim)

        self.dropout = nn.Dropout(dropout)


    def _weighted_encoder_rep(self,
                              decoder_hidden: Tensor,
                              encoder_outputs: Tensor) -> Tensor:

        a = self.attention(decoder_hidden, encoder_outputs)

        a = a.unsqueeze(1)

        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        weighted_encoder_rep = torch.bmm(a, encoder_outputs)

        weighted_encoder_rep = weighted_encoder_rep.permute(1, 0, 2)

        return weighted_encoder_rep


    def forward(self,
                input: Tensor,
                decoder_hidden: Tensor,
                encoder_outputs: Tensor) -> Tuple[Tensor]:

        input = input.unsqueeze(0)

        embedded = self.dropout(self.embedding(input))

        weighted_encoder_rep = self._weighted_encoder_rep(decoder_hidden,
                                                          encoder_outputs)

        rnn_input = torch.cat((embedded, weighted_encoder_rep), dim = 2)

        output, decoder_hidden = self.rnn(rnn_input, decoder_hidden.unsqueeze(0))

        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted_encoder_rep = weighted_encoder_rep.squeeze(0)

        output = self.out(torch.cat((output,
                                     weighted_encoder_rep,
                                     embedded), dim = 1))

        return output, decoder_hidden.squeeze(0)


class Seq2Seq(nn.Module):
    def __init__(self,
                 encoder: nn.Module,
                 decoder: nn.Module,
                 device: torch.device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self,
                src: Tensor,
                trg: Tensor,
                teacher_forcing_ratio: float = 0.5) -> Tensor:

        batch_size = src.shape[1]
        max_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim




        outputs = torch.zeros(max_len, batch_size, trg_vocab_size).to(self.device)

        encoder_outputs, hidden = self.encoder(src)

        # first input to the decoder is the <sos> token
        output = trg[0,:]

        for t in range(1, max_len):
            output, hidden = self.decoder(output, hidden, encoder_outputs)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.max(1)[1]
            output = (trg[t] if teacher_force else top1)
            #print(vocab_a.lookup_token(trg[t][0].item()))
        return outputs


INPUT_DIM = len(vocab_q)
OUTPUT_DIM = len(vocab_a)
#ENC_EMB_DIM = 256
#DEC_EMB_DIM = 256
#ENC_HID_DIM = 512
#DEC_HID_DIM = 512
#ATTN_DIM = 64
#ENC_DROPOUT = 0.5
#DEC_DROPOUT = 0.5

ENC_EMB_DIM = 32
DEC_EMB_DIM = 32
ENC_HID_DIM = 64
DEC_HID_DIM = 64
ATTN_DIM = 8
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

enc = Encoder(INPUT_DIM, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)

attn = Attention(ENC_HID_DIM, DEC_HID_DIM, ATTN_DIM)

dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attn)

model = Seq2Seq(enc, dec, device).to(device)


def init_weights(m: nn.Module):
    for name, param in m.named_parameters():
        if 'weight' in name:
            nn.init.normal_(param.data, mean=0, std=0.01)
        else:
            nn.init.constant_(param.data, 0)


model.apply(init_weights)

optimizer = optim.Adam(model.parameters())


def count_parameters(model: nn.Module):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 20,206,144 trainable parameters


In [36]:
print(OUTPUT_DIM)

69114


In [13]:
PAD_IDX = vocab_a['<pad>']

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

In [None]:
import math
import time


def train(model: nn.Module,
          iterator: torch.utils.data.DataLoader,
          optimizer: optim.Optimizer,
          criterion: nn.Module,
          clip: float):

    model.train()

    epoch_loss = 0

    for _, (src, trg) in enumerate(iterator):
        src, trg = src.to(device), trg.to(device)

        optimizer.zero_grad()

        output = model(src, trg)

        output = output[1:].view(-1, output.shape[-1])
        trg = trg[1:].view(-1)

        loss = criterion(output, trg)

        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)


def evaluate(model: nn.Module,
             iterator: torch.utils.data.DataLoader,
             criterion: nn.Module):

    model.eval()

    epoch_loss = 0

    with torch.no_grad():

        for _, (src, trg) in enumerate(iterator):
            src, trg = src.to(device), trg.to(device)

            output = model(src, trg, 0) #turn off teacher forcing

            output = output[1:].view(-1, output.shape[-1])
            trg = trg[1:].view(-1)


            loss = criterion(output, trg)

            epoch_loss += loss.item()

    return epoch_loss / len(iterator)


def epoch_time(start_time: int,
               end_time: int):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs


N_EPOCHS = 1
CLIP = 1

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()

    train_loss = train(model, train_iter, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, val_iter, criterion)

    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

    torch.save(model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/model.data')

test_loss = evaluate(model, test_iter, criterion)

print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')

In [41]:
torch.save(model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/model.data')

In [16]:
#model = Seq2Seq(*args, **kwargs)
model.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/model.data'))
model.eval()

RuntimeError: ignored

In [33]:

def generate_response(sentence, src_field, trg_field, model, device, max_len = 50):

    model.eval()

    tokens = question_tokenizer(sentence)


    tokens = ['<bos>'] + tokens + ['<eos>']

    src_indexes = [src_field[token] for token in tokens]

    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    src_len = torch.LongTensor([len(src_indexes)])

    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src_tensor)

    trg_indexes = [trg_field['<bos>']]

    for i in range(max_len):

        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)

        with torch.no_grad():
            output, hidden = model.decoder(trg_tensor, hidden, encoder_outputs)


        pred_token = output.argmax(1).item()

        trg_indexes.append(pred_token)

        if pred_token == trg_field['<eos>']:
            break

    trg_tokens = trg_field.lookup_tokens(trg_indexes)

    return trg_tokens[1:]

In [35]:
generate_response('what is the command', vocab_q, vocab_a, model, device)

['', 'i', '<eos>']

In [None]:
! pip install nltk
! pip install tabulate
import nltk
nltk.download('punkt')
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu

reference_translations must be testoutput
generated_outputs must be the outputs generated by the model
in 2dArray format

In [None]:
reference_translations = [['I love cats'], ['She likes dogs']]
generated_outputs = ['I love dogs', 'He likes cats']

BLEU Score

In [None]:
import nltk
from nltk.translate.bleu_score import corpus_bleu

reference_translations_tokenized = [[nltk.word_tokenize(sentence) for sentence in reference] for reference in reference_translations]
generated_outputs_tokenized = [nltk.word_tokenize(sentence) for sentence in generated_outputs]

bleu_score = corpus_bleu(reference_translations_tokenized, generated_outputs_tokenized)
print("BLEU score:", bleu_score)

Exact Match Ratio

In [None]:
exact_match_count = sum([1 if ref == gen else 0 for ref, gen in zip(reference_translations, generated_outputs)])
exact_match_ratio = exact_match_count / len(reference_translations) * 100
print("Exact Match Ratio:", exact_match_ratio)

Word Error Rate (WER)

In [None]:
def wer(r, h):
    # Initialize the dynamic programming table
    d = [[0] * (len(h) + 1) for _ in range(len(r) + 1)]

    for i in range(len(r) + 1):
        for j in range(len(h) + 1):
            if i == 0:
                d[i][j] = j
            elif j == 0:
                d[i][j] = i
            elif r[i - 1] == h[j - 1]:
                d[i][j] = d[i - 1][j - 1]
            else:
                d[i][j] = 1 + min(d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])

    return float(d[len(r)][len(h)]) / len(r)

wer_score = wer([word for ref in reference_translations for word in nltk.word_tokenize(ref[0])],
               [word for gen in generated_outputs for word in nltk.word_tokenize(gen)])
print("Word Error Rate (WER):", wer_score)

Token Precision, Recall and F1 score

In [None]:
def calculate_token_overlap(reference_tokens, generated_tokens):
    common_tokens = set(reference_tokens) & set(generated_tokens)
    precision = len(common_tokens) / len(generated_tokens) if len(generated_tokens) > 0 else 0
    recall = len(common_tokens) / len(reference_tokens) if len(reference_tokens) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1_score

all_reference_tokens = [word for ref in reference_translations for word in nltk.word_tokenize(ref[0])]
all_generated_tokens = [word for gen in generated_outputs for word in nltk.word_tokenize(gen)]

precision, recall, f1_score = calculate_token_overlap(all_reference_tokens, all_generated_tokens)
print("Token Precision:", precision)
print("Token Recall:", recall)
print("Token F1-score:", f1_score)


Cosine Similarity

In [None]:
import nltk
from gensim.models import Word2Vec
from sklearn.metrics.pairwise import cosine_similarity

reference_translations_tokenized = [[token.lower() for token in nltk.word_tokenize(sentence)] for sentence in [ref[0] for ref in reference_translations]]
generated_outputs_tokenized = [[token.lower() for token in nltk.word_tokenize(sentence)] for sentence in generated_outputs]

# Train a Word2Vec model on a large corpus (not provided in this example)
model = Word2Vec(sentences=reference_translations_tokenized + generated_outputs_tokenized, vector_size=100, window=5, min_count=1, sg=1)

# Get word embeddings for each token in the reference translations and generated outputs
reference_embeddings = [model.wv[token] for ref in reference_translations_tokenized for token in ref]
generated_embeddings = [model.wv[token] for token in generated_outputs_tokenized]

# Calculate the semantic similarity (cosine similarity) between the embeddings
cosine_similarities = cosine_similarity(reference_embeddings, generated_embeddings)
average_cosine_similarity = cosine_similarities.mean()

print("Average Semantic Similarity (Cosine Similarity):", average_cosine_similarity)