# Gradient test - compare derivatives of neuron output with internal gradient measurements of a model built with the NeuralLayer class and the Model class

# A Neural Layer Class

<p> See <a href="twoLayerXOR.pdf">documentation</a> for a discussion on the derivation of the classes used in this notebook. </p>


In [1]:
%load_ext pycodestyle_magic

In [2]:
%flake8_on --max_line_length 119

In [3]:
import pickle
import tensorflow as tf


class NeuralLayer:
    '''
    Class for defining a layer of neurons that can have multiple inputs and neurons/outputs - a neuron can only
    have one output, but it may serve as an input to many neurons in the next layer.  The equations used in this
    class assume that the bias term is included in the weights vector and that the input to that weight is 1.0.
    The equations assume that the loss function is the sum of the squares of the error where error is defined as
    the difference between a known target value and the output of the Psi function.  The Psi function is
    1/(1+e^-z).  And z, also known as net, is the sum of product of the weights and inputs (which includes the bias).
    '''
    def __init__(self, numberOfInputs, numberOfOutputs, learningFactor, id="me", debug=False, filePath=None):
        '''
        Neuron layer constructor - uses tensors - the last weight row is the bias term

        numberOfInputs  - number of inputs feeding the layer (the bias input is added internally)
        numberOfOutputs - number of neurons, each producing one output
        learningFactor  - multiplier applied to the deltas when the weights are updated
        id              - label used in the debug prints
        debug           - when True, print intermediate values
        filePath        - optional file holding pickled weights; if it exists its content replaces the random
                          initial weights, if it does not exist the random weights are kept
        '''
        self.id = id
        self.debug = debug
        self.numberOfInputs = numberOfInputs
        self.numberOfNeurons = numberOfOutputs
        self.backPropagatedErrorNotSet = True  # True until a higher layer calls setPropagationError
        self.learningFactor = learningFactor
        self.normalizer = 2.0  # gain applied to the error in errorWRTPsi
        self.delta = [0.0] * self.numberOfNeurons
        # one row per input plus one extra row for the bias, one column per neuron
        self.weights = tf.random.uniform([numberOfInputs+1, numberOfOutputs], minval=-0.5, maxval=0.5,
                                         dtype=tf.dtypes.float32)
        self.error = [0.0] * numberOfOutputs
        self.filePath = filePath
        if filePath is not None:
            try:
                # NOTE(review): pickle.load can execute arbitrary code - only point filePath at trusted files
                with open(filePath, "rb") as fileHandle:  # closed even if unpickling fails
                    self.weights = pickle.load(fileHandle)
            except FileNotFoundError:
                pass  # nothing stored yet - keep the random initial weights

    def storeLayer(self, filePath):
        '''
        Store the weights that have been trained - they are pickled into the file at filePath
        '''
        with open(filePath, "wb") as fileHandle:  # closed even if pickling fails
            pickle.dump(self.weights, fileHandle)

    def calculateOutput(self, inputs):
        '''
        Given the inputs, calculate the outputs - the inputs (with 1.0 appended for the bias) and the outputs
        are remembered on the layer because the gradient methods read them
        '''
        self.inputs = tf.concat([inputs, [1.0]], 0)
        self.outputs = self.psi(self.netAKAz())
        return self.outputs

    def netAKAz(self):
        '''
        Calculate the sum of the product of the weights and the inputs (the bias is the last weight, its input
        is 1.0) - this is net AKA z
        '''
        return tf.tensordot(self.inputs, self.weights, 1)

    def psi(self, z):
        '''
        Apply the logistic function, ψ = 1 / (1 + e^(-z)), to z
        '''
        return 1.0 / (1.0 + tf.exp(-z))

    def netWRTWeight(self, index):
        '''
        ∂zᵢ/∂wᵢ = inputᵢ  -- the change in net (z) with respect to a weight - must designate the weight index
        '''
        return self.inputs[index]

    def netWRTWeightVector(self):
        '''
        ∂zᵢ/∂wᵢ = inputᵢ  -- the change in net (z) with respect to each weight - this is a vector
        '''
        return self.inputs

    def psiWRTz(self, index):
        '''
        ∂ψᵢ/∂zᵢ = ψᵢ*(1-ψᵢ) where ψ = 1 / (1 + e^(-z)) -- the partial change of ψ with respect to z - this
        is a scalar - must designate output index
        '''
        return self.outputs[index]*(1 - self.outputs[index])

    def errorWRTPsi(self, targetArray, index):
        '''
        ∂Eᵢ/∂ψᵢ = -normalizer*(targetOutput - ψᵢ)  # E is the square of the error, normalizer is the gain (2) -
        this is a scalar - must designate output index

        The value is only computed from targetArray while no higher layer has set the error through
        setPropagationError; after that the stored, back propagated error is returned and targetArray is ignored.
        '''
        if self.backPropagatedErrorNotSet:
            targetOutput = targetArray[index]
            self.error[index] = - (self.normalizer * (targetOutput - self.outputs[index]))
        return self.error[index]

    def updateWeights(self, target=None, deltas=None):
        '''
        Update the weights to minimize the loss - if in batch mode, the deltas have been accumulated by updateDeltas
        and are passed in; otherwise they are calculated here from target
        '''
        if deltas is None:
            deltas = self.updateDeltas(target)
        # deltas has one row per neuron, weights has one column per neuron - hence the transpose
        self.weights -= self.learningFactor * tf.transpose(deltas)

    def updateDeltas(self, target, deltas=None):
        '''
        Update the deltas during batch processing

        Builds a matrix with one row per neuron and one column per weight (bias included) holding
        ∂E/∂ψ * ∂ψ/∂z * ∂z/∂w, adds it to deltas when deltas is given, calculates the error for the previous
        layer and returns the (accumulated) deltas.
        '''
        for neuron in range(self.numberOfNeurons):
            gradientRow = self.errorWRTPsi(target, neuron) * self.psiWRTz(neuron) * self.netWRTWeightVector()
            if neuron == 0:
                # make a 1 by n matrix out of the first row
                deltaDeltas = tf.reshape(tf.convert_to_tensor(gradientRow), [1, len(self.netWRTWeightVector())])
            else:
                deltaDeltas = tf.concat((deltaDeltas, [gradientRow]), 0)  # tack on a new row
            if self.debug:
                print("updateDeltas - layer {}, neuron {}, weight deltaDeltas\n{}".
                      format(self.id, neuron, deltaDeltas))

        if deltas is None:
            deltas = deltaDeltas
        else:
            deltas += deltaDeltas
        self.propagateError()  # do this before updating weights
        return deltas

    def propagateError(self):
        '''
        Determine error to send to previous layers
        For each neuron, determine the amount of error at its output that needs to be applied to the input
        which is the output of the previous level.  Those individual neuron amounts then need to be summed
        across all neurons.  The result is stored in errorForNextLayer and has one entry per weight row, so
        its last entry belongs to the bias input.
        '''
        previousLayerNeuronError = [0.0] * (self.numberOfInputs + 1)
        for thisLayerNeuron in range(self.numberOfNeurons):
            error = self.error[thisLayerNeuron]
            amountForEachPreviousLayerNeuron = error * self.weights[:, thisLayerNeuron] * self.psiWRTz(thisLayerNeuron)
            if self.debug:
                print("sum of weights for neurons at this layer: {}".
                      format(tf.reduce_sum(self.weights[:, thisLayerNeuron])))
                print("propagateError - in layer {}, neuron {}, contribution:{}".
                      format(self.id, thisLayerNeuron, amountForEachPreviousLayerNeuron))
                print("propagateError - Error {}, weights {}".format(error, self.weights[:, thisLayerNeuron]))
            # NOTE(review): relies on list += tensor giving an element-wise sum rather than extending the list
            previousLayerNeuronError += amountForEachPreviousLayerNeuron
        self.errorForNextLayer = previousLayerNeuronError
        if self.debug:
            print("propagateError - in layer {}, the next layer's error will be\n {}".
                  format(self.id, previousLayerNeuronError))

    def setPropagationError(self, error):
        '''
        From a higher layer, set the error propagating back to this layer - once set, errorWRTPsi stops
        calculating the error from the target
        '''
        self.error = error
        if self.debug:
            print("setPropagationError - setting propagation error in layer {} to\n {}".
                  format(self.id, self.error))
        self.backPropagatedErrorNotSet = False

    def setLearningFactor(self, factor):
        '''
        Setter for learning factor
        '''
        self.learningFactor = factor



