# PyTorch Tutorial: Sequence-to-Sequence Attention for Date Formats

Recently I set out to learn PyTorch and explore the power of dynamic computational graphs. Being Python-friendly, more readable, and easier to debug appealed to me, since those qualities serve well in the early prototyping and research phases. Part of my learning cycle is to implement problems from different domains, as that greatly speeds up learning.

For this one I was heavily inspired by an excellent tutorial that Zafarali Ahmed created in Keras for a neural network model that translates "human" dates (for example, "Nov 5, 2016") to a standardized machine-format date (for example, "2016-11-05"). Since my tutorial below jumps right into code, I would highly recommend reading the Medium post first, as it provides a lot of great background on sequence-to-sequence models.

[How to Visualize Your Recurrent Neural Network with Attention in Keras](https://medium.com/datalogue/attention-in-keras-1892773a4f22)

Of course, since the Medium post outlines the implementation in Keras, those parts, while useful for seeing the Keras details, will differ from the PyTorch implementation below.

Also, I love being able to step through code interactively, as it really helps me learn what is going on. I've therefore taken some of the data processing code from the Keras tutorial and included it inline in this tutorial so anyone can simply step through the cells. This does assume that the required packages have already been installed in your Python environment.

Finally, as with any learning process, I could not have gotten here without some excellent tutorials and open source code. Special thanks to:

[Translation with a Sequence to Sequence Network and Attention](http://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html)

[pytorch seq2seq](https://github.com/rowanz/pytorch-seq2seq)

Further, if you are new to PyTorch, I would definitely recommend going through the PyTorch tutorials first, as they provide great building blocks to get here.

In [None]:
%matplotlib inline

import matplotlib
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

import torch.nn as nn
import torch
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
from torch.nn.utils.rnn import pad_packed_sequence, PackedSequence
    
import random

### Dataset Creation
Here we will create the training and validation data. The creation process writes the generated examples to CSV files. Subsequently we will load this data using PyTorch's data processing features.

In [None]:
# from: https://github.com/datalogue/keras-attention/tree/master/data

import random
import json
import os

from faker import Faker
import babel
from babel.dates import format_date

fake = Faker()
Faker.seed(230517)  # seed at class level; older Faker releases used fake.seed(...)
random.seed(230517)

# This is the complete set of formats from the original implementation.
# For simplicity, I've selected only two of them below. However, feel
# free to add others to see how the training changes with more formats.
FORMATS_MANY = ['short',
           'medium',
           'long',
           'full',
           'd MMM YYY',
           'd MMMM YYY',
           'dd MMM YYY',
           'd MMM, YYY',
           'd MMMM, YYY',
           'dd, MMM YYY',
           'd MM YY',
           'd MMMM YYY',
           'MMMM d YYY',
           'MMMM d, YYY',
           'dd.MM.YY',
           ]

FORMATS = ['long', "medium"]

# change this if you want it to work with more than US English
# again, for faster learning time in this tutorial, it is currently set to US English
# LOCALES = babel.localedata.locale_identifiers()
LOCALES = ['en_US'] 

In [None]:
# from https://github.com/datalogue/keras-attention/tree/master/data

def create_date(change_case=False):
    """
        Creates some fake dates 
        :returns: tuple containing 
                  1. human formatted string
                  2. machine formatted string
                  3. date object.
    """
    dt = fake.date_object()

    # wrapping this in a try catch because
    # the locale 'vo' and format 'full' will fail
    try:
        human = format_date(dt,
                            format=random.choice(FORMATS),
                            locale=random.choice(LOCALES))
        
        if change_case:
            case_change = random.randint(0,3) # 1/2 chance of case change
            if case_change == 1:
                human = human.upper()
            elif case_change == 2:
                human = human.lower()

        machine = dt.isoformat()
    except AttributeError as e:
        # print(e)
        return None, None, None

    return human, machine, dt
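
To make the target of the task concrete, here is a small sketch of a single human/machine pair. It uses only the standard library rather than Babel, so the helper `human_medium` and the `MONTH_ABBR` table are illustrative assumptions that approximate the `medium` format for `en_US`. They are not part of the original data generator.

```python
from datetime import date

# English month abbreviations, hard-coded so the result does not depend on the system locale
MONTH_ABBR = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def human_medium(d):
    """Hypothetical helper: format a date roughly like 'Nov 5, 2016'."""
    return f"{MONTH_ABBR[d.month - 1]} {d.day}, {d.year}"


d = date(2016, 11, 5)
pair = (human_medium(d), d.isoformat())
print(pair)  # ('Nov 5, 2016', '2016-11-05')
```

The model's job is to learn the mapping from the first string to the second, character by character.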

In [None]:
# print a sample date
human, machine, dt = create_date()
print([human, machine, dt])

In [None]:
# from https://github.com/datalogue/keras-attention/tree/master/data

def create_dataset(dataset_name, n_examples, vocabulary=False):
    """
        Creates a csv dataset with n_examples and optional vocabulary
        :param dataset_name: name of the file to save as
        :param n_examples: the number of examples to generate
        :param vocabulary: if true, will also save the vocabulary
    """
    human_vocab = set()
    machine_vocab = set()

    with open(dataset_name, 'w') as f:
        for i in range(n_examples):
            h, m, _ = create_date()
            if h is not None:
                f.write('"'+h + '","' + m + '"\n')
                human_vocab.update(tuple(h))
                machine_vocab.update(tuple(m))

    if vocabulary:
        int2human = dict()
        int2human[0] =  '<unk>'
        for i,j in enumerate(human_vocab):
            int2human[i+1] = j
        int2human.update({len(int2human): '<eot>',
                          len(int2human)+1: '<bot>'})
        int2machine = dict()
        int2machine[0] =  '<unk>'
        for i,j in enumerate(machine_vocab):
            int2machine[i+1] = j
        int2machine.update({len(int2machine):'<eot>',
                            len(int2machine)+1: '<bot>'})

        human2int = {v: k for k, v in int2human.items()}
        machine2int = {v: k for k, v in int2machine.items()}

        with open('human_vocab.json', 'w') as f:
            json.dump(human2int, f)
        with open('machine_vocab.json', 'w') as f:
            json.dump(machine2int, f)
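
The vocabulary step in `create_dataset` can be sketched on its own. The function `build_vocab` below is a hypothetical stand-alone version: it sorts the characters so the ids are reproducible, whereas the cell above enumerates a `set` and therefore depends on set iteration order.

```python
def build_vocab(strings):
    """Map each character to an integer id, reserving 0 for '<unk>'."""
    chars = sorted(set("".join(strings)))  # sorted only to make the ids deterministic
    vocab = {"<unk>": 0}
    for i, ch in enumerate(chars):
        vocab[ch] = i + 1
    # the end-of-text and beginning-of-text markers take the last two ids
    vocab["<eot>"] = len(vocab)
    vocab["<bot>"] = len(vocab)
    return vocab


machine_vocab = build_vocab(["2016-11-05", "1999-01-31"])
print(machine_vocab)
```

With these two example strings there are eight distinct characters, so `<eot>` gets id 9 and `<bot>` gets id 10.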

In [None]:
print('creating dataset')
NUM_TRAINING_SAMPLES = 100000
NUM_VALIDATION_SAMPLES = 1000
create_dataset('training.csv', NUM_TRAINING_SAMPLES, vocabulary=True)
create_dataset('validation.csv', NUM_VALIDATION_SAMPLES)
print('dataset created.')

### Data Loading
Now we will create a custom dataset class to load the data from the CSV file

In [None]:
# from https://github.com/datalogue/keras-attention/tree/master/data

import csv
from torch.utils.data import Dataset, DataLoader

class Vocabulary(object):

    def __init__(self, vocabulary_file, padding=None):
        """
            Creates a vocabulary from a file
            :param vocabulary_file: the path to the vocabulary
        """
        self.vocabulary_file = vocabulary_file
        with open(vocabulary_file, 'r') as f:
            self.vocabulary = json.load(f)

        self.padding = padding
        self.reverse_vocabulary = {v: k for k, v in self.vocabulary.items()}

    def size(self):
        """
            Gets the size of the vocabulary
        """
        return len(self.vocabulary.keys())

    def string_to_int(self, text):
        """
            Converts a string into its character integer
            representation
            :param text: text to convert
        """
        characters = list(text)

        integers = []

        if self.padding and len(characters) >= self.padding:
            # truncate if too long
            characters = characters[:self.padding - 1]

        characters.append('<eot>')

        for c in characters:
            if c in self.vocabulary:
                integers.append(self.vocabulary[c])
            else:
                integers.append(self.vocabulary['<unk>'])


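        # pad with the '<unk>' id, which doubles as the padding token here
        if self.padding and len(integers) < self.padding:
            integers.extend([self.vocabulary['<unk>']]
                            * (self.padding - len(integers)))

        # only enforce the fixed length when padding is configured
        if self.padding and len(integers) != self.padding:
            print(text)
            raise AttributeError('Encoded length does not match padding.')
        return integers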

    def int_to_string(self, integers):
        """
            Decodes a list of integers
            into its string representation
        """
        characters = []
        for i in integers:
            characters.append(self.reverse_vocabulary[i])

        return characters
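
The truncate, append `<eot>`, then pad logic of `string_to_int` is easier to follow on a toy vocabulary. The function `encode` and the dictionary `toy_vocab` below are illustrative assumptions, written as a rough stand-alone equivalent of the method above.

```python
def encode(text, vocab, padding):
    """Encode text as exactly `padding` integer ids."""
    chars = list(text)[:padding - 1]  # leave room for the end-of-text marker
    chars.append("<eot>")
    # unknown characters fall back to the '<unk>' id
    ids = [vocab.get(c, vocab["<unk>"]) for c in chars]
    # the '<unk>' id is reused as the padding value
    ids += [vocab["<unk>"]] * (padding - len(ids))
    return ids


toy_vocab = {"<unk>": 0, "a": 1, "b": 2, "<eot>": 3}
short = encode("abz", toy_vocab, padding=6)
long = encode("abababab", toy_vocab, padding=4)
print(short)  # [1, 2, 0, 3, 0, 0]
print(long)   # [1, 2, 1, 3]
```

Note that padding and unknown characters share id 0, so the model cannot tell them apart.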

In [None]:
# The data load/transform functions are wrapped in a PyTorch Dataset.
# One big benefit of this is getting a random batch generator.
# This tutorial currently samples one example at a time, so changing the
# implementation to handle mini-batches would be a great next step.
class DateFormatDataset(Dataset):
    """Date Format dataset."""

    def __init__(self, csv_file, input_vocabulary, output_vocabulary):
        """
        Args:
            csv_file (string): Path to the csv file with human & machine formats.
        """
        self.inputs = []
        self.targets = []
        
        self.input_vocab = input_vocabulary
        self.output_vocab = output_vocabulary
        
        self._read_data(csv_file)
        
        self._transform_data()

    def _read_data(self, file_name):
        with open(file_name, 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                self.inputs.append(row[0])
                self.targets.append(row[1])
                
    def _transform_data(self):
        self.inputs = np.array(list(
            map(self.input_vocab.string_to_int, self.inputs)))
        self.targets = np.array(list(map(self.output_vocab.string_to_int, self.targets)))

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        sample = {'input': self.inputs[idx], 'target': self.targets[idx]}

        return sample
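
As a rough idea of what the mini-batch next step could look like, here is a pure-Python sketch of a shuffled batch generator. The name `iterate_minibatches` and the toy data are assumptions for illustration. In practice, wrapping `DateFormatDataset` in `torch.utils.data.DataLoader` with `batch_size` and `shuffle=True` would likely serve the same purpose.

```python
import random


def iterate_minibatches(inputs, targets, batch_size, rng):
    """Yield (inputs, targets) batches in a shuffled order."""
    order = list(range(len(inputs)))
    rng.shuffle(order)
    for start in range(0, len(order), batch_size):
        idx = order[start:start + batch_size]
        # the same indices are used for both lists, so pairs stay aligned
        yield [inputs[i] for i in idx], [targets[i] for i in idx]


toy_inputs = [[i, i + 1] for i in range(5)]
toy_targets = [[i * 10] for i in range(5)]
batches = list(iterate_minibatches(toy_inputs, toy_targets,
                                   batch_size=2, rng=random.Random(0)))
print(len(batches))  # 3 batches: two of size 2 and one of size 1
```

The model code below would also need to change to accept a batch dimension larger than one.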

In [None]:
MAX_LENGTH = 20

# pad/truncate both vocabularies to the same fixed sequence length
input_vocab = Vocabulary('human_vocab.json', padding=MAX_LENGTH)
output_vocab = Vocabulary('machine_vocab.json', padding=MAX_LENGTH)

training = DateFormatDataset('training.csv', input_vocab, output_vocab)
validation = DateFormatDataset('validation.csv', input_vocab, output_vocab)

In [None]:
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size, vocab_size=None):
        super(EncoderRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size, bidirectional=True)

        self.vocab_size = vocab_size
        self.embed = nn.Embedding(self.vocab_size, self.input_size)

    def forward(self, x):
        x_embed = self.embed(x)
        output, h_n = self.gru(x_embed.view(1, 1, -1))

        output_t = output.transpose(0, 1).contiguous()
        return output_t, h_n

In [None]:
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, n_layers=1, dropout_p=0.1, max_length=MAX_LENGTH):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 3, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_output, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)

        # attention weights over the max_length encoder positions
        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        for i in range(self.n_layers):
            output = F.relu(output)
            output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        result = Variable(torch.zeros(1, 1, self.hidden_size))
        return result
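
The core of the attention step in `AttnDecoderRNN.forward` is a softmax over scores followed by a weighted sum of the encoder outputs. The sketch below shows that arithmetic in plain Python on made-up numbers. The names `softmax` and `attend` are illustrative; in the decoder the scores come from `self.attn`, and the weighted sum is done by `torch.bmm`.

```python
import math


def softmax(scores):
    """Turn raw scores into weights that sum to 1."""
    m = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def attend(scores, encoder_outputs):
    """Return the attention weights and the weighted sum of encoder outputs."""
    weights = softmax(scores)
    dim = len(encoder_outputs[0])
    context = [sum(w * vec[j] for w, vec in zip(weights, encoder_outputs))
               for j in range(dim)]
    return weights, context


scores = [0.0, 0.0, math.log(2.0)]                      # one score per input position
encoder_outputs = [[1.0, 0.0], [0.0, 1.0], [2.0, 2.0]]  # one vector per input position
weights, context = attend(scores, encoder_outputs)
print(weights)  # approximately [0.25, 0.25, 0.5]
print(context)  # approximately [1.25, 1.25]
```

The position with the highest score contributes most to the context vector that the decoder combines with its embedded input.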

In [None]:
teacher_forcing_ratio = 0.5
SOS_token = output_vocab.vocabulary["<bot>"]
EOS_token = output_vocab.vocabulary["<eot>"]

def train(input_variable, target_variable, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_variable.size()[0]
    target_length = target_variable.size()[0]

    encoder_outputs = Variable(torch.zeros(max_length, encoder.hidden_size*2))

    loss = 0

    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_variable[ei])
        encoder_outputs[ei] = encoder_output[0][0]

    decoder_input = Variable(torch.LongTensor([[SOS_token]]))

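    # the encoder GRU is bidirectional, so its hidden state has shape (2, 1, hidden_size);
    # the decoder GRU is unidirectional, so seed it with the forward-direction state only
    decoder_hidden = encoder_hidden[:1]

    use_teacher_forcing = random.random() < teacher_forcing_ratio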

    if use_teacher_forcing:
        # Teacher forcing: Feed the target as the next input
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_output, encoder_outputs)
            # the criterion expects a target of shape (1,) for an output of shape (1, vocab)
            loss += criterion(decoder_output, target_variable[di].view(1))
            decoder_input = target_variable[di]  # Teacher forcing

    else:
        # Without teacher forcing: use its own predictions as the next input
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_output, encoder_outputs)
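            topv, topi = decoder_output.data.topk(1)
            ni = topi[0][0].item()  # predicted character id as a plain int

            decoder_input = Variable(torch.LongTensor([[ni]]))

            # the criterion expects a target of shape (1,) for an output of shape (1, vocab)
            loss += criterion(decoder_output, target_variable[di].view(1))
            if ni == EOS_token:
                break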

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length
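
The only difference between the two branches in `train` is which token is fed to the decoder at the next step. The sketch below isolates that choice. The helper `decoder_inputs` is hypothetical, and the predictions are a fixed made-up list, whereas in real training they depend on the inputs.

```python
def decoder_inputs(targets, predictions, sos_token, use_teacher_forcing):
    """Return the tokens fed to the decoder, one per decoding step."""
    inputs = [sos_token]  # the first input is always the start token
    for tgt, pred in zip(targets[:-1], predictions[:-1]):
        # teacher forcing feeds the ground truth; otherwise feed the model's own guess
        inputs.append(tgt if use_teacher_forcing else pred)
    return inputs


targets = [2, 0, 1, 6]
predictions = [2, 9, 1, 6]  # the made-up model gets the second character wrong
forced = decoder_inputs(targets, predictions, sos_token=99, use_teacher_forcing=True)
free = decoder_inputs(targets, predictions, sos_token=99, use_teacher_forcing=False)
print(forced)  # [99, 2, 0, 1]
print(free)    # [99, 2, 9, 1]
```

With teacher forcing the mistake at step two is not fed back in; without it the decoder has to continue from its own wrong character.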

In [None]:
import time
import math


def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)


def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))
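
The estimate in `timeSince` is a simple proportion: if a given fraction of the work took the elapsed time, the total is elapsed time divided by that fraction. A minimal sketch of that arithmetic, using the hypothetical name `remaining_seconds`:

```python
def remaining_seconds(elapsed, fraction_done):
    """Estimate the time left, assuming progress continues at the same rate."""
    estimated_total = elapsed / fraction_done
    return estimated_total - elapsed


eta = remaining_seconds(30.0, 0.25)
print(eta)  # 90.0
```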

In [None]:
def showPlot(points):
    # create one figure with a single set of axes
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    ax.plot(points)

In [None]:
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.001):
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every
    plot_accuracy = []

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
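    # The decoder already returns log-probabilities. CrossEntropyLoss applies
    # log_softmax again, which leaves log-probabilities unchanged, so this is
    # equivalent to nn.NLLLoss() here.
    criterion = nn.CrossEntropyLoss()
    #criterion = nn.NLLLoss()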
    for iter in range(1, n_iters+1):
        choice = random.randint(0, NUM_TRAINING_SAMPLES-1)
        input_variable = torch.LongTensor(training.inputs[choice])
        input_variable = Variable(input_variable)
        target_variable = torch.LongTensor(training.targets[choice])
        target_variable = Variable(target_variable)

        loss = train(input_variable, target_variable, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if iter % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, iter / n_iters),
                                         iter, iter / n_iters * 100, print_loss_avg))

        if iter % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0
            # set print_results=True if you would like to see how the predictions are
            # comparing to the targets during the training cycle
            # I found this part very illuminating as you can see the neural models
            # start to learn parts of the pattern over time
            accuracy = evaluateRandomly(encoder, decoder, 10, print_results=False)
            plot_accuracy.append(accuracy)
            print("Loss: %0.2f, Accuracy: %0.2f" % (loss, accuracy))

    showPlot(plot_losses)
    showPlot(plot_accuracy)

In [None]:
def evaluate(encoder, decoder, input_variable, max_length=MAX_LENGTH):
    input_length = input_variable.size()[0]

    encoder_outputs = Variable(torch.zeros(max_length, encoder.hidden_size*2))

    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_variable[ei])
        encoder_outputs[ei] = encoder_outputs[ei] + encoder_output[0][0]

    decoder_input = Variable(torch.LongTensor([[SOS_token]]))  # SOS

    # seed the unidirectional decoder with the forward-direction encoder state only
    decoder_hidden = encoder_hidden[:1]

    decoded_words = []
    decoder_attentions = torch.zeros(max_length, max_length)

    for di in range(max_length):
        decoder_output, decoder_hidden, decoder_attention = decoder(
            decoder_input, decoder_hidden, encoder_output, encoder_outputs)
        decoder_attentions[di] = decoder_attention.data
        topv, topi = decoder_output.data.topk(1)
        ni = topi[0][0].item()  # predicted character id as a plain int
        if ni == EOS_token:
            decoded_words.append('<eot>')
            break
        else:
            decoded_words.append(output_vocab.reverse_vocabulary[ni])

        decoder_input = Variable(torch.LongTensor([[ni]]))

    return decoded_words, decoder_attentions[:di + 1]
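
The loop in `evaluate` is greedy decoding: at each step it takes the highest-scoring character and feeds it back in until `<eot>` or `max_length` is reached. Here is a stripped-down sketch of that loop. Both `greedy_decode` and `toy_step` are illustrative assumptions, with `toy_step` standing in for the real decoder.

```python
def greedy_decode(step_fn, sos_token, eos_token, max_length):
    """Repeatedly pick the best-scoring token until EOS or max_length."""
    token, state, decoded = sos_token, None, []
    for _ in range(max_length):
        scores, state = step_fn(token, state)
        token = max(range(len(scores)), key=scores.__getitem__)  # argmax
        decoded.append(token)
        if token == eos_token:
            break
    return decoded


def toy_step(token, state):
    """Hypothetical decoder stand-in: emits tokens 1, 2, then EOS (0)."""
    step = 0 if state is None else state
    plan = [1, 2, 0]
    scores = [0.0, 0.0, 0.0]
    scores[plan[min(step, 2)]] = 1.0
    return scores, step + 1


result = greedy_decode(toy_step, sos_token=3, eos_token=0, max_length=10)
print(result)  # [1, 2, 0]
```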

In [None]:
def evaluateRandomly(encoder, decoder, n, print_results=False):
    total_score = 0
    for i in range(n):
        choice = random.randint(0, NUM_VALIDATION_SAMPLES-1)
        input_variable = torch.LongTensor(validation.inputs[choice])
        input_variable = Variable(input_variable)
        target_variable = torch.LongTensor(validation.targets[choice])
        target_variable = Variable(target_variable)
        
        # convert tensors to plain Python ints so they can be used as dictionary keys
        target_sentence = output_vocab.int_to_string(target_variable.data.tolist())
        if print_results:
            print('>', input_vocab.int_to_string(input_variable.data.tolist()))
            print('=', target_sentence)
        output_words, attentions = evaluate(encoder, decoder, input_variable)
        output_sentence = ' '.join(output_words)
        if print_results:
            print('<', output_sentence)
            
        # determine the accuracy score
        # here it measures the fraction of output characters that match the target
        score = 0
        for di, letter in enumerate(output_words):
            if letter == target_sentence[di]:
                score += 1
        
        total_score += (score / len(output_words))
        
        if print_results:
            print('')
        
    return (total_score / n)
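
The accuracy score used above is a per-character match rate, normalized by the length of the model's output. A minimal stand-alone sketch, with the hypothetical name `char_accuracy`:

```python
def char_accuracy(output_chars, target_chars):
    """Fraction of output characters that equal the target at the same position."""
    matches = sum(1 for o, t in zip(output_chars, target_chars) if o == t)
    return matches / len(output_chars)


score = char_accuracy(list("2016-11-05"), list("2016-11-06"))
print(score)  # 0.9
```

Because this is a character-level measure, an output that is wrong in a single digit still scores highly; an exact-match rate over whole dates would be a stricter alternative.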

In [None]:
HIDDEN_SIZE = 256
NUM_ITERATIONS = 20000

encoder = EncoderRNN(
    MAX_LENGTH,
    HIDDEN_SIZE,
    vocab_size=input_vocab.size()
)

attn_decoder = AttnDecoderRNN(HIDDEN_SIZE, output_vocab.size(),
                               1, dropout_p=0.1)

trainIters(encoder, attn_decoder, NUM_ITERATIONS)