# Genetic algorithm for Web based game

<i><strong style="color:red;">N.B. Running this notebook starts a local server at <a href="http://127.0.0.1:5000" target="_blank">127.0.0.1:5000</a> - open that address in another browser tab to see the game.</strong></i>

In [1]:
import numpy as np
import json
import tensorflow as tf
from numpy.random import randint
import random
from random import random as rnd
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
from keras.layers import Dense, Concatenate,concatenate,LeakyReLU, Softmax
from keras.models import Sequential, Model
from keras.optimizers import Adam
import math
import logging
from sklearn.preprocessing import normalize

Using TensorFlow backend.


## Init app

In [2]:
# Define App
# Flask application object; the @app.route handlers further down are registered on it.
app = Flask(__name__)
# Apply flask_cors to the whole app so the routes can be called cross-origin.
CORS(app)

# Hide logging
# Only messages at ERROR level or above from the 'werkzeug' logger are emitted.
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR) # Comment out to see server requests

## Variable definitions

In [3]:
# True once the population can serve predictions; reported by /is_ready and
# checked by /predictions (which answers "server busy" while it is False).
ready = False
# input_dim of the first Dense layer of every brain.
input_size = 4
# Number of units in the final Dense layer of every brain.
output_size = 2
# List of Bird instances making up the current generation.
population = []
# Highest fitness found by the last calculate_fitness() call.
max_fitness = 0
# Population indices, repeated in proportion to fitness (built by natural_selection()).
mating_pool = []
# Number of units in each hidden Dense layer.
neurons = 8
# Generation counter, incremented by create_first_generation() and breed_population().
generation = 0
# Birds updated so far in the current breeding pass; server_ready() compares it
# with number_of_individuals and resets it.
processed_count = 0
# Selects the network architecture: Dense + LeakyReLU layers when True,
# Dense layers with activation='relu' when False.
leaky = True

# Error fix
# The default TensorFlow graph is captured here so the route handlers can run
# model code inside `with graph.as_default():`.
# NOTE(review): presumably needed because Flask serves requests on other threads — confirm.
graph = tf.get_default_graph()

## Hyperparameters

In [4]:
# Population size; also the value returned by the /get_count route.
number_of_individuals = 30
# Probability threshold compared against random.random() inside mutate().
mutation_rate = 0.01
# In breed(), a weight row is crossed when breeding_rate < random.random(),
# otherwise the individual's own row is kept; with 0 nearly every row is crossed.
breeding_rate = 0
# Passed to the Adam optimizer when each brain is compiled.
# NOTE(review): no fit()/training call is visible in this notebook — confirm it is used.
learning_rate = 0.095

## Class definitions

In [5]:
class Bird:
    """One individual of the population: a small Keras network plus game state.

    Attributes set in ``__init__``:
        brain: compiled Keras ``Sequential`` model built by ``create_brain``.
        score: last score stored through ``set_score``.
        fitness: this bird's share of the population score (``calculate_fitness``).
        prediction: latest network output as a nested list, ``None`` before the
            first prediction.
        alive: status stored through ``set_status``; when it equals ``0``,
            ``predict`` returns the cached prediction instead of running the net.
    """

    def __init__(self):
        self.brain = self.create_brain()
        self.score = 0
        self.fitness = 0
        self.prediction = None
        self.alive = 1

    def create_brain(self, weights=None):
        """Build and compile the network for this bird.

        The module-level ``leaky`` flag picks the hidden stack: three
        ``Dense`` + ``LeakyReLU`` pairs, or three ``Dense`` layers with
        ``activation='relu'``. A final ``Dense(output_size)`` layer is always
        appended. ``weights`` is accepted but not used.
        """
        if leaky:
            hidden_stack = [
                Dense(neurons, input_dim=input_size),
                LeakyReLU(),
                Dense(neurons),
                LeakyReLU(),
                Dense(neurons),
                LeakyReLU(),
            ]
        else:
            hidden_stack = [
                Dense(input_size, input_dim=input_size, activation='relu'),
                Dense(neurons, activation='relu'),
                Dense(neurons, activation='relu'),
            ]

        model = Sequential()
        for layer in hidden_stack + [Dense(output_size)]:
            model.add(layer)
        model.compile(loss='mse', optimizer=Adam(lr=learning_rate), metrics=["accuracy"])
        # Build the predict function up front, before any request calls predict().
        model._make_predict_function()
        return model

    def update_brain(self, layer_0, layer_1, layer_2, layer_3):
        """Mutate ``layer_1`` and ``layer_2`` and load them into the brain.

        Only the second and third Dense layers are replaced; their biases are
        reset to zeros. ``layer_0`` and ``layer_3`` are accepted but not applied.
        """
        # Indices of the second and third Dense layers in model.layers: the
        # LeakyReLU layers sit in between when `leaky` is on.
        second_dense, third_dense = (2, 4) if leaky else (1, 2)

        mutated_second = mutate(layer_1)
        mutated_third = mutate(layer_2)

        self.brain.layers[second_dense].set_weights(
            [np.array(mutated_second), np.zeros(neurons)])
        self.brain.layers[third_dense].set_weights(
            [np.array(mutated_third), np.zeros(neurons)])

    def predict(self, self_state, pipe_states):
        """Return the network output for the given state as a nested list.

        The input row is ``[self_state, pipe_states[0], pipe_states[1],
        pipe_states[2]]``. When ``alive`` equals ``0`` the previous prediction
        (possibly ``None``) is returned unchanged.
        """
        if self.alive == 0:
            return self.prediction

        features = np.array(
            [[self_state, pipe_states[0], pipe_states[1], pipe_states[2]]])
        self.prediction = self.brain.predict(features).tolist()
        return self.prediction

    def set_status(self, status):
        """Store the alive/dead status."""
        self.alive = status

    def set_score(self, score):
        """Store the latest score."""
        self.score = score

