In [1]:
# Install the third-party packages this notebook relies on, one pip call per package.
# NOTE(review): no versions are pinned, so a fresh run may resolve to different
# releases than the ones this notebook was last executed with — consider pinning.
# NOTE(review): only gym-super-mario-bros, neat-python, numpy and opencv-python are
# imported by the train.py generated further down; the remaining packages are
# presumably used for plotting/formatting elsewhere — confirm before removing.
!pip install graphviz
!pip install gym-super-mario-bros
!pip install matplotlib
!pip install neat-python
!pip install numpy
!pip install opencv-python
!pip install black
!pip install isort



In [2]:
# NEAT hyper-parameters, written verbatim to the "config-feedforward" file that
# the generated train.py loads through neat.Config.
#
# feed_forward is True on purpose: train.py builds its networks with
# neat.nn.FeedForwardNetwork, which only evaluates nodes that can be ordered into
# feed-forward layers. With feed_forward = False the genomes may grow recurrent
# connections, and every node caught in a cycle would silently be left out of
# the evaluated network.
#
# num_inputs (960) has to equal the length of the flattened frame that train.py
# feeds to the network, and num_outputs (7) the number of actions it picks from.
config = """[NEAT]
fitness_criterion     = max
fitness_threshold     = 500000
pop_size              = 150
reset_on_extinction   = True

[DefaultGenome]
# node activation options
activation_default      = sigmoid
activation_mutate_rate  = 0.05
activation_options      = sigmoid gauss
#abs clamped cube exp gauss hat identity inv log relu sigmoid sin softplus square tanh

# node aggregation options
aggregation_default     = random
aggregation_mutate_rate = 0.05
aggregation_options     = sum product min max mean median maxabs

# node bias options
bias_init_mean          = 0.05
bias_init_stdev         = 1.0
bias_max_value          = 30.0
bias_min_value          = -30.0
bias_mutate_power       = 0.5
bias_mutate_rate        = 0.7
bias_replace_rate       = 0.1

# genome compatibility options
compatibility_disjoint_coefficient = 1.0
compatibility_weight_coefficient   = 0.5

# connection add/remove rates
conn_add_prob           = 0.5
conn_delete_prob        = 0.5

# connection enable options
enabled_default         = True
enabled_mutate_rate     = 0.5

feed_forward            = True
#initial_connection      = unconnected
initial_connection      = partial_nodirect 0.5

# node add/remove rates
node_add_prob           = 0.5
node_delete_prob        = 0.2

# network parameters
num_hidden              = 0
num_inputs              = 960
num_outputs             = 7

# node response options
response_init_mean      = 1.0
response_init_stdev     = 0.05
response_max_value      = 30.0
response_min_value      = -30.0
response_mutate_power   = 0.1
response_mutate_rate    = 0.75
response_replace_rate   = 0.1

# connection weight options
weight_init_mean        = 0.1
weight_init_stdev       = 1.0
weight_max_value        = 30
weight_min_value        = -30
weight_mutate_power     = 0.5
weight_mutate_rate      = 0.8
weight_replace_rate     = 0.1

[DefaultSpeciesSet]
compatibility_threshold = 2.5

[DefaultStagnation]
species_fitness_func = max
max_stagnation       = 50
species_elitism      = 2

[DefaultReproduction]
elitism            = 3
survival_threshold = 0.3
"""

# Relative path: the file lands in the notebook's working directory, which is
# also where train.py looks for it.
file_path = "config-feedforward"

# Overwrite any previous copy so the file always mirrors the string above.
with open(file_path, "w") as file:
    file.write(config)

print(f"Configuration saved to {file_path}")


Configuration saved to config-feedforward


