In [1]:
import numpy as np
import pandas as pd
import gymnasium as gym
import tensorflow as tf
import matplotlib.pyplot as plt

In [2]:
env = gym.make("CartPole-v1")

In [3]:
EPSILON = 1.0
EPSILON_DECAY = 0.995
GAMMA = 0.99
TARGET_UPDATE_FREQ = 1000
BUFFER_SIZE = 100000
BATCH_SIZE = 64
NUM_EPISODES = 500
ACTIONS = env.action_space.n
OBSERVATIONS = env.observation_space.shape[0]

In [4]:
Q_network = tf.keras.models.Sequential([
    tf.keras.layers.Dense(64, activation="relu", input_shape=(OBSERVATIONS,)),
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(ACTIONS, activation="linear")
])

Target_network = tf.keras.models.clone_model(Q_network)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [5]:
def greedy_policy(state, epsilon=0.0):
    if np.random.random() <= epsilon:
        return env.action_space.sample()
    else:
        state = tf.expand_dims(state, axis=0)
        q_values = Q_network(state)
        return np.argmax(q_values.numpy()[0])

In [6]:
REPLAY_BUFFER = []


def add_transitions(state, action, reward, next_state, done):
    REPLAY_BUFFER.append((state, action, reward, next_state, done))
    if len(REPLAY_BUFFER) > BUFFER_SIZE:
        REPLAY_BUFFER.pop(0)


def sample_transitions(batch_size=16):
    random_indices = np.random.randint(low=0, high=len(REPLAY_BUFFER), size=batch_size)
    sampled_states = []
    sampled_actions = []
    sampled_rewards = []
    sampled_next_states = []
    sampled_dones = []

    for i in random_indices:
        state, action, reward, next_state, done = REPLAY_BUFFER[i]
        sampled_states.append(state)
        sampled_actions.append(action)
        sampled_rewards.append(reward)
        sampled_next_states.append(next_state)
        sampled_dones.append(done)

    return tf.convert_to_tensor(sampled_states), tf.convert_to_tensor(sampled_actions), tf.convert_to_tensor(sampled_rewards), tf.convert_to_tensor(sampled_next_states), tf.convert_to_tensor(sampled_dones)

In [7]:
# episode_rewards = []

# for episode in range(NUM_EPISODES):
#     state, info = env.reset()
#     done = False

#     total_reward = 0
#     episode_len = 0

#     while not done:
#         action = greedy_policy(state, EPSILON)
#         next_state, reward, terminated, truncated, info = env.step(action)
#         done = terminated or truncated

#         add_transitions(state, action, reward, next_state, done)
#         s_states, s_actions, s_rewards, s_next_states, s_dones = sample_transitions(BATCH_SIZE)
        
#         next_action_values = tf.reduce_max(Target_network(s_next_states), axis=1)
#         targets = tf.where(s_dones, s_rewards, s_rewards + GAMMA * next_action_values)

#         with tf.GradientTape() as tape:
#             q_values = Q_network(s_states)
#             action_indices = tf.stack([tf.range(BATCH_SIZE), s_actions], axis=1)
#             current_action_values = tf.gather_nd(q_values, action_indices)

#             loss = tf.keras.losses.MSE(targets, current_action_values)

#         grads = tape.gradient(loss, Q_network.trainable_variables)
#         optimizer.apply_gradients(zip(grads, Q_network.trainable_variables))

#         if episode_len % TARGET_UPDATE_FREQ == 0:
#             Target_network.set_weights(Q_network.get_weights())

#         state = next_state

#         total_reward += reward
#         episode_len += 1

#         done = terminated or truncated
    
#     episode_rewards.append(total_reward)
#     print(f"Episode {episode}: Total Reward: {total_reward}, Episode Length: {episode_len}, Epsilon: {EPSILON}")
#     EPSILON = max(0.01, EPSILON * EPSILON_DECAY)

# print("Training Completed!")
# Q_network.save("models/DQN_cartpole")
# env.close()

In [8]:
# plt.figure(figsize=(10, 5))
# plt.plot(episode_rewards, label="Total Reward per Episode", color='blue', alpha=0.6)
# plt.xlabel("Episode")
# plt.ylabel("Total Reward")
# plt.title("Learning Progress of DQN Agent")
# plt.legend()
# plt.grid()
# plt.show()

![image.png](attachment:image.png)

#### Optimizations
1. Reward function
2. Error clipping
3. Average Q Metric

In [9]:
def calculate_reward(state):
    if -0.5 <= state[0] <= 0.5 and -1 <= state[1] <= 1 and -0.07 <= state[2] <= 0.07 and -0.525 <= state[3] <= 0.525:
        # if the cart position and cart velocity is within the acceptable range (0.5, 1)
        # if the pole angle and angular velocity are within the acceptable range (4 degree, 15 percent)
        return 1.0
    else:
        return -1.0

In [10]:
initial_states = []

state, info = env.reset()
for i in range(20):
    initial_states.append(state)
    action = greedy_policy(state)
    state, reward, terminated, truncated, info = env.step(action)
    if truncated or terminated:
        break


def get_initial_q_values(states):
    q_values = Q_network(states)
    return tf.reduce_max(q_values, axis=1)

