<a href="https://colab.research.google.com/github/sandrons/Hessian_Optimizer/blob/main/Hessian_optimizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Copyright (c) 2024 Alessandro Temperoni


In [None]:
from keras.models import Model
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import MaxPooling2D

In [None]:
import tensorflow as tf
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization,Activation, Dense, Dot, Embedding, Flatten, GlobalAveragePooling1D, Reshape, Add, Dropout
from tensorflow.keras.layers import Conv1D, MaxPooling1D
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from tensorflow.python.keras.engine import data_adapter
from tensorflow.python.eager import backprop
from tensorflow.python.util import compat
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow import keras
from tensorflow.python.eager import def_function
from tensorflow.python.framework import ops
from tensorflow.python.keras import backend_config
from tensorflow.python.keras.optimizer_v2 import optimizer_v2
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.training import gen_training_ops
from tensorflow.python.util.tf_export import keras_export

In [None]:
# Load the Fashion-MNIST train/test splits bundled with Keras
# (the files are downloaded on first use).
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


In [None]:
# Divide both image sets by 255.0 (the two statements are independent of
# each other).
train_images = train_images / 255.0
test_images = test_images / 255.0

In [None]:
# Display the shape of the training images array as the cell output.
train_images.shape

(60000, 28, 28)

In [None]:
# Display the shape of the training labels array as the cell output.
train_labels.shape

(60000,)

# Hessian

