In [2]:
import numpy as np;
from tqdm import trange;
import pandas as pd;

In [3]:
print( np.__version__ );

1.26.4


In [4]:
def xor_data():
    input_array = np.array( [ [ 0.0, 0.0 ], [ 0.0, 1.0 ], [ 1.0, 0.0 ], [ 1.0, 1.0 ] ] );
    output_array = np.array( [ [ 0.0 ], [ 1.0 ], [ 1.0 ], [ 0.0 ] ] );
    return ( input_array.astype('float32'), output_array.astype('float32') ),\
( input_array.astype('float32'), output_array.astype('float32') );

def load_data( name ):
    if name == "xor":
        return xor_data();

In [5]:
print( type( load_data( "xor" )[0][0] ) );

<class 'numpy.ndarray'>


In [6]:
class Loss( object ):
    def __init__( self, name : str ):
        self.name = name;

    def __call__( self, y_pred, y_true ):
        if self.name == "mse":
            return np.mean( ( y_pred - y_true ) ** 2 );
    
        elif self.name == "cross_entropy":
            return -np.mean( y_true * np.log( y_pred ) + ( 1 - y_true ) * np.log( 1 - y_pred ) );
        
        else:
            raise ValueError( "Invalid loss function" );

In [7]:
class BackProp( object ):
    def __init__( self, data : np.ndarray , labels : np.ndarray , trainable_parameters : dict, loss : Loss ):
        self.data = data;
        self.labels = labels;
        self.trainable_parameters = trainable_parameters;
        self.loss_fn = loss;

    def __call__( self ):
        # perform backwards computation

        print( self.trainable_parameters );
        #loss = self.loss_fn( y_pred, self.labels );
        #return grads, loss;


class GradientDescent(object):
    """
    Gradient Descent optimizer.
    """
    def __init__(self, parameters: dict):
        self.parameters = parameters;
        self.name = "Gradient Descent";
    
    def minimize( self, trainable_parameters, grads, loss_obj: Loss ):
        """
        """
        for key in trainable_parameters.keys():
            trainable_parameters[key] -= self.parameters['learning_rate'] * grads[key];
        return trainable_parameters;


In [8]:
class BaseLayer( object ):
    """
    Base class for the Layer class.
    """
    def __init__( self, hyperparams : dict, name : str  ):
        """
        Constructor for the Base Layer class.

        args:
            params : dict
            node_no : int

        returns:
            None

        attributes:
            self.params : dict
            self.node_no : int

        raises:
            None

        """
        self.hyperparams = hyperparams;
        self.name = name;

In [9]:
class InputLayer( BaseLayer ):
    """
    Input layer for the perceptron.
    """
    def __init__( self, hyperparams : dict, name: str  ):
        """
        Constructor for the Input Layer class.

        args:
            params : dict
            node_no : int

        returns:
            None

        attributes:
            self.bias : array

        raises:
            None
        """
        super( InputLayer, self ).__init__( hyperparams, name );
        self.node_no = hyperparams['input_units'];
    
    def output( self, inputs ):
        """
        Mirror the inputs.

        args:
            inputs : array

        returns:
            array

        attributes:
            None

        raises:
            None
        """
        return inputs;

    def __rshift__( self, other ):
        """
        Overwrite the right shift operator to connect the layers.

        args:
            other : object

        returns:
            None

        attributes:
            None

        raises:
            None
        """
        return NeuralNetwork( self, other );

In [10]:
class Layer( BaseLayer ):
    """
    Layer class for the perceptron.
    """
    def __init__( self, hyperparams : dict, name: str, transfer: str  ):
        """
        Constructor for the Layer class.

        args:
            name : str
            params : dict
            node_no : int
            transfer : str

        returns:
            None

        attributes:
            self.name : str
            self.transfer : str
            self.inputs : object
            self.outputs : object

        raises:
            None
        """
        super( Layer, self ).__init__( hyperparams, name );
        self.transfer = transfer;

        self.inputs = None;
        self.outputs = None;
    
        if "hidden" in name:
            self.node_no = hyperparams['hidden_units'];
        else:
            self.node_no = hyperparams['output_units'];
        
        #create self.bias to be an array of size node_no with random values from a normal distribution between -1 and 1
        self.bias = np.random.uniform( -1, 1, self.node_no );
        
        
    def __rshift__( self, other ):
        """
        Overwrite the right shift operator to connect the layers.

        args:
            other : object

        returns:
            None

        attributes:
            None

        raises:
            None
        """
        return NeuralNetwork( self, other );

    def __repr__( self ) -> str:
        """
        Overwrite the __repr__ method to return the name of the layer.

        args:
            None

        returns:
            str

        attributes:
            None

        raises:
            None
        """
        return self.name;
    
    def transfer_fx( self, inputs, deriv=False ):
        """
        Transfer function for the layer.

        args:
            inputs : array

        returns:
            array

        attributes:
            None

        raises:
            None
        """
        if deriv == True:
            return inputs * ( 1 - inputs );
        # implement the sigmoid transfer function
        return 1 / ( 1 + np.exp( -inputs ) );


    def output( self, inputs, deriv=False ):
        """
        Calculate the output with the activation function and inputs.

        args:
            inputs : array

        returns:
            array

        attributes:
            self.outputs : array

        raises:
            None
        """
        
        #self.activated = self.transfer_fx( self.inputs.output( inputs ) + self.bias );
        if deriv == True:
            self.activated = self.transfer_fx( inputs + self.bias, deriv=True );
        else:
            self.activated = self.transfer_fx( inputs + self.bias );
        return self.activated;

