How to create the environment (replace neat_test4 with your env name):

- conda create -n neat_test4 python=3.10 gym ipykernel pyglet
- conda activate neat_test4
- pip install neat-python
- pip install -e "<path_to_gym_location>\gym-sokoban"
- python -m ipykernel install --user --name neat_test4 --display-name "Python (neat_test4)"
- pip install graphviz

### Initial setups

In [1]:
import neat
import gym
import gym_sokoban
import pyglet
from pyglet import clock
import numpy as np
import pickle
import time
import logging
from neat.reporting import StdOutReporter
import random
import visualize
import graphviz
import os


In [2]:


## Custom rendering setup if gym's rendering is not available
class Viewer:
    def __init__(self, width, height):
        self.window = pyglet.window.Window(width, height)
        self.image = None
        self.window.on_draw = self.on_draw

    def render(self, image):
        self.image = pyglet.image.ImageData(image.shape[1], image.shape[0], 'RGB', image.tobytes(), pitch=image.shape[1] * -3)
        self.window.dispatch_event('on_draw')

    def on_draw(self):
        if self.image:
            self.window.clear()
            self.image.blit(0, 0)

In [3]:
# Custom reporter class
class CustomReporter(StdOutReporter):
    def __init__(self, show_species_detail, config_filename):
        super().__init__(show_species_detail)
        self.start_time = time.time()
        self.config_filename = config_filename
    
    def end(self):
        runtime = time.time() - self.start_time
        logging.info(f'Total runtime: {runtime:.2f} seconds')
    
    def post_evaluate(self, config, population, species_set, best_genome):
        super().post_evaluate(config, population, species_set, best_genome)
        
        # Log population's average fitness
        total_fitness = sum(genome.fitness for genome in population.values())
        avg_fitness = total_fitness / len(population)
        logging.info(f'Population\'s average fitness: {avg_fitness}')
        
        # Log adjusted fitness score
        adjusted_fitness = []
        for species_id, species in species_set.species.items():
            for genome_id in species.members:
                genome = population[genome_id]
                adjusted_fitness.append(genome.fitness / len(species.members))
        avg_adjusted_fitness = sum(adjusted_fitness) / len(adjusted_fitness)
        logging.info(f'Population\'s average adjusted fitness: {avg_adjusted_fitness}')
        
        # Log best genome information
        logging.info(f'\nBest genome:\nKey: {best_genome.key}\nFitness: {best_genome.fitness}')
        logging.info(f'Nodes:')
        for node_key, node in best_genome.nodes.items():
            logging.info(f'\t{node_key} {node}')
        logging.info(f'Connections:')
        for conn_key, conn in best_genome.connections.items():
            logging.info(f'\t{conn_key} {conn}')
        
        # Log configuration file content
        if self.config_filename:
            try:
                with open(self.config_filename, 'r') as f:
                    config_content = f.read()
                    logging.info(f'Config File:\n{config_content}')
            except FileNotFoundError:
                logging.warning(f'Config file "{self.config_filename}" not found.')

        # Log timestamp
        logging.info(f'Timestamp: {time.strftime("%Y-%m-%d %H:%M:%S")}')


In [4]:
# Initialize logging
logging.basicConfig(filename='neat_log.txt', level=logging.INFO, format='%(message)s')

#### Configs

In [5]:
# Load configuration.
config_filename = 'config-feedforward_v04'
config = neat.Config(neat.DefaultGenome, 
                     neat.DefaultReproduction, 
                     neat.DefaultSpeciesSet, 
                     neat.DefaultStagnation, 
                     config_filename)


# Check if a checkpoint exists
checkpoint_file = r'D:\Education\AI\Machine_Learning_Practice\Summer School 2024\Sokoban-SS2024\NEAT\run_config_04\empty'  # Replace with your checkpoint filename

if os.path.isfile(checkpoint_file):
    # Load the checkpoint
    p = neat.Checkpointer.restore_checkpoint(checkpoint_file)
else:
    # Create the population if no checkpoint exists
    p = neat.Population(config)
    

