In [7]:
# Imports
import torch
import numpy as np
import gymnasium as gym
from collections import deque
import pygame
import random

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


In [9]:
# DQN model which takes in the state as an input and outputs predicted q values for every possible action
class DQN(torch.nn.Module):
    def __init__(self, state_space, action_space):
        super().__init__()
        self.fc1 = torch.nn.Linear(state_space,128)
        self.fc2 = torch.nn.Linear(128, 128)
        self.fc3 = torch.nn.Linear(128, action_space)

    def forward(self, input):
        input = torch.relu(self.fc1(input))
        input = torch.relu(self.fc2(input))
        return self.fc3(input)


In [10]:
# While training neural networks, we split the data into batches.
# To improve the training, we need to remove the "correlation" between game states
# The buffer starts storing states and once it reaches maximum capacity, it replaces
# states at random which reduces the correlation.

class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones
    
    def __len__(self):
        return len(self.buffer)


In [11]:
env = gym.make("CartPole-v1", render_mode="human")
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n

policy_net = DQN(obs_dim, n_actions).to(device)
target_net = DQN(obs_dim, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

buffer = ExperienceBuffer(10000)
optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-3)
batch_size = 64
gamma = 0.99
sync_target_steps = 100
epsilon_start = 1.0
epsilon_end = 0.1
num_episodes = 500
epsilon_decay = (epsilon_start - epsilon_end) / num_episodes
steps_done = 0


for episode in range(num_episodes):
    obs, _ = env.reset()
    total_reward = 0
    done = False
    epsilon = epsilon_start - episode*epsilon_decay
    while not done and epsilon > epsilon_end:
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                state = torch.tensor(obs, dtype=torch.float32, device = device).unsqueeze(0)
                q_values = policy_net(state)
                action = torch.argmax(q_values, dim=1).item()

        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        buffer.push(obs, action, reward, next_obs, done)
        obs = next_obs
        total_reward += reward
        steps_done += 1

        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            states = np.array(states, dtype=np.float32)
            states = torch.tensor(states, dtype=torch.float32, device=device)
            actions = torch.tensor(actions, dtype=torch.int64, device = device).unsqueeze(1)
            rewards = torch.tensor(rewards, dtype=torch.float32, device = device).unsqueeze(1)
            next_states = np.array(next_states, dtype=np.float32)
            next_states = torch.tensor(next_states, dtype=torch.float32, device = device)
            dones = torch.tensor(dones, dtype=torch.float32, device=device).unsqueeze(1)

            q_values = policy_net(states).gather(1, actions)
            with torch.no_grad():
                max_next_q = target_net(next_states).max(1, keepdim=True)[0]
                target_q = rewards + gamma * max_next_q * (1 - dones)

            loss = torch.nn.functional.mse_loss(q_values, target_q)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update target network
        if steps_done % sync_target_steps == 0:
            target_net.load_state_dict(policy_net.state_dict())
env.close()

In [12]:
def evaluate_cartpole_model(model, episodes=20, render=True):
    env = gym.make("CartPole-v1", render_mode="human" if render else None)
    obs_dim = env.observation_space.shape[0]
    n_actions = env.action_space.n

    model.eval()
    rewards = []

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

            if render:
                env.render()

        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward}")

    env.close()
    avg_reward = sum(rewards) / episodes
    print(f"Average reward over {episodes} episodes: {avg_reward}")

In [13]:
evaluate_cartpole_model(policy_net, episodes=10, render=True)

Episode 1: Reward = 500.0
Episode 2: Reward = 215.0
Episode 3: Reward = 219.0
Episode 4: Reward = 207.0
Episode 5: Reward = 233.0
Episode 6: Reward = 221.0
Episode 7: Reward = 215.0
Episode 8: Reward = 500.0
Episode 9: Reward = 248.0
Episode 10: Reward = 282.0
Average reward over 10 episodes: 284.0


