# **Ultimate Guide to Stochastic Automatic Differentiation**

This notebook outlines the concept of automatic differentiation using **stochastic automatic gradient (SAG)** computations. 
The implementation of **SAG** models is a precursor to the development of an end-to-end quantum artificial intelligence model. 
It is also an important component of **software defined quantum networks (SDQN)**, where the traditional flexibility and scalability of general purpose digital computing infrastructure is combined with the power of quantum processes.

In [None]:
import os, math, random, tempfile, matplotlib, numpy as np,         \
       pandas as pd, seaborn as sns, sklearn.metrics as sk_metrics, \
       tensorflow as tf, tensorflow_datasets as tfds
from matplotlib import pyplot as plt

In [None]:
# Default figure size (width, height in inches) for every matplotlib figure in this notebook.
matplotlib.rcParams['figure.figsize'] = [9, 6]

In [None]:
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

## Set random seed for reproducible results 

In [None]:
# Seed every seedable random number generator used in this notebook so that
# weight initialisation is repeatable under "Restart Kernel -> Run All".
# NOTE: the SAG routines draw from random.SystemRandom(), which reads OS
# entropy and ignores seeding, so their estimates still vary between runs.
SEED = 22
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Computing first derivative of a function

## Compute stochastic automatic differentiation
A naive python function for calculating the local derivative using stochastic automatic gradient (SAG) computation.

In [None]:
def compute_stochastic_auto_grad(x, fn, epsilon, num_steps=100, dtype='float32'):
    """Estimate the derivative of ``fn`` at ``x`` by averaging randomised central differences.

    Each sample perturbs ``x`` forwards and backwards by independently drawn
    steps and accumulates ``(fn(x + fwd) - fn(x - rwd)) / (fwd + rwd)``; the
    mean over all samples is returned.

    Parameters:
      x: evaluation point (scalar or array-like); converted to ``dtype``.
      fn: callable evaluated at the perturbed points.
      epsilon: nominal step size; each draw is uniform in
        ``[epsilon * (1 - epsilon), epsilon * (1 + epsilon)]`` and is applied
        relative to ``x`` (absolute where ``x == 0``).
      num_steps: nominal number of samples; the actual count is drawn from
        roughly +/-10 % around it and is never less than 1.
      dtype: NumPy dtype of the working arrays and of the result.

    Returns:
      NumPy array of ``dtype`` with the shape of ``x`` holding the estimate.
    """
    x = np.array(x, dtype=dtype)
    # One OS-entropy generator for the whole call instead of one per draw.
    rng = random.SystemRandom()
    # For small num_steps the jittered count can truncate to 0, which used to
    # turn the final average into 0 / 0; always take at least one sample.
    rnd_steps = max(1, int(rng.uniform(int(0.9 * num_steps), int(1.1 * num_steps))))
    # Steps are relative to x. Where x == 0 a relative step is zero and the
    # quotient becomes 0 / 0, so use an absolute step of size epsilon there.
    step_scale = np.where(x == 0, np.ones_like(x), x)
    low, high = epsilon * (1 - epsilon), epsilon * (1 + epsilon)
    delta = np.zeros_like(x, dtype=dtype)
    for _ in range(rnd_steps):
        fwd_epsilon = np.zeros_like(x, dtype=dtype) + rng.uniform(low, high)
        rwd_epsilon = np.zeros_like(x, dtype=dtype) + rng.uniform(low, high)
        sum_epsilon = fwd_epsilon + rwd_epsilon
        delta += (fn(x + (step_scale * fwd_epsilon)) - fn(x - (step_scale * rwd_epsilon))) / (step_scale * sum_epsilon)
    delta /= np.array(rnd_steps, dtype=dtype)
    return np.array(delta, dtype=dtype)

## Automatic differentiation examples

In [None]:
# Evaluation point used by the differentiation examples that follow.
x = 1 / math.pi
# The same point as a TensorFlow tensor, for use with tf.GradientTape.
tf_x = tf.convert_to_tensor(float(x))
print(tf_x)

### Native Python function to exponentiate an input

In [None]:
def exponentiate_fn(x, y):
    """Return ``x`` raised to the power ``y`` as computed by ``math.pow``."""
    base, exponent = x, y
    result = math.pow(base, exponent)
    return result

### TensorFlow function to exponentiate an input

In [None]:
@tf.function
def tf_exponentiate_fn(x, y):
    """Return the element-wise power ``x ** y`` computed with ``tf.math.pow``."""
    powered = tf.math.pow(x, y)
    return powered

### Auto differentiation using TensorFlow

In [None]:
@tf.function
def tf_input_fn(x):
    """f(x) = x ** 2, built from the TensorFlow exponentiation helper."""
    return tf_exponentiate_fn(x, tf.constant(2.0))

# tf_x is a plain tensor rather than a tf.Variable, so the tape is asked to watch it explicitly.
with tf.GradientTape() as g:
  g.watch(tf_x)
  tf_y = tf_input_fn(tf_x)
