## Reber Grammer
In this exercise, we will test the ability of **SRN** and **LSTM** to learn the Reber Grammar (RG) and Embedded Reber Grammar (ERG).

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import torch.optim as optim
import numpy as np
import math
import random

In [2]:
# code adapted from
# http://christianherta.de/lehre/dataScience/machineLearning/neuralNetworks/reberGrammar.php

# assign a number to each transition
chars='BTSXPVE'

# finite state machine for non-embedded Reber Grammar
graph = [[(1, 5), ('T', 'P')], [(1, 2), ('S', 'X')],
         [(3, 5), ('S', 'X')], [(6, ), 'E'],
         [(3, 2), ('V', 'P')], [(4, 5),('V','T')] ]

def get_one_example(min_length = 5):
    seq = [0]
    node = 0
    prob = []
    while node != 6:
        this_prob = np.zeros(7)
        transitions = graph[node]
        if (len(seq) < min_length - 2) and (node == 2 or node == 4):
            # choose transition to force a longer sequence
            i = 1
            this_prob[chars.find(transitions[1][1])] = 1
        else:
            # choose transition randomly
            i = np.random.randint(0, len(transitions[0]))
            for ch in transitions[1]:
                this_prob[chars.find(ch)] = 1./len(transitions[1])
        prob.append(this_prob)
        seq.append(chars.find(transitions[1][i]))
        node = transitions[0][i]
    return seq, prob

def get_one_embedded_example(min_length=9):
    i = np.random.randint(0, 2)  # choose between 'T' and 'P'
    if i == 0:
        first = 1 # 'T'
        prob1 = 1
        prob4 = 0
    else:
        first = 4 # 'P'
        prob1 = 0
        prob4 = 1
    seq_mid, prob_mid = get_one_example(min_length-4)
    seq = [0, first] + seq_mid  + [first, 6]
    prob = [(0, 0.5, 0, 0, 0.5, 0, 0), (1, 0, 0, 0, 0, 0, 0)] + prob_mid + \
           [(0, prob1, 0, 0, prob4, 0, 0), (0, 0, 0, 0, 0, 0, 1)]
    return seq, prob

def get_reber_sequence(embedded=False,min_length=4):
    if embedded:
        seq, prob = get_one_embedded_example(min_length)
    else:
        seq, prob = get_one_example(min_length)

    # convert numpy array to torch tensor
    seq_torch = torch.from_numpy(np.asarray(seq))
    input = F.one_hot(seq_torch[0:-1], num_classes=7).float()
    label = seq_torch[1:]
    probs = torch.from_numpy(np.asarray(prob)).float()
    input = input.unsqueeze(0)
    label = label.unsqueeze(0)
    probs = probs.unsqueeze(0)
    return input, label, probs

## Models
Pytorch has a built-in LSTM module. But, we have coded the SRN and LSTM from scratch, so you can see how they are implemented.

In [3]:
class SrnModel(nn.Module):
    def __init__(self, num_input, num_hid, num_out, batch_size=1):
        super().__init__()
        self.num_hid = num_hid
        self.batch_size = batch_size
        self.W = nn.Parameter(torch.Tensor(num_input, num_hid))
        self.U = nn.Parameter(torch.Tensor(num_hid, num_hid))
        self.hid_bias = nn.Parameter(torch.Tensor(num_hid))
        self.V = nn.Parameter(torch.Tensor(num_hid, num_out))
        self.out_bias = nn.Parameter(torch.Tensor(num_out))
        self.init_weights()

    def init_weights(self):
        stdv = 1.0 / math.sqrt(self.num_hid)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)

    def init_hidden(self):
        return torch.zeros(self.batch_size, self.num_hid)

    def forward(self, x, init_states=None):
        """Assumes x is of shape (batch, sequence, feature)"""
        batch_size, seq_size, _ = x.size()
        hidden_seq = []
        if init_states is None:
            h_t = torch.zeros(batch_size, self.num_hid).to(x.device)
        else:
            h_t = init_states

        for t in range(seq_size):
            x_t = x[:, t, :]
            c_t = x_t @ self.W + h_t @ self.U + self.hid_bias
            h_t = torch.tanh(c_t)
            hidden_seq.append(h_t.unsqueeze(0))
        hidden_seq = torch.cat(hidden_seq, dim=0)
        # reshape from (sequence, batch, feature)
        #           to (batch, sequence, feature)
        hidden_seq = hidden_seq.transpose(0,1).contiguous()
        output = hidden_seq @ self.V + self.out_bias
        return output