In [14]:
class SnakeGame(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 10}

    def __init__(self, size=10, render_mode=None):
        super().__init__()
        self.size = size
        self.cell_size = 30
        self.screen_size = self.size * self.cell_size
        self.render_mode = render_mode

        self.action_space = gym.spaces.Discrete(4)  # 0: right, 1: up, 2: left, 3: down
        self.observation_space = gym.spaces.Box(0, 2, shape=(self.size, self.size), dtype=np.uint8)

        self.screen = None
        self.clock = None

        self.snake = deque()
        self.food = None
        self.direction = [1, 0]

        if self.render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def reset(self, seed=None, options=None):           
        super().reset(seed=seed)
        self.snake.clear()
        mid = self.size // 2
        self.snake.appendleft([mid, mid])
        self.direction = [1, 0]
        self._place_food()
        obs = self._get_obs()

        if self.render_mode == "human":
            self._render_init()

        return obs, {}

    def step(self, action):
    
        if action == 0 and self.direction != [-1, 0]: self.direction = [1, 0]
        elif action == 1 and self.direction != [0, 1]: self.direction = [0, -1]
        elif action == 2 and self.direction != [1, 0]: self.direction = [-1, 0]
        elif action == 3 and self.direction != [0, -1]: self.direction = [0, 1]
    
        head = self.snake[0]
        new_head = [head[0] + self.direction[0], head[1] + self.direction[1]]
    
        done = False
        reward = 0

        if not (0 <= new_head[0] < self.size and 0 <= new_head[1] < self.size):
            done = True
            reward = -1
        else:
            body_to_check = list(self.snake)[:-1] if new_head != self.food else list(self.snake)
            if new_head in body_to_check:
                done = True
                reward = -1
    
        if not done:
            self.snake.appendleft(new_head)
            if new_head == self.food:
                reward = 1
                self._place_food()
            else:
                self.snake.pop()
        else:
            distance = np.linalg.norm(np.array(new_head) - np.array(self.food))
            reward = -distance * 0.01
    
        obs = self._get_obs()
    
        if self.render_mode == "human":
            self.render()
    
        return obs, reward, done, False, {}

    def _get_obs(self):
       head = self.snake[0]
       new_head = [head[0] + self.direction[0], head[1] + self.direction[1]]
       return {"agent": new_head, "target": self.food}

    def _place_food(self):
        positions = set(tuple(p) for p in self.snake)
        empty = [(x, y) for x in range(self.size) for y in range(self.size) if (x, y) not in positions]
        self.food = list(random.choice(empty)) if empty else None

    def render(self):
        if self.screen is None:
            self._render_init()

        self.screen.fill((0, 0, 0))
        for x, y in self.snake:
            pygame.draw.rect(
                self.screen, (0, 255, 0),
                pygame.Rect(x * self.cell_size, y * self.cell_size, self.cell_size, self.cell_size)
            )
        if self.food:
            fx, fy = self.food
            pygame.draw.rect(
                self.screen, (255, 0, 0),
                pygame.Rect(fx * self.cell_size, fy * self.cell_size, self.cell_size, self.cell_size)
            )

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def _render_init(self):
        pygame.init()
        self.screen = pygame.display.set_mode((self.size * self.cell_size, self.size * self.cell_size))
        self.clock = pygame.time.Clock()

    def close(self):
        if self.screen:
            pygame.quit()
            self.screen = None

In [15]:
def flatten_obs(obs):
    return np.array(list(obs["agent"]) + list(obs["target"]), dtype=np.float32)

env = SnakeGame(size=10, render_mode=None)
obs_dim = len(flatten_obs(env.reset()[0]))
n_actions = env.action_space.n

policy_net = DQN(obs_dim, n_actions)
target_net = DQN(obs_dim, n_actions)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

buffer = ExperienceBuffer(10000)
optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-3)
batch_size = 64
gamma = 0.99
sync_target_steps = 100
epsilon_start = 1.0
epsilon_end = 0.1
epsilon_decay = (epsilon_start - epsilon_end)/num_episodes
steps_done = 0
num_episodes = 1000
max_steps_per_episode = 200