# A Model Class


In [4]:
import tensorflow as tf


class Model:
    '''
    A stack of NeuralLayer objects.  The model inputs play the role of the first layer, so they are not
    represented by a NeuralLayer.  Every following layer is fed by all of the outputs of the layer before it.
    '''
    def __init__(self, inputOutputList, debug=False, filePath=None):
        '''
        Model constructor - builds one layer per list entry

        Each entry is a tuple of (inputs, outputs, learningFactor) optionally ending with a string used as the
        layer id.  When filePath is given, layer n is handed the weight file name filePath + str(n).
        '''
        self.layers = []
        for position, spec in enumerate(inputOutputList):
            # only a trailing string is treated as the layer id
            label = spec[-1] if isinstance(spec[-1], str) else None
            numberIn, numberOut, rate = spec[0], spec[1], spec[2]
            savedWeights = None if filePath is None else filePath + str(position)
            self.layers.append(NeuralLayer(numberIn, numberOut, rate, label, debug, savedWeights))

    def storeModel(self, filePath):
        '''
        Store the weights of every layer - layer n is written to filePath + str(n)
        '''
        for position, layer in enumerate(self.layers):
            layer.storeLayer(filePath + str(position))

    def feedForward(self, inputs):
        '''
        Push the inputs through the layers in order and return the outputs of the last layer
        '''
        signal = inputs
        for layer in self.layers:
            signal = layer.calculateOutput(signal)
        return signal

    def updateDeltas(self, target, deltas=None):
        '''
        Update the deltas of all the layers, working from the last layer back to the first

        Returns a list with one entry per layer in that back to front order.  When deltas is passed in, its
        entries are replaced by the accumulated values; otherwise a new list is started.
        '''
        backToFront = self.layers[::-1]
        finalPosition = len(backToFront) - 1
        startingFresh = deltas is None
        if startingFresh:
            deltas = []  # one entry per layer
        for position, layer in enumerate(backToFront):
            if startingFresh:
                deltas.append(layer.updateDeltas(target))
            else:
                deltas[position] = layer.updateDeltas(target, deltas=deltas[position])
            if position < finalPosition:
                # hand this layer's error to the layer that feeds it
                backToFront[position + 1].setPropagationError(layer.errorForNextLayer)
        return deltas

    def updateWeights(self, target=None, deltas=None):
        '''
        Update the weights of all layers, working from the last layer back to the first

        With deltas (as returned by updateDeltas) each layer simply applies its entry.  Without deltas each
        layer calculates its own from target and its error is passed on to the layer that feeds it.
        '''
        backToFront = self.layers[::-1]
        finalPosition = len(backToFront) - 1
        for position, layer in enumerate(backToFront):
            if deltas is not None:
                layer.updateWeights(deltas=deltas[position])
                continue
            layer.updateWeights(target)
            if position < finalPosition:
                backToFront[position + 1].setPropagationError(layer.errorForNextLayer)