class LstmModel(nn.Module):
    def __init__(self,num_input,num_hid,num_out,batch_size=1,num_layers=1):
        super().__init__()
        self.num_hid = num_hid
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.W = nn.Parameter(torch.Tensor(num_input, num_hid * 4))
        self.U = nn.Parameter(torch.Tensor(num_hid, num_hid * 4))
        self.hid_bias = nn.Parameter(torch.Tensor(num_hid * 4))
        self.V = nn.Parameter(torch.Tensor(num_hid, num_out))
        self.out_bias = nn.Parameter(torch.Tensor(num_out))
        self.init_weights()

    def init_weights(self):
        stdv = 1.0 / math.sqrt(self.num_hid)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)

    def init_hidden(self):
        return(torch.zeros(self.num_layers, self.batch_size, self.num_hid),
               torch.zeros(self.num_layers, self.batch_size, self.num_hid))

    def forward(self, x, init_states=None):
        """Assumes x is of shape (batch, sequence, feature)"""
        batch_size, seq_size, _ = x.size()
        hidden_seq = []
        if init_states is None:
            h_t, c_t = (torch.zeros(batch_size,self.num_hid).to(x.device),
                        torch.zeros(batch_size,self.num_hid).to(x.device))
        else:
            h_t, c_t = init_states

        NH = self.num_hid
        for t in range(seq_size):
            x_t = x[:, t, :]
            # batch the computations into a single matrix multiplication
            gates = x_t @ self.W + h_t @ self.U + self.hid_bias
            i_t, f_t, g_t, o_t = (
                torch.sigmoid(gates[:, :NH]),  # input gate
                torch.sigmoid(gates[:, NH:NH*2]),  # forget gate
                torch.tanh(gates[:, NH*2:NH*3]),   # new values
                torch.sigmoid(gates[:, NH*3:])  # output gate
            )
            c_t = f_t * c_t + i_t * g_t
            h_t = o_t * torch.tanh(c_t)
            hidden_seq.append(h_t.unsqueeze(0))
        hidden_seq = torch.cat(hidden_seq, dim=0)
        # reshape from (sequence, batch, feature)
        #           to (batch, sequence, feature)
        hidden_seq = hidden_seq.transpose(0,1).contiguous()
        output = hidden_seq @ self.V + self.out_bias
        return output

## Training Loop

