# Exercício 3 - MLP e RBF

---------------------------------------------

### Participants:
 - Francielle Vargas - 9527629
 - Lucas Nunes Sequeira - 9009642
 - Emanuel Huber - 12110113

#### Date: 24/09/2021

---------------------------------------------

#### Descrição e Instrução

Este notebook foi feito para a disciplina SCC5809 - Redes Neurais

No notebook contém

1. A implementação da classe **MLP**
2. A implementação da solução **RBF**
3. Utilização dos modelos no dataset **Iris**

Para utilizá-lo basta executar todas as células.

_link de acesso ao colab: https://colab.research.google.com/drive/1PRfwXYy6K1E1zsKYF6W93L0oiiKamihl?usp=sharing_

### Libs

In [1]:
# Install Libs
!pip install numpy==1.19.5
!pip install tqdm==4.62.0
!pip install plotly==4.4.1
!pip install pandas==1.1.5
!pip install scikit-learn==0.24.2



In [2]:
# Math lib
import numpy as np

# Log lib
from tqdm.auto import tqdm

# Visualization Lib
import plotly.express as px

# Copy for deepcopy
import copy

# Pandas for visualization
import pandas as pd
import matplotlib.pyplot as plt

# Iris data
from sklearn.datasets import load_iris

# Split data
from sklearn.model_selection import train_test_split

# Scaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler

# Metrics
from sklearn.metrics import accuracy_score

# Feature selection
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.feature_selection import SelectFromModel

# Seed
SEED = 42

# Set random seed
np.random.seed(SEED)

### Activation function

In [3]:
class Sigmoid(object):
  '''
  Sigmoid Activation Function
  
    f(x) =  1 / (1 + e^(-x))
  '''

  def __init__(self):

    # Last call data
    self.last_grad = 0
    self.last_input = 0
    self.last_output = 0

  def _update_last_call(self, x, y, grad = True):
    '''Update last call data'''

    # Gradient
    if grad:
      self.last_grad = self.gradient(x)

    # Update last input and output
    self.last_input = x
    self.last_output = y

  def __call__(self, x):
    '''Calculate sigmoid function of x'''

    if isinstance(x, list):
      x = np.array(x)

    return 1 / (1 + np.exp(-x))

  def calculate(self, x, grad: bool = True):
    '''Calculate sigmoid function of x'''

    # Calculation
    y = self(x)

    # Update last call data
    self._update_last_call(x, y, grad)

    return y

  def copy(self):
    return Sigmoid()

  def gradient(self, x):
    '''Calculate sigmoid gradient within x'''

    return self(x)*(1 - self(x))

In [4]:
class Swish(object):
  '''
  Swish Activation Function
  
    f(x) =  x * sigmoid(x)
  '''

  def __init__(self):

    self.sigmoid = Sigmoid()

    # Last call data
    self.last_grad = 0
    self.last_input = 0
    self.last_output = 0

  def _update_last_call(self, x, y, grad = True):
    '''Update last call data'''

    # Gradient
    if grad:
      self.last_grad = self.gradient(x)

    # Update last input and output
    self.last_input = x
    self.last_output = y

  def __call__(self, x):
    '''Calculate swish function of x'''

    if isinstance(x, list):
      x = np.array(x)

    return x*self.sigmoid(x)

  def calculate(self, x, grad: bool = True):
    '''Calculate swish function of x'''

    # Calculation
    y = self(x)

    # Update last call data
    self._update_last_call(x, y, grad)
    
    return y

  def copy(self):
    return Swish()

  def gradient(self, x):
    '''Calculate swish gradient within x'''

    return self.sigmoid(x)*(1 + x*(1 - self.sigmoid(x)))

In [5]:
class Relu(object):
  '''
  Relu Activation Function
  
    f(x) =  
      0 if x < 0
      x if x >= 0
  '''

  def __init__(self):

    # Last call data
    self.last_grad = 0
    self.last_input = 0
    self.last_output = 0

  def _update_last_call(self, x, y, grad = True):
    '''Update last call data'''

    # Gradient
    if grad:
      self.last_grad = self.gradient(x)

    # Update last input and output
    self.last_input = x
    self.last_output = y

  def __call__(self, x):
    '''Calculate relu function of x'''

    return np.where(x < 0, 0.0, x)

  def calculate(self, x, grad: bool = True):
    '''Calculate relu function of x'''

    # Calculation
    y = self(x)

    # Update last call data
    self._update_last_call(x, y, grad)
    
    return y

  def copy(self):
    return Relu()

  def gradient(self, x):
    '''Calculate relu gradient within x'''

    return np.where(x < 0, 0.0, 1)

In [6]:
class Tanh(object):
  '''
  Tanh Activation Function
  
    f(x) =  tanh(x)
  '''

  def __init__(self):

    # Last call data
    self.last_grad = 0
    self.last_input = 0
    self.last_output = 0

  def _update_last_call(self, x, y, grad = True):
    '''Update last call data'''

    # Gradient
    if grad:
      self.last_grad = self.gradient(x)

    # Update last input and output
    self.last_input = x
    self.last_output = y

  def __call__(self, x):
    '''Calculate sigmoid function of x'''

    if isinstance(x, list):
      x = np.array(x)

    return np.tanh(x)

  def calculate(self, x, grad: bool = True):
    '''Calculate tanh function of x'''

    # Calculation
    y = self(x)

    # Update last call data
    self._update_last_call(x, y, grad)

    return y

  def copy(self):
    return Tanh()

  def gradient(self, x):
    '''Calculate sigmoid gradient within x'''

    return 1 - np.tanh(x)**2

