<a href="https://colab.research.google.com/github/leticiacnavarro/neural_network/blob/main/neural_evolution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd


PSO

In [None]:
class Particle:
    def __init__(self, no_dim, x_range, v_range):

        self.x = np.random.uniform(
            x_range[0], x_range[1], (no_dim,)
        )  # particle position in each dimension...
        self.v = np.random.uniform(
            v_range[0], v_range[1], (no_dim,)
        )  # particle velocity in each dimension...
        self.pbest = np.inf
        self.pbestpos = np.zeros((no_dim,))
class Swarm:
    def __init__(self, no_particle, no_dim, x_range, v_range, iw_range, c):
        self.p = np.array(
            [Particle(no_dim, x_range, v_range) for i in range(no_particle)]
        )
        self.gbest = np.inf
        self.gbestpos = np.zeros((no_dim,))
        self.x_range = x_range
        self.v_range = v_range
        self.iw_range = iw_range
        self.c0 = c[0]
        self.c1 = c[1]
        self.no_dim = no_dim

    def optimize(self, function, X, Y, print_step, iter):
        for i in range(iter):
            for particle in self.p:
                fitness = function(X, Y, particle.x)

                if fitness < particle.pbest:
                    particle.pbest = fitness
                    particle.pbestpos = particle.x.copy()

                if fitness < self.gbest:
                    self.gbest = fitness
                    self.gbestpos = particle.x.copy()

            for particle in self.p:
                # Here iw is inertia weight...
                iw = np.random.uniform(self.iw_range[0], self.iw_range[1], 1)[0]
                particle.v = (
                    iw * particle.v
                    + (
                        self.c0
                        * np.random.uniform(0.0, 0.5, (self.no_dim,))
                        * (particle.pbestpos - particle.x)
                    )
                    + (
                        self.c1
                        * np.random.uniform(0.0,0.5, (self.no_dim,))
                        * (self.gbestpos - particle.x)
                    )
                )
                # particle.v = particle.v.clip(min=self.v_range[0], max=self.v_range[1])
                particle.x = particle.x + particle.v
                # particle.x = particle.x.clip(min=self.x_range[0], max=self.x_range[1])

            if i % print_step == 0:
                print("iteration#: ", i + 1, " loss: ", fitness)

        print("global best loss: ", self.gbest)

    def get_best_solution(self):
        return self.gbestpos


ES

In [None]:
from __future__ import print_function
import numpy as np
import multiprocessing as mp

np.random.seed(0)


def worker_process(arg):
    get_reward_func, weights = arg
    return get_reward_func(np.array(weights))


class EvolutionStrategy(object):
    def __init__(self, weights, x_values, y_values, get_reward_func, population_size=50, sigma=0.1, learning_rate=0.03, decay=0.999,
                 num_threads=1):

        self.weights = weights
        self.get_reward = get_reward_func
        self.POPULATION_SIZE = population_size
        self.SIGMA = sigma
        self.learning_rate = learning_rate
        self.decay = decay
        self.num_threads = mp.cpu_count() if num_threads == -1 else num_threads
        self.x = x_values
        self.y = y_values

    def _get_weights_try(self, w, p):
        weights_try = []
        for index, i in enumerate(p):
            jittered = self.SIGMA * i
            weights_try.append(w[index] + jittered)
        return weights_try

    def get_weights(self):
        return self.weights

    def _get_population(self):
        population = []
        for i in range(self.POPULATION_SIZE):
            x = []
            for w in self.weights:
                x.append(np.random.randn(*w.shape))
            population.append(x)
        return population

    def _get_rewards(self, pool, population):
        if pool is not None:
            worker_args = ((self.get_reward, self._get_weights_try(self.weights, p)) for p in population)
            rewards = pool.map(worker_process, worker_args)

        else:
            rewards = []
            for p in population:
                weights_try = self._get_weights_try(self.weights, p)
                rewards.append(self.get_reward(self.x, self.y, np.array(weights_try)) * -1)
        rewards = np.array(rewards)
        return rewards

    def _update_weights(self, rewards, population):
        std = rewards.std()
        if std == 0:
            return
        rewards = (rewards - rewards.mean()) / std
        for index, w in enumerate(self.weights):
            layer_population = np.array([p[index] for p in population])
            update_factor = self.learning_rate / (self.POPULATION_SIZE * self.SIGMA)
            self.weights[index] = w + update_factor * np.dot(layer_population.T, rewards).T
        self.learning_rate *= self.decay

    def run(self, iterations, print_step=10):
        pool = mp.Pool(self.num_threads) if self.num_threads > 1 else None
        for iteration in range(iterations):

            population = self._get_population()
            rewards = self._get_rewards(pool, population)

            self._update_weights(rewards, population)

            if (iteration + 1) % print_step == 0:
                print('iter %d. reward: %f' % (iteration + 1, (self.get_reward(self.x, self.y, self.weights)) * -1 ))
        if pool is not None:
            pool.close()
            pool.join()

