# Introduction

This notebook presents a dialog model based on the Transformer architecture. The model is trained on a dataset of movie dialogs. The model inference code is at the end of the notebook.

In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence

import numpy as np
import codecs
import math
import re

# Data Preprocessing Pipeline

In [2]:
# READING THE DATA
# Context managers make sure both file handles are closed once the text is read.
# errors='ignore' silently drops bytes that are not valid UTF-8.
with open('.../Chatbot/movie_lines.txt', encoding='utf-8', errors='ignore') as lines_file:
    lines = lines_file.read().split('\n')
with open('.../Chatbot/movie_conversations.txt', encoding='utf-8', errors='ignore') as conversations_file:
    conversations = conversations_file.read().split('\n')

FileNotFoundError: [Errno 2] No such file or directory: '.../Chatbot/movie_lines.txt'

In [None]:
# Map every line id (first field) to the line's text (fifth field).
# Records that do not split into exactly five ' +++$+++ '-separated fields are skipped.
id2line = {
    fields[0]: fields[4]
    for fields in (raw_line.split(' +++$+++ ') for raw_line in lines)
    if len(fields) == 5
}

In [None]:
# Build one list of line ids per conversation.
# The last field of each record is a textual list of ids; the enclosing
# brackets, quotes and spaces are removed before splitting on commas.
# The final entry of `conversations` is skipped.
conversations_ids = []
for record in conversations[:-1]:
    id_list_text = record.split(' +++$+++ ')[-1]
    id_list_text = id_list_text[1:-1].replace("'", "").replace(" ", "")
    conversations_ids.append(id_list_text.split(','))

In [None]:
# Split every conversation into (question, answer) pairs:
# each line is the question for the line that immediately follows it.
questions = []
answers = []
for conversation in conversations_ids:
    for asked_id, replied_id in zip(conversation, conversation[1:]):
        questions.append(id2line[asked_id])
        answers.append(id2line[replied_id])

In [None]:
# Doing a first cleaning of the texts
def clean_text(text):
    """Lower-case `text`, expand common English contractions and strip punctuation.

    The replacement rules are applied in order. The irregular contractions
    "won't" and "can't" must be handled BEFORE the generic "n't" rule,
    otherwise they would be turned into "wo not" / "ca not".

    Args:
        text: raw string.

    Returns:
        The cleaned string.
    """
    replacements = (
        (r"i'm", "i am"),
        (r"he's", "he is"),
        (r"she's", "she is"),
        (r"that's", "that is"),
        (r"what's", "what is"),
        (r"where's", "where is"),
        (r"how's", "how is"),
        (r"\'ll", " will"),
        (r"\'ve", " have"),
        (r"\'re", " are"),
        (r"\'d", " would"),
        # Irregular negations first, generic "n't" last.
        (r"won't", "will not"),
        (r"can't", "cannot"),
        (r"n't", " not"),
        # Finally drop punctuation and other symbols.
        (r"[-()\"#/@;:<>{}`+=~|.!?,]", ""),
    )
    text = text.lower()
    for pattern, replacement in replacements:
        text = re.sub(pattern, replacement, text)
    return text

In [None]:
# Apply clean_text to every question
clean_questions = [clean_text(question) for question in questions]

# Apply clean_text to every answer
clean_answers = [clean_text(answer) for answer in answers]

In [None]:
# Keep only the pairs in which both the question and the answer contain
# between 2 and 25 whitespace-separated words (inclusive).

# Pass 1: drop the pairs whose question is too short or too long
short_questions = []
short_answers = []
for question, answer in zip(clean_questions, clean_answers):
    if 2 <= len(question.split()) <= 25:
        short_questions.append(question)
        short_answers.append(answer)

# Pass 2: drop the remaining pairs whose answer is too short or too long
clean_questions = []
clean_answers = []
for question, answer in zip(short_questions, short_answers):
    if 2 <= len(answer.split()) <= 25:
        clean_answers.append(answer)
        clean_questions.append(question)

In [None]:
# Character-level vocabulary: every distinct character that occurs in the
# cleaned questions and answers, in sorted order.
vocab = sorted(set(''.join(clean_questions + clean_answers)))
# Add special tokens (each one is a single vocabulary entry)
vocab.extend(["<SOS>", "<EOS>", "<PAD>", "<OUT>"])
VOCAB_SIZE = len(vocab)

# Lookup tables: entry -> index and index -> entry
char2idx = {u: i for i, u in enumerate(vocab)}
idx2char = {i: u for i, u in enumerate(vocab)}