In [11]:
class WeightLayer( BaseLayer):
    """
    Weight layer for the perceptron.
    """
    
    def __init__( self, src : Layer, dest : Layer ) -> None:
        """
        Constructor for the weight layer.

        args:
            params : dict
            src : Layer
            dest : Layer

        returns:
            None

        attributes:
            self.src : Layer
            self.dest : Layer
            self.input_size : tuple
            self.output_size : tuple
            self.name : str
            self.weights : array

        raises: 
            None
        """
        self.src = src;
        self.dest = dest;

        #print( f"src: {src.node_no}, dest: {dest.node_no}" );
    
        #self.weights = np.random.randn( self.src.node_no, self.dest.node_no );
        #create self.weights to be a matrix of size src.node_no x dest.node_no with random values from a uniform distribution between -1 and 1
        self.weights = np.random.uniform( -1, 1, ( self.src.node_no, self.dest.node_no ) );
        self.name    = "W_%s_%s_layer" % ( self.src.name, self.dest.name );

        self.src.outputs = self;
        self.dest.inputs = self;

    def __repr__( self ) -> str:
        """
        Overwrite the __repr__ method to return the name of the layer.

        args:
            None

        returns:
            str

        attributes:
            None

        raises:
            None
        """
        return self.name;

    def output( self, inputs ):
        """
        Matrix multiplication between the inputs and the weights.

        args:
            inputs : array

        returns:
            array

        attributes:
            None

        raises:
            None
        """        
        return self.src.output( inputs ) @ self.weights;