# Gradient test - approach: set initial conditions, perturb a weight, measure its effect, compare it to the gradient

In [5]:
import inspect
import matplotlib.pyplot as plt
import tensorflow as tf


def printBasicInfo(one, two, layer, neuron, weight, deltas, model, step=0.001):
    '''
    print summary gradient information

    one, two - model output before and after the weight was perturbed
    layer, neuron, weight - location of the perturbed weight
    deltas - list of per layer gradient matrices ordered from the last layer to the first (hence the
             top-layer index), each indexed by [neuron][weight]
    model - only model.layers is read: its length and the inputs of the selected layer
    step - amount the weight was perturbed by; the default matches the 0.1 -> 0.101 change used by
           generalizedGradientSweep

    The calculated gradient is the change in the square of the output divided by step, which is the change
    in squared error when the target is 0.  The ratio is shown as "--" when the calculated gradient is zero.
    '''
    top = len(model.layers) - 1
    modelGradient = deltas[top-layer][neuron][weight]
    calcG = (two*two - one*one)/step
    ratio = "--"
    if calcG != 0.0:
        ratio = modelGradient/calcG
    print("++++++++")
    print("layer {}, neuron {}, weight {}".format(layer, neuron, weight))
    print("layer {}, input: {}".format(layer, model.layers[layer].inputs))
    print("First Output {}, Second Output {}".format(one, two))
    print("Model Gradient: {}, Calculated Gradient {}, ratio {}".
          format(modelGradient, calcG, ratio))
    print("++++++++")


def getLocation():
    '''
    build a trace prefix naming the function that called getLocation and the line number of that call
    '''
    caller = inspect.stack()[1]  # entry 0 is getLocation itself, entry 1 is its caller
    return "trace from: {}, line: {} ----> ".format(caller.function, caller.lineno)


def setWeight(weights, layer, neuron, weight, value, model):
    '''
    set one of the weights to a slightly different value

    weights - list of weight matrices, one per layer; entry layer is replaced by the updated matrix
    layer, neuron, weight - location of the weight; in a weight matrix the row is the weight and the column
                            is the neuron
    value - new value for that single weight, every other weight of the layer is left as it is
    model - the updated matrix is also installed as model.layers[layer].weights

    Returns the weights list (the same list object that was passed in).
    '''
    # tensors are immutable, so build a copy of the matrix with just the [weight, neuron] element replaced
    weights[layer] = tf.tensor_scatter_nd_update(weights[layer], [[weight, neuron]], [value])
    model.layers[layer].weights = weights[layer]
    return weights


