## AE3

In [1]:
import pandas as pd
import numpy as np

In [169]:
# struktura sieci neuronowej

class NeuralNetwork:
    """
    Implementation of multi layer perceptron

    Attributes
    -----------
    model_type : str
        model type, regressor or classifier
    layers : List
        list of layer sizes
    num_layers : int
        number of layers
    init_function : func
        
    """
    
    def __init__(self, layers, X, y, initalization='xavier', model_type='regression', weights=None, biases=None, activations=None):
        """
        activations - list of available functions: 'sigmoid', 'linear', 'tanh', 'relu', 'softmax' ('softmax' can be used only on the last layer)
        initialization - available types: 'xavier', 'he', 'uniform'
        model_type - available types: 'regression', 'classification'
        """        
        self.layers = layers
        self.num_layers = len(layers)
        if model_type == 'regression':
            X = X.to_numpy().reshape(-1, 1)
            y = y.to_numpy().reshape(-1, 1)
        self.X = X
        self.y = y
        
        initialization_functions = {
            'xavier': self.xavier_init,
            'he': self.he_init,
            'uniform': self.uniform_init
        }
        self.init_function = initialization_functions.get(initalization)
        
        assert model_type in ['regression', 'classification']
        self.model_type = model_type
        
        if weights is None:
            self.weights = [self.init_function(layers[i-1], layers[i]) for i in range(1, self.num_layers)]
        else:
            self.weights = weights
        
        if biases is None:
            self.biases = [self.init_function(layers[i]) for i in range(1, self.num_layers)]
        else:
            self.biases = biases
        
        if activations is None:
            if self.model_type == 'regression':
                self.activations = ['sigmoid' for i in range(1, self.num_layers - 1)] + ['linear']
            elif self.model_type == 'classification':
                self.activations = ['sigmoid' for i in range(1, self.num_layers - 1)] + ['softmax']
        else:
            self.activations = activations
        
        activation_functions = {
            'sigmoid': self._sigmoid,
            'linear': self._linear,
            'softmax': self._softmax,
            'tanh': self._tanh,
            'relu': self._relu
        }
        self.activation_funcs = list(map(lambda x: activation_functions.get(x), self.activations))
    
    def _sigmoid(self, z):
        return 1 / (1 + np.exp(-z))

    def _linear(self, z):
        return z
    
    def _tanh(self, z):
        return (np.exp(z) - np.exp(-z)) / (np.exp(z) + np.exp(-z))
    
    def _relu(self, z):
        return np.maximum(z, 0)
    
    def _softmax(self, z):
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / exp_z.sum(axis=1, keepdims=True)
    
    def tanh_derivative(self, a):
        """calculates tanh'(z) where a = tanh(z)"""
        return 1 - (a ** 2)
    
    def sigmoid_derivative(self, a):
        """calculates sigm'(z) where a = sigm(z)"""
        return a * (1-a)
    
    def relu_derivative(self, z):
        return np.maximum(0, np.sign(z))
    
    def xavier_init(self, n_in, n_out=None):
        if n_out is None:
            n_out = n_in
            variance = 1 / n_out
            stddev = np.sqrt(variance)
            return np.random.normal(0, stddev, n_out)
        variance = 2 / (n_in + n_out)
        stddev = np.sqrt(variance)
        return np.random.normal(0, stddev, (n_in, n_out))
        
    def he_init(self, n_in, n_out=None):
        if n_out is None:
            variance = 2 / n_in
            stddev = np.sqrt(variance)
            return np.random.normal(0, stddev, n_in)
        variance = 2 / n_in
        stddev = np.sqrt(variance)
        return np.random.normal(0, stddev, (n_in, n_out))
    
    def zeros_init(self, n_in, n_out=None):
        if n_out is None:
            return np.zeros(n_in)
        return np.zeros((n_in, n_out))
    
    def uniform_init(self, n_in, n_out=None):
        if n_out is None:
            return np.random.uniform(0, 1, n_in)
        return np.random.uniform(0, 1, (n_in, n_out))
    
    def feedforward(self, a, return_activations=False):
        if return_activations:
            activations = [a]
            for w, b, func in zip(self.weights, self.biases, self.activation_funcs):
                z = np.dot(a, w) + b
                a = func(z)
                activations.append(a)
            return activations
        else:
            for w, b, func in zip(self.weights, self.biases, self.activation_funcs):
                z = np.dot(a, w) + b
                a = func(z)
            return a
    
    def predict(self, X, label=False):
        """ if to use label_predictions function """
        if label:
            return label_predictions(self.feedforward(X))
        else:
            return self.feedforward(X)
    
    def mse(self, X, y, resize=False, denormalize=None):
        """
        first predictions are made, then denormalized and then mse is calculated
        
        denormalize - a tuple (mean, std)
        """
        if resize:
            X = X.to_numpy().reshape(-1, 1)
            y = y.to_numpy().reshape(-1, 1)
        predictions = self.predict(X)
        if denormalize:
            predictions = destandardize_data(predictions, denormalize)
        return np.mean((predictions - y) ** 2)
    
    def f1_score(self, X, y_true, average='weighted'):
        """
        calculates f1 score. 
        y_true : 2 dimensional array with probabilities
        average: 'weighted', 'macro'
        """
        predictions = self.predict(X)
        y_pred = label_predictions(predictions)
        y_true = label_predictions(y_true)
        f1 = f1_score(y_true, y_pred, average='weighted')
        return f1

    def cross_entropy(self, X, y):
        """ Calculates cross entropy loss """
        predictions = self.predict(X)
        epsilon = 1e-9  # Small constant to avoid taking the logarithm of zero
        return -np.mean(np.sum(y * np.log(predictions + epsilon), axis=1))
    
    def loss(self, X, y, resize=False, denormalize=None, test=False):
        """ returns appriopriate loss """
        if self.model_type == 'regression':
            return self.mse(X, y, resize=resize, denormalize=denormalize)
        elif self.model_type == 'classification':
            if test:
                return self.f1_score(X, y)
            else:
                return self.cross_entropy(X, y)

    def fitness(self, denormalize):
        return self.loss(self.X, self.y, denormalize=denormalize)
    
    def flatten_weights(self):
        """ flattens weights and biases """
        flat = []
        for weights, biases in zip(self.weights, self.biases):
            flat.extend(weights.flatten())
            flat.extend(biases.flatten())
        return flat
    
    def unflatten_weights(self, flat):
        """ unflattens flat array of weights and biases """
        weights_full = []
        biases_full = []
        
        for weights, biases in zip(self.weights, self.biases):
            w_len = len(weights.flatten())
            b_len = len(biases.flatten())
            
            w_flat = flat[:w_len]
            w_unflat = np.array(w_flat).reshape(weights.shape)
            weights_full.append(w_unflat)
            flat = flat[w_len:]
            
            b_flat = flat[:b_len]
            b_unflat = np.array(b_flat).reshape(biases.shape)
            biases_full.append(b_unflat)
            flat = flat[b_len:]
        
        return weights_full, biases_full
    
