In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random

class GridWorld:
    """Square grid navigation task with a single agent.

    Cells are numbered row by row, from 0 (top-left) to ``size * size - 1``
    (bottom-right, the goal). Actions: 0 = left, 1 = right, 2 = up, 3 = down.
    """

    def __init__(self, size=5):
        self.size = size
        self.reset()

    def reset(self):
        """Move the agent back to cell 0 (top-left) and return that index."""
        self.agent_position = 0
        return self.agent_position

    def step(self, action):
        """Apply ``action`` and return ``(position, reward, done)``.

        A move that would leave the grid, or an unrecognised action, leaves
        the agent where it is. The reward is 10 when the agent stands on the
        goal cell after the move and -1 otherwise.
        """
        side = self.size
        cell = self.agent_position

        if action == 0:
            # left: blocked in the first column
            if cell % side > 0:
                cell -= 1
        elif action == 1:
            # right: blocked in the last column
            if cell % side < side - 1:
                cell += 1
        elif action == 2:
            # up: blocked in the first row
            if cell >= side:
                cell -= side
        elif action == 3:
            # down: blocked in the last row
            if cell < side * (side - 1):
                cell += side

        self.agent_position = cell
        at_goal = cell == side * side - 1
        return cell, (10 if at_goal else -1), at_goal

class PolicyGradientAgent:
    """Monte-Carlo policy-gradient agent over one-hot encoded discrete states.

    The policy is a small MLP ending in a softmax over ``action_size`` actions.
    ``train`` performs one gradient step per finished episode using the
    discounted return of every step as the weight of its log-probability.
    """

    def __init__(self, state_size, action_size, learning_rate=0.001, discount_factor=0.99):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor

        print(self.device)

        # Policy network: one-hot state -> probability distribution over actions.
        self.policy_network = nn.Sequential(
            nn.Linear(state_size, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, action_size),
            nn.Softmax(dim=-1)
        ).to(self.device)

        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=learning_rate)

    def one_hot_encode(self, state):
        """Return a list of length ``state_size`` with a 1 at index ``state``."""
        encoding = [0] * self.state_size
        encoding[state] = 1
        return encoding

    def get_action(self, state):
        """Sample an action index from the current policy for ``state``."""
        with torch.no_grad():
            state_tensor = torch.tensor(
                self.one_hot_encode(state), dtype=torch.float32, device=self.device
            )
            action_probs = self.policy_network(state_tensor)
            # Sample rather than take the argmax so the policy keeps exploring.
            action = torch.multinomial(action_probs, num_samples=1).item()
        return action

    def _discounted_returns(self, rewards):
        """Return the discounted return of every step as a 1-D float tensor."""
        # Accumulate in plain Python so no per-step device kernels are launched.
        returns = []
        running = 0.0
        for reward in reversed(rewards):
            running = float(reward) + self.discount_factor * running
            returns.append(running)
        returns.reverse()
        return torch.tensor(returns, dtype=torch.float32, device=self.device)

    def train(self, episode_states, episode_actions, episode_rewards):
        """Run one policy-gradient update from a complete episode.

        Args:
            episode_states: state indices visited, in order.
            episode_actions: action index taken in each of those states.
            episode_rewards: reward received after each action.

        An empty episode is a no-op.
        """
        if len(episode_states) == 0:
            return

        state_tensors = torch.tensor(
            [self.one_hot_encode(s) for s in episode_states],
            dtype=torch.float32,
            device=self.device,
        )
        action_tensors = torch.tensor(episode_actions, dtype=torch.long, device=self.device)

        discounted_rewards = self._discounted_returns(list(episode_rewards))

        # Normalise the returns to reduce gradient variance. The sample standard
        # deviation of a single value is NaN, which would propagate into the
        # loss and the weights, so a one-step episode keeps its raw return.
        if discounted_rewards.numel() > 1:
            discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (
                discounted_rewards.std() + 1e-8
            )

        action_probs = self.policy_network(state_tensors)

        # Probability the policy assigned to each action that was actually taken.
        # squeeze(1) keeps the batch dimension even for a one-step episode.
        chosen_probs = action_probs.gather(1, action_tensors.unsqueeze(1)).squeeze(1)
        log_probs = torch.log(chosen_probs)

        # The returns are constants here; only log_probs carries gradient.
        # Minimising the negative product raises the probability of actions
        # with positive normalised return and lowers it for negative ones.
        loss = -torch.mean(log_probs * discounted_rewards)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