In [3]:
# Source of the stand-alone training script. It is kept as a string and written
# to train.py so that it can be launched in its own Python process.
# The embedded code uses ''' for docstrings because the enclosing literal is
# delimited by triple double quotes.
train_py = """

import pickle

import cv2
import gym_super_mario_bros
import neat
import numpy as np
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace


def nnout_to_action(nnout):
    '''Return the index of the largest network output, used as the action id.'''
    return nnout.index(max(nnout))


def eval_genomes(genomes, config):
    '''Serial fitness function: score every (genome_id, genome) pair in place.'''
    for genome_id, genome in genomes:
        genome.fitness = eval_genome(genome, config, genome_id)


def eval_genome(genome, config, genome_id=None):
    '''Play one episode of SuperMarioBros-1-1-v0 with the genome and return its fitness.

    The fitness is the sum of the environment rewards, plus 500000 when the
    flag is reached, minus 250 each time a life is lost, plus 100 each time
    the status changes to something other than "small", plus 10 per coin held
    at the end of the episode.

    The episode stops when the environment reports done, or when x_pos is
    unchanged between two consecutive checks made every 50 frames.

    genome_id is only used to label the printed result; it is None when the
    function is called by neat.ParallelEvaluator.
    '''
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0")
    env = JoypadSpace(env, SIMPLE_MOVEMENT)

    # try/finally so the emulator is released even when the episode raises.
    try:
        state = env.reset()

        # Each frame is shrunk by a factor of 8 per axis; inx * iny must equal
        # num_inputs in the NEAT config file.
        iny, inx, _ = env.observation_space.shape
        inx = int(inx / 8)
        iny = int(iny / 8)

        net = neat.nn.FeedForwardNetwork.create(genome, config)

        done = False
        fitness_current = 0.0
        frames = 0
        old_x = 0
        lives_remaining = 2
        total_coins = 0
        current_status = "small"

        while not done:
            state = cv2.resize(state, (inx, iny))
            # NOTE(review): the frame is presumably RGB rather than BGR; the
            # conversion flag is left unchanged so existing runs stay comparable.
            state = cv2.cvtColor(state, cv2.COLOR_BGR2GRAY)
            # env.render()

            # The flattened grayscale frame is the network input.
            imgarray = np.ndarray.flatten(state)
            nnout = net.activate(imgarray)
            action = nnout_to_action(nnout)
            state, rew, done, info = env.step(action)
            fitness_current += rew

            # if mario gets to the flag give a very high reward meet fitness_threshold
            if info["flag_get"]:
                fitness_current += 500000

            # extra penalty for dying
            if info["life"] < lives_remaining:
                lives_remaining = info["life"]
                fitness_current -= 250

            # bonus for managing to change status
            if current_status != info["status"]:
                current_status = info["status"]
                if info["status"] != "small":
                    fitness_current += 100

            total_coins = info["coins"]
            frames += 1
            # Every 50 frames: give up when x_pos did not change since the last check.
            if frames % 50 == 0:
                if old_x == info["x_pos"]:
                    done = True
                else:
                    old_x = info["x_pos"]

        # bonus for collecting coins
        fitness_current += total_coins * 10

        # "is not None" so that a genome id of 0 is still reported.
        if genome_id is not None:
            print(f"GenomeID: {genome_id}, Fitness: {fitness_current}")
        else:
            print(f"Fitness: {fitness_current}")

        return fitness_current
    finally:
        env.close()


def main():
    '''Run NEAT until the fitness threshold is met and pickle the winning genome.'''
    config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        "config-feedforward",
    )
    p = neat.Population(config)
    # p = neat.Checkpointer.restore_checkpoint("neat-checkpoint-144")

    p.add_reporter(neat.StdOutReporter(True))
    stats = neat.StatisticsReporter()

    p.add_reporter(stats)

    # Write a checkpoint after every generation
    p.add_reporter(neat.Checkpointer(1))

    # Evaluate genomes in 10 worker processes
    pe = neat.ParallelEvaluator(10, eval_genome)
    winner = p.run(pe.evaluate)

    # winner = p.run(eval_genomes)

    with open("winner.pkl", "wb") as output:
        pickle.dump(winner, output, 1)


# The guard keeps worker processes that re-import this module (spawn start
# method) from starting a training run of their own.
if __name__ == "__main__":
    main()

"""


