# Chapter 12: Custom Models and Training with TensorFlow



A custom loss class lets you store its configuration (get_config), rebuild the loss from that configuration when a model is loaded, and apply it through its call() method.

Initializers, regularizers, and constraints can all be customized. A kernel_constraint lets you overwrite a layer's connection (edge) weights after each weight update.



In [None]:
# Common imports

import matplotlib.cm as cm
from matplotlib.image import imread
import matplotlib as mpl
import matplotlib.pyplot as plt
import mpl_toolkits.mplot3d.axes3d as p3

import numpy as np

from sklearn.cluster import KMeans
from sklearn.datasets import fetch_california_housing
from sklearn.datasets import make_blobs

from sklearn.metrics import accuracy_score
from sklearn.metrics import silhouette_samples
from sklearn.metrics import silhouette_score

from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import tensorflow as tf
from tensorflow import keras

print("TF version ", tf.__version__)
print("Keras version ", keras.__version__)

# Custom error handler for the entire notebook so stack traces are not lost
from IPython.core.ultratb import AutoFormattedTB

# initialize the formatter for making the tracebacks into strings
itb = AutoFormattedTB(mode = 'Plain', tb_offset = 1)

# Define a global with the stack trace that we can append to in the handler.
viki_stack_trace = ''

# this function will be called on exceptions in any cell
def custom_exc(shell, etype, evalue, tb, tb_offset=None):
    global viki_stack_trace

    # still show the error within the notebook, don't just swallow it
    shell.showtraceback((etype, evalue, tb), tb_offset=tb_offset)

    # grab the traceback and make it into a list of strings
    stb = itb.structured_traceback(etype, evalue, tb)
    sstb = itb.stb2text(stb)

    print (sstb) # <--- this is the variable with the traceback string
    viki_stack_trace = viki_stack_trace + sstb

# this registers a custom exception handler for the whole current notebook
get_ipython().set_custom_exc((Exception,), custom_exc)


"Loss" functions are used during training: the optimizer minimizes the loss by following its gradients, so a loss has to be differentiable.

By contrast, "metrics" are used to evaluate a model and can be anything arbitrary. They are not expected to be differentiable or to have nonzero gradients.

This is a custom loss function:

In [3]:
class HuberLoss(keras.losses.Loss):
    "A custom loss function that will be used later. Just an example"

    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        "Evaluate the loss at this stage"
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    
    def get_config(self):
        "Called when model is saved to preserve existing config. This class will save its parent class' config too."
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
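
To see what the two branches of the loss produce, here is a minimal NumPy sketch of the same arithmetic as call(). The function name huber and the sample values are made up for illustration; this is not the Keras class itself.

```python
import numpy as np

def huber(y_true, y_pred, threshold=1.0):
    # Same arithmetic as HuberLoss.call(), written with NumPy instead of TF ops
    error = y_true - y_pred
    is_small_error = np.abs(error) < threshold
    squared_loss = np.square(error) / 2
    linear_loss = threshold * np.abs(error) - threshold ** 2 / 2
    return np.where(is_small_error, squared_loss, linear_loss)

# Illustrative values: one small error, one exactly at the threshold, one large
y_true = np.array([0.0, 0.0, 0.0])
y_pred = np.array([0.5, 1.0, 3.0])
losses = huber(y_true, y_pred)
print(losses)  # quadratic for the first entry, linear for the other two
```

With threshold=1.0, the error of 0.5 falls in the quadratic branch (0.5² / 2 = 0.125), while errors of 1.0 and 3.0 fall in the linear branch (0.5 and 2.5).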

Here are other custom functions:


In [None]:
def activation_softplus(z):
    "Smooth approximation of ReLU: log(exp(z) + 1), which is always positive"
    return tf.math.log(tf.exp(z) + 1.0)

def initializer_glorot(shape, dtype=tf.float32):
    "Used to initialize weights before training"
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def regularizer_l1(weights):
    "Used to avoid over-fitting, and keep weights meaningful"
    return tf.reduce_sum(tf.abs(0.01 * weights))

def constraint_weights(weights):
    "Applied after each training step to constrain the layer's weights (here: negatives become zero)"
    return tf.where(weights < 0., tf.zeros_like(weights), weights)
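
As a sketch of how plain functions like these plug into a layer, the block below repeats them (so it runs on its own) and passes them to a Dense layer. The layer size and the all-ones input are arbitrary choices for illustration.

```python
import tensorflow as tf

def activation_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0)

def initializer_glorot(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def regularizer_l1(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def constraint_weights(weights):
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

# Each custom function is handed to the matching Dense argument
layer = tf.keras.layers.Dense(
    4,
    activation=activation_softplus,
    kernel_initializer=initializer_glorot,
    kernel_regularizer=regularizer_l1,
    kernel_constraint=constraint_weights,
)

# Calling the layer builds its weights with the custom initializer
outputs = layer(tf.ones((2, 3)))
print(outputs.shape)
```

The constraint only takes effect when an optimizer updates the weights, so a single forward pass like this one does not show it.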

The above functions can be used directly, but we can also create classes that inherit from
keras.initializers.Initializer, keras.regularizers.Regularizer, and keras.constraints.Constraint respectively.
An activation function usually has nothing to save, so if you want the activation to have a parameter, you can create a new layer type instead.

Here's an example of extending just one of them, the Regularizer.

In [4]:
class VikiL1(keras.regularizers.Regularizer):
    def __init__(self, factor):
        "Create a regularizer with L1 regularization and the factor provided here"
        self.factor = factor

    def __call__(self, weights):
        "Apply this regularizer with the weights at this layer"
        return tf.reduce_sum(tf.abs(self.factor * weights))
    
    def get_config(self):
        "Returns the configuration of this class for application later"
        return {"factor": self.factor} # We don't look up the parent's config, because it has none.
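
The point of get_config() is that the regularizer can be rebuilt from its saved settings. A minimal sketch of that round trip follows; the class is repeated so the block runs on its own, and the factor and weights are arbitrary.

```python
import tensorflow as tf

class VikiL1(tf.keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))

    def get_config(self):
        return {"factor": self.factor}

original = VikiL1(0.05)
config = original.get_config()          # what gets stored when a model is saved
restored = VikiL1.from_config(config)   # from_config() calls VikiL1(**config)

weights = tf.constant([[1.0, -2.0], [3.0, -4.0]])
penalty = float(restored(weights))      # 0.05 * (1 + 2 + 3 + 4)
print(config, penalty)
```

from_config() is inherited from the base Regularizer class, which is why the keys returned by get_config() need to match the constructor's argument names.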
    


A custom layer calls add_weight() (typically in build()) for every value it needs to keep track of, and returns the layer's output from its call() method. I don't quite understand how gradients are calculated at every layer. Perhaps the exercises make this clearer.
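
One way to see how gradients reach a layer's weights is to record a forward pass with tf.GradientTape and ask for the gradients afterwards. The toy layer below (ScaleAndShift, a hypothetical name) is only a sketch to make the mechanics visible. It is not from the book.

```python
import tensorflow as tf

class ScaleAndShift(tf.keras.layers.Layer):
    """Hypothetical toy layer: output = inputs * scale + shift."""

    def build(self, input_shape):
        # One trainable scale and shift per input feature
        self.scale = self.add_weight(name="scale", shape=input_shape[-1:],
                                     initializer="ones")
        self.shift = self.add_weight(name="shift", shape=input_shape[-1:],
                                     initializer="zeros")

    def call(self, inputs):
        return inputs * self.scale + self.shift

layer = ScaleAndShift()
x = tf.constant([[1.0, 2.0], [3.0, 4.0]])

# The tape records every TF op applied to the trainable weights
with tf.GradientTape() as tape:
    y = layer(x)
    loss = tf.reduce_sum(y)

# Reverse-mode autodiff walks the recorded ops backwards
grad_scale, grad_shift = tape.gradient(loss, [layer.scale, layer.shift])
print(grad_scale.numpy(), grad_shift.numpy())
```

Since loss = sum(x * scale + shift), the gradient with respect to scale is the column sum of x, and the gradient with respect to shift is the number of rows. The layer never defines a backward pass. TensorFlow derives it from the ops used in call().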

# Exercises

1. TensorFlow is a library for training, saving/restoring, and applying models. It runs fast on GPUs and can scale across the CPUs and GPUs that are available. Its tensors work much like NumPy arrays, and it provides automatic differentiation of code, fast numerical routines, and a vibrant ecosystem of models and implementations.

2. TensorFlow is not a drop-in replacement for NumPy. Function names and defaults differ (for example tf.reduce_sum() instead of np.sum(), and float32 rather than float64 by default), and tensors are immutable. TensorFlow can also turn code into a computation graph of TensorFlow operations (ops) that it then executes, whereas NumPy calls run immediately as native or Python code.

TF is also meant to be used as a way to train NN models, store them, and run them elsewhere. That could be on more powerful hardware like GPUs, or on less powerful devices like mobile phones with TFLite.



3. tf.range(10) gives a single tensor of shape (10,), while tf.constant(np.arange(10)) should give ten constants? Let's try this:

In [5]:
tf.range(10)

<tf.Tensor: shape=(10,), dtype=int32, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)>

In [None]:
tf.constant(np.arange(10))

They're identical, except that the NumPy version started out with 64-bit ints and TF kept that dtype. TF by itself defaults to int32 (32-bit values use less memory and tend to be faster, for example on GPUs).
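
The dtype difference can be checked directly. In this sketch the NumPy dtype is set to int64 explicitly, because NumPy's default integer type depends on the platform and NumPy version.

```python
import numpy as np
import tensorflow as tf

from_range = tf.range(10)
# int64 is requested explicitly so the result does not depend on the platform default
from_numpy = tf.constant(np.arange(10, dtype=np.int64))

print(from_range.dtype, from_numpy.dtype)

# Same shape and values once the dtypes are aligned
same_values = tf.reduce_all(tf.cast(from_range, tf.int64) == from_numpy)
print(bool(same_values))
```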

4. Six other data structures:
   1. Tensor arrays (tf.TensorArray): lists of tensors.
   2. String tensors (tf.string)
   3. Ragged tensors (tf.RaggedTensor)
   4. Sparse tensors (tf.SparseTensor)
   5. Sets (operations in tf.sets)
   6. Queues (tf.queue)
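
A few of the structures in this list can be created in a line or two each. The values below are arbitrary and only meant as a quick sketch of what each one looks like. Queues are left out.

```python
import tensorflow as tf

# Ragged tensor: rows of different lengths
ragged = tf.ragged.constant([[1, 2, 3], [4], [5, 6]])

# Sparse tensor: only the nonzero entries are stored
sparse = tf.sparse.SparseTensor(indices=[[0, 1], [2, 0]],
                                values=[7.0, 8.0],
                                dense_shape=[3, 2])
dense = tf.sparse.to_dense(sparse)

# String tensor: byte strings, with helpers in tf.strings
strings = tf.constant(["café", "tf"])
lengths = tf.strings.length(strings, unit="UTF8_CHAR")

# Sets: represented as tensors, operations return sparse tensors
common = tf.sparse.to_dense(
    tf.sets.intersection(tf.constant([[1, 2, 3]]), tf.constant([[2, 3, 4]])))

# Tensor array: write() returns a new array that must be reassigned
array = tf.TensorArray(dtype=tf.float32, size=2)
array = array.write(0, tf.constant([1.0, 2.0]))
array = array.write(1, tf.constant([3.0, 4.0]))
stacked = array.stack()

print(ragged.row_lengths().numpy(), dense.numpy(), lengths.numpy(),
      common.numpy(), stacked.numpy())
```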

5. I'd use a function if the loss has no parameters, or if I don't need the custom loss function's parameters saved along with the model. Otherwise I'd use a subclass of keras.losses.Loss.

6. Similar to the previous answer, I'd use a custom metric class if I intend to report on the metric during model training and thus would like to store it, or to parameterize it somehow and remember the parameters. I would also use a custom metric class for a streaming metric, which needs to remember state across invocations to report the correct value.
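
To illustrate why a streaming metric needs state, here is a plain-Python sketch (no Keras) of a running precision. The class name StreamingPrecision and the batch counts are made up. The method names mirror the Keras Metric convention of update_state() and result().

```python
class StreamingPrecision:
    """Hypothetical running precision that keeps totals across batches."""

    def __init__(self):
        self.true_positives = 0
        self.predicted_positives = 0

    def update_state(self, true_positives, predicted_positives):
        # Accumulate counts rather than per-batch ratios
        self.true_positives += true_positives
        self.predicted_positives += predicted_positives

    def result(self):
        return self.true_positives / self.predicted_positives

# Two illustrative batches as (true positives, positive predictions)
batches = [(4, 5), (0, 3)]

metric = StreamingPrecision()
for tp, pp in batches:
    metric.update_state(tp, pp)

streaming = metric.result()
mean_of_batches = sum(tp / pp for tp, pp in batches) / len(batches)
print(streaming, mean_of_batches)
```

Averaging the per-batch precisions (80% and 0%) gives 40%, but the precision over all 8 positive predictions is 4 / 8 = 50%. Only a metric that remembers the running counts reports the second number.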

7. I'd make a custom layer when I plan to use the layer frequently in other models, and a full custom model when the structure of the whole model can be reused frequently for other kinds of data without further modifications.

8. You'd do your own training loop if you want gradients propagated differently. The book says that in some implementations, you want different optimizers for different sections of the graph. That's another reason.
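
As a rough sketch of the "different optimizers for different sections" case, the loop below trains two Dense layers on synthetic data, with SGD for the lower layer and Adam for the upper one. The data, layer sizes, learning rates, and step count are all arbitrary assumptions.

```python
import tensorflow as tf

# Synthetic regression data: the target is the sum of the features
X = tf.random.normal((64, 3))
y = tf.reduce_sum(X, axis=1, keepdims=True)

lower = tf.keras.layers.Dense(8, activation="relu")
upper = tf.keras.layers.Dense(1)
_ = upper(lower(X))  # one forward pass so both layers create their weights

lower_opt = tf.keras.optimizers.SGD(learning_rate=0.01)
upper_opt = tf.keras.optimizers.Adam(learning_rate=0.01)
loss_fn = tf.keras.losses.MeanSquaredError()

lower_vars = lower.trainable_variables
upper_vars = upper.trainable_variables

losses = []
for step in range(100):
    with tf.GradientTape() as tape:
        loss = loss_fn(y, upper(lower(X)))
    # One backward pass, then each optimizer updates only its own section
    grads = tape.gradient(loss, lower_vars + upper_vars)
    lower_opt.apply_gradients(zip(grads[:len(lower_vars)], lower_vars))
    upper_opt.apply_gradients(zip(grads[len(lower_vars):], upper_vars))
    losses.append(float(loss))

print(losses[0], losses[-1])
```

fit() applies a single optimizer to all trainable variables, so this kind of split is one of the cases where writing the loop by hand is the simpler route.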


9. Keras components should be convertible to TF Functions. Any plain Python code in them is evaluated once, during tracing, and the resulting values are reused on subsequent runs. For example, if a random value is needed on every iteration of the training loop, it should come from a TF operation. If it comes from Python code instead, that value will be calculated once and reused thereafter.

10. Use TF operations as far as possible. Don't call other libraries such as NumPy, because their code only runs during tracing. Change sum() to tf.reduce_sum(). Be mindful of side effects, because they might not happen on every run.
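
Both points can be seen in a small experiment with tf.function. The function names and the trace_log list are made up for this sketch.

```python
import numpy as np
import tensorflow as tf

trace_log = []  # a Python side effect: only touched while tracing

@tf.function
def python_random():
    trace_log.append("traced")
    # NumPy runs during tracing, so its result is baked into the graph
    return np.random.rand()

@tf.function
def tf_random():
    # A TF op is part of the graph and runs on every call
    return tf.random.uniform([])

python_values = [float(python_random()) for _ in range(3)]
tf_values = [float(tf_random()) for _ in range(3)]

print(python_values)   # the same number three times
print(tf_values)       # a fresh number on each call
print(len(trace_log))  # the append ran once, not three times
```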

11. The book doesn't use the word 'dynamic' Keras model, but I think this is the same as the Residual example given. You use a dynamic model when you want to do things the sequential API doesn't make easy: skipping layers, two parallel paths that join up and then split again, or skipping layers later in the training or based on the residual remaining. Keras also lets a model run eagerly instead of as a graph (run_eagerly=True in compile()), which may be what 'dynamic' refers to. That helps with debugging but is slower.



12. Create a custom layer that does layer normalization. Ooh-Kaay. This is going to be fun.



In [4]:
class VikiNormalizationLayer(keras.layers.Layer):
    def __init__(self, eps=0.001, **kwargs):
        # The general initialization routine: parse the normal args and remember eps.
        # Eps is a small smoothing factor, selected to be everyone's favorite: 0.001 here.
        super().__init__(**kwargs)
        self.eps = eps

    def build(self, batch_input_shape):
        # Define two trainable weights, alpha (scale) and beta (shift), with one value
        # per input feature, float32 by default.
        self.alpha = self.add_weight(name="alpha", shape=batch_input_shape[-1:],
                                     initializer="ones")
        self.beta = self.add_weight(name="beta", shape=batch_input_shape[-1:],
                                    initializer="zeros")
        super().build(batch_input_shape)

    def call(self, inputs):
        # Perform layer normalization here: normalize each instance across its features
        mean, variance = tf.nn.moments(inputs, axes=-1, keepdims=True)
        std_dev = tf.math.sqrt(variance)
        # Scale and shift element-wise (*), not with a matrix product (@)
        return self.alpha * (inputs - mean) / (std_dev + self.eps) + self.beta

    def get_config(self):
        # Save eps along with the parent class' config
        base_config = super().get_config()
        return {**base_config, "eps": self.eps}
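
The normalization arithmetic itself can be checked outside Keras. This NumPy sketch (the function name layer_norm and the inputs are made up) uses the same formula: subtract each row's mean, divide by its standard deviation plus eps, then scale and shift element-wise.

```python
import numpy as np

def layer_norm(x, alpha, beta, eps=0.001):
    # Statistics are taken per instance, across the feature axis
    mean = x.mean(axis=-1, keepdims=True)
    std = x.std(axis=-1, keepdims=True)
    # alpha and beta hold one value per feature and broadcast over the batch
    return alpha * (x - mean) / (std + eps) + beta

# Two rows that differ only by a factor of 10
x = np.array([[1.0, 2.0, 3.0, 4.0],
              [10.0, 20.0, 30.0, 40.0]])
out = layer_norm(x, alpha=np.ones(4), beta=np.zeros(4))

print(out.shape)
print(out.mean(axis=-1))
```

The output has the same shape as the input, each row ends up with a mean of roughly zero, and the two rows come out almost identical because the normalization removes the per-row scale.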


That was too silent, I bet this is not working. How do we test out that layer? Let's create an NN using it.

In [5]:
# Let's load the data

(X, y), (testX, testy) = keras.datasets.cifar10.load_data()

# Split into training and testing
X_train, X_valid = X[:40000] / 255.0, X[40000:] / 255.0
y = y.reshape(50000)
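# Note: reshape() returns a new array and the result is discarded here,
# so testy keeps its (10000, 1) shape, as the printed output shows.
testy.reshape(10000)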

y_train, y_valid = y[:40000]        , y[40000:]

print("Validation: ", X_valid.shape)
print("Training: ", X_train.shape)
print("Labels validation: ", y_valid.shape)
print("Labels training: ", y_train.shape)

print("Test: ", testX.shape)
print("Labels test: ", testy.shape)


from sklearn.base import clone

def create_keras_classifier_model(n_classes=100):
    """Build and compile a deep, fully connected Keras classifier for CIFAR10
 
    Args:
        n_classes(int): Number of units in each hidden layer (despite the name;
            the output layer always has 10 units)
 
    Returns:
        Compiled keras model
 
    """
    # create model
    model = keras.models.Sequential()
    
    # The input: we get 32x32 pixels, each with 3 colors (rgb)
    model.add(keras.layers.Flatten(input_shape=[32,32,3]))
    # Then the hidden layers, fully connected (100 by default)
    for i in range(20):
        model.add(keras.layers.Dense(
            n_classes, 
            activation="elu",
            kernel_initializer=tf.keras.initializers.HeNormal(),
            kernel_regularizer=tf.keras.regularizers.l2(0.01),
        ))
        model.add(keras.layers.BatchNormalization())
    # Now add the output layer: 10 classes in CIFAR10, so 10 outputs.
    model.add(keras.layers.Dense(10, activation="softmax"))

    # print(model.summary())
    # Compile model
    nadam = keras.optimizers.Nadam(learning_rate=0.0001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

    model.compile(
        loss="sparse_categorical_crossentropy", 
        optimizer=nadam,
        metrics=["accuracy"]
    )
    return model

# Clear the errors, in case we observe them in the long run.
viki_stack_trace = ''

# Got to remember them. mm_bn is the model with Batch normalization
mm_bn = create_keras_classifier_model(100)
print ("Model built: ", mm_bn)

history_bn = mm_bn.fit(X_train, y_train, epochs=10, verbose=0,
                 batch_size=32,
                 validation_data=(X_valid, y_valid))


Validation:  (10000, 32, 32, 3)
Training:  (40000, 32, 32, 3)
Labels validation:  (10000,)
Labels training:  (40000,)
Test:  (10000, 32, 32, 3)
Labels test:  (10000, 1)
Model built:  <tensorflow.python.keras.engine.sequential.Sequential object at 0x7faf8e230e20>


In [None]:
# Based on that, let's create a model using my Normalization layer

def create_keras_classifier_model(n_classes=100):
    """Build and compile a fully connected Keras classifier for CIFAR10
 
    Args:
        n_classes(int): Number of units in each hidden layer (despite the name;
            the output layer always has 10 units)
 
    Returns:
        Compiled keras model
 
    """
    # create model
    model = keras.models.Sequential()
    
    # The input: we get 32x32 pixels, each with 3 colors (rgb)
    model.add(keras.layers.Flatten(input_shape=[32,32,3]))
    # Then the hidden layers, fully connected (100 by default)
    for i in range(3):
        model.add(keras.layers.Dense(
            n_classes, 
            activation="elu",
            kernel_initializer=tf.keras.initializers.HeNormal(),
            kernel_regularizer=tf.keras.regularizers.l2(0.01),
        ))
        model.add(VikiNormalizationLayer())
    # Now add the output layer: 10 classes in CIFAR10, so 10 outputs.
    model.add(keras.layers.Dense(10, activation="softmax"))

    # print(model.summary())
    # Compile model
    nadam = keras.optimizers.Nadam(learning_rate=0.0001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

    model.compile(
        loss="sparse_categorical_crossentropy", 
        optimizer=nadam,
        metrics=["accuracy"]
    )
    return model

# Clear the errors, in case we observe them in the long run.
viki_stack_trace = ''

# Got to remember them. mm_bn is reused here for the model with my normalization layer
mm_bn = create_keras_classifier_model(100)
print ("Model built: ", mm_bn)

history_bn = mm_bn.fit(X_train, y_train, epochs=10, verbose=0,
                 batch_size=32,
                 validation_data=(X_valid, y_valid))
