# An example where a conventional RNN fails, due to a lack of symmetry

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import sys
from os import path
from keras.utils.np_utils import to_categorical   

Using TensorFlow backend.


## Simulate Data
$m$ sequences each of length $\ell$ and binary class $y$.  
class $y\sim \text{Bernoulli}(0.5)$  

$$x=(x_1,\ldots, x_\ell)$$
$x_i \sim \text{Bernoulli}(0.5)$ (iid).  
If $y=1$, set
$$x[t..t+10) \leftarrow 1$$

In [2]:
# Global simulation settings read by the cells below.
ell = 100 # length of every simulated sequence
m = 5000 # number of training sequences simulated per model fit
siglen = 10 # length of the run of ones planted in positive sequences
s=2 # alphabet size (width of the one-hot encoding fed to the models)

In [3]:
np.random.seed(13)

# development test data
def simulate_data(m, t, oneHot=True, standardize=False):
    x = np.random.randint(2, size = m*ell).reshape(m, ell)
    y = np.random.randint(2, size = m)
    for i in range(m):
        if y[i]:
            x[i, t:t + siglen] = 1
    if oneHot:
        x = to_categorical(x, num_classes=2)
    if standardize: # is better for SimpleRNN, oneHot or not
        x = x - 0.5 # standardize
    return x, y

## Obtain the training set

In [4]:
regenerate = True  # set to False to reuse a previously saved data set
fname = "rnn-fail-data.npz"

# The cached file is only reused when regeneration is off and the file exists.
cache_usable = (not regenerate) and path.exists(fname)
if not cache_usable:
    # simulate a tiny development set (3 sequences, signal at t=20) and cache it
    x, y = simulate_data(3, t=20)
    np.savez(fname, x=x, y=y)

In [5]:
# Read the cached arrays back from disk. The .npz archive keeps a file handle
# open, so it is loaded inside a context manager that closes it afterwards.
with np.load(fname) as npzfile:
    x = npzfile['x']
    y = npzfile['y']
# Show the labels, the data shape and the first 10 positions of sequence 0.
print ("y", y, "\nx", x.shape, x[0,0:10,:])

y [1 1 1] 
x (3, 100, 2) [[1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]
 [0. 1.]
 [1. 0.]
 [1. 0.]]


## Build the Model

In [6]:
from HMMCell import HMMCell, HMMLayer

In [7]:
# Smoke test: apply a stand-alone HMMLayer to the small development batch x.
# NOTE(review): the arguments (2, 11) presumably mean 2 HMMs with 11 states
# each — confirm against HMMCell.py.
seqLayer = HMMLayer(2, 11)
seqLayer(x)

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[ 6.5833330e-04,  1.6456842e-04],
       [ 2.6209950e-03,  6.2167645e-04],
       [-7.6532364e-05, -5.3644180e-06]], dtype=float32)>

In [8]:
def get_model(bidirectional = False, seqModelType = "SimpleRNN", RNNunits = 32):
    """Build and compile a binary sequence classifier.

    The model is: input of shape (None, s) -> one sequence layer -> Dense(1)
    with a sigmoid, compiled with Adam and binary cross-entropy.

    Args:
        bidirectional: if True, wrap the sequence layer in layers.Bidirectional.
        seqModelType: one of "HMM", "LSTM", "GRU" or "SimpleRNN".
        RNNunits: number of units for the LSTM/GRU/SimpleRNN layer
            (not used for "HMM").

    Returns:
        The compiled keras.Sequential model.

    Raises:
        ValueError: if seqModelType is not one of the supported names.
    """
    model = keras.Sequential()
    model.add(layers.InputLayer(input_shape=(None,s)))

    if seqModelType == "HMM":
        # NOTE(review): presumably 5 HMMs with 11 states each — confirm in HMMCell.py
        seqLayer = HMMLayer(5, 11)
    elif seqModelType == "LSTM":
        seqLayer = layers.LSTM(RNNunits)
    elif seqModelType == "GRU":
        seqLayer = layers.GRU(RNNunits)
    elif seqModelType == "SimpleRNN":
        seqLayer = layers.SimpleRNN(RNNunits)
    else:
        # Raise instead of sys.exit(): exiting is not a meaningful way to
        # report a bad argument from inside a notebook kernel.
        raise ValueError("unknown sequence model type " + seqModelType)

    if bidirectional:
        seqLayer = layers.Bidirectional(seqLayer)

    model.add(seqLayer)
    # BinaryCrossentropy() defaults to from_logits=False and the "accuracy"
    # metric thresholds at 0.5, so the output must be a probability.
    model.add(layers.Dense(1, activation = "sigmoid"))
    lr = 1e-3
    print (f"lr={lr}")
    model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = lr),
                  loss = tf.keras.losses.BinaryCrossentropy(), metrics = ["accuracy"])
    return model

