In [1]:
# Let's start with some packages we need
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from scipy.cluster.hierarchy import dendrogram, linkage

In [2]:
def sentenceToTensor(tokens_list):
    """Map a list of tokens to a 1-D tensor holding one vocabulary index per token.

    tokens_list : [list of str] tokens, each of which must be a key of the
                  notebook-level `token_to_index` dictionary
    returns     : tensor of length len(tokens_list)
    """
    assert isinstance(tokens_list, list)
    indices = []
    for token in tokens_list:
        indices.append(token_to_index[token])
    return torch.tensor(indices)

def pad_sentence(tokens_list,max_len):
    """Return a copy of `tokens_list` right-padded with '<PAD>' up to `max_len` tokens.

    tokens_list : [list of str] tokens of one sentence
    max_len     : [integer] target length
    returns     : new list; `tokens_list` itself is left untouched

    If the sentence already has `max_len` tokens or more, nothing is appended
    (a non-positive repeat count yields an empty padding list).
    """
    assert isinstance(tokens_list, list)
    npad = max_len - len(tokens_list)
    # Build a new list rather than using `+=`, which would extend the
    # caller's list in place as a hidden side effect.
    return tokens_list + npad * ['<PAD>']

# Load the training sentences (one sentence per line of the text file)
with open('data/elman_sentences.txt','r') as fid:
    lines = fid.readlines()

# Wrap every sentence in start/end markers, tokenize on whitespace,
# and pad all sentences to the length of the longest one
sentences_str = [l.strip() for l in lines]
sentences_str = ['<SOS> ' + s + ' <EOS>' for s in sentences_str]
sentences_tokens = [s.split() for s in sentences_str]
max_len = max(len(s) for s in sentences_tokens)
sentences_tokens = [pad_sentence(s, max_len) for s in sentences_tokens]

# Vocabulary: gather the unique tokens in a single pass.
# (sum(list_of_lists, []) re-copies the growing list for every sentence,
#  which is quadratic in the number of sentences.)
all_tokens = sorted({token for s in sentences_tokens for token in s})
n_tokens = len(all_tokens) # total number of possible symbols
token_to_index = {t : i for i,t in enumerate(all_tokens)}
index_to_token = {i : t for i,t in enumerate(all_tokens)}

# Index-encode every sentence and stack them into one tensor
training_pats = [sentenceToTensor(s) for s in sentences_tokens]
training_pats_tensor = torch.stack(training_pats,dim=0) # ntrain x max_len tensor for all training pats
ntrain = len(training_pats)

print('all tokens: %s \n' % all_tokens)
print('mapping tokens to indices: %s \n' % token_to_index)
# NOTE: the two prints below show the pattern at index 1 (the second sentence),
# and the last one prints the tensor itself in place of the '%s'.
print('first training pattern: %s \n' % sentences_str[1])
print('tensor representation of %s: \n' % training_pats[1])

all tokens: ['<EOS>', '<PAD>', '<SOS>', 'book', 'boy', 'bread', 'break', 'car', 'cat', 'chase', 'cookie', 'dog', 'dragon', 'eat', 'exist', 'girl', 'glass', 'like', 'lion', 'man', 'monster', 'mouse', 'move', 'plate', 'rock', 'sandwich', 'see', 'sleep', 'smash', 'smell', 'think', 'woman'] 

mapping tokens to indices: {'<EOS>': 0, '<PAD>': 1, '<SOS>': 2, 'book': 3, 'boy': 4, 'bread': 5, 'break': 6, 'car': 7, 'cat': 8, 'chase': 9, 'cookie': 10, 'dog': 11, 'dragon': 12, 'eat': 13, 'exist': 14, 'girl': 15, 'glass': 16, 'like': 17, 'lion': 18, 'man': 19, 'monster': 20, 'mouse': 21, 'move': 22, 'plate': 23, 'rock': 24, 'sandwich': 25, 'see': 26, 'sleep': 27, 'smash': 28, 'smell': 29, 'think': 30, 'woman': 31} 

first training pattern: <SOS> lion eat man <EOS> 

tensor representation of tensor([ 2, 18, 13, 19,  0]): 



