In [1]:
import pygame
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import random
from collections import deque
import numpy as np
from helper import *

pygame 2.6.1 (SDL 2.28.4, Python 3.11.11)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Parameters
# NOTE(review): none of the module-level names defined in this cell are read by
# the other cells visible in this notebook (functions below take their own
# `grid_size` argument) — confirm whether `helper` needs them before removing.
grid_size = 5  # Grid size for player and enemies (positions range from 0 to 4)
visited_states = 2  # Visited status can be 0 or 1
wall_states = 2  # Wall status can be 0 (no wall) or 1 (wall)

# One (d0, d1) delta per action.
# NOTE(review): the training loop builds its own index -> direction mapping with
# a different ordering, and positions are used as (row, col) indices elsewhere,
# so the up/right/left/down labels below should be double-checked.
movements = np.array([
    [0, 1],   # Move up
    [1, 0],   # Move right
    [-1, 0],  # Move left
    [0, -1]   # Move down
])

# Generate walls: 0 for no wall, 1 for wall
# (no wall generation follows in this cell; mazes are built via create_maze)

state_action_log = []


In [3]:
def optimized_relative_positions(maze_size, player_pos, enemy_positions, grid_size=5):
    """Locate the enemies that fall inside the square window centred on the player.

    Positions are ``(x, y)`` pairs where ``x`` indexes maze rows and ``y`` indexes
    maze columns, matching how the maze is indexed (``maze[x][y]``) in the rest of
    the notebook. The maze wraps around on both axes.

    Args:
        maze_size: ``(rows, cols)`` of the maze.
        player_pos: ``(x, y)`` of the player.
        enemy_positions: iterable of ``(x, y)`` enemy positions.
        grid_size: side length of the local window (default 5).

    Returns:
        list[tuple[int, int]]: ``(local_x, local_y)`` window coordinates, each in
        ``0 .. 2 * (grid_size // 2)``, for every enemy inside the window. Enemies
        outside the window are omitted.
    """
    x, y = player_pos
    rows, cols = maze_size  # Maze dimensions
    half_grid = grid_size // 2

    def wrapped_delta(target, origin, extent):
        # Shortest signed offset from origin to target on a ring of length `extent`.
        delta = (target - origin) % extent
        if delta > extent // 2:
            delta -= extent
        return delta

    relative_positions = []
    for ex, ey in enemy_positions:
        # x runs along rows and y along columns, so each axis must wrap on its
        # own extent (the two were previously swapped, which only happened to
        # work for square mazes).
        dx = wrapped_delta(ex, x, rows)
        dy = wrapped_delta(ey, y, cols)

        # Keep only enemies inside the local window and shift to 0-based coords.
        if -half_grid <= dx <= half_grid and -half_grid <= dy <= half_grid:
            relative_positions.append((int(dx + half_grid), int(dy + half_grid)))
    return relative_positions

In [4]:
def initialize_positions(maze, num_enemies):
    """Pick random, distinct, non-wall start cells for the player and the enemies.

    Args:
        maze: 2-D grid indexable as ``maze[x][y]``; cells equal to 1 are walls.
        num_enemies: how many enemies to place (fewer are placed if the maze
            runs out of free cells).

    Returns:
        tuple: ``(player_pos, enemies)`` where ``player_pos`` is an ``(x, y)``
        tuple and ``enemies`` is a list of ``{"pos": (x, y), "target": None}``.
    """
    n_rows = len(maze)
    n_cols = len(maze[0])

    # Every cell that is not a wall is a candidate start position.
    free_cells = []
    for r in range(n_rows):
        for c in range(n_cols):
            if maze[r][c] != 1:
                free_cells.append((r, c))

    # The player takes one cell; it is then no longer available to enemies.
    player_pos = random.choice(free_cells)
    free_cells.remove(player_pos)

    enemies = []
    while free_cells and len(enemies) < num_enemies:
        spot = random.choice(free_cells)
        enemies.append({"pos": spot, "target": None})
        free_cells.remove(spot)

    return player_pos, enemies


