# Homework 07
This weeks task is to implement a LSTM (Long short-term memory) network. To test our implementation we will predict if the integral of some given noise sequence is positive of negative.


In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple
from training import training


## 1. Generate Dataset

In [2]:
SEQUENCE_LENGTH = 8
DATASET_LENGTH = 10_000
def integration_task(seq_len, num_samples):
    for _ in range(num_samples):
        x = tf.random.normal((seq_len,1))
        y = tf.expand_dims(tf.cast(tf.math.reduce_sum(x) > 0, tf.int16), -1)
        #Y = np.expand_dims(int(tf.math.reduce_sum(x) == 1), -1) # why == 1? will never be true
        yield x, y


def my_integration_task():
    for x,y in integration_task(SEQUENCE_LENGTH, DATASET_LENGTH):
        yield x, y

data = tf.data.Dataset.from_generator(
    my_integration_task,
    output_signature=(
        tf.TensorSpec(shape=(SEQUENCE_LENGTH,1), dtype=tf.float32),
        tf.TensorSpec(shape=(1,), dtype=tf.int16))
)

In [3]:
def preprocessing_pipeline(data, batch_size) -> tf.data:
    """Apply preproccesing pipeline to the given dataset.
    
    :param data: data to be preprocessed
    :type data: tensorflow 'Dataset'
    :param means_array: array of the same shape as an image, containing every
        feature's mean over all images in the train dataset
    :type means_array: numpy array of floats
    :param std_array: array of the same shape as an image, containing every
        feature's standart deviation over all images in the train dataset
    :type std_array: numpy array of floats
    :param batch_size: batch size of the created dataset
    :type batch_size: integer
    :return: preprocessed dataset
    :rtype: tensorflow 'Dataset'
    """
    # cache the dataset
    data = data.cache()
    # shuffle, batch and prefetch the dataset
    data = data.shuffle(1000)
    data = data.batch(batch_size)
    data = data.prefetch(100)
    return data

In [None]:


def split_dataset(ds, split_proportions = {
    'train': 0.7,
    'valid': 0.1,
    'test': 0.2}):
    assert sum(split_proportions.values()) <= 1,\
        "The sum of split_proportions is larger than 1!"
    

In [4]:
BATCH_SIZE = 10
datasets = {
    'train': ,
    'valid': valid_ds,
    'test': test_ds
}

datasets = {key:preprocessing_pipeline(ds, BATCH_SIZE) for key, ds in datasets.items()}




tf.Tensor([5 8 1], shape=(3,), dtype=int32) tf.Tensor([5 1], shape=(2,), dtype=int32)


In [5]:
class LSTM_Cell(tf.keras.layers.Layer):
    def __init__(self, units, unit_forget_bias=True) -> None:
        """Constructor function"""
        super(LSTM_Cell, self).__init__()
        self.units = units
        # forget gate
        self.fg_layer = tf.keras.layers.Dense(
            units,
            activation='sigmoid',
            bias_initializer='ones' if unit_forget_bias else 'glorot_uniform'
        )
        # input gate
        self.ig_layer = self.fg_W = tf.keras.layers.Dense(
            units,
            activation='sigmoid'
        )
        # output gate
        self.og_layer = self.fg_W = tf.keras.layers.Dense(
            units,
            activation='sigmoid'
        )
        # cell
        self.cell_layer = self.fg_W = tf.keras.layers.Dense(
            units,
            activation='tanh'
        )
    

    def call(self, x, states) -> Tuple[tf.Tensor]: 
        prev_hidden_state, prev_cell_state = states
        # gate inputs
        xh = tf.concat([x, prev_hidden_state], axis=1)
        # forget gate output
        ffilter = self.fg_layer(xh)
        # input gate output
        ifilter = self.ig_layer(xh)
        # cell state candidates
        cs_cand = self.cell_layer(xh)
        # update cell state
        cell_state = tf.math.multiply(ffilter, prev_cell_state) +\
                     tf.math.multiply(ifilter, cs_cand)
        # output gate output
        ofilter = self.og_layer(xh)
        # new hidden state
        hidden_state = tf.math.multiply(ofilter, tf.nn.tanh(cell_state))
        return hidden_state, cell_state

In [6]:
class LSTM_Layer(tf.keras.layers.Layer):
    def __init__(self, cells) -> None:
        super(LSTM_Layer, self).__init__()
        self.cells = cells
    
    #@tf.function
    def call(self, x, states=None) -> tf.Tensor:
        batch_size = tf.shape(x)[0]
        sequence_length = tf.shape(x)[1]
        if not states:
            states = self.zero_states(batch_size)
        output_sequence = []
        # iterate through time steps in input sequence
        for c_i, cell in enumerate(self.cells):
            cell_state = (states[0][:, c_i, :], states[1][:, c_i, :]) 
            cell_state_agg = []
            for seq_idx in range(sequence_length):
                input = x[:,seq_idx,:]
                cell_state = cell(input, cell_state)
                cell_state_agg.append(cell_state[0])
            output_sequence.append(cell_state_agg)
        # rearange output sequence
        output_sequence = tf.transpose(output_sequence, perm=[2, 1, 0, 3])
        # concat outputs from all lstm-cells
        #print(output_sequence)
        os_shape = tf.shape(output_sequence)
        output_sequence = tf.reshape(output_sequence, (os_shape[0],
                                                       os_shape[1],
                                                       os_shape[2]*os_shape[3])
                                    )
        return output_sequence
    
    def zero_states(self, batch_size):
        return (tf.zeros((batch_size, len(self.cells), self.cells[0].units)),
                tf.zeros((batch_size, len(self.cells), self.cells[0].units)))