In [3]:
def plot_dendo(X, names, exclude=('<SOS>','<EOS>','<PAD>')):
    """Plot a single-linkage dendrogram of the rows of X, labelled by name.

    X       : numpy array [nitem x dim], one row per item
    names   : [nitem list] list of item names; names[i] labels row X[i]
    exclude : names we want to leave out of the plot (entries that do not
              occur in `names` are ignored)
    """
    names = np.array(names)
    # Build the inclusion mask from `names` itself instead of the global
    # token_to_index, so rows are matched correctly even when `names` is not
    # the full vocabulary in index order.
    inc = ~np.isin(names, list(exclude))
#     linked = linkage(X[inc],'single',metric='cosine', optimal_ordering=True)
    linked = linkage(X[inc],'single', optimal_ordering=True)
    plt.figure(1, figsize=(20,6))
    dendrogram(linked, labels=names[inc], color_threshold=0, leaf_font_size=8)
    plt.show()

In [4]:
class SRN(nn.Module):
    """Simple recurrent network that consumes one token index per call.

    Each step embeds the current token, concatenates it with the previous
    hidden state, and produces a new hidden state plus log-probabilities
    over the vocabulary.
    """

    def __init__(self, vocab_size, hidden_size):
        #  vocab_size: number of possible input/output symbols
        #  hidden_size: size of both the token embedding and the hidden state
        super().__init__()
        self.hidden_size = hidden_size
        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.i2h = nn.Linear(2 * hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, vocab_size)
        self.softmax = nn.LogSoftmax(dim=0)

    def forward(self, input_token_index, hidden):
        # input_token_index: [integer] index of current token
        # hidden: [tensor of length hidden_size] previous hidden state
        # returns: (log-probabilities of length vocab_size, new hidden state)
        token_vec = self.embed(input_token_index)  # hidden_size
        joined = torch.cat([token_vec, hidden], dim=0)  # 2*hidden_size
        hidden = torch.sigmoid(self.i2h(joined))  # hidden_size
        log_probs = self.softmax(self.h2o(hidden))  # vocab_size
        return log_probs, hidden

    def initHidden(self):
        # all-zero hidden state used before the first token
        return torch.zeros(self.hidden_size)

In [5]:
class bSRN(nn.Module):
    """Batched recurrent network: embedding -> nn.RNN -> linear readout.

    Processes whole batches of index-encoded sequences at once and returns
    unnormalized scores over the vocabulary for every position.
    """

    def __init__(self, vocab_size, hidden_size):
        # vocab_size: number of possible input/output symbols
        # hidden_size: size of both the token embedding and the recurrent state
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.rnn = nn.RNN(
            input_size=hidden_size,
            hidden_size=hidden_size,
            batch_first=True,
        )
        self.h2o = nn.Linear(hidden_size, vocab_size)

    def forward(self, z):
        # z: [bsize x seqlen] input tokens as indices
        embedded = self.embed(z)  # bsize x seqlen x hidden_size
        # keep the per-step hidden states; the final-state output is not needed
        states = self.rnn(embedded)[0]  # bsize x seqlen x hidden_size
        return self.h2o(states)  # bsize x seqlen x vocab_size
    
def btrain(batch,rnn):
    """Run one optimization step on a batch of index-encoded sequences.

    batch : [bsize x max_len] tensor of token indices
    rnn   : network mapping [bsize x seqlen] indices to
            [bsize x seqlen x vocab_size] scores
    Relies on the notebook-level `loss_fn` and `optimizer`.
    Returns the batch loss as a Python float.
    """
    rnn.train()
    rnn.zero_grad()
    # next-token prediction: the token at position t is scored against token t+1
    context, next_tokens = batch[:, :-1], batch[:, 1:]  # each bsize x max_len-1
    scores = rnn(context)  # bsize x max_len-1 x vocab_size
    # move the vocabulary axis to dim 1 before handing the scores to loss_fn
    batch_loss = loss_fn(scores.transpose(1, 2), next_tokens)
    batch_loss.backward()
    optimizer.step()
    return batch_loss.item()

# Hyperparameters for the batched network
nepochs = 100 # number of passes through the entire training set
nhidden = 10 # number of hidden units

# Model, objective and optimizer (btrain reads `loss_fn` and `optimizer` as globals)
rnn = bSRN(n_tokens, nhidden) # create the network
# targets equal to <PAD> do not contribute to the loss
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=token_to_index['<PAD>'])
# loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(rnn.parameters(), weight_decay=0.04)

# Mini-batches of 8 sentences, reshuffled every epoch
D = torch.utils.data.TensorDataset(training_pats_tensor)
dataloader = torch.utils.data.DataLoader(D, batch_size=8, shuffle=True)