In [5]:
# Step budget for one game; bound as the default `timeout` argument of Game.
maxt=50

In [6]:
def manhattan_distance(p1, p2):
    """Return the taxicab (L1) distance between two 2-D points."""
    first_gap = p1[0] - p2[0]
    second_gap = p1[1] - p2[1]
    return abs(first_gap) + abs(second_gap)

def distance_to_nearest_food(maze, pos):
    """Return the wrap-around Manhattan distance from ``pos`` to the nearest food.

    Movement in this game wraps around both maze edges, so the distance is
    measured on the torus: along each axis the shorter of the direct and the
    wrapped route is used. Measuring it on the flat grid made the value jump
    whenever the player stepped across an edge.

    Args:
        maze: 2-D array in which cells equal to 2 contain food.
        pos: ``(x, y)`` position (row index, column index).

    Returns:
        int: distance in steps, or 0 when the maze contains no food.
    """
    maze = np.asarray(maze)
    food_locs = np.argwhere(maze == 2)
    if len(food_locs) == 0:
        return 0

    rows, cols = maze.shape
    # Offsets on each ring, then the shorter way round.
    row_gap = (food_locs[:, 0] - pos[0]) % rows
    col_gap = (food_locs[:, 1] - pos[1]) % cols
    row_gap = np.minimum(row_gap, rows - row_gap)
    col_gap = np.minimum(col_gap, cols - col_gap)
    return int((row_gap + col_gap).min())


In [7]:
def calculate_reward(maze, player_pos, action):
    """Score a proposed move without modifying the maze.

    The target cell is found by adding ``action`` to ``player_pos`` with
    wrap-around on both axes.

    Args:
        maze: 2-D numpy array of cell codes (1 = wall, 2 = food, 4 = enemy).
        player_pos: current ``(x, y)`` of the player.
        action: ``(dx, dy)`` step to attempt.

    Returns:
        tuple: ``(reward, valid, position)``. Moves into a wall (-2) or an
        enemy cell (-100) are invalid and return the unchanged ``player_pos``.
        Any other move is valid and returns the target cell, with a base reward
        of 5.0 for food or -0.3 otherwise, plus 0.1 per step by which the move
        reduces the distance to the nearest food.
    """
    n_rows, n_cols = maze.shape
    row, col = player_pos
    d_row, d_col = action

    # Wrap-around target cell.
    target = ((row + d_row) % n_rows, (col + d_col) % n_cols)
    tile = maze[target]

    # Blocked moves: the player stays where it is.
    if tile == 1:
        return -2, False, player_pos
    if tile == 4:
        return -100, False, player_pos

    base = 5.0 if tile == 2 else -0.3

    # Distance-based shaping: positive when the move gets closer to food.
    dist_before = distance_to_nearest_food(maze, player_pos)
    dist_after = distance_to_nearest_food(maze, target)
    bonus = (dist_before - dist_after) * 0.1

    return base + bonus, True, target


