# Modified Resource Gathering

In [1]:
from itertools import combinations
from collections import Counter
import matplotlib.pyplot as plt
from time import time
from tqdm import tqdm
import seaborn as sns
import pandas as pd
import numpy as np
import os

from IPython.display import clear_output

import mo_gymnasium as mo_gym
import gymnasium as gym

import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
import torch as T

import warnings
warnings.filterwarnings('ignore')

### Agent Brain

In [2]:
class ReplayBuffer:
    """
        Fixed-capacity circular store of (state, action, reward, next state, done)
        transitions, with uniform random sampling for training.
    """

    def __init__(self, max_size: int, input_shape: list) -> None:
        """
            Allocates the zero-filled arrays that back the buffer.

            Parameters:
                - max_size (int): Capacity of the buffer, in transitions.
                - input_shape (list): Shape of a single state.

            Returns:
                - None
        """

        self.mem_size = max_size

        # Total number of transitions ever stored (keeps growing past mem_size)
        self.mem_cntr = 0

        states_shape = (self.mem_size, *input_shape)
        self.state_memory = np.zeros(states_shape, dtype=np.float32)
        self.new_state_memory = np.zeros(states_shape, dtype=np.float32)

        self.action_memory = np.zeros(self.mem_size, dtype=np.int64)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)

        # Terminal flags, used as a mask to discard future rewards beyond the end of an episode
        self.terminal_memory = np.zeros(self.mem_size, dtype=bool)

    def store_transition(self, state: np.array, action: int, reward: float, state_: np.array, done: bool) -> None:
        """
            Writes one transition into the next slot, overwriting the oldest
            entry once the buffer is full.

            Parameters:
                - state (np.array): State before the action.
                - action (int): Action taken in that state.
                - reward (float): Reward received for the action.
                - state_ (np.array): State after the action.
                - done (bool): Whether the episode ended with this transition.

            Returns:
                - None
        """

        # The modulo makes the write position wrap around the buffer
        slot = self.mem_cntr % self.mem_size

        self.state_memory[slot] = state
        self.new_state_memory[slot] = state_
        self.action_memory[slot] = action
        self.reward_memory[slot] = reward
        self.terminal_memory[slot] = done

        self.mem_cntr += 1

    def sample_buffer(self, batch_size: int) -> tuple:
        """
            Draws batch_size distinct transitions uniformly at random from the
            filled part of the buffer.

            Args:
                batch_size (int): Number of transitions to draw. Must not exceed the
                    number of stored transitions (sampling is without replacement).

            Returns:
                tuple: (states, actions, rewards, next states, terminal flags) for the drawn transitions.
        """

        # Only the slots that have been written at least once are eligible
        filled = min(self.mem_cntr, self.mem_size)
        picked = np.random.choice(filled, batch_size, replace=False)

        return (
            self.state_memory[picked],
            self.action_memory[picked],
            self.reward_memory[picked],
            self.new_state_memory[picked],
            self.terminal_memory[picked],
        )