In [4]:
def train(model_type='srn', hid=8, embed=False, length=4, lr=0.3):
    if model_type == 'srn':
        model = SrnModel(7, hid, 7)
    elif model_type == 'lstm':
        model = LstmModel(7, hid, 7)

    loss_function = F.nll_loss
    optimizer = optim.SGD(model.parameters(), lr=lr)

    np.set_printoptions(suppress=True,precision=2)

    for epoch in range(50001):
        model.zero_grad()
        input, label, prob = get_reber_sequence(embedded=embed,
                                                min_length=length)
        model.init_hidden()
        output = model(input)
        log_prob  = F.log_softmax(output, dim=2)
        loss = loss_function(log_prob.squeeze(), label.squeeze())
        loss.backward()
        optimizer.step()

        if epoch % 1000 == 0:
            # Check accuracy during training
            with torch.no_grad():
                model.eval()
                input, label, prob = get_reber_sequence(embedded=embed,
                                                        min_length=length)
                model.init_hidden()
                output = model(input)
                log_prob  = F.log_softmax(output, dim=2)
                prob_out = torch.exp(log_prob)
                print('-----')
                symbol = [chars[index] for index in label.squeeze().tolist()]
                print('symbol = B'+''.join(symbol))
                print('label =',label.squeeze().numpy())
                print('true probabilities:')
                print(prob.squeeze().numpy())
                print('output probabilities:')
                print(prob_out.squeeze().numpy())
                print('epoch: %d' %epoch)
                if embed:
                    prob_out_mid   = prob_out[:,2:-3,:]
                    prob_out_final = prob_out[:,-2,:]
                    prob_mid   = prob[:,2:-3,:]
                    prob_final = prob[:,-2,:]
                    print('error: %1.4f' %torch.mean((prob_out_mid - prob_mid)
                                                    *(prob_out_mid - prob_mid)))
                    print('final: %1.4f' %torch.mean((prob_out_final - prob_final)
                                                    *(prob_out_final - prob_final)))
                else:
                    print('error: %1.4f' %torch.mean((prob_out - prob)
                                                    *(prob_out - prob)))
                model.train()

## Train and SRN on Reber Grammar

In [5]:
model = 'srn'
hid = 8
embed = False
length = 4
train(model_type=model, hid=hid, embed=embed, length=length)

-----
symbol = BTSSXSE
label = [1 2 2 3 2 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [0.  0.  0.5 0.5 0.  0.  0. ]
 [0.  0.  0.5 0.5 0.  0.  0. ]
 [0.  0.  0.5 0.5 0.  0.  0. ]
 [0.  0.  0.5 0.5 0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1. ]]
output probabilities:
[[0.17 0.1  0.21 0.17 0.17 0.1  0.08]
 [0.15 0.12 0.19 0.18 0.15 0.12 0.08]
 [0.14 0.11 0.18 0.19 0.17 0.1  0.11]
 [0.14 0.11 0.2  0.19 0.16 0.11 0.09]
 [0.16 0.08 0.22 0.19 0.15 0.13 0.07]
 [0.12 0.12 0.14 0.23 0.17 0.09 0.13]]
epoch: 0
error: 0.0559
-----
symbol = BPTVVE
label = [4 1 5 5 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.  0.  0.  0.5 0.5 0. ]
 [0.  0.  0.  0.  0.  0.  1. ]]
output probabilities:
[[0.   0.38 0.   0.   0.62 0.   0.  ]
 [0.   0.4  0.   0.01 0.01 0.58 0.  ]
 [0.   0.39 0.   0.   0.01 0.59 0.  ]
 [0.   0.01 0.   0.   0.67 0.32 0.01]
 [0.   0.   0.   0.   0.   0.   0.99]]
epoch: 1000
error: 0.0036
-----
symbol = BT

## Train an SRN on the Embedded Reber Grammar

In [6]:
model = 'srn'
hid = 8
embed = True
length = 4
train(model_type=model, hid=hid, embed=embed, length=length)

-----
symbol = BTBPVVETE
label = [1 0 4 5 5 6 1 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.5 0.  0.  0.5 0.  0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.  0.  0.  0.5 0.5 0. ]
 [0.  0.  0.  0.  0.  0.  1. ]
 [0.  1.  0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1. ]]
output probabilities:
[[0.1  0.15 0.11 0.2  0.11 0.17 0.17]
 [0.09 0.16 0.1  0.19 0.12 0.19 0.15]
 [0.08 0.19 0.09 0.24 0.09 0.15 0.15]
 [0.1  0.16 0.08 0.21 0.12 0.17 0.16]
 [0.08 0.18 0.08 0.23 0.1  0.17 0.15]
 [0.07 0.18 0.1  0.22 0.1  0.18 0.15]
 [0.07 0.2  0.1  0.21 0.1  0.19 0.13]
 [0.07 0.18 0.12 0.19 0.12 0.18 0.14]]
