# Taxi Env (Don't Modify it )

In [1]:
from contextlib import closing
from io import StringIO
from os import path

import numpy as np

import gymnasium as gym
from gymnasium import Env, spaces, utils
from gymnasium.envs.toy_text.utils import categorical_sample
from gymnasium.error import DependencyNotInstalled

MAP_0 = [
    "+---------+",
    "|R: | : :G|",
    "| : | : : |",
    "| : : : : |",
    "| | : | : |",
    "|Y| : |B: |",
    "+---------+",
]

MAP_1 = [
    "+---------+",
    "|R| : : :G|",
    "| | : | : |",
    "| : : | : |",
    "| : | : : |",
    "|Y: | :B: |",
    "+---------+",
]

MAP_2 = [
    "+---------+",
    "|R: : : |G|",
    "| : : : | |",
    "| : | : : |",
    "| : | | : |",
    "|Y: : |B: |",
    "+---------+",
]

MAP = MAP_0
MAPS = [MAP_0, MAP_1, MAP_2]
WINDOW_SIZE = (550, 350)

In [2]:
# DO NOT MODIFY TaxiEnv
class TaxiEnv(Env):

    metadata = {
        "render_modes": ["human", "ansi", "rgb_array"],
        "render_fps": 4,
    }

    def __init__(
        self,
        render_mode: str | None = None,
        is_rainy: bool = True,
        fickle_passenger: bool = False,
        use_multiple_maps: bool = True,
        reward_step: float = -1,
        reward_delivery: float = 20,
        reward_illegal: float = -10,
    ):
        self.use_multiple_maps = use_multiple_maps

        if use_multiple_maps:
            self.desc_maps = [np.asarray(m, dtype="c") for m in MAPS]
            self.P_maps = []
            self.current_map_id = 0
            self.desc = self.desc_maps[0]
        else:
            self.desc = np.asarray(MAP, dtype="c")

        self.locs = locs = [(0, 0), (0, 4), (4, 0), (4, 3)]
        self.locs_colors = [(255, 0, 0), (0, 255, 0), (255, 255, 0), (0, 0, 255)]

        self.reward_step = reward_step
        self.reward_delivery = reward_delivery
        self.reward_illegal = reward_illegal

        num_states = 500
        num_rows = 5
        num_columns = 5
        self.max_row = num_rows - 1
        self.max_col = num_columns - 1
        self.initial_state_distrib = np.zeros(num_states)
        num_actions = 6


        if use_multiple_maps:

            for map_idx in range(3):
                self.desc = self.desc_maps[map_idx]
                P = {
                    state: {action: [] for action in range(num_actions)}
                    for state in range(num_states)
                }
                self.P = P

                # Iterate through all possible state combinations
                for row in range(num_rows):
                    for col in range(num_columns):
                        for pass_idx in range(len(locs) + 1):
                            for dest_idx in range(len(locs)):
                                state = self.encode(row, col, pass_idx, dest_idx)
                                if pass_idx < 4 and pass_idx != dest_idx:
                                    self.initial_state_distrib[state] += 1
                                for action in range(num_actions):
                                    if is_rainy:
                                        self._build_rainy_transitions(
                                            row, col, pass_idx, dest_idx, action,
                                        )
                                    else:
                                        self._build_dry_transitions(
                                            row, col, pass_idx, dest_idx, action,
                                        )

                self.P_maps.append(P)


            self.desc = self.desc_maps[0]
            self.P = self.P_maps[0]
        else:

            self.P = {
                state: {action: [] for action in range(num_actions)}
                for state in range(num_states)
            }

            for row in range(num_rows):
                for col in range(num_columns):
                    for pass_idx in range(len(locs) + 1):
                        for dest_idx in range(len(locs)):
                            state = self.encode(row, col, pass_idx, dest_idx)
                            if pass_idx < 4 and pass_idx != dest_idx:
                                self.initial_state_distrib[state] += 1
                            for action in range(num_actions):
                                if is_rainy:
                                    self._build_rainy_transitions(
                                        row, col, pass_idx, dest_idx, action,
                                    )
                                else:
                                    self._build_dry_transitions(
                                        row, col, pass_idx, dest_idx, action,
                                    )

        self.initial_state_distrib /= self.initial_state_distrib.sum()
        self.action_space = spaces.Discrete(num_actions)
        self.observation_space = spaces.Discrete(num_states)

        self.render_mode = render_mode
        self.fickle_passenger = fickle_passenger
        self.fickle_step = self.fickle_passenger and self.np_random.random() < 0.5

        # pygame utils
        self.window = None
        self.clock = None
        self.cell_size = (
            WINDOW_SIZE[0] / self.desc.shape[1],
            WINDOW_SIZE[1] / self.desc.shape[0],
        )
        self.taxi_imgs = None
        self.taxi_orientation = 0
        self.passenger_img = None
        self.destination_img = None
        self.median_horiz = None
        self.median_vert = None
        self.background_img = None

    def _pickup(self, taxi_loc, pass_idx, reward):
        """Computes the new location and reward for pickup action."""
        if pass_idx < 4 and taxi_loc == self.locs[pass_idx]:
            new_pass_idx = 4
            new_reward = reward
        else:  # passenger not at location
            new_pass_idx = pass_idx
            new_reward = self.reward_illegal

        return new_pass_idx, new_reward

    def _dropoff(self, taxi_loc, pass_idx, dest_idx, default_reward):
        """Computes the new location and reward for return dropoff action."""
        if (taxi_loc == self.locs[dest_idx]) and pass_idx == 4:
            new_pass_idx = dest_idx
            new_terminated = True
            new_reward = self.reward_delivery
        elif (taxi_loc in self.locs) and pass_idx == 4:
            new_pass_idx = self.locs.index(taxi_loc)
            new_terminated = False
            new_reward = default_reward
        else:  # dropoff at wrong location
            new_pass_idx = pass_idx
            new_terminated = False
            new_reward = self.reward_illegal

        return new_pass_idx, new_reward, new_terminated

    def _calc_new_position(self, row, col, movement, offset=0):
        """Calculates the new position for a row and col to the movement."""
        dr, dc = movement
        new_row = max(0, min(row + dr, self.max_row))
        new_col = max(0, min(col + dc, self.max_col))
        if self.desc[1 + new_row, 2 * new_col + offset] == b":":
            return new_row, new_col
        else:  # Default to current position if not traversable
            return row, col

    def _build_rainy_transitions(self, row, col, pass_idx, dest_idx, action):
        """Computes the next action for a state (row, col, pass_idx, dest_idx) and action for `is_rainy`."""
        state = self.encode(row, col, pass_idx, dest_idx)

        taxi_loc = left_pos = right_pos = (row, col)
        new_row, new_col, new_pass_idx = row, col, pass_idx
        reward = self.reward_step
        terminated = False

        moves = {
            0: ((1, 0), (0, -1), (0, 1)),  # Down
            1: ((-1, 0), (0, -1), (0, 1)),  # Up
            2: ((0, 1), (1, 0), (-1, 0)),  # Right
            3: ((0, -1), (1, 0), (-1, 0)),  # Left
        }

        # Check if movement is allowed
        if (
            action in {0, 1}
            or (action == 2 and self.desc[1 + row, 2 * col + 2] == b":")
            or (action == 3 and self.desc[1 + row, 2 * col] == b":")
        ):
            dr, dc = moves[action][0]
            new_row = max(0, min(row + dr, self.max_row))
            new_col = max(0, min(col + dc, self.max_col))

            left_pos = self._calc_new_position(row, col, moves[action][1], offset=2)
            right_pos = self._calc_new_position(row, col, moves[action][2])
        elif action == 4:  # pickup
            new_pass_idx, reward = self._pickup(taxi_loc, new_pass_idx, reward)
        elif action == 5:  # dropoff
            new_pass_idx, reward, terminated = self._dropoff(
                taxi_loc, new_pass_idx, dest_idx, reward
            )
        intended_state = self.encode(new_row, new_col, new_pass_idx, dest_idx)

        if action <= 3:
            left_state = self.encode(left_pos[0], left_pos[1], new_pass_idx, dest_idx)
            right_state = self.encode(
                right_pos[0], right_pos[1], new_pass_idx, dest_idx
            )

            self.P[state][action].append((0.8, intended_state, self.reward_step, terminated))
            self.P[state][action].append((0.1, left_state, self.reward_step, terminated))
            self.P[state][action].append((0.1, right_state, self.reward_step, terminated))
        else:
            self.P[state][action].append((1.0, intended_state, reward, terminated))


    def encode(self, taxi_row, taxi_col, pass_loc, dest_idx):
        # (5) 5, 5, 4
        i = taxi_row
        i *= 5
        i += taxi_col
        i *= 5
        i += pass_loc
        i *= 4
        i += dest_idx
        return i

    def decode(self, i):
        out = []
        out.append(i % 4)
        i = i // 4
        out.append(i % 5)
        i = i // 5
        out.append(i % 5)
        i = i // 5
        out.append(i)
        assert 0 <= i < 5
        return reversed(out)

    def action_mask(self, state: int):
        """Computes an action mask for the action space using the state information."""
        mask = np.zeros(6, dtype=np.int8)
        taxi_row, taxi_col, pass_loc, dest_idx = self.decode(state)
        if taxi_row < 4:
            mask[0] = 1
        if taxi_row > 0:
            mask[1] = 1
        if taxi_col < 4 and self.desc[taxi_row + 1, 2 * taxi_col + 2] == b":":
            mask[2] = 1
        if taxi_col > 0 and self.desc[taxi_row + 1, 2 * taxi_col] == b":":
            mask[3] = 1
        if pass_loc < 4 and (taxi_row, taxi_col) == self.locs[pass_loc]:
            mask[4] = 1
        if pass_loc == 4 and (
            (taxi_row, taxi_col) == self.locs[dest_idx]
            or (taxi_row, taxi_col) in self.locs
        ):
            mask[5] = 1
        return mask

    def step(self, a):
        transitions = self.P[self.s][a]
        i = categorical_sample([t[0] for t in transitions], self.np_random)
        p, s, r, t = transitions[i]
        self.lastaction = a

        shadow_row, shadow_col, shadow_pass_loc, shadow_dest_idx = self.decode(self.s)
        taxi_row, taxi_col, pass_loc, _ = self.decode(s)

        # If we are in the fickle step, the passenger has been in the vehicle for at least a step and this step the
        # position changed
        if (
            self.fickle_passenger
            and self.fickle_step
            and shadow_pass_loc == 4
            and (taxi_row != shadow_row or taxi_col != shadow_col)
        ):
            self.fickle_step = False
            possible_destinations = [
                i for i in range(len(self.locs)) if i != shadow_dest_idx
            ]
            dest_idx = self.np_random.choice(possible_destinations)
            s = self.encode(taxi_row, taxi_col, pass_loc, dest_idx)

        self.s = s

        if self.render_mode == "human":
            self.render()
        # truncation=False as the time limit is handled by the `TimeLimit` wrapper added during `make`
        return int(s), r, t, False, {"prob": p, "action_mask": self.action_mask(s)}

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict | None = None,
    ):
        super().reset(seed=seed)


        if self.use_multiple_maps and seed is not None:
            map_id = seed % 3
            self.current_map_id = map_id
            self.desc = self.desc_maps[map_id]
            self.P = self.P_maps[map_id]

        self.s = categorical_sample(self.initial_state_distrib, self.np_random)
        self.lastaction = None
        self.fickle_step = self.fickle_passenger and self.np_random.random() < 0.3
        self.taxi_orientation = 0

        if self.render_mode == "human":
            self.render()
        return int(self.s), {"prob": 1.0, "action_mask": self.action_mask(self.s)}

    def render(self):
        if self.render_mode is None:
            assert self.spec is not None
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym.make("{self.spec.id}", render_mode="rgb_array")'
            )
            return
        elif self.render_mode == "ansi":
            return self._render_text()
        else:  # self.render_mode in {"human", "rgb_array"}:
            return self._render_gui(self.render_mode)

    def _render_gui(self, mode):
        try:
            import pygame  # dependency to pygame only if rendering with human
        except ImportError as e:
            raise DependencyNotInstalled(
                'pygame is not installed, run `pip install "gymnasium[toy-text]"`'
            ) from e

        if self.window is None:
            pygame.init()
            pygame.display.set_caption("Taxi")
            if mode == "human":
                self.window = pygame.display.set_mode(WINDOW_SIZE)
            elif mode == "rgb_array":
                self.window = pygame.Surface(WINDOW_SIZE)

        assert (
            self.window is not None
        ), "Something went wrong with pygame. This should never happen."
        if self.clock is None:
            self.clock = pygame.time.Clock()
        if self.taxi_imgs is None:
            file_names = [
                path.join(path.dirname(__file__), "img/cab_front.png"),
                path.join(path.dirname(__file__), "img/cab_rear.png"),
                path.join(path.dirname(__file__), "img/cab_right.png"),
                path.join(path.dirname(__file__), "img/cab_left.png"),
            ]
            self.taxi_imgs = [
                pygame.transform.scale(pygame.image.load(file_name), self.cell_size)
                for file_name in file_names
            ]
        if self.passenger_img is None:
            file_name = path.join(path.dirname(__file__), "img/passenger.png")
            self.passenger_img = pygame.transform.scale(
                pygame.image.load(file_name), self.cell_size
            )
        if self.destination_img is None:
            file_name = path.join(path.dirname(__file__), "img/hotel.png")
            self.destination_img = pygame.transform.scale(
                pygame.image.load(file_name), self.cell_size
            )
            self.destination_img.set_alpha(170)
        if self.median_horiz is None:
            file_names = [
                path.join(path.dirname(__file__), "img/gridworld_median_left.png"),
                path.join(path.dirname(__file__), "img/gridworld_median_horiz.png"),
                path.join(path.dirname(__file__), "img/gridworld_median_right.png"),
            ]
            self.median_horiz = [
                pygame.transform.scale(pygame.image.load(file_name), self.cell_size)
                for file_name in file_names
            ]
        if self.median_vert is None:
            file_names = [
                path.join(path.dirname(__file__), "img/gridworld_median_top.png"),
                path.join(path.dirname(__file__), "img/gridworld_median_vert.png"),
                path.join(path.dirname(__file__), "img/gridworld_median_bottom.png"),
            ]
            self.median_vert = [
                pygame.transform.scale(pygame.image.load(file_name), self.cell_size)
                for file_name in file_names
            ]
        if self.background_img is None:
            file_name = path.join(path.dirname(__file__), "img/taxi_background.png")
            self.background_img = pygame.transform.scale(
                pygame.image.load(file_name), self.cell_size
            )

        desc = self.desc

        for y in range(0, desc.shape[0]):
            for x in range(0, desc.shape[1]):
                cell = (x * self.cell_size[0], y * self.cell_size[1])
                self.window.blit(self.background_img, cell)
                if desc[y][x] == b"|" and (y == 0 or desc[y - 1][x] != b"|"):
                    self.window.blit(self.median_vert[0], cell)
                elif desc[y][x] == b"|" and (
                    y == desc.shape[0] - 1 or desc[y + 1][x] != b"|"
                ):
                    self.window.blit(self.median_vert[2], cell)
                elif desc[y][x] == b"|":
                    self.window.blit(self.median_vert[1], cell)
                elif desc[y][x] == b"-" and (x == 0 or desc[y][x - 1] != b"-"):
                    self.window.blit(self.median_horiz[0], cell)
                elif desc[y][x] == b"-" and (
                    x == desc.shape[1] - 1 or desc[y][x + 1] != b"-"
                ):
                    self.window.blit(self.median_horiz[2], cell)
                elif desc[y][x] == b"-":
                    self.window.blit(self.median_horiz[1], cell)

        for cell, color in zip(self.locs, self.locs_colors):
            color_cell = pygame.Surface(self.cell_size)
            color_cell.set_alpha(128)
            color_cell.fill(color)
            loc = self.get_surf_loc(cell)
            self.window.blit(color_cell, (loc[0], loc[1] + 10))

        taxi_row, taxi_col, pass_idx, dest_idx = self.decode(self.s)

        if pass_idx < 4:
            self.window.blit(self.passenger_img, self.get_surf_loc(self.locs[pass_idx]))

        if self.lastaction in [0, 1, 2, 3]:
            self.taxi_orientation = self.lastaction
        dest_loc = self.get_surf_loc(self.locs[dest_idx])
        taxi_location = self.get_surf_loc((taxi_row, taxi_col))

        if dest_loc[1] <= taxi_location[1]:
            self.window.blit(
                self.destination_img,
                (dest_loc[0], dest_loc[1] - self.cell_size[1] // 2),
            )
            self.window.blit(self.taxi_imgs[self.taxi_orientation], taxi_location)
        else:  # change blit order for overlapping appearance
            self.window.blit(self.taxi_imgs[self.taxi_orientation], taxi_location)
            self.window.blit(
                self.destination_img,
                (dest_loc[0], dest_loc[1] - self.cell_size[1] // 2),
            )

        if mode == "human":
            pygame.event.pump()
            pygame.display.update()
            self.clock.tick(self.metadata["render_fps"])
        elif mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.window)), axes=(1, 0, 2)
            )

    def get_surf_loc(self, map_loc):
        return (map_loc[1] * 2 + 1) * self.cell_size[0], (
            map_loc[0] + 1
        ) * self.cell_size[1]

    def _render_text(self):
        desc = self.desc.copy().tolist()
        outfile = StringIO()

        out = [[c.decode("utf-8") for c in line] for line in desc]
        taxi_row, taxi_col, pass_idx, dest_idx = self.decode(self.s)

        def ul(x):
            return "_" if x == " " else x

        if pass_idx < 4:
            out[1 + taxi_row][2 * taxi_col + 1] = utils.colorize(
                out[1 + taxi_row][2 * taxi_col + 1], "yellow", highlight=True
            )
            pi, pj = self.locs[pass_idx]
            out[1 + pi][2 * pj + 1] = utils.colorize(
                out[1 + pi][2 * pj + 1], "blue", bold=True
            )
        else:  # passenger in taxi
            out[1 + taxi_row][2 * taxi_col + 1] = utils.colorize(
                ul(out[1 + taxi_row][2 * taxi_col + 1]), "green", highlight=True
            )

        di, dj = self.locs[dest_idx]
        out[1 + di][2 * dj + 1] = utils.colorize(out[1 + di][2 * dj + 1], "magenta")
        outfile.write("\n".join(["".join(row) for row in out]) + "\n")
        if self.lastaction is not None:
            outfile.write(
                f"  ({['South', 'North', 'East', 'West', 'Pickup', 'Dropoff'][self.lastaction]})\n"
            )
        else:
            outfile.write("\n")

        with closing(outfile):
            return outfile.getvalue()

    def close(self):
        if self.window is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()