In [3]:
class DuelingDeepNetwork(nn.Module):
    """
        A dueling deep neural network for reinforcement learning: a shared fully
        connected trunk followed by a state-value head and an advantage head.
    """

    def __init__(self, learning_rate: float, n_actions: int, input_dims: list, name: str, chkpt_dir: str) -> None:
        """
            Initializes the DuelingDeepNetwork class.

            Parameters:
                - learning_rate (float): The learning rate for the Adam optimizer.
                - n_actions (int): The number of actions in the environment.
                - input_dims (list): The dimensions of the input state.
                - name (str): The name of the network (also the checkpoint file name).
                - chkpt_dir (str): The directory to save the network's checkpoints.

            Returns:
                - None
        """

        super().__init__()

        self.name = name

        self.chkpt_dir = chkpt_dir
        self.chkpt_file = os.path.join(self.chkpt_dir, self.name)

        # Shared trunk.
        # NOTE(review): there is no non-linearity between the last trunk layer and the
        # two heads; kept as is so existing checkpoints keep their meaning.
        self.fc = nn.Sequential(
            nn.Linear(*input_dims, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512)
        )

        # Dueling heads: one scalar state value and one advantage per action
        self.value = nn.Linear(512, 1)
        self.advantage = nn.Linear(512, n_actions)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        self.loss = nn.MSELoss()

        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state: T.Tensor) -> tuple[T.Tensor, T.Tensor]:
        """
            Performs a forward pass on the network.

            Parameters:
                - state (T.Tensor): The input state.

            Returns:
                tuple[T.Tensor, T.Tensor]: The value and advantage outputs of the network.
        """

        x = self.fc(state)

        return self.value(x), self.advantage(x)

    def _ensure_chkpt_dir(self) -> None:
        """
            Creates the checkpoint directory when it does not exist yet, so that
            saving does not fail on a fresh working directory.

            Returns:
                - None
        """

        # An empty chkpt_dir means "current directory": nothing to create
        if self.chkpt_dir:
            os.makedirs(self.chkpt_dir, exist_ok=True)

    def save_checkpoint(self) -> None:
        """
            Saves the network's weights to `<chkpt_dir>/<name>`.

            Returns:
                - None
        """

        print('\tSaving checkpoint...')
        self._ensure_chkpt_dir()
        T.save(self.state_dict(), self.chkpt_file)

    def save_best(self, final_state: tuple) -> None:
        """
            Saves the network's weights to `<chkpt_dir>/<name>_<final_state>`.

            Parameters:
                - final_state (tuple): The final state of the environment, used as file suffix.

            Returns:
                - None
        """

        print(f'\tSaving {self.name} with best score...')
        self._ensure_chkpt_dir()
        T.save(
            self.state_dict(),
            os.path.join(self.chkpt_dir, f'{self.name}_{final_state}')
        )

    def load_checkpoint(self) -> None:
        """
            Loads the network's weights from disk.

            `<chkpt_dir>/<name>_best` is preferred when it exists (the historical
            behaviour); otherwise the file written by `save_checkpoint` is used, so
            that a save followed by a load round-trips.

            Returns:
                - None

            Raises:
                - FileNotFoundError: If neither checkpoint file exists.
        """

        print('Loading checkpoint...')

        best_file = f'{self.chkpt_file}_best'
        source = best_file if os.path.isfile(best_file) else self.chkpt_file

        # map_location lets a checkpoint saved on GPU be restored on a CPU-only machine
        self.load_state_dict(T.load(source, map_location=self.device))

