<center><font size="10"> 🔥Customizing TensorFlow🔥 </font></center>

### Custom Loss Functions

In [2]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd

# Load the California housing data set; the target is reshaped into a column.
housing = fetch_california_housing()

# The first split sets the test data aside; the second carves a validation
# set out of what remains. Both splits use the same seed.
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data,
    housing.target.reshape(-1, 1),
    random_state=42,
)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full,
    y_train_full,
    random_state=42,
)

# Learn the scaling statistics from the training split only, then apply the
# same transformation to every split.
scaler = StandardScaler().fit(X_train)
X_train_scaled, X_valid_scaled, X_test_scaled = (
    scaler.transform(split) for split in (X_train, X_valid, X_test)
)

In [3]:
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Where the absolute error is below 1 the loss is ``error**2 / 2``;
    everywhere else it is ``|error| - 0.5``.

    Args:
        y_true: target values.
        y_pred: predicted values.

    Returns:
        A tensor of per-element losses.
    """
    residual = y_true - y_pred
    abs_residual = tf.abs(residual)
    return tf.where(abs_residual < 1,
                    tf.square(residual) / 2,   # quadratic branch
                    abs_residual - 0.5)        # linear branch

In [4]:
# Per-sample feature shape: everything after the batch dimension.
input_shape = X_train.shape[1:]

# Two-layer regressor, assembled one layer at a time.
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="selu",
                             kernel_initializer="lecun_normal",
                             input_shape=input_shape))
model.add(keras.layers.Dense(1))

In [5]:
model.compile(loss=huber_fn, optimizer="nadam")
# Train on the standardized features: the validation data passed below is the
# scaled version, so feeding raw X_train would put training and validation
# inputs on different scales.
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x22a5630eac0>

#### Saving The Custom model

In [6]:
# Persist the model (compiled with the custom huber_fn loss) to 'my_custom_model.h5'.
model.save('my_custom_model.h5')

In [7]:
# The model was compiled with huber_fn as its loss; custom_objects maps the
# name "huber_fn" back to the Python function when the file is loaded.
model = keras.models.load_model("my_custom_model.h5", custom_objects = {"huber_fn":huber_fn})

In [8]:
# Continue training the reloaded model. Use the standardized training features
# so they match the scale of the (already scaled) validation features.
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x22a5631f0d0>

#### To use a different threshold, build the loss with a factory function. The threshold is not saved with the model, so you must supply it again (through `custom_objects`) when loading the model.

In [9]:
def create_huber(threshold=1.0):
    """Build a Huber loss function bound to ``threshold``.

    Args:
        threshold: absolute error below which the loss is quadratic
            (``error**2 / 2``); at or above it the loss is linear
            (``threshold * |error| - threshold**2 / 2``).

    Returns:
        A function ``huber_fn(y_true, y_pred)`` returning per-element losses.
    """
    # Constant term of the linear branch; depends only on the threshold.
    linear_offset = threshold**2 / 2

    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        abs_residual = tf.abs(residual)
        return tf.where(abs_residual < threshold,
                        tf.square(residual) / 2,
                        threshold * abs_residual - linear_offset)

    return huber_fn

# Recompile the current model with a Huber loss whose threshold is 2.0.
model.compile(loss=create_huber(2.0), optimizer="nadam")

In [10]:
# Persist the model that was compiled with the threshold-2.0 Huber loss.
model.save("my_custom_threshold_model.h5")

In [11]:
# The function returned by create_huber is named huber_fn, so that is the key
# to register; calling create_huber(2.0) again re-supplies the threshold.
model = keras.models.load_model("my_custom_threshold_model.h5", custom_objects={'huber_fn':create_huber(2.0)})

In [12]:
# Continue training the reloaded model on the standardized features.
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x22a2118a610>

#### Custom Activation Functions, Initializers, Regularizers, and Constraints

In [13]:
def my_softplus(z): # return value is just tf.nn.softplus(z)
    """Softplus activation, ``log(1 + exp(z))``, computed without overflow.

    Evaluating ``exp(z)`` directly overflows to ``inf`` for large positive
    ``z``. The identity ``log(1 + exp(z)) = max(z, 0) + log(1 + exp(-|z|))``
    keeps the exponent non-positive, so the result stays finite.

    Args:
        z: floating-point tensor of pre-activation values.

    Returns:
        A tensor with the softplus of ``z``, element-wise.
    """
    return tf.maximum(z, 0.) + tf.math.log1p(tf.exp(-tf.abs(z)))

def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw initial weights from a zero-mean normal distribution.

    The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``.

    Args:
        shape: shape of the weight tensor; its first two entries are used to
            compute the standard deviation.
        dtype: dtype of the returned tensor.

    Returns:
        A randomly initialized tensor of the requested shape and dtype.
    """
    fan_in, fan_out = shape[0], shape[1]
    spread = tf.sqrt(2. / (fan_in + fan_out))
    return tf.random.normal(shape, stddev=spread, dtype=dtype)