# d(tf_y)/d(tf_x) from the operations recorded on the tape.
tf_dy_dx = g.gradient(tf_y, tf_x)
print(tf_dy_dx)

### Differentiation using simple stochastic auto gradient

In [None]:
def input_fn(x):
    """f(x) = x ** 2, built from the native Python exponentiation helper."""
    return exponentiate_fn(x, 2)

# Step size 10 ** -5.4444 (about 3.6e-6), passed as the epsilon argument.
sag_dy_dx = compute_stochastic_auto_grad(x, input_fn, math.pow(10, -5.4444))
print(sag_dy_dx)

In [None]:
# Analytical derivative of f(x) = x ** 2, used as the reference value.
dy_dx = 2 * x
print(dy_dx)
# Signed error of each method relative to the analytical derivative.
print(f'TensorFlow automatic gradient calculation error: {tf_dy_dx.numpy() - dy_dx}')
print(f'Simple stochastic automatic gradient calculation error: {sag_dy_dx - dy_dx}')

$
{f(x) = {x}^{\pi}}
$

${\therefore \frac{d(f(x))}{dx} = {\pi}{x}^{(\pi- 1)}}$

### Auto differentiation using TensorFlow

In [None]:
# NOTE: redefines tf_input_fn from the previous example.
@tf.function
def tf_input_fn(x):
    """f(x) = x ** pi, with the exponent held as a float32 constant."""
    return tf_exponentiate_fn(x, tf.constant(math.pi, tf.float32))

# tf_x is a plain tensor, so the tape is asked to watch it explicitly.
with tf.GradientTape() as g:
  g.watch(tf_x)
  tf_y = tf_input_fn(tf_x)
# d(tf_y)/d(tf_x) from the operations recorded on the tape.
tf_dy_dx = g.gradient(tf_y, tf_x)
print(tf_dy_dx)

### Differentiation using simple stochastic auto gradient

In [None]:
# NOTE: redefines input_fn from the previous example.
def input_fn(x):
    """f(x) = x ** pi, built from the native Python exponentiation helper."""
    return exponentiate_fn(x, math.pi)

# Same step size as in the previous example (10 ** -5.4444, about 3.6e-6).
sag_dy_dx = compute_stochastic_auto_grad(x, input_fn, math.pow(10, -5.4444))
print(sag_dy_dx)

In [None]:
# Analytical derivative of f(x) = x ** pi, i.e. pi * x ** (pi - 1).
dy_dx = math.pi * math.pow(x, math.pi - 1)
print(dy_dx)
# Signed error of each method relative to the analytical derivative.
print(f'TensorFlow automatic gradient calculation error: {tf_dy_dx.numpy() - dy_dx}')
print(f'Simple stochastic automatic gradient calculation error: {sag_dy_dx - dy_dx}')

### Auto differentiation using TensorFlow

In [None]:
# NOTE: redefines tf_input_fn from the previous example.
@tf.function
def tf_input_fn(x):
    """f(x) = x ** x: the input is used as both base and exponent."""
    return tf_exponentiate_fn(x, x)

# tf_x is a plain tensor, so the tape is asked to watch it explicitly.
with tf.GradientTape() as g:
  g.watch(tf_x)
  tf_y = tf_input_fn(tf_x)
# d(tf_y)/d(tf_x) from the operations recorded on the tape.
tf_dy_dx = g.gradient(tf_y, tf_x)
print(tf_dy_dx)

### Differentiation using simple stochastic auto gradient

In [None]:
# NOTE: redefines input_fn from the previous example.
def input_fn(x):
    """f(x) = x ** x: the input is used as both base and exponent."""
    return exponentiate_fn(x, x)

# Same step size as in the previous examples (10 ** -5.4444, about 3.6e-6).
sag_dy_dx = compute_stochastic_auto_grad(x, input_fn, math.pow(10, -5.4444))
print(sag_dy_dx)

In [None]:
# Analytical derivative of f(x) = x ** x, i.e. (1 + ln x) * x ** x.
dy_dx = (1 + math.log(x)) * math.pow(x, x)
print(dy_dx)
# Signed error of each method relative to the analytical derivative.
print(f'TensorFlow automatic gradient calculation error: {tf_dy_dx.numpy() - dy_dx}')
print(f'Simple stochastic automatic gradient calculation error: {sag_dy_dx - dy_dx}')

In [None]:
# NOTE: redefines input_fn from the previous example.
def input_fn(x):
    """f(x) = exp(x) ** (x ** 2), composed from NumPy and the native helper."""
    return exponentiate_fn(np.exp(x), np.power(x,2))

# Same step size as in the previous examples (10 ** -5.4444, about 3.6e-6).
sag_dy_dx = compute_stochastic_auto_grad(x, input_fn, math.pow(10, -5.4444))
print(sag_dy_dx)

