In [None]:
import gymnasium as gym
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Matplotlib 3.6 renamed the bundled seaborn styles to "seaborn-v0_8-*" and
# 3.8 removed the old names, so use whichever spelling this installation
# provides (falling back to the default style if neither exists).
_STYLE_CANDIDATES = ("seaborn-v0_8-whitegrid", "seaborn-whitegrid")
plt.style.use(
    next((name for name in _STYLE_CANDIDATES if name in plt.style.available), "default")
)

# Figure-level defaults shared by every plot in the notebook.
plt.rc(
    "figure",
    autolayout=True,
    figsize=(12, 6),
    titlesize=18,
    titleweight='bold',
)

# Axes-level defaults (labels and titles).
plt.rc(
    "axes",
    labelweight="bold",
    labelsize="small",
    titleweight="bold",
    titlesize=16,
    titlepad=10,
)

# Keyword arguments for pandas-style line plots; kept for reuse by plotting cells.
plot_params = dict(
    color="0.75",
    style=".-",
    markeredgecolor="0.25",
    markerfacecolor="0.25",
    legend=True,
)

# Cart Pole

https://gymnasium.farama.org/environments/classic_control/cart_pole/

### 1. Random Actions:

**Objective**: Understand the environment and its dynamics.

**Task**:
- Initialize the environment.
- At each step, choose an action randomly.
- Observe the consequences of the action in terms of the next state, reward, and whether the episode has ended.

**Expected Outcome**: 
- The pole will likely fall quickly.
- Gain an intuitive understanding of the environment's dynamics.



In [None]:
# CartPole-v1 environment for the random-action baseline, created with the 'rgb_array' render mode.
env = gym.make('CartPole-v1', render_mode='rgb_array')

In [None]:
# Baseline: play `num_episodes` episodes with uniformly random actions and
# record the total reward of each one under the key "episode_<i>".
num_episodes = 50
episode_scores = {}

for episode in range(num_episodes):
    observation, _ = env.reset()
    episode_reward = 0
    terminated = False
    truncated = False

    # NOTE: the previous per-step env.render() call was dropped: its return
    # value was discarded, so it only added work to every step.
    while not (terminated or truncated):
        # actions are 0 or 1 (move left or right)
        action = env.action_space.sample()

        # observation is a 4-tuple of floats:
        # cart_position, cart_velocity, pole_angle, pole_angular_velocity
        observation, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward

    episode_scores[f"episode_{episode}"] = episode_reward

env.close()

In [None]:
# Average total reward per episode across everything recorded in episode_scores.
mean_score = sum(episode_scores.values()) / len(episode_scores)
mean_score

In [None]:
# reward is +1 for every step, so the scores are just the episode lengths
plot_data = pd.DataFrame(episode_scores.values())
sns.lineplot(plot_data)
# Title fix: the original string had a closing parenthesis without an opening one.
plt.title(f'Random actions (mean score: {mean_score:.2f})')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.ylim(0,500)  # truncation happens at 500 steps
plt.show()

### 2. Tabular Q-learning (if state space is discretized):

**Objective**: Grasp the concept of value-based learning.

**Task**:
- Discretize the state space into bins.
- Create a Q-table with state-action pairs.
- Implement the Q-learning algorithm to update Q-values.
- Choose actions using an epsilon-greedy policy.

**Expected Outcome**:
- Results will be unstable at first, but with enough episodes and appropriate hyperparameters the agent's performance should improve.



