In [11]:
import numpy as np 
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.python.keras.layers import Dense

In [34]:
def integration_task(seq_len, num_samples):
    """Generate samples for the noise-integration classification task.

    Args:
        seq_len: number of time steps in each sample.
        num_samples: number of (noise, target) pairs to generate.

    Yields:
        Tuples ``(noise, target)``. ``noise`` is an array of shape
        ``(seq_len, 1)`` drawn with ``np.random.uniform(-2, 2)``; ``target``
        is a one-element list holding 1 if the noise sums to more than 0,
        otherwise 0.
    """
    for _ in range(num_samples):
        noise = np.random.uniform(-2, 2, size=(seq_len, 1))
        target = [int(np.sum(noise) > 0)]

        # The yield has to live inside the loop: placed after it, the
        # generator would emit only the final sample (and fail with an
        # unbound name when num_samples is 0).
        yield (noise, target)

In [13]:
def my_integration_task():
    """Return the sample generator configured for this notebook.

    Returns:
        The generator created by ``integration_task`` for 50000 samples
        with 30 time steps each.
    """
    # data parameters are fixed here so the function can be used as a
    # zero-argument generator factory
    return integration_task(seq_len=30, num_samples=50000)

Creating the dataset

In [14]:
def data_preprocessing(dataset):
    """Run a dataset through the shared input pipeline.

    Args:
        dataset: object offering ``cache``, ``shuffle``, ``batch`` and
            ``prefetch`` (the notebook passes ``tf.data.Dataset`` splits).

    Returns:
        The result of calling, in this order, ``cache()``,
        ``shuffle(5000)``, ``batch(32)`` and ``prefetch(20)``.
    """
    return (
        dataset
        .cache()         # keep generated elements after the first pass
        .shuffle(5000)   # shuffle buffer size
        .batch(32)       # batch size
        .prefetch(20)    # prefetch buffer size
    )


def data_pipeline(train_size=40000, valid_size=5000):
    """Build the training, validation and test datasets.

    Args:
        train_size: number of generated samples used for the training split.
        valid_size: number of the following samples used for the validation
            split. Everything the generator yields beyond
            ``train_size + valid_size`` becomes the test split.

    Returns:
        Tuple ``(train_dataset, valid_dataset, test_dataset)``, each passed
        through ``data_preprocessing``.
    """
    # creating dataset with self-defined generator
    dataset = tf.data.Dataset.from_generator(my_integration_task, (tf.float32, tf.int16))

    # splitting dataset in training, validation and test data.
    # The split sizes must sum to less than the number of samples the
    # generator yields (50000 in my_integration_task); with larger sizes the
    # training split swallows everything and the other splits stay empty.
    train_dataset = dataset.take(train_size)
    remaining = dataset.skip(train_size)
    valid_dataset = remaining.take(valid_size)
    test_dataset = remaining.skip(valid_size)

    # preprocessing
    train_dataset = data_preprocessing(train_dataset)
    valid_dataset = data_preprocessing(valid_dataset)
    test_dataset = data_preprocessing(test_dataset)

    return train_dataset, valid_dataset, test_dataset

# Custom LSTM Network

LSTM Cell implementation

In [30]:
class LSTM_Cell(tf.keras.layers.Layer):
    """LSTM cell that computes a single time step.

    Every gate is one dense layer applied to the concatenation of the
    current input and the previous hidden state.
    """

    def __init__(self, units):
        """Create the gate layers.

        Args:
            units: width of the hidden state and of the cell state.
        """
        super(LSTM_Cell, self).__init__()

        self.units = units

        # Layers come from the public tf.keras namespace, the same one the
        # base class is taken from.
        # The forget-gate bias starts at one.
        self.forget_gate = tf.keras.layers.Dense(
            self.units, bias_initializer='ones', activation='sigmoid')
        self.input_gate = tf.keras.layers.Dense(self.units, activation='sigmoid')
        self.cell_candidate = tf.keras.layers.Dense(self.units, activation='tanh')
        self.output_gate = tf.keras.layers.Dense(self.units, activation='sigmoid')

    def call(self, x, states):
        """Advance the cell by one time step.

        Args:
            x: input for the current time step; concatenated with the hidden
                state along the last axis, so all leading axes must match.
            states: tuple ``(h, c)`` with the previous hidden state and the
                previous cell state.

        Returns:
            Tuple ``(h, c)`` with the new hidden state and new cell state.
        """
        (h_t0, c_t0) = states

        # A dense layer fixes its input width on first use, so it cannot be
        # applied to x and to h separately when their widths differ. Feeding
        # [x, h] through each gate once also applies each activation exactly
        # once (the activation is already part of the dense layer).
        gate_input = tf.concat([x, h_t0], axis=-1)

        # input gate
        i = self.input_gate(gate_input)

        # forget gate
        f = self.forget_gate(gate_input)

        # candidate values for the cell memory
        c_candidate = self.cell_candidate(gate_input)

        # forget old context/cell info, then add the gated candidate
        c_t1 = f * c_t0 + i * c_candidate

        # output gate
        out = self.output_gate(gate_input)

        # hidden output
        h_t1 = out * tf.nn.tanh(c_t1)

        return (h_t1, c_t1)

