<a href="https://colab.research.google.com/github/martinpius/keras_Functional_API_architecture/blob/main/Keras_functional_API_and_Subclassing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
from google.colab import drive
try:
  drive.mount("/content/drive/", force_remount = True)
  COLAB = True
  import tensorflow
  print(f"You are on google colab with tensorflow version {tensorflow.__version__}")
except Exception as e:
  COLAB = False
  print(f"{type(e)}: {e}\n....Please Load your Drive....")

def time_fmt(t):
  h = int(t / (60 * 60))
  m = int(t % (60 * 60)/60)
  s = int(t % 60)
  return f"{h}: {m:>03}: {s:>05.2f}"

Mounted at /content/drive/
You are on google colab with tensorflow version 2.3.0


In [4]:
import tensorflow as tf
import time
from tensorflow import keras
import numpy as np

In [5]:
#RNN from scatch. Lets build a simple RNN from scratch
units = 64
inputs_shape = 5
time_steps = 20
inputs = keras.Input(shape = (None, units), name = 'input_shape')
x = keras.layers.GlobalAveragePooling1D()(inputs)
outputs = keras.layers.Dense(units = 1)(x)
model = keras.models.Model(inputs = inputs, outputs = outputs)


In [6]:
class MyRNN(keras.layers.Layer):
  def __init__(self):
    super(MyRNN, self).__init__()
    self.units = units
    self.trf_1 = keras.layers.Dense(units = units, activation = 'tanh')
    self.trf_2 = keras.layers.Dense(units = units, activation = 'tanh')
    self.myclass = model
  
  def call(self, inputs):
    outputs = []
    state = tf.zeros(shape = (inputs.shape[0], self.units))
    for t in range(inputs.shape[1]):
      x = inputs[:, t, :]
      h = self.trf_1(x)
      y = h + self.trf_2(state)
      state = y
      outputs.append(y)
      features = tf.stack(outputs, axis = 1)
      print(features.shape)
      return self.myclass(features)

myrnn = MyRNN()
myrnn(tf.zeros(shape = (1, time_steps, units)))

(1, 1, 64)


<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>

In [7]:
#training, evaluation and inference loops using API
#Demo with the mnist dataset from keras

In [38]:
#Loading the data
print(f'.....loading....')
(x_train, y_train),(x_test, y_test) = tf.keras.datasets.mnist.load_data()

.....loading....


In [39]:
print(f"x_train.shape: {x_train.shape}, y_train.shape: {y_train.shape}\nxtest.shape: {x_test.shape}, y_test.shape: {y_test.shape}")

x_train.shape: (60000, 28, 28), y_train.shape: (60000,)
xtest.shape: (10000, 28, 28), y_test.shape: (10000,)


In [40]:
#Reshape and preprocess the data
(x_train, x_test) = x_train.reshape(60000, 784), x_test.reshape(10000,784)

In [37]:
y_train, y_test = tf.keras.utils.to_categorical(y_train, num_classes = 10), tf.keras.utils.to_categorical(y_test, num_classes= 10)


In [41]:
#We will fit our data in memory so we can change to numy arrays to economize the memory usage
x_train, x_test = x_train.astype('float32'), x_test.astype('float32')

In [42]:
#Scale the data into 0-1 range to serve training complexity
x_train, x_test = x_train/255, x_test/255

In [14]:
#Now we can build a simple MLP to fit our data
inputs = keras.Input(shape = (784,), name = 'input_shape')
x = keras.layers.Dense(units = 64, kernel_initializer = 'random_normal',activation = 'relu',name = 'layer_1')(inputs)
x = keras.layers.Dense(units = 64, kernel_initializer = 'random_normal', activation = 'relu', name = 'layer_2')(x)
outputs = keras.layers.Dense(units = 10, activation = 'softmax', name = 'outputs')(x)
model = keras.models.Model(inputs = inputs, outputs = outputs)


In [15]:
model.summary()

Model: "functional_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_shape (InputLayer)     [(None, 784)]             0         
_________________________________________________________________
layer_1 (Dense)              (None, 64)                50240     
_________________________________________________________________
layer_2 (Dense)              (None, 64)                4160      
_________________________________________________________________
outputs (Dense)              (None, 10)                650       
Total params: 55,050
Trainable params: 55,050
Non-trainable params: 0
_________________________________________________________________


In [16]:
#compile our model
model.compile(loss = 'CategoricalCrossentropy', optimizer = 'Adam', metrics = ['accuracy'])


In [17]:
#fit the data to our model
info_out = model.fit(x_train, y_train,validation_data = (x_test, y_test), verbose = 2, epochs = 10)

