In [None]:
import os
import random
from collections import deque

import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
#DQN-AGENT
class DQN:
    """Deep Q-Network agent with experience replay and a separate target network.

    ``model`` is the network that is trained on sampled mini-batches and used to
    pick actions. ``target_model`` has the same architecture and supplies the
    bootstrap values for the Bellman targets; it only changes when
    ``update_target_model`` copies the weights of ``model`` into it.
    """

    def __init__(self, input_shape, num_actions, buffer_size=20000,
                 learning_rate=0.001, discount_factor=0.99):
        """Create the agent and its two networks.

        Args:
            input_shape: shape of a single observation, passed to the first Dense layer.
            num_actions: number of discrete actions (= number of network outputs).
            buffer_size: maximum number of transitions kept in the replay buffer.
            learning_rate: learning rate of the Adam optimizer.
            discount_factor: discount applied to the bootstrapped next-state value.
        """
        self.input_shape = input_shape
        self.num_actions = num_actions
        #a high enough replay-buffer size is needed to prevent catastrophic forgetting
        #(initial value was 100.000, reduced to 20.000 to save ram)
        self.replay_buffer = deque(maxlen=buffer_size)
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor

        self.model = self.build_model()
        self.target_model = self.build_model()
        #build_model() gives each network its own random initial weights, so the
        #target network has to be synchronised once before it is used for targets
        self.update_target_model()

    # build model
    def build_model(self):
        """Build and compile the Q-network (two hidden ReLU layers, linear output).

        Returns:
            A compiled Keras Sequential model with one output per action.
        """
        model = tf.keras.models.Sequential([
            #hidden layers use 24 units (a previous note here said 32 units)
            layers.Dense(24, activation="relu", input_shape=self.input_shape),
            layers.Dense(24, activation="relu"),
            layers.Dense(self.num_actions, activation="linear")
        ])
        #compiles model with mean squared error loss and adam optimizer
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    #agent takes action based on epsilon-greedy policy
    def act(self, state, epsilon = 0.0):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Args:
            state: a single observation (without batch dimension).
            epsilon: probability of picking a uniformly random action.

        Returns:
            The index of the chosen action as a Python int.
        """
        if np.random.rand() < epsilon:
            return int(np.random.choice(self.num_actions))
        #the network expects a batch, so a leading axis of size 1 is added
        q_values = self.model(np.asarray(state)[np.newaxis])
        return int(np.argmax(q_values[0]))

    #agent stores experience in replay buffer
    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer.

        ``done`` is used in ``replay`` to switch off bootstrapping from
        ``next_state``; when the buffer is full the oldest entry is dropped
        (``deque`` with ``maxlen``).
        """
        self.replay_buffer.append((state, action, reward, next_state, done))

    #agent samples from replay buffer and updates Q-values
    def replay(self, batch_size):
        """Train ``model`` on one random mini-batch from the replay buffer.

        Does nothing while the buffer holds fewer than ``batch_size`` transitions.

        Args:
            batch_size: number of transitions to sample.
        """
        if len(self.replay_buffer) < batch_size:
            return

        samples = random.sample(self.replay_buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*samples))

        #current Q-values; converted to numpy because eager tensors do not
        #support item assignment
        q_values = self.model(states).numpy()
        #next Q-values from target model
        next_q_values = self.target_model(next_states)
        max_next_q_values = np.max(next_q_values, axis=1)

        #1.0 for transitions that may bootstrap, 0.0 for those flagged done
        not_done = 1.0 - dones.astype(np.float32)
        #calculate target Q-values using Bellman equation
        target_q_values = rewards + not_done * self.discount_factor * max_next_q_values

        #only the Q-value of the action that was actually taken is replaced, so the
        #MSE loss is zero for all other actions
        q_values[np.arange(batch_size), actions] = target_q_values
        self.model.train_on_batch(states, q_values)

    #updates target model weights with model weights
    def update_target_model(self):
        """Copy the current weights of ``model`` into ``target_model``."""
        self.target_model.set_weights(self.model.get_weights())


In [None]:
#Preparation: silence Keras progress output, then create the environment and the agent
keras.utils.disable_interactive_logging()

#NOTE(review): with render_mode="human" Gymnasium presumably renders on every step,
#which would also slow down the training cell - confirm, and consider a separate
#non-rendering environment for training
env = gym.make("CartPole-v0", render_mode="human")

#observation shape and number of discrete actions define the network's input/output size
input_shape = env.observation_space.shape
num_actions = env.action_space.n

#create agent
dqn = DQN(input_shape=input_shape, num_actions=num_actions)

In [None]:
#training loop

#epsilon-greedy exploration: start fully random and decay once per episode
epsilon = 1.0
min_epsilon = 0.01
epsilon_decay = 0.995 #0.995