# Add reporters to show progress in the terminal and log to file.
p.add_reporter(neat.StdOutReporter(True))
custom_reporter = CustomReporter(True, config_filename)
p.add_reporter(custom_reporter)
stats = neat.StatisticsReporter()
p.add_reporter(stats)
p.add_reporter(neat.Checkpointer(1, filename_prefix='neat-checkpoint-v04-'))

file_name = 'winner_test_04.pkl'



# Define episode and timestep parameters
num_episodes = 1
timesteps_per_episode = 40

current_episode = 0
current_timestep = 0

min_reward = -10

# param used to mutate a step
epsilon = 0.05

### Classes and methods

#### Preprocessing inputs/outputs

In [6]:
def process_observation(environment, obs):
        
    # Convert the observation to RGB frame or custom observation
    arr_walls, arr_goals, arr_boxes, arr_player = environment.render(mode='raw')

    # Initialize the combined array with walls (1s)
    combined = np.ones_like(arr_walls)
    
    # Set empty fields (0s)
    combined[arr_walls == 0] = 0
    
    # Set targets (3s)
    combined[arr_goals == 1] = 3
    
    # Set boxes (2s)
    combined[arr_boxes == 1] = 2
    
    # Set boxes on targets (4s)
    combined[(arr_boxes == 1) & (arr_goals == 1)] = 4
    
    # Set player position (5s)
    combined[arr_player == 1] = 5

    # Flatten the array
    flat_array = combined.flatten()
    
#     print("Flat array: ", flat_array)
#     print("Flat array shape: ", flat_array.shape)

    # Output the flattened array
    return flat_array



def process_state(state):
# Processes the initial state of env.reset()


    # Initialize the combined array with walls (0s)
    combined = np.ones_like(state[0])
    
    # Set empty fields (1s)
    combined[state[0] == 0] = 0

    # Set targets (3s)
    combined[state[1] == 1] = 3

    # Set boxes (2s)
    combined[state[2] == 1] = 2

    # Set boxes on targets (4s)
    combined[(state[2] == 1) & (state[1] == 1)] = 4

    # Set player position (5s)
    combined[state[3] == 1] = 5

    # Flatten the array
    flat_array = combined.flatten()
    
#     print("Flat array: ", flat_array)
#     print("Flat array shape: ", flat_array.shape)

    # Output the flattened array
    return flat_array

In [7]:
def map_action(output):
    return np.argmax(output)

### Run Neat logic

In [None]:
# Start the game
env = gym.make('Sokoban-small-v1')
# generate the level in the initial stage (env.reset) 
env.reset()


# # OPTIONAL
viewer = Viewer(160, 160)  # Adjust the size according to your environment
ACTION_LOOKUP = env.unwrapped.get_action_lookup()



def eval_genomes(genomes, config):

    global num_episodes, timesteps_per_episode, current_episode, current_timestep, min_reward

    # FOR EACH GENOME
    for genome_id, genome in genomes:
               
        # generate the neural network based on the config provided
        net = neat.nn.FeedForwardNetwork.create(genome, config)
    
        # define the initial fitness of the genome
        genome.fitness = 0.0
    
        # define episodes rewards (list) idea is to keep the fitness scores of all episodes and take the max
        episodes_rewards = []
        
        # FOR EACH EPISODE
        for episode in range(num_episodes):
            
            # Episode reward = 0  
            ep_reward = 0
            
            # reset the game state to the initial phase
            initial_state = env.reset()

            # map inputs suitable for the Neural Network (initial state of the game as flatten array, e.g. with length 49, created by the initial 7x7 grid)
            # used as an input layer in the Neural Network
            initial_inputs = process_state(initial_state)
            
            game_state_after_step = initial_inputs
            
            # FOR EACH STEP
            for step in range(timesteps_per_episode):
                
               
                # calculate probabilities for each output to be selected
                action_prob = net.activate(game_state_after_step)
                
                # select the action, based on the output's probabilities of being selected
                action = map_action(action_prob)
                
                # ADD CUSTOM MUTATION IN THE STEPS (at a random event, the alg could chose a random step)
                
                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    # select the action, based on the output's probabilities of being selected
                    action = map_action(action_prob)
                