In [266]:
class NeuralNetwork( object ):
    """
    This class respresents a Neural Networks.
    """
    
    def __init__( self, layer0, layer1 ):
        """
        Constructor for the Neural Network class.
        Creates a network with an input layer, layer0 and an output layer, layer1.

        args:
            layer0 : object
            layer1 : object

        returns:
            None

        attributes:
            self.hyperparams          : ( dict ) hyperparameters for the network.
            self.layers               : ( list ) list of layers in the network.
            self.layer_name           : ( list ) list of names of the layers in the network.
            self.input_layer          : ( object ) input layer of the network.
            self.output_layer         : ( object ) output layer of the network.
            self.weights              : ( list ) list of weights in the network.
            self.weight_names         : ( list ) list of names of the weights in the network.
            self.loss_fn              : ( object ) loss function for the network.
            self.optimizer            : ( object ) optimizer for the network.
            self.learnable_parameters : ( dict ) dictionary of learnable parameters in the network.

        raises:
            None 
        """
        # hyperparameters dictionary
        self.hyperparams = layer0.hyperparams;

        # layers
        self.layers = [ layer0, layer1 ];
        self.layer_name = [ layer0.name, layer1.name ];
        self.input_layer = layer0;
        self.output_layer = self.layers[-1];

        # weights
        self.weights = [ WeightLayer( layer0, layer1 ) ];
        self.weight_names = [ self.weights[0].name ];

        # loss function
        self.loss_fn = None;

        # optimizer
        self.optimizer = None;

        # initialize and populate learnable parameters used for learning
        self.trainable_parameters = {};
        for weight in self.weights:
            self.trainable_parameters[weight.name] = weight.weights;
        for layer in self.layers[1:]:
            self.trainable_parameters[layer.name] = layer.bias;

        

    def init_logs( self ):
        self.logs = pd.DataFrame();

        # set the columns of the dataframe
        self.logs = pd.DataFrame( columns = [
            #primary key 
            'epoch', 

            #weights
            'weight_name', 
            'weight_node_no',  
            'weight_value', 
            
            #layer
            'layer_name', 
            'layer_node_no',
            'bias',

            #matmul
            'pre_activation',

            #activations
            'activated',
            
            'hidden_losses', 
            'output_losses' 
            
            ] );

    
    def summary( self ):
        """
        Print the summary of the network.

        args:
            None

        returns:
            None

        attributes:
            None

        raises:
            None
        """
        print( "-------" );
        print( "| Summary |" );
        print( "-------" );
        print( f"Input Layer: { self.input_layer.node_no }" );
        print( f"Hidden Layer: { self.layers[1].node_no }" );
        print( f"Output Layer: { self.output_layer.node_no }" );
    
        print( "-------" );
        print( "| Weights |" );
        print( "-------" );
        for weight_obj in self.weights:
            print( f"{weight_obj.name}: \n {weight_obj.weights} , {weight_obj.weights.shape}" );
        
        print( "-------" );
        print( "| Bias |" );
        print( "------" );
        for layer_obj in self.layers:
            if hasattr( layer_obj, 'bias' ):
                print( f"{layer_obj.name}: \n {layer_obj.bias}, {layer_obj.bias.shape}" );
            else:
                print( f"{layer_obj.name}: " );
        
        print( "---------------" );
        print( "| Hyperparameters |" );
        print( "---------------" );
        print( f"Epochs: {self.hyperparams['epochs']}" );
        print( f"Learning Rate: {self.hyperparams['lr']}" );
        print( f"Minibatch Size: {self.hyperparams['minibatch_size']}" );
        print( "---------------" );

        print( "---------------" );
        print( f"| Loss Function | : {self.loss_fn.name}" );
        print( "---------------" );
        
        print( "---------------" );
        #print( f"| Optimizer | : {self.optimizer.name}" );
        print( "---------------" );

        print( "---------------" );
        print( "| Trainable Parameters | ")
        print( "---------------" );
        print( self.trainable_parameters );
    

    def __rshift__( self, other ):
        """
        Overwrite the right shift operator to add a layer to the network
        """
        if isinstance( other, Layer ) or isinstance( other, InputLayer ):
            # add weights between the layers, append the weight name to the list
            self.weights.append( WeightLayer( self.layers[-1], other ) );
            self.weight_names.append( self.weights[-1].name );
            
            # add the layer to the network, set the output layer to the last layer, and append the layer name to the list
            self.layers.append( other );
            self.output_layer = self.layers[-1];
            self.layer_name.append( other.name );

            # add the weights and biases to the trainable parameters
            self.trainable_parameters[self.weights[-1].name] = self.weights[-1].weights;
            self.trainable_parameters[other.name] = other.bias;

            return self;
    
        if isinstance( other, Loss ):
            self.loss_fn = other;
            return self;

        if isinstance( other, GradientDescent ):
            self.optimizer = other;
            return self;

        else:
            print( type( other ) );

    def output( self, inputs ):
        """
        Calculate the output of the network.

        args:
            inputs : array

        returns:
            array

        attributes:
            None

        raises:
            None
        """
        return self.output_layer.output( inputs );

    def forward( self, inputs ):
        """
        Forward pass through the network.

        args:
            inputs : array

        returns:
            None

        attributes:
            None

        raises:
            None
        """
        fwd_output = {};

        for i in range( len( self.weights ) ):
            # log weights and biases;

            pre_activation = self.weights[i].output( inputs );
            activated_weights = self.layers[i + 1].output( pre_activation );
            #fwd_output[self.layers[i+1].name] = { 'pre_activated' : pre_activation, 
            #                                       'activated' : activated_weights 
            #                                      };
            fwd_output[self.layers[i+1].name] = activated_weights;
            fwd_output[self.weights[i].name] = pre_activation;

            inputs = activated_weights;

        return fwd_output;

    def predict( self, X ):
        """
        Predict the output of the network.

        args:
            X : array

        returns:
            array

        attributes:
            None

        raises:
            None
        """
        return self.forward( X );

    def train( self, X, y ):
        """
        Train the network.

        args:
            X : array
            y : array
            epochs : int
            lr : float

        returns:
            None

        attributes:
            None

        raises:
            None
        """
         # Define the shapes for each log entry
        log_shapes = {

            "hidden_losses" : (1,1),
            "output_losses" : (1,)

            # rewrite the above as a dict comprehension
            #"weights" : { weight.name : weight.weights.shape for weight in self.weights },
            #"biases" : { layer.name : layer.bias.shape for layer in self.layers[1:] },

            #"pre_activated" : { layer.name : (self.hyperparams['epochs'],) for layer in self.layers[1:] },
            #"activated" : { layer.name : (self.hyperparams['epochs'],) for layer in self.layers[1:] },

            
            #"weight_gradients" : { weight.name : weight.weights.shape for weight in self.weights },
            #"bias_gradients" : { layer.name : layer.bias.shape for layer in self.layers[1:] }

        }

        # Initialize the logs with zeros
        self.logs = { key: np.empty( ( self.hyperparams['epochs'],1 ) , dtype=np.float32) for key, shape in log_shapes.items()}

        #initalize the logger

        #print( self.logs )

        for epoch in trange( self.hyperparams['epochs']+2 ):
  
            # forward pass
            fwd_outputs = self.forward( X );

            #print( f"fwd_weights: {fwd_weights}" )

            # calculate the loss for each layer
            for key, value in fwd_outputs.items():
                if key in self.layer_name[1:]:
                    if epoch == 0:
                        print(  np.vstack(  [self.loss_fn( value, y )] ) )
                        self.logs[key+'_losses'] = np.vstack(  [self.loss_fn( value, y )] );
                    else:
                        self.logs[key+'_losses'] = np.append( self.logs[key+'_losses'], np.vstack(  [self.loss_fn( value, y )] ), axis=0 );
                    

                        
                #hidden_loss[key] = self.loss_fn( value['activated'], y );

        print( self.logs )
            # logging
            #self.logs['output_losses'][epoch] = self.loss_fn( fwd_weights['output']['activated'], y );
            #self.logs['hidden_losses'][epoch] = hidden_loss;

            #print( f"Epoch: { epoch + 1 }, Hidden_L Loss: { self.logs['hidden_losses'][epoch]['hidden'] } , Output_L Loss: { self.logs['output_losses'][epoch] }" );

            # backward pass
            grad, loss = BackProp( X, y, self.trainable_parameters, self.loss_fn )();

            #print( f"Epoch: { epoch + 1 }, Hidden_L Loss: { hidden_losses[epoch]['hidden'] } , Output_L Loss: { output_loss[epoch] }" );

