In [None]:
import numpy as np
import tensorflow as tf

In [None]:
def integration_task(seq_len, num_samples):
  """Yield random noise sequences labelled by the sign of their sum.

  Args:
    seq_len: number of noise values per sequence.
    num_samples: number of (noise, target) pairs to generate.

  Yields:
    A tuple ``(noise, target)`` where ``noise`` is a float array of shape
    ``(seq_len, 1)`` with values drawn uniformly from [-1, 1) and ``target``
    is 1 if the values sum to a non-negative number, else 0.
  """
  # The count now comes from the argument; previously the misspelled parameter
  # was ignored and a module-level `num_samples` was read instead.
  for _ in range(num_samples):
    noise = (np.random.rand(seq_len) * 2 - 1).reshape((seq_len, 1))
    target = int(np.sum(noise) >= 0)
    yield noise, target

In [None]:
# Number of noise values in every generated sequence.
seq_len = 25
# How many (sequence, label) pairs one pass over the generator produces.
num_samples = 80_000

def my_integration_task():
  """Argument-free generator over `integration_task` using the module-level
  `seq_len` and `num_samples` settings."""
  yield from integration_task(seq_len, num_samples)

In [None]:
# Wrap the Python generator in a tf.data pipeline; the signature mirrors what
# the generator yields per element.
ds = tf.data.Dataset.from_generator(
    my_integration_task,
    output_signature=(
        # noise sequence: one value per time step
        tf.TensorSpec(shape=(seq_len, 1), dtype=tf.float64),
        # scalar 0/1 label
        tf.TensorSpec(shape=(), dtype=tf.int32),
    ),
)

def prepare_integration_data(integration_ds):
  """Return `integration_ds` cached, shuffled, batched and prefetched.

  The pipeline is: cache -> shuffle (buffer of 100) -> batch (32 elements)
  -> prefetch (100 batches).
  """
  # Caching comes first so that the elements produced by the upstream dataset
  # are stored once and replayed afterwards instead of being recomputed.
  return (
      integration_ds
      .cache()
      .shuffle(100)
      .batch(32)
      .prefetch(100)
  )

# Split BEFORE batching. take()/skip() count dataset elements, so applying them
# after batch(32) counted batches: both take(70000) and take(10000) then
# returned the whole dataset and the "test" set was identical to the train set.
num_train_samples = 70000

# The first `num_train_samples` generated samples are used for training, the
# remaining ones of a generator pass for testing. Each split is cached inside
# prepare_integration_data, so its randomly generated samples are drawn once
# and then stay fixed across epochs.
train_ds = prepare_integration_data(ds.take(num_train_samples))
test_ds = prepare_integration_data(ds.skip(num_train_samples))

# Keep `ds` bound to the fully prepared pipeline, as before.
ds = prepare_integration_data(ds)

In [None]:
class LSTM_Cell(tf.keras.layers.Layer):
    """A single LSTM step built from bias-free Dense layers plus explicit biases.

    The cell state is the pair ``(hidden_state, cell_state)``; `call` consumes
    such a pair and returns the updated pair.

    Args:
        units: width of the hidden state and of the cell state.
        kernel_regularizer: optional regularizer passed to every gate's Dense
            kernel.
    """

    def __init__(self, units, kernel_regularizer=None ):
        super(LSTM_Cell, self).__init__()

        self.units = units

        # Forget gate; its bias starts at one, the other biases start at zero.
        self.dense_forget = tf.keras.layers.Dense(units,
                                                  kernel_regularizer=kernel_regularizer,
                                                  use_bias=False)
        self.forget_bias = tf.Variable(tf.ones(units), name="LSTM_forget_biases")

        # Input gate.
        self.dense_input = tf.keras.layers.Dense(units,
                                                 kernel_regularizer=kernel_regularizer,
                                                 use_bias=False)
        self.input_bias = tf.Variable(tf.zeros(units), name="LSTM_input_biases")

        # Candidate cell state.
        self.dense_cell = tf.keras.layers.Dense(units,
                                                kernel_regularizer=kernel_regularizer,
                                                use_bias=False)
        self.cell_bias = tf.Variable(tf.zeros(units), name="LSTM_cell_biases")

        # Output gate.
        self.dense_output = tf.keras.layers.Dense(units,
                                                  kernel_regularizer=kernel_regularizer,
                                                  use_bias=False)
        self.output_bias = tf.Variable(tf.zeros(units), name="LSTM_output_biases")

        # The former `dense_hidden` / `hidden_bias` members were never used in
        # `call`; the bias only added a trainable variable that never received
        # a gradient, so both were dropped.

        # The state consists of two tensors (hidden state, cell state), each
        # `units` wide.
        self.state_size = (units, units)

    @tf.function
    def call(self, input_t, state, training=False):
        """Advance the cell by one time step.

        Args:
            input_t: input of the current time step, shape ``(batch, features)``.
            state: pair ``(hidden_state, cell_state)`` from the previous step,
                each of shape ``(batch, units)``.
            training: accepted for interface compatibility; not used.

        Returns:
            The updated ``(hidden_state, cell_state)`` pair.
        """
        hidden_prev, cell_prev = state

        # Every gate sees the previous hidden state joined with the current
        # input along the feature axis. tf.concat takes a *list* of tensors
        # followed by the axis.
        gate_input = tf.concat([hidden_prev, input_t], axis=-1)

        f_t = tf.keras.activations.sigmoid(self.dense_forget(gate_input) + self.forget_bias)

        i_t = tf.keras.activations.sigmoid(self.dense_input(gate_input) + self.input_bias)

        C_t = tf.keras.activations.tanh(self.dense_cell(gate_input) + self.cell_bias)

        # Gates scale their operands element-wise (not via matrix products).
        cell_state = f_t * cell_prev + i_t * C_t

        o_t = tf.keras.activations.sigmoid(self.dense_output(gate_input) + self.output_bias)

        hidden_state = o_t * tf.keras.activations.tanh(cell_state)

        return (hidden_state, cell_state)