Neural Network

In [None]:
class Neural_Network:
    def __init__(self, input, hidden, output):
        self.input_nodes = input
        self.hidden_nodes = hidden
        self.output_nodes = output
    def softmax(self, logits):
        exps = np.exp(logits)
        return exps / np.sum(exps, axis=1, keepdims=True)

    def sigmoid(self, logits):     
        #np.clip( logits, -500, 500 )
        exps = 1/(1+np.exp(-logits))
        return exps

    def Negative_Likelihood(self, probs, Y):
        probs = probs/ np.sum(probs, axis=1, keepdims=True)
        num_samples = len(probs)
        corect_logprobs = -np.log(probs[range(num_samples), Y])
        return np.sum(corect_logprobs) / num_samples
        
    def weights(self, W):
        #print(W)
        w1 = W[0 : self.input_nodes * self.hidden_nodes].reshape((self.input_nodes, self.hidden_nodes))
        b1 = W[
            self.input_nodes * self.hidden_nodes : (self.input_nodes * self.hidden_nodes) + self.hidden_nodes
        ].reshape((self.hidden_nodes,))
        w2 = W[
            (self.input_nodes * self.hidden_nodes)
            + self.hidden_nodes : (self.input_nodes * self.hidden_nodes)
            + self.hidden_nodes
            + (self.hidden_nodes * self.output_nodes)
        ].reshape((self.hidden_nodes, self.output_nodes))
        b2 = W[
            (self.input_nodes * self.hidden_nodes)
            + self.hidden_nodes
            + (self.hidden_nodes * self.output_nodes) : (self.input_nodes * self.hidden_nodes)
            + self.hidden_nodes
            + (self.hidden_nodes * self.output_nodes)
            + self.output_nodes
        ].reshape((self.output_nodes,))

        return w1, b1, w2, b2  

    def forward_pass(self, X, Y, W):

        if isinstance(W, Particle):
            W = W.x

        w1, b1, w2, b2 = self.weights(W)

        z1 = np.dot(X, w1) + b1
        a1 = np.tanh(z1)
        z2 = np.dot(a1, w2) + b2
        logits = z2

        probs = self.sigmoid(np.array(logits, dtype=np.float128))
        negative = self.Negative_Likelihood(probs, Y)
        return negative
      
    def predict(self, X, W):
      # print(W)
        w1, b1, w2, b2 = self.weights(W)

        z1 = np.dot(X, w1) + b1
        a1 = np.tanh(z1)
        z2 = np.dot(a1, w2) + b2
        logits = z2

        probs = self.sigmoid(logits)
        Y_pred = np.argmax(probs, axis=1)
        return Y_pred

    def get_accuracy(self, Y, Y_pred):
        return (Y == Y_pred).mean()

    def get_dim(self):
        dim = ((self.input_nodes * self.hidden_nodes) + self.hidden_nodes + (self.hidden_nodes * self.output_nodes) + self.output_nodes)
        return dim


Executando Neural - PSO - Iris