### Loss Function

In [7]:
class MSE(object):
  '''
  Mean Squared Error Loss Function
  
    f(x) =  1/(2*m) * sum((ref_y - hyp_y)**2)
  '''

  def __init__(self):
    
    # Last call data
    self.last_grad = 0
    self.last_input = 0
    self.last_output = 0

  def _update_last_call(self, x, y, grad = True):
    '''Update last call data'''

    # Gradient
    if grad:
      self.last_grad = self.gradient(x[0], x[1])

    # Update last input and output
    self.last_input = x
    self.last_output = y

  def __call__(self, ref: np.ndarray, hyp: np.ndarray):
    '''Calculate mean squared error between ref and hyp'''

    if isinstance(ref, list):
      ref = np.array(ref)
    if isinstance(hyp, list):
      hyp = np.array(hyp)

    size = len(ref)

    # Quadratic Error Sum
    quadratic_sum = np.sum((ref - hyp)**2)

    return quadratic_sum/(2*size)

  def calculate(self, ref: np.ndarray, hyp: np.ndarray, grad: bool = True):
    '''Calculate mean squared error between ref and hyp'''

    # Calculation
    y = self(ref, hyp)

    # Update last call data
    self._update_last_call((ref, hyp), y, grad)

    return y

  def copy(self):
    return MSE()

  def gradient(self, ref: np.ndarray, hyp: np.ndarray):
    '''Calculate MSE gradient within hyp'''

    size = len(ref)

    return -np.sum(ref - hyp)/size

### Metrics Function

In [8]:
class MultiClassAccuracy(object):
  '''
  MultiClass Accuracy Score
  '''

  def __init__(self):
    
    self.func = accuracy_score

  def one_hot_decode(y):
    '''One hot decode y'''

    return np.argmax(y, axis=1)

  def __call__(self, ref: np.ndarray, hyp: np.ndarray):
    '''Apply one hot decode and calculate accuracy ref and hyp'''

    if isinstance(ref, list):
      ref = np.array(ref)
    if isinstance(hyp, list):
      hyp = np.array(hyp)

    # One hot decode
    y_true = one_hot_decode(ref)
    y_pred = one_hot_decode(hyp)

    # Calculate score
    score = self.func(y_true, y_pred)

    return score

  def calculate(self, ref: np.ndarray, hyp: np.ndarray):
    '''Calculate mean squared error between ref and hyp'''

    # Calculation
    y = self(ref, hyp)

    return y

  def copy(self):
    return MultiClassAccuracy()

### Kernel Function

In [9]:
class KernelGaussian(object):
  '''
  Kernel Gaussiano function
  '''

  def __init__(self, centroids, sigma = 1):

    # Save parameters
    self.centroids = centroids
    self.sigma = sigma

  def _update_last_call(self, x, y):
    '''Update last call data'''

    # Update last input and output
    self.last_input = x
    self.last_output = y

  def __call__(self, X: np.ndarray):
    '''Apply kernel to data
    
      Params:
        X (np.ndarray): array of data
    '''

    # Calculate gaussian distance (step 1)
    dists = np.sqrt( np.sum( (X-self.centroids.T)**2, axis=1 ) )

    # Calculate gaussian distance (step 2)
    output = np.exp(-dists**2 / (2 * self.sigma**2))

    # Add batch dimention
    output = np.expand_dims(output, 0)

    # Save last call
    self._update_last_call(X, output)

    return output

  def copy(self):
    return KernelGaussian()

### Kmeans

In [10]:
def get_centroids(X, n_centroids: int = 2):
  '''
  Returns centroids of X with kmeans
  '''

  # Fit kmeans
  kmeans = KMeans(n_clusters = n_centroids).fit(X)

  # Get centroids
  centroids = kmeans.cluster_centers_

  # Return centroids
  return centroids

### Perceptron Class