## Method definitions

In [6]:
def create_first_generation(callback):
    """Fill ``population`` with new birds, mark the server ready, then call ``callback``.

    Appends ``number_of_individuals`` fresh ``Bird`` instances to the global
    ``population``, increments ``generation`` and sets ``ready`` to True
    before invoking ``callback`` with no arguments.
    """
    global ready, generation
    population.extend(Bird() for _ in range(number_of_individuals))

    generation += 1
    ready = True
    callback()

In [7]:
def bird_predictions(bird_states, pipe_states):
    """Update every bird from the client state and collect its prediction.

    ``bird_states[i]`` is read as ``[status, state, score]`` for the i-th
    member of ``population``. Returns a list with one prediction per bird,
    or ``None`` when either argument is empty/falsy.
    """
    if not (bird_states and pipe_states):
        return None

    collected = []
    for idx, bird in enumerate(population):
        state = bird_states[idx]
        bird.set_score(state[2])
        bird.set_status(state[0])
        collected.append(bird.predict(state[1], pipe_states))
    return collected

In [8]:
def division_by_zero(n, d):
    """Return ``n / d``, or ``0`` when the divisor ``d`` is falsy (e.g. zero)."""
    if not d:
        return 0
    return n / d

In [9]:
def calculate_fitness():
    """Set each bird's ``fitness`` to its share of the total score.

    Also stores the largest fitness in the global ``max_fitness``. When the
    total score is zero (for example every bird scored 0) all fitness values
    are set to 0 instead of raising ``ZeroDivisionError``.
    """
    global max_fitness
    total_score = sum(bird.score for bird in population)

    max_fitness = 0
    for bird in population:
        # Guard the division: a generation where nobody scored has total 0.
        bird.fitness = bird.score / total_score if total_score else 0
        if bird.fitness > max_fitness:
            max_fitness = bird.fitness

In [10]:
def natural_selection(filter = True): #Mating pool
    """Rebuild the global ``mating_pool`` from the birds' fitness values.

    Every population index ``i`` is added ``floor(fitness * 100)`` times, so
    fitter birds are drawn more often. If that leaves the pool empty (every
    fitness below 1%, e.g. all scores were 0) each index is added once so the
    breeding step still has parents to draw from.

    ``filter`` is kept for backward compatibility and is not used.
    """
    global mating_pool
    mating_pool = []

    for index, bird in enumerate(population):
        tickets = math.floor(bird.fitness * 100)
        # A non-positive ticket count contributes nothing.
        mating_pool.extend([index] * tickets)

    if not mating_pool:
        # Nobody earned a ticket: fall back to a uniform choice over all birds.
        mating_pool = list(range(len(population)))
    