# Create encoding and decoding functions
def encode(s):
    """Convert string `s` into a list of vocabulary indices, one per character.

    Characters that are not in the vocabulary are mapped to the index of the
    '<OUT>' token instead of raising KeyError.
    """
    out_idx = char2idx['<OUT>']
    return [char2idx.get(c, out_idx) for c in s]


def decode(l):
    """Convert an iterable of indices `l` (ints or int-convertible items) back into a string."""
    return ''.join(idx2char[int(i)] for i in l)

In [None]:
# Encode every cleaned question and answer as a list of character indices
input_questions_as_int = list(map(encode, clean_questions))
target_answers_as_int = list(map(encode, clean_answers))

# The encoder consumes the questions as they are.
encoder_input_seqs = input_questions_as_int

# The decoder input is the answer prefixed with <SOS>; the decoder target is
# the same answer followed by <EOS>.
sos_idx = char2idx['<SOS>']
eos_idx = char2idx['<EOS>']
decoder_input_seqs = [[sos_idx] + answer_ids for answer_ids in target_answers_as_int]
decoder_target_seqs = [answer_ids + [eos_idx] for answer_ids in target_answers_as_int]

In [None]:
# Maximum number of (question, answer) sequences used for training
SEQS = 15000

# Truncate the three parallel lists to their first SEQS entries
encoder_input_seqs, decoder_input_seqs, decoder_target_seqs = (
    encoder_input_seqs[:SEQS],
    decoder_input_seqs[:SEQS],
    decoder_target_seqs[:SEQS],
)

In [None]:
# Dataset wrapper consumed by the pytorch DataLoader
class ChatDataset(Dataset):
    """Holds three parallel collections and serves them one index at a time."""

    def __init__(self, encoder_input_seqs, decoder_input_seqs, decoder_target_seqs):
        # The three collections are stored as given; nothing is copied or converted.
        self.encoder_input_seqs = encoder_input_seqs
        self.decoder_input_seqs = decoder_input_seqs
        self.decoder_target_seqs = decoder_target_seqs

    def __len__(self):
        # The dataset size is taken from the encoder inputs.
        return len(self.encoder_input_seqs)

    def __getitem__(self, idx):
        # Returns the (encoder_input, decoder_input, decoder_target) triple at position idx.
        return (
            self.encoder_input_seqs[idx],
            self.decoder_input_seqs[idx],
            self.decoder_target_seqs[idx],
        )
# Collate function for the DataLoader: turns the lists of a batch into tensors
# and pads ALL encoder inputs, decoder inputs and decoder targets of the batch
# to one common length (the longest sequence among the three groups).
padding_value = char2idx['<PAD>']
def collate_fn(batch, padding_value=char2idx['<PAD>']):
    """Convert a batch of (encoder_input, decoder_input, decoder_target) triples into padded tensors.

    Args:
        batch: sequence of triples as returned by ChatDataset.__getitem__.
        padding_value: index used to fill the padded positions.

    Returns:
        Tuple (encoder_inputs, decoder_inputs, decoder_targets); each is a
        LongTensor with one row per batch item, and all three share the same
        padded length.
    """
    encoder_inputs, decoder_inputs, decoder_targets = zip(*batch)
    group_size = len(encoder_inputs)

    # Pad the three groups together so that they end up with the same width.
    all_seqs = [
        torch.LongTensor(seq)
        for group in (encoder_inputs, decoder_inputs, decoder_targets)
        for seq in group
    ]
    padded = pad_sequence(all_seqs, batch_first=True, padding_value=padding_value)

    # Rows are laid out as [encoder inputs | decoder inputs | decoder targets].
    return (
        padded[:group_size],
        padded[group_size:2 * group_size],
        padded[2 * group_size:],
    )

In [None]:
# Mask creation func
def create_self_attention_mask(sequence_length):
    """Build the causal (look-ahead) mask used by the decoder self-attention.

    MultiHeadAttention blocks every position where the mask equals 0
    (`scores.masked_fill(mask == 0, -1e9)`), so the mask must be True where
    attention is ALLOWED: position i may attend to positions j <= i.
    The previous upper-triangular mask had the opposite polarity and let each
    position attend only to future positions.

    Args:
        sequence_length: length of the target sequence.

    Returns:
        Bool tensor of shape (sequence_length, sequence_length), True on and
        below the main diagonal, False above it.
    """
    # Lower-triangular matrix (diagonal included) of ones, converted to bool
    mask = torch.tril(torch.ones(sequence_length, sequence_length)).bool()
    return mask

