<a href="https://colab.research.google.com/github/jackjameswillis/BSc-Final-Year-Project/blob/main/Cascade_Correlation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Important Notes

This notebook is incomplete. Known problems:

* The custom loss function does not appear to receive batches, only single label/logit pairs (unconfirmed).
* The loss function is incorrect.
* The hidden-layer optimiser performs gradient descent where it should perform gradient ascent.







We begin by importing libraries.

In [79]:
import tensorflow as tf
import numpy as np

We also define a seed so that the shuffled order of the data is reproducible and can be tracked across calls.

In [80]:
# Seed required for tracking and altering inputs from input function
# using values generated per layer. Seed value is candidate no.
seed = 201338 
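
As a minimal sketch of why a fixed seed helps here: applying one seeded permutation to both features and labels keeps each pair together, and rebuilding the generator with the same seed reproduces the same order. The toy arrays and the name `demo_seed` below are illustrative assumptions, not part of the notebook.

```python
import numpy as np

demo_seed = 201338  # same value as above, used here only for illustration

# Toy data: labels[i] belongs to features[i].
features = np.arange(10).reshape(5, 2)
labels = np.array([0, 1, 2, 3, 4])

# One permutation, applied to both arrays by indexing (no in-place changes).
perm = np.random.default_rng(seed=demo_seed).permutation(len(features))
shuffled_features, shuffled_labels = features[perm], labels[perm]

# A generator rebuilt from the same seed yields the same order again.
perm_again = np.random.default_rng(seed=demo_seed).permutation(len(features))

print(np.array_equal(perm, perm_again))  # True
```

Indexing with a permutation returns copies, so the original arrays keep their order and every later call starts from the same state.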

We download the MNIST dataset.

In [81]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(path='mnist.npz')

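The Cascade Correlation algorithm requires the input data to be modified as the network grows: the output of each new hidden unit is appended to the input features. Our data generation function is therefore central to the implementation.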

In [96]:
# The key for the input features. We have one set of inputs,
# these being the flattened images, so we provide one key.
feat_key = "x"

# We use this generator function to produce the dataset later.
def generator(X, Y):

  def _gen():

    for x, y in zip(X, Y):

      yield tf.reshape(x, [len(x)]), y

  return _gen

# We map this function onto elements of the dataset 
# to make it compatible with the expected format of 
# input data.
def featurise(x, y):
  
  features = {feat_key: x}

  return features, y

# This input function returns another function that returns a tensorflow dataset
# which is used to feed data into models. We have parameters on this function
# generator that allow us to indicate and manipulate features and labels.
def input_fn(partition, batch_size, repeat=False, alter_x=False, x_alterations=False, alter_y=False, y_alterations=False):

  if partition == 'train':

    x, y = (x_train, y_train)

  elif partition == 'debug':

    x, y = (x_train[:1], y_train[:1])
    
  else:

    x, y = (x_test, y_test)

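  # We take whichever partition was selected and reorder copies of its
  # features and labels with a single seeded permutation. Indexing (rather
  # than shuffling in place) keeps x and y aligned and leaves the global
  # arrays untouched, so repeated calls always produce the same order.
  rng = np.random.default_rng(seed=seed)

  perm = rng.permutation(len(x))

  x, y = x[perm], y[perm]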

  # In this section we check to see if data needs to be altered. In the case 
  # where we want to alter the labels, we simply assign the given alterations
  # to be the labels. This facilitates the alternating training between
  # regression in the hidden layers and classification in the output layer.
  if alter_y: 

    y = np.asarray(y_alterations)


  # In the case where we want to alter the features, we append given alterations
  # onto each pre-existing feature set. This facilitates the increasing input
  # size for each neuron added to the model. Aside from alterations, this code
  # also squishes and flattens the features.
  _x = []

  # If we want to alter features and have alterations:  
  if alter_x and len(x_alterations) != 0:

    for i in range(len(x)):

      _x.append(x[i]/255)

      _x[i] = np.concatenate((_x[i].flatten(),x_alterations[i]))
    
    x = np.asarray(_x)
      
  else:

    for i in range(len(x)):

      _x.append(x[i]/255)

      _x[i] = _x[i].flatten()
      
    x = np.asarray(_x)

  # This is the function returned, which generates the 
  # dataset from the preprocessed data.
  def _input_fn():
    
    # Make sure that our generator is producing a dataset of the correct data
    # type depending on alterations made to the labels.
    y_type = tf.float32 if alter_y else tf.int32

    # Use our generator function to produce the dataset
    # with the appropriate data sizes and data types.
    dataset = tf.data.Dataset.from_generator(generator(x, y), (tf.float32, y_type), ((len(x[0]),), ()))

    # If we are training, we want training to continue
    # after we reach the final batch of the dataset. We
    # achieve this by 'repeating' the dataset.
    if repeat: dataset = dataset.repeat()

    # Apply the feature function to the data and batch according
    # to the batch_size parameter.
    dataset = dataset.map(featurise).batch(batch_size)

    # Return the dataset ready to train on.
    return dataset

  return _input_fn
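
The preprocessing performed inside `input_fn` can be illustrated on a single toy image. This is a sketch only: `extend_features` is a hypothetical helper, and the random 28×28 array stands in for an MNIST digit.

```python
import numpy as np

def extend_features(image, unit_outputs):
    """Scale pixels to [0, 1], flatten, and append earlier hidden-unit outputs."""
    flat = (image / 255).flatten()
    return np.concatenate((flat, np.asarray(unit_outputs, dtype=flat.dtype)))

# Stand-in for one 28x28 greyscale digit with integer pixel values 0..255.
image = np.random.default_rng(0).integers(0, 256, size=(28, 28))

no_units = extend_features(image, [])           # before any unit is added
two_units = extend_features(image, [0.3, 0.9])  # after two units are added

print(no_units.shape, two_units.shape)  # (784,) (786,)
```

The feature vector grows by one entry per hidden unit, which is why the feature column shape in the model is `input_dimension + len(self.layers)`.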

In [97]:
# This function is useful for calculating classification error by converting
# class ids (labels or model output) to one-hot binary vectors.
def binary_representation(Y):

  return np.asarray([[1 if i==y else 0 for i in range(10)] for y in Y])
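
For reference, the same one-hot encoding can be written with NumPy indexing. This is an equivalent sketch rather than something the notebook depends on; `one_hot` is a name chosen here for illustration.

```python
import numpy as np

def one_hot(Y, n_classes=10):
    """Return one row per label with a 1 in the column of that label."""
    return np.eye(n_classes, dtype=int)[np.asarray(Y)]

print(one_hot([3, 0]))
# [[0 0 0 1 0 0 0 0 0 0]
#  [1 0 0 0 0 0 0 0 0 0]]
```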

The Cascade Correlation algorithm comprises a number of key steps:
* Train a top (output) model that is connected to every input and every hidden unit.
* Get the errors of the top model.
* Train a new hidden unit to maximise the correlation between its output and the error of the top model. Its inputs are the original input features and the outputs of all previous hidden units.
* Freeze the new hidden unit's weights.
* Repeat.
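
The way each new unit sees the original inputs plus the outputs of all earlier units can be sketched on toy data. The weights below are random placeholders (in the algorithm they would come from training the candidate unit and would then be frozen), and `add_unit_output` is a hypothetical helper.

```python
import numpy as np

def add_unit_output(features, weights):
    """Append one frozen unit's sigmoid output as a new feature column."""
    activation = 1.0 / (1.0 + np.exp(-(features @ weights)))
    return np.column_stack([features, activation])

rng = np.random.default_rng(0)
features = rng.random((5, 3))  # 5 toy examples, 3 input features

for _ in range(2):  # add two hidden units in sequence
    # Placeholder weights: one per current feature, including earlier units.
    weights = rng.normal(size=features.shape[1])
    features = add_unit_output(features, weights)

print(features.shape)  # (5, 5)
```

The second unit receives four inputs (three original features plus the first unit's output), which is the cascading structure the steps describe.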

Our code needs to facilitate this. Our input function generator allows us to provide the different types of unit in our system with the data they need. We now need to construct the architecture and feed the data into it. 

In [98]:
class CassCor:

  def __init__(self, feature_columns, top_head, input_dimension):

    # Each hidden unit effectively corresponds to a new hidden layer. The list
    # is created per instance so that separate models do not share units.
    self.layers = []

    # The top layer is treated independently of the rest since it has a unique
    # function.
    self.top_layer = None

    self.top_head = top_head

    self.top_feature_columns = feature_columns

    self.input_dimension = input_dimension

  # The feedforward of this architecture requires us to produce a new dataset
  # for each layer.
  def feedforward_layers(self, partition, batch_size):

    additions = []

    y = None

    # For each hidden unit.
    for l in self.layers:

      # Generate the input function.
      current_input_fn = input_fn(
          
          partition=partition,

          batch_size=batch_size,

          alter_x=True,

          x_alterations=additions)

      # Get its predictions
      logits = list(l.predict(input_fn=current_input_fn))

      # If there are already hidden unit logits, add the new logits to the list.
      if additions:
        
        additions = [additions[i] + list(logits[i]['predictions']) for i in range(len(logits))]

      # If there are no hidden unit logits yet, create the appropriate structure
      # for the new logits.
      else:

        additions = [list(logits[i]['predictions']) for i in range(len(logits))]

    return y, additions
  
  # The residual errors are an important part of the loss function for the
  # hidden units of this system.
  def residual_errors(self, partition, batch_size):

    # Generate the required feature additions for the top layer.
    _, additions = self.feedforward_layers(
        
        partition=partition,
        
        batch_size=batch_size)
    
    # Generate an input function used to get top layer output.
    predict_input_fn = input_fn(
        
        partition=partition,

        batch_size=batch_size,

        alter_x=True,

        x_alterations=additions)
    
    # Generate predictions of the top layer on the generated input function.
    predictions = self.top_layer.predict(
        
        input_fn=predict_input_fn)
    
    # Get the classification for each input.
    predictions = [prediction['class_ids'] for prediction in predictions]
    
    # Get the appropriate labels for calculating error, matching the
    # partitions handled by the input function.
    if partition == 'train':

      Y = y_train

    elif partition == 'debug':

      Y = y_train[:1]

    else:

      Y = y_test

    # Reorder a copy of the labels with the same seeded permutation used in
    # the input function, leaving the global arrays untouched.
    rng = np.random.default_rng(seed=seed)

    Y = Y[rng.permutation(len(Y))]
    
    # Calculate the error: 1 for a misclassified example, 0 for a correct one.
    error = np.asarray([1 if Y[i] != predictions[i][0] else 0 for i in range(len(predictions))])

    # Use the error to calculate the mean error.
    mean_error = sum(error)/len(error)

    # Return the residual error.
    return error - mean_error

  # This function simply re-trains the top layer, which involves producing a 
  # new classifier. The top layer is then evaluated on a testing input function.
  def train_test_top(self, train_input_fn, test_input_fn, steps):

    self.top_feature_columns = [tf.feature_column.numeric_column(key=feat_key, shape=[self.input_dimension + len(self.layers)])]

    self.top_layer = tf.estimator.DNNEstimator(
        
        head=self.top_head,

        hidden_units=[],

        feature_columns=self.top_feature_columns,

        activation_fn=tf.nn.sigmoid,

        optimizer='SGD')

    self.top_layer.train(
        
        input_fn=train_input_fn,

        max_steps=steps)

    results = self.top_layer.evaluate(
        
        input_fn=test_input_fn,
        
        steps=None)
    
    print("Accuracy:", results["accuracy"])

    print("Loss:", results["average_loss"])

  # This function trains a new layer. It begins by defining a regressor to the
  # required specification, trains on the provided training input function,
  # and appends the regressor onto the layers of the model.
  def train_layer(self, train_input_fn, loss_fn, steps):

    new_layer_feature_columns = [tf.feature_column.numeric_column(key=feat_key, shape=[self.input_dimension + len(self.layers)])]

    new_layer_head = tf.estimator.RegressionHead(label_dimension=1, loss_fn=loss_fn,)

    new_layer = tf.estimator.DNNEstimator(
        
        head=new_layer_head,

        hidden_units=[],

        feature_columns=new_layer_feature_columns,

        activation_fn=tf.nn.sigmoid,
        
        optimizer='SGD')
    
    new_layer.train(
        
        input_fn=train_input_fn,

        max_steps=steps)
    
    self.layers.append(new_layer)
  
  # This function collects the other training functions together into one
  # coherent procedure. Until a total number of training steps has been met,
  # the top layer of the model is trained, followed by a new layer being trained.
  def train_test(self, batch_size, total_steps, steps_per_unit):

    # Each iteration trains two units (the top layer and a new hidden unit),
    # so the step counter advances by 2*steps_per_unit per iteration.
    for i in range(0,total_steps,2*steps_per_unit):
      
      # Generate additional features on training data.
      _, train_additions = self.feedforward_layers(
          
          partition='train',
          
          batch_size=batch_size)
      
      # Generate a new training input function based on those features.
      train_input_fn = input_fn(
          
          partition='train', 
          
          batch_size=batch_size, 
          
          repeat=True,

          alter_x=True, 
          
          x_alterations=train_additions)
      
      # Generate additional features on testing data.
      _, test_additions = self.feedforward_layers(
          
          partition='test',
          
          batch_size=batch_size)

      # Generate a new testing input function based on those features.
      test_input_fn = input_fn(
          
          partition='test', 
          
          batch_size=batch_size, 

          alter_x=True,
          
          x_alterations=test_additions)
      
      # Train and test the top layer.
      self.train_test_top(
          
          train_input_fn=train_input_fn, 
          
          test_input_fn=test_input_fn, 
          
          steps=steps_per_unit)
      
      # Calculate the residual errors used to train the new layer.
      errors = self.residual_errors(
          
          partition='train',
          
          batch_size=batch_size)
      
      # Produce a new training input function for the new layer based on the
      # previously generated training additions and the generated errors.
      train_input_fn = input_fn(
          
          partition='train',

          batch_size=batch_size,

          repeat=True,

          alter_x=True,

          x_alterations=train_additions,

          alter_y=True,

          y_alterations=errors)
      
      # Generate and train the new layer.
      self.train_layer(
          
          train_input_fn=train_input_fn,

          loss_fn=loss_fn,

          steps=steps_per_unit)

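The loss function is intended to follow the original paper. As listed under the known problems, the version below is not yet correct: it takes the absolute product of each logit and its residual error and is minimised, whereas the hidden unit should maximise the correlation between its output and the residual error.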

In [91]:
def loss_fn(labels, logits):

  loss = tf.math.abs(logits * labels)

  return loss
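
For comparison, the quantity that Fahlman and Lebiere's paper asks a candidate unit to maximise is the magnitude of the covariance between the unit's output and the residual error, summed over output units: $S = \sum_o \left| \sum_p (V_p - \bar{V})(E_{p,o} - \bar{E}_o) \right|$. The NumPy sketch below computes $S$ over a batch. It is not wired into the estimator, and the function name `correlation_score` is an assumption made here.

```python
import numpy as np

def correlation_score(unit_outputs, residual_errors):
    """S = sum over outputs o of |sum over patterns p of (V_p - mean V)(E_po - mean E_o)|."""
    v = np.asarray(unit_outputs, dtype=float)     # shape (patterns,)
    e = np.asarray(residual_errors, dtype=float)  # shape (patterns, outputs)
    if e.ndim == 1:
        e = e[:, None]                            # treat as a single output
    v_centred = v - v.mean()
    e_centred = e - e.mean(axis=0)
    return np.abs(v_centred @ e_centred).sum()

v = [0.0, 1.0, 0.0, 1.0]
print(correlation_score(v, [0.0, 1.0, 0.0, 1.0]))  # 1.0 (output tracks the error)
print(correlation_score(v, [1.0, 1.0, 0.0, 0.0]))  # 0.0 (output unrelated to the error)
```

Because optimisers minimise their objective, one way to perform the ascent would be to minimise $-S$. Note that $S$ depends on batch-level means, so it cannot be computed from a single label/logit pair.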

In [92]:
feature_columns = [tf.feature_column.numeric_column(feat_key, shape=[28*28])]

In [93]:
top_head = tf.estimator.MultiClassHead(n_classes=10)

In [94]:
model = CassCor(
    
    feature_columns=feature_columns,
    
    top_head=top_head,
    
    input_dimension=28*28)

In [None]:
model.train_test(
    
    batch_size=64,

    total_steps=5000,

    steps_per_unit=100)
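
As a quick check on the step budget: `train_test` advances its loop counter by `2*steps_per_unit` per iteration, so the settings above give 25 iterations, each training the top layer once and adding one hidden unit. The helper name below is illustrative.

```python
def units_added(total_steps, steps_per_unit):
    """Number of loop iterations, and hence hidden units added, by train_test."""
    return len(range(0, total_steps, 2 * steps_per_unit))

print(units_added(total_steps=5000, steps_per_unit=100))  # 25
```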