In [4]:
class Agent:
    """
        The Agent class represents an agent that interacts with the environment and learns to make decisions.

        It is an epsilon-greedy dueling double-DQN agent: `q_eval` is the online network that is
        trained, `q_next` is the periodically synchronised target network.
    """

    def __init__(
        self, gamma: float, epsilon: float, learning_rate: float, n_actions: int, 
        input_dims: list, mem_size: int, batch_size: int, 
        eps_min: float = 0.01 , eps_decay: float = 5e-7, 
        replace: int = 1000, 
        chkpt_dir: str = 'backup'
    ) -> None:
        """
            Initializes the Agent object.

            Args:
                - gamma (float): Discount factor for future rewards.
                - epsilon (float): Exploration rate, determines the probability of taking a random action.
                - learning_rate (float): Learning rate for the neural network optimizer.
                - n_actions (int): Number of possible actions in the environment.
                - input_dims (list): Dimensions of the input state.
                - mem_size (int): Size of the replay memory buffer.
                - batch_size (int): Number of samples to train on in each learning iteration.
                - eps_min (float, optional): Minimum value for epsilon. Defaults to 0.01.
                - eps_decay (float, optional): Decay rate for epsilon. Defaults to 5e-7.
                - replace (int, optional): Number of steps before updating the target network. Defaults to 1000.
                - chkpt_dir (str, optional): Directory to save checkpoints. Defaults to 'backup'.
        """

        self.epsilon = epsilon
        self.lr = learning_rate
        self.gamma = gamma

        self.input_dims = input_dims
        self.n_actions = n_actions

        self.batch_size = batch_size
        self.mem_size = mem_size

        self.eps_decay = eps_decay
        self.eps_min = eps_min

        self.replace_target_cnt = replace
        self.learn_step_cnt = 0

        self.chkpt_dir = chkpt_dir

        self.action_space = list(range(self.n_actions))
        self.memory = ReplayBuffer(self.mem_size, self.input_dims)

        # Online network: the one that is optimised
        self.q_eval = DuelingDeepNetwork(
            self.lr, self.n_actions, self.input_dims,
            'q_eval',
            self.chkpt_dir
        )

        # Target network: only updated by copying the online weights
        self.q_next = DuelingDeepNetwork(
            self.lr, self.n_actions, self.input_dims,
            'q_next',
            self.chkpt_dir
        )

    def choose_action(self, observation: list) -> tuple[int, str]:
        """
            Choose an action based on the given observation (epsilon-greedy).

            Parameters:
                observation (list): The current observation.

            Returns:
                tuple[int, str]: A tuple containing the chosen action and its type.
                The first element is the action (an integer), and the second element is the action type:
                'NN' when it came from the network, 'Rand' when it was drawn at random.
        """

        if np.random.random() > self.epsilon:
            # NN action
            state = T.tensor(np.array([observation]), dtype=T.float).to(self.q_eval.device)

            # Pure inference: no autograd graph is needed to pick an action
            with T.no_grad():
                _, advantage = self.q_eval.forward(state)

            # The state value is shared by all actions, so the advantage alone ranks them
            action = T.argmax(advantage).item()
            action_type = 'NN'

        else:
            # Random action
            action = np.random.choice(self.action_space)
            action_type = 'Rand'

        return action, action_type

    def store_transition(self, state: np.array, action: int, reward: float, state_, done: bool) -> None:
        """
            Stores a transition in the replay memory buffer.

            Parameters:
                - state (np.array): The current state of the environment.
                - action (int): The action taken in the current state.
                - reward (float): The reward received for taking the action.
                - state_ (np.array): The next state of the environment.
                - done (bool): Indicates whether the episode is done after taking the action.

            Returns:
                - None
        """

        self.memory.store_transition(state, action, reward, state_, done)

    def replace_target_network(self) -> None:
        """
            Replaces the target network with the evaluation network.

            This method is called on every learning step; the copy only happens when the step counter
            is a multiple of `replace_target_cnt`, after which the counter is reset to zero.
            The target network is used to estimate the Q-values for the next state during the training process.

            Returns:
                None
        """

        if self.learn_step_cnt % self.replace_target_cnt == 0:
            self.q_next.load_state_dict(self.q_eval.state_dict())
            self.learn_step_cnt = 0

    def decrement_epsilon(self) -> None:
        """
            Decrements the value of epsilon by eps_decay, never letting it fall below eps_min.

            Returns:
                None
        """

        # Clamp after subtracting so a large eps_decay cannot undershoot eps_min
        self.epsilon = max(self.epsilon - self.eps_decay, self.eps_min)

    def save_models(self) -> None:
        """
            Saves the models' checkpoints.

            Returns:
                None
        """

        self.q_eval.save_checkpoint()
        self.q_next.save_checkpoint()

    def save_best(self, final_state: tuple) -> None:
        """
            Saves the models' checkpoints with the best score for a given final state.

            Parameters:
                - final_state (tuple): The final state of the environment.

            Returns:
                None
        """

        self.q_eval.save_best(final_state)
        self.q_next.save_best(final_state)

    def load_models(self) -> None:
        """
            Loads the models' checkpoints.

            Returns:
                None
        """

        self.q_eval.load_checkpoint()
        self.q_next.load_checkpoint()

    def reset_memory(self) -> None:
        """
            Resets the replay memory buffer.

            Returns:
                None
        """

        self.memory = ReplayBuffer(self.mem_size, self.input_dims)

    def learn(self) -> float:
        """
            Performs one learning step: samples batch_size transitions from the replay memory and
            applies the double-DQN update to the online network.

            The greedy next action is selected with the online network and evaluated with the target
            network; transitions that ended an episode contribute only their immediate reward.

            Returns:
                float: The loss value of the step, or NaN while the memory holds fewer than
                batch_size transitions.
        """

        # Wait until the memory holds at least one batch of transitions
        if self.memory.mem_cntr < self.batch_size:
            return np.nan

        self.q_eval.optimizer.zero_grad()

        self.replace_target_network()

        state, action, reward, next_state, done = self.memory.sample_buffer(self.batch_size)

        device = self.q_eval.device

        states  = T.tensor(state).to(device)
        actions = T.tensor(action).to(device)
        rewards = T.tensor(reward).to(device)
        states_ = T.tensor(next_state).to(device)
        dones = T.tensor(done, dtype=T.bool).to(device)

        indices = T.arange(self.batch_size, device=device)

        # Q(s, a) of the actions that were actually taken (online network, with gradient)
        V_s, A_s = self.q_eval.forward(states)
        q_pred = T.add(V_s, (A_s - A_s.mean(dim=1, keepdim=True)))[indices, actions]

        # The target is a constant for the optimiser: no gradient may flow through it
        with T.no_grad():
            V_s_eval, A_s_eval = self.q_eval.forward(states_)
            V_s_, A_s_ = self.q_next.forward(states_)

            q_next = T.add(V_s_, (A_s_ - A_s_.mean(dim=1, keepdim=True)))
            q_eval = T.add(V_s_eval, (A_s_eval - A_s_eval.mean(dim=1, keepdim=True)))

            max_actions = T.argmax(q_eval, dim=1)

            # A terminal next state has no future value. The mask must be applied to the
            # tensor the target is read from (q_next), not to q_eval.
            q_next[dones] = 0.0

            q_target = rewards + self.gamma * q_next[indices, max_actions]

        loss = self.q_eval.loss(q_pred, q_target)
        loss.backward()

        self.q_eval.optimizer.step()
        self.learn_step_cnt += 1

        self.decrement_epsilon()

        return loss.item()

