In [53]:
import numpy as np

In [54]:
# def manhattan_distance(a, b):
#     return abs(a[0] - b[0]) + abs(a[1] - b[1])

In [55]:
# def heuristic_value(env, state):
#     # High value near goal, low near ghosts, small penalty for distance
#     dist_to_goal = manhattan_distance(state, env.goal_pos)
#     ghost_penalty = -5 if state in env.ghost_positions else 0
#     return -dist_to_goal + ghost_penalty

In [56]:
class TreeNode:
    def __init__(self, state, parent=None, action_from_parent=None):
        self.state = state
        self.parent = parent
        self.children = {}
        self.visits = 0
        self.total_reward = 0.0
        self.untried_actions = None  # lazy init
        self.action_from_parent = action_from_parent  # useful when re-rooting

    def uct_score(self, C=1.41):
        if self.visits == 0:
            return float("inf")
        avg = self.total_reward / self.visits
        explore = C * np.sqrt(np.log(self.parent.visits) / self.visits)
        return avg + explore

    def best_child(self, C=1.41):
        return max(self.children.values(), key=lambda c: c.uct_score(C))

In [57]:
class MCTree:
    def __init__(self, root):
        self.root = root

    def select(self, node):
        return max(node.children, key=lambda c: c.uct_score())

    def backpropagate(self, node, reward):
        # walk up the tree updating stats
        while node is not None:
            node.update(reward)
            node = node.parent

In [58]:
def rollout(env, state, max_depth=200):
    """Random rollout from state using simulate_step."""
    total_reward = 0
    for _ in range(max_depth):
        actions = env.get_valid_actions(state)
        if not actions:
            break
        action = np.random.choice(actions)
        state, reward, done = env.simulate_step(state, action)
        total_reward += reward
        if done:
            break
    return total_reward

In [59]:
import copy

def simulate(env, node, max_depth=120):
    """Play randomly from node.state until terminal or depth cutoff."""
    sim_env = copy.deepcopy(env)
    sim_env.player_pos = node.state
    total_reward = 0
    for _ in range(max_depth):
        action = np.random.choice(sim_env.action_space)
        _, reward, done = sim_env.step(action)
        total_reward += reward
        if done:
            break
    return total_reward

In [60]:
def monte_carlo_search(env, root, iterations=500, C=1.41):
    """Perform MCTS from the given root node, return best action."""
    for _ in range(iterations):
        node = root
        state = node.state

        # 1. Selection: descend tree until a leaf or unexpanded action
        while node.untried_actions == [] and node.children:
            node = node.best_child(C)
            state = node.state

        # 2. Expansion: expand *one* untried action
        if node.untried_actions is None:
            node.untried_actions = env.get_valid_actions(state)

        if node.untried_actions:
            action = node.untried_actions.pop()
            next_state, reward, done = env.simulate_step(state, action)
            child = TreeNode(next_state, parent=node, action_from_parent=action)
            node.children[action] = child
            node = child
            state = next_state

        # 3. Simulation: heuristic-guided rollout
        reward = rollout(env, state)

        # 4. Backpropagation
        while node is not None:
            node.visits += 1
            node.total_reward += reward
            node = node.parent

    # Best action from root = most visited child
    best_action = max(root.children.items(), key=lambda kv: kv[1].visits)[0]
    return best_action


In [61]:
def update_root_after_move(root, actual_action, new_state):
    """Reuse the subtree after taking a real action."""
    if actual_action in root.children:
        new_root = root.children[actual_action]
        new_root.parent = None
        return new_root
    else:
        return TreeNode(new_state)

In [62]:
import config
from environment.grid_loader import load_grid
from environment.grid_env import GridEnv

# grid_filepath = config.ENV_PARAMS.get('grid_filepath', "grid.npy")
grid_filepath = "grid.npy"
grid, start_pos, goal_pos, ghost_positions = load_grid(grid_filepath)
env = GridEnv(grid, start_pos, goal_pos, ghost_positions)

In [None]:
player_pos = env.reset()
root = TreeNode(player_pos)

done = False
total_reward = 0
i = 0

while not done:
    action = monte_carlo_search(env, root, iterations=500, C=1.41)
    next_state, reward, done = env.step(action)
    total_reward += reward
    # Re-root the tree at the chosen child
    root = update_root_after_move(root, action, next_state)
    print(f"Action {i}: moved", next(key for key, value in config.ACTIONS.items() if value == action))

    i += 1

    if reward == config.REWARDS['GOAL']:
        print(f'\nVICTORY! in {i} steps\n')
    elif reward == config.REWARDS['GHOST']:
        print(f'\nDEFEAT! in {i} steps\n')

print("Game finished with total reward:", total_reward)

Action 0: moved RIGHT
Action 1: moved DOWN
Action 2: moved RIGHT
Action 3: moved RIGHT
Action 4: moved DOWN
Action 5: moved RIGHT
Action 6: moved DOWN
Action 7: moved RIGHT