In [None]:
def discretize_state(observation, bins, state_bounds=None):
    """Map a continuous observation onto a tuple of integer bin indices.

    Each component is clipped to its ``(low, high)`` bound, rescaled to
    ``[0, 1]`` and multiplied by the number of bins for that component.
    The upper bound itself is folded into the last bin, so every index lies
    in ``range(bins[i])``.

    Args:
        observation: Sequence of numeric values (e.g. the array returned by
            ``env.step``).
        bins: Number of bins per component; needs at least one entry per
            observation component.
        state_bounds: Optional sequence of ``(low, high)`` pairs, one per
            component. Defaults to the bounds this notebook uses for
            CartPole (position, velocity, angle, angular velocity).

    Returns:
        Tuple of ``int`` indices, one per observation component, usable
        directly as an index into a Q-table.

    Raises:
        ValueError: If the observation has more components than there are
            bounds or bin counts.
    """
    if state_bounds is None:
        # Velocities are unbounded in the environment; +/-3 is a clipping choice.
        state_bounds = [(-2.4, 2.4), (-3, 3), (-0.21, 0.21), (-3, 3)]

    if len(observation) > len(state_bounds) or len(observation) > len(bins):
        raise ValueError(
            f"observation has {len(observation)} components but only "
            f"{len(state_bounds)} bounds and {len(bins)} bin counts were given"
        )

    discrete_observation = []
    for value, (low, high), n_bins in zip(observation, state_bounds, bins):
        # Clip into the supported range, then rescale to [0, 1].
        clipped = np.clip(value, low, high)
        scaled = (clipped - low) / (high - low)

        # scaled == 1.0 would give index n_bins, so cap at the last bin.
        discrete_observation.append(min(int(scaled * n_bins), n_bins - 1))

    return tuple(discrete_observation)

In [None]:
# Create a fresh CartPole-v1 environment (no render mode) for tabular Q-learning.
env = gym.make('CartPole-v1')

In [None]:
# Define parameters
ALPHA = 0.1  # Learning rate
GAMMA = 0.99  # Discount factor
EPSILON = 0.1  # Exploration rate
BINS = [24, 24, 24, 24]  # Number of bins for discretization
NUM_EPISODES = 10_000

# Initialize Q-table with zeros: one axis per discretized state component
# plus a final axis over actions.
q_table = np.zeros(BINS + [env.action_space.n])

episode_scores = {}

# Q-learning algorithm
for episode in range(NUM_EPISODES):
    observation, _ = env.reset()
    discrete_state = discretize_state(observation, BINS)
    terminated = False
    truncated = False
    episode_reward = 0

    while not terminated and not truncated:
        # Epsilon-greedy action selection
        if np.random.uniform(0, 1) < EPSILON:
            action = env.action_space.sample()  # Explore
        else:
            action = int(np.argmax(q_table[discrete_state]))  # Exploit

        # Take action and get new state and reward
        new_observation, reward, terminated, truncated, _ = env.step(action)
        new_discrete_state = discretize_state(new_observation, BINS)
        episode_reward += reward

        # Q-learning update, applied on every transition. A terminated
        # transition has no future value, so its target is just the reward;
        # a truncated one (time limit) still bootstraps from the next state.
        # Previously the final transition of each episode was never learned.
        max_future_q = 0.0 if terminated else np.max(q_table[new_discrete_state])
        state_action = discrete_state + (action,)
        td_target = reward + GAMMA * max_future_q
        q_table[state_action] = (1 - ALPHA) * q_table[state_action] + ALPHA * td_target

        discrete_state = new_discrete_state

    # Record the score once the episode is over so it includes the last
    # step's reward (it used to be stored before that reward was added).
    episode_scores[f"episode_{episode}"] = episode_reward

    if episode % 100 == 0:
        print(f"Episode: {episode}, Total Reward: {episode_reward}")

env.close()

In [None]:
# Average total reward per episode over the tabular Q-learning run.
mean_score = sum(episode_scores.values()) / len(episode_scores)
mean_score

In [None]:
plot_data = pd.DataFrame(episode_scores.values())
sns.lineplot(plot_data.values[0::100]) # plot every 100th value so it's not too crowded
# Title fix: removed the duplicated closing parenthesis.
plt.title(f'Tabular Q-learning (mean score: {mean_score:.2f})')
plt.xlabel('Episode (x100)')
plt.ylabel('Score')
plt.ylim(0,500)  # truncation happens at 500 steps
plt.show()