In [None]:
# NOTE: redefines tf_input_fn from the previous example.
@tf.function
def tf_input_fn(x):
    """f(x) = exp(x) ** (x ** 2), the TensorFlow counterpart of the SAG example."""
    return tf_exponentiate_fn(tf.math.exp(x), tf.math.pow(x,2))

# tf_x is a plain tensor, so the tape is asked to watch it explicitly.
with tf.GradientTape() as g:
  g.watch(tf_x)
  tf_y = tf_input_fn(tf_x)
# d(tf_y)/d(tf_x) from the operations recorded on the tape.
tf_dy_dx = g.gradient(tf_y, tf_x)
print(tf_dy_dx)

# [Building a digit recognizer using TensorFlow auto grad](https://www.tensorflow.org/guide/core/mlp_core)

## Load MNIST training and validation data

In [None]:
# Use the first 10,000 examples of the 'train' split for validation, the rest
# for training, and the separate 'test' split for testing. Each dataset is
# batched in groups of 128 and is iterated below as (image, label) pairs.
train_data, val_data, test_data = tfds.load('mnist', 
                                            split=['train[10000:]', 'train[0:10000]', 'test'],
                                            batch_size=128, as_supervised=True)

In [None]:
# Load the first 1,500 training examples as one un-batched pair (batch_size=-1) for visualisation.
x_viz, y_viz = tfds.load('mnist', split=['train[:1500]'], batch_size=-1, as_supervised=True)[0]
# Drop the axis at index 3 (presumably the single colour channel) so each image is 2-D.
x_viz = tf.squeeze(x_viz, axis=3)

# Show the first nine digits in a 3x3 grid, each titled with its label.
for i in range(9):
    plt.subplot(3, 3, 1 + i)
    plt.axis('off')
    plt.imshow(x_viz[i], cmap='gray')
    plt.title(f'True Label: {y_viz[i]}')
    # Extra vertical spacing so the titles do not overlap the images above.
    plt.subplots_adjust(hspace=.5)

In [None]:
# Bar chart of how often each label occurs in the visualisation sample.
sns.countplot(x=y_viz.numpy());
plt.xlabel('Digits')
plt.title('MNIST digit distribution');

In [None]:
def preprocess(x, y):
  """Flatten each image to a 784-element row and divide the pixel values by 255.

  The labels ``y`` are returned unchanged.
  """
  flattened = tf.reshape(x, shape=[-1, 784])
  scaled = flattened / 255
  return scaled, y

# Apply the preprocessing to both datasets.
# NOTE: this rebinds train_data / val_data, so re-running the cell divides by 255 twice.
train_data = train_data.map(preprocess)
val_data = val_data.map(preprocess)

## Activation function

In [None]:
# 201 evenly spaced sample points on [-2, 2].
# NOTE: rebinds the notebook-level name x, which earlier held the scalar evaluation point.
x = tf.linspace(-2, 2, 201)
x = tf.cast(x, tf.float32)
plt.plot(x, tf.nn.relu(x));
plt.xlabel('x')
plt.ylabel('ReLU(x)')
plt.title('ReLU activation function');

In [None]:
# 201 evenly spaced sample points on [-4, 4]; rebinds the notebook-level name x again.
x = tf.linspace(-4, 4, 201)
x = tf.cast(x, tf.float32)
# Softmax is taken over the whole vector (axis=0), so the plotted values sum to 1.
plt.plot(x, tf.nn.softmax(x, axis=0));
plt.xlabel('x')
plt.ylabel('Softmax(x)')
plt.title('Softmax activation function');

In [None]:
def xavier_init(shape):
  """Draw a weight matrix from the Xavier (Glorot) uniform distribution.

  ``shape`` is ``(in_dim, out_dim)``. Values are sampled uniformly between
  ``-limit`` and ``limit`` with ``limit = sqrt(6) / sqrt(in_dim + out_dim)``,
  using the fixed operation seed 22.
  """
  fan_in, fan_out = shape
  fan_total = tf.cast(fan_in + fan_out, tf.float32)
  limit = tf.sqrt(6.)/tf.sqrt(fan_total)
  return tf.random.uniform(shape=(fan_in, fan_out), minval=-limit, maxval=limit, seed=22)

In [None]:
class DenseLayer(tf.Module):
  """Fully connected layer whose weights and biases are created on the first call."""

  def __init__(self, out_dim, weight_init=xavier_init, activation=tf.identity):
    """Store the layer configuration; no variables are created here.

    Args:
      out_dim: number of output units.
      weight_init: callable invoked as ``weight_init(shape=(in_dim, out_dim))``
        to produce the initial weight values.
      activation: callable applied to the affine output (identity by default).
    """
    # Initialize the dimensions and activation functions
    self.out_dim = out_dim
    self.weight_init = weight_init
    self.activation = activation
    # Flipped to True once the variables exist, so they are created only once.
    self.built = False

  def __call__(self, x):
    """Return ``activation(x @ w + b)``, creating ``w`` and ``b`` on the first call."""
    if not self.built:
      # Infer the input dimension based on first call
      self.in_dim = x.shape[1]
      # Initialize the weights and biases
      self.w = tf.Variable(self.weight_init(shape=(self.in_dim, self.out_dim)))
      self.b = tf.Variable(tf.zeros(shape=(self.out_dim,)))
      self.built = True
    # Compute the forward pass
    z = tf.add(tf.matmul(x, self.w), self.b)
    return self.activation(z)