In [11]:
def get_dense_layers(layers):
    """Return the kernel matrices of the four Dense layers as a 4-tuple.

    ``layers`` is a model's layer list. With ``leaky`` enabled the Dense
    layers sit at indices 0, 2, 4 and 6; otherwise at 0, 1, 2 and 3.
    Only the first entry of ``get_weights()`` is returned for each layer.
    """
    dense_positions = (0, 2, 4, 6) if leaky else (0, 1, 2, 3)
    return tuple(layers[pos].get_weights()[0] for pos in dense_positions)
        

In [12]:
def _cross_rows(father_weights, mother_weights, orig_weights):
    """Cross one layer row by row and return the normalized child matrix.

    For each row, when ``breeding_rate < random.random()`` the child row is the
    element-wise product of the father's and mother's rows; otherwise the
    corresponding row of ``orig_weights`` is kept. The assembled matrix is
    passed through ``normalize``.
    """
    child = []
    for row in range(len(father_weights)):
        if breeding_rate < random.random():
            child.append([np.dot(f, m)
                          for f, m in zip(father_weights[row], mother_weights[row])])
        else:
            child.append(orig_weights[row])
    return normalize(child)


def breed(father, mother, orig):
    """Produce the four child weight matrices for one individual.

    ``father``, ``mother`` and ``orig`` are birds whose ``brain.layers`` are
    read through ``get_dense_layers``. Layers 1-3 are always crossed row by
    row (see ``_cross_rows``). Layer 0 is crossed the same way when ``leaky``
    is on; otherwise it is the normalized matrix product of the parents'
    first layers.

    Returns:
        ``(child_0, child_1, child_2, child_3)``.
    """
    father_layers = get_dense_layers(father.brain.layers)
    mother_layers = get_dense_layers(mother.brain.layers)
    orig_layers = get_dense_layers(orig.brain.layers)

    if leaky:
        child_0 = _cross_rows(father_layers[0], mother_layers[0], orig_layers[0])
    else:
        child_0 = normalize(np.dot(father_layers[0], mother_layers[0]))

    # Crossed in layer order so the random draws happen in the same sequence
    # as in the unrolled version.
    child_1 = _cross_rows(father_layers[1], mother_layers[1], orig_layers[1])
    child_2 = _cross_rows(father_layers[2], mother_layers[2], orig_layers[2])
    child_3 = _cross_rows(father_layers[3], mother_layers[3], orig_layers[3])
    return (child_0, child_1, child_2, child_3)

In [13]:
def breed_population():
    """Replace every bird's brain with a child bred from the mating pool.

    All children are bred first and only then written back, so every child is
    derived from the previous generation. Updating brains inside the breeding
    loop would let later children inherit from already-replaced parents.

    Each updated bird is marked alive and counted in ``processed_count``;
    afterwards ``generation`` is incremented and ``server_ready()`` is called.
    If ``mating_pool`` is empty, parents are drawn uniformly from the whole
    population instead of failing.
    """
    global generation, processed_count
    pool = mating_pool if mating_pool else list(range(len(population)))

    # Phase 1: breed every child from the untouched parent generation.
    offspring = []
    for individual in population:
        # Get two random indices from the mating pool
        a = pool[np.random.choice(len(pool))]
        b = pool[np.random.choice(len(pool))]
        offspring.append(breed(population[a], population[b], individual))

    # Phase 2: install the children.
    for individual, (layer_0, layer_1, layer_2, layer_3) in zip(population, offspring):
        individual.update_brain(layer_0, layer_1, layer_2, layer_3)
        individual.alive = 1
        processed_count = processed_count + 1

    generation = generation + 1
    server_ready()

In [14]:
def mutate(individual_weights, rate=None):
    """Randomly swap and sign-flip entries of ``individual_weights[0]`` in place.

    For every position in the first row, two independent mutations may fire,
    each with probability ``rate``:

    * swap the entry at that position with a randomly chosen entry;
    * negate two randomly chosen entries (one entry if both draws coincide).

    Args:
        individual_weights: indexable 2-D weights (list of lists or array).
        rate: mutation probability; defaults to the global ``mutation_rate``.

    Returns:
        The same ``individual_weights`` object, modified in place.

    NOTE(review): only the first row is ever mutated — confirm whether the
    other rows were meant to be covered as well.
    """
    if rate is None:
        rate = mutation_rate

    row = individual_weights[0]
    length = len(row)
    for swapped in range(length):
        if random.random() < rate:
            swap_with = int(random.random() * length)
            # Real exchange: previously each value was written back to its
            # own index, so nothing moved.
            row[swapped], row[swap_with] = row[swap_with], row[swapped]
        if random.random() < rate:
            first = int(random.random() * length)
            second = int(random.random() * length)
            # Read both values before writing so that first == second yields
            # a single flip. Negation covers positive and negative weights
            # (the negative case used to be lost to a misspelled variable).
            flipped_first = -row[first]
            flipped_second = -row[second]
            row[first] = flipped_first
            row[second] = flipped_second

    return individual_weights