def train_policy_gradient_agent(episodes=1000):
    """Train a PolicyGradientAgent on a default GridWorld and return it.

    Each episode is rolled out to completion, then handed to ``agent.train``
    as one batch. The episode's total reward is printed every 100 episodes.
    """
    env = GridWorld()
    agent = PolicyGradientAgent(state_size=env.size**2, action_size=4, learning_rate=0.001)

    for episode in range(episodes):
        visited, chosen, received = [], [], []
        current = env.reset()
        finished = False

        # Roll the current policy out until the goal cell is reached.
        while not finished:
            move = agent.get_action(current)
            following, payoff, finished = env.step(move)

            visited.append(current)
            chosen.append(move)
            received.append(payoff)

            current = following

        agent.train(visited, chosen, received)

        if episode % 100 == 0:
            episode_return = sum(received)
            print(f"Episode {episode}: Total Reward = {episode_return}")

    return agent

if __name__ == "__main__":
    # Train the policy-gradient agent defined in this cell.
    # NOTE(review): nn, optim, random and np are already imported at the top of
    # this cell, and SummaryWriter is not referenced anywhere in the visible
    # notebook -- confirm nothing else needs these before removing them.
    from torch import nn, optim
    import random
    from torch.utils.tensorboard import SummaryWriter
    import numpy as np

    # Binds the trained agent to the notebook-level name `pg_agent`.
    print("Training Policy Gradient Agent:")
    pg_agent = train_policy_gradient_agent()

Training Policy Gradient Agent:
cuda
Episode 0: Total Reward = -50
Episode 100: Total Reward = -6
Episode 200: Total Reward = -4
Episode 300: Total Reward = -2
Episode 400: Total Reward = 2
Episode 500: Total Reward = 3
Episode 600: Total Reward = 2
Episode 700: Total Reward = 3
Episode 800: Total Reward = 2
Episode 900: Total Reward = 3


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np

class GridWorld:
    """Square grid navigation task with a single agent.

    Cells are numbered row by row, from 0 (top-left) to ``size * size - 1``
    (bottom-right, the goal). Actions: 0 = left, 1 = right, 2 = up, 3 = down.
    """

    def __init__(self, size=5):
        self.size = size
        self.reset()

    def reset(self):
        """Move the agent back to cell 0 (top-left) and return that index."""
        self.agent_position = 0
        return self.agent_position

    def step(self, action):
        """Apply ``action`` and return ``(position, reward, done)``.

        A move that would leave the grid, or an unrecognised action, leaves
        the agent where it is. The reward is 10 when the agent stands on the
        goal cell after the move and -1 otherwise.
        """
        side = self.size
        cell = self.agent_position

        if action == 0:
            # left: blocked in the first column
            if cell % side > 0:
                cell -= 1
        elif action == 1:
            # right: blocked in the last column
            if cell % side < side - 1:
                cell += 1
        elif action == 2:
            # up: blocked in the first row
            if cell >= side:
                cell -= side
        elif action == 3:
            # down: blocked in the last row
            if cell < side * (side - 1):
                cell += side

        self.agent_position = cell
        at_goal = cell == side * side - 1
        return cell, (10 if at_goal else -1), at_goal