# Training Strategy(Policy Gradient)

In [3]:
import numpy as np
import sys
from pathlib import Path

In [4]:
class PolicyGradientAgentOptimized:

    def __init__(self, n_states, n_actions,
                 learning_rate=0.01,
                 value_lr=0.1,
                 lr_decay=0.9999,
                 lr_min=0.0001,
                 discount_factor=0.99,
                 entropy_coef=0.01):
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.lr_init = learning_rate
        self.value_lr = value_lr
        self.lr_decay = lr_decay
        self.lr_min = lr_min
        self.gamma = discount_factor
        self.entropy_coef = entropy_coef

        # policy parameters
        self.theta = np.zeros((n_states, n_actions))

        # Value function
        self.V = np.zeros(n_states)

        # statistics information
        self.episode_count = 0

    def get_policy(self, state):
        """calculate action probability distribution (softmax)"""
        theta_state = self.theta[state] - np.max(self.theta[state])
        exp_theta = np.exp(theta_state)
        return exp_theta / np.sum(exp_theta)

    def choose_action(self, state):
        """sample action according to policy"""
        policy = self.get_policy(state)
        return np.random.choice(self.n_actions, p=policy)

    def update(self, episode_history):
        """
        update policy using Advantage and Entropy
        """
        if len(episode_history) == 0:
            return

        # 1. calculate return G_t
        returns = []
        G = 0
        for state, action, reward in reversed(episode_history):
            G = reward + self.gamma * G
            returns.insert(0, G)
        returns = np.array(returns)

        # 2. update Value function
        for t, (state, action, reward) in enumerate(episode_history):
            td_error = returns[t] - self.V[state]
            self.V[state] += self.value_lr * td_error

        # 3. calculate Advantage
        advantages = []
        for t, (state, action, reward) in enumerate(episode_history):
            advantage = returns[t] - self.V[state]
            advantages.append(advantage)

        # 4. standardize Advantage
        advantages = np.array(advantages)
        if len(advantages) > 1:
            advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-9)

        # 5. update policy parameters
        for t, (state, action, reward) in enumerate(episode_history):
            policy = self.get_policy(state)

            # Policy gradient
            grad = np.zeros(self.n_actions)
            grad[action] = 1.0
            grad -= policy

            # Entropy gradient
            entropy_grad = -np.log(policy + 1e-9) - 1

            # combine update
            total_grad = (advantages[t] * grad +
                         self.entropy_coef * entropy_grad)

            self.theta[state] += self.lr * total_grad

        # 6. decay learning rate
        self.decay_learning_rate()
        self.episode_count += 1

    def decay_learning_rate(self):
        """gradually decrease learning rate"""
        self.lr = max(self.lr_min, self.lr * self.lr_decay)

    @property
    def Q(self):
        """compatible test function"""
        return self.theta