In [11]:
metrics = {"episode": [], "episode_length": [], "total_reward": [], "epsilon": [], "avg_q_value": []}
step_counter = 0

for episode in range(NUM_EPISODES):
    state, info = env.reset()
    done = False

    total_reward = 0
    episode_len = 0

    while not done:
        action = greedy_policy(state, EPSILON)
        next_state, _, terminated, truncated, info = env.step(action)
        reward = calculate_reward(next_state) # Updated reward is calculated based on the new state
        done = terminated or truncated

        add_transitions(state, action, reward, next_state, done)
        s_states, s_actions, s_rewards, s_next_states, s_dones = sample_transitions(BATCH_SIZE)
        
        next_action_values = tf.reduce_max(Target_network(s_next_states), axis=1)
        targets = tf.where(s_dones, s_rewards, s_rewards + GAMMA * next_action_values)

        with tf.GradientTape() as tape:
            q_values = Q_network(s_states)
            action_indices = tf.stack([tf.range(BATCH_SIZE), s_actions], axis=1)
            current_action_values = tf.gather_nd(q_values, action_indices)

            loss = tf.keras.losses.Huber()(targets, current_action_values) # Huber loss is used instead of MSE for error clipping

        grads = tape.gradient(loss, Q_network.trainable_variables)
        optimizer.apply_gradients(zip(grads, Q_network.trainable_variables))

        if step_counter % TARGET_UPDATE_FREQ == 0:
            Target_network.set_weights(Q_network.get_weights())

        state = next_state

        total_reward += reward
        episode_len += 1
        step_counter += 1

        done = terminated or truncated
    
    avg_q_metric = tf.reduce_mean(get_initial_q_values(tf.convert_to_tensor(initial_states))).numpy()
      
    metrics["episode"].append(episode)
    metrics["episode_length"].append(episode_len)
    metrics["total_reward"].append(total_reward)
    metrics["epsilon"].append(EPSILON)
    metrics["avg_q_value"].append(avg_q_metric)

    print(f"Episode {episode}: Total Reward: {total_reward}, Episode Length: {episode_len}, Epsilon: {EPSILON}, Avg. Q-Value: {avg_q_metric}")

    EPSILON = max(0.01, EPSILON * EPSILON_DECAY)
    metrics_df = pd.DataFrame(metrics)
    metrics_df.to_csv("models/DQN_cartpole_metrics.csv", index=False)

print("Training Completed!")
Q_network.save("models/DQN_cartpole")
env.close()

Episode 0: Total Reward: -13.0, Episode Length: 15, Epsilon: 1.0, Avg. Q-Value: -0.022024309262633324
Episode 1: Total Reward: -8.0, Episode Length: 12, Epsilon: 0.995, Avg. Q-Value: -0.027068622410297394
Episode 2: Total Reward: -2.0, Episode Length: 20, Epsilon: 0.990025, Avg. Q-Value: 0.30359989404678345
Episode 3: Total Reward: -8.0, Episode Length: 10, Epsilon: 0.985074875, Avg. Q-Value: 0.4769752323627472
Episode 4: Total Reward: -8.0, Episode Length: 18, Epsilon: 0.9801495006250001, Avg. Q-Value: 0.8164733648300171
Episode 5: Total Reward: -10.0, Episode Length: 14, Epsilon: 0.9752487531218751, Avg. Q-Value: 0.7484970092773438
Episode 6: Total Reward: -16.0, Episode Length: 40, Epsilon: 0.9703725093562657, Avg. Q-Value: 0.5414944291114807
Episode 7: Total Reward: -7.0, Episode Length: 13, Epsilon: 0.9655206468094844, Avg. Q-Value: 0.41424527764320374
Episode 8: Total Reward: -5.0, Episode Length: 17, Epsilon: 0.960693043575437, Avg. Q-Value: 0.3311136066913605
Episode 9: Total R

INFO:tensorflow:Assets written to: models/DQN_cartpole\assets


In [12]:
env = gym.make('CartPole-v1', render_mode="human")

Q_network = tf.keras.models.load_model("models/DQN_cartpole")

NUM_EPISODES = 10

for episode in range(NUM_EPISODES):
    total_reward = 0
    episode_len = 0
    
    state, info = env.reset()
    done = False

    while not done:
        q_values = Q_network(tf.expand_dims(state, axis=0))
        action = np.argmax(q_values.numpy()[0])
        state, reward, terminated, truncated, info = env.step(action)

        total_reward += reward
        episode_len += 1

        done = terminated or truncated

    print(f"Episode {episode}: Total Reward: {total_reward}, Episode Length: {episode_len}")
    
env.close()





Episode 0: Total Reward: 500.0, Episode Length: 500
Episode 1: Total Reward: 500.0, Episode Length: 500
Episode 2: Total Reward: 500.0, Episode Length: 500
Episode 3: Total Reward: 500.0, Episode Length: 500
Episode 4: Total Reward: 500.0, Episode Length: 500
Episode 5: Total Reward: 500.0, Episode Length: 500
Episode 6: Total Reward: 500.0, Episode Length: 500
Episode 7: Total Reward: 500.0, Episode Length: 500
Episode 8: Total Reward: 500.0, Episode Length: 500
Episode 9: Total Reward: 500.0, Episode Length: 500