In [None]:
class MLP(tf.Module):
  """Multi-layer perceptron that applies a list of layers in order."""

  def __init__(self, layers):
    """Keep the ordered list of callable layers."""
    self.layers = layers

  @tf.function
  def __call__(self, x, preds=False): 
    """Feed ``x`` through every layer and return the last layer's output.

    NOTE(review): ``preds`` is accepted but never read in this method.
    """
    # Execute the model's layers sequentially
    for layer in self.layers:
      x = layer(x)
    return x

In [None]:
# Layer widths of the classifier: two hidden layers and one output unit per digit.
hidden_layer_1_size = 700
hidden_layer_2_size = 500
output_size = 10

# The output layer keeps the default identity activation, so the model emits raw logits.
mlp_model = MLP([
    DenseLayer(out_dim=hidden_layer_1_size, activation=tf.nn.relu),
    DenseLayer(out_dim=hidden_layer_2_size, activation=tf.nn.relu),
    DenseLayer(out_dim=output_size)])

In [None]:
def cross_entropy_loss(y_pred, y):
  """Mean sparse softmax cross-entropy between logits ``y_pred`` and integer labels ``y``."""
  per_example = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y_pred, labels=y)
  return tf.reduce_mean(per_example)

In [None]:
def accuracy(y_pred, y):
  """Fraction of rows whose highest-probability class equals the label ``y``."""
  probabilities = tf.nn.softmax(y_pred)
  predicted = tf.argmax(probabilities, axis=1)
  hits = tf.cast(tf.equal(y, predicted), tf.float32)
  return tf.reduce_mean(hits)

In [None]:
class Adam:
    """Hand-written Adam optimizer that updates TensorFlow variables in place."""

    def __init__(self, learning_rate=1e-3, beta_1=0.9, beta_2=0.999, ep=1e-7):
      """Store the hyper-parameters; the moment slots are created on first use.

      Args:
        learning_rate: step size applied to every update.
        beta_1: decay rate of the running mean of the gradients.
        beta_2: decay rate of the running mean of the squared gradients.
        ep: small constant added to the denominator of the update.
      """
      # Initialize optimizer parameters and variable slots
      self.beta_1 = beta_1
      self.beta_2 = beta_2
      self.learning_rate = learning_rate
      self.ep = ep
      # Step counter used by the bias correction; starts at 1.
      self.t = 1.
      # Per-variable running means of the gradients (v) and squared gradients (s).
      self.v_dvar, self.s_dvar = [], []
      self.built = False

    def apply_gradients(self, grads, vars):
      """Apply one Adam update to each variable in ``vars`` using ``grads``.

      ``grads`` and ``vars`` are paired by position. The slots are matched to
      variables by index as well, so ``vars`` is expected to keep the same
      order on every call.
      """
      # Initialize variables on the first call
      if not self.built:
        for var in vars:
          v = tf.Variable(tf.zeros(shape=var.shape))
          s = tf.Variable(tf.zeros(shape=var.shape))
          self.v_dvar.append(v)
          self.s_dvar.append(s)
        self.built = True
      # Update the model variables given their gradients
      for i, (d_var, var) in enumerate(zip(grads, vars)):
        self.v_dvar[i].assign(self.beta_1*self.v_dvar[i] + (1-self.beta_1)*d_var)
        self.s_dvar[i].assign(self.beta_2*self.s_dvar[i] + (1-self.beta_2)*tf.square(d_var))
        # Bias-correct both running means for the current step count.
        v_dvar_bc = self.v_dvar[i]/(1-(self.beta_1**self.t))
        s_dvar_bc = self.s_dvar[i]/(1-(self.beta_2**self.t))
        var.assign_sub(self.learning_rate*(v_dvar_bc/(tf.sqrt(s_dvar_bc) + self.ep)))
      self.t += 1.
      return

In [None]:
def train_step(x_batch, y_batch, loss, acc, model, optimizer):
  """Run one optimisation step on a batch and return ``(batch_loss, batch_acc)``."""
  # Only the forward pass and the loss are recorded on the tape.
  with tf.GradientTape() as tape:
    logits = model(x_batch)
    loss_value = loss(logits, y_batch)
  acc_value = acc(logits, y_batch)
  gradients = tape.gradient(loss_value, model.variables)
  optimizer.apply_gradients(gradients, model.variables)
  return loss_value, acc_value

def val_step(x_batch, y_batch, loss, acc, model):
  """Evaluate the model on one validation batch and return ``(batch_loss, batch_acc)``."""
  predictions = model(x_batch)
  return loss(predictions, y_batch), acc(predictions, y_batch)