Epoch 1/10
1875/1875 - 3s - loss: 0.2898 - accuracy: 0.9169 - val_loss: 0.1615 - val_accuracy: 0.9526
Epoch 2/10
1875/1875 - 3s - loss: 0.1261 - accuracy: 0.9614 - val_loss: 0.1286 - val_accuracy: 0.9592
Epoch 3/10
1875/1875 - 3s - loss: 0.0911 - accuracy: 0.9723 - val_loss: 0.1023 - val_accuracy: 0.9673
Epoch 4/10
1875/1875 - 3s - loss: 0.0720 - accuracy: 0.9780 - val_loss: 0.1098 - val_accuracy: 0.9665
Epoch 5/10
1875/1875 - 3s - loss: 0.0586 - accuracy: 0.9812 - val_loss: 0.0938 - val_accuracy: 0.9722
Epoch 6/10
1875/1875 - 3s - loss: 0.0484 - accuracy: 0.9846 - val_loss: 0.0852 - val_accuracy: 0.9744
Epoch 7/10
1875/1875 - 3s - loss: 0.0412 - accuracy: 0.9866 - val_loss: 0.0943 - val_accuracy: 0.9735
Epoch 8/10
1875/1875 - 3s - loss: 0.0354 - accuracy: 0.9881 - val_loss: 0.0881 - val_accuracy: 0.9768
Epoch 9/10
1875/1875 - 3s - loss: 0.0307 - accuracy: 0.9895 - val_loss: 0.0931 - val_accuracy: 0.9752
Epoch 10/10
1875/1875 - 3s - loss: 0.0275 - accuracy: 0.9905 - val_loss: 0.0975 - 

In [18]:
display(info_out.history)

{'accuracy': [0.9168999791145325,
  0.9614333510398865,
  0.972266674041748,
  0.9779833555221558,
  0.9812333583831787,
  0.98458331823349,
  0.9865999817848206,
  0.9881333112716675,
  0.989466667175293,
  0.9905333518981934],
 'loss': [0.28983160853385925,
  0.12610550224781036,
  0.09114266186952591,
  0.07196151465177536,
  0.0586111806333065,
  0.048396993428468704,
  0.041168373078107834,
  0.035381220281124115,
  0.030723676085472107,
  0.027524393051862717],
 'val_accuracy': [0.9526000022888184,
  0.9592000246047974,
  0.9672999978065491,
  0.9664999842643738,
  0.9721999764442444,
  0.974399983882904,
  0.9735000133514404,
  0.9768000245094299,
  0.9751999974250793,
  0.9732999801635742],
 'val_loss': [0.16146746277809143,
  0.1285717487335205,
  0.10226282477378845,
  0.10979767143726349,
  0.0937642976641655,
  0.08517073094844818,
  0.09428945928812027,
  0.08814976364374161,
  0.09309753775596619,
  0.09753544628620148]}

In [19]:
#evaluate the model using the test data
eval = model.evaluate(x_test, y_test, batch_size= 64, verbose = 2)

157/157 - 0s - loss: 0.0975 - accuracy: 0.9733


In [20]:
print(f"loss: {eval[0]:.4f}\naccuracy: {eval[1]:.4f}")

loss: 0.0975
accuracy: 0.9733


In [21]:
#Get some predictions to see what real the model does
pred = model.predict(x_test[:2])
print(f"real_value: {x_test[:2]}\npredicted_value: {pred}")

real_value: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
predicted_value: [[1.6469119e-10 4.0964516e-09 3.1635614e-07 1.3155272e-06 1.0526840e-13
  9.4298257e-11 5.8410590e-18 9.9999809e-01 2.9756966e-09 2.6966839e-07]
 [1.3450781e-07 4.1044338e-11 9.9999452e-01 5.2791496e-08 6.8916590e-22
  1.4107991e-08 1.9278250e-09 2.7737401e-16 5.2833852e-06 4.6048100e-12]]


In [22]:
# We may want to call the training step multiple times
# i.e when we carries out model's turning we may want to try several optimization techniques and therefore it may
# be necessary to write a simple function which will hold both our pre-compiled and compiled model

In [23]:
def get_pre_compiled_model():
  inputs = keras.Input(shape = (784,), name = 'input_shape')
  x = keras.layers.Dense(units = 64, kernel_initializer = 'random_normal', activation = 'relu', name = 'layer_1')(inputs)
  x = keras.layers.Dense(units = 64, kernel_initializer = 'random_normal', activation = 'relu', name = 'layer_2')(x)
  outputs = keras.layers.Dense(units = 10, activation = 'softmax', name = 'classes')(x)
  return keras.models.Model(inputs = inputs, outputs = outputs, name = 'MLP')

def get_compiled_model():
  model = get_pre_compiled_model() #get the precompiled model
  model.compile(loss = 'Categorical_Crossentropy',
                        optimizer = 'Adam',
                        metrics = ['accuracy'])
  return model # return the compiled model.


In [24]:
#try out our functions
model = get_pre_compiled_model()

In [25]:
model.summary()

Model: "MLP"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_shape (InputLayer)     [(None, 784)]             0         
_________________________________________________________________
layer_1 (Dense)              (None, 64)                50240     
_________________________________________________________________
layer_2 (Dense)              (None, 64)                4160      
_________________________________________________________________
classes (Dense)              (None, 10)                650       
Total params: 55,050
Trainable params: 55,050
Non-trainable params: 0
_________________________________________________________________


In [26]:
print(get_compiled_model())

<tensorflow.python.keras.engine.functional.Functional object at 0x7ff6723540b8>