def my_l1_regularizer(weights):
    """Return the L1 penalty: the sum of ``|0.01 * w|`` over all weights."""
    scaled_weights = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
    """Replace every negative weight with zero; other weights pass through."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

In [14]:
# Start from a clean slate before building the next model: clear Keras'
# global session state, then reseed NumPy and TensorFlow.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [15]:
# Regressor whose output layer uses every custom component defined above:
# activation, regularizer, constraint and initializer.
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="selu",
                             kernel_initializer="lecun_normal",
                             input_shape=input_shape))
model.add(keras.layers.Dense(1,
                             activation=my_softplus,
                             kernel_initializer=my_glorot_initializer,
                             kernel_regularizer=my_l1_regularizer,
                             kernel_constraint=my_positive_weights))

In [16]:
# Mean squared error as the training loss; mean absolute error is reported as a metric.
model.compile(loss = "mse", optimizer="nadam", metrics=['mae'])

In [17]:
# Validate on the standardized validation set: the model is trained on
# X_train_scaled, so scoring it on raw X_valid would mix feature scales and
# make the validation metrics meaningless.
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x22a60877190>

#### Custom Metrics

In [18]:
# A plain function with the (y_true, y_pred) signature can be passed as a
# metric; here the threshold-2.0 Huber function is used alongside the MSE loss.
model.compile(loss = 'mse', optimizer="nadam", metrics=[create_huber(2.0)])

In [19]:
# Precision is a stateful ("streaming") metric object: calling it with
# (labels, predictions) updates its state and returns the current value.
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) # 4 of the 5 positive predictions are correct -> 0.8

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [20]:
# Read the metric's current value without feeding it any new data.
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

#### Custom Layers

In [21]:
# Wrap a stateless function in a layer: this one applies exp() element-wise.
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

#### Let's subclass `keras.layers.Layer` to create a custom layer with weights

In [22]:
class MyDense(keras.layers.Layer):
    """Fully connected layer computing ``activation(X @ kernel + bias)``.

    Args:
        units: number of output units (size of the last output dimension).
        activation: activation identifier resolved through
            ``keras.activations.get`` (a name, a callable, or ``None``).
        **kwargs: forwarded to ``keras.layers.Layer`` (e.g. ``name``,
            ``input_shape``).
    """

    def __init__(self, units, activation = None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input width is known."""
        self.kernel = self.add_weight(
            name = "kernel", shape = [batch_input_shape[-1], self.units],
            initializer = "glorot_normal")

        self.bias = self.add_weight(
            name = 'bias', shape = [self.units], initializer = "zeros")
        super().build(batch_input_shape)

    def call(self, X):
        """Apply the affine transformation followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``.

        The argument is normalized through ``tf.TensorShape`` first, so plain
        tuples and lists are accepted in addition to ``TensorShape`` objects.
        """
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1] + [self.units])

    def get_config(self):
        """Return the base layer config extended with ``units`` and ``activation``."""
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