### 3. Policy Gradient using Neural Networks:

**Objective**: Transition from tabular methods to function approximators like neural networks.

**Task**:
- Use a neural network to estimate the policy.
- Implement the REINFORCE algorithm or a similar vanilla policy gradient method.
- Update the policy based on the received rewards.

**Expected Outcome**:
- The agent will learn to balance the pole for longer durations.
- Understanding of how neural networks can be used in RL.



In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

In [None]:
# Hyperparameters for the policy-gradient section
LEARNING_RATE = 0.01  # step size passed to the optimizer
EPISODES = 100  # number of training episodes

# Initialize the environment and read the sizes needed to build the policy network
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]  # length of the observation vector
n_actions = env.action_space.n  # number of discrete actions

In [None]:
# Policy network: observation -> two hidden ReLU layers of 24 units -> softmax over actions.
inputs = keras.Input(shape=(state_size,))
x = Dense(24, activation='relu')(inputs)
x = Dense(24, activation='relu')(x)
outputs = Dense(n_actions, activation='softmax')(x)

model = keras.Model(inputs=inputs, outputs=outputs)

# The training loop applies gradients manually, so the model is not compiled;
# only a standalone optimizer is created.
optimizer = Adam(learning_rate=LEARNING_RATE)


In [None]:
# Discount factor for the returns (previously an inline 0.99 literal).
REINFORCE_GAMMA = 0.99

episode_scores = {}
for episode in range(1, EPISODES + 1):
    state, _ = env.reset()
    terminated = False
    truncated = False
    episode_states, episode_actions, episode_rewards = [], [], []

    # Roll out one full episode with the current stochastic policy.
    while not terminated and not truncated:
        # Forward pass
        action_prob = model.predict(state.reshape(1, -1), verbose=0)[0]
        action = np.random.choice(n_actions, p=action_prob)

        # Take action
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Store state, action and reward
        episode_states.append(state)
        episode_actions.append(action)
        episode_rewards.append(reward)

        state = next_state

    episode_scores[f"episode_{episode}"] = np.sum(episode_rewards)

    # Compute discounted returns back-to-front, filling a preallocated array
    # instead of list.insert(0, ...), which is quadratic in episode length.
    discounted_rewards = np.zeros(len(episode_rewards), dtype=np.float64)
    cumulative_reward = 0.0
    for step in range(len(episode_rewards) - 1, -1, -1):
        cumulative_reward = episode_rewards[step] + REINFORCE_GAMMA * cumulative_reward
        discounted_rewards[step] = cumulative_reward

    # Standardize the returns (variance reduction) and cast to float32 so the
    # product with the float32 log-probabilities needs no implicit conversion.
    discounted_rewards = (
        (discounted_rewards - np.mean(discounted_rewards))
        / (np.std(discounted_rewards) + 1e-9)
    ).astype(np.float32)

    states_batch = np.vstack(episode_states)
    actions_batch = np.asarray(episode_actions, dtype=np.int32)

    # Compute loss values and perform a gradient step.
    # Trainable variables are watched by the tape automatically.
    with tf.GradientTape() as tape:
        probs = model(states_batch, training=True)
        # Select the probability of the action actually taken at each step.
        action_mask = tf.one_hot(actions_batch, int(n_actions))
        chosen_probs = tf.reduce_sum(probs * action_mask, axis=1)
        loss = -tf.reduce_mean(tf.math.log(chosen_probs) * discounted_rewards)

    # Compute gradients and update model weights
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    print(f"Episode: {episode}, Total Reward: {np.sum(episode_rewards)}")

env.close()


In [None]:
# Average total reward per episode over the policy-gradient run.
mean_score = sum(episode_scores.values()) / len(episode_scores)
mean_score