In [None]:
class Hessian(keras.optimizers.Optimizer):
    """Adam-style optimizer whose second moment tracks a Hessian-diagonal estimate.

    Each "gradient" handed to ``apply_gradients`` is expected to be a pair
    ``(gradient, diagonal)``. ``diagonal`` is an estimate of the Hessian
    diagonal for the variable; when it is ``None`` (or when a bare gradient
    is passed instead of a pair) the squared gradient is used instead, which
    reduces the rule to an Adam-like update.

    Update rule for a variable ``v`` at step ``t``::

        m     <- beta_1 * m + (1 - beta_1) * g
        u     <- beta_2 * u + (1 - beta_2) * d * d      # d = diagonal (or g)
        m_hat  = m / (1 - beta_1 ** t)
        u_hat  = u / (1 - beta_2 ** t)
        v     <- v - lr * m_hat / (sqrt(u_hat) + epsilon)

    Args:
        learning_rate: Step size (float, tensor or schedule).
        beta_1: Decay rate of the first-moment average.
        beta_2: Decay rate of the second-moment average.
        epsilon: Constant added to the denominator; a falsy value falls back
            to the Keras backend epsilon.
        amsgrad: Stored and serialized, but not used by the update rule.
        name: Name of the optimizer.
        **kwargs: Forwarded to the base optimizer constructor.
    """

    def __init__(self,
                 learning_rate=0.005,
                 beta_1=0.9,
                 beta_2=0.999,
                 epsilon=1e-3,
                 amsgrad=False,
                 name='Hessian',
                 **kwargs):
        super().__init__(name, **kwargs)
        # Honour the legacy `lr` keyword when it is supplied.
        self._set_hyper('learning_rate', kwargs.get('lr', learning_rate))
        self._set_hyper('decay', self._initial_decay)
        self._set_hyper('beta_1', beta_1)
        self._set_hyper('beta_2', beta_2)
        self.epsilon = epsilon or backend_config.epsilon()
        self.amsgrad = amsgrad

    def _create_slots(self, var_list):
        """Create the two optimizer slots kept for every model variable.

        ``m`` holds the running average of the gradients and ``u`` holds the
        running average of the squared Hessian-diagonal estimate.
        """
        # Two separate loops so that every "m" slot is created before any
        # "u" slot; merging them would change the order of `self.weights`.
        for var in var_list:
            self.add_slot(var, "m")
        for var in var_list:
            self.add_slot(var, "u")

    def _resource_apply_dense(self, grad, v, apply_state=None):
        """Apply one update to the dense variable ``v``.

        Args:
            grad: Either a ``(gradient, diagonal)`` pair or a bare gradient.
                ``diagonal`` may be ``None``.
            v: The variable to update.
            apply_state: Per-(device, dtype) coefficients built by
                ``_prepare_local``.

        Returns:
            The grouped update of ``v`` and its two slots (as returned by
            ``tf.group``).
        """
        # Not wrapped in `tf.function`: the method hands its update op back
        # to the base optimizer, which is responsible for running it.
        var_device, var_dtype = v.device, v.dtype.base_dtype
        coefficients = ((apply_state or {}).get((var_device, var_dtype))
                        or self._fallback_apply_state(var_device, var_dtype))

        if isinstance(grad, (tuple, list)):
            g, diagonal = grad
        else:
            g, diagonal = grad, None
        # Without a Hessian estimate the second moment tracks g * g.
        curvature = g if diagonal is None else diagonal

        m = self.get_slot(v, 'm')
        u = self.get_slot(v, 'u')
        beta_1 = coefficients['beta_1_t']
        beta_2 = coefficients['beta_2_t']
        epsilon = coefficients['epsilon']
        lr = coefficients['lr_t']

        # Use the values returned by `assign` so that everything below
        # depends on the updated slots.
        m_t = m.assign(
            beta_1 * m + coefficients['one_minus_beta_1_t'] * g,
            use_locking=self._use_locking)
        u_t = u.assign(
            beta_2 * u + coefficients['one_minus_beta_2_t'] * tf.square(curvature),
            use_locking=self._use_locking)

        # Bias correction driven by the optimizer's iteration counter
        # (beta ** (iterations + 1), computed in `_prepare_local`).
        m_hat = m_t / (1 - coefficients['beta_1_power'])
        u_hat = u_t / (1 - coefficients['beta_2_power'])

        var_update = v.assign_sub(
            lr * m_hat / (tf.sqrt(u_hat) + epsilon),
            use_locking=self._use_locking)
        return tf.group(var_update, m_t, u_t)

    def _resource_apply_sparse(self, grad, var, indices=None, apply_state=None):
        """Sparse gradients are not supported by this optimizer."""
        raise NotImplementedError(
            'Hessian optimizer does not support sparse gradients.')

    def set_weights(self, weights):
        """Set optimizer weights, dropping extra trailing entries if present."""
        params = self.weights
        # Weights saved by a Keras V1 optimizer contain 3x + 1 entries (they
        # include "vhat" values) whereas this optimizer holds 2x + 1
        # variables; keep only the leading entries in that case.
        num_vars = int((len(params) - 1) / 2)
        if len(weights) == 3 * num_vars + 1:
            weights = weights[:len(params)]
        super().set_weights(weights)

    def _prepare_local(self, var_device, var_dtype, apply_state):
        """Add the per-(device, dtype) coefficients used by the dense update."""
        super()._prepare_local(var_device, var_dtype, apply_state)
        local_step = math_ops.cast(self.iterations + 1, var_dtype)
        beta_1_t = array_ops.identity(self._get_hyper('beta_1', var_dtype))
        beta_2_t = array_ops.identity(self._get_hyper('beta_2', var_dtype))
        apply_state[(var_device, var_dtype)].update(
            dict(
                epsilon=ops.convert_to_tensor(self.epsilon, var_dtype),
                beta_1_t=beta_1_t,
                beta_1_power=math_ops.pow(beta_1_t, local_step),
                one_minus_beta_1_t=1 - beta_1_t,
                beta_2_t=beta_2_t,
                beta_2_power=math_ops.pow(beta_2_t, local_step),
                one_minus_beta_2_t=1 - beta_2_t))

    def get_config(self):
        """Return a config whose keys match the constructor arguments."""
        config = super().get_config()
        config.update({
            'learning_rate': self._serialize_hyperparameter('learning_rate'),
            'decay': self._initial_decay,
            'beta_1': self._serialize_hyperparameter('beta_1'),
            'beta_2': self._serialize_hyperparameter('beta_2'),
            'epsilon': self.epsilon,
            'amsgrad': self.amsgrad,
        })
        return config

# Model

