<a href="https://colab.research.google.com/github/Francja/geneticAlgorithm/blob/master/Genetic_algorithm_for_DNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%tensorflow_version 2.x  # this line is not required unless you are in a notebook
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import pandas as pd
from random import random
from random import randint
from random import shuffle

# Bounds on the number of hidden layers of an individual, i.e. the length of
# its layer-size list. The entry-point guard requires MIN_LAYER >= 2.
MIN_LAYER = 2
MAX_LAYER = 3
# Upper bound on the neuron count of a single layer (values are 1..MAX_NEURONS).
MAX_NEURONS = 5
# Number of individuals in the population. Must be divisible by 4: selection
# keeps half of the population and crossover consumes that half in pairs.
POPULATION = 8
# Maximum number of generations the algorithm iterates through.
GENERATIONS = 3
# Target score; the algorithm stops as soon as it is reached.
TARGET_SCORE = 0.5
# Mutation chance, applied independently to each individual in each generation.
MUTATION_CHANCE = 0.01
# Number of most recent generations that must share the same best score for
# the algorithm to stop because the population no longer evolves.
NO_EVOLVE_GENS = 3

# Column names assigned to the iris CSV files when they are loaded.
CSV_COLUMN_NAMES = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species']
# Names of the three species classes (not referenced by the code shown here).
SPECIES = ['Setosa', 'Versicolor', 'Virginica']


def generate_individual():
    """Create one random individual.

    An individual is a list of layer sizes: its length is drawn from
    MIN_LAYER..MAX_LAYER and every entry from 1..MAX_NEURONS (both inclusive).
    """
    layer_count = randint(MIN_LAYER, MAX_LAYER)
    return [randint(1, MAX_NEURONS) for _ in range(layer_count)]


def initialize():
    """Build the initial population of POPULATION random individuals."""
    return [generate_individual() for _ in range(POPULATION)]


def cost(ind):
    """Score an individual by delegating to DNN and returning its result."""
    # Alternative toy cost, left disabled: c = (ind[0]*ind[-1])
    return DNN(ind)


def sort(pop_with_cost):
    """Sort the scored population in place by ascending cost and return it.

    Each entry is indexed as (individual, cost); the same list object that
    was passed in is returned.
    """
    def cost_of(entry):
        return entry[1]

    pop_with_cost.sort(key=cost_of)
    return pop_with_cost


def roulette(pop_with_cost, n_select=None):
    """Pick parents by fitness-proportionate (roulette wheel) selection.

    Args:
        pop_with_cost: list of [individual, cost] entries; an entry with a
            larger cost is proportionally more likely to be picked.
        n_select: how many entries to draw (with replacement). Defaults to
            half of POPULATION.

    Returns:
        A new list with exactly ``n_select`` entries (empty if the population
        is empty). The entries are the same objects as in ``pop_with_cost``,
        not copies, and one entry may appear several times.
    """
    if n_select is None:
        n_select = POPULATION // 2
    if not pop_with_cost:
        return []

    # Running totals of the costs: entry i owns the slice of the wheel that
    # ends at cumulative[i].
    cumulative = []
    running = 0.0
    for _, score in pop_with_cost:
        running += score
        cumulative.append(running)
    total = cumulative[-1]

    chosen = []
    for _ in range(n_select):
        if total > 0:
            threshold = random() * total
            for entry, upper in zip(pop_with_cost, cumulative):
                if threshold <= upper:
                    chosen.append(entry)
                    break
            else:
                # Rounding left the draw past the last slice; without this
                # the result would silently come up one entry short.
                chosen.append(pop_with_cost[-1])
        else:
            # Every cost is zero, so there is nothing to weight by: draw
            # uniformly instead of dividing by zero.
            chosen.append(pop_with_cost[int(random() * len(pop_with_cost))])
    return chosen


def cross_parents(parent1, parent2, cross_point):
    """Single-point crossover of two parents.

    Returns a new list made of ``parent1`` up to ``cross_point`` followed by
    ``parent2`` from ``cross_point`` onwards; neither parent is modified.
    """
    head = parent1[:cross_point]
    tail = parent2[cross_point:]
    return [*head, *tail]


def crossover(pop):
    """Pair the scored parents at random and append two children per pair.

    ``pop`` holds [individual, cost] entries. The parents are paired in a
    shuffled order; each pair is cut at one random point no later than the
    last index of the shorter parent and produces two complementary children.
    The children are appended to ``pop`` as bare individuals (without a
    cost), and the same, now extended, list is returned.
    """
    order = list(range(len(pop)))
    shuffle(order)

    # The range is fixed before any child is appended, so only the original
    # parents are paired.
    for left in range(0, len(pop), 2):
        first = pop[order[left]][0]
        second = pop[order[left + 1]][0]
        shortest = min(len(first), len(second))
        cut = randint(1, shortest - 1)
        pop.append(cross_parents(first, second, cut))
        pop.append(cross_parents(second, first, cut))
    return pop