In [8]:
class Game:
    """Mutable state of one maze game plus helpers to restart and step it.

    Maze cell codes used by this class: 2 = food (cleared to 0 when eaten),
    3 = player start marker, 4 = enemy start marker.
    """

    def __init__(self, maze, player_pos, enemies, score=0, timeout=maxt):
        """Store the initial game state.

        Args:
            maze: 2-D grid of cell codes, indexable as ``maze[x][y]``.
            player_pos: ``(x, y)`` of the player.
            enemies: list of enemy dicts with a ``"pos"`` entry.
            score: starting score.
            timeout: step budget for this game.
        """
        self.maze = maze
        self.player_pos = player_pos
        self.enemies = enemies
        self.score = score
        self.timeout = timeout
        # Remembered so restart_game() restores the budget this game was
        # created with rather than whatever the global default currently is.
        self.initial_timeout = timeout
        self.state_action_log = []
        self.running = True  # Indicates if the game is still running
        self.visited_empty_spaces = 0
        self.moves_taken = 0     # Track steps for this run
        self.success_times = []  # Store past success durations

    def _mark_entities(self):
        """Write the player (3) and enemy (4) markers into the maze."""
        px, py = self.player_pos
        self.maze[px][py] = 3  # Mark player

        for enemy in self.enemies:
            ex, ey = enemy["pos"]
            self.maze[ex][ey] = 4  # Mark enemies

    def restart_game(self):
        """Replace the maze and positions with fresh ones and reset counters.

        ``success_times`` is intentionally kept across restarts.
        """
        self.maze = create_maze(ROWS, COLS)
        self.player_pos, self.enemies = initialize_positions(self.maze, num_enemies)
        self.score = 0
        self.timeout = self.initial_timeout
        self.state_action_log = []
        self.running = True
        self.visited_empty_spaces = 0
        self.moves_taken = 0
        self._mark_entities()

    def step(self, action):
        """
        Execute one environment step based on the action.

        Args:
            action: ``(dx, dy)`` move passed to ``calculate_reward``.

        Returns: (reward, done)
        """
        reward, valid, new_player_pos = calculate_reward(self.maze, self.player_pos, action)

        if valid:
            # Collect food if any
            if self.maze[new_player_pos[0]][new_player_pos[1]] == 2:
                self.score += 1
                self.maze[new_player_pos[0]][new_player_pos[1]] = 0
            self.player_pos = new_player_pos

        # Move enemies
        move_enemies(self.enemies, self.maze)

        # Count every step taken through this method (previously the counter
        # stayed at 0 unless the caller updated it by hand).
        self.moves_taken += 1

        # Check for terminal states. The timeout is tested before it is
        # decremented, so a game gets `timeout + 1` steps before it expires.
        done = not valid or self.timeout <= 0 or check_collision(self.player_pos, self.enemies)

        if done:
            self.running = False

        self.timeout -= 1
        return reward, done

In [9]:
# Build one standalone game for the exploratory cells below.
# The maze is converted to an ndarray and marked *before* the Game is created,
# so `maze` and `game.maze` are the same, fully prepared array (previously the
# final np.array() call rebound `maze` to a copy detached from `game.maze`).
maze = np.array(create_maze(ROWS, COLS))
player_pos, enemies = initialize_positions(maze, num_enemies)

player_row, player_col = player_pos
maze[player_row][player_col] = 3  # 3 marks the player
for enemy in enemies:
    enemy_row, enemy_col = enemy['pos']
    maze[enemy_row][enemy_col] = 4  # 4 marks an enemy

game = Game(maze, player_pos, enemies, score=0, timeout=maxt)

In [10]:
# Display the enemy records ({"pos", "target"}) produced by initialize_positions.
enemies

[{'pos': (17, 27), 'target': None},
 {'pos': (18, 21), 'target': None},
 {'pos': (2, 6), 'target': None},
 {'pos': (31, 30), 'target': None}]

In [11]:
# Ad-hoc call with literal arguments; `enemy_vec` is not read by any other cell shown here.
# NOTE(review): the player position (37, 9) lies outside the (10, 3) maze size passed in — confirm this is intentional.
enemy_vec=optimized_relative_positions((10,3), (37,9), [enemy["pos"] for enemy in enemies], grid_size=5)

In [12]:
def build_state_tensor(grid_view, enemy_rel_positions, player_pos=None):
    """Stack an enemy-presence plane onto a local view and return a batch tensor.

    Args:
        grid_view: array of shape ``(C, H, W)`` holding the feature planes of
            the local window (e.g. the wall/food/empty planes, ``C == 3``).
        enemy_rel_positions: iterable of ``(local_x, local_y)`` window
            coordinates; entries outside ``H x W`` are ignored.
        player_pos: accepted for interface compatibility; not used.

    Returns:
        torch.Tensor: float32 tensor of shape ``(1, C + 1, H, W)`` whose last
        channel is 1.0 wherever an enemy is present.
    """
    grid_view = np.asarray(grid_view, dtype=np.float32)
    # Size the enemy plane from the view itself instead of assuming 5x5.
    height, width = grid_view.shape[-2:]

    enemy_map = np.zeros((height, width), dtype=np.float32)
    for local_x, local_y in enemy_rel_positions:
        if 0 <= local_x < height and 0 <= local_y < width:
            enemy_map[local_x, local_y] = 1.0  # Mark enemy presence

    # Channels-first concatenation: (C, H, W) + (1, H, W) -> (C + 1, H, W)
    full_input = np.concatenate([grid_view, enemy_map[np.newaxis, :, :]], axis=0)

    # Add the batch dimension: (1, C + 1, H, W)
    return torch.tensor(full_input, dtype=torch.float32).unsqueeze(0)