In [None]:
class network(Model):
    """1-D convolutional classifier with a Hessian-aware training step.

    The last layer applies ``softmax``, so the model outputs class
    probabilities (10 units). ``train_step`` sends ``(gradient, diagonal)``
    pairs to the optimizer, where ``diagonal`` is a Hutchinson estimate of
    the Hessian diagonal.

    Args:
        X_test_d: Unused; kept so existing ``network(data)`` calls still work.
    """

    def __init__(self, X_test_d=None):
        super().__init__()

        self.conv1 = Conv1D(28, (3), activation='relu', input_shape=(28, 28))
        self.max1 = MaxPooling1D((2))

        self.conv3 = Conv1D(64, (3), activation='relu')
        self.max2 = MaxPooling1D((2))

        self.conv4 = Conv1D(64, (3), activation='relu')

        self.flatten = Flatten()
        self.dense = Dense(64, activation='relu')
        self.dense1 = Dense(10, activation='softmax')

    def call(self, pair):
        """Run the forward pass and return the softmax output."""
        x = self.conv1(pair)
        x = self.max1(x)

        x = self.conv3(x)
        x = self.max2(x)

        x = self.conv4(x)

        x = self.flatten(x)
        x = self.dense(x)
        x = self.dense1(x)

        return x

    def hvp3(self, var, vec, x, y):
        """Return the Hessian-vector product of the compiled loss.

        Args:
            var: A variable, or a list of variables, to differentiate with
                respect to.
            vec: A tensor (or list of tensors) with the same structure as
                ``var``; the vector the Hessian is multiplied with.
            x: Input batch.
            y: Target batch.

        Returns:
            ``H @ vec`` with the same structure as ``var``.
        """
        # The outer tape differentiates the gradient computed by the inner
        # tape, weighted by `vec`, which yields H @ vec without ever
        # materialising the Hessian.
        # NOTE(review): calling `compiled_loss` here presumably also updates
        # Keras' running loss metric - confirm if the reported loss matters.
        with tf.GradientTape() as outer_tape:
            with tf.GradientTape() as inner_tape:
                y_pred = self(x, training=True)
                loss = self.compiled_loss(
                    y, y_pred, regularization_losses=self.losses)
            grads = inner_tape.gradient(loss, var)

        return outer_tape.gradient(grads, var, output_gradients=vec)

    def train_step(self, data):
        """Run one optimisation step on a batch ``(x, y)``.

        Returns:
            A dict mapping each metric name to its current result.
        """
        x, y = data
        trainable_vars = self.trainable_variables

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            loss = self.compiled_loss(
                y, y_pred, sample_weight=None, regularization_losses=self.losses)

        # Update the compiled metrics (e.g. accuracy).
        self.compiled_metrics.update_state(y, y_pred)

        gradients = tape.gradient(loss, trainable_vars)

        # Hutchinson's method: for a random probe z, z * (H z) is an estimate
        # of diag(H). A single Hessian-vector product over all variables
        # replaces one extra forward/backward pass per variable.
        probes = [tf.random.normal(var.shape, dtype=var.dtype)
                  for var in trainable_vars]
        hessian_vecs = self.hvp3(trainable_vars, probes, x, y)
        # A missing product is passed on as None; the optimizer checks for
        # a None diagonal.
        diagonals = [
            None if hz is None else tf.multiply(hz, z)
            for hz, z in zip(hessian_vecs, probes)
        ]

        self.optimizer.apply_gradients(
            zip(zip(gradients, diagonals), trainable_vars))

        return {m.name: m.result() for m in self.metrics}

# Training

In [None]:
METRICS = 'accuracy'
model = network(train_images)

# The network's last layer already applies softmax, so the loss has to be
# told that it receives probabilities (from_logits=False); otherwise softmax
# would be applied a second time inside the loss.
model.compile(
    optimizer=Hessian(learning_rate=0.005, epsilon=1e-3),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=[METRICS],
)
history = model.fit(
    train_images,
    train_labels,
    validation_split=0.2,
    callbacks=[
        # patience: number of epochs with no improvement after which
        # training will be stopped
        EarlyStopping(monitor='val_loss', min_delta=.01, patience=5,
                      mode='auto', verbose=0),
    ],
    batch_size=16,
    epochs=100,
)


In [None]:
# Call fit again on the same `model` object, this time with batch_size=256
# and min_delta=.001 for early stopping; `history` is overwritten.
history = model.fit(
    train_images,
    train_labels,
    batch_size=256,
    epochs=100,
    validation_split=0.2,
    callbacks=[
        # patience: number of epochs with no improvement after which
        # training will be stopped
        EarlyStopping(monitor='val_loss', min_delta=.001, patience=5,
                      mode='auto', verbose=0),
    ],
)