<a href="https://colab.research.google.com/github/tintenderete/TFM-Algoritmo-con-redes-evolutivas/blob/main/revision_evolutivo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd

import keras
import tensorflow as tf

from tensorflow.keras.layers import ConvLSTM1D, Dense, Conv2D, LSTM, GRU, Reshape 
from tensorflow.keras.layers import ConvLSTM2D, Conv3D, TimeDistributed, Flatten


In [2]:


# Parámetros
num_dias = 100
num_periodos = 60
num_activos = 5
num_canales = 2

# Crear datos aleatorios para X
X = np.random.uniform(low=-1, high=1, size=(num_dias, num_periodos, num_activos, num_canales))

# Aplicar las transformaciones a los activos 1 y 2
X[:, :, 0, 0] *= 2  # Multiplicar los precios del canal 1 del activo 1 por 2
X[:, :, 1, 0] /= 2  # Dividir los precios del canal 1 del activo 2 entre 2

# Crear la etiqueta Y como el cumsum del canal 1
Y = np.sum(X[:, :, :, 0], axis=1)


In [3]:
X.shape, Y.shape

((100, 60, 5, 2), (100, 5))

# Funcion listas

In [4]:
def get_ranking(Y):
  Y_ranking = np.zeros_like(Y, dtype=int)

  for i in range(Y.shape[0]):
      Y_ranking[i] = np.argsort(np.argsort(-Y[i]))

  return Y_ranking

In [5]:
Y[0:4],get_ranking(Y[0:4])

(array([[-13.71948768,  -0.83978485,   1.7708815 ,   2.38898778,
          -1.42213321],
        [-18.18264129,  -1.53352565,   5.00503963,  -1.24712711,
          -3.71935263],
        [  3.98959612,   4.56187636,   0.69985654,  -6.81714206,
          -3.17337995],
        [ 11.75903529,  -2.74608801,  -3.64006405,  -0.88683618,
           8.7359875 ]]),
 array([[4, 2, 1, 0, 3],
        [4, 2, 0, 1, 3],
        [1, 0, 2, 4, 3],
        [0, 3, 4, 2, 1]]))

# GeneticAlgorithm

In [335]:
class GeneticAlgorithm:
    def __init__(self, 
                 initial_population, 
                 fitness_function, 
                 selection_function, 
                 crossover_function, 
                 mutation_function, 
                 immigration_function,  
                 n_generations,
                 n_save_best_population):
        self.population = initial_population
        self.fitness_function = fitness_function
        self.selection_function = selection_function
        self.crossover_function = crossover_function
        self.mutation_function = mutation_function
        self.immigration_function = immigration_function 
        self.n_generations = n_generations
        self.n_save_best_population = n_save_best_population

    def run(self):
        for i in range(self.n_generations):
            fitness = self.fitness_function(self.population)
            best_population = self._get_best_population(self.population,  fitness)
            parents = self.selection_function(self.population, fitness)
            offspring = self.crossover_function(parents)
            offspring = self.mutation_function(offspring)
            self.population = np.concatenate((parents, offspring))

            immigration = self.immigration_function(self.population)  
            self.population = np.concatenate((self.population, immigration))

            if(len(best_population)>0):
              self.population = np.concatenate((self.population, best_population))

        # Devuelve la mejor solución encontrada
        fitness = self.fitness_function(self.population)
        best_idx = np.argmax(fitness)
        return self.population[best_idx]

    def _get_best_population(self,population,  fitness):
      idx_best_pop = np.argsort(fitness)[::1][:self.n_save_best_population]

      return np.array(population)[idx_best_pop]


In [339]:
pop = [[1,1],[2,2],[10,10],[11,11],[20,20],[30,30],[40,40] ]
fitness = np.array([1,2,10,11, 20,0,40])

In [340]:
geneticAlgorithm = GeneticAlgorithm(1,2,3,4,5,6,7,save_best_population_num = 3)

In [341]:
geneticAlgorithm._get_best_population(pop,fitness)

array([[30, 30],
       [ 1,  1],
       [ 2,  2]])

# Population_factory