for episode in range(num_episodes):
    obs, _ = env.reset()
    obs = flatten_obs(obs)
    total_reward = 0
    done = False
    epsilon = max(epsilon_end, epsilon_start - episode * epsilon_decay)
    while not done and steps_done < max_steps_per_episode:
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
                q_values = policy_net(state)
                action = torch.argmax(q_values, dim=1).item()

        next_obs, reward, done, _, _ = env.step(action)
        next_obs = flatten_obs(next_obs)
        buffer.push(obs, action, reward, next_obs, done)
        obs = next_obs
        total_reward += reward
        steps_done += 1

        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            states = torch.tensor(states, dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)
            rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
            next_states = torch.tensor(next_states, dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

            q_values = policy_net(states).gather(1, actions)
            with torch.no_grad():
                max_next_q = target_net(next_states).max(1, keepdim=True)[0]
                target_q = rewards + gamma * max_next_q * (1 - dones)

            loss = torch.nn.functional.mse_loss(q_values, target_q)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update target network
        if steps_done % sync_target_steps == 0:
            target_net.load_state_dict(policy_net.state_dict())
env.close()

In [16]:
def evaluate_snake_model(model, size=20, episodes=10, render=True):
    env = SnakeGame(size=size, render_mode="human" if render else None)
    model.eval()
    rewards = []
    for episode in range(episodes):
        obs, _ = env.reset()
        obs = flatten_obs(obs)
        total_reward = 0
        done = False

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            next_obs, reward, done, _, _ = env.step(action)
            next_obs = flatten_obs(next_obs)
            buffer.push(obs, action, reward, next_obs, done)
            obs = next_obs
            total_reward += reward

            if render:
                env.render()
    
        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward}")

    env.close()
    avg_reward = sum(rewards) / episodes

    print(f"Average reward over {episodes} episodes: {avg_reward}")

In [17]:
evaluate_snake_model(policy_net, episodes=10, render=True)

Episode 1: Reward = -0.09433981132056603
Episode 2: Reward = -0.1503329637837291
Episode 3: Reward = -0.11401754250991379
Episode 4: Reward = -0.08602325267042626
Episode 5: Reward = -0.13341664064126335
Episode 6: Reward = -0.09055385138137417
Episode 7: Reward = -0.0316227766016838
Episode 8: Reward = -0.09055385138137417
Episode 9: Reward = -0.12083045973594572
Episode 10: Reward = -0.13601470508735444
Average reward over 10 episodes: -0.10477058551136309


In [49]:
class ChaseEscapeEnv(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 15}

    def __init__(self, render_mode=None):
        super().__init__()

        self.dt = 0.1
        self.max_speed = 0.4
        self.agent_radius = 0.05
        self.target_radius = 0.05
        self.chaser_radius = 0.07
        self.chaser_speed = 0.03

        self.action_space = gym.spaces.MultiDiscrete([3, 3])  # actions in {0,1,2} map to [-1,0,1]
        self.observation_space = gym.spaces.Box(
            low=-1,
            high=1,
            shape=(8,),
            dtype=np.float32,
        )

        self.render_mode = render_mode
        self.screen_size = 500
        self.np_random = None

        if render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def sample_pos(self, far_from=None, min_dist=0.5):
        while True:
            pos = self.np_random.uniform(low=-0.8, high=0.8, size=(2,))
            if far_from is None or np.linalg.norm(pos - far_from) >= min_dist:
                return pos

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.agent_pos = self.sample_pos()
        self.agent_vel = np.zeros(2, dtype=np.float32)
        self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)
        self.chaser_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.7)

        return self._get_obs(), {}

    def _get_obs(self):
        return np.concatenate([ self.agent_pos, self.agent_vel, self.target_pos, self.chaser_pos ])

    def _get_info(self):
        return {}

    def step(self, action):
        # TODO: Add reward scheme
        # 1) Try to make the agent stay within bounds
        # 2) The agent shouldn't idle around
        # 3) The agent should go for the reward
        # 4) The agent should avoid the chaser
        
        accel = (np.array(action) - 1) * 0.1
        self.agent_vel += accel
        self.agent_vel = np.clip(self.agent_vel, -self.max_speed, self.max_speed)
        self.agent_pos += self.agent_vel * self.dt
        self.agent_pos = np.clip(self.agent_pos, -1, 1)

        direction = self.agent_pos - self.chaser_pos
        norm = np.linalg.norm(direction)
        if norm > 1e-5:
            self.chaser_pos += self.chaser_speed * direction / norm

        dist_to_target = np.linalg.norm(self.agent_pos - self.target_pos)
        dist_to_chaser = np.linalg.norm(self.agent_pos - self.chaser_pos)

        reward = 0.0
        terminated = False

        if dist_to_target < self.agent_radius + self.target_radius:
            reward += 3.0
            self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)

        if dist_to_chaser < self.agent_radius + self.chaser_radius:
            reward -= 1.0
            terminated = True

        reward -= 0.01 * np.linalg.norm(self.agent_vel)

        reward -= 0.5 * np.exp(-dist_to_chaser)

        return self._get_obs(), reward, terminated, False, self._get_info()

    def render(self):
        if self.render_mode != "human":
            return

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()

        self.screen.fill((255, 255, 255))

        def to_screen(p):
            x = int((p[0] + 1) / 2 * self.screen_size)
            y = int((1 - (p[1] + 1) / 2) * self.screen_size)
            return x, y

        pygame.draw.circle(self.screen, (0, 255, 0), to_screen(self.target_pos), int(self.target_radius * self.screen_size))
        pygame.draw.circle(self.screen, (0, 0, 255), to_screen(self.agent_pos), int(self.agent_radius * self.screen_size))
        pygame.draw.circle(self.screen, (255, 0, 0), to_screen(self.chaser_pos), int(self.chaser_radius * self.screen_size))

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