In [9]:
# Sanity check: build the default SimpleRNN classifier and show its layer summary.
model = get_model(bidirectional = False, seqModelType = "SimpleRNN", RNNunits = 32)
model.summary()
# Rebuild as an HMM model and run one forward pass on the development batch x
# before printing the summary.
model = get_model(bidirectional = False, seqModelType = "HMM")
model(x)
model.summary()
# Layer 0 is the HMM layer; its attribute C provides print_pars()
# (presumably the HMM cell — confirm in HMMCell.py).
W = model.get_layer(index=0).C
W.print_pars()

lr=0.001
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
simple_rnn (SimpleRNN)       (None, 32)                1120      
_________________________________________________________________
dense (Dense)                (None, 1)                 33        
Total params: 1,153
Trainable params: 1,153
Non-trainable params: 0
_________________________________________________________________
lr=0.001
Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
hmm_layer_1 (HMMLayer)       (None, 5)                 770       
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 6         
Total params: 776
Trainable params: 776
Non-trainable params: 0
_________________________________________________________________
transition matri

## Ad hoc fixed prediction method
$$ \hat{y} = 1 :\Leftrightarrow \exists j: x[j..j+10) = 1$$

In [10]:
def adhoc_pred(x, window=None):
    """Predict 1 for every sequence that contains a run of `window` ones.

    Accepts the encodings produced by simulate_data: plain symbols of shape
    (m, ell) or (m, ell, 1), or one-hot symbols of shape (m, ell, k), each
    either raw or shifted by -0.5 ("standardized").

    Args:
        x: batch of sequences in one of the encodings above.
        window: required run length; defaults to the global `siglen`.

    Returns:
        Integer array of shape (m,) with the 0/1 predictions.

    Raises:
        ValueError: if window is smaller than 1.
    """
    window = siglen if window is None else window
    if window < 1:
        raise ValueError("window must be a positive integer")

    x = np.asarray(x)
    if x.ndim == 3 and x.shape[-1] > 1:
        # One-hot input: recover the symbol; argmax is unaffected by the -0.5 shift.
        symbols = (np.argmax(x, axis=-1) == 1).astype('int')
    else:
        if x.ndim == 3:
            # Drop only the trailing channel axis so a batch of size 1 survives.
            x = x[..., 0]
        # Adding 0.5 and truncating maps both {0, 1} and {-0.5, 0.5} to {0, 1}.
        symbols = (x + .5).astype('int')

    # A - B is the number of ones in the window of length `window` ending at each position.
    A = np.cumsum(symbols, axis=1)
    B = np.zeros_like(A)
    B[:, window:] = A[:, :-window]
    yhat = np.any(A - B >= window, axis=1).astype('int')
    return yhat

In [11]:
def adhoc_acc(x_test, y_test):
    """Return the fraction of test sequences that adhoc_pred labels correctly.

    Args:
        x_test: batch of sequences passed on to adhoc_pred.
        y_test: true 0/1 labels, one per sequence.
    """
    hits = adhoc_pred(x_test) == y_test
    return hits.mean()

## Train the Model

In [12]:
from time import time

# Signal start positions to sweep, from late to early in the sequence.
# Full sweep (disabled): np.arange(ell - 15, -1, -10)
ts = np.arange(ell - 15, 39, -20)

len(ts)

3

In [None]:
# Experiment: for every model type, direction and signal position t, train
# num_reps fresh models and record their accuracy on freshly simulated test data.
num_reps = 1 # 30
num_epochs = 20
modelnames = ["HMM"] #"SimpleRNN", "LSTM", "GRU", "ad hoc"]
directions = [False] # , True] # bidirectional?
# test_accs[(name, direction)][j, r] = test accuracy for position ts[j], repetition r
test_accs = { (name, direction) : np.empty((len(ts), num_reps)) for name in modelnames for direction in directions}