#                 # RANDOM ACTION (replace with the genome)
#                 action = env.action_space.sample()

                # make the move in the game and output game state + info + reward
                observation, reward, done, info = env.step(action)
                
#                 # TODO (MAYBE) !!!! Adjust the fitness function
                
#                 # 1) USE THE info to extract information about the move and change the reward accordingly
                
                # penalizes the passiveness of the player; rewards movement of boxes more
                if not info["action.moved_player"]:
                    reward = -0.5                
                elif info["action.moved_box"]:
                    reward = 0.2
#                 elif ....
                
                game_state_after_step = process_observation(environment=env, obs=observation)

                # IMAGE STUFF
                image = env.render(mode='rgb_array')
                viewer.render(image)

#                 # PRINT INFO
                logging.info(f'Population\'s steps info: {(ACTION_LOOKUP[action], reward, done, info)}')

                # POPULATE THE Episode reward +=
                # if the game didn't end on this step
                if not done:
                    ep_reward += reward
                    current_timestep += 1
                    
                # if the game ended
                else:
#                     current_timestep = 0
#                     current_episode += 1
                    print(f"I BEAT THE STUPID GAME!!! Happend on step {current_timestep}")
                    break
    
                    # OR BREAK

            # add the accumulated rewards for this episode into a list (episodes_rewards)   
            episodes_rewards.append(ep_reward)

        # choose the best performance on an episode for the genome
        genome.fitness = max(episodes_rewards)

#         print("All episodes finished. Closing window.")
        viewer.window.close()  # Close the Pyglet window explicitly
        


# Run until a solution is found. The number indicates the max number of generations to be produced
winner = p.run(eval_genomes, 25)


# Display the winning genome.
print('\nBest genome:\n{!s}'.format(winner))

# SAVE THE WINNER GENOME
with open("winner.pkl", "wb") as f:
    pickle.dump(winner, f)

# Show output of the most fit genome against training data.
print('\nOutput:')
winner_net = neat.nn.FeedForwardNetwork.create(winner, config)