In [None]:
plot_data = pd.DataFrame(episode_scores.values())
sns.lineplot(plot_data.values)
# Title fix: removed the duplicated closing parenthesis.
plt.title(f'Policy gradient using NN (mean score: {mean_score:.2f})')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.ylim(0,500)  # truncation happens at 500 steps
plt.show()

In [None]:
import time

# Run the trained model greedily for one episode in a visible window.
env = gym.make('CartPole-v1', render_mode='human')
try:
    state, _ = env.reset()
    state = np.reshape(state, [1, 4])

    terminated = False
    truncated = False

    # With render_mode='human' the environment draws itself on reset/step,
    # so no explicit env.render() call is made here.
    while not terminated and not truncated:
        # verbose=0 keeps Keras from printing a progress bar for every step.
        action_prob = model.predict(state, verbose=0)
        action = int(np.argmax(action_prob[0]))

        next_state, reward, terminated, truncated, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 4])
        state = next_state

        # Adding a small sleep to make the rendering smoother
        time.sleep(0.01)
finally:
    # Close the window even if the loop is interrupted or raises.
    env.close()

### 4. Deep Q-learning:

**Objective**: Learn how to use deep neural networks in value-based methods.

**Task**:
- Use a neural network as a Q-function approximator.
- Implement the DQN algorithm with experience replay and target networks.

**Expected Outcome**:
- Improved stability compared to simple Q-learning due to the use of neural networks and experience replay.
- Understanding of challenges like overestimation bias in Q-learning and the need for techniques like target networks.



In [None]:
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
import gymnasium as gym
import random


In [None]:
# --- DQN hyperparameters ---

# Optimisation
LEARNING_RATE = 1e-3
GAMMA = 0.99

# Replay buffer
MEMORY_SIZE = 10_000
BATCH_SIZE = 32

# Epsilon-greedy schedule: start value, floor, per-episode multiplier
EXPLORATION_MAX, EXPLORATION_MIN, EXPLORATION_DECAY = 1.0, 0.01, 0.995

# Training length
NUM_EPISODES = 100

In [None]:
# Create the Q-network model: 4 observation values -> two hidden ReLU layers
# of 24 units -> one linear Q-value per action.
# The tensors are named `inputs`/`outputs` so the builtin `input()` is not shadowed.
inputs = layers.Input(shape=(4,))
x = layers.Dense(24, activation='relu')(inputs)
x = layers.Dense(24, activation='relu')(x)
outputs = layers.Dense(2, activation='linear')(x)

model = keras.Model(inputs=inputs, outputs=outputs)

# Mean-squared error between predicted and target Q-values.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='mse'
)


In [None]:
# Replay memory (a plain list; the training loop trims it with memory.pop(0))
# and the starting epsilon for epsilon-greedy exploration.
memory = []
exploration_rate = EXPLORATION_MAX

In [None]:
# Initialize a fresh CartPole-v1 environment for DQN training
env = gym.make('CartPole-v1')

