In [None]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from data_handler import *

In [None]:
# Creates source and target tensors for the pairs: 
# Shape of source (NumExamples, MaxSentLength), Shape of target (NumExamples, MaxSentLength). 

# Ensures source and targets have the same shape by padding them to the same length with end tokens. 
# Transformer implementation predicts one token for each input token
# During inference sufficiently pad the input with end tokens.

def convert_dataset_to_tensor(data_pairs, max_len):
    """Encode (source, target) string pairs as two stacked index tensors.

    Each source string is converted with ``string_to_index_list``; each target
    is converted the same way and prefixed with ``start_token``. Both index
    lists are then right-padded with ``end_token`` up to ``max_len``.

    Relies on the module-level ``char_to_index``, ``start_token`` and
    ``end_token`` being defined before the call.

    Args:
        data_pairs: iterable of (source, target) pairs.
        max_len: length every row is padded to. Rows already longer than
            this are not truncated, in which case stacking will fail.

    Returns:
        (source_tensors, target_tensors): two LongTensors with one row per pair.
    """
    def _pad_with_end(indices):
        # A non-positive shortfall adds nothing and leaves the list untouched.
        shortfall = max_len - len(indices)
        return indices + [end_token] * shortfall

    encoded_sources = []
    encoded_targets = []
    for source, target in data_pairs:
        src_ids = _pad_with_end(string_to_index_list(source, char_to_index, end_token))
        tgt_ids = _pad_with_end([start_token] + string_to_index_list(target, char_to_index, end_token))

        encoded_sources.append(torch.LongTensor(src_ids))
        encoded_targets.append(torch.LongTensor(tgt_ids))

    return torch.stack(encoded_sources), torch.stack(encoded_targets)

# Load every (source, target) pair together with the vocabulary metadata.
line_pairs, vocab_size, idx_dict = load_data()

# Vocabulary lookups read as globals by convert_dataset_to_tensor.
char_to_index = idx_dict['char_to_index']
start_token = idx_dict['start_token']
end_token = idx_dict['end_token']

# First 80% of the pairs (in file order) train the model, the rest validate it.
num_lines = len(line_pairs)
num_train = int(0.8 * num_lines)
train_pairs = line_pairs[:num_train]
val_pairs = line_pairs[num_train:]

source_strings = [src for src, _ in line_pairs]
target_strings = [tgt for _, tgt in line_pairs]

# Sources get one extra slot and targets two (the target also carries the
# start token); presumably the remaining slot is the end token appended by
# string_to_index_list -- TODO confirm against data_handler.
max_input_len = max(len(text) + 1 for text in source_strings)
max_target_len = max(len(text) + 2 for text in target_strings)
# Sources and targets share a single length so they can be compared position by position.
max_len = max(max_input_len, max_target_len)

train_inputs, train_targets = convert_dataset_to_tensor(train_pairs, max_len)
val_inputs, val_targets = convert_dataset_to_tensor(val_pairs, max_len)

print ("Train Sequences", train_inputs.size(), train_targets.size())
print ("Val Sequences", val_inputs.size(), val_targets.size())

In [None]:
class TransformerModel(nn.Module):
    """Transformer encoder with a linear read-out that scores one token per input position."""

    def __init__(self, ntoken, emb_size, nhead, nhid, nlayers):
        """
        ntoken: Vocabulary size (number of distinct tokens)
        emb_size: Size of the token embeddings fed to the encoder
        nhead: Number of attention heads in each encoder layer
        nhid: Number of hidden units in each encoder layer's feedforward part
        nlayers: Number of stacked encoder layers
        """
        super(TransformerModel, self).__init__()

        # Token embedding followed by the fixed sinusoidal position signal
        self.input_embedding = nn.Embedding(ntoken, emb_size)
        self.pos_encoding = PositionalEncoding(emb_size)

        # Prototype layer plus the stack of nlayers layers built from it
        self.encoder_layers = nn.TransformerEncoderLayer(emb_size, nhead, nhid)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layers, nlayers)

        # "Decoder" is just a linear projection from encoder features to vocabulary scores
        self.decoder = nn.Linear(emb_size, ntoken)

    def forward(self, src):
        """
        src: tensor of shape (seq_len, batch_size) holding token indices

        Returns:
            output: tensor of shape (seq_len, batch_size, vocab_size)
        """
        embedded = self.input_embedding(src)
        encoded = self.transformer_encoder(self.pos_encoding(embedded))
        # Unnormalised scores over the vocabulary for every position
        return self.decoder(encoded)

