<a href="https://colab.research.google.com/github/salarMokhtariL/Reinforcement_Learning_Algorithms/blob/main/PPO_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import required libraries
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gym
from torch.distributions import Categorical
import matplotlib.pyplot as plt

In [2]:
# Hyperparameters
learning_rate = 3e-4         # Adam step size shared by the policy and value optimizers
gamma = 0.99                 # Discount factor applied to future rewards
lambda_gae = 0.95            # GAE lambda used when accumulating the advantage estimate
clip_epsilon = 0.2           # Probability ratio is clipped to [1 - eps, 1 + eps]
epochs = 3                   # Optimisation passes over each collected rollout
timesteps_per_batch = 2048   # Maximum timesteps collected per rollout before an update
max_timesteps = int(1e5)     # Total training timesteps; an int because it counts discrete steps

  and should_run_async(code)


In [3]:
# Policy Network for action selection
class PolicyNetwork(nn.Module):
    """Feed-forward policy: maps a state vector to action probabilities."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 64
        # Two ReLU hidden layers followed by a softmax head, so the output
        # along the last dimension is a probability distribution over actions.
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_dim),
            nn.Softmax(dim=-1),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the action probabilities for input ``x``."""
        action_probs = self.fc(x)
        return action_probs

In [4]:
# Value Network for estimating state values (baseline)
class ValueNetwork(nn.Module):
    """Feed-forward critic: maps a state vector to one scalar value estimate."""

    def __init__(self, state_dim):
        super().__init__()
        hidden_units = 64
        # Two ReLU hidden layers and a linear head with a single output unit.
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, 1),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the value estimate for input ``x`` (last dimension has size 1)."""
        state_value = self.fc(x)
        return state_value

In [5]:
# PPO Agent class
class PPOAgent:
    """PPO agent (clipped surrogate objective, GAE-based return targets).

    The environment must expose a 1-D observation vector
    (``env.observation_space.shape[0]``) and a discrete action space
    (``env.action_space.n``).

    Hyperparameters are captured on the instance at construction time. Each
    keyword argument left as ``None`` falls back to the module-level constant
    of the corresponding name.

    Args:
        env: Gym-style environment.
        lr: Adam learning rate for both optimizers (default: ``learning_rate``).
        discount: Reward discount factor (default: ``gamma``).
        gae_lambda: GAE lambda (default: ``lambda_gae``).
        clip_range: PPO ratio clipping range (default: ``clip_epsilon``).
        n_epochs: Optimisation passes per rollout (default: ``epochs``).
        batch_timesteps: Maximum steps collected per rollout
            (default: ``timesteps_per_batch``).
        total_timesteps: Training step budget (default: ``max_timesteps``).
    """

    def __init__(self, env, *, lr=None, discount=None, gae_lambda=None,
                 clip_range=None, n_epochs=None, batch_timesteps=None,
                 total_timesteps=None):
        self.env = env

        # Explicit overrides win; otherwise snapshot the module-level defaults.
        self.learning_rate = learning_rate if lr is None else lr
        self.gamma = gamma if discount is None else discount
        self.lambda_gae = lambda_gae if gae_lambda is None else gae_lambda
        self.clip_epsilon = clip_epsilon if clip_range is None else clip_range
        self.epochs = epochs if n_epochs is None else n_epochs
        self.timesteps_per_batch = int(
            timesteps_per_batch if batch_timesteps is None else batch_timesteps
        )
        self.max_timesteps = max_timesteps if total_timesteps is None else total_timesteps

        state_dim = env.observation_space.shape[0]
        self.policy_net = PolicyNetwork(state_dim, env.action_space.n)
        self.value_net = ValueNetwork(state_dim)
        # Optimizers for policy and value networks
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=self.learning_rate)
        self.rewards_history = []  # Moving-average reward, one entry per rollout

    def _reset_env(self):
        """Reset the environment and return only the initial observation.

        Gym >= 0.26 and Gymnasium return ``(observation, info)`` from
        ``reset()``; older Gym returns the observation alone. Both are accepted.
        """
        out = self.env.reset()
        if isinstance(out, tuple) and len(out) == 2 and isinstance(out[1], dict):
            return out[0]
        return out

    def _step_env(self, action):
        """Step the environment; return ``(next_state, reward, terminated, truncated)``.

        A 5-tuple (Gym >= 0.26 / Gymnasium) is passed through. A legacy
        4-tuple's single ``done`` flag is reported as ``terminated`` with
        ``truncated=False``, i.e. it is treated as a true terminal state.
        """
        out = self.env.step(action)
        if len(out) == 5:
            next_state, reward, terminated, truncated, _ = out
        else:
            next_state, reward, terminated, _ = out
            truncated = False
        return next_state, reward, bool(terminated), bool(truncated)

    def select_action(self, state):
        """Sample an action according to the policy.

        Args:
            state: A single (unbatched) observation.

        Returns:
            ``(action, log_prob)``: the sampled action as a Python int and its
            log-probability as a tensor of shape ``(1,)``.
        """
        state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)  # Add batch dimension
        dist = Categorical(self.policy_net(state_t))
        action = dist.sample()
        return action.item(), dist.log_prob(action)

    def compute_gae(self, rewards, values, dones, last_value=0.0):
        """Compute return targets with Generalized Advantage Estimation (GAE).

        Args:
            rewards: Per-step rewards.
            values: Per-step value estimates, same length as ``rewards``.
            dones: Per-step flags; a truthy entry stops bootstrapping and the
                GAE accumulation across that step.
            last_value: Value estimate of the state reached after the final
                step. Leave at 0 when the rollout ended in a terminal state.

        Returns:
            A list with one return target per step, equal to the GAE advantage
            plus the step's value estimate. Subtract ``values`` to recover the
            advantages.
        """
        # list(...) so that a NumPy array is extended rather than broadcast-added.
        values = list(values) + [last_value]
        gae = 0.0
        returns = []
        for step in reversed(range(len(rewards))):
            not_done = 0.0 if dones[step] else 1.0
            delta = rewards[step] + self.gamma * values[step + 1] * not_done - values[step]
            gae = delta + self.gamma * self.lambda_gae * not_done * gae
            returns.append(gae + values[step])
        # Built back-to-front with O(1) appends; restore chronological order.
        returns.reverse()
        return returns

    def update_policy(self, states, actions, log_probs_old, returns, advantages,
                      minibatch_size=64):
        """Update policy and value networks based on the PPO objective.

        For each epoch the policy takes one clipped-surrogate gradient step per
        shuffled minibatch, then the value network takes one full-batch
        mean-squared-error step towards ``returns``.

        Args:
            states: Sequence of observations.
            actions: Sequence of integer actions taken.
            log_probs_old: Log-probabilities of ``actions`` under the policy
                that collected the data.
            returns: Value-network regression targets.
            advantages: Advantage estimates weighting the policy objective.
            minibatch_size: Number of samples per policy gradient step.

        Raises:
            ValueError: If ``minibatch_size`` is smaller than 1.
        """
        if minibatch_size < 1:
            raise ValueError("minibatch_size must be at least 1")

        # Convert once up front; np.asarray avoids the slow list-of-arrays path.
        states_t = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        actions_t = torch.as_tensor(np.asarray(actions), dtype=torch.int64)
        old_log_probs_t = torch.as_tensor(np.asarray(log_probs_old), dtype=torch.float32)
        returns_t = torch.as_tensor(np.asarray(returns), dtype=torch.float32)
        advantages_t = torch.as_tensor(np.asarray(advantages), dtype=torch.float32)

        n_samples = states_t.shape[0]
        if n_samples == 0:
            return  # Nothing to learn from

        low, high = 1 - self.clip_epsilon, 1 + self.clip_epsilon
        for _ in range(self.epochs):
            order = torch.randperm(n_samples)
            for start in range(0, n_samples, minibatch_size):
                idx = order[start:start + minibatch_size]
                batch_adv = advantages_t[idx]

                # Calculate new log probability and probability ratio
                dist = Categorical(self.policy_net(states_t[idx]))
                log_probs = dist.log_prob(actions_t[idx])
                ratio = torch.exp(log_probs - old_log_probs_t[idx])

                # PPO clipped objective
                surr1 = ratio * batch_adv
                surr2 = torch.clamp(ratio, low, high) * batch_adv
                policy_loss = -torch.min(surr1, surr2).mean()  # Negative for gradient descent

                # Update policy network
                self.policy_optimizer.zero_grad()
                policy_loss.backward()
                self.policy_optimizer.step()

            # Update value network to minimize mean squared error.
            # squeeze(-1) keeps the batch axis even for a one-sample rollout.
            values = self.value_net(states_t).squeeze(-1)
            value_loss = nn.MSELoss()(values, returns_t)
            self.value_optimizer.zero_grad()
            value_loss.backward()
            self.value_optimizer.step()

    def plot_rewards(self):
        """Plot the moving-average reward recorded in ``rewards_history``."""
        plt.plot(self.rewards_history)
        plt.xlabel('Episode')
        plt.ylabel('Average Reward (last 10 episodes)')
        plt.title('PPO Training Rewards')
        plt.show()

    def train(self):
        """Main training loop.

        Repeats until ``max_timesteps`` environment steps have been taken:
        reset the environment, roll out until the episode ends or
        ``timesteps_per_batch`` steps were collected, then run one PPO update
        on that rollout. Plots the reward curve when training finishes.
        """
        timestep = 0
        episode_rewards = []
        while timestep < self.max_timesteps:
            states, actions, log_probs_old, rewards, dones, values = [], [], [], [], [], []
            state = self._reset_env()
            episode_reward = 0  # Total reward of this rollout
            terminated = False

            # Collect experience in the environment
            for _ in range(self.timesteps_per_batch):
                # Rollout data is stored as plain numbers, so no graph is needed.
                with torch.no_grad():
                    action, log_prob = self.select_action(state)
                    value = self.value_net(torch.as_tensor(state, dtype=torch.float32)).item()
                next_state, reward, terminated, truncated = self._step_env(action)

                # Store experience
                states.append(state)
                actions.append(action)
                log_probs_old.append(log_prob.item())
                rewards.append(reward)
                dones.append(terminated)
                values.append(value)

                # Update state and episode reward
                state = next_state
                episode_reward += reward
                timestep += 1
                if terminated or truncated:
                    break

            # Track rewards to visualize agent progress. A rollout cut off by
            # the batch limit is recorded with its partial reward, because the
            # environment is reset before the next rollout anyway.
            episode_rewards.append(episode_reward)

            # Calculate moving average reward over the last 10 episodes
            avg_reward = np.mean(episode_rewards[-10:])
            self.rewards_history.append(avg_reward)

            # Bootstrap from the critic unless the rollout ended in a terminal state.
            if terminated:
                last_value = 0.0
            else:
                with torch.no_grad():
                    last_value = self.value_net(
                        torch.as_tensor(state, dtype=torch.float32)
                    ).item()

            # compute_gae yields return targets (advantage + value); recover
            # the advantages by subtracting the value estimates again.
            returns = self.compute_gae(rewards, values, dones, last_value)
            advantages = np.asarray(returns, dtype=np.float32) - np.asarray(values, dtype=np.float32)

            # Normalize advantages for stability
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            # Perform policy and value updates
            self.update_policy(states, actions, log_probs_old, returns, advantages)

        # Plotting the reward function to visualize training progress
        self.plot_rewards()