def generalizedGradientSweep(modelDefinition, debug=False):
    '''
    Using the model definition, report the gradient at every weight within the model

    Every weight is set to 0.1 and every input to 0.5, and the model output is recorded.  Then, one weight at
    a time, the weight is raised to 0.101, the model is fed forward, the deltas are calculated for a target of
    [0.0], the comparison is printed by printBasicInfo and the weight is put back to 0.1.
    '''
    layerCount = len(modelDefinition)
    model = Model(modelDefinition, debug)

    # remember the shape (weights by neurons) of each layer's weight matrix
    shapes = []
    for position in range(layerCount):
        shapes.append(tf.shape(model.layers[position].weights))
        if debug:
            print("{} weight shape at layer {} is {}".format(getLocation(), position, shapes[-1]))

    # build a matrix of the same shape filled with 0.1 for each layer
    weightsForLayers = []
    for shape in shapes:
        entryCount = (shape[0] * shape[1]).numpy()
        flatWeights = tf.convert_to_tensor([0.1] * entryCount)
        weightsForLayers.append(tf.reshape(flatWeights, shape))
    if debug:
        print("{} initial weight settings: {}".format(getLocation(), weightsForLayers))

    for position, matrix in enumerate(weightsForLayers):
        model.layers[position].weights = matrix
        if debug:
            print("{} model weights for layer {}:\n{}".format(getLocation(), position, model.layers[position].weights))

    # the first layer has one weight row per input plus one for the bias
    inputCount = tf.shape(weightsForLayers[0])[0].numpy() - 1
    vectorOfInput = [0.5] * inputCount
    if debug:
        print("{} vectorOfInput: {}, type: {}".format(getLocation(), vectorOfInput, type(vectorOfInput)))

    unperturbedOutput = model.feedForward(vectorOfInput)
    for position, shape in enumerate(shapes):
        neuronCount = shape[1]
        weightCount = shape[0]
        if debug:
            print("{} in layer: {} number of neurons: {}, number of weights: {}".
                  format(getLocation(), position, neuronCount, weightCount))
        for neuron in range(neuronCount):
            for weight in range(weightCount):
                weightsForLayers = setWeight(weightsForLayers, position, neuron, weight, 0.101, model)
                perturbedOutput = model.feedForward(vectorOfInput)
                deltas = model.updateDeltas([0.0])
                if debug:
                    print("{} weightsForLayer\n{}".format(getLocation(), weightsForLayers))
                    print("{} deltas\n{}".format(getLocation(), deltas))
                printBasicInfo(unperturbedOutput, perturbedOutput, position, neuron, weight, deltas, model)
                # put the weight back before moving on to the next one
                weightsForLayers = setWeight(weightsForLayers, position, neuron, weight, 0.1, model)


def main():
    '''
    Gradient test - sweep a two layer model: 2 inputs into 3 neurons, then 3 inputs into 1 neuron
    '''
    # each tuple is (inputs, outputs, learning factor, layer id)
    firstLayer = (2, 3, 1000.0, "0")
    secondLayer = (3, 1, 1000.0, "1")
    generalizedGradientSweep([firstLayer, secondLayer])


# Run the gradient test only when this code is executed as the top-level program, not when it is imported
if __name__ == '__main__':
    main()

++++++++
layer 0, neuron 0, weight 0
layer 0, input: [0.5 0.5 1. ]
First Output [0.56585276], Second Output [0.5658558]
Model Gradient: 0.003440552158281207, Calculated Gradient [0.00342727], ratio [1.0038763]
++++++++
++++++++
layer 0, neuron 0, weight 1
layer 0, input: [0.5 0.5 1. ]
First Output [0.56585276], Second Output [0.5658558]
Model Gradient: 0.003440552158281207, Calculated Gradient [0.00342727], ratio [1.0038763]
++++++++
++++++++
layer 0, neuron 0, weight 2
layer 0, input: [0.5 0.5 1. ]
First Output [0.56585276], Second Output [0.56585884]
Model Gradient: 0.006880785804241896, Calculated Gradient [0.00688434], ratio [0.9994843]
++++++++
++++++++
layer 0, neuron 1, weight 0
layer 0, input: [0.5 0.5 1. ]
First Output [0.56585276], Second Output [0.5658558]
Model Gradient: 0.003440552158281207, Calculated Gradient [0.00342727], ratio [1.0038763]
++++++++
++++++++
layer 0, neuron 1, weight 1
layer 0, input: [0.5 0.5 1. ]
First Output [0.56585276], Second Output [0.5658558]
Mod