In [13]:
def partition_maze_optimized(maze, player_pos, view_size=5):
    """
    Extract a square view of the maze around the player, using wrap-around.

    Args:
        maze (np.ndarray): The full maze array of shape (rows, cols)
        player_pos (tuple): (x, y) coordinates of the player (row, column)
        view_size (int): Size of the square view (default: 5)

    Returns:
        np.ndarray: A new (view_size x view_size) array with the maze's dtype.
        For odd sizes the player sits in the centre cell; for even sizes the
        window covers offsets ``-view_size // 2 .. view_size // 2 - 1`` (even
        sizes used to raise IndexError).
    """
    maze = np.asarray(maze)
    rows, cols = maze.shape
    px, py = player_pos
    half = view_size // 2

    # Exactly `view_size` offsets starting at -half; the modulo does the wrap.
    offsets = np.arange(view_size) - half
    row_idx = (px + offsets) % rows
    col_idx = (py + offsets) % cols

    # np.ix_ builds the outer product of the two index vectors, so one fancy
    # indexing operation replaces the nested Python loops.
    return maze[np.ix_(row_idx, col_idx)]

In [14]:
def save_maze_to_txt(maze, filename="maze_output.txt"):
    """Write the maze to ``filename``: one line per row, cells separated by spaces.

    An existing file of the same name is overwritten.
    """
    with open(filename, 'w') as out:
        out.writelines(" ".join(map(str, row)) + "\n" for row in maze)


In [15]:
def one_hot_encode_features(view):
    """Turn a grid of cell codes into stacked float32 indicator planes.

    The planes are, in order: wall (code 1), food (code 2), empty (code 0).
    Cells holding any other code are 0 in every plane.

    Returns:
        np.ndarray: array of shape ``(3, *view.shape)``.
    """
    plane_codes = (1, 2, 0)  # wall, food, empty
    planes = [(view == code).astype(np.float32) for code in plane_codes]
    return np.stack(planes, axis=0)


In [16]:
# 5x5 wrap-around window of the standalone maze, centred on the player.
view=partition_maze_optimized(maze, player_pos, view_size=5)

In [17]:
# Dump the full maze to my_maze.txt for inspection (the file is overwritten on every run).
save_maze_to_txt(maze, "my_maze.txt")

In [18]:
# Display the wall / food / empty planes for the window extracted above.
one_hot_encode_features(view)

array([[[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]],

       [[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 0., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]],

       [[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]]], dtype=float32)

