In [1]:
'''
Inspired by: https://github.com/hardmaru/slimevolleygym/blob/master/training_scripts/train_ga_selfplay.py
Trains an agent from scratch (no existing AI) using evolution:
a GA with no crossover, just mutation, and random tournament selection.
Not optimized for speed; uses a single CPU (mainly for simplicity).
'''
import os
import json
import numpy as np
import gym
import slimevolleygym
import slimevolleygym.mlp as mlp
from slimevolleygym.mlp import Model
from slimevolleygym import multiagent_rollout as rollout
from torch.utils.tensorboard import SummaryWriter
from slimevolleygym import BaselinePolicy
from tqdm import tqdm
import torch
from datetime import datetime

In [2]:
# Hyperparameters
# All tunable settings for the run live in this cell; the training cell reads them as globals.
random_seed = 612  # Seed given to both env.seed() and np.random.seed() in the training cell
num_agents = 128  # Population size (number of rows in the population array)
total_num_games = 1000000  # Number of self-play games the training loop runs
save_freq = 1000  # Save the agent with the longest winning streak every this many games
logging_freq = 100 # Log to tensorboard every this many games
num_eval_episodes = 10  # Episodes played per evaluate() call
# Timestamped output directory; used for the TensorBoard writer and the saved agent files
logging_dir = f"Logging/GENETIC-SELFPLAY/{datetime.now().strftime('%Y%m%d-%H%M%S')}-numagents-{num_agents}-totalnumgames-{total_num_games}"

In [3]:
def evaluate(env, model_params, num_eval_episodes, random=False):
    """Play evaluation episodes and return the average return per episode.

    The evaluated agent is a ``Model`` built from ``mlp.games['slimevolleylite']``
    and loaded with ``model_params``; it always acts with ``mean_mode=True``.

    Args:
        env: Environment exposing ``reset()``, ``action_space.sample()`` and
            ``step(action, otherAction=...)``, where the returned ``info``
            dict carries the opponent's observation under ``'otherObs'``.
        model_params: Parameter vector handed to ``Model.set_model_params``.
        num_eval_episodes: Number of episodes to play.
        random: If True, the opponent's actions are sampled from
            ``env.action_space``; otherwise the opponent is a ``BaselinePolicy``.

    Returns:
        The sum of all rewards collected over the episodes divided by
        ``num_eval_episodes``.
    """
    # Build the agent under evaluation
    agent = Model(mlp.games['slimevolleylite'])
    agent.set_model_params(model_params)

    # The baseline opponent is only constructed when it is actually needed
    use_baseline = not random
    if use_baseline:
        baseline = BaselinePolicy()

    returns_sum = 0
    for _episode in range(num_eval_episodes):

        # Both sides start the episode from the observation returned by reset()
        obs_agent = env.reset()
        obs_opponent = obs_agent

        while True:
            with torch.no_grad():
                # mean_mode=True keeps the evaluated agent's action selection free of sampling
                agent_action = agent.predict(obs_agent, mean_mode=True)
                opponent_action = (
                    baseline.predict(obs_opponent)
                    if use_baseline
                    else env.action_space.sample()
                )

            # Advance the environment; the opponent's next observation comes back via info
            obs_agent, reward, finished, info = env.step(agent_action, otherAction=opponent_action)
            obs_opponent = info['otherObs']

            # Accumulate the reward reported by env.step
            returns_sum += reward

            if finished:
                break

    return returns_sum / num_eval_episodes

In [None]:
# Self-play genetic training loop.
#
# Each game draws two distinct agents from the population and plays them against
# each other. The loser's parameters are overwritten with a mutated copy of the
# winner's (a tie only perturbs the left agent). Every `save_freq` games the agent
# with the longest winning streak is written to disk, and every `logging_freq`
# games training statistics and evaluation scores are logged to TensorBoard.

# Standard deviations of the Gaussian noise used for initialisation and mutation
init_param_std = 0.5
mutation_std = 0.1

# Make sure the output directory exists before anything is written into it,
# rather than relying on the writer to create it as a side effect.
os.makedirs(logging_dir, exist_ok=True)

