<a href="https://colab.research.google.com/github/vahedshaik/258_Asst6/blob/main/258_dl_Assignment6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Colab demonstrating custom components in neural networks using TensorFlow

In [None]:
import sys

assert sys.version_info >= (3, 7)

In [None]:
from packaging import version
import tensorflow as tf

assert version.parse(tf.__version__) >= version.parse("2.8.0")

In [None]:
import math
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
import tensorflow_datasets as tfds

In [None]:
# Load the 'train' split of cats_vs_dogs together with its metadata,
# then keep only the first 500 examples.
train_split, info = tfds.load('cats_vs_dogs', split='train', with_info=True)
dataset = train_split.take(500)

Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to /root/tensorflow_datasets/cats_vs_dogs/4.0.0...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Generating splits...:   0%|          | 0/1 [00:00<?, ? splits/s]

Generating train examples...: 0 examples [00:00, ? examples/s]



Shuffling /root/tensorflow_datasets/cats_vs_dogs/4.0.0.incomplete6CK46Y/cats_vs_dogs-train.tfrecord*...:   0%|…

Dataset cats_vs_dogs downloaded and prepared to /root/tensorflow_datasets/cats_vs_dogs/4.0.0. Subsequent calls will reuse this data.


In [None]:
def preprocess_image(sample):
    """Convert one raw example into a model-ready example.

    Reads ``sample['image']`` and ``sample['label']``. The image is cast to
    float32, divided by 255, resized to 150x150 and reshaped to
    ``(1, 150, 150, 3)``, i.e. it gains a leading axis of size 1. The label
    is passed through unchanged.

    Returns:
        A dict with the keys ``'image'`` and ``'label'``.
    """
    scaled = tf.cast(sample['image'], tf.float32) / 255.0
    resized = tf.image.resize(scaled, (150, 150))
    return {
        'image': tf.reshape(resized, (1, 150, 150, 3)),
        'label': sample['label'],
    }

# Apply the preprocessing lazily to every example of the dataset.
dataset = dataset.map(preprocess_image)

In [None]:
# The first 400 preprocessed examples are used for training; whatever
# remains after skipping them is held out for evaluation.
train_dataset_ = dataset.take(400)
test_dataset_ = dataset.skip(400)

# Convert each example dict into the (features, target) tuple that Keras
# expects; the label gets a trailing axis so it lines up with the model output.
train_dataset = train_dataset_.map(lambda x: (x['image'], tf.expand_dims(x['label'], axis=-1)))
# Fix: build the evaluation set from the held-out split. It was previously
# mapped from `train_dataset_`, so validation ran on the training data.
test_dataset = test_dataset_.map(lambda x: (x['image'], tf.expand_dims(x['label'], axis=-1)))

Custom loss function

In [None]:
def huber_loss(y_true, y_pred):
  """Element-wise Huber loss with a fixed threshold of 1.

  ``y_true`` is cast to float32 before the residual is taken. Where the
  absolute residual is below 1 the loss is ``residual**2 / 2``; elsewhere
  it is ``|residual| - 0.5``. No reduction is applied.
  """
  residual = tf.cast(y_true, tf.float32) - y_pred
  magnitude = tf.abs(residual)
  quadratic = tf.square(residual) / 2
  linear = magnitude - 0.5
  return tf.where(magnitude < 1, quadratic, linear)

Custom initializer

In [None]:
def glorot_initializer(shape, dtype=tf.float32):
  """Sample weights from N(0, stddev^2) with stddev = sqrt(2 / (shape[0] + shape[1])).

  Args:
    shape: Shape of the weight to create; the first two entries are used
      as the fan sizes.
    dtype: Dtype of the returned tensor.

  Returns:
    A tensor of the requested shape and dtype.
  """
  fan_sum = shape[0] + shape[1]
  # Keep stddev a plain Python number: tf.random.normal converts it to
  # `dtype` itself, whereas a float32 tensor (as produced by tf.sqrt) is
  # rejected when `dtype` is anything other than float32.
  stddev = (2. / fan_sum) ** 0.5
  return tf.random.normal(shape, stddev=stddev, dtype=dtype)

Saving and loading model with custom object

In [None]:
# Small CNN for 150x150 RGB inputs; the output layer uses the custom
# initializer and the model is trained with the custom Huber loss.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid', kernel_initializer=glorot_initializer)
])