In [19]:
class DQNPacmanHybrid(nn.Module):
    """Small convolutional Q-network: two unpadded 3x3 convs, then two linear layers.

    Args:
        input_shape: ``(channels, height, width)`` of one observation. Note the
            default is ``(4, 40, 40)``; the local-view cells of this notebook
            instantiate it with ``(4, 5, 5)``.
        num_actions: number of Q-values produced per observation.
    """

    def __init__(self, input_shape=(4, 40, 40), num_actions=4):
        super().__init__()

        channels, height, width = input_shape

        # Feature extractor: no padding, so each conv trims the map by 2.
        self.conv1 = nn.Conv2d(channels, 32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1)

        # Size of the flattened conv output feeds the first linear layer.
        out_w = self._size_after_convs(width)
        out_h = self._size_after_convs(height)

        # Q-value head
        self.fc1 = nn.Linear(out_w * out_h * 64, 128)
        self.fc2 = nn.Linear(128, num_actions)

    @staticmethod
    def _size_after_convs(size, num_layers=2, kernel_size=3, stride=1):
        """Spatial extent left after ``num_layers`` unpadded convolutions."""
        for _ in range(num_layers):
            size = (size - kernel_size) // stride + 1
        return size

    def forward(self, x):
        """Map a ``(batch, C, H, W)`` tensor to ``(batch, num_actions)`` Q-values."""
        features = F.relu(self.conv1(x))         # (batch, 32, H - 2, W - 2)
        features = F.relu(self.conv2(features))  # (batch, 64, H - 4, W - 4)
        flat = features.view(features.size(0), -1)
        hidden = F.relu(self.fc1(flat))          # (batch, 128)
        return self.fc2(hidden)


In [20]:
import random
from collections import deque