In [None]:
episode_scores = {}
for episode in range(1, NUM_EPISODES+1):
    state, _ = env.reset()
    state = np.reshape(state, [1, 4])
    terminated = False
    truncated = False
    episode_reward = 0

    while not terminated and not truncated:
        # Epsilon-greedy action selection
        if np.random.rand() < exploration_rate:
            action = random.randrange(2)
        else:
            q_values = model.predict(state, verbose=0)
            action = int(np.argmax(q_values[0]))

        next_state, reward, terminated, truncated, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 4])
        episode_reward += reward

        # Store experience in memory (oldest entry dropped once full)
        memory.append((state, action, reward, next_state, terminated, truncated))
        if len(memory) > MEMORY_SIZE:
            memory.pop(0)

        state = next_state

        # Train the Q-network on a random minibatch of past transitions.
        # The batch uses its own variable names: the previous version unpacked
        # each sample into `state`, `terminated` and `truncated`, which
        # overwrote the running episode's state and loop flags.
        if len(memory) >= BATCH_SIZE:
            minibatch = random.sample(memory, BATCH_SIZE)
            batch_states = np.vstack([sample[0] for sample in minibatch])
            batch_actions = np.array([sample[1] for sample in minibatch])
            batch_rewards = np.array([sample[2] for sample in minibatch], dtype=np.float32)
            batch_next_states = np.vstack([sample[3] for sample in minibatch])
            batch_terminated = np.array([sample[4] for sample in minibatch], dtype=bool)

            # One batched forward pass per network input instead of one
            # predict/fit call per sample.
            next_q_max = model.predict(batch_next_states, verbose=0).max(axis=1)
            q_targets = model.predict(batch_states, verbose=0)

            # Only a true termination removes the future value; a transition
            # that was merely truncated by the time limit still bootstraps.
            q_targets[np.arange(BATCH_SIZE), batch_actions] = np.where(
                batch_terminated,
                batch_rewards,
                batch_rewards + GAMMA * next_q_max,
            )
            model.fit(batch_states, q_targets, batch_size=BATCH_SIZE, verbose=0)

    episode_scores[f"episode_{episode}"] = episode_reward
    print(f"Episode: {episode}, Total Reward: {episode_reward}")

    # Exploration rate decay
    if exploration_rate > EXPLORATION_MIN:
        exploration_rate *= EXPLORATION_DECAY

    # You can add code here to print episode statistics, save models, etc.

env.close()


In [None]:
# Average total reward per episode over the DQN run.
mean_score = sum(episode_scores.values()) / len(episode_scores)
mean_score

In [None]:
plot_data = pd.DataFrame(episode_scores.values())
sns.lineplot(plot_data.values)
# Title fix: this section is Deep Q-learning, not a policy-gradient method,
# and the duplicated closing parenthesis is removed.
plt.title(f'Deep Q-learning (mean score: {mean_score:.2f})')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.ylim(0,500)  # truncation happens at 500 steps
plt.show()

### 5. Advanced DQN Variants:

**Objective**: Dive deeper into challenges and improvements in value-based deep RL.

**Task**:
- Explore algorithms like Double DQN, Dueling DQN, and Prioritized Experience Replay.
- Integrate these techniques into your DQN implementation.

**Expected Outcome**:
- Improved performance and stability.
- Comprehensive understanding of challenges in deep Q-learning and the methodologies to mitigate them.



#### Double DQN

In [None]:
import numpy as np
import random
import gym
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from collections import deque


In [None]:
# --- Double DQN hyperparameters ---

# Optimisation
GAMMA = 0.99
LEARNING_RATE = 1e-3

# Replay buffer
MEMORY_SIZE = 10_000
BATCH_SIZE = 64

# Epsilon-greedy schedule: start value, floor, per-episode multiplier
EXPLORATION_MAX, EXPLORATION_MIN, EXPLORATION_DECAY = 1.0, 0.01, 0.995

# Training length
EPISODES = 500


In [None]:
# Initialize environment
# The imports cell of this section rebinds `gym` to the legacy `gym` package.
# The training loop relies on `reset()` returning (obs, info) and `step()`
# returning a 5-tuple, so bind the name back to Gymnasium, which the rest of
# the notebook uses.
import gymnasium as gym

env = gym.make("CartPole-v1")
observation_space = env.observation_space.shape[0]  # length of the observation vector
action_space = env.action_space.n  # number of discrete actions


In [None]:
# Initialize memory and models
from tensorflow.keras.optimizers import Adam

# Bounded replay buffer: the deque discards the oldest transition once full.
memory = deque(maxlen=MEMORY_SIZE)

# Online Q-network: observation -> 24 -> 24 -> one linear Q-value per action.
model = Sequential([Dense(24, input_shape=(observation_space,), activation="relu"),
                    Dense(24, activation="relu"),
                    Dense(action_space, activation="linear")])