In [11]:
class Perceptron():

  def __init__(self, input_size: int = 2, init_rule: str = 'zero', activation = None):
    '''Initializes Perceptron
    
      Params:
        input_size (int): size of input data
        init_rule (str): initialization parameter to set initial weight values
        activation (func): function to apply activation step
      '''

    # Perceptron size
    self.input_size = input_size
    self.size = input_size + 1

    # Initialize weights
    self.init_weights(init_rule)

    # Activation function
    if activation is None:
      activation = Sigmoid()
    self.activation = activation.copy()

    # Health check
    self._health_check()
  
  def _health_check(self):
    '''Perform a health check prediction'''

    # 1D (input_size)
    X = np.random.rand(self.input_size)
    self(X)

    # 2D (4 items, input_size)
    X = np.random.rand(4, self.input_size)
    self(X)

  def init_weights(self, init_rule: str = 'zero'):
    '''Initialize weights
    
      Params:
        init_rule (str): initialization parameter to set initial weight values
    '''

    # Assure initializarion rule
    assert init_rule in ['zero', 'rand'], "'init_rule' must be zero or rand"

    if init_rule == 'zero':
      # Zero values
      self.weights = np.zeros(self.size)
    
    elif init_rule == 'rand':
      # Random values in [-0.1, 0,1]
      self.weights = np.random.rand(self.size) - 0.5
      self.weights /= 0.5

  def set_weights(self, weights: np.ndarray):
    '''Update weights

      Params:
        weights (np.ndarray): array of weights
    '''

    # Assure perceptron size equals given weights size
    assert self.size == len(weights), f"Perceptron size ({self.size}) != weights size ({len(weights)})"

    # Update weights
    self.weights = weights.copy()

  def get_weights(self) -> np.ndarray:
    '''Return a copy of current weights'''

    # Return weights
    return self.weights.copy()

  def _add_bias_term(self, X: np.ndarray, value = 1.0, is_batch = True) -> np.ndarray:
    '''Add bias term to X values, ie:
       Given (x_i) in = [1, 0] -> out: [value, 1, 0]

      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        value (float): Bias factor value. Default = 1.0
        is_batch (bool): Boolean to explicity that is or not a batch of items

      Returns:
        X (np.ndarray) with the bias term concatenated, eg:
    '''

    if is_batch:
      # Get batch_size
      batch_size = X.shape[0]

      # Add bias term
      X = np.concatenate([value*np.ones((batch_size, 1)), X], axis=1)
    
    else:
      # Add bias term
      X = np.concatenate([[value], X])

    return X

  def _prepare_input(self, X: np.ndarray) -> np.ndarray:
    '''Prepare input X
       
       1. Add batch dimension (if applies)
       2. Add bias term

      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
    '''

    # Make sure is a np.ndarray
    X = np.array(X)

    # Verify if it is sigle item and batch it
    if len(X.shape) == 1:
      # Add batch dimension
      X = np.expand_dims(X, 0)

    # Add bias term
    X = self._add_bias_term(X)

    return X


  def forward(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''
    
    # Prepare input data
    X = self._prepare_input(X)

    # Inner product of inputs and weigths (net)
    net = self.weights * X # multiplication
    net = np.sum(net, axis = 1) # sum reduction

    # Apply activation
    output = self.activation.calculate(net)

    return output

  def __call__(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction (runs forward method)
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''

    return self.forward(X)

### Perceptron Layer Class

In [12]:
class PerceptronLayer():

  def __init__(self, input_size: int = 2, units: int = 2, init_rule: str = 'zero',
               activation = None):
    '''Initializes Perceptron Layer
    
      Params:
        input_size (int): size of input data
        units (int): perceptron units to use in layer
        init_rule (str): initialization parameter to set initial weight values
        activation (func): function to apply activation step
      '''

    # Perceptron Layer sizes
    self.input_size = input_size
    self.size = input_size + 1
    self.num_units = units
    self.output_size = self.num_units

    # Activation function
    if activation is None:
      activation = Sigmoid()
    self.activation = activation

    # Initialization rule
    self.init_rule = init_rule

    # Initialize units
    self._init_units()

    # Health check
    self._health_check()

  def _save_foward_transform(self, x, y):
    self.last_input = x
    self.last_output = y

  def _init_units(self):
    '''Initialize units of Layer'''

    # Layer list
    self.units = []

    # Iterate of amount of units
    for unit_id in range(self.num_units):
      self.units.append(
          Perceptron(
              input_size=self.input_size,
              init_rule=self.init_rule,
              activation=self.activation
          )
      )
  
  def _health_check(self):
    '''Perform a health check prediction'''

    # 1D (input_size)
    X = np.random.rand(self.input_size)
    self(X)

    # 2D (4 items, input_size)
    X = np.random.rand(4, self.input_size)
    self(X)


  def set_weights(self, weights: np.ndarray):
    '''Update weights per unit

      Params:
        weights (np.ndarray): array of weights; size: (num_units, len(unit.weights))
    '''

    # Assure weights have same length as number of units
    assert len(weights) == self.num_units, "Array of weigths must have shape (num_units, len(unit.weights))"

    for unit_weights, unit in zip(weights, self.units):

      # Update weights for each unit
      unit.set_weights(unit_weights)

  def get_weights(self) -> np.ndarray:
    '''Return a array of a copy of current weights per unit'''

    units_weights = []

    for unit in self.units:

      # Append unit weights
      units_weights.append(unit.get_weights())

    # Return weights
    return np.array(units_weights)

  def backward(self, delta, value):
    '''
    Apply backward propagation of the layer
    '''

    # Get gradient
    grad = self.activation.gradient(value)

    # Return new delta
    new_delta = (delta @ self.get_weights())[:, 1:] * grad

    return new_delta

  def forward(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''
    
    # Initalizate logits list (size of num units)
    logits = []

    for unit in self.units:
      
      # Apply foward in perceptron unit
      logit = unit(X)

      # Save logit
      logits.append(logit)

    # Return transpose logits
    logits = np.array(logits).T

    return logits

  def __call__(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction (runs forward method)
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''

    y = self.forward(X)

    self._save_foward_transform(X, y)

    return y

### MultiLayerPerceptron Class

In [13]:
class MultiLayerPerceptron():

  def __init__(self, layers: list, loss_func = None, metric = None):
    '''Initializes MultiLayerPerceptron Model
    
      Params:
        layers (list): list of perceptron layers
        loss_func (func): loss function to be applied
      '''

    # Perceptron Layers
    self.layers = layers
    self.num_layers = len(layers)

    # Input and output sizes
    self.input_size = layers[0].input_size
    self.output_size = layers[-1].output_size

    # Save loss
    if loss_func is None:
      self.loss_func = MSE()
    else:
      self.loss_func = loss_func

    # Save metric
    self.metric = metric

    # Health check
    self._health_check()

    # Last dWs (weight variations)
    self.last_dWs = {}
  
  def _health_check(self):
    '''Perform a health check prediction'''

    # 1D (input_size)
    X = np.random.rand(self.input_size)
    self(X)

    # 2D (4 items, input_size)
    X = np.random.rand(4, self.input_size)
    self(X)

  def _extend(self, vec):
    
    return np.hstack([np.ones((vec.shape[0], 1)), vec])

  def _backpropagate(self, x_input: np.ndarray, predicted_y: np.ndarray, 
                     reference_y: np.ndarray, learning_rate: float, momentum_rate: float):
    '''
    Backpropagate loss to update each perceptron set of weigths in
    each layer of the model

    Params:
      x_input (np.ndarray): batch of input x
      reference_y (np.ndarray): batch of reference y's
      predicted_y (np.ndarray): batch of predicted y's
      learning_rate (float): learning rate param
      momentum_rate (float): momentum rate param
    '''

    # Get first delta
    delta = predicted_y - reference_y

    # Get last predicted layer output
    last_output = predicted_y

    # Initialize weights dict variations
    dWs = {}

    # Iterate backwards over layers
    for i in range(-1, -len(self.layers), -1):
      
      # Get layer last output
      last_output = self.layers[i - 1].last_output

      # Update layer weights variation
      dWs[i] = delta.T @ self._extend(last_output)

      # Get new delta
      delta = self.layers[i].backward(delta, last_output)

    # Update layer weights variation (first layer)
    dWs[-self.num_layers] = delta.T @ self._extend(x_input)

    # Initialize current dWs variations
    current_dWs = {}

    # Update each layer weights
    for k, dW in dWs.items():
      
      # Get current weights
      weights = self.layers[k].get_weights()

      # Can't apply momentum yet
      if self.last_dWs == {}:
        variation = -(learning_rate * dW)
      
      # Apply momentum
      else:
        variation = -(learning_rate * dW) + momentum_rate * self.last_dWs[k]

      # Update weights
      weights += variation

      # Update weights
      self.layers[k].set_weights(weights)

      # Update current dWs
      current_dWs[k] = variation
    
    # Update last dWs
    self.last_dWs = current_dWs

  def get_weights(self) -> np.ndarray:
    '''Return a list of arrays of a copy of current weights per layer and unit'''

    layer_weights = []

    for layer in self.layers:

      # Append unit weights
      layer_weights.append(layer.get_weights())

    # Return weights
    return layer_weights

  def _get_batch(self, X: np.ndarray, y: np.ndarray, batch_size: int):
    '''Generator of batch of items from X and y input data'''

    # X and y lengths must match
    assert len(X) == len(y), f"X (len = {len(X)}) and y (len = {len(y)}) lengths must match"

    # Produce batches
    batches = []

    # For each batch step append items
    for step in range(len(X)//batch_size + 2):
      
      # Get batch
      X_batch = X[step*batch_size:(step+1)*batch_size]
      y_batch = y[step*batch_size:(step+1)*batch_size]

      if len(X_batch) == 0: break

      batches.append({
          'X': X_batch,
          'y': y_batch
      })

    # Generate each batch pre-computed
    for batch in batches:

      # Return item
      yield batch

  def fit(self, X: np.ndarray, y: np.ndarray, learning_rate: float = 0.1, momentum_rate: float = 1e-5,
          max_epochs: int = 5, stop_threshold: float = 1e-3, batch_size: int = 1):
    '''
    Fit the MLP model using a max_epochs steps, or when the stop_threshold
    is met

      Params:
        X (np.ndarray): a array of inputs, each input must match model input_size (number of features)
        y (np.ndarray): a array of target values (labels)
        learning_rate (float): hyperparameter to be used on backpropagation
        momentum_rate (float): hyperparameter to be used on backpropagation
        max_epochs (int): number of maximum epochs to iterate
        stop_threshold (float): number to be used to stop training if epoch loss is lower
        batch_size (int): size of each batch for the training steps

      Returns:
        history (dict): A dictionary containing training data over training as epoch loss
    '''

    # Assure X and y has same size
    assert len(X) == len(y), f"X (len = {len(X)}) and y (len = {len(y)}) lengths must match"
    assert len(y[0]) == self.layers[-1].num_units, f"y_i (len = {len(y[0])}) and output layer (len = {self.layers[-1].num_units}) lengths must match"

    # Epoch iterator
    iterator = tqdm(range(max_epochs), leave=False)

    # Num items
    num_items = len(y)

    # History of train
    history = {'loss': [], 'score': []}

    for epoch in iterator:

      # Log epoch
      iterator.set_description(f'Epoch {epoch+1}/{max_epochs}')

      # Initialize epoch loss (mean absolute value)
      epoch_loss = 0
      
      # Iterate over all items
      for batch in self._get_batch(X, y, batch_size):

        # Retriever X and y batch
        input_x = batch['X']
        reference_y = batch['y']
        
        # Make prediction
        predicted_y = self(input_x)

        # Calculate error (loss)
        loss = self.loss_func.calculate(reference_y, predicted_y)

        # Add to epoch loss
        epoch_loss += batch_size * loss/num_items

        # Update weights
        self._backpropagate(input_x, predicted_y, reference_y, learning_rate, momentum_rate)

      # Calculate metric
      if self.metric:
        score = self.evaluate(X, y)
      
        # Log epoch loss and metric
        iterator.set_postfix({'loss': epoch_loss, 'score': score})
        history['score'].append(score)
      else:
        # Log epoch loss
        iterator.set_postfix({'loss': epoch_loss})

      # Append history
      history['loss'].append(epoch_loss)

      # Stop Threshold
      if epoch_loss < stop_threshold:
        print(f'>> Loss met stop condition (at epoch {epoch+1}): loss = {epoch_loss} < {stop_threshold}')
        break

    return history

  def evaluate(self, X: np.ndarray, y: np.ndarray):
    '''Make a batch or single prediction
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        y (np.ndarray): Batch of expected predictions (2D array) or a item (1D array)
        
      Returns:
        score (dict)
    '''

    # Make prediction
    y_pred = self(X)

    return self.metric(y, y_pred)

  def forward(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''

    # Apply forward on each sequential layer
    for layer in self.layers:
      X = layer(X)
    
    return X

  def __call__(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction (runs forward method)
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''

    return self.forward(X)

### RBF Class

In [14]:
class RBFModel():

  def __init__(self, input_size, output_size, kernel, loss_func = None, metric = None, init_rule = 'zero'):
    '''Initializes RBF Model
    
      Params:
        input_size (int): size of input data
        output_size (int): size of output data
        loss_func (func): loss function to be applied
      '''

    # Input and output sizes
    self.input_size = input_size
    self.size = input_size
    self.output_size = output_size

    # Save loss
    if loss_func is None:
      self.loss_func = MSE()
    else:
      self.loss_func = loss_func

    # Save metric
    self.metric = metric

    # Save kernel function
    self.kernel = kernel

    # Initialize weights
    self.init_weights(init_rule=init_rule)

    # Health check
    self._health_check()

  def init_weights(self, init_rule: str = 'zero'):
    '''Initialize weights
    
      Params:
        init_rule (str): initialization parameter to set initial weight values
    '''

    # Assure initializarion rule
    assert init_rule in ['zero', 'rand'], "'init_rule' must be zero or rand"

    if init_rule == 'zero':
      # Zero values
      self.weights = np.zeros((self.size, self.output_size))
    
    elif init_rule == 'rand':
      # Random values in [-0.1, 0,1]
      self.weights = np.random.random((self.size, self.output_size)) - 0.5
      self.weights /= 0.5
  
  def _health_check(self):
    '''Perform a health check prediction'''

    # 1D (input_size)
    X = np.random.rand(self.input_size)
    self(X)

    # 2D (1 items, input_size)
    X = np.random.rand(1, self.input_size)
    self(X)

  def set_weights(self, weights: np.ndarray):
    '''Update weights

      Params:
        weights (np.ndarray): array of weights; size: (input_size, output_size)
    '''

    # Assure weights have same length as number of units
    assert weights.shape == self.weights.shape, "Array of weigths must have shape (input_size, output_size)"

    self.weights = weights.copy()

  def get_weights(self) -> np.ndarray:
    '''Return a copy of current weights'''

    return self.weights.copy()

  def _get_batch(self, X: np.ndarray, y: np.ndarray, batch_size: int):
    '''Generator of batch of items from X and y input data'''

    # X and y lengths must match
    assert len(X) == len(y), f"X (len = {len(X)}) and y (len = {len(y)}) lengths must match"

    # Produce batches
    batches = []

    # For each batch step append items
    for step in range(len(X)//batch_size + 2):
      
      # Get batch
      X_batch = X[step*batch_size:(step+1)*batch_size]
      y_batch = y[step*batch_size:(step+1)*batch_size]

      if len(X_batch) == 0: break

      batches.append({
          'X': X_batch,
          'y': y_batch
      })

    # Generate each batch pre-computed
    for batch in batches:

      # Return item
      yield batch

  def _fit_weights(self, kernel_output: np.ndarray, predicted_y: np.ndarray, 
                     reference_y: np.ndarray, learning_rate: float):
    '''
    Backpropagate loss to update each perceptron set of weigths in
    each layer of the model

    Params:
      kernel_output (np.ndarray): batch of kernel(input_x)
      reference_y (np.ndarray): batch of reference y's
      predicted_y (np.ndarray): batch of predicted y's
      learning_rate (float): learning rate param
    '''

    # Calculate batch differences
    diffs = (reference_y-predicted_y)

    # Get batch size
    batch_size = kernel_output.shape[0]

    # Iterate over batch items
    for batch_id in range(batch_size):
      
      # Get difference
      diff = diffs[batch_id].reshape(-1, 1)

      # Get kernel item output
      kernel_output_item = kernel_output[batch_id].reshape(-1, 1)

      # Get current weights
      current_weights = self.get_weights()

      # Calculate delta weights
      delta_weights = learning_rate*(kernel_output_item * diff.T)

      # Update weights
      self.set_weights(current_weights + delta_weights)

  
  def fit(self, X: np.ndarray, y: np.ndarray, learning_rate: float = 0.1, momentum_rate: float = 1e-5,
          max_epochs: int = 5, stop_threshold: float = 1e-3, batch_size: int = 1):
    '''
    Fit the MLP model using a max_epochs steps, or when the stop_threshold
    is met

      Params:
        X (np.ndarray): a array of inputs, each input must match model input_size (number of features)
        y (np.ndarray): a array of target values (labels)
        learning_rate (float): hyperparameter to be used on backpropagation
        momentum_rate (float): hyperparameter to be used on backpropagation
        max_epochs (int): number of maximum epochs to iterate
        stop_threshold (float): number to be used to stop training if epoch loss is lower
        batch_size (int): size of each batch for the training steps

      Returns:
        history (dict): A dictionary containing training data over training as epoch loss
    '''

    # Assure X and y has same size
    assert len(X) == len(y), f"X (len = {len(X)}) and y (len = {len(y)}) lengths must match"
    assert len(y[0]) == self.output_size, f"y_i (len = {len(y[0])}) and output size (len = {self.output_size}) lengths must match"
    assert batch_size == 1, "Only batch size == 1 fits"

    # Epoch iterator
    iterator = tqdm(range(max_epochs), leave=False)

    # Num items
    num_items = len(y)

    # History of train
    history = {'loss': [], 'score': []}

    for epoch in iterator:

      # Log epoch
      iterator.set_description(f'Epoch {epoch+1}/{max_epochs}')

      # Initialize epoch loss (mean absolute value)
      epoch_loss = 0
      
      # Iterate over all items
      for batch in self._get_batch(X, y, batch_size):

        # Retriever X and y batch
        input_x = batch['X']
        reference_y = batch['y']
        
        # Make prediction
        predicted_y = self(input_x)
        kernel_output = self.kernel.last_output

        # Calculate error (loss)
        loss = self.loss_func.calculate(reference_y, predicted_y)

        # Add to epoch loss
        epoch_loss += batch_size * loss/num_items

        # Update weights
        self._fit_weights(kernel_output, predicted_y, reference_y, learning_rate)

      # Calculate metric
      if self.metric:
        score = self.evaluate(X, y)
      
        # Log epoch loss and metric
        iterator.set_postfix({'loss': epoch_loss, 'score': score})
        history['score'].append(score)
      else:
        # Log epoch loss
        iterator.set_postfix({'loss': epoch_loss})

      # Append history
      history['loss'].append(epoch_loss)

      # Stop Threshold
      if epoch_loss < stop_threshold:
        print(f'>> Loss met stop condition (at epoch {epoch+1}): loss = {epoch_loss} < {stop_threshold}')
        break

    return history

  def evaluate(self, X: np.ndarray, y: np.ndarray):
    '''Make a batch or single prediction
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        y (np.ndarray): Batch of expected predictions (2D array) or a item (1D array)
        
      Returns:
        score (dict)
    '''

    y_pred = []

    # Make predictions
    for x_item in X: 

      # Apply forward
      pred = self(x_item).reshape(-1)

      # Append prediction
      y_pred.append(pred)

    # Turn to np array
    y_pred = np.array(y_pred)

    return self.metric(y, y_pred)

  def _prepare_input(self, X: np.ndarray) -> np.ndarray:
    '''Prepare input X
       
       1. Add batch dimension (if applies)
       2. Add bias term

      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
    '''

    # Make sure is a np.ndarray
    X = np.array(X)

    # Verify if it is sigle item and batch it
    if len(X.shape) == 1:
      # Add batch dimension
      X = np.expand_dims(X, 0)

    return X

  def forward(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''

    # Prepare data (add batch dimension if applies)
    X = self._prepare_input(X)

    # Apply forward
    y = self.kernel(X) @ self.weights

    return y

  def __call__(self, X: np.ndarray) -> np.ndarray:
    '''Make a batch or single prediction (runs forward method)
    
      Params:
        X (np.ndarray): Batch of items (2D array) or a item (1D array)
        
      Returns:
        output (np.ndarray) logits
    '''

    return self.forward(X)

### Iris Dataset

#### Load Dataset

In [15]:
from google.colab import files

print("Por favor faça o upload do arquivo iris.dat")
uploaded = files.upload()

Por favor faça o upload do arquivo iris.dat


In [16]:
############################ Implementação 1 ############################

import numpy as np
import pandas as pd
import random
from sklearn import model_selection
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler


#loading dataset
dataset = pd.read_csv('iris.dat')

# Get features
X = dataset[['SepalLengthCm',	'SepalWidthCm',	'PetalLengthCm',	'PetalWidthCm']]

# Get target class
Y = dataset[['Species']]

Y = LabelEncoder().fit_transform(Y)

#preparating features for newtwork
data = np.array( X, dtype=np.float32 )
labels = np.array( Y, dtype=np.int32 )

#preparating labels for newtwork
num_categories = 3
new_labels = np.zeros( [ len(labels), num_categories ] )
for i in range( len(labels) ):
  new_labels[ i, labels[i]-1 ] = 1.
labels = new_labels


#print features and labels
print('---features---')
print(data)
print('---class---')
print(labels)


#separating test and training data
validation_size = 0.30
seed = 7
scoring = 'accuracy'
X_train, X_validation, Y_train, Y_validation = model_selection.train_test_split(data, labels, test_size=validation_size, random_state=seed, shuffle= True)


#normalization
scaler = MinMaxScaler( (-1,1) )
X_train = scaler.fit_transform( X_train )
X_validation = scaler.transform( X_validation )
print(X_train.shape, X_validation.shape)


#RBF NETWORK
from sklearn.cluster import KMeans

###initialize the weights of the intermediate layer
def initialize_centroids( n_centroids, X ):
    kmeans = KMeans( n_clusters = n_centroids ).fit(X)
    centroids = kmeans.cluster_centers_
    return centroids

###initialize the weights of the output layer
def initialize_weights( n_centroids, n_outputs ):
    W = np.random.normal( loc=0, scale=0.1, size=( n_centroids, n_outputs ) )
    return W

###sigmoid  function
def gaussian( C, X, sigma=1. ):
    dists = np.sqrt( np.sum( (X-C)**2, axis=1 ) )
    return np.exp(-dists**2 / (2 * sigma**2))

###step function
def step( v ):
    if v > 0:
        return 1
    return 0

###forward
def forward( C, W, X ):
    phi = gaussian( C, X )
    V = np.dot( phi, W )
    Y = [step(v) for v in V]
    Y = np.array( Y )
    return Y

###prediction
def predict( C, W, data ):
    outputs = list()
    for X in data:
        Y = forward( C, W, X )
        outputs.append( Y )
    return outputs

##acurracy
def evaluate( C, W, data, t ):
    Y = predict( C, W, data )
    hits = np.sum( [ np.argmax(Y[i]) == np.argmax(t[i]) for i in range( len(Y) ) ] )
    acc = hits / len(Y)
    return acc

###root-mean-square deviation (RMSD) for each simple
def compute_mse( y, t ):
  return 1/2 * np.sum( [ (t[i] - y[i])**2 for i in range(len(y)) ] )

###root-mean-square deviation (RMSD) for dataset
def compute_total_mse( C, W, data, labels ):
  y = predict( C, W, data )
  E = [ compute_mse( y[i], labels[i] ) for i in range(len(data)) ]
  return np.mean( E )

###trainning
def train( X_train, Y_train, n_centroids, sigma=1.2, eta=0.001, epochs=1000, epsilon=0.1 ):
  # Camada intermediária
  C = initialize_centroids( n_centroids, X_train )
  # Camada de saída
  W = initialize_weights( n_centroids, Y_train.shape[1] )
  nRows = X_train.shape[0]
  error = np.inf
  for epoch in range( epochs ):
    if error < epsilon:
      break
    new_W = W
    for i in range( nRows ):
      Y = forward( C, W, X_train[i] )
      for j in range( Y_train.shape[1] ):
        new_W[:,j] += eta * gaussian( X_train[i], C, sigma ) * ( Y_train[i,j]-Y[j] )
    W = new_W
    error = compute_total_mse( C, W, X_train, Y_train )
    if not epoch % 200:
      print(epoch, error)
  return C, W


########RESULTS
C, W = train( X_train, Y_train, n_centroids=2, sigma=1.0, epochs=1000, epsilon=0.1 )
print( 'Train accuracy:', evaluate( C, W, X_train, Y_train ) )
print( 'Test accuracy:', evaluate( C, W, X_validation, Y_validation ) )

############################ Implementação 1 ############################


A column-vector y was passed when a 1d array was expected. Please change the shape of y to (n_samples, ), for example using ravel().



---features---
[[5.1 3.5 1.4 0.2]
 [4.9 3.  1.4 0.2]
 [4.7 3.2 1.3 0.2]
 [4.6 3.1 1.5 0.2]
 [5.  3.6 1.4 0.2]
 [5.4 3.9 1.7 0.4]
 [4.6 3.4 1.4 0.3]
 [5.  3.4 1.5 0.2]
 [4.4 2.9 1.4 0.2]
 [4.9 3.1 1.5 0.1]
 [5.4 3.7 1.5 0.2]
 [4.8 3.4 1.6 0.2]
 [4.8 3.  1.4 0.1]
 [4.3 3.  1.1 0.1]
 [5.8 4.  1.2 0.2]
 [5.7 4.4 1.5 0.4]
 [5.4 3.9 1.3 0.4]
 [5.1 3.5 1.4 0.3]
 [5.7 3.8 1.7 0.3]
 [5.1 3.8 1.5 0.3]
 [5.4 3.4 1.7 0.2]
 [5.1 3.7 1.5 0.4]
 [4.6 3.6 1.  0.2]
 [5.1 3.3 1.7 0.5]
 [4.8 3.4 1.9 0.2]
 [5.  3.  1.6 0.2]
 [5.  3.4 1.6 0.4]
 [5.2 3.5 1.5 0.2]
 [5.2 3.4 1.4 0.2]
 [4.7 3.2 1.6 0.2]
 [4.8 3.1 1.6 0.2]
 [5.4 3.4 1.5 0.4]
 [5.2 4.1 1.5 0.1]
 [5.5 4.2 1.4 0.2]
 [4.9 3.1 1.5 0.1]
 [5.  3.2 1.2 0.2]
 [5.5 3.5 1.3 0.2]
 [4.9 3.1 1.5 0.1]
 [4.4 3.  1.3 0.2]
 [5.1 3.4 1.5 0.2]
 [5.  3.5 1.3 0.3]
 [4.5 2.3 1.3 0.3]
 [4.4 3.2 1.3 0.2]
 [5.  3.5 1.6 0.6]
 [5.1 3.8 1.9 0.4]
 [4.8 3.  1.4 0.3]
 [5.1 3.8 1.6 0.2]
 [4.6 3.2 1.4 0.2]
 [5.3 3.7 1.5 0.2]
 [5.  3.3 1.4 0.2]
 [7.  3.2 4.7 1.4]
 [6.4 3.2 4.5 1.

In [17]:
dataset = pd.read_csv('iris.dat')

# Get features
X = dataset[['SepalLengthCm',	'SepalWidthCm',	'PetalLengthCm',	'PetalWidthCm']]

# Get target class
y = dataset[['Species']]

print('>> X:')
print(X.describe())
print()
print('>> Y:')
print(y.describe())

>> X:
       SepalLengthCm  SepalWidthCm  PetalLengthCm  PetalWidthCm
count     150.000000    150.000000     150.000000    150.000000
mean        5.843333      3.054000       3.758667      1.198667
std         0.828066      0.433594       1.764420      0.763161
min         4.300000      2.000000       1.000000      0.100000
25%         5.100000      2.800000       1.600000      0.300000
50%         5.800000      3.000000       4.350000      1.300000
75%         6.400000      3.300000       5.100000      1.800000
max         7.900000      4.400000       6.900000      2.500000

>> Y:
            Species
count           150
unique            3
top     Iris-setosa
freq             50


#### Preprocess Dataset

In [18]:
def one_hot_encode(y):
  '''
  One hot encode y
  '''

  # Initialize a array of zeros
  one_hot_y = np.zeros(shape=(len(y), len(np.unique(y))))

  # Iterate over values and set class as 1
  for i, y_value in enumerate(y):
    one_hot_y[i, y_value] = 1

  return one_hot_y

def one_hot_decode(y):
  '''
  One hot decode y
  '''

  return np.argmax(y, axis=1)

def label_encode(y):
  '''
  Label encode y
  '''

  # Initialize encoder
  encoder = LabelEncoder()

  # Encoding
  encoded_y = encoder.fit_transform(y)

  return encoded_y, encoder

def label_decode(y, encoder):
  '''
  Label decode y
  '''

  return encoder.inverse_transform(y)

##### Label encoder

In [19]:
y_encoded, encoder = label_encode(y.values.reshape(-1))

##### One hot / Data split

In [20]:
# One hot encode y
one_hot_y = one_hot_encode(y_encoded)

# Divide train/test
test_size = 0.2
X_train, X_test, Y_train, Y_test = train_test_split(X.values, one_hot_y, test_size = test_size, random_state=SEED)
X_train.shape, X_test.shape, Y_train.shape, Y_test.shape

((120, 4), (30, 4), (120, 3), (30, 3))

##### Scale data

In [21]:
# Initialize scaler
scaler = StandardScaler()

# Scale data
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Features
IRIS_NUM_FEATURES = X_train_scaled.shape[1]

X_train_scaled.shape, X_test_scaled.shape

((120, 4), (30, 4))

#### Build Models

In [22]:
def build_model(kind = 'mlp', kernel=None):
  '''
  Return untrained model for Iris Problem
  '''

  # Set random seed
  np.random.seed(SEED)

  # Assert kind is 'sigmoid' or 'tanh'
  assert kind in ['mlp', 'rbf'], "kind must be 'mlp' or 'rbf'"

  if kind == 'mlp':
    
    # MLP model
    layer1 = PerceptronLayer(input_size=IRIS_NUM_FEATURES, units=2, init_rule='rand', activation=Swish())
    layer2 = PerceptronLayer(input_size=2, units=3, init_rule='rand', activation=Sigmoid())

    # Model
    model = MultiLayerPerceptron(layers = [layer1, layer2], loss_func=MSE(), metric=MultiClassAccuracy())

  elif kind == 'rbf':

    assert kernel is not None

    # RBG model
    model = RBFModel(input_size=4, output_size=3, kernel=kernel, loss_func=MSE(), metric=MultiClassAccuracy(), init_rule='zero')
  
  # Return model
  return model

In [23]:
# MLP #
model_mlp = build_model(kind='mlp')

# RBF #
# Get centroids with kmeans
centroids = get_centroids(X_train_scaled, n_centroids=4)

# Initialize kernel function
kernel = KernelGaussian(centroids)

# get model
model_rbf = build_model(kind='rbf', kernel=kernel)

#### Train

##### MLP

In [24]:
# Model accuracy before train:

print('>> Accuracy (before on test set)')
model_mlp.evaluate(X_test_scaled, Y_test)

>> Accuracy (before on test set)


0.36666666666666664

In [25]:
hist_mlp = model_mlp.fit(X_train_scaled, Y_train, max_epochs=500, batch_size=1, learning_rate=0.007, momentum_rate=0.03)

  0%|          | 0/500 [00:00<?, ?it/s]

In [26]:
# Model accuracy after train:

print('>> Accuracy (after train on test set)')
model_mlp.evaluate(X_test_scaled, Y_test)

>> Accuracy (after train on test set)


1.0

##### RBF

In [27]:
# Model accuracy before train:

print('>> Accuracy (before on test set)')
model_rbf.evaluate(X_test_scaled, Y_test)

>> Accuracy (before on test set)


0.3333333333333333

In [28]:
hist_rbf = model_rbf.fit(X_train_scaled, Y_train, max_epochs=1000, batch_size=1, learning_rate=0.003)

  0%|          | 0/1000 [00:00<?, ?it/s]

In [29]:
# Model accuracy after train:

print('>> Accuracy (after train on test set)')
model_rbf.evaluate(X_test_scaled, Y_test)

>> Accuracy (after train on test set)


0.8666666666666667

#### Training Visualization

In [30]:
fig = px.line(y=hist_mlp['score'])
fig.update_layout(
    title = 'Iris | Accuracy vs Epochs - MLP',
    xaxis_title = 'Epoch',
    yaxis_title = 'Accuracy'
)

In [31]:
fig = px.line(y=hist_rbf['score'])
fig.update_layout(
    title = 'Iris | Accuracy vs Epochs - RBF',
    xaxis_title = 'Epoch',
    yaxis_title = 'Accuracy'
)

Conclusão: A rede MLP (1 camada, com 2 unidades) teve uma acurácia de 100% no conjunto de testes, enquanto a rede RBG com kernel Gaussiano teve acurácia de 86%.