In [None]:
class RNNWrapper(tf.keras.layers.Layer):
    """Runs a recurrent cell over the time axis of a batch of sequences.

    The cell is called once per time step as ``cell(input_t, state, training)``
    and must return the next state. The state is zero-initialised:

    * if ``cell.state_size`` is a list/tuple, the state is a tuple with one
      ``(batch, size)`` tensor per entry and the first entry is treated as the
      hidden state (the layer output);
    * otherwise the state is a single ``(batch, cell.units)`` tensor.

    Args:
        RNN_Cell: the cell instance to unroll.
        return_sequences: if True return the hidden state of every time step,
            shape ``(batch, time, units)``; otherwise only the last one.
    """

    def __init__(self, RNN_Cell, return_sequences=False):
        super(RNNWrapper, self).__init__()

        self.return_sequences = return_sequences

        self.cell = RNN_Cell

    @staticmethod
    def _hidden_of(state):
        """Return the hidden-state tensor of a single- or multi-part state."""
        return state[0] if isinstance(state, (list, tuple)) else state

    def _initial_state(self, batch_size):
        """Build the all-zero start state matching the cell's state layout."""
        state_size = getattr(self.cell, "state_size", None)
        if isinstance(state_size, (list, tuple)):
            return tuple(tf.zeros((batch_size, size), tf.float32) for size in state_size)
        return tf.zeros((batch_size, self.cell.units), tf.float32)

    @tf.function
    def call(self, data, training=False):
        """Unroll the cell over ``data`` of shape ``(batch, time, features)``."""
        # Prefer the static sequence length; fall back to the dynamic one.
        dynamic_shape = tf.shape(data)
        length = data.shape[1] if data.shape[1] is not None else dynamic_shape[1]

        # The batch size is read dynamically because the static batch
        # dimension may be unknown (None) while tracing.
        state = self._initial_state(dynamic_shape[0])

        # initialize array for hidden states (only relevant if self.return_sequences == True)
        hidden_states = tf.TensorArray(dtype=tf.float32, size=length)

        for t in tf.range(length):
            input_t = data[:,t,:]

            state = self.cell(input_t, state, training)

            if self.return_sequences:
                # TensorArray.write returns the updated array, which has to be
                # re-assigned (there is no in-place `append`).
                hidden_states = hidden_states.write(t, self._hidden_of(state))

        if self.return_sequences:
            # stack() yields (time, batch, units); swap to (batch, time, units)
            outputs = tf.transpose(hidden_states.stack(), [1,0,2])

        else:
            # take the last hidden state of the cell
            outputs = self._hidden_of(state)

        return outputs

In [None]:
class RNN_Model(tf.keras.Model):
    """Recurrent model: an unrolled LSTM cell followed by a sigmoid.

    Args:
        units: number of units of the recurrent cell.
    """

    def __init__(self, units):
        super(RNN_Model, self).__init__()

        # `CustomSimpleRNNCell` is not defined anywhere in this notebook, so a
        # fresh kernel raised NameError here; use the LSTM_Cell defined in this
        # notebook instead.
        self.RNNWrapper = RNNWrapper(LSTM_Cell(units), return_sequences=False)

    @tf.function
    def call(self, data, training=False):
        """Return the sigmoid of the wrapper's output for `data`."""
        x = self.RNNWrapper(data, training)
        # Squash to (0, 1) so the outputs can be read as probabilities.
        x = tf.keras.activations.sigmoid(x)

        return x

