<a href="https://colab.research.google.com/github/martinpius/Applied-Predictive-Modeling2/blob/master/KERAS_MODELS_BUILDING_BLOCKS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import tensorflow as tf
import numpy as np
from tensorflow import keras

In [4]:
#Building keras layers from scratch
class MyLayer(keras.layers.Layer):
  def __init__(self, units = 64, input_shape=64):
    super(MyLayer, self).__init__()
    w_initial = tf.random_normal_initializer()
    b_initial = tf.zeros_initializer()
    self.w = tf.Variable(initial_value = w_initial(shape = (units, input_shape)), dtype = 'float32', trainable = True)
    self.b = tf.Variable(initial_value = b_initial(shape = (units, )), dtype = 'float32', trainable = True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

x = tf.ones(shape = (3,3))
layer1 = MyLayer(3,1)
out = layer1(x)
print(out)
assert layer1.weights == [layer1.w, layer1.b]

tf.Tensor(
[[-0.10312843 -0.10312843 -0.10312843]
 [-0.10312843 -0.10312843 -0.10312843]
 [-0.10312843 -0.10312843 -0.10312843]], shape=(3, 3), dtype=float32)


In [5]:
#Altenative way to add parameters in a keras layer
class AltLayer(keras.layers.Layer):
  def __init__(self, input_shape = 64, units = 64):
    super(AltLayer, self).__init__()
    self.w = self.add_weight(shape = (input_shape, units), initializer = 'random_normal',trainable = True)
    self.b = self.add_weight(shape = (units,), initializer = 'zeros', trainable = True)
  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

x = tf.ones(shape = (3,3))
layer2 = AltLayer(3,1)
out = layer2(x)
print(out)
assert layer2.weights == [layer2.w, layer2.b]

tf.Tensor(
[[0.02011857]
 [0.02011857]
 [0.02011857]], shape=(3, 1), dtype=float32)


In [6]:
#Adding the input shape in a separate block
class AddInput(keras.layers.Layer):
  def __init__(self, units):
    super(AddInput, self).__init__()
    self.units = units
  
  def build(self, input_shape):
    self.w = self.add_weight(
        shape = (input_shape[-1], self.units),
        initializer = 'random_normal',
        trainable = True
    )
    self.b = self.add_weight(
        shape = (self.units,),
        initializer = 'zeros',
        trainable = True
    )

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

layer3 = AddInput(32)
out = layer3(x)
print(out)
assert layer3.weights == [layer3.w, layer3.b]

tf.Tensor(
[[ 0.03820161 -0.00713975 -0.14460814  0.01829805  0.06159644  0.10677131
   0.0763822  -0.05080696 -0.03154647 -0.09491213 -0.03052873 -0.05248065
  -0.08826051  0.0778743  -0.09496681  0.06347735  0.01456482 -0.0052099
  -0.07416502 -0.05922581  0.05194514  0.06442119 -0.07996801 -0.02404549
   0.02627587 -0.10059783  0.06422863  0.07928954 -0.07785238  0.10868542
  -0.10411608  0.07014898]
 [ 0.03820161 -0.00713975 -0.14460814  0.01829805  0.06159644  0.10677131
   0.0763822  -0.05080696 -0.03154647 -0.09491213 -0.03052873 -0.05248065
  -0.08826051  0.0778743  -0.09496681  0.06347735  0.01456482 -0.0052099
  -0.07416502 -0.05922581  0.05194514  0.06442119 -0.07996801 -0.02404549
   0.02627587 -0.10059783  0.06422863  0.07928954 -0.07785238  0.10868542
  -0.10411608  0.07014898]
 [ 0.03820161 -0.00713975 -0.14460814  0.01829805  0.06159644  0.10677131
   0.0763822  -0.05080696 -0.03154647 -0.09491213 -0.03052873 -0.05248065
  -0.08826051  0.0778743  -0.09496681  0.06347735

In [7]:
#Stacking keras layers together
class StackLayer(keras.layers.Layer):
  def __init__(self):
    super(StackLayer, self).__init__()
    self.ln1 = AddInput(32)
    self.ln2 = AddInput(16)
    self.ln3 = AddInput(1)

  def call(self,inputs):
    x = self.ln1(inputs)
    x = tf.nn.relu(x)
    x = self.ln2(x)
    x = tf.nn.relu(x)
    return self.ln3(x)
  
model= StackLayer()
model(tf.ones(shape = (3,32)))
print(f"trainable pars: {len(model.weights)}\nNon-trainable pars: {len(model.trainable_weights)}")

    


trainable pars: 6
Non-trainable pars: 6


In [8]:
#Add Loss as a regularizer to the keras layer( This can be added as additional loss to the main loss)
class AddLoss(keras.layers.Layer):
  def __init__(self,rate = 1e-3):
    super(AddLoss, self).__init__()
    self.rate = rate
  
  def call(self, inputs):
    self.add_loss(self.rate * tf.reduce_sum(inputs))
    return inputs



In [9]:
#retrieve the loss at any stage using layer.losses
class LossRetrieve(keras.layers.Layer):
  def __init__(self):
    super(LossRetrieve, self).__init__()
    self.myregulator = AddLoss(1e-3)
  
  def call(self, inputs):
    return self.myregulator(inputs)


los1 = LossRetrieve()
_=los1(tf.zeros((1,1)))
assert len(los1.losses) == 1

In [10]:
#Adding kernel regularizer to the layer
class WithKernel(keras.layers.Layer):
  def __init__(self):
    super(WithKernel, self).__init__()
    self.mydense = keras.layers.Dense(units = 32,
                                      kernel_regularizer = tf.keras.regularizers.l2(1e-3))
  
  def call(self, inputs):
    return self.mydense(inputs)

ken = WithKernel()
_= ken(tf.zeros((1,1)))
print(ken.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.0018590672>]


In [11]:
#Using the above defined losses (loss, regularizer to train keras model)
inputs = keras.Input(shape = (3,))
outputs = AddLoss()(inputs)
model = keras.Model(inputs = inputs, outputs = outputs)
model.compile(loss = 'mse', optimizer = 'adam')#with the main loss also user defined losses are added
model.summary()
model.fit(np.random.random((64,3)), np.random.random((64,3)))


Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 3)]               0         
_________________________________________________________________
add_loss_1 (AddLoss)         (None, 3)                 0         
Total params: 0
Trainable params: 0
Non-trainable params: 0
_________________________________________________________________


<tensorflow.python.keras.callbacks.History at 0x7fd4827abb70>

In [12]:
#Training using only the user defined loss
inputs = keras.Input(shape = (32,))
outputs = AddLoss()(inputs)
model1 = keras.Model(inputs = inputs, outputs = outputs)
model1.compile(optimizer = 'adam')
model1.fit(np.random.random(size = (64,32)), np.random.random(size = (64,32)))



<tensorflow.python.keras.callbacks.History at 0x7fd481ed5828>

In [13]:
#Adding a metric to the keras model to track moving average of a quantity during training
#This compute the accuracy of the model
class LogisticLoss(keras.layers.Layer):
  def __init__(self, name = None):
    super(LogisticLoss, self).__init__(name = name)
    self.logloss = keras.losses.BinaryCrossentropy(from_logits=True)
    self.accuracy1 = keras.metrics.BinaryAccuracy()
  
  def call(self, y_hat, y_orig, sample_inputs = None):
    loss1 = self.logloss(y_hat, y_orig, sample_inputs)
    self.add_loss(loss1)
    accuracy2 = self.accuracy1(y_hat, y_orig, sample_inputs)
    self.add_metric(accuracy2, name = 'accuracy')
    return tf.nn.softmax(logits=y_hat)




In [14]:
mymetrics = LogisticLoss()
y_hat = tf.ones(shape = (3,3))
y_orig = tf.ones(shape = (3,3))
out_loss = mymetrics(y_hat, y_orig)
print(f"Layer_metric: {mymetrics.metrics}\ncurrent accuracy: {float(mymetrics.metrics[0].result())}")


Layer_metric: [<tensorflow.python.keras.metrics.BinaryAccuracy object at 0x7fd480687240>]
current accuracy: 1.0


In [15]:
#We can track this metric using fit() like in loss fn
inputs = keras.Input(shape = (3,), name = 'input')
y_orig = keras.Input(shape = (10,), name = 'target')
y_hat = keras.layers.Dense(units = 10)(inputs)
pred = LogisticLoss(name = 'predictions')(y_hat, y_orig)
model = keras.Model(inputs = [inputs, y_orig], outputs = pred)
model.compile(optimizer = 'adam')
dfm = {
    'inputs':np.random.random(size = (10,3)),
    'y_orig':np.random.random(size = (10,10))
}

model.fit(dfm)



<tensorflow.python.keras.callbacks.History at 0x7fd489590ba8>

In [16]:
#Serialize the hard coded layers as ussual keras functional API models using get_config

class SerializedLayers(keras.layers.Layer):
  def __init__(self, units = 64):
    super(SerializedLayers, self).__init__()
    self.units = units
  
  def build(self, input_shape):
    self.w = self.add_weight(
        shape =(input_shape[-1], self.units),
        initializer = 'random_normal', 
        trainable = True)
    
    self.b = self.add_weight(
        shape = (self.units,),
        initializer = 'zeros',
        trainable = True)
    
  def call(self, inputs):
    return tf.matmul(inputs, self.w) +self.b
  
  def get_config(self):
    return {'units': self.units}
    
layer = SerializedLayers(32)
out = layer(x)
print(out)
assert layer.weights == [layer.w, layer.b]
config = layer.get_config()
print(config)
stack_layer = SerializedLayers.from_config(config)


tf.Tensor(
[[ 0.1100989   0.00880036 -0.0440092   0.07522993  0.09891136  0.00375583
   0.06353734  0.00867078  0.04658917  0.05582516  0.02994575 -0.04943225
   0.04391855  0.07978    -0.08174905  0.08592284 -0.08341957 -0.0199663
  -0.09680377  0.00232024  0.01505715 -0.00750295 -0.02942482 -0.0687234
  -0.03114662  0.06874943 -0.05522085 -0.04717303 -0.00889447 -0.10763291
  -0.05303584  0.07200257]
 [ 0.1100989   0.00880036 -0.0440092   0.07522993  0.09891136  0.00375583
   0.06353734  0.00867078  0.04658917  0.05582516  0.02994575 -0.04943225
   0.04391855  0.07978    -0.08174905  0.08592284 -0.08341957 -0.0199663
  -0.09680377  0.00232024  0.01505715 -0.00750295 -0.02942482 -0.0687234
  -0.03114662  0.06874943 -0.05522085 -0.04717303 -0.00889447 -0.10763291
  -0.05303584  0.07200257]
 [ 0.1100989   0.00880036 -0.0440092   0.07522993  0.09891136  0.00375583
   0.06353734  0.00867078  0.04658917  0.05582516  0.02994575 -0.04943225
   0.04391855  0.07978    -0.08174905  0.08592284 -

In [17]:
#Customize some keras layers such as BatchNorm and Dropout!These layers are 
#very important in regulating the accuracy of the model

#make privelage for the training argument in a call method
class CustomLayer(keras.layers.Layer):
  def __init__(self, my_rate,**kwargs):
    super(CustomLayer, self).__init__(**kwargs)
    self.my_rate = my_rate
  
  def call(self, inputs, training = None):
    if training:
      tf.nn.dropout(inputs, rate = self.my_rate)
      return inputs


In [20]:
#Training VAE end to end from scratch
import tensorflow as tf
from tensorflow import keras
import numpy as np
from tensorflow.keras.datasets import mnist

In [22]:
#Sampling from the compressed layer (output of the encoder)
class Mysample(keras.layers.Layer):
  def call(self, inputs):
    mean_z, logvar_z = inputs
    eps = tf.keras.backend.random_normal(shape = (tf.shape(mean_z)[0], tf.shape(mean_z)[1]))
    return mean_z + tf.exp(0.5*logvar_z)*eps #Assumption of a std Gaussian for the decorder's input


In [24]:
#The encoder's class: This takes the input and compress into hidden representation
class MyEncoder(keras.layers.Layer):
  def __init__(self,latent1 = 64, latent2 = 32,name = 'Encoder', **kwargs):
    super(MyEncoder, self).__init__(name = name, **kwargs)
    self.l1 = keras.layers.Dense(units = latent1,activation = 'relu')
    self.l2 = keras.layers.Dense(units = latent2, activation = 'relu')#For the mean_z
    self.l3 = keras.layers.Dense(units = latent2, activation = 'relu')#For the logvar_z
    self.mysample = Mysample()#For the z's

  def call(self, inputs):
    x = self.l1(inputs)
    mean_z = self.l2(x)
    logvar_z = self.l3(x)
    z = self.mysample((mean_z, logvar_z))
    return mean_z, logvar_z, z


In [31]:
#Reconstruct the original data--Moving from compressed to readable images
class MyDecoder(keras.layers.Layer):
  def __init__(self,dim_orig, latent1 = 64,name = 'Decoder',**kwargs):
    super(MyDecoder,self).__init__(name = name,**kwargs)
    self.d1 = keras.layers.Dense(units = latent1, activation = 'relu')#starting expanding the encoded layer
    self.d2 = keras.layers.Dense(units = dim_orig, activation = 'sigmoid')#Reconstruct the original data

  def call(self, inputs):
    x = self.d1(inputs)
    return self.d2(x)

#Combining both classes and train together from end to end
class VarAutoEncoder(keras.Model):
  def __init__(
      self,
      dim_orig,
      latent1 = 64,
      latent2 = 32,name = 'autoencoder',**kwargs):
    super(VarAutoEncoder, self).__init__(name = name, **kwargs)
    self.dim_orig = dim_orig
    self.encoder = MyEncoder(latent1 = latent2, latent2=latent1)
    self.decoder = MyDecoder(dim_orig = dim_orig, latent1 = latent1)
  
  def call(self, inputs):
    mean_z, logvar_z, z = self.encoder(inputs)#Run the encoder
    reconstruction = self.decoder(z)
    KL_div = -0.5*tf.reduce_mean(
        logvar_z -tf.square(mean_z) - tf.exp(logvar_z) +1
    )
    self.add_loss(KL_div) #As regularization to assure randomness in mapping z to original x
    return reconstruction



In [35]:
#A training loop
dim_orig = 784
V_model = VarAutoEncoder(dim_orig = dim_orig,latent1 = 64, latent2 = 32)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
mse = keras.losses.MeanSquaredError()
metric_L = keras.metrics.Mean()

#Loading the dataset
(x_train,_),_= keras.datasets.mnist.load_data()
#Reshape and preprocess sampled trainining data
x_train = x_train.reshape(60000, 784).astype('float32')/255
train_dfm = tf.data.Dataset.from_tensor_slices(x_train)
#Shuffle the data and select the batches of size 64
train_dfm = train_dfm.shuffle(buffer_size = 1024).batch(64)

#Iterate over # of epochs
epochs = 10
for epoch in range(epochs):
  print('Start of epoch %d'%(epoch,))

  for step, train_batch in enumerate(train_dfm):
    with tf.GradientTape() as tape:
      reconstruction = V_model(train_batch)
      myloss = mse(train_batch, reconstruction)
      myloss+=sum(V_model.losses)
      grads = tape.gradient(myloss, V_model.trainable_weights)
      optimizer.apply_gradients(zip(grads,V_model.trainable_weights))

      metric_L(myloss)
      if step % 100 == 0:
        print("step %d: mean loss = %.4f" % (step, metric_L.result()))





Start of epoch 0
step 0: mean loss = 0.2608
step 100: mean loss = 0.1175
step 200: mean loss = 0.0948
step 300: mean loss = 0.0862
step 400: mean loss = 0.0820
step 500: mean loss = 0.0791
step 600: mean loss = 0.0773
step 700: mean loss = 0.0759
step 800: mean loss = 0.0749
step 900: mean loss = 0.0739
Start of epoch 1
step 0: mean loss = 0.0737
step 100: mean loss = 0.0731
step 200: mean loss = 0.0727
step 300: mean loss = 0.0723
step 400: mean loss = 0.0720
step 500: mean loss = 0.0717
step 600: mean loss = 0.0714
step 700: mean loss = 0.0711
step 800: mean loss = 0.0709
step 900: mean loss = 0.0707
Start of epoch 2
step 0: mean loss = 0.0706
step 100: mean loss = 0.0705
step 200: mean loss = 0.0704
step 300: mean loss = 0.0703
step 400: mean loss = 0.0702
step 500: mean loss = 0.0700
step 600: mean loss = 0.0699
step 700: mean loss = 0.0698
step 800: mean loss = 0.0697
step 900: mean loss = 0.0696
Start of epoch 3
step 0: mean loss = 0.0696
step 100: mean loss = 0.0695
step 200: me