# Use the declared LEARNING_RATE instead of the "adam" string default, so that
# changing the hyperparameter actually affects training.
model.compile(optimizer=Adam(learning_rate=LEARNING_RATE), loss="mse")




In [None]:
# Target network: same architecture as the online network. It is only used
# for prediction; compile() is kept for parity with the online model.
target_model = Sequential([Dense(24, input_shape=(observation_space,), activation="relu"),
                           Dense(24, activation="relu"),
                           Dense(action_space, activation="linear")])
target_model.compile(optimizer="adam", loss="mse")

# Start the target network as an exact copy of the online network; otherwise
# the first episode's targets come from unrelated random weights.
target_model.set_weights(model.get_weights())

exploration_rate = EXPLORATION_MAX

In [None]:
# Training loop
episode_scores = {}
for episode in range(EPISODES):
    state, _ = env.reset()
    state = np.reshape(state, [1, observation_space])
    terminated = False
    truncated = False
    # Previously never initialised or accumulated in this cell, so the score
    # lines below either failed or reported a stale value from another section.
    episode_reward = 0

    while not terminated and not truncated:
        # Epsilon-greedy action selection
        if np.random.rand() < exploration_rate:
            action = random.randrange(action_space)
        else:
            q_values = model.predict(state, verbose=0)
            action = int(np.argmax(q_values[0]))

        next_state, reward, terminated, truncated, _ = env.step(action)
        next_state = np.reshape(next_state, [1, observation_space])
        episode_reward += reward

        memory.append((state, action, reward, next_state, terminated, truncated))

        state = next_state

        # Wait until the buffer can supply a full minibatch.
        if len(memory) >= BATCH_SIZE:
            # Double DQN Logic
            # The batch uses its own variable names so the running episode's
            # `state`, `terminated` and `truncated` are not overwritten; the
            # old per-sample loop also tested an undefined name `done`.
            minibatch = random.sample(memory, BATCH_SIZE)
            batch_states = np.vstack([sample[0] for sample in minibatch])
            batch_actions = np.array([sample[1] for sample in minibatch])
            batch_rewards = np.array([sample[2] for sample in minibatch], dtype=np.float32)
            batch_next_states = np.vstack([sample[3] for sample in minibatch])
            batch_terminated = np.array([sample[4] for sample in minibatch], dtype=bool)
            rows = np.arange(BATCH_SIZE)

            # The online network chooses the next action, the target network
            # evaluates it.
            best_actions = np.argmax(model.predict(batch_next_states, verbose=0), axis=1)
            target_next_q = target_model.predict(batch_next_states, verbose=0)
            double_q = target_next_q[rows, best_actions]

            # Only a true termination removes the future value; time-limit
            # truncation still bootstraps.
            q_targets = model.predict(batch_states, verbose=0)
            q_targets[rows, batch_actions] = np.where(
                batch_terminated,
                batch_rewards,
                batch_rewards + GAMMA * double_q,
            )
            model.fit(batch_states, q_targets, batch_size=BATCH_SIZE, verbose=0)

    # Sync the target network with the online network once per episode.
    target_model.set_weights(model.get_weights())
    exploration_rate *= EXPLORATION_DECAY
    exploration_rate = max(EXPLORATION_MIN, exploration_rate)

    episode_scores[f"episode_{episode}"] = episode_reward
    print(f"Episode: {episode}, Total Reward: {episode_reward}")

env.close()

In [None]:
# Average total reward per episode over the Double DQN run.
mean_score = sum(episode_scores.values()) / len(episode_scores)
mean_score

In [None]:
plot_data = pd.DataFrame(episode_scores.values())
sns.lineplot(plot_data.values)
# Title fix: "SQN" was a typo for "DQN"; duplicated closing parenthesis removed.
plt.title(f'Double DQN model (mean score: {mean_score:.2f})')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.ylim(0,500)  # truncation happens at 500 steps
plt.show()