In [27]:
#Customized loss. keras API has many alternative ready made losses functions 
#however, we may be interersted to design our own loss functions from scratch

In [28]:
def my_loss(y_real, y_hat):
  loss = tf.math.reduce_mean(tf.square(y_real-y_hat)) #The simplest loss function is l2 norm
  return loss

In [29]:
# We may use this loss in our model like as follow
model = get_pre_compiled_model()

In [30]:
model.compile(loss = my_loss, optimizer= 'Adam')

In [43]:
y_train_hot = tf.one_hot(y_train, depth = 10)

In [32]:
model.fit(x_train, y_train, batch_size = 64, verbose = 2, epochs = 5,validation_split=0.2)

Epoch 1/5
750/750 - 2s - loss: 0.0167 - val_loss: 0.0099
Epoch 2/5
750/750 - 1s - loss: 0.0082 - val_loss: 0.0070
Epoch 3/5
750/750 - 1s - loss: 0.0061 - val_loss: 0.0058
Epoch 4/5
750/750 - 1s - loss: 0.0049 - val_loss: 0.0053
Epoch 5/5
750/750 - 1s - loss: 0.0042 - val_loss: 0.0053


<tensorflow.python.keras.callbacks.History at 0x7ff66f451e80>

In [33]:
#Customized loss with an added regulirizer
#It is possible to add an aditioal parameters to the argument list of a loss function by subclassing the loss


In [34]:
class MyLoss(keras.losses.Loss):
  def __init__(self, factor = 0.1, **kwargs):#Here we can include any number of parameters we needed to comput our loss
    super().__init__(**kwargs)
    self.factor = factor #Adjust a loss by adding a factor which is the maximum distance of the predicted value from the centre
  
  def call(self, y_real, y_hat):
    loss = tf.math.reduce_mean(tf.square(y_real - y_hat))
    reg = tf.math.reduce_mean(tf.square(0.5 - y_hat)) # compute the devience from the centre (0.5)
    loss_total = loss + reg* self.factor
    return loss_total


In [44]:
mymodel = get_pre_compiled_model()
mymodel.compile(loss = MyLoss(), optimizer = 'Adam')
mymodel.fit(x_train, y_train_hot, batch_size = 64, epochs = 3, verbose = 2, validation_split=0.2)

Epoch 1/3
750/750 - 2s - loss: 0.0405 - val_loss: 0.0330
Epoch 2/3
750/750 - 2s - loss: 0.0322 - val_loss: 0.0308
Epoch 3/3
750/750 - 2s - loss: 0.0302 - val_loss: 0.0301


<tensorflow.python.keras.callbacks.History at 0x7ff66df52c18>

In [None]:
#Customized metric:
#We can also write our own metric depend on the research area. Some metrics may require some additional 
#variables defined by the user to suit his study. We can implement the metric by subclassing as follows


In [66]:
class Custom_Metrics(keras.metrics.Metric):
  def __init__(self, name = 'cat_true_positive', **kwargs):
    super(Custom_Metrics, self).__init__(name = name, **kwargs)
    self.true_pos = self.add_weight(name = 'initial_weights', initializer = 'zeros')#initializing the weights to zero

  def update_state(self, y_real, y_hat, sample_weight = None):
    y_hat = tf.reshape(tf.argmax(y_hat, axis = 1), shape = (-1,1)) #grabs only true possitive (predictions which are maximum)
    values = tf.cast(y_real, 'int32')==tf.cast(y_hat, 'int32') #return a boolian and reuslt will be tru for true possitive
    values = tf.cast(values, 'float32') #convert to float to allow the maths
    if sample_weight is not None:
      sample_weight = tf.cast(sample_weight, 'float32')
      values = tf.multiply(values, sample_weight)#updating is actually happening here
    self.true_pos.assign_add(tf.reduce_sum(values))# increment by total values
  
  def result(self):
    return self.true_pos

  def reset_states(self):
    self.true_pos.assign_add(0.0) #reset the state to 0 every time new epoch begins


In [67]:
model = get_pre_compiled_model()

In [68]:
model.summary()

Model: "MLP"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_shape (InputLayer)     [(None, 784)]             0         
_________________________________________________________________
layer_1 (Dense)              (None, 64)                50240     
_________________________________________________________________
layer_2 (Dense)              (None, 64)                4160      
_________________________________________________________________
classes (Dense)              (None, 10)                650       
Total params: 55,050
Trainable params: 55,050
Non-trainable params: 0
_________________________________________________________________


In [69]:
model.compile(loss = MyLoss(),
              metrics = [Custom_Metrics()],
              optimizer = 'Adam')

In [70]:
model.fit(x_train, y_train_hot,validation_split = 0.2, verbose = 2, epochs = 2, batch_size = 64)

Epoch 1/2
750/750 - 2s - loss: 0.0398 - cat_true_positive: 53046.0000 - val_loss: 0.0326 - val_cat_true_positive: 65690.0000
Epoch 2/2
750/750 - 2s - loss: 0.0318 - cat_true_positive: 113968.0000 - val_loss: 0.0307 - val_cat_true_positive: 126280.0000


<tensorflow.python.keras.callbacks.History at 0x7ff6651c5390>