model.compile(optimizer='adam', loss=huber_loss, metrics=['accuracy'])

# `train_dataset` is a tf.data.Dataset whose elements already carry a leading
# batch axis, so `batch_size` must not be passed to fit() for this input type.
model.fit(train_dataset, epochs=1, validation_data=test_dataset)



<keras.callbacks.History at 0x7f15ad514a30>

In [None]:
# Save the trained model under this path so that it can be reloaded below.
model.save('model_with_custom_loss_initializer')



In [None]:
# Reload the saved model; the custom loss and initializer have to be supplied
# by name through `custom_objects` so they can be resolved during loading.
model = tf.keras.models.load_model(
    'model_with_custom_loss_initializer',
    custom_objects={'huber_loss': huber_loss, 'glorot_initializer': glorot_initializer},
)
# Continue training the restored model. `batch_size` is not passed because
# the dataset elements already carry a leading batch axis.
model.fit(train_dataset, epochs=1)



<keras.callbacks.History at 0x7f159e143b20>

Custom regularizer

In [None]:
class Regularizer(tf.keras.regularizers.Regularizer):
  """Weight penalty equal to ``sum(|factor * w|)`` over all weights.

  Args:
    factor: Multiplier applied to the weights before the absolute values
      are summed.
  """

  def __init__(self, factor):
      self.factor = factor

  def __call__(self, weights):
      """Return the scalar penalty for the given weight tensor."""
      return tf.reduce_sum(tf.abs(self.factor * weights))

  def get_config(self):
      """Return the constructor arguments so the regularizer can be serialized."""
      return {'factor': self.factor}

Custom seed

In [None]:
# Fix TensorFlow's global random seed for the cells that follow.
tf.random.set_seed(112)

In [None]:
# Same CNN as before, with the custom regularizer attached to the kernel
# of the output layer.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid', kernel_regularizer=Regularizer(0.001))
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=1, validation_data=test_dataset)



<keras.callbacks.History at 0x7f15ada986d0>

Adding the custom regularizer helped improve both training and validation accuracy.

Custom activation and custom kernel constraint

In [None]:
def activation(z):
  """Compute ``log(1 + e^z) / e^z`` in a numerically stable way.

  The value tends to 1 as z goes to -infinity and to 0 as z goes to
  +infinity. Evaluating the formula literally yields NaN at both extremes:
  ``exp(z)`` overflows for large z (inf / inf) and underflows to zero for
  very negative z (0 / 0).

  Args:
    z: Pre-activation tensor.

  Returns:
    A tensor with the same shape as ``z``.
  """
  # Below roughly -30 the function equals 1 to within float32 precision, so
  # clamping there keeps exp(-z) finite without changing the result.
  z = tf.maximum(z, -30.0)
  # softplus(z) * exp(-z) is the same quantity, but for large z the second
  # factor goes to 0 instead of producing inf / inf.
  return tf.math.softplus(z) * tf.exp(-z)

In [None]:
def kernel_constraint(weights):
  """Replace every strictly negative weight with zero; keep all others as is."""
  is_negative = tf.math.less(weights, 0.)
  zeros = tf.zeros_like(weights)
  return tf.where(is_negative, zeros, weights)

In [None]:
# Same CNN, with the custom activation and the custom kernel constraint
# applied to the output layer.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation=activation, kernel_constraint=kernel_constraint)
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=1, validation_data=test_dataset)



<keras.callbacks.History at 0x7f15ac1ae3e0>

Adding this custom kernel constraint made the loss NaN, which could be due to vanishing or exploding gradients.

Custom learning rate scheduler


In [None]:
K = tf.keras.backend

