In [32]:
import tensorflow as tf
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler

In [33]:
# Synthetic single-feature regression problem (fixed seed for repeatability).
X, y = make_regression(
    n_samples=1000,
    n_features=1,
    noise=0.1,
    random_state=42,
)
# Turn the flat target vector into a single column: shape (n_samples, 1).
y = np.reshape(y, (-1, 1))

In [34]:
# First hold out 20% of the samples, then cut that hold-out in half to obtain
# the validation and test sets (80% / 10% / 10% overall).
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

In [35]:
# Learn the target mean / standard deviation from the training split only ...
y_scaler = StandardScaler().fit(y_train)
# ... and apply that same scaling to the training, validation and test targets.
y_train, y_val, y_test = (
    y_scaler.transform(targets) for targets in (y_train, y_val, y_test)
)

In [36]:
# defining the model 
class FeedForwardNN(tf.keras.Model):
    """Fully connected regressor: four 20-unit ReLU hidden layers and one linear output unit."""

    def __init__(self):
        super().__init__()

        def relu_block():
            # All hidden layers share the same width, activation and initializer.
            return tf.keras.layers.Dense(
                20,
                activation='relu',
                kernel_initializer=tf.keras.initializers.GlorotUniform(),
            )

        self.hidden1 = relu_block()
        self.hidden2 = relu_block()
        self.hidden3 = relu_block()
        self.hidden4 = relu_block()
        # No activation on the output: the network emits a raw regression value.
        self.output_layer = tf.keras.layers.Dense(1, activation=None)

    def call(self, inputs):
        """Pass ``inputs`` through the hidden layers in order and return the output layer's result."""
        activations = inputs
        for layer in (self.hidden1, self.hidden2, self.hidden3, self.hidden4):
            activations = layer(activations)
        return self.output_layer(activations)