class PPOAgent:
    """Clipped-surrogate PPO agent with separate policy and value networks.

    States are one-hot encoded. ``train`` consumes one finished episode and
    runs ``update_epochs`` passes of shuffled minibatch updates over it.
    """

    def __init__(self, state_size, action_size, learning_rate=0.0003, discount_factor=0.99, clip_epsilon=0.2, update_epochs=10, batch_size=64):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.clip_epsilon = clip_epsilon
        self.update_epochs = update_epochs
        self.batch_size = batch_size
        print(self.device)

        # Policy network: one-hot state -> probability distribution over actions.
        self.policy_network = nn.Sequential(
            nn.Linear(state_size, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, action_size),
            nn.Softmax(dim=-1)
        ).to(self.device)

        # Value network: one-hot state -> scalar state-value estimate.
        self.value_network = nn.Sequential(
            nn.Linear(state_size, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, 1)
        ).to(self.device)

        self.policy_optimizer = optim.Adam(self.policy_network.parameters(), lr=learning_rate)
        self.value_optimizer = optim.Adam(self.value_network.parameters(), lr=learning_rate)

    def one_hot_encode(self, state):
        """Return a list of length ``state_size`` with a 1 at index ``state``."""
        encoding = [0] * self.state_size
        encoding[state] = 1
        return encoding

    def get_action(self, state):
        """Sample an action index from the current policy for ``state``."""
        with torch.no_grad():
            state_tensor = torch.tensor(
                self.one_hot_encode(state), dtype=torch.float32, device=self.device
            )
            action_probs = self.policy_network(state_tensor)
            action = torch.multinomial(action_probs, num_samples=1).item()
        return action

    def get_value(self, state):
        """Return the value network's estimate for ``state`` as a Python float."""
        with torch.no_grad():
            state_tensor = torch.tensor(
                self.one_hot_encode(state), dtype=torch.float32, device=self.device
            )
            value = self.value_network(state_tensor)
            return value.item()

    def _discounted_returns(self, rewards):
        """Return the discounted return of every step as a 1-D float tensor."""
        # Accumulate in plain Python so no per-step device kernels are launched.
        returns = []
        running = 0.0
        for reward in reversed(rewards):
            running = float(reward) + self.discount_factor * running
            returns.append(running)
        returns.reverse()
        return torch.tensor(returns, dtype=torch.float32, device=self.device)

    def train(self, episode_states, episode_actions, episode_rewards, next_states):
        """Run the PPO update for one complete episode.

        Args:
            episode_states: state indices visited, in order.
            episode_actions: action index taken in each of those states.
            episode_rewards: reward received after each action.
            next_states: successor state indices. Accepted for interface
                compatibility; this update uses full-episode returns and
                does not read them.

        An empty episode is a no-op.
        """
        num_steps = len(episode_states)
        if num_steps == 0:
            return

        state_tensors = torch.tensor(
            [self.one_hot_encode(s) for s in episode_states],
            dtype=torch.float32,
            device=self.device,
        )
        action_tensors = torch.tensor(episode_actions, dtype=torch.long, device=self.device)

        discounted_rewards = self._discounted_returns(list(episode_rewards))

        # Snapshot the pre-update policy probabilities and value estimates.
        # squeeze(-1) removes only the trailing size-1 dimension, so these
        # stay 1-D (and indexable) even for a one-step episode.
        with torch.no_grad():
            old_values = self.value_network(state_tensors).squeeze(-1)
            old_action_probs = (
                self.policy_network(state_tensors)
                .gather(1, action_tensors.unsqueeze(1))
                .squeeze(-1)
            )
            advantages = discounted_rewards - old_values
            # The sample standard deviation of a single value is NaN, so
            # normalisation is only applied when there are at least two steps.
            if num_steps > 1:
                advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        for _ in range(self.update_epochs):
            # Visit the steps in a fresh random order every epoch.
            indices = np.arange(num_steps)
            np.random.shuffle(indices)

            for start in range(0, num_steps, self.batch_size):
                batch_indices = torch.as_tensor(
                    indices[start:start + self.batch_size],
                    dtype=torch.long,
                    device=self.device,
                )

                batch_state_tensors = state_tensors[batch_indices]
                batch_action_tensors = action_tensors[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_discounted_rewards = discounted_rewards[batch_indices]
                batch_old_action_probs = old_action_probs[batch_indices]

                # Policy loss (clipped surrogate objective).
                new_action_probs = (
                    self.policy_network(batch_state_tensors)
                    .gather(1, batch_action_tensors.unsqueeze(1))
                    .squeeze(-1)
                )
                ratios = new_action_probs / (batch_old_action_probs + 1e-8)

                clipped_ratios = torch.clamp(ratios, 1 - self.clip_epsilon, 1 + self.clip_epsilon)
                # The old probabilities and advantages are constants; only
                # new_action_probs carries gradient. An advantage > 0 means the
                # action did better than the value estimate, and a ratio > 1
                # means the new policy made that action more likely, so the
                # objective to maximise is ratio * advantage, with the ratio
                # clipped to limit the size of a single update.
                policy_loss = -torch.min(
                    ratios * batch_advantages, clipped_ratios * batch_advantages
                ).mean()

                self.policy_optimizer.zero_grad()
                policy_loss.backward()
                self.policy_optimizer.step()

                # Value loss: regress the value estimate onto the observed return.
                # squeeze(-1) keeps prediction and target the same shape even
                # when the minibatch holds a single sample.
                values = self.value_network(batch_state_tensors).squeeze(-1)
                value_loss = F.mse_loss(values, batch_discounted_rewards)

                self.value_optimizer.zero_grad()
                value_loss.backward()
                self.value_optimizer.step()

def train_ppo_agent(episodes=1000):
    """Train a PPOAgent on a default GridWorld and return it.

    Each episode is rolled out to completion, then handed to ``agent.train``
    together with the successor states. The episode's total reward is printed
    every 100 episodes.
    """
    env = GridWorld()
    agent = PPOAgent(state_size=env.size**2, action_size=4)

    for episode in range(episodes):
        visited, chosen, received, successors = [], [], [], []
        current = env.reset()
        finished = False

        # Roll the current policy out until the goal cell is reached.
        while not finished:
            move = agent.get_action(current)
            following, payoff, finished = env.step(move)

            visited.append(current)
            chosen.append(move)
            received.append(payoff)
            successors.append(following)

            current = following

        agent.train(visited, chosen, received, successors)

        if episode % 100 == 0:
            episode_return = sum(received)
            print(f"Episode {episode}: Total Reward = {episode_return}")

    return agent

if __name__ == '__main__':
    # Train the PPO agent for 500 episodes and keep it in the notebook-level
    # name `trained_agent`. The DQN cell further down assigns the same name.
    print("Training Proximal Policy Optimization Agent:")
    trained_agent = train_ppo_agent(episodes=500)

Training Proximal Policy Optimization Agent:
cuda
Episode 0: Total Reward = -69
Episode 100: Total Reward = -1
Episode 200: Total Reward = 3
Episode 300: Total Reward = 3
Episode 400: Total Reward = 3


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

class GridWorld:
    """Square grid navigation task with a single agent.

    Cells are numbered row by row, from 0 (top-left) to ``size * size - 1``
    (bottom-right, the goal). Actions: 0 = left, 1 = right, 2 = up, 3 = down.
    """

    def __init__(self, size=15):
        self.size = size
        self.reset()

    def reset(self):
        """Move the agent back to cell 0 (top-left) and return that index."""
        self.agent_position = 0
        return self.agent_position

    def step(self, action):
        """Apply ``action`` and return ``(position, reward, done)``.

        A move that would leave the grid, or an unrecognised action, leaves
        the agent where it is. The reward is 10 when the agent stands on the
        goal cell after the move and -1 otherwise.
        """
        side = self.size
        cell = self.agent_position

        if action == 0:
            # left: blocked in the first column
            if cell % side > 0:
                cell -= 1
        elif action == 1:
            # right: blocked in the last column
            if cell % side < side - 1:
                cell += 1
        elif action == 2:
            # up: blocked in the first row
            if cell >= side:
                cell -= side
        elif action == 3:
            # down: blocked in the last row
            if cell < side * (side - 1):
                cell += side

        self.agent_position = cell
        at_goal = cell == side * side - 1
        return cell, (10 if at_goal else -1), at_goal

class QLearningAgent:
    """Epsilon-greedy deep Q-learning agent with a separate target network.

    States are one-hot encoded. ``train`` performs one gradient step from a
    single transition; the target network is only refreshed when
    ``update_target_network`` is called.
    """

    def __init__(self, state_size, action_size, learning_rate=0.001, discount_factor=0.99, exploration_rate=1.0, exploration_decay=0.995, exploration_min=0.01):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_min = exploration_min
        self.exploration_decay = exploration_decay

        print(self.device)

        def build_mlp():
            # Both networks share this architecture: state -> one Q-value per action.
            layers = [
                nn.Linear(state_size, 24),
                nn.ReLU(),
                nn.Linear(24, 24),
                nn.ReLU(),
                nn.Linear(24, action_size),
            ]
            return nn.Sequential(*layers).to(self.device)

        # Online network first, then the target network, which starts as a copy.
        self.q_network = build_mlp()
        self.target_network = build_mlp()
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()
        self.target_update_frequency = 5
        self.steps = 0

    def get_action(self, state):
        """Return a random action with probability ``exploration_rate``, else the greedy one."""
        explore = np.random.rand() < self.exploration_rate
        if explore:
            return random.randrange(self.action_size)

        with torch.no_grad():
            encoded = torch.FloatTensor(self.one_hot_encode(state)).to(self.device)
            return torch.argmax(self.q_network(encoded)).item()

    def one_hot_encode(self, state):
        """Return a list of length ``state_size`` with a 1 at index ``state``."""
        vector = [0] * self.state_size
        vector[state] = 1
        return vector

    def train(self, state, action, reward, next_state, done):
        """Take one gradient step on the transition ``(state, action, reward, next_state, done)``."""
        current = torch.FloatTensor(self.one_hot_encode(state)).to(self.device)
        successor = torch.FloatTensor(self.one_hot_encode(next_state)).to(self.device)

        predicted_q = self.q_network(current)

        # Bootstrapped target from the target network; the future term is
        # multiplied by zero when the transition ended the episode.
        with torch.no_grad():
            best_next_q = torch.max(self.target_network(successor))
            td_target = reward + (self.discount_factor * best_next_q * (not done))

        # Only the entry of the action taken differs from the prediction, so
        # the other actions contribute zero error.
        regression_target = predicted_q.clone()
        regression_target[action] = td_target

        loss = self.loss_fn(predicted_q, regression_target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.steps += 1

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

def train_agent(episodes=500):
    """Train a QLearningAgent on a default GridWorld and return it.

    The agent learns from every transition as it happens. After each episode
    the exploration rate is decayed (never below its minimum), and the target
    network is refreshed every ``agent.target_update_frequency`` episodes.
    Progress is printed every 100 episodes.
    """
    env = GridWorld()
    agent = QLearningAgent(
        state_size=env.size**2,
        action_size=4,
        learning_rate=0.001,
        exploration_decay=0.995,
        exploration_min=0.01
    )

    for episode in range(episodes):
        current = env.reset()
        episode_return = 0
        finished = False

        # Online learning: one gradient step per environment step.
        while not finished:
            move = agent.get_action(current)
            following, payoff, finished = env.step(move)
            agent.train(current, move, payoff, following, finished)
            current = following
            episode_return += payoff

        # Anneal epsilon once per episode, clamped at the configured floor.
        decayed = agent.exploration_rate * agent.exploration_decay
        agent.exploration_rate = max(agent.exploration_min, decayed)

        # Sync the target network on the configured episode cadence.
        if episode % agent.target_update_frequency == 0:
            agent.update_target_network()

        if episode % 100 == 0:
            print(f"Episode {episode}: Total Reward = {episode_return}, Exploration Rate = {agent.exploration_rate:.3f}")

    return agent

# Run the DQN training with the default 500 episodes. The result is bound to
# the notebook-level name `trained_agent`, which the following cells read.
print("Training DQN Agent:")
trained_agent = train_agent()

Training DQN Agent:
cuda
Episode 0: Total Reward = -284, Exploration Rate = 0.995
Episode 100: Total Reward = -74, Exploration Rate = 0.603
Episode 200: Total Reward = -34, Exploration Rate = 0.365
Episode 300: Total Reward = -29, Exploration Rate = 0.221
Episode 400: Total Reward = -18, Exploration Rate = 0.134


In [4]:
def test_agent(agent, env):
    state = env.reset()
    done = False
    trajectory = []
    
    # Disable exploration for testing
    original_exploration = agent.exploration_rate
    agent.exploration_rate = 0
    
    while not done:
        action = agent.get_action(state)
        next_state, reward, done = env.step(action)
        trajectory.append((state, action))
        state = next_state
    
    # Restore exploration rate (if you plan to keep training)
    agent.exploration_rate = original_exploration
    
    return trajectory

# Roll out the greedy policy of `trained_agent` on a fresh default GridWorld.
# test_agent reads `agent.exploration_rate`, which QLearningAgent defines and
# PPOAgent does not, so `trained_agent` must be the DQN agent at this point.
# `env` and `path` are reused by the visualisation cells below.
env = GridWorld()
path = test_agent(trained_agent, env)
print("Path taken:", path)

Path taken: [(0, 3), (15, 3), (30, 3), (45, 3), (60, 1), (61, 3), (76, 3), (91, 3), (106, 1), (107, 3), (122, 1), (123, 1), (124, 1), (125, 1), (126, 1), (127, 1), (128, 1), (129, 1), (130, 1), (131, 1), (132, 3), (147, 1), (148, 1), (149, 3), (164, 3), (179, 3), (194, 3), (209, 3)]


In [5]:
def print_grid_path(size, path):
    """Print a ``size`` x ``size`` grid with an arrow on every visited cell.

    ``path`` is an iterable of ``(state, action)`` pairs. Each visited cell
    shows the arrow of the action taken there, unvisited cells show a dot,
    and the bottom-right cell is always shown as "G".
    """
    arrows = {0: "←", 1: "→", 2: "↑", 3: "↓"}
    cells = [["·"] * size for _ in range(size)]

    for state, action in path:
        r, c = divmod(state, size)
        cells[r][c] = arrows[action]

    # The goal marker is written last, so it wins over any arrow on that cell.
    cells[size - 1][size - 1] = "G"

    print(*(" ".join(line) for line in cells), sep="\n")

# Visualize the greedy path. `env` and `path` come from the test cell above,
# which has to be run first.
print_grid_path(env.size, path)

↓ · · · · · · · · · · · · · ·
↓ · · · · · · · · · · · · · ·
↓ · · · · · · · · · · · · · ·
↓ · · · · · · · · · · · · · ·
→ ↓ · · · · · · · · · · · · ·
· ↓ · · · · · · · · · · · · ·
· ↓ · · · · · · · · · · · · ·
· → ↓ · · · · · · · · · · · ·
· · → → → → → → → → → → ↓ · ·
· · · · · · · · · · · · → → ↓
· · · · · · · · · · · · · · ↓
· · · · · · · · · · · · · · ↓
· · · · · · · · · · · · · · ↓
· · · · · · · · · · · · · · ↓
· · · · · · · · · · · · · · G


In [6]:
def print_all_path(size, agent=None):
    """Print the greedy action of a Q-network for every cell of the grid.

    Args:
        size: side length of the grid; the network is queried with
            ``size * size`` one-hot states.
        agent: object exposing a ``q_network`` module. Defaults to the
            notebook-level ``trained_agent`` when omitted.

    Output is one line per grid row, each arrow followed by a space.
    """
    if agent is None:
        # Backward-compatible default: fall back to the notebook global.
        agent = trained_agent

    q_network = agent.q_network
    # Build the inputs on whatever device the network lives on, rather than
    # guessing from CUDA availability.
    first_param = next(q_network.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Row i of the identity matrix is the one-hot encoding of state i, so a
    # single batched forward pass scores every state. No gradients are needed.
    with torch.no_grad():
        q_values = q_network(torch.eye(size * size, device=device))
        greedy_actions = q_values.argmax(dim=1).tolist()

    action_symbols = {0: "←", 1: "→", 2: "↑", 3: "↓"}
    for index, action in enumerate(greedy_actions):
        print(action_symbols[action], end=' ')
        if (index + 1) % size == 0:
            print()

# Show the greedy action for every cell. `env` comes from the test cell above,
# and print_all_path reads the notebook-level `trained_agent`.
print_all_path(env.size)

↓ ← ↓ ↓ ↓ ↓ ↓ ↓ ↓ → ↓ ↓ ↓ → ↓ 
↓ ↓ ↓ ↓ ↓ ↓ → → ↓ → ↓ → ↓ ↓ ↓ 
↓ ↓ ↓ → ↓ ↓ ↓ ↓ ↓ → → → ↓ → ↓ 
↓ ↓ ↓ ↓ ↓ → ↓ ↓ ↓ → → → → ↓ ↓ 
→ ↓ ↓ ↓ → ↓ ↓ ↓ ↓ ↓ → → → → ↓ 
↓ ↓ ↓ ↓ → ↓ → → ↓ ↓ ↓ → ↓ ↓ ↓ 
↓ ↓ ↓ → → → → → ↓ ↓ ↓ → ↓ → ↓ 
↓ → ↓ → → → → → → ↓ → → → ↓ ↓ 
→ ↓ → → → → → → → → → → ↓ → ↓ 
→ → ↓ ↓ → ↓ → → → → → ↓ → → ↓ 
→ → ↓ ↓ ↓ ↓ → → → ↓ → → → → ↓ 
↓ → ↓ ↓ ↓ → → → → → → → → → ↓ 
↓ → → → → → ↓ → → → → → → → ↓ 
→ → → → → → → → → → → → → → ↓ 
→ → → → → → → → → → → → → → → 