class LearningRateScheduler(tf.keras.callbacks.Callback):
    """Piecewise-linear learning-rate schedule applied once per training batch.

    The rate rises linearly from ``start_lr`` to ``max_lr`` over the first
    ``half_iteration`` steps, falls back to ``start_lr`` over the next
    ``half_iteration`` steps, and then decays linearly to ``last_lr`` by step
    ``iterations``. Any further steps keep ``last_lr``.

    Args:
        iterations: Total number of training batches the schedule spans.
        max_lr: Peak learning rate.
        start_lr: Initial learning rate; defaults to ``max_lr / 10``.
        last_iterations: Length of the final decay phase; defaults to
            ``iterations // 10 + 1``.
        last_lr: Final learning rate; defaults to ``start_lr / 1000``.
    """

    def __init__(self, iterations, max_lr=1e-3, start_lr=None, last_iterations=None, last_lr=None):
        # Let the base Callback set up its own attributes.
        super().__init__()
        self.iterations = iterations
        self.max_lr = max_lr
        self.start_lr = start_lr or max_lr / 10
        self.last_iterations = last_iterations or iterations // 10 + 1
        self.half_iteration = (iterations - self.last_iterations) // 2
        self.last_lr = last_lr or self.start_lr / 1000
        self.iteration = 0  # number of batches seen so far

    def _interpolate(self, iter1, iter2, lr1, lr2):
        """Linearly interpolate between (iter1, lr1) and (iter2, lr2) at the current step."""
        return (lr2 - lr1) * (self.iteration - iter1) / (iter2 - iter1) + lr1

    def on_batch_begin(self, batch, logs=None):
        """Set the optimizer's learning rate for the upcoming training batch."""
        if self.iteration >= self.iterations:
            # Past the scheduled horizon: hold the final rate. Extrapolating
            # the last segment would push the rate below last_lr and
            # eventually make it negative.
            lr = self.last_lr
        elif self.iteration < self.half_iteration:
            lr = self._interpolate(0, self.half_iteration, self.start_lr,
                                   self.max_lr)
        elif self.iteration < 2 * self.half_iteration:
            lr = self._interpolate(self.half_iteration, 2 * self.half_iteration,
                                   self.max_lr, self.start_lr)
        else:
            lr = self._interpolate(2 * self.half_iteration, self.iterations,
                                   self.start_lr, self.last_lr)
        self.iteration += 1
        K.set_value(self.model.optimizer.learning_rate, lr)

In [None]:
# Every element of `train_dataset` is already one batch (leading axis of
# size 1), so len(train_dataset) is the number of steps per epoch. Dividing
# by 16 made the schedule far shorter than the actual run. The factor 2 is
# the number of epochs used in the fit() call below.
learning_rate_schedule = LearningRateScheduler(len(train_dataset) * 2, max_lr=0.001)

In [None]:
# Plain CNN trained with the custom learning-rate schedule callback.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=2, validation_data=test_dataset, callbacks=[learning_rate_schedule])

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7f15ac193340>

Custom callback

In [None]:
class CustomCallback(tf.keras.callbacks.Callback):
    """Callback that prints the available log keys at the start and end of
    training, testing and prediction."""

    @staticmethod
    def _report(event, logs):
        """Print ``event`` followed by the keys of ``logs`` (an empty list if ``logs`` is None)."""
        keys = list((logs or {}).keys())
        print('{}; got log keys: {}'.format(event, keys))

    def on_train_begin(self, logs=None):
        self._report('Starting training', logs)

    def on_train_end(self, logs=None):
        self._report('Stop training', logs)

    def on_test_begin(self, logs=None):
        self._report('Start testing', logs)

    def on_test_end(self, logs=None):
        self._report('Stop testing', logs)

    def on_predict_begin(self, logs=None):
        self._report('Start predicting', logs)

    def on_predict_end(self, logs=None):
        self._report('Stop predicting', logs)

In [None]:
# Plain CNN trained with the logging callback attached.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=2, validation_data=test_dataset, callbacks=[CustomCallback()])

Starting training; got log keys: []
Epoch 1/2
Stop testing; got log keys: ['loss', 'accuracy']
Epoch 2/2
Stop testing; got log keys: ['loss', 'accuracy']
Stop training; got log keys: ['loss', 'accuracy', 'val_loss', 'val_accuracy']


<keras.callbacks.History at 0x7f159cca1270>

Custom layer

In [None]:
# Fetch the California housing data; the targets are reshaped into a
# single-column 2-D array.
df = fetch_california_housing()
features = df.data
targets = df.target.reshape(-1, 1)

X_train, X_test, y_train, y_test = train_test_split(features, targets, random_state=42)

# Fit the scaler on the training features only, then apply the same
# transformation to the test features.
sc = StandardScaler()
X_train_scaled = sc.fit_transform(X_train)
X_test_scaled = sc.transform(X_test)

