In [None]:
import torch
import torch.nn as nn
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence

# Toy extractive-QA corpus. Every record carries a context passage, a question
# about it, and the answer text, which occurs verbatim inside the context.
qa_dataset = [
    dict(
        context='My name is AIVN and I am from Vietnam.',
        question='Where does AIVN come from?',
        answer='Vietnam',
    ),
    dict(
        context='I love painting and my favorite artist is Van Gogh.',
        question='What is my favorite activity?',
        answer='painting',
    ),
    dict(
        context='I am studying computer science at the University of Tokyo',
        question='Where do I live?',
        answer='Tokyo',
    ),
    dict(
        context='I was born in Paris, but now I live in New York',
        question='Where do I live now',
        answer='New York',
    ),
]

# Number of records; the bare name on the last line displays it as cell output.
data_size = len(qa_dataset)
data_size




4

In [None]:
# Tokenizer shared by every cell that turns text into tokens.
tokenizer = get_tokenizer('basic_english')


def yield_tokens(data_iter):
    """Yield one token list per record, built from '<cls> context <sep> question'.

    Each item of `data_iter` is indexed with the keys 'context' and 'question'.
    """
    for record in data_iter:
        pieces = ('<cls>', record['context'], '<sep>', record['question'])
        yield tokenizer(' '.join(pieces))


# Build the vocabulary from the toy corpus. The special tokens are passed
# explicitly; in the recorded output below they occupy ids 0-5.
vocab = build_vocab_from_iterator(
    yield_tokens(qa_dataset),
    specials=['<unk>', '<pad>', '<bos>', '<eos>', '<sep>', '<cls>'],
)

# Tokens that are not in the vocabulary are looked up as '<unk>'.
vocab.set_default_index(vocab['<unk>'])
vocab.get_stoi()

{'what': 45,
 'was': 44,
 'vietnam': 43,
 'van': 42,
 'university': 41,
 'paris': 36,
 'painting': 35,
 'of': 34,
 'new': 33,
 'tokyo': 40,
 'name': 32,
 'love': 31,
 'gogh': 30,
 'does': 29,
 'my': 10,
 '<sep>': 4,
 '<bos>': 2,
 'is': 8,
 'at': 24,
 'science': 37,
 '?': 7,
 'where': 11,
 '<cls>': 5,
 'the': 39,
 'in': 19,
 '<eos>': 3,
 'come': 27,
 '<pad>': 1,
 'computer': 28,
 '<unk>': 0,
 'studying': 38,
 'i': 6,
 'aivn': 13,
 'and': 15,
 'artist': 23,
 'favorite': 17,
 'york': 46,
 'am': 14,
 'do': 16,
 'now': 20,
 ',': 21,
 'live': 9,
 'activity': 22,
 '.': 12,
 'born': 25,
 'from': 18,
 'but': 26}

In [None]:
# Id of the '<pad>' token in the vocabulary.
pad_idx = vocab['<pad>']

def pad_and_truncate(input_ids, max_length, pad_value=None):
    """Return a copy of `input_ids` that is exactly `max_length` items long.

    Sequences longer than `max_length` are cut to their first `max_length`
    items; shorter ones are right-padded. The caller's sequence is never
    modified (extending it in place with `+=` would silently change the
    list the caller still holds).

    Args:
        input_ids: Sequence of token ids (must support slicing).
        max_length: Target length.
        pad_value: Id used for padding. When omitted, the vocabulary's
            '<pad>' id (`pad_idx`) is used.

    Returns:
        A new list holding the truncated or padded ids.
    """
    ids = list(input_ids[:max_length])
    missing = max_length - len(ids)
    if missing > 0:
        # Resolve the default lazily so pure truncation never touches the vocab.
        if pad_value is None:
            pad_value = pad_idx
        ids.extend([pad_value] * missing)
    return ids

# Quick check: encode a short sentence and bring it to the fixed length.
max_length = 30  # fixed sequence length; also read by vectorize() and passed to the model as max_len
text = 'I love AIVN'
tokens = tokenizer(text)
input_ids = vocab(tokens)  # map the tokens to their vocabulary ids
input_ids = pad_and_truncate(input_ids, max_length)
input_ids

[6,
 31,
 13,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1]

