In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from collections import namedtuple, deque

In [None]:
def rbf_features(
    state: np.array,  # (N, S)
    centers: np.array,  # (D, S)
    sigmas: float,
) -> np.array:  # (N, D)
    """Gaussian radial-basis activation of every state with respect to every center.

    Entry ``[n, d]`` of the result is ``exp(-||state[n] - centers[d]||_2^2 / (2 * sigmas^2))``.

    Args:
        state: array of N states with S components each.
        centers: array of D centers with S components each.
        sigmas: width of the Gaussian bumps (shared by all centers).

    Returns:
        Array of shape (N, D) with values in (0, 1].
    """
    # Broadcasting (N, 1, S) against (1, D, S) yields all pairwise differences (N, D, S).
    pairwise_diff = state[:, None, :] - centers[None, :, :]
    euclidean = np.linalg.norm(pairwise_diff, 2, axis=2)
    return np.exp(-euclidean**2 / sigmas**2 / 2)


def tile_features(
    state: np.array,  # (N, S)
    centers: np.array,  # (D, S)
    widths: float,
    offsets: list = [0],  # list of tuples of length S
) -> np.array:  # (N, D)
    """Tile-coding style features: fraction of shifted tilings whose tile contains the state.

    For each offset the centers are shifted by that offset, and a (state, center) pair
    counts as active when their max-norm (L-infinity) distance is strictly below ``widths``.
    The per-offset indicators are averaged over all offsets.

    Args:
        state: array of N states with S components each.
        centers: array of D centers with S components each.
        widths: threshold on the max-norm distance.
        offsets: shifts added to ``centers``; one indicator map is produced per shift.

    Returns:
        Array of shape (N, D) with values in [0, 1].
    """
    n_states, n_centers = np.shape(state)[0], np.shape(centers)[0]
    activation = np.zeros((n_states, n_centers))
    for shift in offsets:
        # (N, 1, S) - (1, D, S) -> pairwise differences to the shifted centers.
        delta = state[:, None, :] - (centers + shift)[None, :, :]
        inside = np.linalg.norm(delta, np.inf, axis=2) < widths
        activation += inside.astype(np.float32)

    return activation / len(offsets)


def coarse_features(
    state: np.array,  # (N, S)
    centers: np.array,  # (D, S)
    widths: float,
    offsets: list = [0],  # list of tuples of length S
) -> np.array:  # (N, D)
    """Coarse-coding features: fraction of shifted center sets whose ball contains the state.

    Same construction as a tiling, but membership is decided with the Euclidean (L2)
    distance: a (state, center) pair is active when the distance to the shifted center
    is strictly below ``widths``. Indicators are averaged over all offsets.

    Args:
        state: array of N states with S components each.
        centers: array of D centers with S components each.
        widths: radius threshold on the Euclidean distance.
        offsets: shifts added to ``centers``; one indicator map is produced per shift.

    Returns:
        Array of shape (N, D) with values in [0, 1].
    """
    total = np.zeros((np.shape(state)[0], np.shape(centers)[0]))
    expanded_state = state[:, None, :]  # (N, 1, S), broadcast against the centers below
    for shift in offsets:
        moved_centers = (centers + shift)[None, :, :]  # (1, D, S)
        radius = np.linalg.norm(expanded_state - moved_centers, 2, axis=2)
        total += (radius < widths).astype(np.float32)

    return total / len(offsets)

def aggregation_features(state, centers, device=None):
    """State-aggregation features: flag the nearest center of every state.

    Args:
        state: array-like or tensor of shape (N, S).
        centers: array-like or tensor of shape (D, S).
        device: optional torch device for the computation. When omitted, the device
            of ``centers`` is used if it is a tensor, otherwise the device of ``state``
            if it is a tensor, otherwise the CPU. The function therefore does not
            depend on any notebook-level ``device`` variable being defined first.

    Returns:
        Float tensor of shape (N, D) holding 1.0 where a center attains the minimum
        squared Euclidean distance to the state and 0.0 elsewhere. Exact ties flag
        every tied center.
    """
    if device is None:
        if torch.is_tensor(centers):
            device = centers.device
        elif torch.is_tensor(state):
            device = state.device
        else:
            device = torch.device("cpu")

    # as_tensor converts arrays and passes tensors through without the
    # "copy construct from a tensor" warning that torch.tensor(tensor) raises.
    state = torch.as_tensor(state, device=device)
    centers = torch.as_tensor(centers, device=device)

    # (N, 1, S) - (1, D, S) -> squared distance of every state to every center, (N, D).
    distance = torch.sum((state[:, None, :] - centers[None, :, :]) ** 2, dim=-1)
    nearest = distance.min(dim=-1, keepdim=True).values
    return (distance == nearest) * 1.0  # bool -> float