In [7]:
class Population_factory():
  def __init__(self, layers_length, activations_length, 
               num_individuos, gen_length, num_genes, 
               min_range_units, max_range_units, min_range_kernel, max_range_kernel  ):
    self.layers_length = layers_length
    self.activations_length = activations_length

    self.num_individuos = num_individuos
    self.num_genes = num_genes
    self.gen_length = gen_length
    self.size_chromosome = self.gen_length * self.num_genes

    self.min_range_units = min_range_units
    self.max_range_units = max_range_units
    self.min_range_kernel = min_range_kernel
    self.max_range_kernel = max_range_kernel
    
  def run(self):
    layers_random = np.random.randint(-1, self.layers_length, (self.num_individuos, self.num_genes))
    activations_random = np.random.randint(0, self.activations_length , (self.num_individuos, self.num_genes))
    units_random = np.random.randint(self.min_range_units, self.max_range_units, (self.num_individuos, self.num_genes))
    kernels_size_random = np.random.randint(self.min_range_kernel, self.max_range_kernel, (self.num_individuos, self.num_genes))

    individuos = np.zeros((self.num_individuos, self.size_chromosome), dtype=int)
    individuos[:,::self.gen_length ] = layers_random
    individuos[:,1::self.gen_length] = activations_random
    individuos[:,2::self.gen_length] = units_random
    individuos[:,3::self.gen_length] = kernels_size_random

    return individuos

    

In [8]:
population_factory = Population_factory(
                layers_length = 10, activations_length = 3, 
                num_individuos = 10, gen_length = 4, num_genes = 4, 
                min_range_units = 1, max_range_units = 100, min_range_kernel = 2, max_range_kernel = 4 )

pop = population_factory.run()
print(pop.shape)
print(pop)

(10, 16)
[[ 6  1 73  3  7  1 87  2  2  1 30  3  1  2 45  2]
 [ 2  0 80  3 -1  0 98  3  0  2 90  3  5  2 81  2]
 [ 7  0  4  3  8  0 38  2  4  0 45  2  4  0 30  2]
 [ 3  1  6  2  3  2 45  2  3  1 20  2  8  2  4  2]
 [ 4  2 97  3  2  1 17  2  3  0 21  2  3  2 34  2]
 [ 5  0 46  3  6  2 28  3  5  0 25  2  4  1 48  2]
 [ 6  0 20  3  6  2 59  2  0  0 26  3  5  0 29  2]
 [ 3  0 40  3  3  2 38  2  5  0 67  3  7  0 87  2]
 [ 1  1 36  2  6  1 82  2  3  1 37  2  2  1 81  2]
 [ 4  2 16  2  7  0 47  2  5  0 57  2  1  1 69  2]]


# Models factory

In [359]:
class Models_factory():
  def __init__(self, layers = [], layers_dims = [], activations = [], gen_length = 0, funcion_fitness = 'mse', learning_rate = 0.1, activation_output = 'relu' ):
    
    if layers == []:
      self.layers, self.layers_dims = self.default_layers()
    else:
      self.layers = layers
      self.layers_dims = layers_dims

    if activations == []:
      self.activations = self.default_activations()
    else:
      self.activations = activations

    self.gen_length = gen_length
    self.funcion_fitness = funcion_fitness 
    self.learning_rate = learning_rate 
    self.activation_output = activation_output

  def default_layers(self):
    layers = []
    layers_dims = []

    layers.append( lambda info : Dense(  activation=self.activations[info[0]],   units=info[1] ))
    layers_dims.append(-1)

    layers.append( lambda info : LSTM(                                           units=info[1] ))
    layers_dims.append(3)

    layers.append( lambda info : GRU(                                            units=info[1] ))
    layers_dims.append(3)

    return layers, layers_dims

  def default_activations(self):
    activations = []

    activations.append('relu')
    activations.append('sigmoid')

    return activations

  
  def increase_dimensions(self, model):
    return tuple( tf.expand_dims(model, axis=-1).shape[1:].as_list() )

  def reduce_dimensions(self, shape):
    d = list(shape[1:])
    new_shape = d[:-2] + [d[-2] * d[-1]]
    return tuple(new_shape)

  def run(self, individuos):
    models = []

    for ind in individuos:
      
      # Creamos un nuevo modelo para nuestro individuo
      inputs = keras.Input(shape=(X.shape[1:]))
      m = inputs
      ############################################################################

      for i in range(0, len(ind), self.gen_length ):

        # Guardamos el gen entero
        gen = ind[i:i+self.gen_length ]
        
        ############################################################################

        if gen[0] != -1:
          
          # Creamos el nuevo layer
          tipo_layer = gen[0]
          info = gen[1:]
          new_layer = self.layers[tipo_layer](info)
          dim_layer = self.layers_dims[tipo_layer]
          ############################################################################

          # Observamos si el output actual del modelo es compatible con la nueva layer #
          current_m_dim = len(m.shape) 
          new_layer_dim = current_m_dim if dim_layer == -1 else dim_layer 
          diferencia = new_layer_dim - current_m_dim
          ############################################################################

          # Hacemos los reshapes necesarios para hacer compatible las dos layers #
          while diferencia != 0:
            
            if diferencia > 0:
              new_d = self.increase_dimensions(m)
              m = Reshape(new_d)(m)
            elif diferencia < 0:
              new_d = self.reduce_dimensions(m.shape)
              m = Reshape(new_d)(m)
            # recalculamos el output dentro del while para iterar todas las veces necesarias
            current_m_dim = len(m.shape) 
            new_layer_dim = current_m_dim if dim_layer == -1 else dim_layer 
            diferencia = new_layer_dim - current_m_dim
          ##############################################################################

          # Añadimos el nuevo layer al modelo #
          m = new_layer(m)
          ##############################################################################
        else:
          break

      # Una vez recorridos todos los genes compilamos el modelo #
      m = Flatten()(m)
      outputs = Dense(Y.shape[1], activation = self.activation_output)(m)
      model = keras.Model(inputs=inputs, outputs=outputs)
      model.compile(
        loss=self.funcion_fitness,
        optimizer=keras.optimizers.Adam(learning_rate=self.learning_rate),
        metrics=["accuracy"],
      )

      models.append(model)
      ##############################################################################

    return models