# Reward Wrapper

In [5]:
class TaxiRewardWrapper(gym.Wrapper):
    """
    Success-aware Reward Wrapper for Taxi-v3 with custom reward rules
    """

    def __init__(self, env, reward_step=-5, reward_delivery=20, reward_illegal=-1):
        super().__init__(env)
        # These are the base rewards from the environment, we'll modify them
        # self.custom_reward_step = reward_step
        # self.custom_reward_delivery = reward_delivery
        # self.custom_reward_illegal = reward_illegal

        # --- CUSTOM REWARD RULES ---
        self._valid_move_penalty = -0.5
        self._wall_penalty = -2
        self._successful_pickup_bonus = 5
        self._wrong_pickup_penalty = -5
        self._successful_dropoff_reward = 20
        self._wrong_dropoff_penalty = -5
        self._delivery_bonus = 30 # This will be added on top of successful_dropoff_reward
        self._timeout_penalty = -15
        self._closer_reward = 1 # Reward for getting closer to the target
        self._farther_penalty = -1.5 # Penalty for getting farther from the target


        # trackers
        self._prev_state = None
        self._prev_distance = None
        self._picked = False
        self._last_action = None
        self._last_pos = None
        self._episode_timed_out = False # Added to track timeout

    def reset(self, **kwargs):
        self._prev_state = None
        self._prev_distance = None
        self._picked = False
        self._last_action = None
        self._last_pos = None
        self._episode_timed_out = False # Reset on new episode
        obs, info = super().reset(**kwargs)
        self._prev_state = self.env.s
        self._prev_distance, _, _ = self._distance_to_target(self._prev_state) # Initialize previous distance
        return obs, info

    def _decode_state(self, state):
        return self.env.decode(state)

    def _distance_to_target(self, state):
        taxi_row, taxi_col, pass_loc, dest_idx = self._decode_state(state)
        target_row, target_col = self.env.locs[dest_idx if pass_loc == 4 else pass_loc]
        return abs(taxi_row - target_row) + abs(taxi_col - target_col), pass_loc, dest_idx

    def action_mask(self, state):
        """Pass through the action_mask call to the wrapped environment."""
        return self.env.action_mask(state)

    def step(self, action):
        # Get state before taking the step
        prev_taxi_row, prev_taxi_col, prev_pass_loc, prev_dest_idx = self._decode_state(self.env.s)
        prev_taxi_loc = (prev_taxi_row, prev_taxi_col)


        obs, reward, terminated, truncated, info = self.env.step(action)
        next_state = obs

        # Get state after taking the step
        next_taxi_row, next_taxi_col, next_pass_loc, next_dest_idx = self._decode_state(next_state)
        next_taxi_loc = (next_taxi_row, next_taxi_col)

        custom_reward = 0

        # --- Apply Custom Reward Rules ---

        # 1. Successful drop-off and Delivery bonus
        if terminated and (next_pass_loc == next_dest_idx) and (next_taxi_loc == self.env.locs[next_dest_idx]):
            custom_reward += self._successful_dropoff_reward + self._delivery_bonus
            info["success"] = True
        # 2. Episode too long (timeout)
        elif truncated:
            custom_reward += self._timeout_penalty
            self._episode_timed_out = True
            info["timeout"] = True
        # 3. Hitting a wall (invalid move) - Check if taxi location didn't change but action was a move
        elif reward == self.env.reward_illegal and action in {0, 1, 2, 3}:
             custom_reward += self._wall_penalty
        # 4. Wrong pickup (no passenger) - Check if pickup action was taken but passenger is not in taxi and not at pickup location
        elif action == 4 and next_pass_loc != 4 and prev_taxi_loc != self.env.locs[prev_pass_loc]:
             custom_reward += self._wrong_pickup_penalty
        # 5. Wrong drop-off - Check if dropoff action was taken but not at destination or passenger not in taxi
        elif action == 5 and (next_pass_loc != next_dest_idx or next_taxi_loc != self.env.locs[next_dest_idx]):
             custom_reward += self._wrong_dropoff_penalty
        # 6. Successful pickup
        elif action == 4 and next_pass_loc == 4 and prev_pass_loc != 4:
            custom_reward += self._successful_pickup_bonus
            self._picked = True
        # 7. Distance-based reward/penalty for valid moves
        elif reward == self.env.reward_step and action in {0, 1, 2, 3}: # Only apply to movement actions
            current_distance, _, _ = self._distance_to_target(next_state)
            if current_distance < self._prev_distance:
                custom_reward += self._closer_reward
            elif current_distance > self._prev_distance:
                custom_reward += self._farther_penalty
            else: # Stayed in the same spot but it was a valid move
                custom_reward += self._valid_move_penalty # Apply a small penalty for not moving closer


        # Update trackers
        self._last_action = action
        self._last_pos = (next_taxi_row, next_taxi_col)
        self._prev_state = next_state
        self._prev_distance, _, _ = self._distance_to_target(next_state) # Update previous distance

        # Add info about picked status and distance for potential debugging/analysis
        dist, _, _ = self._distance_to_target(next_state)
        info.update({
            "picked": self._picked,
            "distance_to_target": dist,
            "episode_timed_out": self._episode_timed_out # Include timeout info
        })

        return obs, custom_reward, terminated, truncated, info