### 6. Actor-Critic Methods:

**Objective**: Combine the benefits of value-based and policy-based methods.

**Task**:
- Implement a basic Actor-Critic model.
- Extend it to methods like Deep Deterministic Policy Gradient (DDPG) or Proximal Policy Optimization (PPO), even though these might be overkill for CartPole.

**Expected Outcome**:
- A balanced understanding of how value and policy methods can be combined.
- Familiarity with advanced RL algorithms.


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
import gymnasium as gym

In [None]:
# --- Actor-Critic settings ---
# Number of training episodes
EPISODES = 100
# Discount factor and optimizer step size (shared by actor and critic)
GAMMA, LEARNING_RATE = 0.99, 1e-3

In [None]:
# Environment setup: a fresh CartPole-v1 instance plus the sizes used to build the networks
env = gym.make('CartPole-v1')

state_size = env.observation_space.shape[0]  # length of the observation vector
n_actions = env.action_space.n  # number of discrete actions

In [None]:
# Define the Actor model: observation -> 24 ReLU units -> softmax over actions
input_state = layers.Input(shape=(state_size,))
x = layers.Dense(24, activation='relu')(input_state)
output_probs = layers.Dense(n_actions, activation='softmax')(x)

actor = Model(inputs=input_state, outputs=output_probs)

# The training loop fits the actor on a one-hot action vector scaled by the
# advantage, using categorical cross-entropy as the loss.
actor.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
              loss='categorical_crossentropy')

In [None]:
# Define the Critic model: observation -> 24 ReLU units -> one linear value output.
# Reuses the `input_state` tensor created in the actor cell, but has its own
# Dense layers, so no weights are shared with the actor.
x = layers.Dense(24, activation='relu')(input_state)
value = layers.Dense(1, activation='linear')(x)

critic = Model(inputs=input_state, outputs=value)

# Trained by regression onto the TD target computed in the training loop.
critic.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
               loss='mean_squared_error')

In [None]:
# Training loop: one-step actor-critic, updating both networks after every
# environment step.
episode_scores = {}
for episode in range(1, EPISODES + 1):
    state, _ = env.reset()
    terminated = False
    truncated = False
    episode_reward = 0
    while not terminated and not truncated:
        state_batch = state.reshape(1, -1)

        # Sample an action from the actor's current policy.
        action_prob = actor.predict(state_batch, verbose=0)
        action = np.random.choice(n_actions, p=action_prob.ravel())
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Accumulate the reward actually received (previously a hard-coded +1).
        episode_reward += reward

        # TD target. Only a true termination removes the bootstrap term; an
        # episode cut off by the time limit still has future value, so
        # `truncated` must not zero it out.
        next_value = critic.predict(next_state.reshape(1, -1), verbose=0)
        target = reward + GAMMA * next_value * (not terminated)
        advantage = target - critic.predict(state_batch, verbose=0)

        # Update Critic
        critic.fit(state_batch, target, verbose=0)

        # Update Actor: a one-hot action scaled by the advantage makes the
        # cross-entropy loss equal to -advantage * log pi(action | state).
        action_one_hot = np.zeros(n_actions)
        action_one_hot[action] = 1
        actor.fit(state_batch, action_one_hot.reshape(1, -1) * advantage, verbose=0)

        state = next_state

    episode_scores[f"episode_{episode}"] = episode_reward
    print(f"Episode: {episode}, Total Reward: {episode_reward}")

env.close()


In [None]:
# Average total reward per episode over the actor-critic run.
mean_score = sum(episode_scores.values()) / len(episode_scores)
mean_score

In [None]:
plot_data = pd.DataFrame(episode_scores.values())
sns.lineplot(plot_data.values)
# Title fix: removed the duplicated closing parenthesis.
plt.title(f'Actor-Critic model (mean score: {mean_score:.2f})')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.ylim(0,500)  # truncation happens at 500 steps
plt.show()