<a href="https://colab.research.google.com/github/saipragna25/deep-learning-pipeline-custom-components/blob/main/DL_pipeline_custom_components.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
from sklearn.model_selection import train_test_split


Saving and loading model with custom object

In [5]:
# Ingesting data

dataset, info = tfds.load('cats_vs_dogs', with_info=True)
dataset = dataset['train'].take(500)

Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to /root/tensorflow_datasets/cats_vs_dogs/4.0.0...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Generating splits...:   0%|          | 0/1 [00:00<?, ? splits/s]

Generating train examples...: 0 examples [00:00, ? examples/s]



Shuffling /root/tensorflow_datasets/cats_vs_dogs/4.0.0.incompleteUX23G7/cats_vs_dogs-train.tfrecord*...:   0%|…

Dataset cats_vs_dogs downloaded and prepared to /root/tensorflow_datasets/cats_vs_dogs/4.0.0. Subsequent calls will reuse this data.


In [6]:
# Preprocess images

def preprocess_image(sample):
    image = sample['image']
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.resize(image, (150, 150))
    image = tf.reshape(image, (1, 150, 150, 3))
    label = sample['label']
    return {'image': image, 'label': label}

dataset = dataset.map(preprocess_image)

In [7]:
# Split the dataset into training and testing sets

train_dataset_ = dataset.take(400)
test_dataset_ = dataset.skip(400)

train_dataset = train_dataset_.map(lambda x: (x['image'], tf.expand_dims(x['label'], axis=-1)))
test_dataset = train_dataset_.map(lambda x: (x['image'], tf.expand_dims(x['label'], axis=-1)))

In [9]:
def huber_loss(y_true, y_pred):
  y_true = tf.cast(y_true, tf.float32)
  error = y_true - y_pred
  is_small_error = tf.abs(error) < 1
  squared_loss = tf.square(error) / 2
  linear_loss  = tf.abs(error) - 0.5
  return tf.where(is_small_error, squared_loss, linear_loss)

In [10]:
def glorot_initializer(shape, dtype=tf.float32):
  stddev = tf.sqrt(2. / (shape[0] + shape[1]))
  return tf.random.normal(shape, stddev=stddev, dtype=dtype)

In [12]:
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(150,150,3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid', kernel_initializer=glorot_initializer)
])

model.compile(optimizer='adam', loss=huber_loss, metrics=['accuracy'])

model.fit(train_dataset, epochs=5, batch_size=16, validation_data=test_dataset)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fc941c25390>

In [13]:
# Saving models with custom objects

model.save('model_with_custom_loss_initializer')



In [15]:
# Load saved model and run again

model = tf.keras.models.load_model('model_with_custom_loss_initializer', custom_objects={'huber_loss':huber_loss, 'glorot_initializer':glorot_initializer})
model.fit(train_dataset, epochs=5, batch_size=16)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fc941919750>

custom loss function

In [3]:

# Load the dataset
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Preprocess the data
x_train = x_train / 255.0
x_test = x_test / 255.0
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=10)


# Custom categorical crossentropy loss function
def custom_categorical_crossentropy(y_true, y_pred):
    epsilon = tf.keras.backend.epsilon()
    y_pred = tf.clip_by_value(y_pred, epsilon, 1.0 - epsilon)
    y_pred = tf.math.log(y_pred)
    loss = -tf.math.reduce_sum(y_true * y_pred, axis=-1)
    return tf.math.reduce_mean(loss)

# Create the model
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

# Compile the model with the custom loss function
model.compile(optimizer='adam',
              loss=custom_categorical_crossentropy,
              metrics=['accuracy'])

# Train the model
model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f9d823871c0>

Custom Regularizer

In [19]:
(train_images, train_labels),(test_images, test_labels) = keras.datasets.fashion_mnist.load_data()
train_images = train_images / 255.0
test_images = test_images / 255.0
validation_images = train_images[:5000]
validation_labels = train_labels[:5000]
class_names = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat", "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


In [20]:
def custom_l2_regularizer(weights):
    return tf.reduce_sum(0.02 * tf.square(weights))

In [22]:
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28,28]),
    keras.layers.Dense(200, activation='relu', kernel_regularizer=custom_l2_regularizer),
    keras.layers.Dense(100, activation='relu', kernel_regularizer=custom_l2_regularizer),
    keras.layers.Dense(50, activation='relu', kernel_regularizer=custom_l2_regularizer),
    keras.layers.Dense(10, activation='softmax')
])

In [24]:
sgd = keras.optimizers.SGD(lr=0.01)
model.compile(loss="sparse_categorical_crossentropy", optimizer=sgd, metrics=["accuracy"])
model.fit(train_images, train_labels, epochs=5, validation_data=(validation_images, validation_labels))



Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f732e6cfdc0>