In [None]:
def train_model(mlp, train_data, val_data, loss, acc, optimizer, epochs):
  """Train ``mlp`` for ``epochs`` passes and return the per-epoch metric history.

  Every epoch runs ``train_step`` on each training batch and ``val_step`` on
  each validation batch, averages the batch metrics, records them and prints
  a short summary.

  Returns:
    ``(train_losses, train_accs, val_losses, val_accs)``, one entry per epoch.
  """
  train_losses, train_accs = [], []
  val_losses, val_accs = [], []

  for epoch in range(epochs):
    # One (loss, accuracy) pair per batch; the training pass updates the weights.
    train_results = [train_step(x_batch, y_batch, loss, acc, mlp, optimizer)
                     for x_batch, y_batch in train_data]
    val_results = [val_step(x_batch, y_batch, loss, acc, mlp)
                   for x_batch, y_batch in val_data]

    # Collapse the batch-level metrics into epoch-level means.
    train_loss = tf.reduce_mean([batch_loss for batch_loss, _ in train_results])
    train_acc = tf.reduce_mean([batch_acc for _, batch_acc in train_results])
    val_loss = tf.reduce_mean([batch_loss for batch_loss, _ in val_results])
    val_acc = tf.reduce_mean([batch_acc for _, batch_acc in val_results])

    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    print(f'Epoch: {epoch}')
    print(f'Training loss: {train_loss:.3f}, Training accuracy: {train_acc:.3f}')
    print(f'Validation loss: {val_loss:.3f}, Validation accuracy: {val_acc:.3f}')
  return train_losses, train_accs, val_losses, val_accs

In [None]:
# Train the MLP for 10 epochs with a freshly constructed Adam optimizer
# (default hyper-parameters) and keep the per-epoch metric history for plotting.
train_losses, train_accs, val_losses, val_accs = train_model(
                                                     mlp_model, 
                                                     train_data, 
                                                     val_data, 
                                                     loss=cross_entropy_loss, acc=accuracy,
                                                     optimizer=Adam(), 
                                                     epochs=10
                                                 )

In [None]:
def plot_metrics(train_metric, val_metric, metric_type):
  """Plot a training and a validation metric against the epoch index on a new figure."""
  plt.figure()
  train_epochs = range(len(train_metric))
  val_epochs = range(len(val_metric))
  plt.plot(train_epochs, train_metric, label = f'Training {metric_type}')
  plt.plot(val_epochs, val_metric, label = f'Validation {metric_type}')
  plt.xlabel('Epochs')
  plt.ylabel(metric_type)
  plt.legend()
  plt.title(f'{metric_type} vs Training epochs');

In [None]:
# Training versus validation loss per epoch.
plot_metrics(train_losses, val_losses, 'cross entropy loss')

In [None]:
# Training versus validation accuracy per epoch.
plot_metrics(train_accs, val_accs, 'accuracy')

In [None]:
class ExportModule(tf.Module):
  """Bundles preprocessing, the model and class prediction into one callable for export."""

  def __init__(self, model, preprocess, class_pred):
    """Store the model together with its pre- and post-processing callables."""
    # Initialize pre and postprocessing functions
    self.model = model
    self.preprocess = preprocess
    self.class_pred = class_pred

  # The signature accepts a rank-4 uint8 tensor with every dimension left unspecified.
  @tf.function(input_signature=[tf.TensorSpec(shape=[None, None, None, None], dtype=tf.uint8)]) 
  def __call__(self, x):
    """Return ``class_pred(model(preprocess(x)))`` for a batch of raw inputs."""
    # Run the ExportModule for new data points
    x = self.preprocess(x)
    y = self.model(x)
    y = self.class_pred(y)
    return y

In [None]:
def preprocess_test(x):
  """Flatten unlabeled images to 784-element rows and divide the pixel values by 255."""
  flattened = tf.reshape(x, shape=[-1, 784])
  return flattened / 255

def class_pred_test(y):
  """Turn MLP outputs into class indices: softmax, then argmax along axis 1."""
  probabilities = tf.nn.softmax(y)
  return tf.argmax(probabilities, axis=1)

In [None]:
# Wrap the trained model with its test-time preprocessing and class prediction
# so that the exported module can be called on raw inputs directly.
mlp_model_export = ExportModule(
                       model=mlp_model,
                       preprocess=preprocess_test,
                       class_pred=class_pred_test
                   )

In [None]:
# Save the export module as a SavedModel inside a fresh temporary directory.
# NOTE: the directory made by mkdtemp() is not removed anywhere in this notebook.
models = tempfile.mkdtemp()
save_path = os.path.join(models, 'mlp_model_export')
tf.saved_model.save(mlp_model_export, save_path)

In [None]:
# Reload the SavedModel from disk; the reloaded copy is the one evaluated below.
mlp_loaded = tf.saved_model.load(save_path)