epoch: 0
error: 0.0517
final: 0.1100
-----
symbol = BTBPTTTVPSETE
label = [1 0 4 1 1 1 5 4 2 6 1 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.5 0.  0.  0.5 0.  0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.  0.  0.

Here "final" is the MSE for predicting the second-last character ('T' or 'P')
while "error" is the MSE for predicting all the other characters in the sequence.
If you look at the second last line of the output probabilities,
you will see it predicts 'T' and 'P' with roughly equal probability,
and the "final" error gets stuck at around 0.06.

## Train an LSTM on the Embedded Reber Grammar

In [7]:
model='lstm'
hid=8
embed=True
length=4

train(model_type=model, hid=hid, embed=embed, length=length)

-----
symbol = BTBPTTVVETE
label = [1 0 4 1 1 5 5 6 1 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.5 0.  0.  0.5 0.  0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.  0.  0.  0.5 0.5 0. ]
 [0.  0.  0.  0.  0.  0.  1. ]
 [0.  1.  0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1. ]]
output probabilities:
[[0.11 0.16 0.18 0.13 0.13 0.17 0.11]
 [0.11 0.16 0.19 0.14 0.13 0.17 0.11]
 [0.1  0.16 0.18 0.14 0.13 0.17 0.11]
 [0.11 0.15 0.18 0.14 0.14 0.18 0.11]
 [0.11 0.16 0.18 0.14 0.14 0.17 0.1 ]
 [0.11 0.16 0.19 0.14 0.14 0.17 0.1 ]
 [0.11 0.14 0.18 0.15 0.14 0.18 0.1 ]
 [0.11 0.13 0.17 0.15 0.14 0.19 0.1 ]
 [0.11 0.15 0.18 0.14 0.13 0.18 0.11]
 [0.11 0.16 0.18 0.14 0.13 0.17 0.1 ]]
epoch: 0
error: 0.0470
final: 0.1211
-----
symbol = BPBPVPSEPE
label = [4 0 4 5 4 2 6 4 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.5 0.  0.  0.5 0.  0. ]
 [0. 

This time, the "final" error should come down close to zero,
and it should predict the second last character with precision.

## More
We can make the task harder by training only on sequences that exceed a specified minimum length. Here we will train an LSTM with 16 hidden units on sequences of length at least 12. If you run it a few times, you might find that it is successful on some runs but not on others.

In [8]:
model='lstm'
hid=16
embed=True
length=12

train(model_type=model, hid=hid, embed=embed, length=length)

-----
symbol = BPBPVPXTTTVPSEPE
label = [4 0 4 5 4 3 1 1 1 5 4 2 6 4 6]
true probabilities:
[[0.  0.5 0.  0.  0.5 0.  0. ]
 [1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.5 0.  0.  0.5 0.  0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.  0.  0.  1.  0.  0. ]
 [0.  0.  0.  1.  0.  0.  0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.5 0.  0.  0.  0.5 0. ]
 [0.  0.  0.  0.  0.5 0.5 0. ]
 [0.  0.  0.5 0.5 0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1. ]
 [0.  0.  0.  0.  1.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1. ]]
output probabilities:
[[0.15 0.12 0.16 0.15 0.13 0.17 0.12]
 [0.15 0.12 0.15 0.16 0.13 0.16 0.12]
 [0.16 0.12 0.15 0.16 0.12 0.16 0.12]
 [0.15 0.12 0.15 0.17 0.12 0.16 0.12]
 [0.16 0.11 0.15 0.17 0.12 0.15 0.13]
 [0.16 0.12 0.15 0.17 0.12 0.15 0.13]
 [0.16 0.12 0.15 0.17 0.12 0.15 0.13]
 [0.15 0.12 0.15 0.17 0.12 0.16 0.13]
 [0.15 0.12 0.15 0.16 0.12 0.17 0.13]
 [0.14 0.12 0.15 0.16 0.12 0.17 0.13]
 [0.15 0.11 0.15 0.16 0.12 0.16 0.14]
 [