In [10]:
models_factory = Models_factory(gen_length = 4)

In [11]:
population_factory = Population_factory(
                layers_length = len(models_factory.layers), activations_length = len(models_factory.activations), 
                num_individuos = 10, gen_length = 4, num_genes = 4, 
                min_range_units = 1, max_range_units = 100, min_range_kernel = 2, max_range_kernel = 4 )

pop = population_factory.run()

In [12]:
pop

array([[ 2,  0, 50,  2,  1,  0, 52,  2,  0,  0, 49,  2,  1,  1, 41,  2],
       [ 0,  0, 79,  2,  1,  1, 62,  3,  2,  1, 43,  2, -1,  1, 45,  2],
       [ 0,  1, 54,  3, -1,  0, 58,  2,  2,  1, 94,  2,  1,  1, 79,  2],
       [ 1,  0, 44,  3, -1,  0, 69,  2,  0,  1, 14,  3, -1,  1, 77,  3],
       [ 0,  1, 97,  3,  2,  1, 54,  2,  1,  0, 17,  3,  1,  1, 91,  2],
       [ 1,  0, 62,  2, -1,  1, 99,  2,  1,  0, 55,  3, -1,  0, 19,  2],
       [ 0,  0,  7,  2,  1,  1,  9,  2,  1,  0, 76,  2,  1,  0, 84,  3],
       [ 2,  1,  5,  3,  1,  1, 36,  2,  2,  1, 82,  3,  0,  0, 87,  3],
       [-1,  1, 69,  3,  1,  1, 72,  2,  2,  0, 69,  2,  1,  0, 54,  2],
       [ 1,  0, 46,  3,  2,  1, 61,  2,  2,  1, 23,  3, -1,  0, 78,  3]])

In [13]:
pop_models = models_factory.run(individuos = pop)

In [14]:
for m in pop_models:
  m.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 60, 5, 2)]        0         
                                                                 
 reshape (Reshape)           (None, 60, 10)            0         
                                                                 
 gru (GRU)                   (None, 50)                9300      
                                                                 
 reshape_1 (Reshape)         (None, 50, 1)             0         
                                                                 
 lstm (LSTM)                 (None, 52)                11232     
                                                                 
 dense (Dense)               (None, 49)                2597      
                                                                 
 reshape_2 (Reshape)         (None, 49, 1)             0     

# Fitness_function

In [15]:
class Fitness_keras():
  def __init__(self, models_factory_run,   X_train = [] , Y_train = [] , batch_size = 1, verbose = 0, epochs = 1, callbacks = [] ):
    self.callbacks = callbacks
    self.X_train = X_train
    self.Y_train = Y_train
    self.batch_size = batch_size
    self.verbose = verbose 
    self.epochs = epochs
    self.models_factory_run = models_factory_run

  def run(self, population):

    population_models = self.models_factory_run(individuos = population)

    fitness = []

    for m in population_models:
     
      h = self.keras_train(m)

      fitness.append(h.history["loss"][-1])

    return fitness

  def keras_train(self,m):

    h = m.fit(self.X_train, self.Y_train, 
                epochs=self.epochs, 
                batch_size = self.batch_size , 
                callbacks=self.callbacks, 
                verbose=self.verbose)
    return h

    