def close(self):
    if self.render_mode == "human":
        pygame.quit()


In [50]:
#Training

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = ChaseEscapeEnv(render_mode=None)
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.nvec.prod()

def flatten_action(action):
    return np.unravel_index(action, (3, 3))

policy_net = DQN(obs_dim, n_actions).to(device)
target_net = DQN(obs_dim, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

buffer = ExperienceBuffer(10000)
optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-3)
batch_size = 64
gamma = 0.99
sync_target_steps = 100
epsilon_start = 1.0
epsilon_end = 0.1
num_episodes = 500
epsilon_decay = (epsilon_start - epsilon_end) / num_episodes
steps_done = 0
max_steps_per_episode = 300

for episode in range(num_episodes):
    obs, _ = env.reset()
    total_reward = 0
    done = False
    epsilon = max(epsilon_end, epsilon_start - episode * epsilon_decay)
    step = 0
    while not done and step < max_steps_per_episode:
        if np.random.rand() < epsilon:
            action_idx = np.random.randint(n_actions)
        else:
            with torch.no_grad():
                state = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
                q_values = policy_net(state)
                action_idx = torch.argmax(q_values, dim=1).item()
        action = flatten_action(action_idx)
        next_obs, reward, done, _, _ = env.step(action)
        buffer.push(obs, action_idx, reward, next_obs, done)
        obs = next_obs
        total_reward += reward
        steps_done += 1
        step += 1

        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            states = torch.tensor(states, dtype=torch.float32, device=device)
            actions = torch.tensor(actions, dtype=torch.int64, device=device).unsqueeze(1)
            rewards = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
            next_states = torch.tensor(next_states, dtype=torch.float32, device=device)
            dones = torch.tensor(dones, dtype=torch.float32, device=device).unsqueeze(1)

            q_values = policy_net(states).gather(1, actions)
            with torch.no_grad():
                max_next_q = target_net(next_states).max(1, keepdim=True)[0]
                target_q = rewards + gamma * max_next_q * (1 - dones)

            loss = torch.nn.functional.mse_loss(q_values, target_q)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if steps_done % sync_target_steps == 0:
            target_net.load_state_dict(policy_net.state_dict())

env.close()

In [51]:
def evaluate_chaseescape_model(model, episodes=10, render=True):
    env = ChaseEscapeEnv(render_mode="human" if render else None)
    model.eval()
    rewards = []
    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action_idx = torch.argmax(q_values, dim=1).item()
            action = (action_idx // 3, action_idx % 3)
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            done = terminated or truncated
            steps += 1

            if render:
                env.render()

        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward:.2f}  Steps = {steps}")

    env.close()
    avg_reward = sum(rewards) / episodes
    print(f"Average reward over {episodes} episodes: {avg_reward:.2f}")

In [52]:
evaluate_chaseescape_model(policy_net, episodes=10, render=True)

Episode 1: Reward = -5.33  Steps = 13
Episode 2: Reward = -16.02  Steps = 43
Episode 3: Reward = -7.03  Steps = 24
Episode 4: Reward = -14.32  Steps = 66
Episode 5: Reward = -6.83  Steps = 17
Episode 6: Reward = -5.91  Steps = 19
Episode 7: Reward = -11.47  Steps = 50
Episode 8: Reward = -6.07  Steps = 16
Episode 9: Reward = -4.85  Steps = 12
Episode 10: Reward = -13.26  Steps = 58
Average reward over 10 episodes: -9.11
