<a href="https://colab.research.google.com/github/n-bzy/iannwtf/blob/main/homework07_Nico.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prepare Dataset

In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

train_ds, val_ds = tfds.load('mnist', split=['train','test'], as_supervised=True)

def preprocess(data, seq):
    """Preprocessing the training and testing data for the LSTM network"""

    # Add first digit, subtract second, add third, subtract fourth
    l = list()
    for i, elem in enumerate(data):
        if (i% seq) == 0:
            l.append(int(elem[1]))
        else:
            if (i%2)==0:
                l.append(int(l[i-1] + elem[1]))
            else:
                l.append(int(l[i-1] - elem[1]))
    
    # Convert list of new targets to dataset            
    new_t = tf.data.Dataset.from_tensor_slices(l)
    # Zip old dataset and new targets
    data = tf.data.Dataset.zip((data, new_t))
    # Replace old with new targets
    data = data.map(lambda x, t: (x[0], t))

    data = data.map(lambda x, t: (tf.cast(x, tf.float32), t))
    data = data.map(lambda x, t: ((x/128.)-1., t))

    #data = data.batch(seq)
    data = data.window(seq)

    data = data.cache()
    data = data.shuffle(1000)
    data = data.map(lambda x,t: (x.batch(seq).get_single_element(), t.batch(seq).get_single_element()))
    data = data.batch(32)
    data = data.prefetch(tf.data.AUTOTUNE)

    return data

train = preprocess(train_ds, seq=4)
val = preprocess(val_ds, seq=4)

for x,t in val.take(1):
    print(x.shape, t.shape)


Downloading and preparing dataset 11.06 MiB (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to ~/tensorflow_datasets/mnist/3.0.1...


Dl Completed...:   0%|          | 0/5 [00:00<?, ? file/s]

Dataset mnist downloaded and prepared to ~/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.
(32, 4, 28, 28, 1) (32, 4)


# Basic CNN

In [2]:
class CNN(tf.keras.Model):
    """Basic CNN structure for processing of mnist images"""
    def __init__(self):
        super().__init__()

        self.layer1 = tf.keras.layers.Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')
        self.pool1 = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPool2D(pool_size=2, strides=2))
        # Image size 14x14
        self.layer2 = tf.keras.layers.Conv2D(filters=128, kernel_size=3, padding='same', activation='relu')
        self.pool2 = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPool2D(pool_size=2, strides=2))
        # Image size 7x7
        self.layer3 = tf.keras.layers.Conv2D(filters=128, kernel_size=3, padding='same', activation='relu')
        self.pool3 = tf.keras.layers.TimeDistributed(tf.keras.layers.GlobalAveragePooling2D())

    def __call__(self, x):
        x = self.layer1(x)
        x = self.pool1(x)
        x = self.layer2(x)
        x = self.pool2(x)
        x = self.layer3(x)
        x = self.pool3(x)
        return x

In [3]:
class LSTMCell(tf.keras.layers.AbstractRNNCell):
    """Create a LSTM Cell"""
    def __init__(self, hidden_state, cell_state):
        super().__init__()

        self.hidden_state = hidden_state
        self.cell_state = cell_state

        self.linear = tf.keras.layers.Dense(hidden_state)

        self.forget_gate = tf.keras.layers.Dense(cell_state, 
                                             kernel_initializer= tf.keras.initializers.Orthogonal(gain=1.0, seed=None),
                                             activation=tf.nn.sigmoid)
        self.input_gate = tf.keras.layers.Dense(cell_state, 
                                             kernel_initializer= tf.keras.initializers.Orthogonal(gain=1.0, seed=None),
                                             activation=tf.nn.sigmoid)
        self.candidate_gate = tf.keras.layers.Dense(cell_state, 
                                             kernel_initializer= tf.keras.initializers.Orthogonal(gain=1.0, seed=None),
                                             activation=tf.nn.tanh)
        self.output_gate = tf.keras.layers.Dense(hidden_state, 
                                             kernel_initializer= tf.keras.initializers.Orthogonal(gain=1.0, seed=None),
                                             activation=tf.nn.sigmoid)

    @property
    def state_size(self):
        return [tf.TensorShape([self.hidden_state]), 
                tf.TensorShape([self.cell_state])]
    @property
    def output_size(self):
        return [tf.TensorShape([self.hidden_state])]
    
    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        return [tf.zeros([self.hidden_state]), 
                tf.zeros([self.cell_state])]

    def __call__(self, x, states):
        hidden = states[0]
        cell = states[1]
        x = hidden + self.linear(x)

        f = self.forget_gate(x)
        i = self.input_gate(x)
        c_hat = self.candidate_gate(x)

        new_cell_state = f*cell + i*c_hat

        y = self.output_gate(x)*tf.math.tanh(new_cell_state)
        new_hidden_state = y

        return y, [new_hidden_state, new_cell_state]

In [7]:
class RNN(tf.keras.Model):
    """Create a RNN out of LSTM cells"""
    def __init__(self):
        """Initialize the RNN"""
        super().__init__()

        self.cnn = CNN()

        self.lstm_cell = LSTMCell(hidden_state=24, cell_state=24)
        # For debugging purpose: self.lstm = tf.keras.layers.LSTMCell(24)
        self.rnn_layer = tf.keras.layers.RNN(self.lstm_cell, return_sequences=False, unroll=True)
        
        self.output_layer = tf.keras.layers.Dense(1, activation=None)
        
        self.metrics_list = [tf.keras.metrics.Mean(name="loss"), 
                             tf.keras.metrics.Accuracy(name="accuracy")]

    @property
    def metrics(self):
        return self.metrics_list
    
    def reset_metrics(self):
        for metric in self.metrics:
            metric.reset_state()
        
    def call(self, x, training=False):
        print(x.shape)
        x = self.cnn(x)
        x = self.rnn_layer(x)
        x = self.output_layer(x)
        return x

    @tf.function
    def train(self, data):
        x,t = data
        with tf.GradientTape() as tape:
            y = self(x, training=True)
            loss = self.compiled_loss(t, y, regularization_losses=self.losses)
        gradients = tape.gradient(loss, self.trainable_variables)
        
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        
        self.metrics[0].update_state(loss)
        self.metrics[1].update_state(t, y)
        
        return {m.name : m.result() for m in self.metrics}
    
    @tf.function
    def test(self, data):
        
        x,t = data
        y = self(x, training=False)
        loss = self.compiled_loss(t, y, regularization_losses=self.losses)
                
        self.metrics[0].update_state(loss)
        self.metrics[1].update_state(t, y)
        
        return {m.name : m.result() for m in self.metrics} 

In [8]:
# For testing dimensions, layers and output shapes
model = RNN()
model(tf.keras.Input((4,28,28,1)));
model.summary()

(None, 4, 28, 28, 1)
Model: "rnn_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 cnn_1 (CNN)                 multiple                  0 (unused)
                                                                 
 lstm_cell_1 (LSTMCell)      multiple                  5496      
                                                                 
 rnn_3 (RNN)                 multiple                  5496      
                                                                 
 dense_11 (Dense)            multiple                  25        
                                                                 
Total params: 227,605
Trainable params: 227,601
Non-trainable params: 4
_________________________________________________________________


# Training with .compile and .fit

In [9]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
loss = tf.keras.losses.MeanSquaredError()

model = RNN()
# compile the model (here, adding a loss function and an optimizer)
model.compile(optimizer=optimizer, loss=loss)

In [10]:
history = model.fit(train, validation_data=val,epochs=10)

Epoch 1/10
(None, None, 28, 28, 1)


ValueError: ignored