In [37]:
# defining the class of optimizers 
import numpy as np
import tensorflow as tf 
class Optimizers:
    """Collection of hand-written gradient-descent update rules.

    One instance applies one rule, selected by ``method``: ``'vanilla'``,
    ``'mini_batch'``, ``'sgd'``, ``'momentum'``, ``'nesterov'``, ``'adagrad'``,
    ``'rmsprop'``, ``'adadelta'`` or ``'adam'``.  Per-weight state (running
    averages) is created lazily on the first ``apply_gradients`` call and is
    indexed by position, so an instance should always be given the same list
    of weights.

    Args:
        method: Name of the update rule.
        learning_rate: Step size (not used by ``'adadelta'``).
        beta1: Decay of the gradient average ('momentum', 'nesterov', 'adam').
        beta2: Decay of the squared-gradient average ('adam').
        epsilon: Small constant that keeps divisions / square roots away from zero.
        decay_rate: Decay of the running averages ('rmsprop', 'adadelta').
        batch_size: Divisor applied to every gradient by ``'mini_batch'``;
            required for that method only.
    """

    def __init__(self, method='vanilla', learning_rate=0.01, beta1=0.9, beta2=0.999, epsilon=1e-7, decay_rate=0.9,batch_size=None):
        self.method = method
        self.learning_rate = learning_rate
        self.beta1 = beta1  # Momentum parameter for momentum-based optimizers
        self.beta2 = beta2  # RMSProp/Adam parameter
        self.epsilon = epsilon
        self.decay_rate = decay_rate  # For exponential decay in RMSProp
        self.momentum = None  # running average of gradients (momentum / nesterov / adam)
        self.velocity = None  # running average of squared updates (adadelta)
        self.squared_gradients = None  # accumulated / averaged squared gradients
        self.batch_size = batch_size  # For mini-batch optimization
        self.t = 0  # Time step for Adam

    @staticmethod
    def _zeros_for(weights):
        """Return one zero tensor per weight; used to create optimizer state lazily."""
        return [tf.zeros_like(w) for w in weights]

    def apply_gradients(self, weights, gradients):
        """Update ``weights`` in place from ``gradients`` with the configured rule.

        Args:
            weights: Sequence of variables exposing ``assign_sub``.
            gradients: Sequence of gradients, position-aligned with ``weights``.

        Raises:
            ValueError: If ``method`` is not one of the supported names, or if
                ``method`` is ``'mini_batch'`` and no ``batch_size`` was given.
        """
        if self.method == 'vanilla':
            # Vanilla Gradient Descent
            for w, g in zip(weights, gradients):
                w.assign_sub(self.learning_rate * g)

        elif self.method == 'mini_batch':
            # Mini-batch gradient descent: the gradient is divided by batch_size.
            # NOTE(review): this is an average only if the caller passes gradients
            # that were *summed* over the batch. Gradients of a mean-reduced loss
            # are already averaged, in which case this merely scales the step by
            # 1 / batch_size -- confirm against the caller.
            if self.batch_size is None:
                raise ValueError("Batch size must be specified for mini-batch optimization.")
            for w, g in zip(weights, gradients):
                w.assign_sub(self.learning_rate * g / self.batch_size)

        elif self.method == 'sgd':
            # Stochastic gradient descent uses the same update rule as 'vanilla';
            # the stochasticity has to come from the caller supplying gradients
            # computed on a single sample.
            for w, g in zip(weights, gradients):
                w.assign_sub(self.learning_rate * g)

        elif self.method == 'momentum':
            # Momentum: step along an exponential moving average of the gradients.
            if self.momentum is None:
                self.momentum = self._zeros_for(weights)
            for i, (w, g) in enumerate(zip(weights, gradients)):
                self.momentum[i] = self.beta1 * self.momentum[i] + (1 - self.beta1) * g
                w.assign_sub(self.learning_rate * self.momentum[i])

        elif self.method == 'nesterov':
            # Nesterov Accelerated Gradient (NAG)
            if self.momentum is None:
                self.momentum = self._zeros_for(weights)
            for i, (w, g) in enumerate(zip(weights, gradients)):
                self.momentum[i] = self.beta1 * self.momentum[i] + (1 - self.beta1) * g
                # Look-ahead step: the *updated* momentum is decayed once more and
                # combined with the current gradient.  Using the previous momentum
                # here would reduce to exactly the plain 'momentum' update.
                lookahead = self.beta1 * self.momentum[i] + (1 - self.beta1) * g
                w.assign_sub(self.learning_rate * lookahead)

        elif self.method == 'adagrad':
            # Adagrad: per-parameter step size shrinks with the accumulated squared gradients.
            if self.squared_gradients is None:
                self.squared_gradients = self._zeros_for(weights)
            for i, (w, g) in enumerate(zip(weights, gradients)):
                self.squared_gradients[i] += tf.square(g)
                adjusted_lr = self.learning_rate / (tf.sqrt(self.squared_gradients[i]) + self.epsilon)
                w.assign_sub(adjusted_lr * g)

        elif self.method == 'rmsprop':
            # RMSProp: like Adagrad but with an exponentially decaying average.
            if self.squared_gradients is None:
                self.squared_gradients = self._zeros_for(weights)
            for i, (w, g) in enumerate(zip(weights, gradients)):
                self.squared_gradients[i] = self.decay_rate * self.squared_gradients[i] + (1 - self.decay_rate) * tf.square(g)
                adjusted_lr = self.learning_rate / (tf.sqrt(self.squared_gradients[i]) + self.epsilon)
                w.assign_sub(adjusted_lr * g)

        elif self.method == 'adadelta':
            # Adadelta: the step is scaled by the ratio of the running RMS of past
            # updates to the running RMS of gradients; learning_rate is not used.
            if self.squared_gradients is None:
                self.squared_gradients = self._zeros_for(weights)
            if self.velocity is None:
                self.velocity = self._zeros_for(weights)
            for i, (w, g) in enumerate(zip(weights, gradients)):
                self.squared_gradients[i] = self.decay_rate * self.squared_gradients[i] + (1 - self.decay_rate) * tf.square(g)
                update = tf.sqrt(self.velocity[i] + self.epsilon) / (tf.sqrt(self.squared_gradients[i]) + self.epsilon) * g
                self.velocity[i] = self.decay_rate * self.velocity[i] + (1 - self.decay_rate) * tf.square(update)
                w.assign_sub(update)

        elif self.method == 'adam':
            # Adam: bias-corrected first and second moment estimates.
            if self.momentum is None:
                self.momentum = self._zeros_for(weights)
            if self.squared_gradients is None:
                self.squared_gradients = self._zeros_for(weights)
            self.t += 1
            # The bias-correction factors depend only on the step count, so they
            # are computed once per call rather than once per weight.
            m_correction = 1.0 - self.beta1 ** self.t
            v_correction = 1.0 - self.beta2 ** self.t
            for i, (w, g) in enumerate(zip(weights, gradients)):
                self.momentum[i] = self.beta1 * self.momentum[i] + (1 - self.beta1) * g
                self.squared_gradients[i] = self.beta2 * self.squared_gradients[i] + (1 - self.beta2) * tf.square(g)
                m_hat = self.momentum[i] / m_correction
                v_hat = self.squared_gradients[i] / v_correction
                w.assign_sub(self.learning_rate * m_hat / (tf.sqrt(v_hat) + self.epsilon))

        else:
            raise ValueError(f"Unknown optimization method: {self.method}")