### Helper Functions

In [5]:
def increment_heatmap(heatmap: np.array, episode_path: list[tuple]) -> None:
    """
        Adds the visits of one episode to the cumulative visitation heatmap.

        Args:
            heatmap (np.array): Visit counts per cell, indexed as heatmap[row][col].
            episode_path (list[tuple]): The (row, col) positions visited during the episode.

        Returns:
            None, the heatmap is modified in place.
    """

    # Group repeated positions first so every cell is updated once
    visits = Counter(episode_path)

    for cell in visits:
        row, col = cell
        heatmap[row][col] += visits[cell]

def generate_path_matrix(episode_path: list[tuple], shape: tuple = (5, 5)) -> np.array:
    """
        Generates a matrix representing the path taken during an episode.

        Each cell holds the number of times the corresponding (row, col) position
        appears in the path.

        Args:
            episode_path (list[tuple]): The (row, col) positions visited during the episode.
            shape (tuple, optional): Shape (rows, cols) of the grid. Defaults to (5, 5),
                the grid size this function was originally hard-coded for.

        Returns:
            np.array: An integer matrix of the given shape with the visit count of every cell.

        Raises:
            IndexError: If a position of the path lies outside the grid.
    """

    matrix = np.zeros(shape, dtype=int)

    for (row, col), count in Counter(episode_path).items():
        matrix[row][col] += count

    return matrix

def is_cardinal_sequence(episode_path: list[tuple]) -> bool:
    """
        Tells whether every position of the path is reachable from the previous one
        with a single cardinal move (up, down, left, right) or by staying in place.

        Args:
            episode_path (list[tuple]): The (row, col) positions visited during the episode.

        Returns:
            bool: True if every consecutive pair of positions is a cardinal step,
            False otherwise or when the path has fewer than two positions.
    """

    # A single position (or none) contains no step to validate
    if len(episode_path) < 2:
        return False

    # (0, 0) is accepted as well: the position may not change between two steps
    allowed_moves = ((0, 0), (0, 1), (0, -1), (1, 0), (-1, 0))

    return all(
        (current[0] - previous[0], current[1] - previous[1]) in allowed_moves
        for previous, current in zip(episode_path, episode_path[1:])
    )

def not_worse_path(path_len: int, treasure: int, converged_paths: dict) -> bool:
    """
        Tells whether a path is at least as short as the one already converged for the same treasure.

        Args:
            path_len (int): The length of the candidate path.
            treasure (int): The treasure collected by the candidate path.
            converged_paths (dict): The converged paths, keyed by (path length, treasure).

        Returns:
            bool: False only when a converged solution with the same treasure exists and it is
            strictly shorter than the candidate; True otherwise.
    """

    for solution in converged_paths:
        # Only the first solution recorded for this treasure is compared
        if solution[1] == treasure:
            return not path_len > solution[0]

    # No solution for this treasure yet: nothing to be worse than
    return True

