# Playing Flappy Bird with RL

**Tom Labiausse** - March 2024

[Code repository associated to the projet](https://github.com/t0m1ab/flappy-bird)

The notebook is organized in 5 main parts:
* **0 - Librairies and global variables**
* **1 - Agents**
* **2 - Trainers**
* **3 - Utility functions** [define plot functions]
* **4 - Launch and visualize** [launch trainings and plot/compare results]

During the training of an agent, data is recorded and save in JSON files at the end for future visualization as well as the agents final q-values. In section 0, `DEFAULT_OUTPUTS_PATH` and `DEFAULT_MODELS_PATH` define the default location to save the training data and the agents.

## 0 - Librairies and global variables

In [1]:
import os
import sys
from pathlib import Path
import argparse
import json
import time
from abc import abstractmethod
from collections import defaultdict
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.ticker import LinearLocator
import gymnasium as gym
import text_flappy_bird_gym

DEFAULT_OUTPUTS_PATH = os.path.join(os.getcwd(), "outputs/")
DEFAULT_MODELS_PATH = os.path.join(os.getcwd(), "models/")

## 1 - Agents

In [2]:
class Agent:

    def __init__(
        self,
        action_space_size: int,
        discount_factor: float,
    ) -> None:
        """
        Initialize a Reinforcement Learning agent with an empty dictionary of state-action values (q_values), a learning rate and an epsilon.

        ARGUMENTS:
            - action_space_size: The number of possible actions
            - discount_factor: The discount factor for computing the Q-value
            - q_values: A dictionary of state: [action values]
        """
        self.action_space_size = int(action_space_size)
        self.discount_factor = discount_factor # gamma
        self.q_values = defaultdict(lambda: [0 for _ in range(self.action_space_size)]) # default q_value for a given state is given by np.zeros(2) = [0, 0]
    
    def __str__(self) -> str:
        return self.__class__.__name__
    
    @staticmethod
    def parse_state(str_state: str) -> tuple[int, int]:
        """ str: "x,y" -> tuple: (x,y) """
        elements = str_state.split(",")
        if not len(elements) == 2:
            raise ValueError(f"Invalid state string: '{str_state}'. Should look like 'x,y'")
        return (int(elements[0]), int(elements[1]))

    @staticmethod
    def from_pretrained(agent_filename: str, path: str = None) -> "MCAgent | SARSALambdaAgent":
        """
        Load the agent from a json file.
        """
        path = path if path is not None else DEFAULT_MODELS_PATH
        filepath = os.path.join(path, f"{agent_filename}.json")
        if not os.path.isfile(filepath):
            raise FileNotFoundError(f"File {filepath} not found")
        
        with open(filepath, "r") as f:
            agent_dict = json.load(f)

        agent_constructor = None
        if agent_dict["type"] == "MCAgent":
            agent_constructor = MCAgent
        elif agent_dict["type"] == "SARSALambdaAgent":
            agent_constructor = SARSALambdaAgent
        else:
            raise ValueError(f"Invalid agent type {agent_dict['type']}")

        agent = agent_constructor(action_space_size=agent_dict["action_space_size"])

        agent.q_values = defaultdict(lambda: [0 for _ in range(agent.action_space_size)])
        for k, v in agent_dict["q_values"].items():
            agent.q_values[Agent.parse_state(k)] = np.array(v)

        return agent

    def save(self, name: str = None, path: str = None) -> None:
        """
        Save the agent in a json file.
        """
        name = name if name is not None else self.__class__.__name__
        json_filename = f"{name}.json"
        path = path if path is not None else DEFAULT_MODELS_PATH
        Path(path).mkdir(parents=True, exist_ok=True)

        serialized_q_values = {
            f"{k[0]},{k[1]}": list(v) for k, v in self.q_values.items()
        }

        json_dict = {
            "type": self.__class__.__name__,
            "action_space_size": self.action_space_size,
            "q_values": serialized_q_values,
        }

        with open(os.path.join(path, json_filename), "wb") as f:
            f.write(json.dumps(json_dict).encode("utf-8"))

        print(f"Agent {self} saved in {os.path.join(path, json_filename)}")
    
    @abstractmethod
    def policy(self, state: tuple[int, int], env: gym.Env = None, epsilon: float = None) -> int:
        """
		Returns an action following an epsilon-soft policy. If env is None and epsilon is None, the agent acts greedily (inference mode).
		"""
        raise NotImplementedError

    @abstractmethod
    def update(self, *args, **kwargs) -> None:
        """
        Updates the Q-value of an action following a specific method.
        """
        raise NotImplementedError


class MCAgent(Agent):

    def __init__(
            self, 
            action_space_size: int, 
            discount_factor: float = None,
            lr: float = None,
            verbose: bool = False,
        ):
        super().__init__(action_space_size=action_space_size, discount_factor=discount_factor)
        self.lr = lr
        self.mean_return = defaultdict(lambda: (0,0)) # returns[x] is (0,0) by default for any x and represents (n, R_n) [see S&B section 2.4]
        self.verbose = verbose
        if self.verbose:
            if self.lr is not None:
                print(f"Using learning rate {self.lr} update")
            else:
                print(f"Using mean return update")
    
    def policy(self, state: tuple[int, int], env: gym.Env = None, epsilon: float = None) -> int:
        """
        Returns an action following an epsilon-soft policy. If env is None and epsilon is None, the agent acts greedily (inference mode).
        """
        if env is None and epsilon is None: # act greedily (inference mode)
            return int(np.argmax(self.q_values[state]))
        
        if np.random.random() < epsilon: # with probability epsilon return a random action to explore the environment
            return env.action_space.sample()
        else: # with probability (1 - epsilon) act greedily (exploit)       
            return int(np.argmax(self.q_values[state]))
    
    def update_mean_return(self, state: tuple[int, int], action: int, return_value: float) -> None:
        """ 
        Update the mean return of the state-action pair (state, action) following the incremental mean formula [see S&B section 2.4].
        """
        (n, R_n) = self.mean_return[(state, action)]
        self.mean_return[(state, action)] = (n+1, (n * R_n + return_value) / (n+1))

    def update(
        self,
        states: list[tuple[int, int]],
        actions: list[int],
        rewards: list[float],
    ) -> None:
        """
        Updates the Q-value of an action following the Monte-Carlo Exploring Starts method [see S&B section 5.3].
        """
        state_action_pairs = list(zip(states, actions)) # need to transform into a list because zip is an iterator and will be consumed by the first for loop
        T = len(states)
        G = 0

        for t in range(T-1,-1,-1): # loop over the state-action pairs in reverse order (from T-1 to 0)
            G = self.discount_factor * G + rewards[t+1]
            if not state_action_pairs[t] in state_action_pairs[:t]: # first visit of the (state, action) pair in the episode
                state_t, action_t = state_action_pairs[t]
                if self.lr is None: # use mean return update
                    self.update_mean_return(state_t, action_t, return_value=G)
                    self.q_values[state_t][action_t] = self.mean_return[state_action_pairs[t]][1]
                else: # use learning rate update
                    self.q_values[state_t][action_t] += self.lr * (G - self.q_values[state_t][action_t])


class SARSALambdaAgent(Agent):

    def __init__(
            self, 
            action_space_size: int, 
            discount_factor: float = None,
            lr: float = None,
            trace_decay: float = None,
        ):
        super().__init__(action_space_size=action_space_size, discount_factor=discount_factor)
        self.eligibility = defaultdict(lambda: [0 for _ in range(self.action_space_size)]) # eligibility[(s,a)] is 0 by default for any pair (state,action)
        self.lr = lr
        self.trace_decay = trace_decay

    def policy(self, state: tuple[int, int], env: gym.Env = None, epsilon: float = None) -> int:
        """
        Returns an action following an epsilon-soft policy. If env is None and epsilon is None, the agent acts greedily (inference mode).
        """
        if env is None and epsilon is None: # act greedily (inference mode)
            return int(np.argmax(self.q_values[state]))
        
        if np.random.random() < epsilon: # with probability epsilon return a random action to explore the environment
            return env.action_space.sample()
        else: # with probability (1 - epsilon) act greedily (exploit)       
            return int(np.argmax(self.q_values[state]))

    def update(
        self,
        state: tuple[int, int, bool],
        action: int,
        reward: float,
        next_state: tuple[int, int, bool],
        next_action: int,
        terminated: bool,
    ) -> None:
        """
        Updates the Q-value of an action following the SARSA-lambda method [see S&B section 12.7].
        """
        q_value = self.q_values[state][action]
        next_q_value = (not terminated) * self.q_values[next_state][next_action]
        td_error = reward + self.discount_factor * next_q_value - q_value

        # sarsa-lambda
        self.eligibility[state][action] += 1
        for s in self.q_values.keys():
            for a in range(self.action_space_size):
                self.q_values[s][a] += self.lr * td_error * self.eligibility[s][a]
                self.eligibility[s][a] *= (self.discount_factor * self.trace_decay)

        # sarsa
        # self.q_values[state][action] = q_value + self.lr * td_error


# tests

env = gym.make('TextFlappyBird-v0', height = 15, width = 20, pipe_gap = 4)

mc_agent = MCAgent(
	action_space_size=env.action_space.n,
	discount_factor=1.0,
    lr=0.01,
)
print(f"{mc_agent} is ready!")

sarsa_lambda_agent = SARSALambdaAgent(
	action_space_size=env.action_space.n,
	discount_factor=0.95,
	lr=0.01,
	trace_decay=0.9,
)
print(f"{sarsa_lambda_agent} is ready!")

MCAgent is ready!
SARSALambdaAgent is ready!


## 2 - Trainers

In [3]:
class Trainer():

    def __init__(self) -> None:
        self.env = None
        self.agent = None
        self.n_episodes = None
        self.final_epsilon = None
        self.discount_factor = None
        self.experiment_name = None
        self.max_episode_length_eval = None
        self.reset_stats()
    
    def __str__(self) -> str:
        return self.__class__.__name__

    def reset_stats(self) -> None:
        self.train_episode_durations = [] # list of train episode durations
        self.train_episode_indexes = [] # list of train episode indexes matching eavh evaluation phase
        self.eval_episode_durations = [] # list of lists of episode durations for each evaluation phase
      
    def eval(self, n_episodes: int, env: gym.Env = None, agent: Agent = None, verbose: bool = False) -> list[int]:
        """
        Evaluate the agent with n_episodes in the environment by taking greedy actions and returns the lenghts of these eval episodes.
        """

        env = self.env if env is None else env
        agent = self.agent if agent is None else agent

        pbar = range(n_episodes)
        if verbose:
            pbar = tqdm(pbar, desc=f"Eval {self} on {self.env.spec.id}")

        episode_lengths = []
        for _ in pbar:
            
            episode_lengths.append(0)
            obs, _ = env.reset()
            terminated = False
            while not terminated:
                action = agent.policy(obs) # greedy action
                obs, _, terminated, _, _ = env.step(action)
                episode_lengths[-1] += 1
                if self.max_episode_length_eval is not None and episode_lengths[-1] >= self.max_episode_length_eval:
                    break

        return episode_lengths
    
    def save_eval_episode_durations_plot(self, path: str = None) -> None:
        """ Save plot of evaluation episode durations. """

        if len(self.train_episode_indexes) == 0 or len(self.eval_episode_durations) == 0:
            print("No evaluation episode durations to plot...")
            return
        
        path = DEFAULT_OUTPUTS_PATH if path is None else path
        Path(path).mkdir(parents=True, exist_ok=True)
        
        # curve
        plt.plot(self.train_episode_indexes, [np.mean(lengths) for lengths in self.eval_episode_durations])

        # config parameters
        for k, v in self.get_config_dict().items():
            plt.scatter([], [], label=f"{k} = {v}", color="white")

        # axis, title and legend
        plt.xlabel("Training episode index")
        plt.ylabel(f"Averaged episode duration (limit={self.max_episode_length_eval})")
        plt.title(f"Evaluation episode duration over training")
        plt.legend(loc="lower right", handletextpad=0, handlelength=0, fontsize=8)

        # save plot
        plt.savefig(os.path.join(path, f"{self.experiment_name}_eval_durations.png"))
        plt.close()

        # save data in json
        with open(os.path.join(path, f"{self.experiment_name}_eval_durations.json"), "w") as f:
            f.write(json.dumps({
                "experiment_name": self.experiment_name,
                "max_episode_length_eval": self.max_episode_length_eval,
                "train_episode_indexes": self.train_episode_indexes,
                "eval_episode_durations": self.eval_episode_durations,
            }))
    
    def save_train_episode_durations_plot(self, path: str = None, window: float = None) -> None:
        """ Save plot of training episode durations avegared on slots of window size. """

        if len(self.train_episode_durations) == 0:
            print("No training episode durations to plot...")
            return
        
        if window is not None and window > len(self.train_episode_durations):
            print("Window size should be smaller than the number of training episodes...")
            return
        
        path = DEFAULT_OUTPUTS_PATH if path is None else path
        Path(path).mkdir(parents=True, exist_ok=True)
        
        indexes = np.arange(1, len(self.train_episode_durations) + 1)
        if window is None: # cumulative average
            avg_episode_durations = np.cumsum(self.train_episode_durations) / indexes
            title_tag = "(cumulative average)"
        else: # moving average
            incomplete_avg = np.cumsum(self.train_episode_durations[:window-1]) / np.arange(1, window)
            complete_avg = np.convolve(self.train_episode_durations, np.ones(window), 'valid') / window
            avg_episode_durations = np.hstack((incomplete_avg, complete_avg))
            assert len(avg_episode_durations) == len(self.train_episode_durations)
            title_tag = f"(average window = {window})"

        # curve
        plt.plot(indexes, avg_episode_durations)

        # config parameters
        for k, v in self.get_config_dict().items():
            plt.scatter([], [], label=f"{k} = {v}", color="white")

        # axis, title and legend
        plt.xlabel("Training episode index")
        plt.ylabel(f"Averaged episode duration")
        plt.title(f"Train episode duration over training {title_tag}")
        plt.legend(loc="lower right", handletextpad=0, handlelength=0, fontsize=8)

        # save plot
        plt.savefig(os.path.join(path, f"{self.experiment_name}_train_durations.png"))
        plt.close()

        # save data in json
        with open(os.path.join(path, f"{self.experiment_name}_train_durations.json"), "w") as f:
            f.write(json.dumps({
                "experiment_name": self.experiment_name,
                "train_episode_durations": self.train_episode_durations,
            }))


class MCTrainer(Trainer):

    DEFAULT_EXP_NAME = "mc"
    
    def __init__(
            self, 
            n_episodes: int, 
            discount_factor: float, 
            final_epsilon: float,
            learning_rate: float = None,
            n_eval: int = None, 
            max_episode_length_eval: int = None,
        ) -> None:
        super().__init__()
        self.n_episodes = n_episodes
        self.discount_factor = discount_factor
        self.final_epsilon = final_epsilon
        self.lr = learning_rate
        self.n_eval = n_eval
        self.max_episode_length_eval = max_episode_length_eval
    
    def get_config_dict(self) -> dict:
        return {
            "n_episodes": self.n_episodes,
            "discount_factor": self.discount_factor,
            "final_epsilon": self.final_epsilon,
        }

    def train(self, env: gym.Env, experiment_name: str = None, save_plots: bool = False, save_agent: bool = False) -> MCAgent:
        
        # self.env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=self.n_episodes)
        self.env = env

        self.agent = MCAgent(
            action_space_size=self.env.action_space.n,
            discount_factor=self.discount_factor,
            lr=self.lr,
        )

        self.experiment_name = experiment_name if experiment_name is not None else MCTrainer.DEFAULT_EXP_NAME
        
        self.reset_stats()
        eval_every_episode = self.n_episodes // self.n_eval if self.n_eval is not None else None
        eval_every_episode = None if eval_every_episode == 0 else eval_every_episode

        log_final_eps = np.log(self.final_epsilon)
        pbar = tqdm(range(self.n_episodes), desc=f"Train {self} on {self.env.spec.id}")
        for episode_idx in pbar:

            epsilon = np.exp(log_final_eps * episode_idx / self.n_episodes)

            obs, _ = self.env.reset()
            # action = np.random.choice([0,1]) # random action to explore the environment
            action = self.agent.policy(obs, env=self.env, epsilon=epsilon)
            next_obs, reward, terminated, _, _ = self.env.step(action)

            # define lists of S_t, A_t, R_t values [see S&B section 5.3]
            states = [obs, next_obs] # S list
            actions = [action] # A list
            rewards = [None, reward] # R list
            episode_length = 1

            while not terminated:

                action = self.agent.policy(states[-1], env=self.env, epsilon=epsilon)
                next_obs, reward, terminated, _, _ = self.env.step(action)

                states.append(next_obs)
                actions.append(action)
                rewards.append(reward)
                episode_length += 1
            
            if terminated: # the last state is probably out of the state space (because the chain terminated) so we don't need it
                states.pop()

            # at this stage states and actions should have T elements and rewards T+1 elements (uncomment the following lines to check)
            assert len(actions) == len(states)
            assert len(rewards) == len(states) + 1

            # update the agent
            self.agent.update(states, actions, rewards)
            self.train_episode_durations.append(episode_length) # store the train episode duration

            if eval_every_episode is not None and episode_idx % eval_every_episode == 0:
                self.train_episode_indexes.append(episode_idx)
                self.eval_episode_durations.append(self.eval(n_episodes=100))
                pbar.set_postfix({f"avg_eval_episode_duration": np.mean(self.eval_episode_durations[-1])})
                
        if save_plots:
            self.save_train_episode_durations_plot(window=100)
            self.save_eval_episode_durations_plot()

        if save_agent:
            self.agent.save()
        
        return self.agent


class SARSALambdaTrainer(Trainer):

    DEFAULT_EXP_NAME = "sarsa-lambda"
    
    def __init__(
            self, 
            n_episodes: int, 
            learnind_rate: float,
            trace_decay: float,
            discount_factor: float, 
            final_epsilon: float, 
            n_eval: int = None, 
            max_episode_length_eval: int = None
        ) -> None:
        super().__init__()
        self.n_episodes = n_episodes
        self.lr = learnind_rate
        self.trace_decay = trace_decay
        self.discount_factor = discount_factor
        self.final_epsilon = final_epsilon
        self.n_eval = n_eval
        self.max_episode_length_eval = max_episode_length_eval
    
    def get_config_dict(self) -> dict:
        return {
            "n_episodes": self.n_episodes,
            "discount_factor": self.discount_factor,
            "final_epsilon": self.final_epsilon,
            "learning_rate": self.lr,
            "trace_decay": self.trace_decay,
        }
    
    def train(self, env: gym.Env, experiment_name: str = None, save_plots: bool = False, save_agent: bool = False) -> SARSALambdaAgent:
        
        self.env = env

        self.agent = SARSALambdaAgent(
            action_space_size=self.env.action_space.n,
            discount_factor=self.discount_factor,
            lr=self.lr,
            trace_decay=self.trace_decay,
        )

        self.experiment_name = experiment_name if experiment_name is not None else SARSALambdaTrainer.DEFAULT_EXP_NAME
        
        self.reset_stats()
        eval_every_episode = self.n_episodes // self.n_eval if self.n_eval is not None else None
        eval_every_episode = None if eval_every_episode == 0 else eval_every_episode

        log_final_eps = np.log(self.final_epsilon)
        pbar = tqdm(range(self.n_episodes), desc=f"Train {self} on {self.env.spec.id}")
        for episode_idx in pbar:

            epsilon = np.exp(log_final_eps * episode_idx / self.n_episodes)

            state, _ = self.env.reset() # S1
            action = self.agent.policy(state, env=self.env, epsilon=epsilon) # A1
            terminated = False
            episode_length = 1

            # play one episode
            while not terminated:

                next_state, reward, terminated, _, _ = self.env.step(action) # R1, S2
                next_action = self.agent.policy(next_state, env=self.env, epsilon=epsilon) # A2

                # update the agent
                self.agent.update(state, action, reward, next_state, next_action, terminated)

                state = next_state
                action = next_action
                episode_length += 1

            self.train_episode_durations.append(episode_length) # store the train episode duration

            if eval_every_episode is not None and episode_idx % eval_every_episode == 0:
                self.train_episode_indexes.append(episode_idx)
                self.eval_episode_durations.append(self.eval(n_episodes=100))
                pbar.set_postfix({f"avg_eval_episode_duration": np.mean(self.eval_episode_durations[-1])})
                
        if save_plots:
            self.save_train_episode_durations_plot(window=100)
            self.save_eval_episode_durations_plot()

        if save_agent:
            self.agent.save()
                
        return self.agent


# tests

mc_trainer = MCTrainer(
	n_episodes=100,
	discount_factor=1.0,
	final_epsilon=0.1,
)
print(f"{mc_trainer} is ready!")

sarsa_lambda_trainer = SARSALambdaTrainer(
	n_episodes=100,
	learnind_rate=0.01,
	trace_decay=0.9,
	discount_factor=1.0,
	final_epsilon=0.1,
)
print(f"{sarsa_lambda_trainer} is ready!")

MCTrainer is ready!
SARSALambdaTrainer is ready!


## 3 - Utility functions

In [17]:
def plot_and_compare_train_episode_duration(
        json_file_1: str,
        json_file_2: str,
        window: int,
        path: str = None,
    ):
    """
    Compare the training episode durations of two agents whose statistics are stored in json files.
    """ 

    path = DEFAULT_OUTPUTS_PATH if path is None else path
    if not os.path.isfile(os.path.join(path, json_file_1)):
        raise FileNotFoundError(f"File {json_file_1} not found at: {path}")
    if not os.path.isfile(os.path.join(path, json_file_2)):
        raise FileNotFoundError(f"File {json_file_2} not found at: {path}")
    
    with open(os.path.join(path, json_file_1), "r") as f:
        agent1_dict = json.load(f)
    
    with open(os.path.join(path, json_file_2), "r") as f:
        agent2_dict = json.load(f)

    if len(agent1_dict["train_episode_durations"]) != len(agent2_dict["train_episode_durations"]):
        raise ValueError("Agents have different number of training episodes...")
    
    exp_name_1 = agent1_dict["experiment_name"]
    exp_name_2 = agent2_dict["experiment_name"]
    train_episodes_1 = np.array(agent1_dict["train_episode_durations"])
    train_episodes_2 = np.array(agent2_dict["train_episode_durations"])
    
    if window is not None and window > len(train_episodes_1):
        print("Window size should be smaller than the number of training episodes...")
        return
    
    indexes = np.arange(1, len(train_episodes_1) + 1)
    if window is None: # cumulative average
        avg_episode_dur_1 = np.cumsum(train_episodes_1) / indexes
        avg_episode_dur_2 = np.cumsum(train_episodes_2) / indexes
        title_tag = "(cumulative average)"
    else: # moving average
        incomplete_avg_1 = np.cumsum(train_episodes_1[:window-1]) / np.arange(1, window)
        complete_avg_1 = np.convolve(train_episodes_1, np.ones(window), 'valid') / window
        avg_episode_durations_1 = np.hstack([incomplete_avg_1, complete_avg_1])
        incomplete_avg_2 = np.cumsum(train_episodes_2[:window-1]) / np.arange(1, window)
        complete_avg_2 = np.convolve(train_episodes_2, np.ones(window), 'valid') / window
        avg_episode_durations_2 = np.hstack([incomplete_avg_2, complete_avg_2])
        title_tag = f"(average window = {window})"

    # curve
    plt.plot(indexes, avg_episode_durations_1, label=exp_name_1)
    plt.plot(indexes, avg_episode_durations_2, label=exp_name_2)

    # axis, title and legend
    plt.xlabel("Training episode index")
    plt.ylabel(f"Averaged episode duration")
    plt.title(f"Train episode duration over training {title_tag}")
    plt.legend(loc="upper left")

    # save plot
    plt.savefig(os.path.join(path, f"DIFF_train_{exp_name_1}_{exp_name_2}.png"))
    plt.close()

def plot_and_compare_eval_episode_duration(
        json_file_1: str,
        json_file_2: str,
        window: int,
        path: str = None,
    ):
    """
    Compare the evaluation episode durations of two agents whose statistics are stored in json files.
    """ 

    path = DEFAULT_OUTPUTS_PATH if path is None else path
    if not os.path.isfile(os.path.join(path, json_file_1)):
        raise FileNotFoundError(f"File {json_file_1} not found at: {path}")
    if not os.path.isfile(os.path.join(path, json_file_2)):
        raise FileNotFoundError(f"File {json_file_2} not found at: {path}")
    
    with open(os.path.join(path, json_file_1), "r") as f:
        agent1_dict = json.load(f)
    
    with open(os.path.join(path, json_file_2), "r") as f:
        agent2_dict = json.load(f)
    
    exp_name_1 = agent1_dict["experiment_name"]
    exp_name_2 = agent2_dict["experiment_name"]
    train_ep_idx_1 = np.array(agent1_dict["train_episode_indexes"])
    train_ep_idx_2 = np.array(agent2_dict["train_episode_indexes"])
    eval_episodes_1 = np.array(agent1_dict["eval_episode_durations"])
    eval_episodes_2 = np.array(agent2_dict["eval_episode_durations"])
    max_length_1 = agent1_dict["max_episode_length_eval"]
    max_length_2 = agent2_dict["max_episode_length_eval"]  

    if len(train_ep_idx_1) == 0 or len(eval_episodes_1) == 0:
        raise ValueError("No evaluation episode durations to plot for agent 1...")
    
    if len(train_ep_idx_2) == 0 or len(eval_episodes_2) == 0:
        raise ValueError("No evaluation episode durations to plot for agent 2...")
    
    if len(train_ep_idx_1) != len(train_ep_idx_2):
        raise ValueError("Agents have different number of training episodes...")
    
    if max_length_1 != max_length_2:
        raise ValueError("Agents have different maximum episode lengths...")
    
    # curve
    plt.plot(train_ep_idx_1, [np.mean(lengths) for lengths in eval_episodes_1], label=exp_name_1)
    plt.plot(train_ep_idx_2, [np.mean(lengths) for lengths in eval_episodes_2], label=exp_name_2)

    # axis, title and legend
    plt.xlabel("Training episode index")
    plt.ylabel("Averaged episode duration")
    plt.title(f"Evaluation episode duration over training (limit={max_length_1})")
    plt.legend(loc="upper left")

    # save plot
    plt.savefig(os.path.join(path, f"DIFF_eval_{exp_name_1}_{exp_name_2}.png"))
    plt.close()

def plot_epsilon_scheduler(
        final_epsilon: float = 0.01, 
        n_episodes: int = 2000, 
        path: str = None
    ):
    """
    Plot the epsilon decay over the training episodes.
    """
    path = DEFAULT_OUTPUTS_PATH if path is None else path
    Path(path).mkdir(parents=True, exist_ok=True)

    log_final_epsilon = np.log(final_epsilon)
    epsilon_list = [np.exp(log_final_epsilon * float(i) / n_episodes) for i in range(n_episodes)]

    plt.plot(epsilon_list)
    plt.xlabel("Training episode index")
    plt.ylabel("$\epsilon$")
    plt.title("Exponential epsilon decay over training episodes")
    plt.savefig(os.path.join(path, "epsilon_scheduler.png"))
    plt.close()

def plot_state_value_function(agent_filename: str, path: str = None, save_only: bool = False):
    """
    Plot the state value function of a model stored in a json file.
    """

    agent_filename = agent_filename[:-5] if agent_filename.endswith(".json") else agent_filename # remove extension
    agent = Agent.from_pretrained(agent_filename, path=path)

    min_x, max_x = float('inf'), float('-inf')
    min_y, max_y = float('inf'), float('-inf')
    for (x,y) in agent.q_values.keys():
        min_x = min(min_x, x)
        max_x = max(max_x, x)
        min_y = min(min_y, y)
        max_y = max(max_y, y)

    x_range = max_x - min_x + 1
    y_range = max_y - min_y + 1
    
    x_values = np.arange(min_x, max_x + 1, step=1)
    y_values = np.arange(min_y, max_y + 1, step=1)

    x_mesh, y_mesh = np.meshgrid(x_values, y_values)

    q_values = np.zeros((y_range, x_range)) # x_mesh.shape == y_mesh.shape
    for (x,y), q in agent.q_values.items():
        q_values[int(y-min_y), int(x-min_x)] = np.max(q)

	# 3D plot
    fig, ax = plt.subplots(subplot_kw={"projection": "3d"})
    surface = ax.plot_surface(x_mesh, y_mesh, q_values, cmap=cm.coolwarm, linewidth=0, antialiased=False)
    ax.set_zticks([])
    ax.set_xlabel("dx")
    ax.set_ylabel("dy")
    fig.colorbar(surface, shrink=0.5, aspect=5)
    plt.title(f"State value function of {agent_filename}")

    if not save_only:
        plt.show()
    else:
        plt.savefig(os.path.join(DEFAULT_OUTPUTS_PATH, f"state_value_function_{agent_filename}.png"))
    
    plt.close()

## 4 - Launch and visualize

Define environment and training parameters

In [6]:
env = gym.make('TextFlappyBird-v0', height = 15, width = 20, pipe_gap = 4)

EPISODES = 2000
DISCOUNT_FACTOR = 1.0
FINAL_EPSILON = 0.01
LEARNING_RATE = 0.1
TRACE_DECAY = 0.9

EXPERIMENT_NAME = None # if None then a default name defined in each trainer will be used

Define MC trainer

In [7]:
trainer = MCTrainer(
	n_episodes=EPISODES,
	discount_factor=DISCOUNT_FACTOR,
	final_epsilon=FINAL_EPSILON,
	learning_rate=LEARNING_RATE,
	n_eval=100,
	max_episode_length_eval=1000,
)

Define Sarsa($\lambda$) trainer

In [11]:
trainer = SARSALambdaTrainer(
	n_episodes=EPISODES,
	learnind_rate=LEARNING_RATE,
	trace_decay=TRACE_DECAY,
	discount_factor=DISCOUNT_FACTOR,
	final_epsilon=FINAL_EPSILON,
	n_eval=100,
	max_episode_length_eval=1000,
)

Launch training

In [12]:
agent = trainer.train(
	env=env,
	experiment_name=EXPERIMENT_NAME,
	save_plots=True,
	save_agent=True,
)

Train SARSALambdaTrainer on TextFlappyBird-v0: 100%|██████████| 2000/2000 [00:37<00:00, 54.01it/s, avg_eval_episode_duration=665] 


Agent SARSALambdaAgent saved in /Users/tomlab/Documents/CS/MDS/SM11/RL/assignment/flappybird/models/SARSALambdaAgent.json


Visualize results

In [10]:
# visualize the epsilon decay
plot_epsilon_scheduler(
	final_epsilon=FINAL_EPSILON,
	n_episodes=EPISODES,
)

In [13]:
# compare the training episode durations of two agents
plot_and_compare_train_episode_duration(
	json_file_1="mc_train_durations.json",
	json_file_2="sarsa-lambda_train_durations.json",
	window=100,
)

In [14]:
# compare the evaluation episode durations of two agents
plot_and_compare_eval_episode_duration(
	json_file_1="mc_eval_durations.json",
	json_file_2="sarsa-lambda_eval_durations.json",
	window=100,
)

In [19]:
# visualize the state-value functions of the agents
plot_state_value_function(agent_filename="MCAgent", save_only=True)
plot_state_value_function(agent_filename="SARSALambdaAgent", save_only=True)