for myiter in range(1, nepochs + 1):
    loss_total = 0.
    b_total = 0
    # the dataset wraps a single tensor, so every item from the loader is a 1-element sequence
    for (batch,) in dataloader:
        loss = btrain(batch, rnn)
        loss_total += loss
        b_total += 1
    if myiter % 1 == 0:
        # report the mean per-batch loss for this epoch
        print("epoch  %s: train loss %2.2f" % (myiter, loss_total/b_total))

epoch  1: train loss 2.03
epoch  2: train loss 1.67
epoch  3: train loss 1.63
epoch  4: train loss 1.61
epoch  5: train loss 1.59
epoch  6: train loss 1.58
epoch  7: train loss 1.58
epoch  8: train loss 1.57
epoch  9: train loss 1.57
epoch  10: train loss 1.57
epoch  11: train loss 1.57
epoch  12: train loss 1.56
epoch  13: train loss 1.56
epoch  14: train loss 1.56
epoch  15: train loss 1.56
epoch  16: train loss 1.56
epoch  17: train loss 1.56
epoch  18: train loss 1.56
epoch  19: train loss 1.56
epoch  20: train loss 1.56
epoch  21: train loss 1.56
epoch  22: train loss 1.56
epoch  23: train loss 1.56


KeyboardInterrupt: 

In [None]:
def train(seq_tensor, rnn):
    """Run one optimization step on a single sequence, token by token.

    seq_tensor : [seq_length tensor] token indices of one sentence
    rnn        : instance of SRN class
    Relies on the notebook-level `criterion` and `optimizer`.
    Returns the summed loss divided by the number of predictions (seq_length-1).
    """
    hidden = rnn.initHidden()
    rnn.train()
    rnn.zero_grad()
    n_predictions = seq_tensor.shape[0] - 1
    loss = 0
    # pair every token with its successor and accumulate the prediction loss
    for current, following in zip(seq_tensor[:-1], seq_tensor[1:]):
        output, hidden = rnn(current, hidden)
        loss = loss + criterion(output, following)
    loss.backward()
    optimizer.step()
    return loss.item() / float(n_predictions)

def gen(rnn,maxlen=4):
    """Sample a token sequence from the network and print it.

    rnn    : network with `initHidden()` whose call returns (logits, hidden)
             for a single token index
    maxlen : number of tokens to sample after the initial '<SOS>'
    The tokens are printed space-separated; nothing is returned.
    """
    hidden = rnn.initHidden()
    rnn.eval()
    sampled = [token_to_index['<SOS>']]
    for _ in range(maxlen):
        # feed the most recently sampled token back in as the next input
        logits, hidden = rnn(torch.tensor(sampled[-1]), hidden)
        next_token = torch.distributions.Categorical(logits=logits).sample()
        sampled.append(next_token.item())
    print(' '.join(index_to_token[idx] for idx in sampled))


In [None]:
# Hyperparameters for the token-by-token SRN
nepochs = 10 # number of passes through the entire training set
nhidden = 20 # number of hidden units

rnn = SRN(n_tokens,nhidden) # create the network
criterion = nn.NLLLoss() # log-likelihood loss function
optimizer = torch.optim.AdamW(rnn.parameters(), weight_decay=0.04) # AdamW with weight decay
# optimizer = torch.optim.Adam(rnn.parameters()) # alternative: Adam

for myiter in range(1, nepochs + 1): # for each epoch
    # visit the training sentences in a fresh random order
    permute = np.random.permutation(ntrain)
    loss = np.zeros(ntrain)
    for p in permute:
        pat = training_pats[p]
        loss[p] = train(pat, rnn)
    if myiter % 1 == 0:
        # mean per-sentence loss over this epoch
        print("epoch  %s: train loss %2.2f" % (myiter, loss.mean()))

In [None]:
# Sample one sequence from the current `rnn`; gen prints it and returns nothing
gen(rnn)

In [None]:
# Dendrogram over the embedding vectors of all vocabulary indices
with torch.no_grad():
    plot_dendo(rnn.embed(torch.arange(n_tokens)).numpy(), all_tokens)

In [None]:
# Dendrogram over the embedding vectors of all vocabulary indices
with torch.no_grad():
    plot_dendo(rnn.embed(torch.arange(n_tokens)).numpy(), all_tokens)

In [None]:
# Dendrogram over the embedding vectors of all vocabulary indices
with torch.no_grad():
    plot_dendo(rnn.embed(torch.arange(n_tokens)).numpy(), all_tokens)