# Training

In [6]:
def train(n_episodes=50000, max_steps=200,
          seed_start=0, seed_end=40000, verbose=True,
          reward_step=-5, reward_delivery=20, reward_illegal=-1):
    """train optimized Policy Gradient Agent"""

    # Wrap environment with custom rewards
    env = TaxiRewardWrapper(
        TaxiEnv(),
        # reward_step=reward_step,
        # reward_delivery=reward_delivery,
        # reward_illegal=reward_illegal,
        # reward_closer_to_target=reward_closer_to_target,
        # reward_pickup=reward_pickup
    )

    agent = PolicyGradientAgentOptimized(
        n_states=env.observation_space.n,
        n_actions=env.action_space.n,
        learning_rate=0.01,      # TODO
        value_lr=0.1,         # TODO
        lr_decay=0.99999,       # TODO
        discount_factor=0.99,     # TODO
        entropy_coef=0.1        # TODO
    )

    episode_rewards = []
    success_count = 0

    if verbose:
        print(f"training episodes: {n_episodes}")
        print("=" * 70)

    for episode in range(n_episodes):
        state, info = env.reset(seed=seed_start + episode % seed_end)

        episode_history = []
        total_reward = 0

        for step in range(max_steps):
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            episode_history.append((state, action, reward))

            total_reward += reward
            state = next_state

            if done:
                if terminated and info.get("success", False): # Check for success from the wrapper info
                    success_count += 1
                break


        agent.update(episode_history)
        episode_rewards.append(total_reward)

        if verbose and (episode + 1) % 1000 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            success_rate = success_count / min(1000, episode + 1)

            print(f"episode {episode + 1}/{n_episodes} | "
                  f"avg reward: {avg_reward:.2f} | "
                  f"success rate: {success_rate:.1%} | "
                  f"learning rate: {agent.lr:.6f}")

            if episode >= 999: # Reset success count every 1000 episodes
                success_count = 0

    if verbose:
        print("=" * 70)
        print("Training completed!")

    env.close()
    return agent, episode_rewards