LSTM Layer implementation

In [16]:
class LSTM_Layer(tf.keras.layers.Layer):
    """Recurrent layer that unrolls a cell over the time axis of its input.

    This is a building block used inside a model, so it derives from
    ``tf.keras.layers.Layer`` rather than ``tf.keras.Model``.
    """

    def __init__(self, cell):
        """Store the cell that is applied at every time step.

        Args:
            cell: layer called as ``cell(x_t, (h, c))`` and returning the new
                ``(h, c)``; it must expose a ``units`` attribute.
        """
        super(LSTM_Layer, self).__init__()

        self.cell = cell
        self.cell_units = self.cell.units

    def zero_states(self, batch_size):
        """Return all-zero initial states.

        Args:
            batch_size: number of rows of each state tensor.

        Returns:
            Tuple ``(h, c)`` of float32 zero tensors shaped
            ``(batch_size, cell_units)``.
        """
        h = tf.zeros((batch_size, self.cell_units), tf.float32)
        c = tf.zeros((batch_size, self.cell_units), tf.float32)
        return (h, c)

    @tf.function
    def call(self, x):
        """Run the cell over every time step of ``x``.

        Args:
            x: tensor indexed as ``x[batch, time, feature]``; the sizes of
                the first two axes are read from its static shape.

        Returns:
            Tensor with the hidden state of every time step, arranged as
            ``[batch, time, units]``.
        """
        # number of time steps in the sequence
        seq_len = x.shape[1]

        # initial states h and c start at 0
        states = self.zero_states(x.shape[0])
        hidden_states = tf.TensorArray(dtype=tf.float32, size=seq_len)

        # iterating over time steps
        for timestep in tf.range(seq_len):
            x_t = x[:, timestep, :]
            states = self.cell(x_t, states)

            # only saving the hidden output (first entry), not the cell state
            hidden_states = hidden_states.write(timestep, states[0])

        # stack() is time-major; swap the first two axes to get batch-major
        outputs = tf.transpose(hidden_states.stack(), [1, 0, 2])

        return outputs

LSTM Model implementation

In [17]:
class LSTM_Model(tf.keras.Model):
    """Binary sequence classifier: one LSTM layer followed by a sigmoid unit.

    Derives from ``tf.keras.Model`` so that model-level utilities such as
    ``summary()`` are available on the instance.
    """

    def __init__(self, units):
        """Create the recurrent layer and the classification layer.

        Args:
            units: width of the LSTM hidden state.
        """
        super(LSTM_Model, self).__init__()

        self.units = units

        # first layer
        self.LSTM_layer = LSTM_Layer(LSTM_Cell(self.units))
        # classification (public tf.keras namespace, same as the base class)
        self.out = tf.keras.layers.Dense(1, activation="sigmoid")

    @tf.function
    def call(self, x):
        """Compute a prediction for every time step of ``x``.

        Args:
            x: input passed to the LSTM layer.

        Returns:
            Output of the sigmoid layer applied to the LSTM layer's output;
            callers select the time step they need.
        """
        x = self.LSTM_layer(x)
        x = self.out(x)

        return x

# Training the network
Functions for the training step and testing

In [18]:
def train_step(model, input, target, loss_function, optimizer):
    """Perform one optimisation step on a single batch.

    Args:
        model: callable model exposing ``trainable_variables``.
        input: batch of input sequences passed to ``model``.
        target: batch of targets passed to ``loss_function``.
        loss_function: callable ``loss_function(target, prediction)``.
        optimizer: optimizer exposing ``apply_gradients``.

    Returns:
        The loss computed for this batch before the update is applied.
    """
    with tf.GradientTape() as tape:
        prediction = model(input)
        # only the prediction from the last timestep
        prediction = prediction[:, -1, :]
        loss = loss_function(target, prediction)

    # Gradients are requested after leaving the tape context so that the
    # backward pass itself is not recorded on the tape.
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return loss


def test(model, test_data, loss_function):
    """Evaluate a model on a dataset.

    Args:
        model: callable model returning one prediction per time step.
        test_data: iterable of ``(input, target)`` batches.
        loss_function: callable ``loss_function(target, prediction)``.

    Returns:
        Tuple ``(loss, accuracy)``: the mean of the per-batch losses and the
        fraction of samples whose rounded last-step prediction equals the
        rounded target.
    """
    accuracy_aggregator = []
    loss_aggregator = []

    for (input, target) in test_data:
        prediction = model(input)
        # only the prediction from the last timestep
        prediction = prediction[:, -1, :]
        loss = loss_function(target, prediction)
        loss_aggregator.append(loss.numpy())

        # Compare the whole batch in one NumPy operation instead of creating
        # one tensor per sample; both sides are flattened so that values are
        # paired sample by sample.
        rounded_target = np.round(target.numpy(), 0).reshape(-1)
        rounded_prediction = np.round(prediction.numpy(), 0).reshape(-1)
        hits = rounded_target == rounded_prediction
        accuracy_aggregator.extend(hits.astype(np.float32))

    loss = tf.reduce_mean(loss_aggregator)
    accuracy = tf.reduce_mean(accuracy_aggregator)

    return loss, accuracy