In [6]:
def plot_learning(
    scores: list[float], 
    epsilons: list[float], 
    losses: list[float], 
    actions_history: list[dict], 
    heatmap: np.array,
    converged_episodes: dict[tuple, int], 
    env_img: np.array,
    filename: str = None
) -> None:
    """
        Draws a 2x3 dashboard of the training run and shows it.

        Args:
            scores (list[float]): Score of each episode.
            epsilons (list[float]): Recorded epsilon values.
            losses (list[float]): Mean loss of each episode.
            actions_history (list[dict]): Per-episode counts of action types ('Rand', 'NN', 'Forced').
            heatmap (np.array): 2D matrix with the position visitation counts.
            converged_episodes (dict): Convergence events. Each key is drawn as a vertical line on the
                score plot (so it is used as an episode index) and each value is shown in the legend.
            env_img (np.array): Image of the environment's final state.
            filename (str, optional): When given, the figure is saved to this name with '.png' replaced
                by '_histories.png'. The save happens before the heatmap and the environment image are drawn.
                Defaults to None.

        Returns:
            None
    """

    _, grid = plt.subplots(nrows=2, ncols=3, figsize=(18, 9))
    score_ax, epsilon_ax, loss_ax, actions_ax, visits_ax, env_ax = grid.flatten()

    # --- Score progression, with one marker per convergence event
    score_ax.plot(scores, color='C0')

    if converged_episodes:
        for episode, converged_value in converged_episodes.items():
            score_ax.axvline(
                episode,
                alpha=0.5,
                ls='--',
                c=np.random.random(3,),
                label=f'Converged to {converged_value}'
            )

        score_ax.legend(loc='lower right')

    score_ax.set(title='Score progression', xlabel='Episode', ylabel='Score')

    # --- Exploration rate
    epsilon_ax.plot(epsilons, color='C1')
    epsilon_ax.set(title=r'$\epsilon$ progression', xlabel='Episode', ylabel=r'$\epsilon$')

    # --- Loss
    loss_ax.plot(losses, color='C2')
    loss_ax.set(title='Loss progression', xlabel='Episode', ylabel='Mean Episode Loss')

    # --- Action types; a type missing from an episode counts as zero
    action_counts = pd.DataFrame(actions_history, columns=['Rand', 'NN',  'Forced']).fillna(0)

    actions_ax.plot(action_counts['Rand'], c='C3', label='Random')
    actions_ax.plot(action_counts['Forced'], c='C8', label='Forced')
    actions_ax.plot(action_counts['NN'], c='C4', label='Neural\nNetwork')

    actions_ax.legend()
    actions_ax.set(
        title='Type of actions performed',
        xlabel='Episode',
        ylabel='Quantity',
        yscale='log'
    )

    plt.tight_layout()

    # Saved at this point: only the four history panels have been drawn so far
    if filename:
        plt.savefig(filename.replace('.png', '_histories.png'))

    # --- Position visitation
    sns.heatmap(
        heatmap,
        annot=True,
        square=True,
        fmt='.2g',
        linewidth=.5,
        cmap='viridis',
        ax=visits_ax
    )

    visits_ax.xaxis.tick_top()
    visits_ax.set(title='Position visitation', xlabel='$x$', ylabel='$y$')

    # --- Final environment frame, without axis ticks
    env_ax.imshow(env_img)
    env_ax.set_title("Episode's Final Env State")
    for axis in (env_ax.xaxis, env_ax.yaxis):
        axis.set_visible(False)

    plt.tight_layout()
    plt.show()

### Learning Loop

In [7]:
# Known Pareto front, one [first objective, second objective] row per point
pareto_front = np.array([
    [8, second_objective]
    for second_objective in (130, 195, 216, 275, 361, 391, 441)
])

# Linear scalarization of every point with the weights [-0.5, 0.5]
pareto_front_rewards = [ np.dot([-0.5, 0.5], point) for point in pareto_front ]

print(f'Scalarized pareto front: {pareto_front_rewards}')

Scalarized pareto front: [61.0, 93.5, 104.0, 133.5, 176.5, 191.5, 216.5]


In [8]:
# Value of each of the four resources
r1, r2, r3, r4 = 80, 145, 166, 175

# Maps a total treasure value (always including a constant 50) to the label of the
# resources that add up to it: 'Ø' for no resource, then every combination of one
# to four resources, in increasing combination size.
treasure_map = {
    50: 'Ø',
    **{
        sum(values) + 50: '-'.join(labels)
        for size in range(1, 5)
        for labels, values in zip(
            combinations(('R1', 'R2', 'R3', 'R4'), size),
            combinations((r1, r2, r3, r4), size)
        )
    }
}