class PositionalEncoding(nn.Module):
    """
    Adds the fixed sinusoidal positional encoding to the input so the model
    can condition on position.
    From the paper "Attention is all you need"

    d_model: size of the embedding dimension (odd sizes are supported)
    max_len: number of positions for which the encoding is precomputed
    """
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Even columns carry the sine terms, odd columns the cosine terms.
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even column,
        # so trim div_term to match; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Shape (max_len, 1, d_model) so it broadcasts over the batch dimension.
        pe = pe.unsqueeze(0).transpose(0, 1)
        # Registered as a buffer: follows .to(device) but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x: tensor of shape (seq_len, batch_size, embedding_size)
        Returns:
            x: tensor of shape (seq_len, batch_size, embedding_size)
        """
        # Only the first seq_len positions of the precomputed table are needed.
        x = x + self.pe[:x.size(0), :]
        return x

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seed = 0 # RNG seed so that weight initialisation is repeatable across runs
ntokens = vocab_size # the size of vocabulary
batch_size = 16
emsize = 50 # embedding dimension
nhid = 50 # the dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 2 # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 2 # the number of heads in the multiheadattention models
lr = 0.002 # learning rate
epochs = 50 # The number of epochs

# Seed before the model is built so its initial weights are reproducible.
torch.manual_seed(seed)

model = TransformerModel(ntokens, emsize, nhead, nhid, nlayers).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
# step_size is a count of scheduler steps, so it is passed as an int;
# the learning rate is multiplied by gamma after every scheduler.step().
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.95)

In [None]:
def get_batch(inputs, targets, batch_no, batch_size, max_num):
    """Slice out one mini-batch and transpose it for the model.

    Args:
        inputs, targets: 2-D tensors with one example per row.
        batch_no: zero-based index of the batch to fetch.
        batch_size: number of rows per batch.
        max_num: upper bound on the row index; the final batch is cut off here.

    Returns:
        (inputs_batch, targets_batch), each transposed so that rows index
        positions and columns index examples.
    """
    first = batch_no * batch_size
    last = first + batch_size
    if last > max_num:
        # Final (possibly shorter) batch stops at the last available row.
        last = max_num

    rows = slice(first, last)
    return inputs[rows].t(), targets[rows].t()

In [None]:
def train(net):
    """Train ``net`` on the module-level training tensors for ``epochs`` epochs.

    Reads the globals ``train_inputs``, ``train_targets``, ``val_inputs``,
    ``val_targets``, ``batch_size``, ``epochs``, ``criterion``, ``optimizer``,
    ``scheduler``, ``idx_dict`` and ``max_len``. After every epoch it prints
    the mean training loss, the validation loss/accuracy from ``evaluate`` and
    a sample translation, then steps the learning-rate scheduler.

    The model is switched to training mode at the start of every epoch; since
    ``evaluate``/``translate`` run last in each epoch, the model is left in
    evaluation mode when this function returns.

    Args:
        net: the model to optimise; its parameters must be the ones held by
            the global ``optimizer``.
    """
    # Batches are sliced from CPU-resident tensors, so move each one to
    # wherever the model's parameters live (e.g. CUDA).
    net_device = next(net.parameters()).device

    num_examples = train_inputs.size(0)
    num_batches = math.ceil(num_examples / batch_size)

    # Training Loop
    for epoch in range(epochs):
        # evaluate()/translate() leave the model in eval mode, so re-enable
        # training behaviour (dropout etc.) before touching any batch.
        net.train()
        train_loss = 0.0

        for batch_no in range(num_batches):
            # Read input for a batch; both tensors come back as (seq_len, batch)
            inputs, target = get_batch(train_inputs, train_targets, batch_no, batch_size, num_examples)
            inputs = inputs.to(net_device)
            target = target.to(net_device)

            # Process input through the model, get the output, compute loss
            optimizer.zero_grad()
            output = net(inputs)

            # CELoss - Output is (N,C) and Target is (N)
            output_flatten = output.reshape(-1, output.size(-1))
            loss = criterion(output_flatten, target.reshape(-1))

            # Backpropagate loss and update weights
            loss.backward()
            optimizer.step()

            # Sum of training loss over the batches
            train_loss += loss.item()

        # Average training loss over the batches
        train_loss /= num_batches

        val_loss, val_acc = evaluate(net, val_inputs, val_targets)
        sample_translation = translate(net, "testsequence", idx_dict, max_len)

        # Check progress of training
        print ("Epoch: {:3d} | Train loss: {:.3f} | Val loss: {:.3f} | Val Acc:{} ".format(epoch, train_loss, val_loss, val_acc))
        print (sample_translation)

        # LR scheduler after each epoch
        scheduler.step()

def evaluate(net, val_inputs, val_targets):
    """Compute validation loss and sequence-level accuracy.

    Reads the globals ``batch_size`` and ``criterion``. Puts ``net`` in
    evaluation mode and leaves it there.

    Args:
        net: model mapping a (seq_len, batch) index tensor to
            (seq_len, batch, vocab) scores.
        val_inputs, val_targets: 2-D index tensors with one example per row.

    Returns:
        (val_loss, val_accuracy):
            val_loss is the mean of the per-batch losses.
            val_accuracy is the percentage (0-100) of validation sequences
            whose arg-max prediction equals the target at every position,
            padding positions included.
    """
    # Turn on the evaluation mode
    net.eval()

    # Batches are sliced from CPU-resident tensors; move them to the model's device.
    net_device = next(net.parameters()).device

    num_examples = val_inputs.size(0)
    num_batches = math.ceil(num_examples / batch_size)

    val_loss = 0.0
    num_correct = 0

    with torch.no_grad():
        for batch_no in range(num_batches):
            # Read input for a batch; both tensors come back as (seq_len, batch)
            inputs, target = get_batch(val_inputs, val_targets, batch_no, batch_size, num_examples)
            inputs = inputs.to(net_device)
            target = target.to(net_device)

            output = net(inputs)
            output_flatten = output.reshape(-1, output.size(-1))
            loss = criterion(output_flatten, target.reshape(-1))
            val_loss += loss.item()

            # A sequence (one column) is correct only if every position matches.
            predictions = output.argmax(dim=-1)
            num_correct += (predictions == target).all(dim=0).sum().item()

    # Average validation loss over the batches
    val_loss /= num_batches

    val_accuracy = 100.0 * num_correct / num_examples
    return val_loss, val_accuracy

def translate(net, input_sequence, idx_dict, max_len):
    """Translates a given string from English to Pig-Latin.

    Puts ``net`` in evaluation mode and leaves it there.

    Args:
        net: model mapping a (seq_len, batch) index tensor to
            (seq_len, batch, vocab) scores.
        input_sequence: the string to translate.
        idx_dict: dictionary providing 'char_to_index', 'index_to_char'
            and 'end_token'.
        max_len: length the encoded input is padded to with end tokens;
            at most this many output positions are decoded.

    Returns:
        The predicted characters, concatenated, up to (not including) the
        first predicted end token.
    """
    net.eval()

    char_to_index = idx_dict['char_to_index']
    index_to_char = idx_dict['index_to_char']
    end_token = idx_dict['end_token']

    # Encode the input and pad it with end tokens up to max_len
    index_list = string_to_index_list(input_sequence, char_to_index, end_token)
    index_list = index_list + [end_token] * (max_len - len(index_list))

    # The model expects (seq_len, batch_size), so a single sequence is a
    # column of shape (seq_len, 1). Feeding it as a row would make the model
    # treat every character as its own length-1 sequence at position 0.
    net_device = next(net.parameters()).device
    indexes = torch.LongTensor(index_list).reshape(-1, 1).to(net_device)

    # Process it through the model; no gradients are needed for inference
    with torch.no_grad():
        output = net(indexes).squeeze(1)

    # Arg-max over the vocabulary gives one predicted token per position
    predicted = output.argmax(dim=1).tolist()

    # Predict translation: stop at the first end token
    gen_string = ''
    for token in predicted[:max_len]:
        if token == end_token:
            break
        gen_string += index_to_char[token]

    return gen_string

In [None]:
# Run the full training loop on the model built above; each epoch prints the
# train/validation metrics and a sample translation.
train(model)