# Setup Colab

In [None]:
# Install the runtime dependencies for this notebook (IPython shell commands).
# pyvirtualdisplay: provides the `Display` class used further down when videos are recorded.
!pip -q install pyvirtualdisplay
# xvfb / ffmpeg: system packages; presumably the virtual framebuffer behind pyvirtualdisplay
# and the encoder used for the recorded .mp4 files -- TODO confirm.
!apt-get install -y -q xvfb ffmpeg
# swig: presumably needed to build the box2d extra installed on the next line -- TODO confirm.
!pip -q install swig
!pip -q install gymnasium[box2d]

In [None]:
import gym
import gymnasium as gym
from gym import spaces
import numpy as np
from enum import Enum
from collections import deque
from typing import Tuple
import tensorflow as tf
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt

# Restrict TensorFlow's logger to messages of level ERROR and above.
tf.get_logger().setLevel('ERROR')

In [None]:
# Uncomment the lines below if you want to use Google Drive

# from google.colab import drive
# drive.mount('/content/drive')

# Helper Functions

In [None]:
class EWA:
    """Exponentially weighted average with start-up bias correction.

    The raw average starts at 0, so early readings are scaled by
    ``1 - beta ** count`` to compensate for that initial value.

    Args:
        beta: Weight kept from the previous average on each update.
    """

    def __init__(self, beta=0.9):
        self.count = 0
        self.ewa = 0
        self.beta = beta

    def update(self, value):
        """Fold ``value`` into the average and return the bias-corrected result."""
        self.count += 1
        self.ewa = (1 - self.beta) * value + self.beta * self.ewa

        # Undo the pull towards the zero starting point.
        return self.ewa / (1 - self.beta ** self.count)

    def get_val(self):
        """Return the bias-corrected average, or 0 if ``update`` was never called."""
        correction = 1 - self.beta ** self.count
        if self.count > 0:
            return self.ewa / correction
        return 0

# DQN Implementation

In [None]:
import tensorflow as tf
import numpy as np
import random
from collections import deque
from tensorflow.keras.models import load_model

class QNetwork(tf.keras.Model):
    """Fully connected Q-value network: two 32-unit ReLU layers and a linear output layer.

    Args:
        num_actions: Width of the output layer (one value per action).
        state_size: Number of features in a single state vector.
    """

    def __init__(self, num_actions, state_size):
        super().__init__()
        self.state_size = state_size
        self.num_actions = num_actions

        self.fc1 = tf.keras.layers.Dense(32, input_dim=state_size, activation='relu', kernel_initializer="he_uniform")
        self.fc2 = tf.keras.layers.Dense(32, activation='relu', kernel_initializer="he_uniform")
        self.fc3 = tf.keras.layers.Dense(num_actions, kernel_initializer="he_uniform")

    def call(self, state):
        """Return the output of the three dense layers applied in sequence to ``state``."""
        x = self.fc1(state)
        x = self.fc2(x)
        return self.fc3(x)

    def custom_build(self):
        """Create the variables of all three layers without running a forward pass.

        ``Dense.build`` expects an input *shape*, so each call passes a
        ``(batch, features)`` tuple with an unknown batch dimension rather than
        a bare feature count.
        """
        self.fc1.build((None, self.state_size))
        # Each following layer consumes the previous layer's output width.
        self.fc2.build((None, self.fc1.units))
        self.fc3.build((None, self.fc2.units))

    def set_weights_from_tuple(self, weights_tuple):
        """Assign per-layer weights.

        Args:
            weights_tuple: Three entries, one per layer in order (fc1, fc2, fc3),
                each in the form accepted by that layer's ``set_weights``.
                The layers must already be built.
        """
        self.fc1.set_weights(weights_tuple[0])
        self.fc2.set_weights(weights_tuple[1])
        self.fc3.set_weights(weights_tuple[2])