In [9]:
def learn_env(
    env: gym.Env, agent: Agent, 
    conversion_threshold: int = 300,
    n_trial: int = 1,
    write_results: bool = False, 
) -> tuple[list, list, list, list, np.ndarray, dict]:
    """
        Learn the environment using the Deep Q-Managed algorithm.

        The agent is trained episode after episode. Every valid path that ends an episode is
        tracked; once the same path has been repeated `conversion_threshold` times it is recorded
        as a converged solution, the replay memory is cleared and exploration is increased again so
        that the agent looks for a different solution. Training stops when seven of the expected
        solutions have converged, or after 100000 episodes.

        The environment must expose `map`, `current_pos`, `dir`, `final_pos` and `set_state`, and
        report `info['vector_reward']` on every step.

        Args:
            env (gym.Env): The environment to train the agent in.
            agent (Agent): The agent to be trained.
            conversion_threshold (int, optional): The threshold for considering a path as converged. Defaults to 300.
            n_trial (int, optional): The number of the trial, used in the results file name. Defaults to 1.
            write_results (bool, optional): Whether to append the discovered front to
                'rg_solutions/solution_rg_<conversion_threshold>_<n_trial>.txt' after every episode. Defaults to False.

        Returns:
            tuple: (scores, eps_history, loss_history, actions_history, heatmap, converged_episodes), i.e.
            the per-episode scores, the recorded epsilon values, the per-episode mean losses, the per-episode
            counts of action types, the position visitation matrix and the { episode: score } mapping of
            the convergence events.
    """

    n_episodes = 100000

    scores, eps_history = [], []
    loss_history, episode_losses = [], []
    actions_history = []

    # Saves the paths taken in the episodes to check for converged states
    paths_hashs = Counter()

    # Saves the converged paths: { (path_length, treasure): path }
    converged_paths = {}
    removed_converged_paths = {}

    # Saves the episode of convergence to a solution: { episode: score }
    converged_episodes = {}

    # Prevents the agent from reaching solutions with the same evaluation
    converged_evals = []

    # Map space treasure locations
    treasure_locs = set([ (0, 2), (2, 2), (4, 2), (4, 0) ])

    # Expected solution set
    expected_solutions = set([
        (8, 130), (8, 195), (8, 216), # One treasure
        (8, 275), (8, 361), (8, 391), # Two treasures
        (8, 441),                     # Three treasures
        (12, 616)                     # Four treasures 
    ])

    # Matrix to save the position visitation
    heatmap = np.zeros(env.map.shape, dtype=int)

    gold_home = 0
    got_lost = 0

    forced_paths = 0
    tried_paths = 0

    boosted_paths = 0
    boosted_long_paths = 0

    last_epsilon_reset = 0
    eps_since_valid_path = 0

    for episode in range(n_episodes):

        # Checking if the agent converged all optimal solutions
        if len(expected_solutions & set(converged_paths.keys())) >= 7:
            print('Agent converged for desired solutions')
            break
        
        observation, _ = env.reset()  

        done = False
        score = 0
        treasure = 0
        
        # Starts with a 0 so that the episode mean is defined even when no learning step ran
        episode_losses = [0]
        actions_type = []
        transitions = []
        
        episode_path = []
        episode_path_hash = None

        while not done:       
            action, action_type = agent.choose_action(observation)

            action_would_take_home = tuple(env.current_pos + env.dir[action]) == tuple(env.final_pos)

            # NOTE(review): 24.5 is presumably the scalarized reward of the final step home
            # (and 50 below the matching treasure bonus) - TODO confirm against the environment
            found_equivalent_path  = score + 24.5 in converged_evals

            current_solution = next((solution for solution in converged_paths.keys() if solution[1] == treasure + 50), False)
            better_path = len(episode_path) + 1 < current_solution[0] if current_solution and action_would_take_home else False

            # If an improved solution is identified compared to the one currently learned, the existing solution is 
            # discarded, allowing the agent to pursue the superior alternative
            if better_path:
                converged_evals = [ value for value in converged_evals if value != score + 24.5 ]

                removed_converged_paths[current_solution] = converged_paths[current_solution]
                del converged_paths[current_solution]

                # There may be no recorded episode with that exact score: only delete when one was found
                old_conversion_ep = next((key for key, val in converged_episodes.items() if val == score + 24.5), None)
                if old_conversion_ep is not None:
                    del converged_episodes[old_conversion_ep]

                break

            # Stops the agent from going to the final state when an equivalent solution has already been found
            if action_would_take_home and found_equivalent_path and current_solution and not better_path:
                
                # Candidate cells: visited no more than average overall and not visited in this episode
                low_visits = np.argwhere(
                    (heatmap <= np.mean(heatmap)) &
                    ~generate_path_matrix([(0, 0), *episode_path]).astype(bool)
                )
                new_pos = low_visits[np.random.randint(0, len(low_visits))]

                env.set_state(new_pos)
                episode_path.append(tuple(new_pos))

                action, _ = agent.choose_action(new_pos)
                action_type = 'Forced'
                forced_paths += 1
       
            actions_type.append(action_type)

            next_observation, reward, done, _, info = env.step(action)

            episode_path.append(tuple(env.current_pos))

            score += reward
            treasure += info['vector_reward'][1]
            
            agent.store_transition(observation, action, reward, next_observation, int(done))
            transitions.append((observation, action, reward, next_observation, int(done)))
            
            loss = agent.learn()
            
            episode_losses.append(loss)

            observation = next_observation

            # The agent is considered lost after 100 positions
            if len(episode_path) == 100:
                got_lost += 1
                eps_since_valid_path += 1
                break
        
        else:
            # Only reached when the episode ended by itself (no break above)
            if is_cardinal_sequence(episode_path) and not_worse_path(len(episode_path), int(treasure), converged_paths):
                
                print(f'Path with len {len(episode_path)} with treasure {treasure} added to tracker')

                eps_since_valid_path = 0

                # The key ends with the path length (3 characters) and the treasure (5 characters);
                # both are parsed back from the key when the top repetitions are printed
                episode_path_hash = f'{hash(str(episode_path))}{len(episode_path):03d}{treasure:05.1f}'
                paths_hashs[episode_path_hash] += 1

                good_short_path = len(episode_path) == 8 and treasure_locs & set(episode_path)
                good_long_path  = 8 < len(episode_path) <= 14 and len(treasure_locs & set(episode_path)) == 4
                if (good_short_path or good_long_path) and len(converged_evals):
                    print(f'Boost relevance of path with len {len(episode_path)} and treasure {treasure}: {episode_path}')

                    boosted_paths += 10
                    
                    # Replays the episode ten times with an increased reward to reinforce it
                    for observation, action, reward, next_observation, done in [ transition for _ in range(10) for transition in transitions ]:
                        agent.store_transition(observation, action, reward + 10 if good_short_path else reward + 100, next_observation, done)
                    
                    for _ in range(10): agent.learn()

                    if 8 < len(episode_path) <= 14: boosted_long_paths += 1
                
        increment_heatmap(heatmap, episode_path)
        tried_paths += 1
        
        if episode_path[-1] == (4, 4) and treasure_locs & set(episode_path):
            gold_home += 1

        scores.append(score)    
        eps_history.append(agent.epsilon)
        loss_history.append(np.nanmean(episode_losses))
        actions_history.append(dict(Counter(actions_type)))

        # Too long without progress: make the agent explore again
        if episode - last_epsilon_reset > 1000 or eps_since_valid_path > 100:
            
            print(f'\nIncreasing epsilon\n')

            agent.eps_decay = 1e-3
            agent.epsilon = 0.7

            last_epsilon_reset = episode

            eps_history.append(agent.epsilon) 
        
        else:
            print(f'Episodes since last epsilon reset: {episode - last_epsilon_reset}\tEpisodes since last valid path: {eps_since_valid_path}')

        _, hash_count = paths_hashs.most_common(1)[0] if paths_hashs else (0, 0)
        if hash_count >= conversion_threshold:
            
            print('\n___________________')
            print('Converged to solution')
            print(f"{episode_path} {reward} {info['vector_reward']}")
            print('___________________\n\n')

            # Saving episode of convergence
            converged_episodes[episode] = score
            last_epsilon_reset = episode

            # Saving the converged path
            converged_paths[(len(episode_path), int(treasure))] = episode_path

            # Blocks the agent from reaching solutions with the same evaluation
            converged_evals.append(score)

            # Increasing agent randomness
            agent.eps_decay = 1e-3
            agent.epsilon = 0.7   
            eps_history.append(agent.epsilon)        

            # Resetting paths taken in the episodes 
            agent.reset_memory()
            paths_hashs = Counter()
            gold_home = 0
            got_lost = 0
            tried_paths = 0
            forced_paths = 0
            boosted_paths = 0

            # Saves the agent's weights
            agent.save_best((len(episode_path), int(treasure)))

        print(f'\nEpisode {episode} of {n_episodes}')
        
        treasure_mapping = treasure_map[treasure] if episode_path[-1] == (4, 4) else treasure_map[treasure + 50]
        print(f'\tScore: {score:.2f}\tTreasure: {int(treasure)} ({treasure_mapping})\tAVG Score: {np.mean(scores[-100:]):.2f}\tMean Loss: {loss_history[-1]:3f}\tEpsilon: {eps_history[-1]:5f}')

        top_repetitions = dict(sorted(
            Counter([ 
                (int(path_hash[-8:-5]), float(path_hash[-5:])) for path_hash, reps in paths_hashs.most_common(5) for _ in range(reps)
            ]).items()
        ))
        print(f'\n\tUnique valid paths: {len(paths_hashs)} -> Top 5 repetitions: {top_repetitions}\n')

        gold_home_percent = f'{gold_home     / tried_paths * 100:.1f}' if tried_paths > 0 else '-%'
        got_lost_percent  = f'{got_lost      / tried_paths * 100:.1f}' if tried_paths > 0 else '-%'
        forced_percent    = f'{forced_paths  / tried_paths * 100:.1f}' if tried_paths > 0 else '-%'
        boosted_percent   = f'{boosted_paths / tried_paths * 100:.1f}' if tried_paths > 0 else '-%'
        print(f'\tReturned home with gold: {gold_home} ({gold_home_percent}%)\tGot lost: {got_lost} ({got_lost_percent}%)\tPaths with forced actions: {forced_paths} ({forced_percent}%)\tBoosted Paths: {boosted_paths} ({boosted_percent}%)')
        
        print(f'\tLatest episode path length: {len(episode_path)} {episode_path if len(episode_path) < 12 else f"[{episode_path[0]}, ..., {episode_path[-1]}]"}')

        # Counter returns 0 for the action types that did not occur in the episode
        print('\n\tActions taken in episode: NN: {NN}, Rand: {Rand}, Forced: {Forced}'.format_map(Counter(actions_type)))
        
        print(f'\n\tConverged epsiodes: {converged_episodes}')

        print(f'\n\tConverged solutions: {list(converged_paths.keys())}')

        print(f'\n\tBoosted long path: {boosted_long_paths}')

        print('\n')
        print(generate_path_matrix(episode_path))
        print(heatmap)
            
        clear_output(wait=True)
        
        if write_results:

            # The results directory may not exist on a fresh checkout
            os.makedirs('rg_solutions', exist_ok=True)

            with open(f'rg_solutions/solution_rg_{conversion_threshold}_{n_trial}.txt', 'a') as solution_file:
                
                # One "-path_len treasure" pair per expected or converged solution, sorted;
                # expected solutions that have not converged yet are written as "0 0"
                discovered_front = ' '.join([ 
                    f'{-1 * path_len} {treasure}' if path else '0 0'
                    for (path_len, treasure), path in sorted({
                        **converged_paths,
                        **{ key: [] for key in expected_solutions.difference(set(converged_paths.keys())) }
                    }.items())
                ])

                solution_file.write(f'{discovered_front}\n')

    print(f'Done with {episode}/{n_episodes}')
    print(f'\n\tConverged epsiodes: {converged_episodes}')
    print(f'\n\tConverged paths: {converged_paths}')

    return scores, eps_history, loss_history, actions_history, heatmap, converged_episodes