def standardize_data(X):
    """
    returns:
    X_new - standardized X
    a tuple (mean, std) - normal distribution parameters from X for destandarizing
    """
    mean = np.mean(X, axis=0)
    std = np.std(X, axis=0)
    X_new = (X - mean) / std
    return X_new, (mean, std)

def destandardize_data(X, parameters):
    """parameters: a tuple (mean, std)"""
    return X * parameters[1] + parameters[0]


def label_predictions(y_pred, class_order=None):
    """
    y_pred : 2 dimensional array
    class_order : default [..., 2, 1, 0]
    """
    if class_order is None:
        class_order = np.array(range(len(y_pred[0])))
    max_indices = np.argmax(y_pred, axis=1)
    return class_order[max_indices]

def one_hot_encode(y):
    """ y : pd.Series """
    return pd.get_dummies(y, dtype=int).to_numpy()

def decode_array(array):
    """ decodes one hot encoded array """
    decoded_indices = np.argmax(array, axis=1)
    return decoded_indices.tolist()

def mse(y_true, y_pred):
    """ calculated mse """
    return np.mean((y_pred - y_true) ** 2)

def cross_entropy(y_true, y_pred):
    """ calculates cross entropy loss, y_true and y_pred should be 2-dimensional """
    return -np.mean(np.sum(y_true * np.log(y_pred), axis=1))

