<a href="https://colab.research.google.com/github/shernee/04_cmpe258/blob/master/AdvancedDL_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import numpy as np

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [2]:
(X_train_full, y_train_full), (X_test_full, y_test_full) = tf.keras.datasets.mnist.load_data(
    path='mnist.npz'
)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [27]:
X_train = X_train_full[:400]
y_train = tf.cast(y_train_full[:400], dtype=np.float32)
X_test = X_test_full[:100]
y_test = tf.cast(y_test_full[:100], dtype=np.float32)
X_train = X_train.astype(np.float32) / 255.
X_test = X_test.astype(np.float32) / 255.

#### Some custom objects

In [4]:
# custom loss

class CustomCELoss(tf.keras.losses.Loss):
    def __init__(self):
        super().__init__()

    def call(self, y_true, y_pred):        
        log_y_pred = tf.math.log(y_pred)
        elements = -tf.math.multiply_no_nan(x=log_y_pred, y=y_true)
        return tf.reduce_mean(tf.reduce_sum(elements,axis=1))

    def get_config(self):
        base_config = super().get_config()
        return {**base_config}

In [5]:
# custom regularizer

def custom_l2_regularizer(weights):
    return tf.reduce_sum(0.02 * tf.square(weights))

In [6]:
# custom activation

def custom_relu(x):
    return tf.keras.backend.maximum(-0.1, x)


In [7]:
# custom initializer

def custom_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

In [8]:
# custom kernel constraint

class CustomKernelConstraint(tf.keras.constraints.Constraint):
  def __init__(self):
    super().__init__()
  
  def call(self, weights):
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

  def get_config(self):
        base_config = super().get_config()
        return {**base_config}

custom_constraint = CustomKernelConstraint()

In [9]:
# custom seed generator

def custom_seed_generator(seed: int):
  tf.random.set_seed(seed)
  np.random.seed(seed)

In [10]:
# custom metric

class CustomCEMetric(tf.keras.metrics.Mean):
    def __init__(self, name='CEMetric', dtype=None):
        self.ce_func = CustomCELoss()
        super().__init__(name=name, dtype=dtype)

    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.ce_func(y_true, y_pred)
        super(CustomCEMetric, self).update_state(metric, sample_weight)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config}


In [11]:
# custom callback - early stopping

class CustomEarlyStopping(tf.keras.callbacks.Callback):

    def __init__(self, patience=0):
        super().__init__()
        self.patience = patience 
        self.best_weights = None

    def on_train_begin(self, logs=None):
        # The number of epoch it has waited when loss is no longer minimum.
        self.wait = 0
        # The epoch the training stops at.
        self.stopped_epoch = 0
        # Initialize the best as infinity.
        self.best = np.Inf

    def on_epoch_end(self, epoch, logs=None):
        current = logs.get("loss")
        if np.less(current, self.best):
            self.best = current
            self.wait = 0
            self.best_weights = self.model.get_weights()
        else:
            self.wait += 1
            if self.wait >= self.patience:
                self.stopped_epoch = epoch
                self.model.stop_training = True
                print("Restoring model weights from the end of the best epoch.")
                self.model.set_weights(self.best_weights)

    def on_train_end(self, logs=None):
        if self.stopped_epoch > 0:
            print("Epoch %05d: early stopping" % (self.stopped_epoch + 1))


In [12]:
# custom optimizer

class CustomOptimizer(tf.keras.optimizers.Adam):
  def __init__(self, dropout_rate):
    super().__init__()
    self.dropout_rate = dropout_rate
  
  def _resource_apply_dense(self, grad, var):
    dropout_mask = tf.keras.backend.random_binomial(tf.shape(var), p=1-self.dropout_rate)
    grad = grad * dropout_mask
    return super()._resource_apply_dense(grad, var)

In [13]:
# custom lr scheduler

def exponential_decay(lr0, s):
    def exponential_decay_fn(epoch):
        return lr0 * 0.1 ** (epoch / s)
    return exponential_decay_fn

exponential_decay_fn = exponential_decay(lr0=0.01, s=20)
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(exponential_decay_fn)

In [14]:
# custom dropout