In [43]:
pop = [[1,1,1,1,1,1,1,1],[2,2,2,2,2,2,2,2]]

fitness_keras = Fitness_keras(models_factory_run = models_factory.run, X_train = X , Y_train = Y, epochs = 3 )

In [44]:
fitness = fitness_keras.run(pop)

In [45]:
print(fitness)

[25.533761978149414, 25.5397891998291]


In [46]:
m = models_factory.run(pop)

In [47]:
h = fitness_keras.keras_train(m[0])

In [48]:
h.history['loss']

[25.546894073486328, 25.543712615966797, 25.54129409790039]

In [49]:
h.history["loss"][-1]

25.54129409790039

# Selection_function

# work : ahora mismo no permito que se repitan las parejas, pero puede que sea una mala idea. Ya que gracias a los cruces se generaaran un mayor numero de nuevos modelos con buenas caracteristicas  

In [342]:
class Selection_function():
  def __init__(self, number_of_couples = 1, tournament_size = 3):
    self.number_of_couples = number_of_couples
    self.num_parents = 2
    self.tournament_size = tournament_size
    

  def select_mating_pool(self, population, fitness):
    all_parents = {}
    for i in range(self.number_of_couples):
      parents = self._select_mating_pool(pop, fitness, self.num_parents)

      if parents[0] != parents[1]:
        all_parents[(parents[0],parents[1])] = [ pop[parents[0]], pop[parents[1]]]
  
    return list(all_parents.values())

  def tournament_selection(self, population, fitness):
    all_parents = {}
    for i in range(self.number_of_couples):
      parents = self._tournament_selection(pop, fitness, self.num_parents, self.tournament_size)
      if parents[0] != parents[1]:
        all_parents[(parents[0],parents[1])] = [ pop[parents[0]], pop[parents[1]]]
  
    return list(all_parents.values())

  def _select_mating_pool(self, population, fitness, num_parents):
    """
    population: lista con todos los individuos.
    fitness: lista con los valores de fitness de cada individuo.
    num_parents: número de padres que queremos seleccionar.
    """

    # Invierte los valores de fitness
    fitness = 1.0 / fitness

    # Normalizamos los valores de fitness para que sumen 1
    fitness = fitness / np.sum(fitness)
    
    # Seleccionamos los padres usando la función np.random.choice con los valores de fitness como pesos
    idx = np.random.choice(np.arange(len(population)), size=self.num_parents, replace=False, p=fitness)

    # Devolvemos los individuos seleccionados
    return idx#[population[i] for i in idx]


  def _tournament_selection(self,population, fitness, num_parents, tournament_size):
    # Array para almacenar los índices de los padres seleccionados
    parents = np.empty(num_parents, dtype = int)

    for i in range(self.num_parents):
        # Seleccionamos los índices de 'tournament_size' individuos aleatoriamente
        tournament_indices = np.random.randint(0, len(population), size=tournament_size)
        
        # Seleccionamos el individuo con el mínimo fitness de entre los seleccionados
        best_individual_idx = np.argmin(fitness[tournament_indices])
        
        # Guardamos el índice del ganador del torneo
        parents[i] = tournament_indices[best_individual_idx]

    return parents

In [343]:
selection = Selection_function(number_of_couples = 10)

In [344]:
pop = [[1,1],[2,2],[10,10],[11,11],[20,20],[30,30],[40,40] ]
fitness = np.array([1,2,10,11, 20,30,40])
num_parents = 2
tournament_size = 3

In [357]:
print(selection.select_mating_pool(pop, fitness))

[[[1, 1], [20, 20]], [[2, 2], [1, 1]], [[1, 1], [11, 11]], [[1, 1], [30, 30]], [[2, 2], [10, 10]], [[2, 2], [11, 11]]]


In [358]:
selection = Selection_function(number_of_couples = 10)
print(selection.tournament_selection(pop, fitness))

[[[1, 1], [11, 11]], [[2, 2], [20, 20]], [[2, 2], [10, 10]], [[1, 1], [2, 2]], [[11, 11], [2, 2]], [[10, 10], [20, 20]], [[30, 30], [11, 11]], [[1, 1], [10, 10]]]


# Crossover_function