In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# Hyperparameters
ENV_NAME = "FrozenLake-v1"
GAMMA = 0.99  # Discount factor
LR = 0.001  # Learning rate
BATCH_SIZE = 64  # Batch size for training
BUFFER_SIZE = 10000  # Replay buffer size
EPSILON_START = 1.0  # Initial exploration rate
EPSILON_END = 0.01  # Final exploration rate
EPSILON_DECAY = 0.995  # Decay rate for epsilon
TARGET_UPDATE = 10  # Frequency for updating target network
EPISODES = 500  # Total episodes to train

# Set device
# device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
device = torch.device("cpu")


# Replay Buffer
class ReplayBuffer:
    def __init__(self, size):
        self.buffer = deque(maxlen=size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)


# Neural Network for Q-function
class QNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(QNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128), nn.ReLU(), nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, output_dim)
        )

    def forward(self, x):
        return self.fc(x)


# Training DQN
def train_dqn():
    # Initialize environment and networks
    env = gym.make(ENV_NAME, is_slippery=False)  # Deterministic mode
    state_size = env.observation_space.n
    action_size = env.action_space.n
    q_network = QNetwork(state_size, action_size).to(device)
    target_network = QNetwork(state_size, action_size).to(device)
    target_network.load_state_dict(q_network.state_dict())
    target_network.eval()

    optimizer = optim.Adam(q_network.parameters(), lr=LR)
    replay_buffer = ReplayBuffer(BUFFER_SIZE)

    epsilon = EPSILON_START
    rewards_per_episode = []

    # One-hot encoding for discrete states
    def one_hot_encoding(state, state_size):
        one_hot = np.zeros(state_size)
        one_hot[state] = 1
        return one_hot

    # Training Loop
    for episode in range(EPISODES):
        state, _ = env.reset()
        state = one_hot_encoding(state, state_size)
        total_reward = 0

        while True:
            # Select action using epsilon-greedy
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
                with torch.no_grad():
                    action = torch.argmax(q_network(state_tensor)).item()

            # Take action
            next_state, reward, done, _, _ = env.step(action)
            next_state = one_hot_encoding(next_state, state_size)
            replay_buffer.add((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward

            # Train Q-network
            if len(replay_buffer) >= BATCH_SIZE:
                experiences = replay_buffer.sample(BATCH_SIZE)
                states, actions, rewards, next_states, dones = zip(*experiences)

                states = torch.FloatTensor(states).to(device)
                actions = torch.LongTensor(actions).unsqueeze(1).to(device)
                rewards = torch.FloatTensor(rewards).unsqueeze(1).to(device)
                next_states = torch.FloatTensor(next_states).to(device)
                dones = torch.FloatTensor(dones).unsqueeze(1).to(device)

                q_values = q_network(states).gather(1, actions)
                next_q_values = target_network(next_states).max(1, keepdim=True)[0]
                targets = rewards + (1 - dones) * GAMMA * next_q_values

                loss = nn.MSELoss()(q_values, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if done:
                break

        # Decay epsilon
        epsilon = max(EPSILON_END, epsilon * EPSILON_DECAY)
        rewards_per_episode.append(total_reward)

        # Update target network
        if episode % TARGET_UPDATE == 0:
            target_network.load_state_dict(q_network.state_dict())

        # Print progress
        if (episode + 1) % 50 == 0:
            print(f"Episode {episode + 1}/{EPISODES}, Reward: {np.mean(rewards_per_episode[-50:])}")

    return q_network


q_network = train_dqn()

  states = torch.FloatTensor(states).to(device)


Episode 50/500, Reward: 0.0
Episode 100/500, Reward: 0.16
Episode 150/500, Reward: 0.1
Episode 200/500, Reward: 0.28
Episode 250/500, Reward: 0.66
Episode 300/500, Reward: 0.74
Episode 350/500, Reward: 0.7
Episode 400/500, Reward: 0.9
Episode 450/500, Reward: 0.88
Episode 500/500, Reward: 0.96


In [None]:
import time


def visualize_agent(env, q_network, state_size, delay=0.5):
    state, _ = env.reset()
    total_reward = 0
    done = False

    def one_hot_encoding(state, state_size):
        one_hot = np.zeros(state_size)
        one_hot[state] = 1
        return one_hot

    while not done:
        env.render()  # Visualize the environment
        state_tensor = torch.FloatTensor(one_hot_encoding(state, state_size)).unsqueeze(0)
        with torch.no_grad():
            action = torch.argmax(q_network(state_tensor)).item()

        next = env.step(action)
        print(next)
        state, reward, done, _, _ = next
        total_reward += reward
        time.sleep(delay)  # Pause for a short duration to simulate animation

    print(f"Total reward: {total_reward}")
    env.render()  # Display the final state


# Visualize the trained agent
env = gym.make(ENV_NAME, is_slippery=False, render_mode="human")
visualize_agent(env, q_network, env.observation_space.n)