# Save the winner
with open(file_name, 'wb') as f:
    pickle.dump(winner, f)


  logger.warn(
  logger.warn(
  logger.warn(



 ****** Running generation 0 ****** 


 ****** Running generation 0 ****** 



  logger.deprecation(
  logger.warn(


Population's average fitness: -17.50440 stdev: 4.25161
Best fitness: -3.80000 - size: (9, 441) - species 1 - id 348
Population's average fitness: -17.50440 stdev: 4.25161
Best fitness: -3.80000 - size: (9, 441) - species 1 - id 348
Average adjusted fitness: 0.154
Average adjusted fitness: 0.154
Mean genetic distance 1.675, standard deviation 0.327
Mean genetic distance 1.675, standard deviation 0.327
Population of 500 members in 3 species:
   ID   age  size  fitness  adj fit  stag
     1    0   497     -3.8    0.154     0
     2    0     1       --       --     0
     3    0     2       --       --     0
Total extinctions: 0
Generation time: 817.721 sec
Population of 500 members in 3 species:
   ID   age  size  fitness  adj fit  stag
     1    0   497     -3.8    0.154     0
     2    0     1       --       --     0
     3    0     2       --       --     0
Total extinctions: 0
Generation time: 817.721 sec
Saving checkpoint to neat-checkpoint-v04-0

 ****** Running generation 1 ****** 

In [None]:
visualize.plot_stats(stats, ylog=False, view=True)
visualize.plot_species(stats, view=True)

In [None]:
visualize.draw_net(config=config, genome=winner)


In [None]:
import neat

# Load configuration.
config_filename = 'config-feedforward_v01'
config = neat.Config(neat.DefaultGenome, 
                     neat.DefaultReproduction, 
                     neat.DefaultSpeciesSet, 
                     neat.DefaultStagnation, 
                     config_filename)

config.genome_config.add_node_mutation_prob = 0.03
config.genome_config.add_connection_mutation_prob = 0.05

# Print the parsed parameters to debug
print("Initial connection type:", config.genome_config.initial_connection)
print("Allowed connectivity options:", config.genome_config.allowed_connectivity)
print("Activation options:", config.genome_config.activation_options)


### ChatGPT

In [None]:
import neat
import gym
import gym_sokoban
import pyglet
import numpy as np

import time
import logging
from neat.reporting import StdOutReporter

# Custom rendering setup if gym's rendering is not available
class Viewer:
    def __init__(self, width, height):
        self.window = pyglet.window.Window(width, height)
        self.image = None
        self.window.on_draw = self.on_draw

    def render(self, image):
        self.image = pyglet.image.ImageData(image.shape[1], image.shape[0], 'RGB', image.tobytes(), pitch=image.shape[1] * -3)
        self.window.dispatch_event('on_draw')

    def on_draw(self):
        if self.image:
            self.window.clear()
            self.image.blit(0, 0)

# Initialize logging
logging.basicConfig(filename='neat_log.txt', level=logging.INFO, format='%(message)s')

# Custom reporter class
class CustomReporter(StdOutReporter):
    def __init__(self, show_species_detail):
        super().__init__(show_species_detail)
        self.start_time = time.time()
    
    def end(self):
        runtime = time.time() - self.start_time
        logging.info(f'Total runtime: {runtime:.2f} seconds')

    def post_evaluate(self, config, population, species_set, best_genome):
        super().post_evaluate(config, population, species_set, best_genome)
        
        # Log population's average fitness
        total_fitness = sum(genome.fitness for genome in population.values())
        avg_fitness = total_fitness / len(population)
        logging.info(f'Population\'s average fitness: {avg_fitness}')
        
        # Log adjusted fitness score
        adjusted_fitness = []
        for species_id, species in species_set.species.items():
            for genome_id in species.members:
                genome = population[genome_id]
                adjusted_fitness.append(genome.fitness / len(species.members))
        avg_adjusted_fitness = sum(adjusted_fitness) / len(adjusted_fitness)
        logging.info(f'Population\'s average adjusted fitness: {avg_adjusted_fitness}')
        
        # Log best genome information
        logging.info(f'\nBest genome:\nKey: {best_genome.key}\nFitness: {best_genome.fitness}')
        logging.info(f'Nodes:')
        for node_key, node in best_genome.nodes.items():
            logging.info(f'\t{node_key} {node}')
        logging.info(f'Connections:')
        for conn_key, conn in best_genome.connections.items():
            logging.info(f'\t{conn_key} {conn}')
        logging.info(f'Timestamp: {time.strftime("%Y-%m-%d %H:%M:%S")}')


def process_observation(environment, obs):
    # Convert the observation to RGB frame or custom observation
    arr_walls, arr_goals, arr_boxes, arr_player = environment.render(mode='raw')

    # Initialize the combined array with walls (1s)
    combined = np.ones_like(arr_walls)
    
    # Set empty fields (0s)
    combined[arr_walls == 0] = 0
    
    # Set targets (3s)
    combined[arr_goals == 1] = 3
    
    # Set boxes (2s)
    combined[arr_boxes == 1] = 2
    
    # Set boxes on targets (4s)
    combined[(arr_boxes == 1) & (arr_goals == 1)] = 4
    
    # Set player position (5s)
    combined[arr_player == 1] = 5

    # Flatten the array
    flat_array = combined.flatten()
    
    return flat_array


def process_state(state):
    # Processes the initial state of env.reset()

    # Initialize the combined array with walls (0s)
    combined = np.ones_like(state[0])
    
    # Set empty fields (1s)
    combined[state[0] == 0] = 0

    # Set targets (3s)
    combined[state[1] == 1] = 3

    # Set boxes (2s)
    combined[state[2] == 1] = 2

    # Set boxes on targets (4s)
    combined[(state[2] == 1) & (state[1] == 1)] = 4

    # Set player position (5s)
    combined[state[3] == 1] = 5

    # Flatten the array
    flat_array = combined.flatten()
    
    return flat_array


# Start the game
env = gym.make('Sokoban-small-v1')
env.reset()

# Optional viewer setup
viewer = Viewer(160, 160)  # Adjust the size according to your environment
ACTION_LOOKUP = env.unwrapped.get_action_lookup()

# Define episode and timestep parameters
num_episodes = 1
timesteps_per_episode = 40

def map_action(action_prob):
    return np.argmax(action_prob)

def eval_genomes(genomes, config):
    global num_episodes, timesteps_per_episode

    # For each genome
    for genome_id, genome in genomes:
        # Generate the neural network based on the config provided
        net = neat.nn.FeedForwardNetwork.create(genome, config)
    
        # Define the initial fitness of the genome
        genome.fitness = 0.0
    
        # Define episodes rewards (list) idea is to keep the fitness scores of all episodes and take the max
        episodes_rewards = []
        
        # For each episode
        for episode in range(num_episodes):
            # Episode reward = 0  
            ep_reward = 0
            
            # Reset the game state to the initial phase
            initial_state = env.reset()

            # Map inputs suitable for the Neural Network (initial state of the game as flatten array, e.g. with length 49, created by the initial 7x7 grid)
            # Used as an input layer in the Neural Network
            initial_inputs = process_state(initial_state)
            
            game_state_after_step = initial_inputs
            
            # For each step
            for step in range(timesteps_per_episode):
                # Calculate probabilities for each output to be selected
                action_prob = net.activate(game_state_after_step)
                
                # Select the action, based on the output's probabilities of being selected
                action = map_action(action_prob)
                
                # Make the move in the game and output game state + info + reward
                observation, reward, done, info = env.step(action)
                
                game_state_after_step = process_observation(environment=env, obs=observation)

                # Image stuff
                image = env.render(mode='rgb_array')
                viewer.render(image)

                # Print info
                print(ACTION_LOOKUP[action], reward, done, info)

                # Populate the episode reward
                if not done:
                    ep_reward += reward
                else:
                    print(f"Game finished in {step+1} steps")
                    break
    
            # Add the accumulated rewards for this episode into a list (episodes_rewards)   
            episodes_rewards.append(ep_reward)

        # Choose the best performance on an episode for the genome
        genome.fitness = max(episodes_rewards)

        print("All episodes finished. Closing window.")
        viewer.window.close()  # Close the Pyglet window explicitly

# Load configuration.
config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                     neat.DefaultSpeciesSet, neat.DefaultStagnation,
                     'config-feedforward')

# Create the population, which is the top-level object for a NEAT run.
p = neat.Population(config)

# Add a stdout reporter to show progress in the terminal.
p.add_reporter(neat.StdOutReporter(True))
p.add_reporter(CustomReporter(True))
p.add_reporter(neat.StatisticsReporter())

# Run until a solution is found. The number indicates the max number of generations to be produced
winner = p.run(eval_genomes, 5)

# Display the winning genome.
print('\nBest genome:\n{!s}'.format(winner))

# Show output of the most fit genome against training data.
print('\nOutput:')
winner_net = neat.nn.FeedForwardNetwork.create(winner, config)

# Start the Pyglet event loop to keep the window open
pyglet.app.run()


### BACKUP

In [None]:
# Start the game

env = gym.make('Sokoban-small-v1')
# generate the level in the initial stage (env.reset) 
env.reset()


# # OPTIONAL

# viewer = Viewer(160, 160)  # Adjust the size according to your environment

# ACTION_LOOKUP = env.unwrapped.get_action_lookup()

# Define episode and timestep parameters
num_episodes = 1
timesteps_per_episode = 40

current_episode = 0
current_timestep = 0

min_reward = -10


def eval_genomes(genomes, config)
# FOR EACH GENOME
    for genome_id, genome in genomes:
        
        env = gym.make()
    # net = neat.nn.FeedForwardNetwork.create(genome, config)
    # DEF INITIAL GENOME FITNESS = 0
    
    
    # EPISODES REWARDS = [] IDEA IS TO KEEP THE FITNESS SCORES OF ALL EPISODES AND THEN TAKE THE MAX
    
    # FOR EACH EPISODE
        # Episode reward = 0  
        # env.reset()
            
        # FOR EACH STEP
            # ACTION - GENERATED BY THE GENOME
            # RANDOM ACTION
            action = env.action_space.sample()
            
            # MAKE THE MOVE IN THE GAME
            # OUTPUT GAME STATE AFTER THE STEP WITH INFO + REWARD            
            observation, reward, done, info = env.step(action)
            
            
            # IMAGE STUFF
            image = env.render(mode='rgb_array')
            viewer.render(image)
            
            # PRINT INFO
            print(ACTION_LOOKUP[action], reward, done, info)

            # POPULATE THE Episode reward +=
            # if not done:
                # reward += MIN REWARD
                # current_timestep += 1

            # if done:
                # current_timestep = 0
                # current_episode += 1

                # OR BREAK

        # EPISODES REWARDS APPEND episode reward   
        
    
    # GENOME.FITNESS = max(EPISODE REWARDS)        
                

        

# # Load configuration.
# config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
#                      neat.DefaultSpeciesSet, neat.DefaultStagnation,
#                      'config-feedforward')

# # Create the population, which is the top-level object for a NEAT run.
# p = neat.Population(config)

# # Add a stdout reporter to show progress in the terminal.
# p.add_reporter(neat.StdOutReporter(False))

# # Run until a solution is found.
# winner = p.run(eval_genomes, 5)

# # Display the winning genome.
# print('\nBest genome:\n{!s}'.format(winner))

# # Show output of the most fit genome against training data.
# print('\nOutput:')
# winner_net = neat.nn.FeedForwardNetwork.create(winner, config)    

### Game logic

In [None]:
import gym
import gym_sokoban
import pyglet
from pyglet import clock
import numpy as np

## Custom rendering setup if gym's rendering is not available
class Viewer:
    def __init__(self, width, height):
        self.window = pyglet.window.Window(width, height)
        self.image = None
        self.window.on_draw = self.on_draw

    def render(self, image):
        self.image = pyglet.image.ImageData(image.shape[1], image.shape[0], 'RGB', image.tobytes(), pitch=image.shape[1] * -3)
        self.window.dispatch_event('on_draw')

    def on_draw(self):
        if self.image:
            self.window.clear()
            self.image.blit(0, 0)

env = gym.make('Sokoban-small-v1')
# generate the level in the initial stage (env.reset) 
env.reset()

print("Room Fixed")
print(env.room_fixed)
print(type(env.room_fixed))
print(env.room_fixed.shape)
print()
print(env.room_state)
print()
print(env.box_mapping)
print()


viewer = Viewer(160, 160)  # Adjust the size according to your environment

ACTION_LOOKUP = env.unwrapped.get_action_lookup()

# Define episode and timestep parameters
num_episodes = 2
timesteps_per_episode = 100

current_episode = 0
current_timestep = 0

def update_environment(dt):
    global current_episode, current_timestep, num_episodes, timesteps_per_episode

    if current_episode < num_episodes:
        if current_timestep < timesteps_per_episode:
            # RANDOM ACTION
            action = env.action_space.sample()
            observation, reward, done, info = env.step(action)
            
            
            
            image = env.render(mode='rgb_array')
            viewer.render(image)

            print(ACTION_LOOKUP[action], reward, done, info)

            if done:
                print(f"Episode finished after {current_timestep + 1} timesteps")
                current_timestep = 0
                current_episode += 1
                env.reset()
            else:
                current_timestep += 1
        else:
            current_episode += 1
            current_timestep = 0
            env.reset()
    else:
        print("All episodes finished. Closing window.")
        viewer.window.close()  # Close the Pyglet window explicitly

# Increase the frequency to match rendering needs (e.g., 60Hz)
clock.schedule_interval(update_environment, 1/60.0)

pyglet.app.run()
