In [None]:
# Use autoreload extension so agent definition is always up to date.
# Src: https://ipython.readthedocs.io/en/stable/config/extensions/autoreload.html
%load_ext autoreload
%autoreload 2

# Initial imports
from unityagents import UnityEnvironment
from collections import deque
from matplotlib import pyplot as plt
import time
import seaborn as sns
import numpy as np
import os
import torch

import BananaAgent as Agents

# NOTE(review): set_style() is called without a style name; confirm whether a
# named style (e.g. "darkgrid") or sns.set_theme() was intended here.
sns.set_style()

# Report whether PyTorch can see a CUDA device in this kernel
print(f"Cuda available? {torch.cuda.is_available()}")

In [None]:
# TODO: Check if Banana is available
# Load environment and get initial brain
env = UnityEnvironment("Banana/Banana.x86_64", no_graphics=True)
brain_name = env.brain_names[0]
brain = env.brains[brain_name]

# Initialize environment for use of the agent
env_info = env.reset(train_mode=True)[brain_name]
action_space = brain.vector_action_space_size
# Size of a single agent's observation vector. The episode loop feeds the agent
# `vector_observations[0]`, so the state dimension must match that row rather
# than the element count of the whole observation array.
state_space = len(env_info.vector_observations[0])

print(f"Action space: {action_space}")
print(f"State space: {state_space}")


In [None]:
def do_episode(environment, agent, brain_name=None):
    """Performs a single training episode using the given environment and agent

    The agent is driven through three calls: ``start`` receives the initial
    state, ``step`` receives every intermediate reward together with the state
    that followed it, and ``end`` receives the reward of the final transition.
    Only the first agent slot (index 0) of the environment info is used.

    Args:
        environment (env): Environment that will perform the simulation
        agent (Agent): Agent that will traverse the environment
        brain_name (str, optional): Brain whose info is read after every
            reset/step. Defaults to the first entry of
            ``environment.brain_names``.

    Returns:
        float: Total score (sum of rewards) of the episode
    """
    if brain_name is None:
        brain_name = environment.brain_names[0]

    episode_score = 0
    # Use the environment that was passed in, not a notebook-level global
    env_info = environment.reset(train_mode=True)[brain_name]

    # Start the agent
    state = env_info.vector_observations[0]
    next_action = agent.start(state)

    # Take the first action
    env_info = environment.step(next_action)[brain_name]

    while not env_info.local_done[0]:
        # Hand the reward of the previous action and the new state to the agent
        reward = env_info.rewards[0]
        episode_score += reward
        state = env_info.vector_observations[0]

        next_action = agent.step(reward, state)

        # Perform action
        env_info = environment.step(next_action)[brain_name]

    # The terminal transition carries a reward too; register it with the agent
    reward = env_info.rewards[0]
    episode_score += reward
    agent.end(reward)

    return episode_score

In [None]:
def create_agents():
    """Build the agents that the experiment loop will train and compare.

    Reads the notebook-level ``state_space`` and ``action_space`` values.

    Returns:
        list: Agents to test; only ``Agents.Udagent`` is currently enabled
    """
    seed = 134123

    # Hyperparameters for the (currently disabled) Banana agents listed below
    banana_kwargs = dict(
        gamma=0.99,
        alpha=5e-4,
        seed=seed,
        tau=1e-3,
        buffer_size=int(1e5),
        batch_size=64,
        learn_every=4,
    )

    # Uncomment an entry to include that agent in the experiment
    return [
        # Agents.RandomAgent(state_space, action_space,
        #                    banana_kwargs["gamma"], banana_kwargs["alpha"], seed),
        Agents.Udagent(state_space, action_space, seed=seed),
        # Agents.BananaAgent(state_space, action_space, **banana_kwargs),
        # Agents.BananaAgentDouble(state_space, action_space, **banana_kwargs),
    ]

In [None]:
def do_experiment(environment, agent, episodes, print_every):
    """Performs an experiment on the given agent.

    Runs ``episodes`` episodes back to back, recording the score and the
    wall-clock duration of each one, and periodically prints the average score
    of the most recent ``print_every`` episodes.

    Args:
        environment (any): Environment to use
        agent (Agent): Agent that follows the "Agent" interface
        episodes (int): Amount of episodes to perform
        print_every (int): How often to print the episode information; a value
            of 0 or less disables the progress output

    Returns:
        (array_like, array_like): Scores and times that the agent took per episode
    """
    scores = np.zeros(episodes)
    times = np.zeros(episodes)

    for i in range(episodes):
        start_time = time.time()
        scores[i] = do_episode(environment, agent)
        times[i] = time.time() - start_time

        ep = i + 1
        if print_every > 0 and ep % print_every == 0:
            # Average only the episodes already played: `scores` is pre-filled
            # with zeros, so the slice must stop at `ep` to keep future
            # episodes out of the mean.
            recent = scores[max(0, ep - print_every):ep]
            print(f"{agent.agent_name()} :: ({ep}/{episodes}) AVG {np.average(recent)}")

    return scores, times

In [None]:
average_window = 100

episodes = 1000
print_every = 100

# One figure with three stacked panels sharing the episode axis:
# raw score, rolling-average score and wall-clock time per episode.
figure, axis = plt.subplots(3, 1, figsize=(19, 9), sharex="all")
axis[0].set_title("Scores")
axis[0].set_xlabel("Episodes")
axis[1].set_title("Scores (rolling avg)")
axis[1].set_xlabel("Episodes")
axis[2].set_title("Time")
axis[2].set_xlabel("Episodes")

# torch.save does not create missing parent directories
os.makedirs("agents", exist_ok=True)

# The x axis is the episode index and is the same for every agent
x = np.arange(episodes)

for agent in create_agents():
    # Do experiment
    scores, times = do_experiment(env, agent, episodes, print_every)
    agent_name = agent.agent_name()

    # Save agent
    torch.save(agent, f"agents/{agent_name}.pt")

    # Plot the statistics
    sns.lineplot(x=x, y=scores, label=agent_name, ax=axis[0])

    # Trailing mean over at most `average_window` episodes, ending at episode n
    avg = [np.average(scores[max(0, n + 1 - average_window):n + 1]) for n in range(episodes)]
    sns.lineplot(x=x, y=avg, label=agent_name, ax=axis[1])

    sns.lineplot(x=x, y=times, label=agent_name, ax=axis[2])

plt.show()

    


In [None]:
# Close the Unity environment (`env`) that was opened earlier in this notebook
env.close()