In [None]:
def accuracy_score(y_pred, y):
  """Fraction of positions at which the predicted classes ``y_pred`` equal ``y``."""
  matches = tf.cast(tf.equal(y_pred, y), tf.float32)
  return tf.reduce_mean(matches)

# Load the whole test split as one un-batched pair of raw images and labels.
x_test, y_test = tfds.load('mnist', split=['test'], batch_size=-1, as_supervised=True)[0]
# The exported module does its own preprocessing and returns class indices.
test_classes = mlp_loaded(x_test)
test_acc = accuracy_score(test_classes, y_test)
print(f'Test Accuracy: {test_acc:.3f}')

In [None]:
print('Accuracy breakdown by digit:')
print('---------------------------')
# Key the results by digit rather than by accuracy: with accuracy as the key,
# two digits with the same accuracy overwrite each other and one of them
# silently disappears from the report.
label_accs = {}
for label in range(10):
  label_ind = (y_test == label)
  # extract predictions for specific true label
  pred_label = test_classes[label_ind]
  labels = y_test[label_ind]
  # compute class-wise accuracy
  label_accs[label] = accuracy_score(pred_label, labels).numpy()
# Report from the least to the most accurate digit.
for label, label_acc in sorted(label_accs.items(), key=lambda item: item[1]):
  print(f'Digit {label}: {label_acc:.3f}')

In [None]:
def show_confusion_matrix(test_labels, test_classes):
  """Draw a row-normalised confusion matrix heat map for the ten digit classes.

  Each row (true label) is divided by its own total, so a cell shows the
  share of that true digit that was predicted as the column digit.
  """
  plt.figure(figsize=(10,10))
  counts = sk_metrics.confusion_matrix(test_labels.numpy(), test_classes.numpy())
  row_totals = counts.sum(axis=1, keepdims=True)
  normalized = counts / row_totals
  digits = range(10)
  sns.heatmap(normalized, xticklabels=digits, yticklabels=digits,
              cmap='Blues', annot=True, fmt='.4f', square=True)
  plt.title('Confusion matrix')
  plt.ylabel('True label')
  plt.xlabel('Predicted label')

# Confusion matrix of the reloaded model's predictions on the test split.
show_confusion_matrix(y_test, test_classes)

# Building a trainable multi layer perceptron (MLP) using stochastic auto grad (SAG)

## Activation functions

In [None]:
def sigmoid(x, derivative=False):
    '''
    Logistic sigmoid ``1 / (1 + exp(-x))``.

    Parameters:
      x: input
      derivative: when truthy, return ``x * (1 - x)`` instead; this equals the
        sigmoid's derivative when ``x`` is itself a sigmoid output
    '''
    if not derivative:
        return (1 / (1 + np.exp(-x)))
    return (x * (1 - x))

def ReLU(x, derivative=False):
  """Rectified linear unit ``max(x, 0)``.

  When ``derivative`` is truthy, return 0 where ``x < 0`` and 1 elsewhere.
  """
  if not derivative:
    return np.maximum(x, 0)
  return np.where(x < 0, 0, 1)

def relu(x, derivative=False):
  """Lower-case alias that forwards to ``ReLU``."""
  return ReLU(x, derivative=derivative)

def tanh(x, derivative=False):
    """Hyperbolic tangent of ``x``.

    When ``derivative`` compares equal to ``True``, return ``1 - x ** 2``
    instead; this equals the tanh derivative when ``x`` is itself a tanh output.
    """
    return (1 - (x ** 2)) if (derivative == True) else np.tanh(x)

## Function to return one-hot-encoded labels

In [None]:
def one_hot_encoded_labels(labels, num_classes=None):
    """Return the one-hot encoding of integer class labels as a 2-D integer array.

    Parameters:
      labels: sequence of integer class labels.
      num_classes: number of columns in the encoding. When omitted the width
        is inferred as ``max(labels) + 1``, which depends on the data passed
        in; pass it explicitly whenever the width must stay fixed (for example
        when a mini-batch may not contain the highest class).

    Returns:
      Array of shape ``(len(labels), num_classes)`` with a single 1 per row.

    Raises:
      ValueError: if ``labels`` is empty and ``num_classes`` is not given, or
        if a label lies outside ``[0, num_classes)`` when it is given.
    """
    label_ids = np.asarray(labels)
    if num_classes is None:
        if label_ids.size == 0:
            raise ValueError('cannot infer the number of classes from empty labels')
        num_classes = int(label_ids.max()) + 1
    elif label_ids.size and (label_ids.min() < 0 or label_ids.max() >= num_classes):
        raise ValueError('labels must lie in the range [0, num_classes)')
    # Broadcast each label against 0..num_classes-1 to mark its own column.
    return (label_ids.reshape(-1, 1) == np.arange(num_classes)).astype(int)

## Function to compute the partial derivative using stochastic automatic differentiation