In [None]:
def vectorize(question, context, answer):
  """Encode a QA example as fixed-length token ids plus the answer span.

  The model input is `question <sep> context`, tokenized, mapped through
  `vocab` and padded/truncated to `max_length`. The answer is located as a
  contiguous run of token ids inside the context part of that input, so a
  token that merely also occurs in the question (or an earlier partial
  match in the context) is never reported as the answer.

  Args:
    question: Question text.
    context: Passage that contains the answer.
    answer: Answer text; its tokens must appear contiguously in `context`.

  Returns:
    A tuple `(input_ids, start_pos, end_pos)`: a 1-D long tensor of length
    `max_length`, and two 0-dim long tensors holding the inclusive start and
    end indices of the answer within `input_ids`.

  Raises:
    ValueError: If the answer has no tokens, or its tokens do not occur as a
      contiguous span in the context after truncation.
  """
  input_text = question + ' <sep> ' + context
  token_ids = [vocab[token] for token in tokenizer(input_text)]
  # Measure the real (unpadded) length first so the search never runs into padding.
  valid_len = min(len(token_ids), max_length)
  input_ids = pad_and_truncate(token_ids, max_length)

  answer_ids = [vocab[token] for token in tokenizer(answer)]
  if not answer_ids:
    raise ValueError('answer must contain at least one token')

  # The context starts right after the question tokens and the '<sep>' token.
  context_start = len(tokenizer(question)) + 1
  span_len = len(answer_ids)
  start_pos = next(
      (i for i in range(context_start, valid_len - span_len + 1)
       if input_ids[i:i + span_len] == answer_ids),
      None,
  )
  if start_pos is None:
    raise ValueError(
        f'answer {answer!r} was not found in the first {max_length} tokens of the context')
  end_pos = start_pos + span_len - 1

  input_ids = torch.tensor(input_ids, dtype=torch.long)
  start_pos = torch.tensor(start_pos, dtype=torch.long)
  end_pos = torch.tensor(end_pos, dtype=torch.long)

  return input_ids, start_pos, end_pos

# Sanity check on a hand-written example. 'your' is not in the vocabulary, so
# it is encoded with the default index (0, '<unk>') in the output below.
vectorize (
    'What is your name?',
    'My name is AIVN',
    'AIVN'
)

(tensor([45,  8,  0, 32,  7,  4, 10, 32,  8, 13,  1,  1,  1,  1,  1,  1,  1,  1,
          1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1]),
 tensor(9),
 tensor(9))

In [None]:
class QADataset(Dataset):
  """Map-style dataset that vectorizes QA records on access.

  Each record is indexed with the keys 'question', 'context' and 'answer'.
  """

  def __init__(self, data):
    # Indexable collection of QA records.
    self.data = data

  def __len__(self):
    """Return the number of records."""
    return len(self.data)

  def __getitem__(self, idx):
    """Return `(input_ids, start_pos, end_pos)` produced by `vectorize` for record `idx`."""
    record = self.data[idx]
    encoded = vectorize(record['question'], record['context'], record['answer'])
    input_ids, start_pos, end_pos = encoded
    return input_ids, start_pos, end_pos

In [None]:
def decode(input_ids):
  """Turn a sequence of token ids back into a space-separated string.

  Args:
    input_ids: Iterable of token ids. Elements may be built-in ints or any
      value convertible with `int()`, such as the elements obtained by
      iterating a 1-D integer tensor.

  Returns:
    The tokens returned by `vocab.lookup_token`, joined by single spaces
    (an empty string for an empty input).
  """
  # Normalize each id to a built-in int so the vocabulary lookup receives the
  # same kind of value whether the caller passes a list or a tensor.
  return ' '.join(vocab.lookup_token(int(token)) for token in input_ids)

In [None]:
# Wrap the toy corpus in a Dataset and serve it in shuffled mini-batches of two.
train_dataset = QADataset(qa_dataset)
# NOTE(review): shuffle=True and no seed is set in the visible cells, so the
# batch composition can differ from run to run.
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)

In [None]:
# Peek at one mini-batch: [input ids, answer start positions, answer end positions].
batch = next(iter(train_loader))
batch

[tensor([[11, 16,  6,  9,  7,  4,  6, 14, 38, 28, 37, 24, 39, 41, 34, 40,  1,  1,
           1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1],
         [45,  8, 10, 17, 22,  7,  4,  6, 31, 35, 15, 10, 17, 23,  8, 42, 30, 12,
           1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1]]),
 tensor([15,  9]),
 tensor([15,  9])]

In [None]:
import math
import torch.nn as nn
import torch.optim as optim