def calculate_missing_cost(pop):
    """Return the population with every individual paired with a cost.

    An entry whose first element is a list is taken to be an already scored
    [individual, cost] pair and is passed through unchanged; any other entry
    is a bare individual and is scored with ``cost``.
    """
    return [
        entry if isinstance(entry[0], list) else [entry, cost(entry)]
        for entry in pop
    ]


def _mutate_genome(genome, max_neurons):
    """Return a copy of ``genome`` with one random layer size moved by +/-1."""
    mutant = list(genome)
    if not mutant:
        return mutant  # nothing to mutate
    step = 1 if randint(0, 1) == 0 else -1  # both directions equally likely
    cell = randint(0, len(mutant) - 1)
    value = mutant[cell] + step
    # Layer sizes wrap around inside 1..max_neurons.
    if value > max_neurons:
        value = 1
    elif value == 0:
        value = max_neurons
    mutant[cell] = value
    return mutant


def mutate(pop, mutation_chance=None, max_neurons=None):
    """Randomly mutate individuals of the population.

    Each entry of ``pop`` is either a scored [individual, cost] pair or a bare
    individual (a list of layer sizes). With probability ``mutation_chance``
    an entry is replaced by a mutated *copy* of its individual, returned
    without a cost so that it is scored again later. Entries that are not
    mutated are passed through as they are.

    The input entries are never modified. The same scored entry can occur
    several times in ``pop`` after selection, and mutating it in place would
    change every occurrence while the others keep their old cost.

    Args:
        pop: the population to process.
        mutation_chance: per-individual mutation probability; defaults to
            MUTATION_CHANCE.
        max_neurons: largest allowed layer size; defaults to MAX_NEURONS.

    Returns:
        A new list with one entry per entry of ``pop``.
    """
    if mutation_chance is None:
        mutation_chance = MUTATION_CHANCE
    if max_neurons is None:
        max_neurons = MAX_NEURONS

    mutated_pop = []
    for ind in pop:
        if random() >= mutation_chance:
            mutated_pop.append(ind)
            continue
        # A scored entry is [individual, cost]; a bare one is the individual.
        genome = ind[0] if isinstance(ind[0], list) else ind
        mutated_pop.append(_mutate_genome(genome, max_neurons))
    return mutated_pop


def gather_text(pop, g, best, text):
    """Prepend a summary of generation ``g`` to the running report.

    Args:
        pop: list of [individual, score] entries; sorted in place by
            descending score as a side effect.
        g: zero-based generation index (reported as ``g + 1``).
        best: best score of the generation, shown in the header line.
        text: report accumulated so far.

    Returns:
        The header line, the ranking of up to five best individuals, the
        previous report and the header line once more, concatenated.
    """
    pop.sort(key=lambda x: x[1], reverse=True)
    header = f"Generation {g + 1} ~~ best score = {round(best, 2)}\n"
    # Slicing instead of indexing pop[0..4] keeps populations with fewer
    # than five individuals from raising IndexError.
    ranking = "".join(
        f"#{rank}: {ind[0]}; \tscore: {round(ind[1], 2)}\n"
        for rank, ind in enumerate(pop[:5], start=1)
    )
    return header + ranking + text + header


def final_text(stop, text):
    """Print the collected report followed by a one-line verdict.

    ``stop`` selects the verdict: "success", "stagnation", or anything else
    for a run that ended without reaching the target.
    """
    print(text)
    if stop == "success":
        verdict = f"SUCCESS: Target of score of {TARGET_SCORE} was accomplished."
    elif stop == "stagnation":
        verdict = "FAILED: Population stopped evolving."
    else:
        verdict = "FAILED: Target was not accomplished."
    print(verdict)


def input_fn(features, labels, training=True, batch_size=256):
    """Build a batched tf.data.Dataset from features and labels.

    ``features`` is converted with ``dict(...)`` and sliced together with
    ``labels``. In training mode the dataset is shuffled (buffer of 1000) and
    repeated indefinitely before batching; otherwise it is only batched.
    """
    ds = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    if not training:
        return ds.batch(batch_size)
    return ds.shuffle(1000).repeat().batch(batch_size)