class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque drops its oldest entry once `capacity` is exceeded.
        self.buffer = deque([], maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

    def append(self, transition):
        """Add one transition, evicting the oldest if the buffer is full."""
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises ValueError (from ``random.sample``) if fewer are stored.
        """
        drawn = random.sample(self.buffer, batch_size)
        return drawn


In [21]:
def optimize_model(policy_net, target_net, replay_buffer, optimizer, batch_size=32, gamma=0.99, device='cuda'):
    """Run one DQN gradient step on a random minibatch from the replay buffer.

    Args:
        policy_net: network being trained; maps states to per-action Q-values.
        target_net: network used to evaluate next states (no gradient).
        replay_buffer: object supporting ``len()`` and ``sample(batch_size)``;
            each transition is ``(state, action, reward, next_state, done)``
            with states shaped ``(1, ...)`` so they can be concatenated.
        optimizer: optimizer over ``policy_net``'s parameters.
        batch_size: minibatch size.
        gamma: discount factor.
        device: device the batch is moved to. If CUDA is requested but not
            available, the device of ``policy_net``'s parameters is used
            instead of failing.

    Returns:
        float | None: the minibatch MSE loss, or ``None`` when the buffer holds
        fewer than ``batch_size`` transitions and no update was made.
    """
    if len(replay_buffer) < batch_size:
        return None  # Not enough data yet

    device = torch.device(device)
    if device.type == 'cuda' and not torch.cuda.is_available():
        # The default would otherwise crash on CPU-only machines.
        device = next(policy_net.parameters()).device

    states, actions, rewards, next_states, dones = zip(*replay_buffer.sample(batch_size))

    state_batch = torch.cat(states).to(device)            # (B, C, H, W)
    next_state_batch = torch.cat(next_states).to(device)
    action_batch = torch.tensor(actions, dtype=torch.long, device=device)
    reward_batch = torch.tensor(rewards, dtype=torch.float32, device=device)
    done_batch = torch.tensor(dones, dtype=torch.bool, device=device)

    # Q(s, a) for the actions that were actually taken
    q_values = policy_net(state_batch).gather(1, action_batch.unsqueeze(1)).squeeze(1)

    # Bootstrapped target: r + gamma * max_a' Q_target(s', a'), with no
    # bootstrap term for terminal transitions.
    with torch.no_grad():
        next_q_values = target_net(next_state_batch).max(1)[0]
        next_q_values = next_q_values.masked_fill(done_batch, 0.0)
        target_q_values = reward_batch + gamma * next_q_values

    loss = F.mse_loss(q_values, target_q_values)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Returned so callers can log training progress; ignoring it is harmless.
    return loss.item()

In [22]:
def update_target_network(policy_net, target_net):
    """Overwrite the target network's parameters with the policy network's."""
    latest_weights = policy_net.state_dict()
    target_net.load_state_dict(latest_weights)

In [23]:
def select_action(state_tensor, policy, epsilon, num_actions=4):
    """Epsilon-greedy action choice.

    With probability ``epsilon`` return a uniformly random action index in
    ``0 .. num_actions - 1``; otherwise return the index of the largest
    Q-value that ``policy`` produces for ``state_tensor``.
    """
    explore = random.random() < epsilon
    if not explore:
        with torch.no_grad():
            return policy(state_tensor).argmax().item()
    return random.randint(0, num_actions - 1)


In [24]:
# Assemble the local-view state tensor for the standalone game:
# 5x5 wrap-around window -> wall/food/empty planes -> enemy plane appended.
local_view = partition_maze_optimized(maze, player_pos)
grid_view = one_hot_encode_features(local_view)
enemy_vector = optimized_relative_positions(maze.shape, player_pos, [enemy["pos"] for enemy in enemies], grid_size=5)
# build_state_tensor accepts the player position but does not read it.
player_vector = np.array(player_pos)
state_tensor = build_state_tensor(grid_view, enemy_vector, player_vector)


In [25]:
# Define model: policy and target networks for the 4-channel 5x5 local view
policy = DQNPacmanHybrid(input_shape=(4, 5, 5), num_actions=4)
target_net = DQNPacmanHybrid(input_shape=(4, 5, 5), num_actions=4)

# Copy initial weights to target network
target_net.load_state_dict(policy.state_dict())
# eval() returns the module, so as the last expression it also prints the architecture.
target_net.eval()  # Target network is frozen (no training)


DQNPacmanHybrid(
  (conv1): Conv2d(4, 32, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=64, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=4, bias=True)
)

In [26]:
# Use the GPU when one is available, otherwise fall back to the CPU,
# and move both networks to that device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
policy = policy.to(device)
target_net = target_net.to(device)


In [27]:
# Only the policy network is optimised; the target network is updated by copying weights.
optimizer = optim.Adam(policy.parameters(), lr=1e-4)


In [28]:
# Print each plane of the local-view state tensor. The labels follow the channel
# order produced by one_hot_encode_features (wall, food, empty) with the enemy
# plane appended last by build_state_tensor.
for i, name in enumerate(["Wall", "Food", "Empty", "Enemy"]):
    print(f"{name} Channel:\n", state_tensor[0, i])


Wall Channel:
 tensor([[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]])
Food Channel:
 tensor([[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 0., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]])
Empty Channel:
 tensor([[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]])
Enemy Channel:
 tensor([[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]])


In [29]:
def build_state_tensor_full(maze, enemies, player_pos, include_player=False):
    """Encode the whole maze as a channels-first batch tensor.

    Channels, in order: wall (cell == 1), food (cell == 2), empty (cell == 0),
    enemy presence, and — only when ``include_player`` is true — a plane that
    is 1.0 at the player's cell.

    Args:
        maze: 2-D array of cell codes, shape ``(rows, cols)``.
        enemies: iterable of dicts whose ``"pos"`` entry is an ``(x, y)`` pair.
        player_pos: ``(x, y)`` of the player; only read when
            ``include_player`` is true.
        include_player: append the player plane. Defaults to False so existing
            callers keep receiving 4 channels; a network fed the 5-channel
            tensor must be built with 5 input channels.

    Returns:
        torch.Tensor: float32 tensor of shape ``(1, 4, rows, cols)``, or
        ``(1, 5, rows, cols)`` with the player plane.
    """
    maze = np.asarray(maze)
    rows, cols = maze.shape

    # wall, food, empty indicator planes
    planes = [(maze == code).astype(np.float32) for code in (1, 2, 0)]

    # Enemy map
    enemy_map = np.zeros((rows, cols), dtype=np.float32)
    for enemy in enemies:
        ex, ey = enemy["pos"]
        enemy_map[ex, ey] = 1.0
    planes.append(enemy_map)

    if include_player:
        player_map = np.zeros((rows, cols), dtype=np.float32)
        player_map[player_pos[0], player_pos[1]] = 1.0
        planes.append(player_map)

    grid = np.stack(planes, axis=0)                                  # (C, rows, cols)
    return torch.tensor(grid, dtype=torch.float32).unsqueeze(0)      # (1, C, rows, cols)


In [30]:
class DQNPacmanFull(nn.Module):
    """Q-network over the full maze: two size-preserving 3x3 convs and two linear layers.

    Args:
        input_shape: ``(channels, height, width)`` of one observation.
        num_actions: number of Q-values produced per observation.
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()
        in_channels, height, width = input_shape

        # padding=1 with a 3x3 kernel keeps the spatial size unchanged.
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)

        # 64 feature maps of the original height x width
        self.flattened_size = height * width * 64

        self.fc1 = nn.Linear(self.flattened_size, 256)
        self.fc2 = nn.Linear(256, num_actions)

    def forward(self, x):
        """Map a ``(B, C, H, W)`` tensor to ``(B, num_actions)`` Q-values."""
        features = F.relu(self.conv2(F.relu(self.conv1(x))))  # (B, 64, H, W)
        flat = features.view(features.size(0), -1)            # (B, 64 * H * W)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)