In [25]:
model.evaluate(test_images, test_labels)



[0.8744895458221436, 0.7936000227928162]

In [28]:
practical_test_images =  test_images[:10]
predictions = np.argmax(model.predict(practical_test_images), axis=-1)
print(predictions)
print(np.array(class_names)[predictions])


[9 2 1 1 6 1 4 6 5 7]
['Ankle boot' 'Pullover' 'Trouser' 'Trouser' 'Shirt' 'Trouser' 'Coat'
 'Shirt' 'Sandal' 'Sneaker']


Custom activation function

In [32]:
# Custom activation function
@tf.function
def custom_activation(x):
    return tf.maximum(0.0, x) * tf.sin(x)

# Model with custom activation function
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), input_shape=(28, 28, 1)),
    layers.Activation(custom_activation),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3)),
    layers.Activation(custom_activation),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128),
    layers.Activation(custom_activation),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f731fc47970>

  Custom Initializer

In [43]:
class CustomInitializer(tf.keras.initializers.Initializer):
    def __call__(self, shape, dtype=None):
        return tf.random.normal(shape, mean=0., stddev=0.05)

# Model with custom initializer
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1), kernel_initializer=CustomInitializer()),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu', kernel_initializer=CustomInitializer()),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu', kernel_initializer=CustomInitializer()),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f731f939ff0>

Custom kernel constraint

In [None]:
class CustomConstraint(tf.keras.constraints.Constraint):
    def __call__(self, w):
        return tf.clip_by_value(w, -1.0, 1.0)

# Model with custom kernel constraint
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1), kernel_constraint=CustomConstraint()),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu', kernel_constraint=CustomConstraint()),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu', kernel_constraint=CustomConstraint()),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fda7c982800>

Custom Metrics

In [42]:
class CustomAccuracy(tf.keras.metrics.Metric):
    def __init__(self, name="custom_accuracy", **kwargs):
        super(CustomAccuracy, self).__init__(name=name, **kwargs)
        self.correct = self.add_weight(name="correct", initializer="zeros")
        self.total = self.add_weight(name="total", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.argmax(y_true, axis=-1)
        y_pred = tf.argmax(y_pred, axis=-1)
        self.correct.assign_add(tf.reduce_sum(tf.cast(y_true == y_pred, tf.float32)))
        self.total.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        return self.correct / self.total

# Model with custom metrics
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=[CustomAccuracy()])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f732f38da50>

Custom Seed

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models

# Set the custom seed
custom_seed = 42
np.random.seed(custom_seed)
tf.random.set_seed(custom_seed)

# Model
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fda7c7c6530>

Custom Call Back

In [None]:
class CustomCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        if logs.get('accuracy') > 0.95:
            print("\nReached 95% accuracy, stopping training.")
            self.model.stop_training = True

# Model with custom callback
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1, callbacks=[CustomCallback()])


Custom Layer

In [None]:
class CustomLayer(layers.Layer):
    def __init__(self, units=32, **kwargs):
        super(CustomLayer, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

# Model with custom layer
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    CustomLayer(128),
    layers.Activation('relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Custom Model

In [None]:
class CustomModel(models.Model):
    def __init__(self):
        super(CustomModel, self).__init__()
        self.conv1 = layers.Conv2D(32, kernel_size=(3, 3), activation='relu')
        self.max_pool1 = layers.MaxPooling2D(pool_size=(2, 2))
        self.conv2 = layers.Conv2D(64, kernel_size=(3, 3), activation='relu')
        self.max_pool2 = layers.MaxPooling2D(pool_size=(2, 2))
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(128, activation='relu')
        self.dropout = layers.Dropout(0.2)
        self.dense2 = layers.Dense(10, activation='softmax')

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.max_pool1(x)
        x = self.conv2(x)
        x = self.max_pool2(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dropout(x)
        return self.dense2(x)

# Model with custom model
model = CustomModel()

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Custom Training Loop

In [None]:
# Model
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# Custom training loop
@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(10000).batch(32)

for epoch in range(5):
    for step, (images, labels) in enumerate(train_dataset):
        loss = train_step(images, labels)
        if step % 100 == 0:
            print(f"Epoch {epoch + 1}, Step {step}, Loss {loss.numpy()}")


Custom tf functions

In [23]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Lambda, Conv2D, MaxPooling2D, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras import backend as K

In [24]:
# Custom activation functions
def my_relu(x):
    return K.maximum(-0.1, x)
# Custom ELU activation function
@tf.function
def custom_elu(x, alpha=1.0):
    return tf.where(x > 0, x, alpha * (tf.exp(x) - 1))

# Custom SELU activation function
@tf.function
def custom_selu(x, alpha=1.67326, scale=1.0507):
    return scale * tf.where(x > 0, x, alpha * (tf.exp(x) - 1))

# Custom Leaky ReLU activation function
@tf.function
def custom_leaky_relu(x, alpha=0.3):
    return tf.where(x > 0, x, alpha * x)

# Custom Parametric ReLU activation function
@tf.function
def custom_prelu(x, alpha):
    return tf.where(x > 0, x, alpha * x)

# Custom function to compute the mean of a tensor
@tf.function
def custom_mean(x):
    return tf.reduce_mean(x, axis=[1, 2], keepdims=True)

# Custom function to compute the standard deviation of a tensor
@tf.function
def custom_std(x):
    return tf.math.sqrt(tf.reduce_mean(tf.square(x - tf.reduce_mean(x, axis=[1, 2], keepdims=True)), axis=[1, 2], keepdims=True))



In [25]:
# Load the MNIST dataset and normalize the data
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]


In [31]:
# Create a complex deep neural network model using custom functions
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),
    Flatten(),
    Dense(128),
    Lambda(my_relu),
    Dropout(0.5),
    Dense(10, activation='softmax')
])