## Routes for server/client communication

In [15]:
# Create route for requesting the individuals NN brain
@app.route("/predictions")
def get_predictions():
    """Return the predictions of every bird for the posted game state.

    Query parameters ``bird_states`` and ``pipe_states`` are JSON strings.

    Responses:
        "server busy"  - while ``ready`` is False.
        400            - a parameter is missing or is not valid JSON.
        "No birds"     - either parsed value is empty.
        JSON list      - the result of ``bird_predictions``.
        "Warning"      - ``bird_predictions`` raised; the traceback is logged.
    """
    if not ready:
        return "server busy"

    try:
        parsed_bird_states = json.loads(request.args.get('bird_states'))
        parsed_pipe_states = json.loads(request.args.get('pipe_states'))
    except (TypeError, ValueError):
        # TypeError: parameter absent (json.loads(None)); ValueError: malformed JSON.
        return "Invalid bird_states/pipe_states", 400

    if not parsed_bird_states or not parsed_pipe_states:
        return "No birds"

    with graph.as_default():
        try:
            return jsonify(bird_predictions(parsed_bird_states, parsed_pipe_states))
        except Exception:
            # Keep the best-effort reply but record what actually went wrong.
            logging.exception("bird_predictions() failed")
            return "Warning"

In [16]:
@app.route("/is_ready")
def is_ready():
    """Report the global ``ready`` flag as text (``str(ready)``)."""
    status_text = str(ready)
    return status_text

In [17]:
@app.route("/get_next_generation")
def get_next_generation():
    """Score the current generation and breed the next one.

    Marks the server busy, recomputes fitness, rebuilds the mating pool and
    calls ``breed_population()`` inside the captured TensorFlow graph.

    Returns:
        "breeding population" on success, or "breed_population() exception"
        when breeding raised (the traceback is written to the log).
    """
    server_busy()
    calculate_fitness()
    natural_selection()
    with graph.as_default():
        try:
            breed_population()
            return "breeding population"
        except Exception:
            # Log the full traceback instead of hiding the cause.
            logging.exception("breed_population() exception")
            return "breed_population() exception"

In [18]:
# Route reporting how many individuals make up the population
@app.route("/get_count")
def get_count():
    """Return ``number_of_individuals`` as text."""
    count_text = str(number_of_individuals)
    return count_text

In [19]:
@app.route("/get_info")
def get_info():
    """Return ``[generation, max_fitness, len(mating_pool)]`` as JSON."""
    return jsonify([generation, max_fitness, len(mating_pool)])

In [20]:
# Route serving the index page
@app.route("/")
def index():
    """Render and return the ``index.html`` template."""
    page = render_template("index.html")
    return page

## Run program

In [21]:
def server_ready():
    """Set ``ready`` according to whether the whole population was processed.

    ``ready`` becomes True exactly when ``processed_count`` equals
    ``number_of_individuals``; in that case the counter is reset to 0.
    Otherwise ``ready`` is set to False and the counter is left as is.
    """
    global ready, processed_count
    ready = (processed_count == number_of_individuals)
    if ready:
        processed_count = 0

In [22]:
def server_busy():
    """Mark the server as not ready by setting the global ``ready`` to False."""
    global ready
    ready = False

In [23]:
def run_app():
    """Start the Flask server with debug mode off."""
    debug_mode = False
    app.run(debug=debug_mode)

In [None]:
# Run program
# Entry point: only runs when this code is executed as the main module.
if __name__ == '__main__':
    # Start the app
    # create_first_generation() fills the population, sets `ready` and then
    # invokes its callback; run_app starts the Flask server via app.run().
    create_first_generation(run_app)

Instructions for updating:
Colocations handled automatically by placer.
 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off
