<a href="https://colab.research.google.com/github/itsmepriyabrata/priyabrata_ai_python/blob/main/Reinforcement%20learning%20algorithms%20part%203.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Proximal Policy Optimization

In [None]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class PPOAgent:
    """Policy-gradient agent trained with PPO's clipped surrogate objective.

    The policy is a small softmax MLP over a discrete action space. Each
    episode is collected on-policy; its discounted returns are normalised and
    used directly as advantage estimates (there is no learned value baseline).
    The policy is then updated with minibatch gradient steps on the clipped
    probability-ratio objective.
    """

    def __init__(self, env):
        """Read the environment's dimensions and build the policy network.

        Args:
            env: Gym-style environment exposing ``observation_space.shape``
                and a discrete ``action_space.n``.
        """
        self.env = env
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.gamma = 0.99          # discount factor
        self.clip_ratio = 0.2      # PPO clipping range (epsilon)
        self.learning_rate = 0.001
        self.epochs = 10           # passes over each rollout per update
        self.batch_size = 32       # minibatch size within a pass
        self.model = self.build_model()

    def build_model(self):
        """Build and compile the softmax policy network.

        Returns:
            A compiled Keras ``Sequential`` model mapping a state to action
            probabilities. The compiled loss is not used by ``update_policy``;
            compiling is what attaches the Adam optimizer to the model.
        """
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(self.action_size, activation='softmax'))
        # ``learning_rate`` is the supported keyword; the old ``lr`` alias is
        # rejected by current Keras releases.
        model.compile(loss='categorical_crossentropy',
                      optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def act(self, state):
        """Sample an action from the current policy.

        Args:
            state: Batch containing a single observation, shape
                ``(1, state_size)``.

        Returns:
            The sampled action index as a Python ``int``.
        """
        # Calling the model directly avoids the per-call overhead (and the
        # progress bar) of ``predict`` for a single observation.
        probs = self.model(np.asarray(state, dtype=np.float32), training=False).numpy()[0]
        # float32 softmax output can miss NumPy's sum-to-one tolerance, so
        # renormalise in float64 before sampling.
        probs = probs.astype(np.float64)
        probs /= probs.sum()
        return int(np.random.choice(self.action_size, p=probs))

    def _reset_env(self):
        """Reset the environment and return the initial observation.

        Accepts both reset conventions: a bare observation, or an
        ``(observation, info)`` tuple.
        """
        result = self.env.reset()
        if isinstance(result, tuple):
            result = result[0]
        return np.asarray(result, dtype=np.float32)

    def _step_env(self, action):
        """Advance the environment one step.

        Accepts both step conventions: the 4-tuple
        ``(obs, reward, done, info)`` and the 5-tuple
        ``(obs, reward, terminated, truncated, info)``.

        Returns:
            ``(next_state, reward, done)``.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = result
            done = bool(done)
        return np.asarray(next_state, dtype=np.float32), reward, done

    def train(self, episodes):
        """Collect ``episodes`` rollouts, updating the policy after each one.

        Args:
            episodes: Number of episodes to run.
        """
        for episode in range(episodes):
            states, actions, rewards = [], [], []
            state = self._reset_env()
            done = False
            while not done:
                action = self.act(state[None, :])
                next_state, reward, done = self._step_env(action)
                states.append(state)
                actions.append(action)
                rewards.append(reward)
                state = next_state
            returns = self.calculate_returns(rewards)
            self.update_policy(states, actions, returns)
            print(f"Episode: {episode}, Reward: {sum(rewards)}")

    def calculate_returns(self, rewards):
        """Compute the discounted return for every step of an episode.

        Args:
            rewards: Per-step rewards in time order.

        Returns:
            A list where element ``t`` is
            ``rewards[t] + gamma * rewards[t+1] + gamma**2 * rewards[t+2] ...``.
            An empty input yields an empty list.
        """
        returns = []
        running = 0.0
        # Accumulate backwards and reverse once; inserting at the front of a
        # list on every step would be quadratic in the episode length.
        for reward in reversed(rewards):
            running = reward + self.gamma * running
            returns.append(running)
        returns.reverse()
        return returns

    def update_policy(self, states, actions, returns):
        """Run PPO clipped-objective updates on one rollout.

        Args:
            states: Sequence of observations, one per step.
            actions: Sequence of action indices taken at those states.
            returns: Discounted returns for those steps; normalised here and
                used as the advantage estimates.
        """
        states = np.asarray(states, dtype=np.float32)
        actions = np.asarray(actions, dtype=np.int64)
        returns = np.asarray(returns, dtype=np.float32)
        sample_count = len(states)
        if sample_count == 0:
            return

        advantages = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Probabilities of the taken actions under the pre-update policy. They
        # are frozen for the whole update so the ratio measures drift from the
        # data-collecting policy.
        old_probs = self.model(states, training=False).numpy()
        old_action_probs = old_probs[np.arange(sample_count), actions]

        variables = self.model.trainable_variables
        lower = 1.0 - self.clip_ratio
        upper = 1.0 + self.clip_ratio
        for _ in range(self.epochs):
            order = np.random.permutation(sample_count)
            for start in range(0, sample_count, self.batch_size):
                batch = order[start:start + self.batch_size]
                batch_advantages = tf.constant(advantages[batch])
                batch_old = tf.constant(old_action_probs[batch])
                action_mask = tf.one_hot(actions[batch], self.action_size)
                with tf.GradientTape() as tape:
                    probs = self.model(states[batch], training=True)
                    new_action_probs = tf.reduce_sum(probs * action_mask, axis=1)
                    ratios = new_action_probs / (batch_old + 1e-8)
                    clipped = tf.clip_by_value(ratios, lower, upper)
                    # Negated because the optimizer minimises while PPO
                    # maximises the clipped surrogate.
                    loss = -tf.reduce_mean(
                        tf.minimum(ratios * batch_advantages, clipped * batch_advantages))
                grads = tape.gradient(loss, variables)
                self.model.optimizer.apply_gradients(zip(grads, variables))

if __name__ == "__main__":
    # Train a PPO agent on CartPole.
    env = gym.make('CartPole-v1')
    try:
        agent = PPOAgent(env)
        agent.train(episodes=1000)
    finally:
        # Release the environment's resources even if training is interrupted.
        env.close()

Soft Actor-Critic

In [None]:
import gym
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam

class SACAgent:
    """Soft Actor-Critic agent for a discrete action space.

    Holds a softmax policy network, two Q-networks and their slowly-updated
    target copies. Transitions go into a replay buffer; each update regresses
    both Q-networks onto an entropy-regularised target and moves the policy
    toward actions with high (minimum-of-two) Q-value while keeping entropy.
    """

    def __init__(self, env):
        """Read the environment's dimensions and build all networks.

        Args:
            env: Gym-style environment exposing ``observation_space.shape``
                and a discrete ``action_space.n``.
        """
        self.env = env
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.gamma = 0.99          # discount factor
        self.tau = 0.005           # target-network blend rate
        self.alpha = 0.2           # entropy temperature
        self.learning_rate = 0.001
        self.buffer_size = 100000
        self.batch_size = 64
        self.policy_net, self.q1_net, self.q2_net, self.target_q1_net, self.target_q2_net = self.build_networks()
        self.replay_buffer = deque(maxlen=self.buffer_size)

    def build_networks(self):
        """Create the policy, the two Q-networks and their target copies.

        Returns:
            ``(policy_net, q1_net, q2_net, target_q1_net, target_q2_net)``.
        """
        # The policy must output a probability distribution because ``act``
        # samples from it; the Q-networks keep the default linear output.
        policy_net = self.build_model(output_activation='softmax')
        q1_net = self.build_model()
        q2_net = self.build_model()
        target_q1_net = self.build_model()
        target_q2_net = self.build_model()
        # Targets start as exact copies of their online networks.
        target_q1_net.set_weights(q1_net.get_weights())
        target_q2_net.set_weights(q2_net.get_weights())
        return policy_net, q1_net, q2_net, target_q1_net, target_q2_net

    def build_model(self, output_activation='linear'):
        """Build and compile a two-hidden-layer MLP with one output per action.

        Args:
            output_activation: Activation of the output layer. The default
                ``'linear'`` suits Q-value regression.

        Returns:
            A Keras ``Sequential`` model compiled with MSE loss and Adam.
        """
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(self.action_size, activation=output_activation))
        # ``learning_rate`` is the supported keyword; the old ``lr`` alias is
        # rejected by current Keras releases.
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def act(self, state):
        """Sample an action from the current policy.

        Args:
            state: Batch containing a single observation, shape
                ``(1, state_size)``.

        Returns:
            The sampled action index as a Python ``int``.
        """
        probs = self.policy_net(np.asarray(state, dtype=np.float32), training=False).numpy()[0]
        # float32 softmax output can miss NumPy's sum-to-one tolerance, so
        # renormalise in float64 before sampling.
        probs = probs.astype(np.float64)
        probs /= probs.sum()
        return int(np.random.choice(self.action_size, p=probs))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))

    def _reset_env(self):
        """Reset the environment and return the initial observation.

        Accepts both reset conventions: a bare observation, or an
        ``(observation, info)`` tuple.
        """
        result = self.env.reset()
        if isinstance(result, tuple):
            result = result[0]
        return np.asarray(result, dtype=np.float32)

    def _step_env(self, action):
        """Advance the environment one step.

        Accepts both the 4-tuple ``(obs, reward, done, info)`` and the
        5-tuple ``(obs, reward, terminated, truncated, info)`` conventions.

        Returns:
            ``(next_state, reward, terminal, done)`` where ``terminal`` is the
            flag to store for bootstrapping and ``done`` ends the episode.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            # A truncated episode is cut off, not finished, so only true
            # termination should zero out the bootstrap term.
            terminal = bool(terminated)
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = result
            terminal = done = bool(done)
        return np.asarray(next_state, dtype=np.float32), reward, terminal, done

    def train(self, episodes):
        """Run ``episodes`` episodes, updating the networks after every step.

        Args:
            episodes: Number of episodes to run.
        """
        for episode in range(episodes):
            state = self._reset_env()
            done = False
            episode_reward = 0.0
            while not done:
                action = self.act(state[None, :])
                next_state, reward, terminal, done = self._step_env(action)
                self.remember(state, action, reward, next_state, terminal)
                episode_reward += reward
                state = next_state
                if len(self.replay_buffer) >= self.batch_size:
                    self.update_policy()
            # Report the episode total, not just the final step's reward.
            print(f"Episode: {episode}, Reward: {episode_reward}")

    @staticmethod
    def _soft_state_value(probs, q_values, alpha):
        """Return the entropy-regularised state value for each row.

        Computes ``sum_a probs[a] * (q_values[a] - alpha * log(probs[a]))``.

        Args:
            probs: Array ``(batch, actions)`` of action probabilities.
            q_values: Array ``(batch, actions)`` of Q-values.
            alpha: Entropy temperature.

        Returns:
            Array ``(batch,)`` of soft state values.
        """
        probs = np.asarray(probs)
        # The small constant keeps log() finite for zero-probability actions.
        log_probs = np.log(probs + 1e-8)
        return np.sum(probs * (np.asarray(q_values) - alpha * log_probs), axis=1)

    @staticmethod
    def _polyak_average(online_weights, target_weights, tau):
        """Blend weight lists: ``tau * online + (1 - tau) * target`` per array."""
        return [tau * online + (1.0 - tau) * target
                for online, target in zip(online_weights, target_weights)]

    def update_policy(self):
        """Sample a minibatch and update the Q-networks, policy and targets.

        Does nothing if the buffer holds fewer than ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        # Uniform sampling without replacement from the replay buffer.
        picks = np.random.choice(len(self.replay_buffer), size=self.batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in picks]
        states, actions, rewards, next_states, dones = zip(*batch)
        states = np.asarray(states, dtype=np.float32)
        actions = np.asarray(actions, dtype=np.int64)
        rewards = np.asarray(rewards, dtype=np.float32)
        next_states = np.asarray(next_states, dtype=np.float32)
        dones = np.asarray(dones, dtype=np.float32)
        rows = np.arange(self.batch_size)

        # Soft Bellman target, computed once and shared by both Q-networks.
        # The element-wise minimum of the two target critics curbs
        # over-estimation.
        next_probs = self.policy_net(next_states, training=False).numpy()
        next_q = np.minimum(
            self.target_q1_net(next_states, training=False).numpy(),
            self.target_q2_net(next_states, training=False).numpy())
        next_value = self._soft_state_value(next_probs, next_q, self.alpha)
        td_target = (rewards + self.gamma * (1.0 - dones) * next_value).astype(np.float32)

        for q_net in (self.q1_net, self.q2_net):
            # Only the taken action's Q-value gets a new target; the other
            # outputs are regressed onto themselves (zero error).
            target = q_net(states, training=False).numpy().copy()
            target[rows, actions] = td_target
            q_net.train_on_batch(states, target)

        # Policy step: minimise E_s[ sum_a pi(a|s) * (alpha*log pi(a|s) - Q(s,a)) ].
        min_q = tf.constant(np.minimum(
            self.q1_net(states, training=False).numpy(),
            self.q2_net(states, training=False).numpy()))
        variables = self.policy_net.trainable_variables
        with tf.GradientTape() as tape:
            probs = self.policy_net(states, training=True)
            log_probs = tf.math.log(probs + 1e-8)
            policy_loss = tf.reduce_mean(
                tf.reduce_sum(probs * (self.alpha * log_probs - min_q), axis=1))
        grads = tape.gradient(policy_loss, variables)
        self.policy_net.optimizer.apply_gradients(zip(grads, variables))

        self.update_target_networks()

    def update_target_networks(self):
        """Move each target Q-network a fraction ``tau`` toward its online net."""
        self.target_q1_net.set_weights(self._polyak_average(
            self.q1_net.get_weights(), self.target_q1_net.get_weights(), self.tau))
        self.target_q2_net.set_weights(self._polyak_average(
            self.q2_net.get_weights(), self.target_q2_net.get_weights(), self.tau))

if __name__ == "__main__":
    # Train a Soft Actor-Critic agent on CartPole.
    env = gym.make('CartPole-v1')
    try:
        agent = SACAgent(env)
        agent.train(episodes=1000)
    finally:
        # Release the environment's resources even if training is interrupted.
        env.close()

Trust Region Policy Optimization

In [None]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class TRPOAgent:
    """Trust Region Policy Optimization agent for a discrete action space.

    After each on-policy episode the agent:
      1. estimates advantages as discounted return minus a learned value,
      2. computes the gradient of the surrogate loss,
      3. solves ``F x = g`` by conjugate gradient, where ``F`` is the damped
         Hessian of the mean KL divergence (applied via Hessian-vector
         products, never formed explicitly),
      4. scales the step to the KL budget ``delta`` and backtracks until the
         new policy both satisfies the KL limit and improves the surrogate,
      5. fits the value network to the observed returns.
    """

    def __init__(self, env):
        """Read the environment's dimensions and build both networks.

        Args:
            env: Gym-style environment exposing ``observation_space.shape``
                and a discrete ``action_space.n``.
        """
        self.env = env
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.gamma = 0.99          # discount factor
        self.delta = 0.01          # KL-divergence trust-region radius
        self.damping = 0.1         # added to curvature for a stable CG solve
        self.learning_rate = 0.001  # used by the value network's optimizer
        self.value_epochs = 5      # value-network fitting passes per episode
        self.policy_net, self.value_net = self.build_networks()

    def build_networks(self):
        """Create the softmax policy and the scalar state-value network.

        Returns:
            ``(policy_net, value_net)``.
        """
        policy_net = self.build_model()
        # The value function is a single-output regressor, not a distribution
        # over actions.
        value_net = self.build_model(output_size=1, output_activation='linear', loss='mse')
        return policy_net, value_net

    def build_model(self, output_size=None, output_activation='softmax',
                    loss='categorical_crossentropy'):
        """Build and compile a two-hidden-layer MLP.

        Args:
            output_size: Number of output units; defaults to ``action_size``.
            output_activation: Activation of the output layer.
            loss: Keras loss name passed to ``compile``.

        Returns:
            A compiled Keras ``Sequential`` model. With the defaults this is a
            softmax policy over actions.
        """
        if output_size is None:
            output_size = self.action_size
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(output_size, activation=output_activation))
        # ``learning_rate`` is the supported keyword; the old ``lr`` alias is
        # rejected by current Keras releases.
        model.compile(loss=loss, optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def act(self, state):
        """Sample an action from the current policy.

        Args:
            state: Batch containing a single observation, shape
                ``(1, state_size)``.

        Returns:
            The sampled action index as a Python ``int``.
        """
        probs = self.policy_net(np.asarray(state, dtype=np.float32), training=False).numpy()[0]
        # float32 softmax output can miss NumPy's sum-to-one tolerance, so
        # renormalise in float64 before sampling.
        probs = probs.astype(np.float64)
        probs /= probs.sum()
        return int(np.random.choice(self.action_size, p=probs))

    def _reset_env(self):
        """Reset the environment and return the initial observation.

        Accepts both reset conventions: a bare observation, or an
        ``(observation, info)`` tuple.
        """
        result = self.env.reset()
        if isinstance(result, tuple):
            result = result[0]
        return np.asarray(result, dtype=np.float32)

    def _step_env(self, action):
        """Advance the environment one step.

        Accepts both the 4-tuple ``(obs, reward, done, info)`` and the
        5-tuple ``(obs, reward, terminated, truncated, info)`` conventions.

        Returns:
            ``(next_state, reward, done)``.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = result
            done = bool(done)
        return np.asarray(next_state, dtype=np.float32), reward, done

    def train(self, episodes):
        """Run ``episodes`` episodes with a TRPO update after each one.

        Args:
            episodes: Number of episodes to run.
        """
        for episode in range(episodes):
            states, actions, rewards = [], [], []
            state = self._reset_env()
            done = False
            while not done:
                action = self.act(state[None, :])
                next_state, reward, done = self._step_env(action)
                states.append(state)
                actions.append(action)
                rewards.append(reward)
                state = next_state
            returns = self.calculate_returns(rewards)
            advantages = self.calculate_advantages(states, returns)
            self.update_policy(states, actions, advantages)
            # Fit the baseline after the advantages were computed so the
            # estimate used for this update did not see this episode's targets.
            self.update_value(states, returns)
            print(f"Episode: {episode}, Reward: {sum(rewards)}")

    def calculate_returns(self, rewards):
        """Compute the discounted return for every step of an episode.

        Args:
            rewards: Per-step rewards in time order.

        Returns:
            A list where element ``t`` is
            ``rewards[t] + gamma * rewards[t+1] + gamma**2 * rewards[t+2] ...``.
            An empty input yields an empty list.
        """
        returns = []
        running = 0.0
        # Accumulate backwards and reverse once; inserting at the front of a
        # list on every step would be quadratic in the episode length.
        for reward in reversed(rewards):
            running = reward + self.gamma * running
            returns.append(running)
        returns.reverse()
        return returns

    def calculate_advantages(self, states, returns):
        """Estimate per-step advantages as return minus predicted value.

        Args:
            states: Sequence of observations, one per step.
            returns: Discounted return for each of those steps.

        Returns:
            1-D float32 array with one advantage per step.
        """
        states = np.asarray(states, dtype=np.float32)
        returns = np.asarray(returns, dtype=np.float32)
        if len(states) == 0:
            return np.zeros(0, dtype=np.float32)
        # One batched forward pass; column 0 is the scalar value estimate.
        values = self.value_net(states, training=False).numpy()[:, 0]
        return returns - values

    def update_value(self, states, returns):
        """Regress the value network onto the observed discounted returns.

        Args:
            states: Sequence of observations, one per step.
            returns: Discounted return for each of those steps.
        """
        states = np.asarray(states, dtype=np.float32)
        if len(states) == 0:
            return
        targets = np.asarray(returns, dtype=np.float32).reshape(-1, 1)
        self.value_net.fit(states, targets, epochs=self.value_epochs, verbose=0)

    def _flat_params(self):
        """Return all trainable policy parameters as one float64 vector."""
        return np.concatenate(
            [v.numpy().ravel() for v in self.policy_net.trainable_variables]
        ).astype(np.float64)

    def _assign_flat_params(self, flat):
        """Write a flat parameter vector back into the policy's variables."""
        offset = 0
        for variable in self.policy_net.trainable_variables:
            shape = tuple(variable.shape)
            size = int(np.prod(shape))
            chunk = flat[offset:offset + size].reshape(shape).astype(np.float32)
            variable.assign(chunk)
            offset += size

    def _surrogate_loss(self, states, actions, advantages, old_probs):
        """Return the negated importance-weighted advantage as a TF scalar.

        Negated so that a smaller value means a better policy.
        """
        action_mask = tf.one_hot(actions, self.action_size)
        probs = self.policy_net(states, training=True)
        new_action_probs = tf.reduce_sum(probs * action_mask, axis=1)
        old_action_probs = tf.reduce_sum(tf.constant(old_probs) * action_mask, axis=1)
        ratios = new_action_probs / (old_action_probs + 1e-8)
        return -tf.reduce_mean(ratios * tf.constant(advantages))

    def _surrogate_gradient(self, states, actions, advantages, old_probs):
        """Return the flat float64 gradient of the surrogate loss."""
        variables = self.policy_net.trainable_variables
        with tf.GradientTape() as tape:
            loss = self._surrogate_loss(states, actions, advantages, old_probs)
        grads = tape.gradient(loss, variables)
        flat = tf.concat([tf.reshape(g, [-1]) for g in grads], axis=0)
        return flat.numpy().astype(np.float64)

    @staticmethod
    def _mean_kl(old_probs, new_probs):
        """Return the batch-mean ``KL(old || new)`` of categorical rows."""
        old_probs = np.asarray(old_probs, dtype=np.float64)
        new_probs = np.asarray(new_probs, dtype=np.float64)
        log_ratio = np.log(old_probs + 1e-8) - np.log(new_probs + 1e-8)
        return float(np.mean(np.sum(old_probs * log_ratio, axis=1)))

    @staticmethod
    def _solve_cg(matvec, b, iterations, tolerance):
        """Approximately solve ``A x = b`` by conjugate gradient.

        Args:
            matvec: Callable returning ``A @ v`` for a vector ``v``; ``A`` is
                expected to be symmetric positive definite.
            b: Right-hand-side vector.
            iterations: Maximum number of CG iterations.
            tolerance: Stop once the squared residual norm falls below this.

        Returns:
            The approximate solution as a float64 vector.
        """
        b = np.asarray(b, dtype=np.float64)
        x = np.zeros_like(b)
        r = b.copy()
        p = b.copy()
        rdotr = float(r @ r)
        for _ in range(iterations):
            if rdotr < tolerance:
                break
            Ap = matvec(p)
            curvature = float(p @ Ap)
            if curvature <= 0.0:
                # Non-positive curvature: the quadratic model is unreliable
                # along p, so keep the solution found so far.
                break
            alpha = rdotr / curvature
            x += alpha * p
            r -= alpha * Ap
            new_rdotr = float(r @ r)
            p = r + (new_rdotr / rdotr) * p
            rdotr = new_rdotr
        return x

    def _natural_gradient_step(self, states, actions, advantages, old_probs):
        """Return the trust-region-scaled step, or ``None`` if none exists.

        The step is ``sqrt(2 * delta / (x^T F x)) * x`` with ``x`` the
        conjugate-gradient solution of ``F x = g``.
        """
        grads = self._surrogate_gradient(states, actions, advantages, old_probs)
        direction = self.conjugate_gradient(grads, advantages, states, actions)
        curvature = float(direction @ self.hessian_vector_product(direction, states, actions))
        if not np.isfinite(curvature) or curvature <= 0.0:
            return None
        return np.sqrt(2.0 * self.delta / curvature) * direction

    def update_policy(self, states, actions, advantages):
        """Take one TRPO step on the policy using a single rollout.

        Args:
            states: Sequence of observations, one per step.
            actions: Sequence of action indices taken at those states.
            advantages: Advantage estimate for each step; normalised here.
        """
        states = np.asarray(states, dtype=np.float32)
        actions = np.asarray(actions, dtype=np.int64)
        advantages = np.asarray(advantages, dtype=np.float32)
        if len(states) == 0:
            return
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        old_probs = self.policy_net(states, training=False).numpy()
        full_step = self._natural_gradient_step(states, actions, advantages, old_probs)
        if full_step is None:
            return
        self.line_search(states, actions, advantages, full_step=full_step, old_probs=old_probs)

    def conjugate_gradient(self, grads, advantages, states=None, actions=None):
        """Solve ``F x = grads`` for the natural-gradient direction.

        Args:
            grads: Flat gradient vector of the surrogate loss.
            advantages: Unused; kept so existing positional calls still work.
            states: Batch of observations that defines the curvature ``F``.
            actions: Actions taken at ``states``; forwarded to
                ``hessian_vector_product``.

        Returns:
            The approximate solution ``x`` as a flat float64 vector.

        Raises:
            ValueError: If ``states`` is not provided.
        """
        if states is None:
            raise ValueError("conjugate_gradient requires the rollout states")

        def fisher_vector_product(vector):
            return self.hessian_vector_product(vector, states, actions)

        return self._solve_cg(fisher_vector_product, grads, 10, 1e-10)

    def hessian_vector_product(self, p, states, actions):
        """Return ``(H + damping * I) @ p`` for the mean-KL Hessian ``H``.

        ``H`` is the Hessian, with respect to the policy parameters, of the
        batch-mean KL divergence between the current policy held fixed and
        the current policy. It is applied by double back-propagation, so the
        matrix itself is never built.

        Args:
            p: Flat vector with one entry per trainable policy parameter.
            states: Batch of observations.
            actions: Unused (the KL divergence does not depend on the sampled
                actions); kept for interface compatibility.

        Returns:
            Flat float64 vector of the same length as ``p``.
        """
        p = np.asarray(p, dtype=np.float64)
        states = tf.constant(np.asarray(states, dtype=np.float32))
        vector = tf.constant(p.astype(np.float32))
        variables = self.policy_net.trainable_variables
        with tf.GradientTape() as outer_tape:
            with tf.GradientTape() as inner_tape:
                probs = self.policy_net(states, training=True)
                fixed = tf.stop_gradient(probs)
                kl = tf.reduce_mean(tf.reduce_sum(
                    fixed * (tf.math.log(fixed + 1e-8) - tf.math.log(probs + 1e-8)), axis=1))
            # Taking this gradient inside the outer tape lets the outer tape
            # differentiate through it.
            kl_grads = inner_tape.gradient(kl, variables)
            flat_kl_grad = tf.concat([tf.reshape(g, [-1]) for g in kl_grads], axis=0)
            grad_dot_vector = tf.reduce_sum(flat_kl_grad * vector)
        second = outer_tape.gradient(grad_dot_vector, variables)
        flat_second = tf.concat(
            [tf.reshape(tf.zeros_like(v) if h is None else h, [-1])
             for h, v in zip(second, variables)], axis=0)
        return flat_second.numpy().astype(np.float64) + self.damping * p

    def line_search(self, states, actions, advantages, full_step=None, old_probs=None):
        """Backtrack along the step until the trust region is respected.

        Tries step fractions ``1, 1/2, 1/4, ...`` (ten in total) and keeps the
        first candidate whose mean KL from the old policy is within ``delta``
        and whose surrogate loss is lower than before. If none qualifies the
        original parameters are restored.

        Args:
            states: Batch of observations.
            actions: Actions taken at ``states``.
            advantages: Advantage estimate for each step.
            full_step: Flat step to subtract from the parameters; computed
                from the rollout when omitted.
            old_probs: Action probabilities of the pre-update policy on
                ``states``; computed from the current policy when omitted.
        """
        states = np.asarray(states, dtype=np.float32)
        actions = np.asarray(actions, dtype=np.int64)
        advantages = np.asarray(advantages, dtype=np.float32)
        if len(states) == 0:
            return
        if old_probs is None:
            old_probs = self.policy_net(states, training=False).numpy()
        if full_step is None:
            full_step = self._natural_gradient_step(states, actions, advantages, old_probs)
            if full_step is None:
                return

        old_params = self._flat_params()
        old_loss = float(self._surrogate_loss(states, actions, advantages, old_probs))
        for step_frac in (0.5 ** i for i in range(10)):
            self._assign_flat_params(old_params - step_frac * full_step)
            new_probs = self.policy_net(states, training=False).numpy()
            kl = self._mean_kl(old_probs, new_probs)
            new_loss = float(self._surrogate_loss(states, actions, advantages, old_probs))
            if kl <= self.delta and new_loss < old_loss:
                return
        # No acceptable step found: leave the policy unchanged.
        self._assign_flat_params(old_params)

if __name__ == "__main__":
    # Train a TRPO agent on CartPole.
    env = gym.make('CartPole-v1')
    try:
        agent = TRPOAgent(env)
        agent.train(episodes=1000)
    finally:
        # Release the environment's resources even if training is interrupted.
        env.close()