In [170]:
class EvoAlg:
    
    def __init__(self, X, y, layers, model_type, population_size, denormalize=None, generations=100):
        self.default_mlp = NeuralNetwork(layers=layers, X=X, y=y, model_type=model_type)
        self.X = X
        self.y = y
        self.layers = layers
        self.model_type = model_type
        self.denormalize = denormalize
        self.dimension = len(self.default_mlp.flatten_weights())
        self.population_size = population_size
        self.population = np.array([NeuralNetwork(layers=layers, X=X, y=y, model_type=model_type) for i in range(population_size)])
        self.generations = generations
        self.mutation_rate = None
        self.crossover_rate = None
    
    def point_crossover(self, parent1, parent2):
        if np.random.rand() < self.crossover_rate:
            point = np.random.randint(1, self.dimension)
            
            parent1_flat = parent1.flatten_weights()
            parent2_flat = parent2.flatten_weights()
            
            child1_flat = np.concatenate((parent1_flat[:point], parent2_flat[point:]))
            child2_flat = np.concatenate((parent2_flat[:point], parent1_flat[point:]))

            child1_weights, child1_biases = parent1.unflatten_weights(child1_flat)
            child2_weights, child2_biases = parent2.unflatten_weights(child2_flat)

            # child1 = NeuralNetwork(layers=self.layers, X=self.X, y=self.y, model_type=self.model_type)
            # child2 = NeuralNetwork(layers=self.layers, X=self.X, y=self.y, model_type=self.model_type)
            child1 = deepcopy(self.default_mlp)
            child2 = deepcopy(self.default_mlp)
            
            child1.weights = child1_weights
            child1.biases = child1_biases
            child2.weights = child2_weights
            child2.biases = child2_biases
            
            return child1, child2
        return parent1, parent2
    
    def gaussian_mutation(self, individual):
        if np.random.rand() < self.mutation_rate:
            
            ind_flat = np.array(individual.flatten_weights())
            ind_flat += np.random.normal(0, 1, self.dimension)
            ind_weights, ind_biases = individual.unflatten_weights(ind_flat)
            new_ind = deepcopy(individual)
            new_ind.weights = ind_weights
            new_ind.biases = ind_biases
            return new_ind
            
        return individual
    
    def evaluation(self, population):
        return np.array([ind.fitness(denormalize=self.denormalize) for ind in population])
    
    def wheel_selection(self, population, size=None):
        if size is None:
            size = self.population_size
        fitness = self.evaluation(population)
        fitness_inverted = 1 / fitness
        probabilities = fitness_inverted / np.sum(fitness_inverted)
        selected_indices = np.random.choice(len(population), size, p=probabilities)
        # print(selected_indices)
        return population[selected_indices]
    
    def choose_random_individuals(self, population_size, n):
        individuals = self.population[np.random.choice(population_size, n, replace=False)]
        if n == 1:
            return individuals[0]
        else:
            return individuals
    
    def train(self, mutation_rate=0.1, crossover_rate=0.7, mute_print=False):
        
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        
        best_fitness_history = []
        
        for generation in range(self.generations):

            new_population = []
            while len(new_population) < self.population_size: # -> hiperparametr
            # for i in range(int(self.population_size / 2)):
                parent1, parent2 = self.choose_random_individuals(self.population_size, 2)
                child1, child2 = self.point_crossover(parent1, parent2)
                new_population.extend([child1, child2])
            # population = population.extend(new_population)
            population = np.vstack([self.population, np.array(new_population)]).flatten()
            
            new_population = []
            for i, parent in enumerate(population):
                child = self.gaussian_mutation(parent)
                new_population.extend([child])
                # population[i] = child
            population = np.vstack([population, np.array(new_population)]).flatten()
            # dodać zmutowane
            
            fitness = self.evaluation(population)

            # Number of top individuals to retain
            num_top_individuals = max(1, int(0.1 * len(population)))
        
            # Get indices of the individuals sorted by their fitness (ascending)
            sorted_indices = np.argsort(fitness)
        
            # Retain the top 10% individuals
            top_individuals = population[sorted_indices[:num_top_individuals]]
        
            # Apply wheel selection to the rest of the population
            remaining_population = self.wheel_selection(population, size=self.population_size - num_top_individuals)
        
            # Combine the top individuals with the selected individuals from the remaining population
            self.population = np.concatenate((top_individuals, remaining_population))
        
            # Update the fitness and the best individual
            fitness = self.evaluation(self.population)
            best_individual = self.population[np.argmin(fitness)]
            best_fitness = best_individual.fitness(self.denormalize)
            
            # self.population = self.wheel_selection(population)
            
            # fitness = self.evaluation(self.population)
            # best_individual = self.population[np.argmin(fitness)]
            # best_fitness = best_individual.fitness()
            # zostwic top 10%
            best_fitness_history.append(best_fitness)
            if not mute_print:
                print(f"Generation {generation+1}: best individual: {best_individual}, fitness: {best_fitness}")
        
        if mute_print:
            print(f"Best individual: {best_individual}, fitness: {best_fitness}")
        history = History(best_fitness_history)
        return history, (best_individual, best_fitness)
    
class History:
    
    def __init__(self, fitness):
        self.fitness = fitness
        self.generations = list(range(len(fitness)))
    
    def plot_history(self):
        plt.plot(self.generations, self.fitness)
        plt.ylabel("Best individual fitness")
        plt.xlabel("Generation")
        plt.title("Evolution history")
        plt.show()