In [None]:
# Sanity check: one state (1, 2) against four candidate centers.
# Squared distances are 5, 1, 13 and 41, so the second center is the nearest one.
s = np.array([[1, 2]])
c = np.array([[0, 0], [1, 1], [4, 4], [6, 6]])
aggregation_features(s, c)

[[ 5  1 13 41]]


array([[0., 1., 0., 0.]])

In [None]:
# Select the compute device once: CUDA when torch can see a GPU, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

# Define the neural network model
class QNetwork(nn.Module):
    """Q-value network operating on state-aggregation features.

    The raw state is first mapped to features by ``self.feature_extract`` and then
    passed through two stacked linear layers (no activation in between).
    Relies on the notebook-level ``env``, ``device`` and ``aggregation_features``.

    Args:
        state_size: number of components of a raw state.
        action_size: number of discrete actions (width of the output layer).
        seed: value passed to ``torch.manual_seed`` before the layers are created.
        fc1_units: width of the first linear layer.
        fc2_units: accepted for interface compatibility; not used by the current layers.
    """

    def __init__(self, state_size, action_size, seed, fc1_units=128, fc2_units=64):
        super().__init__()
        # Seed torch first so the layer initialisation below is repeatable.
        self.seed = torch.manual_seed(seed)

        # The feature extractor determines the input width of the first layer.
        self.feature_size = self.feature_extract_init(state_size, action_size)
        self.fc1 = nn.Linear(self.feature_size, fc1_units)
        self.fc2 = nn.Linear(fc1_units, action_size)
        self.to(device)

    def feature_extract_init(self, state_size, action_size):
        """Build the feature extractor and return the number of features it produces.

        Centers are laid out on a regular grid (3 points per observation dimension)
        spanning the bounds of ``env.observation_space``. Sets ``self.feature_name``
        and ``self.feature_extract`` as a side effect, and resets ``env`` once to
        probe the feature width.
        """
        n_centers = [3, 3]
        obs_space = env.observation_space
        n_dims = obs_space.shape[0]
        axes = [
            np.linspace(obs_space.low[dim], obs_space.high[dim], n_centers[dim])
            for dim in range(n_dims)
        ]
        grid = np.array(np.meshgrid(*axes)).reshape(n_dims, -1).T  # (num_centers, n_dims)
        centers = torch.tensor(grid).float().to(device)

        def extract(state):
            # Flatten any batch layout to (N, state_size) before computing features.
            return aggregation_features(state.reshape(-1, state_size), centers)

        self.feature_name = "Aggregate"
        self.feature_extract = extract

        probe_observation = env.reset()[0]
        return self.feature_extract(probe_observation).shape[1]

    def forward(self, state):
        """Return the Q-values of every action for ``state``."""
        features = self.feature_extract(state)
        hidden = self.fc1(features)
        return self.fc2(hidden)