class TransformerBlock(nn.Module):
    """Single Transformer encoder block for batch-first inputs.

    Applies multi-head attention and a position-wise feed-forward network,
    each followed by a residual connection and LayerNorm.

    Args:
        embed_dim: Width of the input and output features.
        num_heads: Number of attention heads (must divide `embed_dim`).
        ff_dim: Hidden width of the feed-forward network.

    Shapes:
        query, key, value: (batch, seq_len, embed_dim).
        output: (batch, seq_len, embed_dim).
    """

    def __init__(self, embed_dim, num_heads, ff_dim):
        super().__init__()
        # batch_first=True: inputs arrive as (batch, seq, embed_dim). With the
        # default (False) dim 0 is taken as the sequence axis, so attention
        # would mix the examples of a batch instead of the tokens of a sequence.
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        # Expand to ff_dim and project back to embed_dim, so the residual add
        # below is valid for any ff_dim (not only ff_dim == embed_dim).
        self.ffn = nn.Sequential(
            nn.Linear(in_features=embed_dim, out_features=ff_dim),
            nn.ReLU(),
            nn.Linear(in_features=ff_dim, out_features=embed_dim),
        )
        self.layernorm_1 = nn.LayerNorm(normalized_shape=embed_dim)
        self.layernorm_2 = nn.LayerNorm(normalized_shape=embed_dim)

    def forward(self, query, key, value):
        """Run attention + feed-forward; returns a tensor shaped like `query`."""
        attn_output, _ = self.attn(query, key, value)
        out_1 = self.layernorm_1(query + attn_output)
        ffn_output = self.ffn(out_1)
        x = self.layernorm_2(out_1 + ffn_output)
        return x

In [None]:
class PositionalEncoding(nn.Module):
  """Adds fixed sinusoidal position encodings to batch-first embeddings.

  Args:
    d_model: Feature width of the embeddings (odd widths are supported).
    max_len: Largest sequence length that can be encoded.

  Shapes:
    input / output of `forward`: (batch, seq_len, d_model), seq_len <= max_len.
  """

  def __init__(self, d_model, max_len=5000):
    super().__init__()
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    pe = torch.zeros(max_len, d_model)
    pe[:, 0::2] = torch.sin(position * div_term)
    # For an odd d_model there is one cosine column fewer than sine columns.
    pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
    # Stored as (1, max_len, d_model) so it broadcasts over the batch axis.
    self.register_buffer('pe', pe.unsqueeze(0))

  def forward(self, x):
    """Return `x` plus the encoding of each sequence position.

    Raises:
      ValueError: If the sequence is longer than `max_len`.
    """
    seq_len = x.size(1)
    if seq_len > self.pe.size(1):
      raise ValueError(
          f'sequence length {seq_len} exceeds max_len {self.pe.size(1)}')
    # Slice along the sequence axis (dim 1), not the batch axis.
    x = x + self.pe[:, :seq_len]
    return x


In [None]:
class QAModel(nn.Module):
  """Extractive QA model: embedding -> positional encoding -> transformer block -> span heads.

  Args:
    vocab_size: Number of entries in the embedding table.
    embedding_dim: Width of the token embeddings and of the encoder output.
    n_heads: Number of attention heads in the transformer block.
    ff_dim: Feed-forward width passed to the transformer block.
    max_len: Maximum sequence length passed to the positional encoding.

  `forward` takes token ids of shape (batch, seq_len) and returns
  `(start_logits, end_logits)`, each of shape (batch, seq_len).
  """

  def __init__(self, vocab_size, embedding_dim, n_heads, ff_dim, max_len):
    super().__init__()
    self.input_embedding = nn.Embedding(vocab_size, embedding_dim)
    self.positional_encoding = PositionalEncoding(embedding_dim, max_len)
    self.transformer = TransformerBlock(embedding_dim, n_heads, ff_dim)

    # The heads read the encoder output, which is embedding_dim wide (the
    # block ends in a LayerNorm over embed_dim), so they are sized with
    # embedding_dim rather than ff_dim.
    self.start_linear = nn.Linear(embedding_dim, 1)
    self.end_linear = nn.Linear(embedding_dim, 1)

  def forward(self, text):
    """Score every position of `text` as answer start and answer end."""
    embedded = self.input_embedding(text)
    embedded = self.positional_encoding(embedded)
    # Self-attention: the same tensor serves as query, key and value.
    encoded = self.transformer(embedded, embedded, embedded)
    # (batch, seq_len, 1) -> (batch, seq_len)
    start_logits = self.start_linear(encoded).squeeze(-1)
    end_logits = self.end_linear(encoded).squeeze(-1)

    return start_logits, end_logits

In [None]:
# Model params
Embedding_dims = 128  # token embedding width
FF_dim = 128          # feed-forward width passed to the transformer block
N_heads = 1           # number of attention heads
vocab_size = len(vocab)  # number of tokens in the vocabulary

model = QAModel(vocab_size, Embedding_dims, N_heads, FF_dim, max_length)

# Smoke test on random token ids: one sequence of ten tokens. The tensor gets
# its own name so the built-in `input` stays usable in later cells.
dummy_input = torch.randint(0, 10, size=(1, 10))
print (dummy_input.shape)