In [31]:
# --- Game Setup ---
# Everything the training loop needs. Note that this cell rebinds `maxt`,
# `device`, `policy`, `target_net` and `optimizer`, replacing the local-view
# objects created in earlier cells with full-maze equivalents.
# ROWS, COLS, num_enemies and create_maze are not defined in this notebook
# section (presumably they come from `helper`).
pygame.init()
print("RRUN")

maxt = 50          # step budget handed to each Game as `timeout`
num_games = 4      # number of games advanced in parallel by the loop
FPS = 180          # frame cap passed to clock.tick()
BATCH_SIZE = 32    # minibatch size for optimize_model

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 4 channels: wall, food, empty, enemy (see build_state_tensor_full)
input_shape = (4, ROWS, COLS)
policy = DQNPacmanFull(input_shape=input_shape, num_actions=4).to(device)
target_net = DQNPacmanFull(input_shape=input_shape, num_actions=4).to(device)
target_net.load_state_dict(policy.state_dict())
target_net.eval()

optimizer = optim.Adam(policy.parameters(), lr=1e-4)
replay_buffer = ReplayBuffer(capacity=10000)

# Epsilon-greedy schedule: multiplied by epsilon_decay after every step, floored at epsilon_min.
epsilon = 1.0
epsilon_min = 0.05
epsilon_decay = 0.995
target_update_freq = 100  # copy policy -> target every this many training steps
train_step = 0

clock = pygame.time.Clock()
running = True
games = []
# NOTE(review): unlike Game.restart_game(), these initial mazes are not marked
# with the player (3) and enemy (4) codes, so the first episode of each game
# starts from a differently encoded maze than later ones — confirm whether
# that is intended.
for _ in range(num_games):
    maze = create_maze(ROWS, COLS)
    player_pos, enemies = initialize_positions(maze, num_enemies)
    game = Game(maze, player_pos, enemies, score=0, timeout=maxt)
    games.append(game)


RRUN


In [32]:
def check_collision(current_state, enemies):
    """Return True if any enemy's ``"pos"`` equals the ``(x, y)`` in ``current_state``."""
    x, y = current_state
    enemy_cells = (enemy["pos"] for enemy in enemies)
    return any((x, y) == (ex, ey) for ex, ey in enemy_cells)


In [33]:
def move_entity(pos, direction):
    """Return ``pos`` shifted by ``direction``, wrapped into a ROWS x COLS grid."""
    shifted_row = pos[0] + direction[0]
    shifted_col = pos[1] + direction[1]
    return (shifted_row % ROWS, shifted_col % COLS)


In [34]:
# --- Training loop ---
# Each pass advances every game by one step, stores the transition, trains on a
# minibatch and redraws. Closing the window ends the loop; pygame is shut down
# in the `finally` block, so an interrupted kernel does not leave it initialised.

# Action index -> (d_row, d_col); loop-invariant, so built once.
index_to_direction = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