In [7]:
class LSTM_Model(tf.keras.Model):
    def __init__(self, layer_list=[
        tf.keras.layers.Dense(5, activation='sigmoid'),
        LSTM_Layer(cells=[LSTM_Cell(6)]),
        tf.keras.layers.Dense(3, activation='sigmoid'),
        tf.keras.layers.Dense(1, activation='relu')
    ]) -> None:
        super(LSTM_Model, self).__init__()
        self.layer_list = layer_list
    
    def call(self, x):
        input = x
        for layer in self.layer_list:
            input = layer(input)
        return input[:,-1,:]

        

In [13]:
print(len(list(dataset)))
train = dataset.take(100)
test = dataset.skip(100)
valid = test.skip(100)
test = test.take(100)
datasets = {'train':train, 'valid':valid, 'test':test}
print(len(list(datasets['train'])))
print(len(list(datasets['valid'])))
print(len(list(datasets['test'])))
# Initialize the loss-function
cross_entropy_loss = tf.keras.losses.BinaryCrossentropy()
# Initialize the optimizer
optimizer = tf.keras.optimizers.SGD(0.001)

losses, accuracies = training(LSTM_Model(), datasets,
                              cross_entropy_loss,
                              optimizer, epochs=10)

200
100
0
100


KeyboardInterrupt: 

In [None]:
lstm = LSTM_Layer([LSTM_Cell(3), LSTM_Cell(3), LSTM_Cell(3), LSTM_Cell(3)])
dlayer = tf.keras.layers.Dense(1,'sigmoid')
for input, _ in dataset.take(1):
    print(input)
    print(dlayer(input))
    #output = lstm.call(input, None)
print(dlayer.get_weights())


tf.Tensor(
[[[ 0.865401  ]
  [-0.07156665]
  [-0.942625  ]
  [ 0.34527218]
  [-0.8537018 ]
  [-1.4697397 ]
  [ 0.914009  ]
  [ 1.051807  ]]

 [[-0.76230496]
  [-0.09492179]
  [ 0.19700608]
  [-0.8741309 ]
  [ 1.3292289 ]
  [ 2.191437  ]
  [ 0.6160504 ]
  [-0.5027481 ]]

 [[-1.4027071 ]
  [ 0.62084323]
  [ 2.0610416 ]
  [-0.09388001]
  [-0.73883116]
  [-0.04252322]
  [-0.5677695 ]
  [ 1.3584127 ]]

 [[ 0.92604935]
  [ 0.89551175]
  [ 0.48916686]
  [-0.22881404]
  [ 0.3331596 ]
  [-0.22380687]
  [ 0.3951474 ]
  [-1.6837456 ]]], shape=(4, 8, 1), dtype=float32)
tf.Tensor(
[[[0.33760962]
  [0.5139302 ]
  [0.6757057 ]
  [0.4331786 ]
  [0.66034985]
  [0.7585263 ]
  [0.32919678]
  [0.30594712]]

 [[0.64420784]
  [0.5184726 ]
  [0.46171853]
  [0.6639091 ]
  [0.26207936]
  [0.15359649]
  [0.38230568]
  [0.5966521 ]]

 [[0.7488358 ]
  [0.3814246 ]
  [0.16726774]
  [0.51827   ]
  [0.6400068 ]
  [0.50827837]
  [0.6087766 ]
  [0.25770772]]

 [[0.32712942]
  [0.33238566]
  [0.40589592]
  [0.5444319 ]

In [None]:
@tf.function
def fibonacci(n):
  ta = tf.TensorArray(tf.float32, size=0, dynamic_size=True)
  ta = ta.unstack([0., 1.])

  for i in range(2, n):
    ta = ta.write(i, ta.read(i - 1) + ta.read(i - 2))

  return ta.stack()

fibonacci(7)

<tf.Tensor: shape=(7,), dtype=float32, numpy=array([0., 1., 1., 2., 3., 5., 8.], dtype=float32)>

In [None]:
fg_w = np.random.normal(size=(4,10))
fg_b = np.ones((10, 1))
x = np.random.rand(4)

x @ fg_w

array([-0.3049741 ,  0.02408729, -0.68296275,  0.2989081 , -0.36264464,
       -0.89054254,  1.37596087,  0.00981885,  0.66994094, -0.61534579])