#the trailing numbers are values noted by the author, presumably earlier settings
batch_size = 64 #32
num_episodes = 150 #500

target_update_counter = 0
#every x episode the target model is updated
target_update_freq = 8 #1


#total reward of every training episode, in order
running_reward = []

for episode in range(num_episodes):
    state, _ = env.reset()
    done = False
    episode_reward = 0
    while not done:
        #env.render()
        action = dqn.act(state, epsilon)
        next_state, reward, terminated, truncated, info = env.step(action)
        #the episode loop stops on termination, truncation or the reward cutoff
        done = terminated or truncated or episode_reward > 200 #episode reward > 200 is probably unnecessary
        #only a real termination is stored as "done": a truncation (time limit or the
        #cutoff above) does not make next_state terminal, so the Bellman target must
        #still bootstrap from it
        dqn.remember(state, action, reward, next_state, terminated)
        state = next_state
        episode_reward += reward
        #one gradient step per environment step (no-op until the buffer holds a full batch)
        dqn.replay(batch_size)

    target_update_counter += 1
    if target_update_counter % target_update_freq == 0:
        dqn.update_target_model()

    running_reward.append(episode_reward)
    epsilon = max(min_epsilon, epsilon * epsilon_decay)
    print(f"Episode: {episode} episode reward: {episode_reward} Epsilon: {epsilon}")

In [None]:
#restore saved weights into the online network (dqn.model); target_model is not touched here
#alternative checkpoint: "models/dqn_cartpole_freq1_200eps/weights.h5"

#the checkpoint path is assembled from the settings of the run that produced it
BATCH = 64
FREQ = 8
NR_EPISODE = 400
NAME_TO_LOAD = (
    f"models/batch_size_{BATCH}"
    f"/target_update_freq_{FREQ}"
    f"/intermediate_results/episode_{NR_EPISODE}.h5"
)
dqn.model.load_weights(NAME_TO_LOAD)

In [None]:
#evaluation: play 20 episodes with the greedy policy and collect the episode rewards
test_rewards = []
for episode in range(20):
    state, _ = env.reset()
    episode_reward = 0
    done = False
    while True:
        env.render()
        #epsilon = 0.0, so the agent only exploits the learned policy
        action = dqn.act(state, 0.0)
        next_state, reward, terminated, truncated, info = env.step(action)
        #the cutoff is checked against the reward collected before this step
        done = terminated or truncated or episode_reward > 200
        state = next_state
        episode_reward += reward
        if done:
            break
    test_rewards.append(episode_reward)
    print(f"Test Episode: {episode} episode reward: {episode_reward}")

In [None]:
#average reward over the evaluation episodes (displayed as the cell output)
np.asarray(test_rewards).mean()

In [None]:
#save model, training rewards and a text summary of the run
NAME = "PLACEHOLDER_NAME"

#np.save and open() do not create missing folders, so create the target directory
#explicitly instead of relying on model.save to do it as a side effect
save_dir = f"models/{NAME}"
os.makedirs(save_dir, exist_ok=True)

#note: Model.save stores the whole model, not only the weights, despite the file name
dqn.model.save(f"{save_dir}/weights.h5")
np.save(f"{save_dir}/rewards.npy", running_reward)

#optional: also store the evaluation rewards (only valid after the test cell was run)
# if test_rewards:
#     np.save(f"models/{NAME}/test_rewards.npy", test_rewards)

#save architecture, replay_buffer_length, learning rate, discount factor, batch_size, epsilon_decay, target_update_frequency
with open(f"{save_dir}/info.txt", "w") as f:
    #write model summary and other details to file
    dqn.model.summary(print_fn=lambda x: f.write(x + '\n'))
    #len() is the number of transitions currently stored, not the configured maxlen
    f.write(f"Replay buffer length: {len(dqn.replay_buffer)}\n")
    f.write(f"Learning rate: {dqn.learning_rate}\n")
    f.write(f"Discount factor: {dqn.discount_factor}\n")
    f.write(f"Batch size: {batch_size}\n")
    f.write(f"Epsilon decay: {epsilon_decay}\n")
    f.write(f"Target update frequency: {target_update_freq}\n")
    f.write(f"Episodes: {num_episodes}\n")

In [None]:
import matplotlib.pyplot as plt
def plot_rewards(names):
    """Plot the saved training rewards of several runs in one figure.

    For every entry in ``names`` the file ``models/<name>/rewards.npy`` is loaded
    and drawn as one labelled line on the current axes.

    Args:
        names: iterable of run names (sub-folders of ``models/``).
    """
    for run_name in names:
        reward_curve = np.load(f"models/{run_name}/rewards.npy")
        plt.gca().plot(reward_curve, label=run_name)

    axes = plt.gca()
    axes.set_xlabel("Episode")
    axes.set_ylabel("Reward")
    axes.legend()
    plt.show()