for modelname in modelnames:
    start_time = time()
    for direction in directions:
        print (modelname, "dir=", direction, "\t", end = "")
        for j in range(len(ts)):
            print (j, " ", end = "")
            t = ts[j]
            accs = test_accs[(modelname, direction)]
            for r in range(num_reps):
                accuracy = 0.0
                x_test, y_test = simulate_data(1000, t)
                if modelname == "ad hoc":
                    # the fixed rule needs no training
                    accuracy = adhoc_acc(x_test, y_test)
                else:
                    model = get_model(bidirectional = direction, seqModelType = modelname, RNNunits = 32)
                    x_train, y_train = simulate_data(m, t)
                    x_val, y_val = simulate_data(100, t)
                    if modelname == "HMM":
                        # print the initial transition and emission matrices;
                        # guarded because only the HMM layer is known to have `.C`
                        model.get_layer(index=0).C.print_pars()
                    model.fit(x_train, y_train,
                          validation_data = (x_val, y_val), batch_size = 16, epochs = num_epochs,
                              verbose=1 if modelname=="HMM" else 0)
                    if modelname == "HMM":
                        # print transition and emission matrices
                        model.get_layer(index=0).C.print_pars()
                    results = model.evaluate(x_test, y_test, batch_size = 16, verbose=0)
                    # print ("j=", j, "\tt=", t, "\tr=", r, "\tresults=", results)
                    accuracy = results[1] # results = [loss, accuracy]
                accs[j, r] = accuracy
        print ("\n", accs)
    end_time = time()
    seconds_elapsed = end_time - start_time
    print ("time [s]:", seconds_elapsed)

HMM dir= False 	0  lr=0.001
transition matrices A:
 [[[0.09071 0.09378 0.09017 0.08941 0.09309 0.08931 0.09032 0.09398 0.09064 0.08956 0.08902]
  [0.09358 0.08748 0.09213 0.09411 0.09284 0.08929 0.08923 0.08736 0.09094 0.09    0.09304]
  [0.08941 0.09374 0.09169 0.09036 0.08828 0.09031 0.08863 0.09122 0.09194 0.09292 0.0915 ]
  [0.09301 0.08718 0.09461 0.08919 0.0932  0.08843 0.09379 0.09145 0.09043 0.08824 0.09049]
  [0.09292 0.09269 0.09342 0.08753 0.09165 0.09066 0.08931 0.08748 0.09254 0.09364 0.08817]
  [0.09542 0.09318 0.08977 0.08689 0.08995 0.09302 0.08836 0.09325 0.08686 0.09521 0.08809]
  [0.08989 0.09356 0.08919 0.08918 0.09666 0.09248 0.08789 0.09203 0.08784 0.08946 0.09183]
  [0.09128 0.08909 0.0926  0.09514 0.09068 0.08963 0.08863 0.09095 0.09241 0.0894  0.09018]
  [0.09657 0.09065 0.09125 0.08924 0.08819 0.08897 0.08881 0.09166 0.09449 0.0922  0.08795]
  [0.08819 0.08635 0.09364 0.09152 0.08744 0.0943  0.09372 0.08785 0.09267 0.09192 0.0924 ]
  [0.08819 0.08878 0.09269 0

## Plot Results

In [None]:
#  single plot: one panel per direction, one curve per model
somemodelnames = ["HMM"] #["SimpleRNN", "LSTM", "GRU", "ad hoc"]
# One column per entry of `directions` (no empty panel when only one direction
# was run); squeeze=False keeps `ax` two-dimensional for any column count.
fig, ax = plt.subplots(1, len(directions), squeeze=False, figsize=(7 * len(directions), 5))
for j, direction in enumerate(directions):
    panel = ax[0, j]
    # The loop variable is deliberately not called `model`, so the trained
    # Keras model held in that global is not overwritten by a string.
    for name in somemodelnames:
        avg_test_acc = np.mean(test_accs[(name, direction)], axis = 1)
        panel.plot(ts, avg_test_acc, 'o-', label = name)
    panel.set_title('Performance of established ' + ("bidirectional" if direction else "unidirectional") + " sequence models" )
    panel.set_xlabel("t")
    panel.set_ylabel("accuracy");
    panel.legend()
print ('Accuracy was averaged over ' + str(num_reps) + ' repetitions')
fig.savefig('failing1.pdf') 

In [None]:
# Display the list of model names that were evaluated in the training cell.
modelnames

In [None]:
# one plot each: rows are model types, columns are directions
fig, ax = plt.subplots(len(modelnames), len(directions), squeeze=False, figsize=(25,20))
# The loop variable is deliberately not called `model`, so the trained Keras
# model held in that global is not overwritten by a string.
for i, name in enumerate(modelnames):
    for j, direction in enumerate(directions):
        avg_test_acc = np.mean(test_accs[(name, direction)], axis = 1)
        ax[i,j].plot(ts, avg_test_acc)
        # common y-range so the panels are directly comparable
        ax[i,j].set_ylim([.45, 1])
        ax[i,j].set_title('Test accuracy of ' + name + ' model ' + ("(Bidirectional)" if direction else "") + ' averaged over ' + str(num_reps) + ' repetitions')
        ax[i,j].set_xlabel("t")
        ax[i,j].set_ylabel("accuracy");

In [None]:
# Write the most recently created figure (`fig`) to a PDF in the working directory.
fig.savefig('failing.pdf') 