In [22]:
# Compile and train the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5, batch_size=128)

# Evaluate the model on the test set
model.evaluate(x_test, y_test)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


[0.04838937520980835, 0.9850999712944031]

Custom Dropout

In [None]:
class CustomDropout(layers.Layer):
    def __init__(self, rate=0.2, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

# Model with custom dropout
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    CustomDropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x_train, y_train, epochs=5, validation_split=0.1)


Custom Gradient

In [6]:
import tensorflow as tf
import tensorflow_datasets as tfds

from typing import Optional


@tf.function
def sigmoid(x: tf.Tensor) -> tf.Tensor:
    return 1 / (1 + tf.exp(-x))



ds = tfds.load("german_credit_numeric", split="train", as_supervised=True)
ds = ds.shuffle(1000).batch(100).prefetch(tf.data.AUTOTUNE)

In [7]:
class Logistic(tf.keras.layers.Layer):

    def __init__(self, units, **kwargs):
        kwargs.setdefault("name", "logistic")
        super().__init__(**kwargs)
        self.units = units
        
    def get_config(self):
        config = super().get_config()
        config = config.update({"units": self.units})
        
        return config
        
    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units), initializer="random_normal", trainable=True)
        self.b = self.add_weight(shape=(self.units, ), initializer="random_normal", trainable=True)
    
    def call(self, inputs):
        return sigmoid(tf.linalg.matmul(inputs, self.w) + self.b)

In [8]:
features_spec, labels_spec = ds.element_spec
del labels_spec  # Not used

feature_inputs = tf.keras.Input(type_spec=features_spec, name="feature_inputs")
dense = tf.keras.layers.Dense(units=4, name="dense_layer")

In [9]:
class CustomModel(tf.keras.Model):
    def __init__(self, nn_block: Optional[tf.keras.layers.Layer] = None, **kwargs):
        kwargs.setdefault("name", "custom_model")
        super().__init__(**kwargs)
        self.nn_block = nn_block
        self.logistic = Logistic(units=1, name="logistic_layer")
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        # Automatically resets the metric states at the start of each epoch or at the start of evaluate()
        return [self.loss_tracker]

    def loss_fn(self, features, label):
        """Custom gradient is calculated for the output layer; autodiff used for earlier layer(s)"""
        logistic_features = self.nn_block(features) if self.nn_block else features

        @tf.custom_gradient
        def logistic_loss(x, y):
            """x is the features to the logistic regression, y is the target labels"""
            predicted = sigmoid(tf.linalg.matmul(x, self.logistic.w) + self.logistic.b)
            loss_result = tf.reduce_mean(-1 * tf.math.log(predicted), axis=0)  # Average loss per sample
            loss_result = tf.ensure_shape(loss_result, shape=(self.logistic.units, ))  # Assert that loss is unitary

            def gradient(upstream_grad, variables):
                # Gradient formulae derived with pen and paper using calculus
                # They are averaged over the training batch

                # List that holds gradients of trainable parameters accessed
                # during loss calculation via get_variables methods
                variables_grad = []
                assert variables is not None
                # Loss gradient w.r.t logistic regression inputs
                # Used with chain rule from calculus to calculate the
                # downstream gradients w.r.t. dense layer weights and biases.
                x_grad = tf.multiply(*tf.meshgrid(self.logistic.w, predicted - y))
                x_grad = tf.ensure_shape(x_grad, shape=(predicted.shape[0], self.logistic.w.shape[0]))
                x_grad = upstream_grad * x_grad  # (None x n_features)
                # Loss gradient w.r.t true labels; not used                
                y_grad = None
                # Loss gradient w.r.t logistic regression weights
                w_grad = tf.reduce_mean((predicted - y) * x, axis=0)
                w_grad = tf.reshape(w_grad, shape=tf.shape(self.logistic.w))
                w_grad = upstream_grad * w_grad
                # Loss gradient w.r.t. logistic regression bias
                b_grad = tf.reduce_mean(predicted - y, axis=0)
                b_grad = upstream_grad * b_grad
                variables_grad.append(w_grad)
                variables_grad.append(b_grad)
                
                # Return the gradients w.r.t. logistic_loss arguments (first item)
                # and w.r.t to trainable parameters (weights, biases) accessed with the
                # get_variables method to calculate the loss
                
                return (x_grad, y_grad), variables_grad

            return loss_result, gradient

        loss = logistic_loss(logistic_features, label)

        return loss

    def train_step(self, data):
        x, y = data
        y = tf.reshape(tf.cast(y, tf.float32), shape=(-1, 1))  # (batch_size x 1)
        if not self.logistic.trainable_weights:
            _ = self(x)  # required to initialize layer parameters with build() method
        with tf.GradientTape() as tape:
            loss = self.loss_fn(x, y)
        grads = tape.gradient(loss, self.trainable_weights)
        # Tell the optimizer to apply gradients on specified variables
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        # Update the running loss
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def call(self, inputs):
        logistic_input = self.nn_block(inputs) if self.nn_block else inputs
        return self.logistic(logistic_input)