In [38]:
# Mean-squared-error loss object; called as mse_loss(y_true, y_pred) for the
# training, validation and test losses in the training loop.
mse_loss = tf.keras.losses.MeanSquaredError()

In [39]:
# Number of full-batch gradient steps performed for every optimizer.
epochs = 1000

import matplotlib.pyplot as plt

# Update rules to compare.  'mini_batch' and 'sgd' are left out because this
# loop always computes the gradient on the complete training set.
optimizer_methods = ['vanilla', 'momentum', 'nesterov', 'adagrad', 'rmsprop', 'adadelta', 'adam']

# Per-epoch test loss for each optimizer, keyed by method name.
test_loss_curves = {}

for method in optimizer_methods:
    print(f"Starting training with optimizer: {method}")

    # Fresh model and fresh optimizer state for every method, so that one run
    # cannot influence the next.
    model = FeedForwardNN()
    optimizer = Optimizers(method=method, learning_rate=0.01)

    train_losses = []
    val_losses = []
    test_losses = []

    for epoch in range(epochs):
        # Record only the training forward pass; the tape then yields the
        # gradient of the training loss w.r.t. every trainable variable.
        with tf.GradientTape() as tape:
            predictions = model(X_train)
            train_loss = mse_loss(y_train, predictions)
        gradients = tape.gradient(train_loss, model.trainable_variables)
        optimizer.apply_gradients(model.trainable_variables, gradients)

        # Validation / test losses are measured after the update and outside
        # the tape, so they never contribute to the gradients.
        val_predictions = model(X_val)
        val_loss = mse_loss(y_val, val_predictions)

        test_predictions = model(X_test)
        test_loss = mse_loss(y_test, test_predictions)

        train_losses.append(train_loss.numpy())
        val_losses.append(val_loss.numpy())
        test_losses.append(test_loss.numpy())
    test_loss_curves[method] = test_losses

    # Print the losses recorded at the last epoch.
    print(f"Optimizer: {method}")
    print(f"  Final Train Loss: {train_losses[-1]:.4f}")
    print(f"  Final Validation Loss: {val_losses[-1]:.4f}")
    print(f"  Final Test Loss: {test_losses[-1]:.4f}")
    print(f"Finished training with optimizer: {method}\n")

# Plot test loss curves, one line per optimizer.
plt.figure(figsize=(10, 6))
for method, losses in test_loss_curves.items():
    plt.plot(losses, label=method)

plt.title("Test Loss Curves for Different Optimizers")
plt.xlabel("Epochs")
plt.ylabel("Test Loss")
plt.legend()
plt.grid()
plt.show()

Starting training with optimizer: vanilla


KeyboardInterrupt: 