# Model Building

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model: encoder stack, decoder stack and a final linear projection.

    Args:
        input_size: number of embeddings of the encoder (source vocabulary size).
        output_size: number of embeddings of the decoder and width of the
            output projection (target vocabulary size).
        hidden_size: width of the embeddings and of every layer.
        num_layers: number of layers in the encoder and in the decoder.
        num_heads: number of attention heads.
        dropout: dropout probability passed to the sub-modules.
    """

    def __init__(self, input_size, output_size, hidden_size, num_layers, num_heads, dropout):
        super().__init__()
        self.encoder = TransformerEncoder(input_size, hidden_size, num_layers, num_heads, dropout)
        # The decoder embeds TARGET tokens, so it must be sized by output_size
        # (it was previously sized by input_size, which is only correct when
        # both vocabularies have the same size).
        self.decoder = TransformerDecoder(output_size, hidden_size, num_layers, num_heads, dropout)
        self.output_linear = nn.Linear(hidden_size, output_size)

    def forward(self, source, target):
        """Encode `source`, decode `target` against it and project to `output_size` scores.

        Both arguments are fed to nn.Embedding, i.e. they are index tensors
        (assumed shape (batch, sequence) - TODO confirm with the callers).
        """
        encoder_output = self.encoder(source)
        decoder_output = self.decoder(target, encoder_output)
        output = self.output_linear(decoder_output)
        return output
    
class TransformerEncoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of encoder layers."""

    def __init__(self, input_size, hidden_size, num_layers, num_heads, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.pos_encoding = PositionalEncoding(hidden_size, dropout)
        layers = [
            TransformerEncoderLayer(hidden_size, num_heads, dropout)
            for _ in range(num_layers)
        ]
        self.encoder_layers = nn.ModuleList(layers)

    def forward(self, x):
        """Embed the index tensor `x`, add positional information and run every layer in order."""
        hidden = self.pos_encoding(self.embedding(x))
        for encoder_layer in self.encoder_layers:
            hidden = encoder_layer(hidden)
        return hidden
    
class TransformerDecoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of decoder layers."""

    def __init__(self, output_size, hidden_size, num_layers, num_heads, dropout):
        super().__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.pos_encoding = PositionalEncoding(hidden_size, dropout)
        self.decoder_layers = nn.ModuleList([
            TransformerDecoderLayer(hidden_size, num_heads, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, encoder_output):
        """Decode the index tensor `x` while attending to `encoder_output`.

        A self-attention mask sized by the second dimension of `x` is built by
        create_self_attention_mask and handed to every decoder layer.
        """
        out = self.embedding(x)
        out = self.pos_encoding(out)

        # Place the mask on the same device as the input instead of relying on
        # a global `device` variable that may not exist yet (or may differ
        # from where the model actually lives).
        mask = create_self_attention_mask(x.size(1)).to(x.device)

        for layer in self.decoder_layers:
            out = layer(out, encoder_output, self_attention_mask=mask)

        return out

class TransformerEncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward, each followed by Add & Norm."""

    def __init__(self, hidden_size, num_heads, dropout):
        super().__init__()
        self.self_attention = MultiHeadAttention(hidden_size, num_heads)
        self.feed_forward = FeedForward(hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm1 = nn.LayerNorm(hidden_size)
        self.layer_norm2 = nn.LayerNorm(hidden_size)

    def forward(self, x):
        """Apply both sub-layers; dropout is applied to each sub-layer output before the residual sum."""
        # Sub-layer 1: self-attention, then Add & Norm
        attended = self.dropout(self.self_attention(x, x, x))
        x = self.layer_norm1(x + attended)

        # Sub-layer 2: position-wise feed-forward, then Add & Norm
        transformed = self.dropout(self.feed_forward(x))
        return self.layer_norm2(x + transformed)
    
class TransformerDecoderLayer(nn.Module):
    """One decoder layer: masked self-attention, encoder-decoder attention and
    feed-forward, each followed by Add & Norm."""

    def __init__(self, hidden_size, num_heads, dropout):
        super().__init__()
        self.self_attention = MultiHeadAttention(hidden_size, num_heads)
        self.encoder_attention = MultiHeadAttention(hidden_size, num_heads)
        self.feed_forward = FeedForward(hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm1 = nn.LayerNorm(hidden_size)
        self.layer_norm2 = nn.LayerNorm(hidden_size)
        self.layer_norm3 = nn.LayerNorm(hidden_size)

    def forward(self, x, encoder_output, self_attention_mask=None):
        """Run the three sub-layers.

        Args:
            x: decoder hidden states entering the layer.
            encoder_output: encoder hidden states used as keys and values of
                the encoder-decoder attention.
            self_attention_mask: optional mask forwarded to the self-attention.
        """
        residual = x # for Add & Norm 1
        out = self.dropout(self.self_attention(x, x, x, mask=self_attention_mask))
        out = residual + out # Add
        out = self.layer_norm1(out) # Norm

        residual = out # for Add & Norm 2
        # q comes from the decoder (the output of the self-attention sub-layer),
        # k and v come from the encoder. The query used to be the raw layer
        # input `x`, which bypassed the self-attention sub-layer.
        out = self.dropout(self.encoder_attention(out, encoder_output, encoder_output))
        out = residual + out # Add
        out = self.layer_norm2(out) # Norm

        residual = out # for Add & Norm 3
        out = self.dropout(self.feed_forward(out))
        out = residual + out # Add
        out = self.layer_norm3(out) # Norm

        return out

class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed in `num_heads` parallel heads.

    Args:
        hidden_size: width of the inputs and of the output.
        num_heads: number of heads; must divide `hidden_size`.

    Raises:
        ValueError: if `hidden_size` is not divisible by `num_heads`.
    """

    def __init__(self, hidden_size, num_heads):
        super().__init__()
        if hidden_size % num_heads != 0:
            raise ValueError("hidden_size must be divisible by num_heads")
        self.num_heads = num_heads
        self.hidden_size = hidden_size
        self.head_size = hidden_size // num_heads

        self.q_linear = nn.Linear(hidden_size, hidden_size)
        self.k_linear = nn.Linear(hidden_size, hidden_size)
        self.v_linear = nn.Linear(hidden_size, hidden_size)
        self.output_linear = nn.Linear(hidden_size, hidden_size)

    def forward(self, query, key, value, mask=None):
        """Attend from `query` to `key`/`value`.

        Args:
            query, key, value: tensors of shape (batch, sequence, hidden_size).
            mask: optional tensor broadcastable to the score matrix; positions
                where `mask == 0` are filled with -1e9 before the softmax,
                i.e. they are blocked.

        Returns:
            Tensor of shape (batch, query_sequence, hidden_size).
        """
        q = self._split_heads(self.q_linear(query))
        k = self._split_heads(self.k_linear(key))
        v = self._split_heads(self.v_linear(value))

        # (batch * heads, q_len, k_len); swap the last two dims of k for the dot product
        scores = torch.matmul(q, k.transpose(-2, -1))
        scores = scores / (self.head_size ** 0.5)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        scores = F.softmax(scores, dim=-1)

        attention = torch.matmul(scores, v)
        attention = self._combine_heads(attention) # concatenation of all attention heads
        attention = self.output_linear(attention)

        return attention

    # Dividing the hidden_size dimension into self.num_heads and self.head_size
    def _split_heads(self, x):
        """(batch, seq, hidden) -> (batch * num_heads, seq, head_size)."""
        batch_size, sequence_length, _ = x.size()
        x = x.view(batch_size, sequence_length, self.num_heads, self.head_size)
        x = x.transpose(1, 2)
        # .contiguous() is required before .view() because transpose only changes strides
        return x.contiguous().view(batch_size * self.num_heads, sequence_length, self.head_size)

    # Combining the dimensions of self.num_heads and self.head_size into hidden_size
    def _combine_heads(self, x):
        """(batch * num_heads, seq, head_size) -> (batch, seq, hidden); exact inverse of _split_heads.

        The heads must first be unfolded into their own dimension and moved
        next to head_size; viewing the tensor directly as (batch, seq, hidden)
        would mix features of different positions and heads.
        """
        merged_batch, sequence_length, head_size = x.size()
        batch_size = merged_batch // self.num_heads
        x = x.view(batch_size, self.num_heads, sequence_length, head_size)
        x = x.transpose(1, 2)
        return x.contiguous().view(batch_size, sequence_length, self.num_heads * head_size)
    
class FeedForward(nn.Module):
    """Two linear layers of width `hidden_size` with a ReLU in between."""

    def __init__(self, hidden_size):
        super().__init__()
        self.linear1 = nn.Linear(hidden_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)

    def forward(self, x):
        """Return linear2(relu(linear1(x)))."""
        return self.linear2(F.relu(self.linear1(x)))
    
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to the input, then applies dropout.

    Args:
        hidden_size: width of the encoded vectors (even or odd).
        dropout: dropout probability applied to the sum.
        max_length: number of positions that are precomputed; longer inputs
            are not supported.
    """

    def __init__(self, hidden_size, dropout, max_length=5000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        position = torch.arange(0, max_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_size, 2) * -(math.log(10000.0) / hidden_size))
        pe = torch.zeros(max_length, hidden_size)
        # Even columns carry the sine, odd columns the cosine.
        pe[:, 0::2] = torch.sin(position * div_term)
        # For an odd hidden_size there is one odd column less than even
        # columns, so div_term is trimmed to fit (no-op for an even size).
        pe[:, 1::2] = torch.cos(position * div_term[: hidden_size // 2])
        pe = pe.unsqueeze(0)
        # Registered as a buffer: moves with the module, but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encodings of the first `x.size(1)` positions to `x` and apply dropout."""
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [None]:
# Define model and training parameters

# Architecture: source and target share the same vocabulary
input_size = output_size = VOCAB_SIZE
hidden_size, num_layers, num_heads = 512, 2, 2
dropout = 0.1

# Optimisation
learning_rate = 1e-3
num_epochs = 1000
batch_size = 256

The model stacks two encoder layers and two decoder layers (`num_layers`). Every attention mechanism in the model uses 2 heads (`num_heads`). In addition, dropout with probability 0.1 is applied after the positional encoding and after each attention and feed-forward sub-layer.

In [None]:
# Build the model from the parameters defined above
model = Transformer(
    input_size=input_size,
    output_size=output_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_heads=num_heads,
    dropout=dropout,
)

In [None]:
# Loading model parameters from a file.
# The stored tensors are mapped to the GPU when one is available, otherwise to the CPU.
checkpoint_device = "cuda" if torch.cuda.is_available() else "cpu"
state_dict = torch.load('.../transformer_model.pth', map_location=checkpoint_device)
model.load_state_dict(state_dict)

# Model Training

In [None]:
# Device on which the model will be trained: GPU when available, CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Loss function (padded target positions are ignored) and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=padding_value)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Dataset and data loader; collate_fn pads every batch to a common length
dataset = ChatDataset(encoder_input_seqs, decoder_input_seqs, decoder_target_seqs)
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)

model.to(device)

In [None]:
# Switch the model to training mode (enables dropout)
model.train()

# Training loop: one full pass over the dataloader per epoch
for epoch in range(num_epochs):
    total_loss = 0

    for batch in dataloader:
        # Move the three tensors of the batch to the training device
        encoder_inputs, decoder_inputs, decoder_targets = (
            tensor.to(device) for tensor in batch
        )

        optimizer.zero_grad()

        output = model(encoder_inputs, decoder_inputs)

        # Flatten the scores to (N, vocabulary) and the targets to (N,) for the loss
        flat_scores = output.view(-1, output.size(-1))
        flat_targets = decoder_targets.view(-1)
        loss = criterion(flat_scores, flat_targets)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean batch loss of the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(dataloader)}")

In [None]:
# Saving model parameters (the state dict only, not the whole module)
trained_state = model.state_dict()
torch.save(trained_state, ".../transformer_model.pth")

# Model Inference

In [None]:
# Inference function
def transformer_inference(input_text, max_output_len):
    """Generate a reply to `input_text` by sampling one character at a time.

    The input is normalised with clean_text, i.e. exactly like the training
    data. Merely lower-casing it left punctuation in place that does not exist
    in the vocabulary.

    Args:
        input_text: the user's message.
        max_output_len: generation stops once the decoder sequence
            (including the leading <SOS>) is longer than this value.

    Returns:
        The decoded decoder sequence. It contains the text of the special
        tokens: it starts with '<SOS>' and ends with '<EOS>' when the model
        produced that token before the length limit was hit.
    """
    model.eval()

    # Same preprocessing as for the training questions
    input_text = clean_text(input_text)

    input_tokens = encode(input_text)
    # dtype is given explicitly so that an empty input still yields an index tensor
    input_tensor = torch.tensor(input_tokens, dtype=torch.long).unsqueeze(0).to(device)

    eos_token = char2idx['<EOS>']
    # The decoder starts from a single <SOS> token
    idx = torch.tensor([[char2idx['<SOS>']]], dtype=torch.long).to(device)

    with torch.no_grad():
        while True:
            output = model(input_tensor, idx)

            # Only the scores of the last position are needed for the next token
            logits = output[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            # Sampling one token according to the probability distribution predicted by the model
            idx_next = torch.multinomial(probs, num_samples=1)
            # Concatenate the previous model decoder input with the generated token
            idx = torch.cat((idx, idx_next), dim=1)

            if idx_next.item() == eos_token or idx.size(1) > max_output_len:
                break
    return decode(idx[0])

In [None]:
# INFERENCE EXAMPLE
# The reply is sampled, so the output differs from run to run.
transformer_inference(
    "i am not stupid enough to repeat your mistakes",
    max_output_len=50,
)