<a href="https://colab.research.google.com/github/michaeldlee23/cs390-project/blob/genetic-algorithm-tests/genetic_algorithm_mnist.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Load the Drive helper and mount
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Import Statements

In [32]:
import os, random, time, imageio
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense, Flatten
from tensorflow.keras.layers import Dropout, BatchNormalization
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam 
from PIL import Image

## Constants and Configuration

In [45]:
random.seed(1337)
tf.random.set_seed(1337)

IH, IW, IZ = 28, 28, 1
IMAGE_SIZE = IH * IW * IZ
NUM_CLASSES = 10

NUM_GENERATIONS = 50
POPULATION_SIZE = 20
SELECTION_SIZE = 5
MUTATION_RATE = 0.1

currentPopulation = list()
fitness = list()

## Genetic Algorithm

In [52]:
# def mutate(individual):
#   print('Mutating individual...')
#   for i in range(len(individual.layers)):
#     rows, cols = individual.layers[i].get_weights()[0].shape
#     for r in range(rows):
#       for c in range(cols):
#         if (random.random() < MUTATION_RATE):
#           individual.layers[i].get_weights()[0][r][c] *= random.uniform(-0.5, 0.5)
#     for j in range(individual.layers[i].get_weights()[1].shape[0]):
#       if (random.random() < MUTATION_RATE):
#         individual.layers[i].get_weights()[1][j] *= random.uniform(-0.5, 0.5)
        
#   return individual


def crossover(parents):
  newPopulation = list()

  # Parents get to survive to next generation
  newPopulation += parents

  # Performs uniform crossover for selected parents
  for i in range(SELECTION_SIZE, POPULATION_SIZE):
    # Select parents for new individual
    parentA, parentB = random.sample(parents, 2)

    # Construct new individual
    # Note that for this network, weights are [n x m] matrix
    # biases are list of size m
    individual = buildModel()
    for j in range(len(parentA.layers)):
      weights = np.zeros(parentA.layers[j].get_weights()[0].shape)
      weightsA = parentA.layers[j].get_weights()[0]
      weightsB = parentB.layers[j].get_weights()[0]
      for r in range(weights.shape[0]):
        for c in range(weights.shape[1]):
          weights[r][c] = (weightsA[r][c] if random.random() < 0.5 else weightsB[r][c])
          # Add random mutations
          if (random.random() < MUTATION_RATE):
            weights[r][c] *= random.uniform(-1.5, 1.5)
      
      biases = np.zeros(parentA.layers[j].get_weights()[1].shape)
      biasesA = parentA.layers[j].get_weights()[1]
      biasesB = parentB.layers[j].get_weights()[1]
      for k in range(biases.shape[0]):
        biases[k] = (biasesA[k] if random.random() < 0.5 else biasesB[k])
        # Add random mutations
        if (random.random() < MUTATION_RATE):
          biases[k] *= random.uniform(-1.5, 1.5)
      
      individual.layers[j].set_weights([weights, biases])
    newPopulation.append(individual)
    # Throw in possible mutations to genes
    # newPopulation.append(mutate(individual))
  
  return newPopulation


def evolve(population, losses):
  # Order the population in order of fitness
  sortedIndexes = sorted(range(len(losses)), key=lambda x: losses[x])
  sortedPopulation = [population[i] for i in sortedIndexes]

  newPopulation = crossover(sortedPopulation[:SELECTION_SIZE])
  return newPopulation



## Pipeline Functions

In [53]:
def getRawData():
  (xTrain, yTrain), (xTest, yTest) = tf.keras.datasets.mnist.load_data()
  return ((xTrain, yTrain), (xTest, yTest))


def preprocessData(raw):
  ((xTrain, yTrain), (xTest, yTest)) = raw
  xTrain, xTest = xTrain / 255.0, xTest / 255.0
  xTrainP = xTrain.reshape((np.shape(xTrain)[0], -1))
  xTestP = xTest.reshape((np.shape(xTest)[0], -1))
  yTrainP = to_categorical(yTrain, NUM_CLASSES)
  yTestP = to_categorical(yTest, NUM_CLASSES)
  return ((xTrainP, yTrainP), (xTestP, yTestP))


def buildModel():
  model = Sequential()

  model.add(Dense(IMAGE_SIZE, input_shape=(IMAGE_SIZE, )))
  model.add(Dense(64, activation='relu'))
  model.add(Dense(32, activation='relu'))
  model.add(Dense(NUM_CLASSES, activation='softmax'))

  model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
  return model


def predict(input, individual):
  prediction = individual.predict(input, 1)
  oneHot = np.zeros_like(prediction)
  oneHot[np.arange(len(prediction)), prediction.argmax(1)] = 1
  return oneHot


def train(population, data):
  ((xTrain, yTrain), (xTest, yTest)) = data
  losses = list()
  for individual in population:
    history = individual.fit(x=xTrain, y=yTrain,
                             validation_split=0.1,
                             epochs=1,
                             shuffle=True)
    losses.append(history.history['loss'][-1])

  return population, losses


def getFittestIndividual(data):
  global currentPopulation
  # Clear previous runs
  currentPopulation = list()
  for i in range(POPULATION_SIZE):
    currentPopulation.append(buildModel())

  losses = None
  for generation in range(NUM_GENERATIONS):
    start = time.time()
    print('----------GENERATION %s----------' % generation)
    population, losses = train(currentPopulation, data)
    # print(losses)

    currentPopulation = evolve(population, losses)
    end = time.time()
    print('Generation produced in %ds' % (end - start))
    # Evaluate fittest individual of this generation, which is the first
    # individual of the new generation
    evalResults(data[1], predict(data[1][0], currentPopulation[0]))


  # Order the last population in order of fitness
  sortedIndexes = sorted(range(len(losses)), key=lambda x: losses[x])
  sortedPopulation = [currentPopulation[i] for i in sortedIndexes]
  return currentPopulation[0]


def evalResults(data, predictions):
  xTest, yTest = data
  acc = 0
  for i in range(predictions.shape[0]):
    predictedValue = np.argmax(predictions[i])
    actualValue = np.argmax(yTest[i])
    if np.array_equal(predictions[i], yTest[i]):  acc += 1
  accuracy = acc / predictions.shape[0]
  print('Classifier Accuracy: %f%%' % (accuracy * 100))

  

## Pipeline

In [54]:
def main():
  raw = getRawData()
  data = preprocessData(raw)
  model = getFittestIndividual(data)
  predictions = predict(data[1][0], model)
  evalResults(data[1], predictions)


if __name__ == "__main__":
  main()

----------GENERATION 0----------
new population size: 20
Generation produced in 116s
Classifier Accuracy: 95.000000%
----------GENERATION 1----------
new population size: 20
Generation produced in 113s
Classifier Accuracy: 95.290000%
----------GENERATION 2----------
new population size: 20
Generation produced in 114s
Classifier Accuracy: 96.000000%
----------GENERATION 3----------
new population size: 20
Generation produced in 112s
Classifier Accuracy: 96.070000%
----------GENERATION 4----------
  21/1688 [..............................] - ETA: 4s - loss: 1.2558 - accuracy: 0.5417

KeyboardInterrupt: ignored