In [27]:
import os
import numpy as np

# suppress the TensorFlow warning
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

In [28]:
from keras.models import Sequential
from keras.layers.core import Activation, RepeatVector, Dense, Dropout
from keras.layers.wrappers import TimeDistributed
from keras.layers import LSTM
from keras.models import load_model

In [29]:
class colors:
    ok = '\033[92m'
    fail = '\033[91m'
    close = '\033[0m'

In [30]:
# encode a given integer sequence into RNN compatible format (one-hot representation)
def encode(X,seq_len, vocab_size):
    x = np.zeros((len(X),seq_len, vocab_size), dtype=np.float32)
    for ind,batch in enumerate(X):
        for j, elem in enumerate(batch):
            x[ind, j, elem] = 1
    return x

In [31]:
# generate a stream of inputs for training
def batch_gen(batch_size=32, seq_len=10, max_no=100):
    # Randomly generate a batch of integer sequences (X) and its sorted counterpart (Y)
    x = np.zeros((batch_size, seq_len, max_no), dtype=np.float32)
    y = np.zeros((batch_size, seq_len, max_no), dtype=np.float32)

    while True:
	# Generates a batch of input
        X = np.random.randint(max_no, size=(batch_size, seq_len))
        Y = np.sort(X, axis=1)

        for ind,batch in enumerate(X):
            for j, elem in enumerate(batch):
                x[ind, j, elem] = 1

        for ind,batch in enumerate(Y):
            for j, elem in enumerate(batch):
                y[ind, j, elem] = 1

        yield x, y
        x.fill(0.0)
        y.fill(0.0)

In [32]:
def create_model(seq_len, max_no, n_layers, hidden_size):
    model = Sequential()
    # the encoder LSTM
    model.add(LSTM(hidden_size, input_shape=(seq_len, max_no)))
    # in next layer, repeat the input seq_len times
    model.add(RepeatVector(seq_len))
    # decoder RNN, which will return output sequence
    for _ in range(n_layers):
        model.add(LSTM(hidden_size, return_sequences=True))
    model.add(Dropout(0.5))
    model.add(TimeDistributed(Dense(max_no)))
    model.add(Activation('softmax'))
    return model

In [33]:
batch_size=32
seq_len = 15 # number of elements in sequence to sort
max_no = 100 # upper range of the numbers in sequence
hidden_size = 128
n_layers = 2
epochs = 1000

In [34]:
model = create_model(seq_len, max_no, n_layers, hidden_size)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [35]:
for ind,(X,Y) in enumerate(batch_gen(batch_size, seq_len, max_no)):
        loss, acc = model.train_on_batch(X, Y)
        
        if ind > epochs:
            # Save trained model
            model.save('model.h5')
            break

In [38]:
from keras.models import load_model


def encode(X,seq_len, vocab_size):
    x = np.zeros((len(X),seq_len, vocab_size), dtype=np.float32)
    for ind,batch in enumerate(X):
        for j, elem in enumerate(batch):
            x[ind, j, elem] = 1
    return x

model = load_model('model.h5')

seq_len = 3 # number of elements in sequence to sort
max_no = 100 # upper range of the numbers in sequence
verbose = False
win_cnt = 0
#numb_trials = 1000
numb_trials = 10

for ii in range(numb_trials):
    testX = np.random.randint(max_no, size=(1, seq_len))
    test = encode(testX, seq_len, max_no)
    y = model.predict(test, batch_size=1)
    #print(testX)
    print(testX)
    np_sorted = np.sort(testX)[0]
    rnn_sorted = np.argmax(y, axis=2)[0]
    print(np_sorted)
    print(rnn_sorted)
    is_equal = np.array_equal(np_sorted, rnn_sorted)
    if is_equal: win_cnt += 1
    if verbose:
        print(np_sorted, ': sorted by NumPy algorithm')
        print(rnn_sorted, ': sorted by trained RNN')
        print("\n")

print('\nSuccess Rate: {0:.2f} %'.format(100*win_cnt/numb_trials))

ValueError: Error when checking : expected lstm_7_input to have shape (15, 100) but got array with shape (3, 100)