In [23]:
# Clear Keras' global session state and reseed NumPy and TensorFlow before
# building the MyDense model.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [24]:
# Same architecture as the first regressor, built from the custom MyDense layer.
model = keras.models.Sequential()
model.add(MyDense(30, activation="relu", input_shape=input_shape))
model.add(MyDense(1))

In [25]:
# Train on the standardized data, then report the loss on the held-out test
# set (the last expression is what the cell displays).
model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.5255994200706482

In [26]:
class MyGausianNoise(keras.layers.Layer):
    """Add zero-mean Gaussian noise to the inputs while training.

    When ``training`` is falsy the inputs are returned unchanged.

    Args:
        stddev: standard deviation of the noise.
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training = None):
        """Return ``X`` plus noise when training, ``X`` itself otherwise."""
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        """The layer is element-wise, so the shape is unchanged."""
        return batch_input_shape

    def get_config(self):
        """Return the base layer config extended with ``stddev``.

        Without this, ``stddev`` would be missing from the layer's config and
        the layer could not be rebuilt from it.
        """
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

In [27]:
# Reset global state and seeds so the run below is repeatable.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

# Noise is injected in front of the network; no input shape is declared, so
# the model is built on first use.
model = keras.models.Sequential()
model.add(MyGausianNoise(stddev=1.0))
model.add(keras.layers.Dense(30, activation='selu'))
model.add(keras.layers.Dense(1))

In [28]:
# Train the noisy-input model, then report its loss on the held-out test set.
model.compile(loss='mse', optimizer='nadam')
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.7448066473007202

In [29]:
class Residualblock(keras.layers.Layer):
    """A stack of Dense layers wrapped in a skip connection.

    The inputs pass through ``n_layers`` Dense layers (ELU activation,
    He-normal initialization, ``n_neurons`` units each) and the result is
    added to the original inputs.

    Args:
        n_layers: number of Dense layers in the stack.
        n_neurons: number of units in each Dense layer.
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        dense_stack = []
        for _ in range(n_layers):
            dense_stack.append(
                keras.layers.Dense(n_neurons, activation='elu',
                                   kernel_initializer='he_normal'))
        self.hidden = dense_stack

    def call(self, inputs):
        """Run the Dense stack and add the untouched inputs to its output."""
        activations = inputs
        for dense in self.hidden:
            activations = dense(activations)
        return inputs + activations

In [30]:
class ResidualRegressor(keras.Model):
    """Regression model built from a Dense layer and two residual blocks.

    Args:
        output_dim: number of units in the final Dense layer.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(
            30, activation='elu', kernel_initializer='he_normal')
        self.block1 = Residualblock(2, 30)
        self.block2 = Residualblock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Forward pass: hidden1, block1 applied four times, block2, out."""
        features = self.hidden1(inputs)
        # The same block (and therefore the same weights) is reused four times.
        for _ in range(4):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