### Modified Resource Gathering Env

In [10]:
# Environment wrapped with LinearReward, using the same weight (0.5) for both objectives
mrg_env = mo_gym.LinearReward(
    mo_gym.make('modified-resource-gathering-v0', render_mode='rgb_array'),
    weight=np.array([0.5, 0.5])
)

# Hyperparameters of the agent
agent_settings = {
    'gamma': 0.8,
    'epsilon': 1.0,
    'eps_decay': 3e-3,
    'learning_rate': 1e-4,
    'n_actions': 4,
    'input_dims': [2],
    'mem_size': 10000,
    'batch_size': 10,
    'replace': 500,
    'chkpt_dir': 'backup',
}

agent = Agent(**agent_settings)

In [None]:
# Trains the agent and measures the wall-clock time of the whole run.
# The six names below are expected to be provided by learn_env, in this order.
start_time = time()

(
    scores,
    eps_history,
    loss_history,
    actions_history,
    heatmap,
    converged_episodes,
) = learn_env(
    mrg_env,
    agent,
    conversion_threshold=100,
    n_trial=1,
    write_results=False
)

end_time = time()

print(f'Elapsed time: {(end_time - start_time)/60:.3f} min')

In [None]:
# The environment is reset before rendering the frame shown in the last panel
mrg_env.reset()

plot_learning(
    scores=scores,
    epsilons=eps_history,
    losses=loss_history,
    actions_history=actions_history,
    heatmap=heatmap,
    converged_episodes=converged_episodes,
    env_img=mrg_env.render()
)