In [None]:
def compute_stochastic_auto_differentiation(x, input_fn, epsilon, num_eval_steps=10):
    """Estimate the derivative of ``input_fn`` at ``x`` from randomised finite differences.

    Every evaluation step draws two random offsets ``a`` and ``b`` between
    ``abs(epsilon) / 100`` and ``abs(epsilon)`` and accumulates three quotients:
    one between the two perturbed points and one forward difference from ``x``
    to each of them. The mean of all quotients is returned.

    Parameters:
      x: evaluation point (scalar or NumPy array).
      input_fn: callable to differentiate; it is evaluated once at ``x`` and
        twice per evaluation step.
      epsilon: scale of the random offsets; only its magnitude is used.
      num_eval_steps: number of evaluation steps to average over.

    Returns:
      The averaged derivative estimate, with the same shape as ``input_fn(x)``.
    """
    # One OS-entropy generator for the whole call instead of one per draw.
    rng = random.SystemRandom()
    step_high = abs(epsilon)
    step_low = step_high / 100
    # input_fn(x) is the same in every iteration, so evaluate it only once.
    fn_x = input_fn(x)

    auto_diff_out = 0
    for _ in range(num_eval_steps):
        rand_epsilon_a = rng.uniform(step_low, step_high)
        rand_epsilon_b = rng.uniform(step_low, step_high)
        # Identical offsets would make the first quotient divide by zero; redraw.
        while rand_epsilon_b == rand_epsilon_a and step_high > 0:
            rand_epsilon_b = rng.uniform(step_low, step_high)

        x_a = x + rand_epsilon_a
        x_b = x + rand_epsilon_b
        # Use the distance between the points actually evaluated.
        x_delta = x_a - x_b

        fn_auto_diff_a = input_fn(x_a)
        fn_auto_diff_b = input_fn(x_b)

        auto_diff_out += (fn_auto_diff_a - fn_auto_diff_b) / x_delta
        auto_diff_out += (fn_auto_diff_a - fn_x)           / rand_epsilon_a
        auto_diff_out += (fn_auto_diff_b - fn_x)           / rand_epsilon_b

    return auto_diff_out / (3 * num_eval_steps)

## Stochastic automatic gradient (SAG) for a neural network
Function to compute the layer gradient using stochastic automatic differentiation.

In [None]:
def stochastic_auto_gradient(layer, layer_error, activation_fn, epsilon):
    """Scale ``layer_error`` by the SAG estimate of ``activation_fn``'s derivative at ``layer``."""
    activation_slope = compute_stochastic_auto_differentiation(layer, activation_fn, epsilon)
    return layer_error * activation_slope

## Training layers using forward pass and back-propagation

In [None]:
def forward_pass(input_data, synapse_list, bias_val_list, activation_fn):
    """Propagate ``input_data`` through every weight matrix and collect the activations.

    Layer ``k + 1`` is ``activation_fn(layer_k . synapse_list[k] + bias_val_list[k])``.

    Returns:
      List holding the input followed by the output of every layer.
    """
    activations = [input_data]
    for idx, weights in enumerate(synapse_list):
        pre_activation = np.dot(activations[-1], weights) + bias_val_list[idx]
        activations.append(activation_fn(pre_activation))
    return activations

def back_propagation(output_data, layer_list, synapse_list, learning_rate, verbose=False,
                     activation_fn=None, epsilon=1e-8):
    """Run one back-propagation step using stochastic automatic gradients.

    Parameters:
      output_data: target outputs for the batch.
      layer_list: activations returned by ``forward_pass`` (input first).
      synapse_list: weight matrices, one per pair of consecutive layers. The
        arrays are updated in place.
      learning_rate: factor applied to the output error before it is propagated.
      verbose: when truthy, print the mean absolute prediction error.
      activation_fn: activation whose derivative is estimated numerically. When
        omitted, the notebook-level name ``activation_fn`` is used, which is
        how this function behaved before the parameter existed.
      epsilon: step scale handed to the stochastic differentiation routine.

    Returns:
      ``(synapse_list, layer_list, bprop_loss)`` where ``bprop_loss`` is the
      mean absolute difference between ``output_data`` and the network output.
      The bias values are not modified.
    """
    if activation_fn is None:
        # Backward compatibility: earlier callers relied on a notebook-level
        # ``activation_fn`` being defined before this function was called.
        try:
            activation_fn = globals()['activation_fn']
        except KeyError:
            raise NameError("name 'activation_fn' is not defined") from None

    output_loss_derivative = output_data - layer_list[-1]
    bprop_loss = np.mean(np.abs(output_loss_derivative))
    if verbose:
        print ('Prediction error during training : ' + str(bprop_loss))

    layer_error = learning_rate * output_loss_derivative

    # Walk from the output layer back towards the input.
    reversed_synapses = list(reversed(synapse_list))
    reversed_layers = list(reversed(layer_list))

    for i in range(len(reversed_layers) - 1):
        # NOTE(review): the derivative is evaluated at the layer's activated
        # output, not at its pre-activation input - confirm this is intended.
        layer_delta = stochastic_auto_gradient(reversed_layers[i], layer_error, activation_fn, epsilon)
        # Propagate the error with the weights as they were before this update.
        layer_error = layer_delta.dot(reversed_synapses[i].T)
        reversed_synapses[i] += reversed_layers[i + 1].T.dot(layer_delta)

    # Restore input-to-output order before returning.
    return list(reversed(reversed_synapses)), list(reversed(reversed_layers)), bprop_loss