In [31]:
# Build a single-output residual regressor, train it for five epochs, then
# score it and predict on the held-out test set.
model = ResidualRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=5)
score = model.evaluate(X_test_scaled, y_test)
y_pred = model.predict(X_test_scaled)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [32]:
class ReconstructingRegressor(keras.Model):
    """Regressor with an auxiliary reconstruction loss.

    Five hidden Dense layers (30 units, SELU, LeCun-normal init) feed two
    heads: ``out`` produces the prediction and ``reconstruct`` maps the hidden
    representation back to the input width. The mean squared reconstruction
    error, scaled by 0.05, is added to the model's losses on every call.

    Args:
        output_dim: number of units in the prediction layer.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu",
                                        kernel_initializer="lecun_normal")
                        for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        # Running mean of the reconstruction error, reported as a metric.
        self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error")

    def build(self, batch_input_shape):
        """Create the reconstruction head once the input width is known."""
        # The reconstruction layer needs as many units as there are input features.
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        # NOTE(review): the parent build() call below is left disabled;
        # presumably a workaround for a Keras issue - confirm before re-enabling.
        #super().build(batch_input_shape)

    def call(self, inputs, training=None):
        """Return predictions; registers the reconstruction loss as a side effect."""
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        # Mean squared difference between the reconstruction and the inputs.
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        # Down-weighted so the auxiliary loss does not dominate the main loss.
        self.add_loss(0.05 * recon_loss)
        if training:
            # The running-mean metric is only updated when training is truthy.
            result = self.reconstruction_mean(recon_loss)
            self.add_metric(result)
        return self.out(Z)

In [33]:
# Train the single-output reconstructing regressor and predict on the test set.
model = ReconstructingRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)

Epoch 1/2
Epoch 2/2


### Computing Gradients using Autodiff 

In [34]:
def f(w1, w2):
    """Return ``3 * w1**2 + 2 * w1 * w2``.

    Its partial derivatives are ``6 * w1 + 2 * w2`` with respect to ``w1``
    and ``2 * w1`` with respect to ``w2``.
    """
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [35]:
# Point at which the derivatives are estimated, and the finite-difference step size.
w1, w2 = 5, 3
eps = 1e-6

In [36]:
# Forward-difference estimate of df/dw1 at (5, 3); the exact value is 6*5 + 2*3 = 36.
(f(w1 + eps, w2) - f(w1,w2))/eps

36.000003007075065

In [37]:
# Forward-difference estimate of df/dw2 at (5, 3); the exact value is 2*5 = 10.
(f(w1, w2+eps) - f(w1,w2))/eps

10.000000003174137

#### You can easily do this with tensors, using autodiff (`tf.GradientTape`)

In [38]:
# Record the computation of f on two tf.Variable objects, then ask the tape
# for the gradient of z with respect to both in a single call.
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z,[w1,w2])

In [39]:
# Display the [dz/dw1, dz/dw2] list computed by the tape.
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [40]:
# A fresh, non-persistent tape: the first gradient() call below works.
with tf.GradientTape() as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw1

<tf.Tensor: shape=(), dtype=float32, numpy=36.0>

In [41]:
# A non-persistent tape can only be used for one gradient() call, so this
# second call raises RuntimeError. The error is the point of the cell: catch
# it and print the message so that "Restart & Run All" can continue past here.
try:
    dz_dw2 = tape.gradient(z, w2)
except RuntimeError as ex:
    print(ex)

RuntimeError: A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)

In [42]:
# persistent=True lets gradient() be called more than once on the same tape.
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)


dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
# Drop the reference to the persistent tape once it is no longer needed.
del tape

dz_dw1, dz_dw2

(<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>)

In [43]:
# With tf.constant inputs and no explicit watch, the tape yields no gradients:
# the result shown below is [None, None].
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])

gradients

[None, None]

In [44]:
# Explicitly watching the constants makes the tape record operations on them,
# so gradients with respect to c1 and c2 become available.
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

### Functions in Tensorflow

In [45]:
def cube(x):
    """Return ``x`` raised to the third power."""
    return pow(x, 3)

In [46]:
# Plain Python call with an int argument.
cube(2)

8

In [47]:
# The same Python function also accepts a tensor and returns a tensor.
cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

#### Convert a normal Python function to a TensorFlow function

In [48]:
# Wrap the Python function with tf.function; the result is a TensorFlow
# Function object, displayed below.
tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.def_function.Function at 0x22c2b4659d0>

In [49]:
# Called with a Python int, the TensorFlow function returns an int32 tensor.
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [50]:
# Called with a float32 tensor, it returns a float32 tensor.
tf_cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

#### To create a TensorFlow function directly

In [51]:
@tf.function
def tf_cube(x):
    """Return ``x`` cubed; the decorator turns this into a TensorFlow function."""
    return pow(x, 3)

#### You can still access the original function as a normal Python function

In [52]:
# python_function exposes the undecorated Python function; calling it returns a plain int.
tf_cube.python_function(2)

8