class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random mini-batch sampling.

    Args:
        action_size: number of actions (stored, not otherwise used by the buffer).
        buffer_size: maximum number of transitions kept; older ones are evicted.
        batch_size: number of transitions returned by ``sample``.
        seed: value used to seed Python's global ``random`` module.
    """

    def __init__(self, action_size, buffer_size, batch_size, seed):
        self.action_size = action_size
        self.batch_size = batch_size
        # A bounded deque drops the oldest transition automatically once it is full.
        self.memory = deque(maxlen=buffer_size)
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "action", "reward", "next_state", "done"],
        )
        # random.seed() returns None, so this attribute holds None after seeding.
        self.seed = random.seed(seed)

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        self.memory.append(self.experience(state, action, reward, next_state, done))

    def sample(self):
        """Draw ``batch_size`` transitions without replacement and stack them into tensors.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of tensors on
            ``device``; actions are ``long``, every other field is ``float``.
        """
        drawn = random.sample(self.memory, k=self.batch_size)
        batch = [e for e in drawn if e is not None]

        def stacked(field):
            # One row per transition for the requested field.
            return np.vstack([getattr(e, field) for e in batch])

        states = torch.from_numpy(stacked("state")).float().to(device)
        actions = torch.from_numpy(stacked("action")).long().to(device)
        rewards = torch.from_numpy(stacked("reward")).float().to(device)
        next_states = torch.from_numpy(stacked("next_state")).float().to(device)
        # Booleans are converted to 0/1 before becoming a float mask.
        dones = torch.from_numpy(stacked("done").astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

cpu


In [None]:
# Define the DQN agent class
class DQNAgent:
    """DQN agent with an online network, a slowly-tracking target network and a replay memory.

    ``learn`` accepts either a sequence of ``(state, action, reward, next_state, done)``
    transitions or the already-batched 5-tuple of tensors produced by
    ``ReplayBuffer.sample()``, so both ``step`` and external training loops can call it.

    Args:
        state_size: number of components of a raw state.
        action_size: number of discrete actions.
        seed: seed for Python's ``random`` module and for the Q-networks.
        lr: learning rate of the Adam optimiser.
    """

    BATCH_SIZE = 64      # replay mini-batch size, also the minimum fill level before learning
    UPDATE_EVERY = 4     # `step` triggers a learning update every this many calls
    STEP_GAMMA = 0.99    # discount used by updates triggered from `step`
    TAU = 1e-3           # interpolation factor of the soft target update

    # Initialize the DQN agent
    def __init__(self, state_size, action_size, seed, lr):
        self.state_size = state_size
        self.action_size = action_size
        # random.seed() returns None, so seed first and keep the actual value.
        random.seed(seed)
        self.seed = seed
        # Remember the device the networks live on; act/learn move their inputs there.
        self.device = device

        self.qnetwork_local = QNetwork(state_size, action_size, seed).to(device)
        self.qnetwork_target = QNetwork(state_size, action_size, seed).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr)

        self.memory = ReplayBuffer(action_size, buffer_size=int(1e5), batch_size=self.BATCH_SIZE, seed=seed)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and, every ``UPDATE_EVERY`` calls, learn from a sampled batch."""
        self.memory.add(state, action, reward, next_state, done)

        self.t_step = (self.t_step + 1) % self.UPDATE_EVERY
        if self.t_step == 0 and len(self.memory) > self.BATCH_SIZE:
            experiences = self.memory.sample()
            self.learn(experiences, gamma=self.STEP_GAMMA)

    # Choose an action based on the current state
    def act(self, state, eps=0.):
        """Epsilon-greedy action selection.

        Args:
            state: numpy array holding a single state.
            eps: probability of picking a uniformly random action.

        Returns:
            Integer action index.
        """
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state_tensor)
        self.qnetwork_local.train()

        # NOTE: exploration draws from numpy's global RNG, which this class does not seed.
        if np.random.random() > eps:
            return action_values.argmax(dim=1).item()
        return np.random.randint(self.action_size)

    def _to_batch(self, experiences):
        """Normalise ``experiences`` to five tensors on the agent's device."""
        already_batched = (
            isinstance(experiences, (tuple, list))
            and len(experiences) == 5
            and all(torch.is_tensor(field) for field in experiences)
        )
        if already_batched:
            # Output of ReplayBuffer.sample(): one stacked tensor per field.
            states, actions, rewards, next_states, dones = experiences
            return (
                states.float().to(self.device),
                actions.long().to(self.device),
                rewards.float().to(self.device),
                next_states.float().to(self.device),
                dones.float().to(self.device),
            )

        # Sequence of raw transitions: transpose to per-field columns and stack.
        states, actions, rewards, next_states, dones = zip(*experiences)
        return (
            torch.from_numpy(np.vstack(states)).float().to(self.device),
            torch.from_numpy(np.vstack(actions)).long().to(self.device),
            torch.from_numpy(np.vstack(rewards)).float().to(self.device),
            torch.from_numpy(np.vstack(next_states)).float().to(self.device),
            torch.from_numpy(np.vstack(dones).astype(np.uint8)).float().to(self.device),
        )

    # Learn from batch of experiences
    def learn(self, experiences, gamma):
        """Run one gradient step on the TD error, then soft-update the target network.

        Args:
            experiences: sequence of transitions, or a 5-tuple of batched tensors
                ``(states, actions, rewards, next_states, dones)``.
            gamma: discount factor.
        """
        states, actions, rewards, next_states, dones = self._to_batch(experiences)

        # Bootstrapped target from the target network; no gradient flows through it.
        with torch.no_grad():
            q_targets_next = self.qnetwork_target(next_states).max(1)[0].unsqueeze(1)
        # (1 - dones) removes the bootstrap term for terminal transitions.
        q_targets = rewards + (gamma * q_targets_next * (1 - dones))

        q_expected = self.qnetwork_local(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.soft_update(self.qnetwork_local, self.qnetwork_target, tau=self.TAU)

    def soft_update(self, local_model, target_model, tau):
        """Move every target parameter a fraction ``tau`` of the way towards the local one."""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

In [None]:
!git clone https://github.com/sparisi/gym_gridworlds

Cloning into 'gym_gridworlds'...
remote: Enumerating objects: 140, done.[K
remote: Counting objects: 100% (71/71), done.[K
remote: Compressing objects: 100% (47/47), done.[K
remote: Total 140 (delta 36), reused 51 (delta 24), pack-reused 69 (from 1)[K
Receiving objects: 100% (140/140), 74.35 KiB | 1.91 MiB/s, done.
Resolving deltas: 100% (66/66), done.


In [None]:
!pip install -e ./gym_gridworlds

Obtaining file:///content/gym_gridworlds
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gymnasium (from Gym-Gridworlds==1.0)
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium->Gym-Gridworlds==1.0)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium, Gym-Gridworlds
  Running setup.py develop for Gym-Gridworlds
Successfully installed Gym-Gridworlds-1.0 farama-notifications-0.0.4 gymnasium-1.0.0


In [None]:
from gymnasium.envs.registration import register

# Register a 2x2 empty grid under its own id, served by the GridworldRandomStart
# entry point and limited to 10 steps per episode.
register(
    id="Gym-Gridworlds/Empty-2x2-v0",
    entry_point="gym_gridworlds.gridworld:GridworldRandomStart",
    max_episode_steps=10,
    kwargs=dict(grid="2x2_empty"),
)

In [None]:
# Initialize the environment and the agent
import gym
import gymnasium
from collections import deque
import random

# Set up the environment
# NOTE(review): this id is Empty-3x3-v0, not the Empty-2x2-v0 registered in the cell
# above; presumably it is provided by the gym_gridworlds package itself — confirm.
env_id = "Gym-Gridworlds/Empty-3x3-v0"
# Training environment: created with random_action_prob=0.1 and reward_noise_std=0.01.
env = gymnasium.make(env_id, coordinate_observation=True, random_action_prob=0.1, reward_noise_std=0.01)
# Evaluation environment: same id without the two noise arguments.
env_eval = gymnasium.make(env_id, coordinate_observation=True, max_episode_steps=10)  # 10 steps only for faster eval
episodes_eval = 10  # max expected return will be 0.994

# Define training parameters
num_episodes = 10000  # number of training episodes
max_steps_per_episode = 10  # hard cap on steps per training episode
# Epsilon-greedy schedule: the training loop uses
# max(epsilon_end, epsilon_start * epsilon_decay_rate ** episode).
epsilon_start = 1.0
epsilon_end = 0.2
epsilon_decay_rate = 0.99
gamma = 0.9  # discount factor passed to new_agent.learn by the training loop
lr = 1e-3  # learning rate handed to the agent's Adam optimiser
buffer_size = 100  # capacity of the module-level replay deque below
buffer = deque(maxlen=buffer_size)  # oldest transitions are dropped once full
batch_size = 32  # transitions drawn from `buffer` per learn() call
update_frequency = 100  # the training loop prints a summary every this many episodes


# Initialize the DQNAgent
input_dim = env.observation_space.shape[0]  # number of state components
output_dim = env.action_space.n  # number of discrete actions
new_agent = DQNAgent(input_dim, output_dim, seed=170715, lr = lr)

  centers = torch.tensor(centers, device=device)


In [None]:
# Training loop: epsilon-greedy rollouts with one DQN update per environment step.
ep_mean_reward = 0  # running sum of episode returns inside the current logging window
for episode in range(num_episodes):
    # Reset the environment
    state, _ = env.reset()
    # Exponential per-episode decay of epsilon, floored at epsilon_end.
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay_rate ** episode))

    ep_reward = 0
    # Run one episode
    for step in range(max_steps_per_episode):
        # Choose and perform an action
        action = new_agent.act(state, epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Store `terminated`, not `done`: a time-limit truncation is not a terminal
        # state, so the TD target must still bootstrap from next_state in that case.
        buffer.append((state, action, reward, next_state, terminated))

        if len(buffer) >= batch_size:
            batch = random.sample(buffer, batch_size)
            # Update the agent's knowledge
            new_agent.learn(batch, gamma)

        state = next_state

        ep_reward += reward
        # Stop the rollout on termination or truncation
        if done:
            break

    ep_mean_reward += ep_reward
    if (episode + 1) % update_frequency == 0:
        # Report the average return of the window rather than its sum.
        print(f"Episode {episode + 1}: mean return over last {update_frequency} episodes {ep_mean_reward / update_frequency:.3f}")
        ep_mean_reward = 0

  state = torch.tensor(state, device=device)
  centers = torch.tensor(centers, device=device)


Episode 100: Finished training with reward 41.778254822966424
Episode 200: Finished training with reward 83.82525071859243
Episode 300: Finished training with reward 82.87260599972302
Episode 400: Finished training with reward 96.99196246927394
Episode 500: Finished training with reward 95.01423278351403
Episode 600: Finished training with reward 96.14027723552594
Episode 700: Finished training with reward 95.75091964923547
Episode 800: Finished training with reward 97.67935285506772
Episode 900: Finished training with reward 96.82794593994151
Episode 1000: Finished training with reward 95.58492690068314
Episode 1100: Finished training with reward 98.98421601976912
Episode 1200: Finished training with reward 98.17582726059109
Episode 1300: Finished training with reward 97.60103537390421
Episode 1400: Finished training with reward 99.35196498302626
Episode 1500: Finished training with reward 97.69023967546062
Episode 1600: Finished training with reward 97.60274362553264
Episode 1700: Fi

KeyboardInterrupt: 

In [None]:
# Evaluate the greedy policy on the dedicated evaluation environment.
# env_eval is created with max_episode_steps=10, so every episode is guaranteed to end.
test_episodes = 100
episode_rewards = []

for episode in range(test_episodes):
    # gymnasium's reset() returns (observation, info)
    state, _ = env_eval.reset()
    episode_reward = 0
    done = False

    while not done:
        action = new_agent.act(state, eps=0.)
        # gymnasium's step() returns five values; the episode ends on either flag
        next_state, reward, terminated, truncated, _ = env_eval.step(action)
        done = terminated or truncated
        episode_reward += reward
        state = next_state

    episode_rewards.append(episode_reward)

average_reward = sum(episode_rewards) / test_episodes
print(f"Average reward over {test_episodes} test episodes: {average_reward:.2f}")


Average reward over 100 test episodes: 178.75


In [None]:
# Visualize the agent's performance
import time

# Roll out one greedy episode, rendering before every step.
# NOTE(review): `env` was created without a render_mode argument — confirm that
# render() actually displays something for this environment.
state, _ = env.reset()  # gymnasium's reset() returns (observation, info)
done = False

while not done:
    env.render()
    action = new_agent.act(state, eps=0.)
    # gymnasium's step() returns five values; stop on termination or truncation
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    state = next_state
    time.sleep(0.1)  # Add a delay to make the visualization easier to follow

env.close()