In [None]:
class CustomDense(tf.keras.layers.Layer):
    """Fully connected layer computing ``activation(X @ kernel + biases)``.

    Args:
        units: Number of output features.
        activation: Anything accepted by ``tf.keras.activations.get``;
            defaults to ``'relu'``.
        **kwargs: Forwarded to the base ``Layer`` constructor (e.g. ``name``).
    """

    def __init__(self, units, activation='relu', **kwargs):
      super().__init__(**kwargs)
      self.units = units
      self.activation = tf.keras.activations.get(activation)

    def build(self, input_shape):
      """Create the kernel and bias once the input feature size is known."""
      # The feature size is the last axis; for 2-D inputs this is the same
      # as index 1.
      self.kernel = self.add_weight(name='kernel', shape=(input_shape[-1], self.units),
                                    initializer='normal', trainable=True)
      self.biases = self.add_weight(name='biases', shape=(self.units,),
                                    initializer='zeros', trainable=True)
      super().build(input_shape)

    def call(self, X):
      """Apply the affine transformation followed by the activation."""
      return self.activation(X @ self.kernel + self.biases)

    def get_config(self):
      """Return the constructor arguments so the layer can be serialized."""
      config = super().get_config()
      config.update({
          'units': self.units,
          'activation': tf.keras.activations.serialize(self.activation),
      })
      return config

Custom model

In [None]:
class CustomModel(tf.keras.Model):
    """Network with three stacked CustomDense hidden layers (100, 50 and 10
    units) whose output layer sees the raw input concatenated with the last
    hidden representation."""

    def __init__(self, **kwargs):
      super().__init__(**kwargs)
      self.hidden1 = CustomDense(100)
      self.hidden2 = CustomDense(50)
      self.hidden3 = CustomDense(10)
      # NOTE(review): CustomDense defaults to a ReLU activation, so the single
      # output unit is ReLU-activated as well - confirm this is intended.
      self.output_ = CustomDense(1)

    def call(self, input):
      """Run the forward pass and return the single-unit output."""
      hidden1 = self.hidden1(input)
      hidden2 = self.hidden2(hidden1)
      hidden3 = self.hidden3(hidden2)
      # Join along the feature axis with a plain tensor op, so that no new
      # Keras layer object is created on each forward pass.
      concat = tf.concat([input, hidden3], axis=-1)
      output = self.output_(concat)
      return output

model = CustomModel()

In [None]:
# This is a regression task trained with MSE, so track mean absolute error.
# Classification accuracy is not meaningful for continuous targets.
model.compile(loss='mse', optimizer='adam', metrics=['mae'])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_test_scaled, y_test))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7f15ad898f70>

Custom gradient

In [None]:
@tf.custom_gradient
def custom_gradient_fn(z):
  """Return ``max(0, z) + log(1 + exp(-|z|))`` together with a hand-written gradient.

  The gradient function multiplies the incoming gradient by
  ``1 - 1 / (1 + exp(z))``.
  """
  value = tf.maximum(0., z) + tf.math.log(1 + tf.exp(-tf.abs(z)))

  def backward(upstream):
      # Local derivative, written so that a very large z gives 1 rather than NaN.
      local_slope = 1 - 1 / (1 + tf.exp(z))
      return upstream * local_slope

  return value, backward

# Evaluate the function and its custom gradient at a very large input.
x = tf.Variable([1000.])
with tf.GradientTape() as tape:
    z = custom_gradient_fn(x)

# Last expression of the cell: show the value and its gradient with respect to x.
z, tape.gradient(z, [x])

(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1000.], dtype=float32)>,
 [<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>])

Custom training loop

In [None]:
# CNN for the custom training loop. The second convolution and the hidden
# dense layer carry an L2 kernel penalty of 0.02.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu',
                                 kernel_regularizer=tf.keras.regularizers.l2(0.02)))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu',
                                kernel_regularizer=tf.keras.regularizers.l2(0.02)))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

In [None]:
# Settings for the hand-written training loop.
epochs = 2
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
loss_fn = tf.keras.losses.binary_crossentropy
# The model emits sigmoid probabilities, so predictions must be thresholded
# before being compared with the 0/1 labels. `metrics.Accuracy` tests for
# exact equality and would report almost no matches.
metrics = [tf.keras.metrics.BinaryAccuracy()]