# Create a writer
writer = SummaryWriter(logging_dir)

# Create two instances of a feed forward policy we may need later.
policy_left = Model(mlp.games['slimevolleylite'])
policy_right = Model(mlp.games['slimevolleylite'])
param_count = policy_left.param_count

# Create the gym environment, and seed it
env = slimevolleygym.SlimeVolleyEnv()
env.seed(random_seed)

# Seed NumPy BEFORE drawing the initial population; otherwise the starting
# population differs between runs even though random_seed is fixed.
np.random.seed(random_seed)

# Store our population here
population = np.random.normal(size=(num_agents, param_count)) * init_param_std # each row is an agent.
winning_streak = [0] * num_agents # store the number of wins for this agent (including mutated ones)

# Store the history of the length of the games (reset after every logging step)
history = []

# Run total_num_games games
for game in tqdm(range(1, total_num_games+1)):

    # Randomly extract 2 distinct agents from the population
    m, n = np.random.choice(num_agents, 2, replace=False)
    policy_left.set_model_params(population[m])
    policy_right.set_model_params(population[n])

    # Run a game between them
    score, length = rollout(env, policy_right, policy_left)

    # Append the length of the game to the history
    history.append(length)

    # If score is positive, it means policy_right won.
    if score == 0: # If the game is tied, add noise to the left agent.
        population[m] += np.random.normal(size=param_count) * mutation_std
    elif score > 0:
        # Right agent (n) won: the loser becomes a mutated copy of the winner and
        # inherits the winner's streak as it stood before this game.
        population[m] = population[n] + np.random.normal(size=param_count) * mutation_std
        winning_streak[m] = winning_streak[n]
        winning_streak[n] += 1
    elif score < 0:
        # Left agent (m) won: mirror image of the branch above.
        population[n] = population[m] + np.random.normal(size=param_count) * mutation_std
        winning_streak[n] = winning_streak[m]
        winning_streak[m] += 1

    # Save the agent with the longest winning streak
    if game % save_freq == 0:
        record_holder = np.argmax(winning_streak)
        record = winning_streak[record_holder]
        model_filename = f"{logging_dir}/game_{game}"
        # Mode "x" creates the file for writing and fails if it already exists, so an
        # earlier checkpoint is never silently overwritten. The context manager closes
        # the handle, which the previous pair of bare open() calls did not do.
        with open(model_filename, "x") as f:
            json.dump([population[record_holder].tolist(), record], f, sort_keys=True, indent=0, separators=(',', ': '))

    # Log the winning streak of the best agent
    if game % logging_freq == 0:

        # Extract the best agent as well as their winning streak
        record_holder = np.argmax(winning_streak)
        record = winning_streak[record_holder]

        # Log the winning streak of the best agent as a function of the game number
        writer.add_scalar('Best winning streak - Game', record, game)

        # Log the mean and spread of the game durations since the last logging step
        writer.add_scalar('Average game duration - Game', np.mean(history), game)
        writer.add_scalar('Game duration standard deviation - Game', np.std(history), game)

        # Run a few games between the best agent and a random agent
        random_score = evaluate(env, population[record_holder], num_eval_episodes, random=True)

        # Run a few games between the best agent and the baseline agent
        baseline_score = evaluate(env, population[record_holder], num_eval_episodes, random=False)

        # Log both scores
        writer.add_scalar('Best agent vs Random agent returns - Game', random_score, game)
        writer.add_scalar('Best agent vs Baseline agent returns - Game', baseline_score, game)

        # Reset the history
        history = []

    # NOTE(review): the policies are rebuilt after every game and re-loaded with the
    # (possibly just mutated) parameters of m and n. This looks redundant, because the
    # top of the next iteration sets fresh parameters anyway; kept unchanged in case
    # Model carries internal state between games -- confirm, then hoist out of the loop.
    policy_left = Model(mlp.games['slimevolleylite'])
    policy_right = Model(mlp.games['slimevolleylite'])
    policy_left.set_model_params(population[m])
    policy_right.set_model_params(population[n])
 49%|█████████████████████████████████████████████████████▏                                                       | 487402/1000000 [30:05:04<112:40:30,  1.26it/s]