class CustomDropout(tf.keras.layers.Layer):
    def __init__(self, rate=0.5, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def build(self, input_shape):
        super().build(input_shape)

    def call(self, inputs, training=None):
        def dropout():
            # Generate a random tensor with the same shape as inputs
            mask = tf.keras.backend.random_binomial(tf.shape(inputs), p=1-self.rate)
            # Scale the masked values to preserve the mean
            outputs = inputs * mask / (1 - self.rate)
            return outputs

        return tf.keras.backend.in_train_phase(dropout(), inputs, training=training)

    def get_config(self):
        config = super().get_config()
        config['rate'] = self.rate
        return config

In [15]:
# custom layer

class CustomDense(tf.keras.layers.Layer):
    def __init__(self, units, activation='relu'):
      super().__init__()
      self.units = units
      self.activation = tf.keras.activations.get(activation)

    def build(self, input_shape):
      self.kernel = self.add_weight(shape=(input_shape[1], self.units), initializer='normal', trainable=True)
      self.biases = self.add_weight(shape=(self.units,), initializer='zeros', trainable=True)
      super().build(input_shape)

    def call(self, X):
      return self.activation(X @ self.kernel + self.biases)

#### Using a model with multiple custom objects defined above

In [22]:
custom_seed_generator(21)


In [50]:
n_epochs = 2
batch_size = 16
n_steps = len(X_train) // batch_size
custom_ce_metric = CustomCEMetric()
custom_loss = CustomCELoss()

model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=[28, 28]),
    tf.keras.layers.Dense(60, activation=custom_relu,
                          kernel_initializer=custom_glorot_initializer,
                          kernel_regularizer=custom_l2_regularizer,
                          kernel_constraint=custom_constraint),
    CustomDropout(rate=0.2),
    CustomDense(30),
    tf.keras.layers.Dense(1, activation="softmax"),
])


model.compile(loss=custom_loss, optimizer=CustomOptimizer(dropout_rate=0.1), metrics=[custom_ce_metric])

model.fit(x=X_train, y=y_train, epochs=n_epochs, batch_size=batch_size, validation_data=(X_test, y_test), callbacks=[CustomEarlyStopping(), lr_scheduler])

  return dispatch_target(*args, **kwargs)




<keras.callbacks.History at 0x7f030845a920>

#### Saving and loading a model with custom objects

In [53]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
loss_fn = tf.keras.losses.categorical_crossentropy
metrics = [tf.keras.metrics.categorical_crossentropy]

model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=[28, 28]),
    tf.keras.layers.Dense(60, activation=custom_relu),
    tf.keras.layers.Dropout(rate=0.2),
     tf.keras.layers.Dense(30, activation="relu"),
    tf.keras.layers.Dense(1, activation="softmax"),
])

model.compile(loss=loss_fn, optimizer=optimizer, metrics=metrics)
model.fit(x=X_train, y=y_train, epochs=n_epochs, batch_size=batch_size)

  return dispatch_target(*args, **kwargs)




<keras.callbacks.History at 0x7f0308127430>

In [55]:
model.save('model_with_custom_activation')



In [56]:
# Load saved model and run again

model = tf.keras.models.load_model('model_with_custom_activation', 
                                   custom_objects={
                                       'custom_relu':custom_relu, 
                                  })
model.fit(x=X_train, y=y_train, epochs=1, batch_size=16)

  return dispatch_target(*args, **kwargs)




<keras.callbacks.History at 0x7f02f9f9c490>

#### Custom model

In [26]:
df = fetch_california_housing()

X_train_1, X_test_1, y_train_1, y_test_1 = train_test_split(df.data, df.target.reshape(-1, 1), random_state=42)

sc = StandardScaler()
X_train_scaled = sc.fit_transform(X_train_1)
X_test_scaled = sc.transform(X_test_1)

In [25]:
class CustomModel(tf.keras.Model):
    def __init__(self, **kwargs):
      super().__init__(**kwargs)
      self.hidden1 = CustomDense(100)
      self.hidden2 = CustomDense(50)
      self.hidden3 = CustomDense(10)
      self.output_ = CustomDense(1)

    def call(self, input):
      hidden1 = self.hidden1(input)
      hidden2 = self.hidden2(hidden1)
      hidden3 = self.hidden3(hidden2)
      concat = tf.keras.layers.concatenate([input, hidden3])
      output = self.output_(concat)
      return output

model = CustomModel()

model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
score = model.evaluate(X_test_scaled, y_test)
model.predict(X_test_scaled[:3])

Epoch 1/2
Epoch 2/2


array([[0.4658325],
       [1.6273526],
       [3.342863 ]], dtype=float32)

#### Custom training loop

In [36]:
n_epochs = 2
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
loss_fn = tf.keras.losses.mse
metrics = [tf.keras.metrics.Accuracy()]

model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(30, activation=custom_relu, kernel_initializer=custom_glorot_initializer),
    tf.keras.layers.Dense(1, kernel_regularizer=custom_l2_regularizer)
])

def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

for epoch in range(n_epochs):
  print('epoch =', epoch)
  for step in range(1, n_steps + 1):
        X_batch, y_batch = X_train[step], y_train[step]
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)

        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        for metric in metrics:
          metric(y_batch, y_pred[step])

epoch = 0
epoch = 1