In [23]:
def classification(model, optimizer, num_epochs, train_dataset, valid_dataset,
                   loss_function=None):
    """Train a model and record its loss and accuracy history.

    Args:
        model: model to train.
        optimizer: optimizer handed to ``train_step``.
        num_epochs: number of passes over ``train_dataset``.
        train_dataset: iterable of ``(input, target)`` training batches.
        valid_dataset: iterable of ``(input, target)`` validation batches.
        loss_function: loss used for training and evaluation; binary cross
            entropy is used when omitted.

    Returns:
        List ``[train_losses, valid_losses, valid_accuracies]``. Each entry
        is a list holding one value measured before training plus one value
        per epoch.
    """
    if loss_function is None:
        loss_function = tf.keras.losses.BinaryCrossentropy()

    # The histories are local, so the function does not depend on lists
    # defined in another cell and repeated calls do not accumulate.
    train_losses = []
    valid_losses = []
    valid_accuracies = []

    # Testing on the validation dataset once before we begin
    valid_loss, valid_accuracy = test(model, valid_dataset, loss_function)
    valid_losses.append(valid_loss)
    valid_accuracies.append(valid_accuracy)

    # Testing on the training dataset once before we begin
    train_loss, _ = test(model, train_dataset, loss_function)
    train_losses.append(train_loss)

    # Training the model for num_epochs epochs
    for epoch in range(num_epochs):
        print(
            f'Epoch: {str(epoch+1)} starting with (validation set) accuracy {valid_accuracies[-1]} and loss {valid_losses[-1]}')

        # training and calculating loss
        epoch_loss_agg = []

        for input, target in train_dataset:
            train_loss = train_step(model, input, target, loss_function, optimizer)
            epoch_loss_agg.append(train_loss)

        # tracking the training loss
        train_losses.append(tf.reduce_mean(epoch_loss_agg))
        print(f'Epoch: {str(epoch+1)} train loss: {train_losses[-1]}')

        # testing our model in each epoch to track accuracy and loss on the validation set
        valid_loss, valid_accuracy = test(model, valid_dataset, loss_function)
        valid_losses.append(valid_loss)
        valid_accuracies.append(valid_accuracy)

    results = [train_losses, valid_losses, valid_accuracies]

    return results

In [35]:
tf.keras.backend.clear_session()

train_dataset, valid_dataset, test_dataset = data_pipeline()

### Hyperparameters
learning_rate = 0.001
num_units = 25
num_epochs = 10

# initialize the loss: binary cross entropy (single sigmoid output)
cross_entropy_loss = tf.keras.losses.BinaryCrossentropy()
# initialize the optimizer: adam
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.999, epsilon=1e-07)
# initialize the model
model = LSTM_Model(num_units)
# initialize lists for later visualization
train_losses = []
valid_losses = []
valid_accuracies = []

with tf.device('/device:gpu:0'):
    # training the model
    results = classification(model, optimizer, num_epochs, train_dataset, valid_dataset)
    model.summary()

    # saving results for visualization: take the returned histories as they
    # are (appending them to the lists above would nest each list)
    train_losses, valid_losses, valid_accuracies = results

    # testing the trained model with the same loss that was used for training
    _, test_accuracy = test(model, test_dataset, cross_entropy_loss)
    print("Accuracy (test set):", test_accuracy)

    # visualizing losses and accuracy
    # NOTE: `visualize` is defined in the cell after this one; that cell has
    # to be executed before this call is reached.
    visualize(train_losses, valid_losses, valid_accuracies)


ValueError: ignored

In [None]:
def visualize(train_losses, valid_losses, valid_accuracies):
    """Draw the training history as three curves in one figure.

    Args:
        train_losses: training loss values, one per recorded step.
        valid_losses: validation loss values, one per recorded step.
        valid_accuracies: validation accuracy values, one per recorded step.
    """
    plt.figure()

    curves = (train_losses, valid_losses, valid_accuracies)
    labels = (" train_dataset loss", " valid_dataset loss", " valid_dataset accuracy")

    # each plot call is expected to return exactly one line, which serves
    # as the legend handle for that curve
    handles = []
    for values in curves:
        (line,) = plt.plot(values)
        handles.append(line)

    plt.legend(tuple(handles), labels)
    plt.xlabel("Training epoch")
    plt.show()