class DQN:
    """Deep Q-learning agent with a main/target network pair and a replay memory.

    Args:
        num_actions: Number of discrete actions the agent chooses between.
        state_size: Number of features in a single state vector.
        learning_rate: Learning rate handed to the Adam optimizer.
        gamma: Discount factor used in the Bellman target.
        tau: Blend factor for ``update_target_network``; 1 copies the main
            weights outright.
        epsilon: Initial probability of taking a random action.
        epsilon_min: Floor for epsilon; decay never pushes it below this value.
        epsilon_decay: Factor epsilon is multiplied by after each ``replay``
            call that actually trains.
        replay_memory_size: Maximum number of stored transitions; older ones
            are dropped first.
        model_path: Optional path of a saved model whose first three layers
            provide the initial weights of both networks.
    """

    def __init__(self,
                 num_actions,
                 state_size,
                 learning_rate=0.01,
                 gamma=0.99,
                 tau=0.1,
                 epsilon=1,
                 epsilon_min=0.01,
                 epsilon_decay=0.995,
                 replay_memory_size=2000,
                 model_path=None):
        self.num_actions = num_actions
        self.state_size = state_size
        self.gamma = gamma
        self.tau = tau

        # Main and target networks
        if model_path:
            self.load_model_from_path(model_path)
        else:
            # Both networks are built before copying: an unbuilt Keras model
            # has no weights, so the copy would otherwise do nothing and the
            # target would start from its own random initialisation.
            self.main_network = self._new_built_network()
            self.target_network = self._new_built_network()
            self.target_network.set_weights(self.main_network.get_weights())

        # Optimizer
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

        # Replay memory
        self.replay_memory = deque(maxlen=replay_memory_size)

        # Epsilon values for epsilon-greedy strategy
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

    def _new_built_network(self):
        """Create a QNetwork and run one dummy forward pass so its weights exist."""
        network = QNetwork(self.num_actions, self.state_size)
        network(tf.zeros((1, self.state_size)))
        return network

    # Reuse a model from Google Drive
    def load_model_from_path(self, path):
        """Initialise both networks with the weights of the model saved at ``path``.

        The weights of the loaded model's first three layers are copied into
        the three dense layers of the main and the target network.
        """
        loaded_model = load_model(path)
        layer_weights = tuple(loaded_model.layers[i].get_weights() for i in range(3))

        self.main_network = self._new_built_network()
        self.target_network = self._new_built_network()

        self.target_network.set_weights_from_tuple(layer_weights)
        self.main_network.set_weights_from_tuple(layer_weights)

    def add_to_replay_memory(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.replay_memory.append((state, action, reward, next_state, done))

    def get_action(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest main-network value for the
        first row of ``state``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.num_actions)
        q_values = self.main_network(state)
        return np.argmax(q_values.numpy()[0])

    def replay(self, batch_size):
        """Train the main network on ``batch_size`` transitions sampled from memory.

        Does nothing while fewer than ``batch_size`` transitions are stored.
        Each sampled transition gets its own gradient step, after which
        epsilon is decayed once. Stored states are assumed to hold a single
        row (shape ``(1, state_size)``) -- TODO confirm against callers.
        """
        if len(self.replay_memory) < batch_size:
            return

        # Sample a minibatch from the replay memory
        minibatch = random.sample(self.replay_memory, batch_size)

        for state, action, reward, next_state, done in minibatch:
            state = np.array(state)
            next_state = np.array(next_state)

            # Bellman target from the target network. It is a constant with
            # respect to the main network, so it is computed outside the tape.
            next_q_values = self.target_network(next_state, training=True)
            max_next_q = float(np.amax(next_q_values.numpy(), axis=1)[0])
            td_target = reward + self.gamma * max_next_q * (1 - done)

            with tf.GradientTape() as tape:
                # Get Q values for current state
                q_values = self.main_network(state, training=True)

                # The target equals the current prediction everywhere except
                # for the action taken, so only that entry contributes error.
                target_q_values = q_values.numpy()
                target_q_values[0, action] = td_target

                # Calculate loss
                loss = tf.keras.losses.mean_squared_error(target_q_values, q_values)

            # Calculate gradients and update network weights
            grads = tape.gradient(loss, self.main_network.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.main_network.trainable_variables))

        # Decay epsilon, never dropping below the configured floor
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_target_network(self):
        """Move the target weights towards the main weights by a factor of ``tau``."""
        target_weights = self.target_network.get_weights()
        main_weights = self.main_network.get_weights()
        updated_weights = [self.tau * mw + (1 - self.tau) * tw for mw, tw in zip(main_weights, target_weights)]

        self.target_network.set_weights(updated_weights)

# New Simulation with Rendering

In [None]:
from pyvirtualdisplay import Display
import tensorflow as tf
import numpy as np
import random
import gym
import time
from collections import deque
from gym.wrappers.record_video import RecordVideo