# Testing

In [7]:
def test(model_filename, n_episodes=100, seed_start=42, verbose=True,
         reward_step=-5, reward_delivery=20, reward_illegal=-1):
    """test agent"""

    # Load model for testing
    print(f"\nload model")

    # Wrap environment with custom rewards
    env = TaxiRewardWrapper(
        TaxiEnv(),
        # reward_step=reward_step,
        # reward_delivery=reward_delivery,
        # reward_illegal=reward_illegal,
        # reward_closer_to_target=reward_closer_to_target,
        # reward_pickup=reward_pickup
    )

    agent = PolicyGradientAgentOptimized(
    n_states=env.observation_space.n,
    n_actions=env.action_space.n
    )
    agent.theta = np.load(model_filename)

    rewards = []
    steps_list = []
    successes = 0

    for episode in range(n_episodes):
        state, info = env.reset(seed=seed_start + episode)
        episode_reward = 0
        step_count = 0

        for step in range(200):
            # use deterministic policy during testing
            action = np.argmax(agent.theta[state])

            next_state, reward, terminated, truncated, info = env.step(action)

            episode_reward += reward
            step_count += 1

            if terminated or truncated:
                if info.get("success", False): # Check for success from the wrapper info
                    successes += 1
                break

            state = next_state

        rewards.append(episode_reward)
        steps_list.append(step_count)

    avg_reward = np.mean(rewards)
    avg_steps = np.mean(steps_list)
    success_rate = successes / n_episodes

    # calculate evaluation score (success rate 20%, steps 80%)
    normalized_steps = avg_steps / 200
    step_score = 1 - normalized_steps
    evaluation_score = success_rate * 0.2 + step_score * 0.8

    if verbose:
        print(f"\ntest result (seed {seed_start}-{seed_start+n_episodes-1}):")
        print("=" * 70)
        print(f"   avg reward: {avg_reward:.2f}")
        print(f"   avg steps: {avg_steps:.2f}")
        print(f"   success rate: {success_rate:.1%}")
        print(f"   evaluation score: {evaluation_score:.4f} ({evaluation_score*100:.2f}%)")
        print("=" * 70)

    env.close()

    return {
        'avg_reward': avg_reward,
        'avg_steps': avg_steps,
        'success_rate': success_rate,
        'evaluation_score': evaluation_score,
    }