# Relative path: the script is written to the notebook's working directory
file_path = "train.py"

# Write the script source to disk, replacing any previous copy
with open(file_path, "w") as file:
    file.write(train_py)

print(f"Python file saved to {file_path}")


Python file saved to train.py


In [4]:
import importlib.util
import os

# Replacement source for gym's wrappers/time_limit.py.
#
# The wrapper's step() returns a 4-tuple (observation, reward, done, info),
# which is the shape train.py unpacks from env.step().
#
# The previous version of this string contained the class twice; only the second
# copy took effect, so that is the one kept here. It counts steps but does not
# end the episode when the limit is reached.
# NOTE(review): confirm that not enforcing max_episode_steps is intended.
#
# The embedded code uses ''' for docstrings because the enclosing literal is
# delimited by triple double quotes.
time_limit = """
from typing import Optional

import gym


class TimeLimit(gym.Wrapper):
    '''Step-counting wrapper whose step() returns (observation, reward, done, info).'''

    def __init__(
        self,
        env: gym.Env,
        max_episode_steps: Optional[int] = None,
    ):
        super().__init__(env)
        # Fall back to the limit declared on the environment spec, if there is one.
        if max_episode_steps is None and self.env.spec is not None:
            max_episode_steps = env.spec.max_episode_steps
        # Write the resolved limit back onto the spec.
        if self.env.spec is not None:
            self.env.spec.max_episode_steps = max_episode_steps
        self._max_episode_steps = max_episode_steps
        # Set to 0 by reset(); calling step() before reset() raises a TypeError.
        self._elapsed_steps = None

    def step(self, action):
        observation, reward, done, info = self.env.step(action)
        self._elapsed_steps += 1
        return observation, reward, done, info

    def reset(self, **kwargs):
        self._elapsed_steps = 0
        return self.env.reset(**kwargs)
"""

# Locate the gym package installed for this kernel instead of relying on a
# machine-specific absolute path. find_spec on a top-level name does not import
# the package.
gym_spec = importlib.util.find_spec("gym")
if gym_spec is None or gym_spec.origin is None:
    raise ModuleNotFoundError(
        "gym is not installed in this kernel's environment; run the install cell first"
    )
file_path = os.path.join(os.path.dirname(gym_spec.origin), "wrappers", "time_limit.py")

# WARNING: this overwrites a file inside the installed gym package.
with open(file_path, "w") as file:
    file.write(time_limit)

print(f"Python file saved to {file_path}")


Python file saved to /home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/gym/wrappers/time_limit.py


In [None]:
# Launch the generated training script in a separate Python process; its output
# (NEAT generation reports and per-genome fitness lines) streams into this cell.
# NOTE(review): no generation limit is passed to p.run() in train.py, so this
# presumably runs until the fitness threshold is reached — confirm.
!python3 train.py


 ****** Running generation 0 ****** 

  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.deprecation(
  logger.warn(
  logger.warn(
  logger.warn(
  return reduce(mul, x, 1.0)
  logger.deprecation(
  return reduce(mul, x, 1.0)
  logger.deprecation(
  logger.deprecation(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  return reduce(mul, x, 1.0)
  logger.deprecation(
  logger.deprecation(
  logger.deprecation(
  logger.warn(
  logger.deprecation(
  logger.warn(
  logger.deprecation(
  logger.deprecation(
  return reduce(mul, x, 1.0)
Fitness: -5.0
Fitness: -5.0
Fitness: 252.0
Fitness: -5.0
Fitness: -5.0
Fitness: 231.0
  return reduce(mul, x, 1.0)
Fitness: 241.0
  return reduce(mul, x, 1.0)
Fitness: -5.0
Fitness: 230.0
  return reduce(mul, x, 1.0)
  return reduce(mul, x, 1.0)
Fitness: 250.0
  return reduce(mul, x, 1.0)
  return reduce(mul, x, 1.0)
Fitnes