# Run-mode switches for this cell.
run_training = True            # when False the agent only acts: nothing is stored or learned
should_create_videos = False   # record every episode to ./video through a virtual display
save_final_model = False       # save the main network to model_checkpoint_path at the end

if should_create_videos:
    # The virtual display is started before the environment is created.
    display = Display(visible=0, size=(1400, 900))
    display.start()
    env = gym.make('CartPole-v1', render_mode="rgb_array")
    env = RecordVideo(env, './video', episode_trigger = lambda episode_number: True)
else:
    env = gym.make('CartPole-v1')

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
model_checkpoint_path = '/content/drive/My Drive/ML_RL_Study/cartpole_v1_dqn_model'

# Initialize DQN Agent
# dqn_agent = DQN(num_actions=action_size, state_size=state_size, learning_rate=0.003, gamma=0.99, tau=1, epsilon=0.01, epsilon_decay=0.995, model_path=model_checkpoint_path)
dqn_agent = DQN(num_actions=action_size, state_size=state_size, learning_rate=0.003, gamma=0.99, tau=1, epsilon_decay=0.995)

# Training parameters
num_episodes = 200
batch_size = 128
update_target_every = 10
threshold_reward = 200

ewa = EWA(beta=0.3)

# Training loop
for episode in range(num_episodes):
    start_time = time.time()

    # Newer gym / gymnasium releases return (observation, info) from reset();
    # older gym returns the observation alone. Accept both.
    reset_result = env.reset()
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = np.reshape(state, [1, state_size])
    total_reward = 0

    while True:
        # Choose action based on current state
        action = dqn_agent.get_action(state)

        # Take action, observe new state and reward. Newer APIs return
        # (obs, reward, terminated, truncated, info); older gym returns
        # (obs, reward, done, info).
        step_result = env.step(action)
        if len(step_result) == 5:
            next_state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            next_state, reward, done, _ = step_result
        next_state = np.reshape(next_state, [1, state_size])
        total_reward += reward

        # Store experience in replay memory
        if run_training:
            dqn_agent.add_to_replay_memory(state, action, reward, next_state, done)

        state = next_state

        # Learning
        if run_training and len(dqn_agent.replay_memory) > batch_size:
            dqn_agent.replay(batch_size)

        # End of episode
        if done:
            break

    # Update target network every fixed number of episodes
    if run_training and episode % update_target_every == 0:
        dqn_agent.update_target_network()

    ewa.update(total_reward)

    curr_ewa = ewa.get_val()

    end_time = time.time()
    duration = end_time - start_time
    print(f"Ep: {episode + 1}, Reward: {total_reward}, ewa: {curr_ewa}, epsilon: {dqn_agent.epsilon:.2f} time: {duration:.2f} secs")

    # Stop early once the smoothed episode reward exceeds the threshold.
    if run_training and curr_ewa > threshold_reward:
        break

env.close()

if save_final_model:
    dqn_agent.main_network.save(model_checkpoint_path)

# Show Video

In [None]:
from IPython import display as ipythondisplay
from IPython.display import HTML
from base64 import b64encode
import os

def show_video(video_index=0, video_dir='./video'):
    """Embed one recorded ``.mp4`` from ``video_dir`` in the notebook output.

    Args:
        video_index: Position of the clip within the name-sorted list of
            ``.mp4`` files found in ``video_dir``. Change this to show a
            different video.
        video_dir: Directory that is searched for ``.mp4`` files.

    Prints ``Could not find video`` instead of raising when the directory is
    missing, holds no ``.mp4`` file, or ``video_index`` is out of range.
    """
    if not os.path.isdir(video_dir):
        print("Could not find video")
        return

    # Sorted so a given index always refers to the same file; the order
    # returned by os.listdir is arbitrary.
    mp4list = sorted(f for f in os.listdir(video_dir) if f.endswith('.mp4'))
    if not -len(mp4list) <= video_index < len(mp4list):
        print("Could not find video")
        return

    # Read through a context manager so the file handle is always closed.
    with open(os.path.join(video_dir, mp4list[video_index]), 'rb') as video_file:
        encoded = b64encode(video_file.read())

    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{0}" type="video/mp4" />
                </video>'''.format(encoded.decode('ascii'))))

# Embed a recorded clip from ./video, the directory the training cell records into
# when should_create_videos is True.
show_video()