# Baseline (Policy Gradient with Custom Reward)

In [None]:
print("=" * 70)

# Initial reward parameters
REWARD_STEP = -1
REWARD_DELIVERY = 20
REWARD_ILLEGAL = -5

# train
agent, rewards = train(
    n_episodes=50000,
    verbose=True,
    reward_step=REWARD_STEP,
    reward_delivery=REWARD_DELIVERY,
    reward_illegal=REWARD_ILLEGAL,
)

# save model
model_filename = 'policy_gradient_optimized.npy'
np.save(model_filename, agent.theta)
print(f"\nmodel saved to {model_filename}")

# test
test(
    model_filename,
    n_episodes=1000,
    seed_start=420000,
    verbose=True,
    reward_step=REWARD_STEP,
    reward_delivery=REWARD_DELIVERY,
    reward_illegal=REWARD_ILLEGAL,
)

training episodes: 50000
episode 1000/50000 | avg reward: -314.23 | success rate: 6.1% | learning rate: 0.009900
episode 2000/50000 | avg reward: -293.03 | success rate: 15.0% | learning rate: 0.009802
episode 3000/50000 | avg reward: -246.99 | success rate: 22.4% | learning rate: 0.009704


# Q-Learning (Main Model)

In [9]:
class QLearningAgent:
    def __init__(self, n_states, n_actions, learning_rate=0.1, discount_factor=0.99, epsilon=1.0, epsilon_decay_rate=0.999, epsilon_min=0.01):
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay_rate = epsilon_decay_rate
        self.epsilon_min = epsilon_min
        self.q_table = np.zeros((n_states, n_actions))

    def choose_action(self, state, env):
        """Choose action using epsilon-greedy policy"""
        if np.random.rand() < self.epsilon:
            # Explore: choose a random action
            return np.random.choice(self.n_actions)
        else:
            # Exploit: choose the action with the highest Q-value
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """Update Q-table using the Q-learning update rule"""
        best_next_action = np.argmax(self.q_table[next_state])
        td_target = reward + self.gamma * self.q_table[next_state][best_next_action]
        td_error = td_target - self.q_table[state][action]
        self.q_table[state][action] += self.lr * td_error

    def decay_epsilon(self):
        """Decay epsilon"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay_rate)

In [10]:
def train_q_learning(n_episodes=50000, max_steps=200,
                     seed_start=0, seed_end=40000, verbose=True):
    """train Q-learning Agent"""

    # Wrap environment with custom rewards
    env = TaxiRewardWrapper(TaxiEnv())

# The best hyperparameters identified through the grid search were a learning_rate of 0.1, discount_factor of 0.99, epsilon_decay_rate of 0.999, and epsilon_min of 0.01.
# This hyperparameter combination achieved the highest evaluation score of 0.7042.
# The performance metrics for the best hyperparameters included an average reward of 15.18, an average of 66.95 steps per episode, and a success rate of 86.0%.

    agent = QLearningAgent(
        n_states=env.observation_space.n,
        n_actions=env.action_space.n,
        learning_rate=0.1,      #
        discount_factor=0.99,     #
        epsilon=1.0,          #
        epsilon_decay_rate=0.999, # Updated from 0.9999 based on hyperparameter tuning
        epsilon_min=0.01        #
    )

    episode_rewards = []
    success_count = 0

    if verbose:
        print(f"training episodes: {n_episodes}")
        print("=" * 70)

    for episode in range(n_episodes):
        state, info = env.reset(seed=seed_start + episode % seed_end)

        episode_history = []
        total_reward = 0

        for step in range(max_steps):
            action = agent.choose_action(state, env)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            agent.update(state, action, reward, next_state)

            total_reward += reward
            state = next_state

            if done:
                if terminated and info.get("success", False): # Check for success from the wrapper info
                    success_count += 1
                break

        agent.decay_epsilon()
        episode_rewards.append(total_reward)

        if verbose and (episode + 1) % 1000 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            success_rate = success_count / min(1000, episode + 1)

            print(f"episode {episode + 1}/{n_episodes} | "
                  f"avg reward: {avg_reward:.2f} | "
                  f"success rate: {success_rate:.1%} | "
                  f"epsilon: {agent.epsilon:.4f}")

            if episode >= 999: # Reset success count every 1000 episodes
                success_count = 0


    if verbose:
        print("=" * 70)
        print("Training completed!")

    env.close()
    return agent, episode_rewards

In [11]:
def test_q_learning(agent, n_episodes=100, seed_start=42, verbose=True):
    """test Q-learning agent"""

    # Wrap environment with custom rewards
    env = TaxiRewardWrapper(TaxiEnv())

    rewards = []
    steps_list = []
    successes = 0

    # Set epsilon to 0 for deterministic policy during testing
    agent.epsilon = 0

    for episode in range(n_episodes):
        state, info = env.reset(seed=seed_start + episode)
        episode_reward = 0
        step_count = 0

        for step in range(200):
            # use deterministic policy during testing
            action = np.argmax(agent.q_table[state])

            next_state, reward, terminated, truncated, info = env.step(action)

            episode_reward += reward
            step_count += 1

            if terminated or truncated:
                if info.get("success", False): # Check for success from the wrapper info
                    successes += 1
                break

            state = next_state

        rewards.append(episode_reward)
        steps_list.append(step_count)

    avg_reward = np.mean(rewards)
    avg_steps = np.mean(steps_list)
    success_rate = successes / n_episodes

    # calculate evaluation score (success rate 20%, steps 80%)
    normalized_steps = avg_steps / 200
    step_score = 1 - normalized_steps
    evaluation_score = success_rate * 0.2 + step_score * 0.8

    if verbose:
        print(f"\ntest result (seed {seed_start}-{seed_start+n_episodes-1}):")
        print("=" * 70)
        print(f"   avg reward: {avg_reward:.2f}")
        print(f"   avg steps: {avg_steps:.2f}")
        print(f"   success rate: {success_rate:.1%}")
        print(f"   evaluation score: {evaluation_score:.4f} ({evaluation_score*100:.2f}%)")
        print("=" * 70)

    env.close()

    return {
        'avg_reward': avg_reward,
        'avg_steps': avg_steps,
        'success_rate': success_rate,
        'evaluation_score': evaluation_score,
    }

In [12]:
print("=" * 70)

# train
q_agent, q_rewards = train_q_learning(
    n_episodes=50000,
    verbose=True
)

# save model
q_model_filename = 'Model.npy' # 'q_learning_agent.npy'
np.save(q_model_filename, q_agent.q_table)
print(f"\nmodel saved to {q_model_filename}")

# test
test_q_learning(
    q_agent,
    n_episodes=1000,
    seed_start=420000,
    verbose=True
)

training episodes: 50000
episode 1000/50000 | avg reward: -14.27 | success rate: 47.6% | epsilon: 0.3677
episode 2000/50000 | avg reward: 26.32 | success rate: 87.5% | epsilon: 0.1352
episode 3000/50000 | avg reward: 41.73 | success rate: 93.2% | epsilon: 0.0497
episode 4000/50000 | avg reward: 42.19 | success rate: 93.8% | epsilon: 0.0183
episode 5000/50000 | avg reward: 41.56 | success rate: 93.8% | epsilon: 0.0100
episode 6000/50000 | avg reward: 46.03 | success rate: 94.3% | epsilon: 0.0100
episode 7000/50000 | avg reward: 39.19 | success rate: 94.4% | epsilon: 0.0100
episode 8000/50000 | avg reward: 45.81 | success rate: 95.8% | epsilon: 0.0100
episode 9000/50000 | avg reward: 38.31 | success rate: 95.7% | epsilon: 0.0100
episode 10000/50000 | avg reward: 47.47 | success rate: 94.9% | epsilon: 0.0100
episode 11000/50000 | avg reward: 48.02 | success rate: 94.7% | epsilon: 0.0100
episode 12000/50000 | avg reward: 48.26 | success rate: 96.6% | epsilon: 0.0100
episode 13000/50000 | a

{'avg_reward': np.float64(41.1815),
 'avg_steps': np.float64(63.747),
 'success_rate': 0.894,
 'evaluation_score': np.float64(0.7238120000000001)}

In [13]:
# test
test_q_learning(
    q_agent,
    n_episodes=1000,
    seed_start=30000,
    verbose=True
)


test result (seed 30000-30999):
   avg reward: 41.37
   avg steps: 62.52
   success rate: 89.2%
   evaluation score: 0.7283 (72.83%)


{'avg_reward': np.float64(41.374),
 'avg_steps': np.float64(62.523),
 'success_rate': 0.892,
 'evaluation_score': np.float64(0.728308)}

# Task: Grid Search Hyperparameter Tuning (Optional)
Create grid search to do hyperparameter tuning using q learning

## Define the hyperparameter grid

### Subtask:
Specify the ranges or lists of values for the hyperparameters you want to tune (e.g., learning rate, discount factor, epsilon decay rate, epsilon min).


**Reasoning**:
Create a dictionary to store the hyperparameter grid with specified ranges for learning rate, discount factor, epsilon decay rate, and epsilon min.



In [None]:
param_grid = {
    'learning_rate': [0.01, 0.1, 0.2],
    'discount_factor': [0.9, 0.95, 0.99],
    'epsilon_decay_rate': [0.999, 0.9999, 0.99999],
    'epsilon_min': [0.01, 0.05]
}

## Create a training and evaluation loop

### Subtask:
Write a loop that iterates through all possible combinations of hyperparameters defined in the grid.


**Reasoning**:
Get all possible hyperparameter combinations and initialize a list to store results.



In [None]:
import itertools

keys, values = zip(*param_grid.items())
hyperparameter_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

results = []

for params in hyperparameter_combinations:
    print(f"Training with params: {params}")


Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.05}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.9999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.9999, 'epsilon_min': 0.05}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.99999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.99999, 'epsilon_min': 0.05}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.95, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.95, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.05}
Training with params: {'learning

## Train and evaluate the agent for each combination

### Subtask:
Inside the loop, train a new Q-learning agent with the current hyperparameter combination using the `train_q_learning` function and evaluate its performance using the `test_q_learning` function.


**Reasoning**:
Train and test the Q-learning agent with the current hyperparameter combination and store the results.



**Reasoning**:
The `train_q_learning` function does not accept the hyperparameters as keyword arguments. I need to modify the function to accept the hyperparameters and pass them to the `QLearningAgent` constructor.



In [None]:
def train_q_learning(n_episodes=50000, max_steps=200,
                     seed_start=0, seed_end=40000, verbose=True,
                     learning_rate=0.1, discount_factor=0.99, epsilon=1.0, epsilon_decay_rate=0.999, epsilon_min=0.01): # Added hyperparameter arguments
    """train Q-learning Agent"""

    # Wrap environment with custom rewards
    env = TaxiRewardWrapper(TaxiEnv())

    agent = QLearningAgent(
        n_states=env.observation_space.n,
        n_actions=env.action_space.n,
        learning_rate=learning_rate,      # Use passed argument
        discount_factor=discount_factor,     # Use passed argument
        epsilon=epsilon,          # Use passed argument
        epsilon_decay_rate=epsilon_decay_rate, # Use passed argument
        epsilon_min=epsilon_min        # Use passed argument
    )

    episode_rewards = []
    success_count = 0

    if verbose:
        print(f"training episodes: {n_episodes}")
        print("=" * 70)

    for episode in range(n_episodes):
        state, info = env.reset(seed=seed_start + episode % seed_end)

        episode_history = []
        total_reward = 0

        for step in range(max_steps):
            action = agent.choose_action(state, env)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            agent.update(state, action, reward, next_state)

            total_reward += reward
            state = next_state

            if done:
                if terminated and info.get("success", False): # Check for success from the wrapper info
                    success_count += 1
                break

        agent.decay_epsilon()
        episode_rewards.append(total_reward)

        if verbose and (episode + 1) % 1000 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            success_rate = success_count / min(1000, episode + 1)

            print(f"episode {episode + 1}/{n_episodes} | "
                  f"avg reward: {avg_reward:.2f} | "
                  f"success rate: {success_rate:.1%} | "
                  f"epsilon: {agent.epsilon:.4f}")

            if episode >= 999: # Reset success count every 1000 episodes
                success_count = 0


    if verbose:
        print("=" * 70)
        print("Training completed!")

    env.close()
    return agent, episode_rewards

import itertools

keys, values = zip(*param_grid.items())
hyperparameter_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

results = []

for params in hyperparameter_combinations:
    print(f"Training with params: {params}")
    # Train the agent with current parameters
    q_agent, q_rewards = train_q_learning(
        n_episodes=10000,  # Reduced episodes for faster grid search
        verbose=False, # Suppress verbose output during grid search
        **params # Pass hyperparameters
    )

    # Test the trained agent
    test_results = test_q_learning(
        q_agent,
        n_episodes=100, # Reduced episodes for faster grid search
        seed_start=420000,
        verbose=False # Suppress verbose output during grid search
    )

    # Store the results
    results.append({
        'params': params,
        'test_results': test_results
    })

Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.05}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.9999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.9999, 'epsilon_min': 0.05}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.99999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.99999, 'epsilon_min': 0.05}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.95, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Training with params: {'learning_rate': 0.01, 'discount_factor': 0.95, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.05}
Training with params: {'learning

## Store and compare results

### Subtask:
Keep track of the performance metrics (e.g., average reward, success rate, evaluation score) for each hyperparameter combination.


**Reasoning**:
Iterate through the results and print the hyperparameters and performance metrics for each combination.



In [None]:
for result in results:
    print("Hyperparameters:", result['params'])
    print("Test Results:")
    print(f"  Avg Reward: {result['test_results']['avg_reward']:.2f}")
    print(f"  Avg Steps: {result['test_results']['avg_steps']:.2f}")
    print(f"  Success Rate: {result['test_results']['success_rate']:.1%}")
    print(f"  Evaluation Score: {result['test_results']['evaluation_score']:.4f}")
    print("-" * 30)

Hyperparameters: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Test Results:
  Avg Reward: -37.46
  Avg Steps: 190.65
  Success Rate: 5.0%
  Evaluation Score: 0.0474
------------------------------
Hyperparameters: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.05}
Test Results:
  Avg Reward: -33.70
  Avg Steps: 168.17
  Success Rate: 17.0%
  Evaluation Score: 0.1613
------------------------------
Hyperparameters: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.9999, 'epsilon_min': 0.01}
Test Results:
  Avg Reward: -61.54
  Avg Steps: 154.16
  Success Rate: 25.0%
  Evaluation Score: 0.2334
------------------------------
Hyperparameters: {'learning_rate': 0.01, 'discount_factor': 0.9, 'epsilon_decay_rate': 0.9999, 'epsilon_min': 0.05}
Test Results:
  Avg Reward: -62.80
  Avg Steps: 155.34
  Success Rate: 24.0%
  Evaluation Score: 0.2266
------------------------------

## Identify the best hyperparameters

### Subtask:
Identify the best hyperparameters by finding the combination that resulted in the highest evaluation score.


**Reasoning**:
Identify the best hyperparameters by iterating through the results and comparing evaluation scores.



In [None]:
best_evaluation_score = -1.0
best_params = None

for result in results:
    current_evaluation_score = result['test_results']['evaluation_score']
    if current_evaluation_score > best_evaluation_score:
        best_evaluation_score = current_evaluation_score
        best_params = result['params']

print("\nBest Hyperparameters Found:")
print(best_params)
print(f"Corresponding Best Evaluation Score: {best_evaluation_score:.4f}")


Best Hyperparameters Found:
{'learning_rate': 0.1, 'discount_factor': 0.99, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Corresponding Best Evaluation Score: 0.7042


## Report the best hyperparameters and results

### Subtask:
Report the best hyperparameters found and the corresponding evaluation results.


**Reasoning**:
Print the best hyperparameters and the corresponding evaluation results as requested.



In [None]:
print("=" * 70)
print("Best Hyperparameters and Results")
print("=" * 70)
print("Best Hyperparameters:", best_params)
print(f"Best Evaluation Score: {best_evaluation_score:.4f}")

# Optional: Print the full test results for the best hyperparameters
# Find the full result dictionary for the best parameters
best_test_results = None
for result in results:
    if result['params'] == best_params:
        best_test_results = result['test_results']
        break

if best_test_results:
    print("\nFull Test Results for Best Hyperparameters:")
    print(f"  Avg Reward: {best_test_results['avg_reward']:.2f}")
    print(f"  Avg Steps: {best_test_results['avg_steps']:.2f}")
    print(f"  Success Rate: {best_test_results['success_rate']:.1%}")


Best Hyperparameters and Results
Best Hyperparameters: {'learning_rate': 0.1, 'discount_factor': 0.99, 'epsilon_decay_rate': 0.999, 'epsilon_min': 0.01}
Best Evaluation Score: 0.7042

Full Test Results for Best Hyperparameters:
  Avg Reward: 15.18
  Avg Steps: 66.95
  Success Rate: 86.0%


## Summary:

### Data Analysis Key Findings

*   The best hyperparameters identified through the grid search were a `learning_rate` of 0.1, `discount_factor` of 0.99, `epsilon_decay_rate` of 0.999, and `epsilon_min` of 0.01.
*   This hyperparameter combination achieved the highest evaluation score of 0.7042.
*   The performance metrics for the best hyperparameters included an average reward of 15.18, an average of 66.95 steps per episode, and a success rate of 86.0%.

### Insights or Next Steps

*   The identified best hyperparameters provide a solid starting point for training a high-performing Q-learning agent for this specific task.
*   Further tuning could be explored by refining the ranges around the best hyperparameters or by using more advanced tuning methods like random search or Bayesian optimization.