In [None]:
def train_step(model, input, target, loss_function, optimizer):
  """Run one optimisation step on a single batch and return its loss.

  Args:
    model: callable Keras model; the last column of its output is used as the
      prediction.
    input: batch of input sequences.
    target: batch of labels.
    loss_function: callable ``loss_function(target, prediction)``.
    optimizer: optimizer exposing ``apply_gradients``.

  Returns:
    The loss of this batch, computed before the weight update.
  """
  with tf.GradientTape() as tape:
    prediction = model(input, training=True)
    # Read the last output unit instead of indexing with the global `seq_len`,
    # which only worked while the number of units happened to equal seq_len.
    loss = loss_function(target, prediction[:, -1])
  # Compute gradients outside the tape context so the gradient computation
  # itself is not recorded.
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

def test(model, test_data, loss_function, accuracy_function):
  """Evaluate `model` on every batch of `test_data`.

  Args:
    model: callable Keras model; the last column of its output is used as the
      prediction.
    test_data: iterable of ``(input, target)`` batches.
    loss_function: callable ``loss_function(target, prediction)``.
    accuracy_function: stateful Keras metric; it is reset at the start of the
      call.

  Returns:
    ``(test_loss, test_accuracy)``: the mean of the per-batch losses and the
    metric's result over all samples seen during this call.
  """
  # The metric object is shared between calls. Without a reset it keeps
  # accumulating, so the reported accuracy would be a running average over
  # every earlier evaluation instead of the accuracy of this pass.
  accuracy_function.reset_state()

  batch_losses = []
  for (inputs, target) in test_data:
    # Last output unit; avoids depending on the global `seq_len`.
    prediction = model(inputs)[:, -1]
    batch_losses.append(loss_function(target, prediction).numpy())
    accuracy_function.update_state(target, prediction)

  test_loss = tf.reduce_mean(batch_losses)
  # Read the metric once, after all batches were added.
  test_accuracy = accuracy_function.result()

  return test_loss, test_accuracy

In [None]:
tf.keras.backend.clear_session()

### Hyperparameters
num_epochs = 15
learning_rate = 0.05

# Initialize the model.
model = RNN_Model(25)
# Initialize the loss: binary cross entropy. The model already applies a
# sigmoid to its output, so the loss receives probabilities, not logits.
cross_entropy_loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)
# Initialize the optimizer: SGD with the learning rate chosen above.
optimizer = tf.keras.optimizers.SGD(learning_rate)

accuracy_function = tf.keras.metrics.BinaryAccuracy(name='binary_accuracy', threshold=0.5)

# Initialize lists for later visualization.
train_losses = []

test_losses = []
test_accuracies = []

#testing once before we begin
test_loss, test_accuracy = test(model, test_ds, cross_entropy_loss, accuracy_function)
test_losses.append(test_loss)
test_accuracies.append(test_accuracy)

#check how model performs on train data once before we begin
train_loss, _ = test(model, train_ds, cross_entropy_loss, accuracy_function)
train_losses.append(train_loss)

# We train for num_epochs epochs.
for epoch in range(num_epochs):
    print(f'Epoch: {str(epoch)} starting with accuracy {test_accuracies[-1]}')

    #training (and checking in with training)
    epoch_loss_agg = []
    for input,target in train_ds:
        train_loss = train_step(model, input, target, cross_entropy_loss, optimizer)
        epoch_loss_agg.append(train_loss)

    #track training loss
    train_losses.append(tf.reduce_mean(epoch_loss_agg))

    #testing, so we can track accuracy and test loss
    test_loss, test_accuracy = test(model, test_ds, cross_entropy_loss, accuracy_function)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

Epoch: 0 starting with accuracy 0.5577328205108643
Epoch: 1 starting with accuracy 0.6235326528549194
Epoch: 2 starting with accuracy 0.7120217084884644
Epoch: 3 starting with accuracy 0.7625622749328613
Epoch: 4 starting with accuracy 0.7946837544441223
Epoch: 5 starting with accuracy 0.8171601295471191
Epoch: 6 starting with accuracy 0.8345286250114441
Epoch: 7 starting with accuracy 0.8487794995307922
Epoch: 8 starting with accuracy 0.8598180413246155
Epoch: 9 starting with accuracy 0.8683062791824341
Epoch: 10 starting with accuracy 0.8754562735557556
Epoch: 11 starting with accuracy 0.8813861608505249
Epoch: 12 starting with accuracy 0.8862486481666565
Epoch: 13 starting with accuracy 0.8896233439445496
Epoch: 14 starting with accuracy 0.892823338508606