model.eval()
with torch.no_grad():
  start_logit, end_logit = model(dummy_input)
print ("Shape of start logits", start_logit.shape)
print ("Shape of end logits", end_logit.shape)


torch.Size([1, 10])
Shape of start logits torch.Size([1, 10])
Shape of end logits torch.Size([1, 10])


In [None]:
# Training configuration.
LR = 0.001  # learning rate handed to Adam
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# Cross-entropy over sequence positions: the model's logits are
# (batch, seq_len) and the targets are the start/end position indices.
criterion = nn.CrossEntropyLoss()
EPOCHS = 15  # number of passes over train_loader

In [None]:
# Train the span predictor: one optimizer step per mini-batch.
model.train()
for epoch in range(EPOCHS):
  for idx, (input_ids, start_pos, end_pos) in enumerate(train_loader):
    optimizer.zero_grad()  # clear gradients accumulated by the previous step
    start_logit, end_logit = model(input_ids)
    # Each head is scored as a classification over sequence positions.
    start_loss = criterion(start_logit, start_pos)
    end_loss = criterion(end_logit, end_pos)
    loss = (start_loss + end_loss) / 2  # mean of the start and end losses
    loss.backward()
    optimizer.step()
    print (f'Epoch: {epoch+1}/{EPOCHS}, Batch: {idx+1}/{len(train_loader)}, Loss: {loss.item()}')

Epoch: 1/15, Batch: 1/2, Loss: 3.691862106323242
Epoch: 1/15, Batch: 2/2, Loss: 3.517197608947754
Epoch: 2/15, Batch: 1/2, Loss: 3.0236682891845703
Epoch: 2/15, Batch: 2/2, Loss: 2.997326374053955
Epoch: 3/15, Batch: 1/2, Loss: 2.7603917121887207
Epoch: 3/15, Batch: 2/2, Loss: 2.162611961364746
Epoch: 4/15, Batch: 1/2, Loss: 2.1418211460113525
Epoch: 4/15, Batch: 2/2, Loss: 1.6836179494857788
Epoch: 5/15, Batch: 1/2, Loss: 1.5318635702133179
Epoch: 5/15, Batch: 2/2, Loss: 1.2931123971939087
Epoch: 6/15, Batch: 1/2, Loss: 1.0562162399291992
Epoch: 6/15, Batch: 2/2, Loss: 0.8903276324272156
Epoch: 7/15, Batch: 1/2, Loss: 0.6317712068557739
Epoch: 7/15, Batch: 2/2, Loss: 0.5451221466064453
Epoch: 8/15, Batch: 1/2, Loss: 0.9038580656051636
Epoch: 8/15, Batch: 2/2, Loss: 0.39669689536094666
Epoch: 9/15, Batch: 1/2, Loss: 0.2213417887687683
Epoch: 9/15, Batch: 2/2, Loss: 0.08456418663263321
Epoch: 10/15, Batch: 1/2, Loss: 0.2979718744754791
Epoch: 10/15, Batch: 2/2, Loss: 0.20126160979270935

In [None]:
# Run the trained model on one example and decode the predicted answer span.
model.eval()
with torch.no_grad():
  sample = qa_dataset[3]
  # Read the fields by key; unpacking `sample.values()` would silently swap
  # them if a record listed its keys in a different order.
  context = sample['context']
  question = sample['question']
  answer = sample['answer']
  input_ids, start_pos, end_pos = vectorize(question, context, answer)
  input_ids = input_ids.unsqueeze(0)  # add the batch dimension
  start_logit, end_logit = model(input_ids)

  # Predictions index into `question <sep> context`; subtracting the question
  # length plus one (for '<sep>') makes index 0 the first context token.
  offset = len(tokenizer(question)) + 1
  context_tokens = tokenizer(context)

  start_position = torch.argmax(start_logit, dim=1).item() - offset
  end_position = torch.argmax(end_logit, dim=1).item() - offset

  # Clamp the span to the context.
  start_position = max(start_position, 0)
  end_position = min(end_position, len(context_tokens) - 1)

  if end_position >= start_position:
    # Extract the predicted answer span
    predicted_answer = ' '.join(context_tokens[start_position:end_position+1])
  else:
    predicted_answer = ""

  print(f"Question: {question}")
  print(f"Context: {context}")
  print (f"Start Position: {start_position}")
  print (f"End Position: {end_position}")
  print(f"Predicted Answer: {predicted_answer}")

Question: Where do I live now
Context: I was born in Paris, but now I live in New York
Start Position: 11
End Position: 12
Predicted Answer: new york