## Train XOR gate solver

In [None]:
# XOR truth table: each row of x is an input pair, the matching row of y its target.
# NOTE: this rebinds the notebook-level name x used by earlier cells.
x = np.asarray([[0, 0],
                [1, 1],
                [1, 0],
                [0, 1]])
y = np.asarray([[0],
                [0],
                [1],
                [1]])

print(x.shape, y.shape)

input_dim = x.shape[1]
output_dim = y.shape[1]
# Layer widths: input -> 8 hidden units -> output.
hidden_units_list = [input_dim, 8, output_dim]
bias_val_list = [1, 1, 1, 1]
synapse_list = []
# One weight matrix per pair of consecutive layers, drawn uniformly from
# [-bias, 2 - bias), i.e. [-1, 1) for the bias value of 1 used here.
for i, unit_size in enumerate(hidden_units_list):
    if i + 1 < len(hidden_units_list):
        synapse_list.append(2 * np.random.random((unit_size, hidden_units_list[i + 1])) - bias_val_list[i])

print(synapse_list[0].shape)

training_steps = 10000

loss_col = []
learning_rate = 1
# Alternatives defined above: tanh, relu.
activation_fn = sigmoid

for t in range(training_steps):
    input_data, output_data = x, y
    layer_list = forward_pass(input_data, synapse_list, bias_val_list, activation_fn)
    # Print the training error after every tenth of the run.
    synapse_list, layer_list, loss = back_propagation(output_data, layer_list, synapse_list, learning_rate, verbose=(t + 1) % (0.1 * training_steps) == 0)
    # Record the loss returned by this step. This previously appended
    # `bprop_loss`, a name that exists only inside back_propagation, which
    # raised NameError on a fresh kernel.
    loss_col.append(loss)
print ('Training completed ...')
print (layer_list[-1])

## Train MNIST classifier

In [None]:
# Flattened 28x28 images in, one output unit per digit out.
input_dim = 28 * 28
output_dim = 10

# Layer widths: input -> 16 -> 32 -> output.
hidden_units_list = [input_dim, 16, 32, output_dim]
bias_val_list = [1, 1, 1, 1]
synapse_list = []
# One weight matrix per pair of consecutive layers, drawn uniformly from
# [-bias, 2 - bias), i.e. [-1, 1) for the bias value of 1 used here.
for i, unit_size in enumerate(hidden_units_list):
    if i + 1 < len(hidden_units_list):
        synapse_list.append(2 * np.random.random((unit_size, hidden_units_list[i + 1])) - bias_val_list[i])

loss_col = []

# training_steps counts full passes over train_data, not individual batches.
training_steps = 32
learning_rate = 1e-3
# Alternatives defined above: tanh, relu.
activation_fn = sigmoid

best_synapse_list, best_layer_list = [], []
loss_plateau_patience = 5
# Counts batches that did not set a new lowest loss; it is never reset below.
loss_plateau = 0
for t in range(training_steps):
    for x_batch, y_batch in train_data:
        input_data, output_data = x_batch.numpy(), y_batch.numpy()
        # Once the patience is exceeded, the forward pass uses the "best" weights.
        if loss_plateau_patience < loss_plateau and len(best_synapse_list) > 0:
            layer_list = forward_pass(input_data, best_synapse_list, bias_val_list, activation_fn)
        else:
            layer_list = forward_pass(input_data, synapse_list, bias_val_list, activation_fn)
        # NOTE(review): the encoding width is inferred from the largest label in
        # the batch, so a batch without the highest digit would not match
        # output_dim - confirm.
        output_data = one_hot_encoded_labels(output_data)
        synapse_list, layer_list, loss = back_propagation(output_data, layer_list, synapse_list, learning_rate)
        # NOTE(review): back_propagation updates the weight arrays in place, so
        # best_synapse_list appears to alias the live weights instead of holding
        # a snapshot of the best ones - confirm whether a copy was intended.
        if len(loss_col) > 0 and loss < min(loss_col) and loss_plateau > 0:
            best_synapse_list, best_layer_list = synapse_list, layer_list
        else:
            loss_plateau += 1
        loss_col.append(loss)
print ('Training completed ...')

In [None]:
# Print predicted versus true digit for the first ten images of the last
# training batch, then show each image.
# NOTE: input_data and y_batch are leftovers of the training loop above, and
# best_synapse_list is still empty if that loop never recorded an improvement.
for i in range(10):
    print(np.argmax(forward_pass(input_data[i], best_synapse_list, bias_val_list, activation_fn)[-1]), y_batch.numpy()[i])
    plt.imshow(input_data[i].reshape(28, 28))
    plt.show()