def DNN(population_for_DNN):
    """Train and evaluate a DNN classifier shaped by one individual.

    Args:
        population_for_DNN: list of hidden-layer sizes, passed to the
            classifier as ``hidden_units``.

    Returns:
        The 'accuracy' entry of the evaluation result.

    Reads the module-level names ``my_feature_columns``, ``train``,
    ``train_y``, ``test`` and ``test_y``, which are set up by the script
    entry point before this function is called.
    """
    # The hidden layers are defined by the individual being scored.
    classifier = tf.estimator.DNNClassifier(
        feature_columns=my_feature_columns,
        hidden_units=population_for_DNN,
        # The model must choose between 3 classes.
        n_classes=3)

    # The estimator calls input_fn without arguments, so the data is bound
    # through a lambda.
    classifier.train(
        input_fn=lambda: input_fn(train, train_y, training=True),
        steps=5000)

    eval_result = classifier.evaluate(
        input_fn=lambda: input_fn(test, test_y, training=False))

    print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))
    print(eval_result)
    print('\n \n')
    print(dict(list(eval_result.items())[0:3]))
    print(classifier.train)
    print('\n \n')
    print(tf.losses.Reduction)
    # Report the evaluated layer sizes; the original f-string had no
    # placeholder and printed only the literal word "hidden_units".
    print(f"hidden_units: {population_for_DNN}")
    return eval_result.get('accuracy')


# Script entry point: loads the iris data, then evolves the population of
# hidden-layer layouts. The names train, train_y, test, test_y and
# my_feature_columns are intentionally left at module level because DNN()
# reads them as globals.
if __name__ == "__main__" and MIN_LAYER >= 2 and POPULATION % 4 == 0:

    train_path = tf.keras.utils.get_file(
        "iris_training.csv", "https://storage.googleapis.com/download.tensorflow.org/data/iris_training.csv")
    test_path = tf.keras.utils.get_file(
        "iris_test.csv", "https://storage.googleapis.com/download.tensorflow.org/data/iris_test.csv")
    train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
    test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
    stop_condition = ""
    text_summary = ""
    pop_bag = []

    # Split the label column off; the frames keep only the feature columns
    # (we have 120 entries with 4 features).
    train_y = train.pop('Species')
    test_y = test.pop('Species')

    # Feature columns describe how to use the input.
    my_feature_columns = []
    for key in train.keys():
        my_feature_columns.append(tf.feature_column.numeric_column(key=key))

    # creates the initial randomized population bag:
    init_pop_bag = initialize()

    # initial pop go through neural network and setting the score:
    for init_ind in init_pop_bag:
        pop_bag.append([init_ind, cost(init_ind)])

    # creates places to gather scores
    best_of_gen = max(pop_bag, key=lambda x: x[1])[1]
    max_score = best_of_gen
    score_table = [best_of_gen]

    # loop that goes through rest of generations:
    for gen in range(0, GENERATIONS):

        text_summary = gather_text(pop_bag, gen, best_of_gen, text_summary)

        # keeps track of the best score seen in any generation
        max_score = max(max_score, best_of_gen)

        # stop condition check
        if max_score >= TARGET_SCORE:
            stop_condition = "success"
            break

        # checks if the population stopped evolving
        if score_table[-NO_EVOLVE_GENS:].count(score_table[-1]) >= NO_EVOLVE_GENS:
            stop_condition = "stagnation"
            break

        # choosing new population based on roulette
        pop_bag = roulette(pop_bag)

        # crossover of randomly selected parents
        pop_bag = crossover(pop_bag)

        # mutating
        pop_bag = mutate(pop_bag)

        # calculating score for new children and mutated ones
        pop_bag = calculate_missing_cost(pop_bag)

        # getting best score of generation
        best_of_gen = max(pop_bag, key=lambda x: x[1])[1]
        score_table.append(best_of_gen)
    else:
        # All generations ran without an early stop. The population produced
        # by the last iteration has not been reported or checked yet, so do
        # it here; otherwise a target reached in the final generation would
        # be reported as a failure.
        text_summary = gather_text(pop_bag, GENERATIONS, best_of_gen, text_summary)
        max_score = max(max_score, best_of_gen)
        if max_score >= TARGET_SCORE:
            stop_condition = "success"

    # final_text expects the stop condition, not the population.
    final_text(stop_condition, text_summary)

else:
    print("Please correct POPULATION or MIN_LAYER")




`%tensorflow_version` only switches the major version: 1.x or 2.x.
You set: `2.x  # this line is not required unless you are in a notebook`. This will be interpreted as: `2.x`.


TensorFlow 2.x selected.
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpp7s9qsrk', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_