In [None]:
# Hand-written training loop: forward pass, loss, backward pass, weight update.
for epoch in range(epochs):
  print('epoch =', epoch)
  # Start every epoch with fresh metric state so the reported values are per-epoch.
  for metric in metrics:
    metric.reset_state()

  for step, (X_batch, y_batch) in enumerate(train_dataset):
    # Give the labels the same (batch, 1) layout as the model output.
    y_batch = tf.reshape(y_batch, (-1, 1))
    with tf.GradientTape() as tape:
      y_pred = model(X_batch, training=True)
      main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
      # The layers' kernel regularizers contribute through `model.losses`;
      # they only affect training if they are added to the loss explicitly.
      loss = tf.add_n([main_loss] + model.losses)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    for metric in metrics:
        metric.update_state(y_batch, y_pred)

  # Report the metrics accumulated over this epoch.
  for metric in metrics:
    print('  {} = {:.4f}'.format(metric.name, float(metric.result())))

epoch = 0
epoch = 1


Custom optimizer

Modifying Adam optimizer's behavior

In [None]:
class CustomOptimizer(tf.keras.optimizers.Adam):
  """Adam variant that randomly zeroes a fraction of each dense gradient
  before the regular Adam update is applied.

  Args:
    dropout_rate: Probability with which an individual gradient entry is
      set to zero.
    **kwargs: Forwarded to the Adam constructor (e.g. ``learning_rate``).

  NOTE(review): this hooks into ``_resource_apply_dense``; confirm that the
  installed Keras optimizer base class still calls that method.
  """

  def __init__(self, dropout_rate, **kwargs):
    super().__init__(**kwargs)
    self.dropout_rate = dropout_rate

  def _resource_apply_dense(self, grad, var, apply_state=None):
    # Keep each entry with probability 1 - dropout_rate.
    keep_mask = tf.cast(
        tf.random.uniform(tf.shape(var)) >= self.dropout_rate, grad.dtype)
    # Pass `apply_state` through so Adam can use the coefficients prepared
    # by the base class for this step.
    return super()._resource_apply_dense(grad * keep_mask, var, apply_state=apply_state)

  def get_config(self):
    """Return the constructor arguments so the optimizer can be serialized."""
    config = super().get_config()
    config['dropout_rate'] = self.dropout_rate
    return config

In [None]:
# Compile the current model with the custom optimizer and train it.
model.compile(optimizer=CustomOptimizer(dropout_rate=0.1), loss='binary_crossentropy', metrics=['accuracy'])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=2, validation_data=test_dataset)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7f159de4c0a0>

Custom metrics

In [None]:
def huber_metric(threshold):
  """Build an element-wise Huber function for the given threshold.

  The returned function casts ``y_true`` to float32 and yields
  ``residual**2 / 2`` where the absolute residual is below ``threshold``,
  and ``threshold * |residual| - threshold**2 / 2`` elsewhere.
  """
  def huber_loss(y_true, y_pred):
    residual = tf.cast(y_true, tf.float32) - y_pred
    magnitude = tf.abs(residual)
    quadratic = tf.square(residual) / 2
    linear = threshold * magnitude - threshold ** 2 / 2
    return tf.where(magnitude < threshold, quadratic, linear)
  return huber_loss

In [None]:
# Plain CNN that tracks the custom Huber metric (threshold 0.4).
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[huber_metric(0.4)])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=2, validation_data=test_dataset)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7f159e6b9bd0>

Custom dropout

In [None]:
def custom_dropout(x, p):
    """Zero out entries of ``x`` at random.

    A uniform sample in [0, 1) is drawn per entry; entries whose sample is
    below ``p`` are set to zero and the rest are returned unchanged. The
    surviving entries are not rescaled.
    """
    noise = tf.random.uniform(shape=tf.shape(x))
    keep = tf.cast(noise >= p, dtype=x.dtype)
    return x * keep

In [None]:
# CNN with the custom dropout applied through Lambda layers: after the first
# pooling stage (p=0.3) and after flattening (p=0.2).
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Lambda(lambda x: custom_dropout(x, p=0.3)),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Lambda(lambda x: custom_dropout(x, p=0.2)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# `batch_size` is not passed: the dataset elements already carry a leading
# batch axis, and fit() does not accept a batch size for dataset inputs.
model.fit(train_dataset, epochs=2, validation_data=test_dataset)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7f159e49aef0>