VICTORY! in 8 steps

Game finished with total reward: 3


# Average Performance

In [64]:
# losses = 0
# i = 0
# wins = 0
# while i < 100:
#     player_pos = env.reset()
#     root = TreeNode(player_pos)
#     done = False

#     while not done:
#         action = monte_carlo_search(env, root, iterations=1000, C=2)
#         next_state, reward, done = env.step(action)
#         total_reward += reward
#         # Re-root the tree at the chosen child
#         root = update_root_after_move(root, action, next_state)

#         i += 1

#         if reward == config.REWARDS['GOAL']:
#             wins += 1
#         if reward == config.REWARDS['GHOST']:
#             losses += 1

# print(wins/100)
# print(losses/100)

# Scaling?

In [65]:
# from utils.helpers import make_grid
# from visualisation.pygame_renderer import PygameRenderer

# grid = make_grid(50, 40, 300, 0.2)

# grid_mapping = config.GRID_MAPPING

# start_pos = np.where(grid==grid_mapping['PLAYER'])
# start_pos = (start_pos[0][0], start_pos[1][0])

# goal_pos = np.where(grid==grid_mapping['GOAL'])
# goal_pos = (goal_pos[0][0], goal_pos[1][0])

# ghost_positions = np.where(grid==config.GRID_MAPPING['GHOST'])
# ghost_positions = [(int(r), int(c)) for r, c in zip(ghost_positions[0], ghost_positions[1])]

# env = GridEnv(grid, start_pos, goal_pos, ghost_positions)
# renderer = PygameRenderer(env.height, env.width)

# renderer.init_display("RL vs Pacman (Human Player)")

# current_display_grid = env.get_grid_snapshot()
# renderer.render(current_display_grid, player_pos=start_pos)

# np.save('big_grid.npy', grid)

In [None]:
import numpy as np
import time
from tqdm import tqdm  # For progress bar

class RandomAgent:
    def __init__(self, env):
        self.env = env

    def choose_action(self, state):
        # Get all valid actions from current state
        valid_actions = self.env.get_valid_actions(state)

        # If no valid actions (shouldn't happen in proper grid), pick any action
        if not valid_actions:
            valid_actions = list(config.ACTIONS.values())

        # Pick a random valid action
        return np.random.choice(valid_actions)

# Function to run a single episode with the random agent
def run_random_agent_episode(env, agent):
    player_pos = env.reset()
    done = False
    total_reward = 0
    steps = 0

    while not done:
        action = agent.choose_action(player_pos)
        player_pos, reward, done = env.step(action)
        total_reward += reward
        steps += 1

    # Determine game outcome
    win = reward == config.REWARDS['GOAL']
    ghost_collision = reward == config.REWARDS['GHOST']

    return {
        'win': win,
        'ghost_collision': ghost_collision,
        'steps': steps,
        'total_reward': total_reward
    }

# Test the random agent for 100 games
def evaluate_random_agent(env, num_episodes=100):
    random_agent = RandomAgent(env)

    wins = 0
    ghost_collisions = 0
    total_steps = 0
    total_rewards = 0

    # Run episodes with progress bar
    for _ in tqdm(range(num_episodes), desc="Testing Random Agent"):
        result = run_random_agent_episode(env, random_agent)

        wins += int(result['win'])
        ghost_collisions += int(result['ghost_collision'])
        total_steps += result['steps']
        total_rewards += result['total_reward']

        # Small delay to prevent CPU overload
        time.sleep(0.01)

    # Calculate statistics
    win_rate = wins / num_episodes
    ghost_collision_rate = ghost_collisions / num_episodes
    avg_steps = total_steps / num_episodes
    avg_reward = total_rewards / num_episodes

    # Print results
    print("\n===== Random Agent Performance =====")
    print(f"Games played: {num_episodes}")
    print(f"Win rate: {win_rate:.2%} ({wins}/{num_episodes})")
    print(f"Ghost collision rate: {ghost_collision_rate:.2%} ({ghost_collisions}/{num_episodes})")
    print(f"Average steps per episode: {avg_steps:.2f}")
    print(f"Average reward per episode: {avg_reward:.2f}")
    print("===================================")

    return {
        'win_rate': win_rate,
        'ghost_collision_rate': ghost_collision_rate,
        'avg_steps': avg_steps,
        'avg_reward': avg_reward
    }

# Run the evaluation
random_agent_results = evaluate_random_agent(env, num_episodes=1000)

Testing Random Agent: 100%|██████████| 1000/1000 [00:13<00:00, 76.41it/s]


===== Random Agent Performance =====
Games played: 1000
Win rate: 1.10% (11/1000)
Ghost collision rate: 98.90% (989/1000)
Average steps per episode: 32.49
Average reward per episode: -41.27