try:
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
        if not running:
            break

        for game in games:
            if not game.running:
                game.restart_game()
                print("Game restarted.")

            # Observation before acting. Transitions are kept on the CPU
            # (optimize_model moves each minibatch to the device itself), so
            # the replay buffer does not occupy GPU memory.
            # NOTE(review): with the default 4 channels the observation does
            # not contain the player's current position — consider adding a
            # player plane to the state and network input.
            state_tensor = build_state_tensor_full(game.maze, game.enemies, game.player_pos)

            action_index = select_action(state_tensor.to(device), policy, epsilon)
            action = index_to_direction[action_index]

            reward, valid, new_player_pos = calculate_reward(game.maze, game.player_pos, action)

            # Advance the environment *before* observing the next state;
            # otherwise state and next state are built from the same maze and
            # enemy positions and are identical.
            if valid:
                if game.maze[new_player_pos[0]][new_player_pos[1]] == 2:
                    game.score += 1
                    game.maze[new_player_pos[0]][new_player_pos[1]] = 0
                game.player_pos = new_player_pos

            move_enemies(game.enemies, game.maze)

            game.timeout -= 1
            game.moves_taken += 1

            caught = check_collision(game.player_pos, game.enemies)
            cleared = not np.any(game.maze == 2)

            # A transition is terminal only when the episode really ends here.
            # Blocked moves and an expired `timeout` do not stop a game in this
            # loop, so they must not cut off bootstrapping either.
            done = caught or cleared

            next_state_tensor = build_state_tensor_full(game.maze, game.enemies, game.player_pos)
            replay_buffer.append((state_tensor, action_index, reward, next_state_tensor, done))

            if len(replay_buffer) > BATCH_SIZE:
                optimize_model(policy, target_net, replay_buffer, optimizer, BATCH_SIZE, device=device)

            train_step += 1
            if train_step % target_update_freq == 0:
                update_target_network(policy, target_net)

            epsilon = max(epsilon_min, epsilon * epsilon_decay)

            if caught:
                game.running = False
                print(f"Lived for {game.moves_taken} amount of moves")
            if cleared:
                print(f"You Win! Completed in {game.moves_taken} moves")
                game.success_times.append(game.moves_taken)
                # Save to file (optional)
                with open("completion_times.txt", "a") as f:
                    f.write(f"{game.moves_taken}\n")

                game.running = False

        draw_all_games(games)
        pygame.display.flip()
        clock.tick(FPS)
finally:
    pygame.quit()


Lived for 3 amount of moves
Game restarted.
Lived for 12 amount of moves
Game restarted.
Lived for 16 amount of moves
Game restarted.
Lived for 20 amount of moves
Game restarted.
Lived for 41 amount of moves
Game restarted.
Lived for 16 amount of moves
Game restarted.
Lived for 4 amount of moves
Game restarted.
Lived for 69 amount of moves
Game restarted.
Lived for 26 amount of moves
Game restarted.
Lived for 12 amount of moves
Game restarted.
Lived for 238 amount of moves
Game restarted.
Lived for 255 amount of moves
Game restarted.
Lived for 42 amount of moves
Game restarted.
Lived for 9 amount of moves
Game restarted.
Lived for 174 amount of moves
Game restarted.
Lived for 17 amount of moves
Game restarted.
Lived for 31 amount of moves
Game restarted.
Lived for 11 amount of moves
Game restarted.
Lived for 140 amount of moves
Game restarted.
Lived for 22 amount of moves
Game restarted.
Lived for 517 amount of moves
Game restarted.
Lived for 139 amount of moves
Game restarted.
Lived f

KeyboardInterrupt: 

In [None]:
# Inspect the maze of whichever game `game` was last bound to.
game.maze

array([[2, 2, 2, ..., 2, 2, 2],
       [2, 2, 2, ..., 2, 2, 2],
       [2, 2, 2, ..., 2, 2, 2],
       ...,
       [2, 2, 1, ..., 2, 2, 2],
       [2, 2, 1, ..., 2, 2, 2],
       [2, 2, 2, ..., 2, 2, 2]], shape=(40, 40))