In [10]:
class ReportWeightsCallback(tf.keras.callbacks.Callback):
    def __init__(self):
        super().__init__()

    def on_epoch_end(self, epoch, logs=None):
        print(self.model.get_weights())

In [11]:
model = CustomModel(nn_block=dense, name="nn_logistic_model")
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-4))
model.fit(ds, epochs=5, callbacks=[ReportWeightsCallback()])

Epoch 1/5
 1/10 [==>...........................] - ETA: 5s - loss: 0.3964[array([[ 0.44023272,  0.09094044,  0.16280405, -0.06908312],
       [-0.23155104,  0.1355314 , -0.05066523,  0.43366265],
       [-0.06188461,  0.1093087 ,  0.1608595 , -0.14510208],
       [ 0.0711062 ,  0.09652383,  0.22005047,  0.26559407],
       [-0.14420123,  0.17159823,  0.39667857, -0.3854463 ],
       [ 0.35798386,  0.23194352,  0.43772295, -0.12702398],
       [-0.02942524,  0.44041955, -0.07873238,  0.05501085],
       [ 0.12390338,  0.12265257, -0.15077321,  0.17889762],
       [ 0.01787707,  0.36869952, -0.1532474 ,  0.07990661],
       [-0.18903798, -0.07746837,  0.15345986, -0.42206267],
       [-0.22069892, -0.44832066, -0.13565437, -0.31968084],
       [-0.11970737,  0.21496074, -0.25022247, -0.01439001],
       [ 0.01688699, -0.4044529 ,  0.15093064, -0.19210514],
       [ 0.12802374,  0.12552813, -0.44976383, -0.27050754],
       [ 0.06746761, -0.45451078, -0.42895716,  0.09177345],
       [-0.

<keras.callbacks.History at 0x7f9d76685c90>

Custom Optimizer 

modified adam optimizer's behavior

In [16]:
class CustomOptimizer(tf.keras.optimizers.Adam):
  def __init__(self, dropout_rate):
    super().__init__()
    self.dropout_rate = dropout_rate
  
  def _resource_apply_dense(self, grad, var):
    dropout_mask = tf.keras.backend.random_binomial(tf.shape(var), p=1-self.dropout_rate)
    grad = grad * dropout_mask
    return super()._resource_apply_dense(grad, var)

In [17]:
model.compile(optimizer=CustomOptimizer(dropout_rate=0.1), loss='binary_crossentropy', metrics=['accuracy'])

model.fit(train_dataset, epochs=2, batch_size=16, validation_data=test_dataset)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7fc8f747a6b0>

Custom Learning Rate Scheduler

In [None]:
def custom_learning_rate_schedule(epoch, lr):
    if epoch % 10 == 0 and epoch > 0:
        return lr * 0.1
    return lr

# Model with custom learning rate scheduler
model = models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

callback = tf.keras.callbacks.LearningRateScheduler(custom_learning_rate_schedule)

model.fit(x_train, y_train, epochs=5, validation_split=0.1, callbacks=[callback])


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fda641d35e0>