In [None]:
def pso_iris():
    # load iris dataset..
    data = load_iris()

    # Store input & target in X and Y..
    X = data.data
    y = data.target

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    neural = Neural_Network(4, 20, 3)

    no_solution = 20

    w_range = (0.0, 1.0)
    lr_range = (0.0, 1.0)
    iw_range = (0.9, 0.9)  # iw -> inertial weight...
    c = (1.5, 0.5)  # c[0] -> cognitive factor, c[1] -> social factor...

    s = Swarm(no_solution, neural.get_dim(), w_range, lr_range, iw_range, c)
    s.optimize(neural.forward_pass, X_train, y_train, 100, 1000)
    W = s.get_best_solution()
    Y_pred = neural.predict(X_test, W)
    accuracy = neural.get_accuracy(y_test, Y_pred)
    print("Accuracy: %.3f" % accuracy)


In [None]:
def es_iris():
    # load iris dataset..
    data = load_iris()

    # Store input & target in X and Y..
    X = data.data
    y = data.target

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    neural = Neural_Network(4, 20, 3)

    w_range = (0.0, 1.0)
    lr_range = (0.0, 1.0)
    iw_range = (0.9, 0.9)
    weights2 = np.random.uniform(
        w_range[0], w_range[1], (neural.get_dim(),)
    )
    es = EvolutionStrategy(weights2, X_train, y_train, neural.forward_pass, population_size=50, sigma=0.4, learning_rate=0.6, decay=0.995, num_threads=1)
    es.run(5000, print_step=500)
    W = es.get_weights()
    Y_pred = neural.predict(X_test, W)
    accuracy = neural.get_accuracy(y_test, Y_pred)
    print("Accuracy: %.3f" % accuracy)


In [None]:
es_iris()

iter 500. reward: -0.276334
iter 1000. reward: -0.276896
iter 1500. reward: -0.277652
iter 2000. reward: -0.277011
iter 2500. reward: -0.276973
iter 3000. reward: -0.276972
iter 3500. reward: -0.276972
iter 4000. reward: -0.276972
iter 4500. reward: -0.276972
iter 5000. reward: -0.276972
Accuracy: 0.633


In [None]:
!wget http://archive.ics.uci.edu/ml/machine-learning-databases/waveform/waveform.data.Z
!7z e /content/waveform.data.Z

--2021-05-13 00:30:29--  http://archive.ics.uci.edu/ml/machine-learning-databases/waveform/waveform.data.Z
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 176908 (173K) [application/x-httpd-php]
Saving to: ‘waveform.data.Z’


2021-05-13 00:30:29 (797 KB/s) - ‘waveform.data.Z’ saved [176908/176908]


7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.30GHz (306F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan /content/                   1 file, 176908 bytes (173 KiB)

Extracting archive: /content/waveform.data.Z
--
Path = /content/waveform.data.Z
Type = Z

  0%    Everything is Ok

Size:       555497
Compressed: 176908


In [None]:
def es_waveform():
    waveform = pd.read_csv('/content/waveform.data',  header=None)
    waveform.head()

    previsores = waveform.iloc[:, 1:21].values
    classe = waveform.iloc[:, 21].values

    X_train, X_test, y_train, y_test = train_test_split(previsores, classe, test_size=0.2, random_state=42)
    neural = Neural_Network(20, 20, 3)
    no_solution = 20

    w_range = (0.0, 1.0)
    lr_range = (0.0, 1.0)
    iw_range = (0.9, 0.9)
    weights2 = np.random.uniform(
        w_range[0], w_range[1], (neural.get_dim(),)
    )
    es = EvolutionStrategy(weights2, X_train, y_train, neural.forward_pass, population_size=50, sigma=0.4, learning_rate=0.6, decay=0.995, num_threads=1)
    es.run(5000, print_step=500)
    W = es.get_weights()
    Y_pred = neural.predict(X_test, W)
    accuracy = neural.get_accuracy(y_test, Y_pred)
    print("Accuracy: %.3f" % accuracy)

es_waveform()

iter 500. reward: -0.309706
iter 1000. reward: -0.305107
iter 1500. reward: -0.304667
iter 2000. reward: -0.304654
iter 2500. reward: -0.304653
iter 3000. reward: -0.304653
iter 3500. reward: -0.304653
iter 4000. reward: -0.304653
iter 4500. reward: -0.304653
iter 5000. reward: -0.304653
Accuracy: 0.836