In [171]:
y_norm_modal, parameters_modal = standardize_data(multimodal['y'])

In [172]:
ae = EvoAlg(X=multimodal['x'], y=y_norm_modal, denormalize=parameters_modal, layers=[1,10,1], model_type='regression',
            population_size=20, generations=50)

In [173]:
ae.train()

Generation 1: best individual: <__main__.NeuralNetwork object at 0x000001D55840D050>, fitness: 218.99462670449228
Generation 2: best individual: <__main__.NeuralNetwork object at 0x000001D558BB3890>, fitness: 72.50842603534781
Generation 3: best individual: <__main__.NeuralNetwork object at 0x000001D558BB3890>, fitness: 72.50842603534781
Generation 4: best individual: <__main__.NeuralNetwork object at 0x000001D558BB3890>, fitness: 72.50842603534781
Generation 5: best individual: <__main__.NeuralNetwork object at 0x000001D558872D10>, fitness: 50.137228361659304
Generation 6: best individual: <__main__.NeuralNetwork object at 0x000001D559D3A6D0>, fitness: 40.62181172782724
Generation 7: best individual: <__main__.NeuralNetwork object at 0x000001D559D3A6D0>, fitness: 40.62181172782724
Generation 8: best individual: <__main__.NeuralNetwork object at 0x000001D559D3A6D0>, fitness: 40.62181172782724
Generation 9: best individual: <__main__.NeuralNetwork object at 0x000001D559D3A6D0>, fitness:

(<__main__.History at 0x1d55a579110>,
 (<__main__.NeuralNetwork at 0x1d559d3a6d0>, 40.62181172782724))

In [10]:
multimodal = pd.read_csv("dane/multimodal-large-training.csv", index_col=0).reset_index()

In [12]:
multimodal

Unnamed: 0,x,y
0,-0.685726,-74.197483
1,-0.879898,-30.504177
2,1.411932,10.754122
3,1.688954,100.248297
4,-0.573238,-73.832310
...,...,...
9995,-0.194391,44.275897
9996,1.012924,100.169387
9997,0.569789,-93.553272
9998,0.538241,-96.450956


In [None]:
# def __init__(self, X, y, layers, model_type, population_size, generations=100):

In [81]:
from copy import deepcopy

In [77]:
mlp = NeuralNetwork([1, 5, 5, 1], multimodal['x'], multimodal['y'], model_type='regression')

In [61]:
mlp.mse(multimodal['x'], multimodal['y'], resize=True)

5369.088791491285

In [73]:
flat = mlp.flatten_weights()

In [74]:
mlp.weights

[array([[ 0.70004499,  0.52552866, -1.29394331,  1.02121036, -0.78481941]]),
 array([[ 0.28908235, -0.00997967,  0.17720337,  0.35732718, -0.17456391],
        [-0.56095764, -0.11253742,  0.2139857 , -0.29504541,  0.20333845],
        [-0.56179019, -0.59112699,  0.11949727, -0.94020714,  0.16769898],
        [-0.11263718, -0.87777692, -0.86608131, -0.35686873, -0.02388226],
        [ 0.01145561,  0.68650088,  0.88771622, -0.33011953, -0.23520118]]),
 array([[-1.15392248],
        [-0.03544772],
        [ 0.62856561],
        [ 0.25814476],
        [ 1.2643477 ]])]

In [75]:
weights, biases = mlp.unflatten_weights(flat)
weights

[array([[ 0.70004499,  0.52552866, -1.29394331,  1.02121036, -0.78481941]]),
 array([[ 0.28908235, -0.00997967,  0.17720337,  0.35732718, -0.17456391],
        [-0.56095764, -0.11253742,  0.2139857 , -0.29504541,  0.20333845],
        [-0.56179019, -0.59112699,  0.11949727, -0.94020714,  0.16769898],
        [-0.11263718, -0.87777692, -0.86608131, -0.35686873, -0.02388226],
        [ 0.01145561,  0.68650088,  0.88771622, -0.33011953, -0.23520118]]),
 array([[-1.15392248],
        [-0.03544772],
        [ 0.62856561],
        [ 0.25814476],
        [ 1.2643477 ]])]

In [8]:
pd.read_csv("iris.data", header=None)

Unnamed: 0,0,1,2,3,4
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,Iris-virginica
146,6.3,2.5,5.0,1.9,Iris-virginica
147,6.5,3.0,5.2,2.0,Iris-virginica
148,6.2,3.4,5.4,2.3,Iris-virginica


In [3]:
# struktura algorytmu ewolucyjnego