In [267]:
params = {
    # model hyperparameters
    "input_units" : 2,
    "hidden_units" : 2,
    "output_units" : 1,

    # optimizer hyperparameters
    "lr" : 0.01,

    # training hyperparameters
    "epochs" : 1,
    "minibatch_size" : 4
}

In [268]:
training, testing = load_data( "xor" );
x_train, y_train = training[0], training[1];
x_test, y_test = testing[0], testing[1];

In [269]:
print( f"Training set: \n {x_train}, {x_train.shape}" );
print( f"Labels: \n {y_train}, {y_train.shape}" );

Training set: 
 [[0. 0.]
 [0. 1.]
 [1. 0.]
 [1. 1.]], (4, 2)
Labels: 
 [[0.]
 [1.]
 [1.]
 [0.]], (4, 1)


In [270]:
input_layer = InputLayer( params, "input" );
hidden_layer = Layer( params, "hidden", "sigmoid" );
output_layer = Layer( params, "output", "sigmoid" );

loss = Loss( "cross_entropy" );
optimizer = GradientDescent( params );

In [271]:
nn = input_layer >> hidden_layer >> output_layer >> loss;
nn.summary();

-------
| Summary |
-------
Input Layer: 2
Hidden Layer: 2
Output Layer: 1
-------
| Weights |
-------
W_input_hidden_layer: 
 [[-0.34563078  0.62021313]
 [-0.65881599  0.07018818]] , (2, 2)
W_hidden_output_layer: 
 [[ 0.11830519]
 [-0.1223834 ]] , (2, 1)
-------
| Bias |
------
input: 
hidden: 
 [-0.93997788 -0.34924307], (2,)
output: 
 [-0.4860153], (1,)
---------------
| Hyperparameters |
---------------
Epochs: 1
Learning Rate: 0.01
Minibatch Size: 4
---------------
---------------
| Loss Function | : cross_entropy
---------------
---------------
---------------
---------------
| Trainable Parameters | 
---------------
{'W_input_hidden_layer': array([[-0.34563078,  0.62021313],
       [-0.65881599,  0.07018818]]), 'hidden': array([-0.93997788, -0.34924307]), 'W_hidden_output_layer': array([[ 0.11830519],
       [-0.1223834 ]]), 'output': array([-0.4860153])}


In [272]:
nn.train( x_train, y_train );

100%|██████████| 3/3 [00:00<00:00, 2268.42it/s]

[[0.82464778]]
[[0.72584187]]
{'hidden_losses': array([[0.82464778],
       [0.82464778],
       [0.82464778]]), 'output_losses': array([[0.72584187],
       [0.72584187],
       [0.72584187]])}





In [19]:
nn.predict( x_test );