In [None]:
# Install the pinned dependencies, one command per line.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install optuna==3.2.0
%pip install gymnasium==0.28.1
%pip install renderlab
%pip install keras==2.13.1
%pip install tensorflow==2.13.0rc2
%pip install tensorflow-estimator==2.13.0
%pip install tensorflow-macos==2.13.0rc2
%pip install tensorboard==2.13.0

In [None]:
from typing import Tuple, List, Callable, Union, Optional, Dict
import numpy as np
import os
import pathlib
import imageio
from pathlib import Path
import pandas as pdbb
import gymnasium as gym
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from pathlib import Path
from gym.wrappers import RecordVideo
import datetime
import tensorflow as tf
import numpy as np
from tensorflow import keras
from collections import deque
import time
import random
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import os
import uuid
import renderlab as rl
import gc

In [None]:
def evaluate(
    agent,
    env,
    n_episodes: int,
    observation_space_size: int
) -> Tuple[List, List]:
    """Run ``agent`` in ``env`` for ``n_episodes`` episodes and collect totals.

    Each episode runs until ``env.step`` reports either ``done`` or
    ``truncated``. Every step reward is passed through
    ``enhance_reward_signal`` before it is added to the episode total.

    Args:
        agent: object exposing ``act(state)``; it receives the observation
            reshaped to ``[1, observation_space_size]``.
        env: environment whose ``reset()`` returns a tuple with the
            observation first and whose ``step(action)`` returns a 5-tuple
            ``(observation, reward, done, truncated, info)``.
        n_episodes: number of episodes to play.
        observation_space_size: number of values in one observation.

    Returns:
        ``(reward_per_episode, steps_per_episode)``: two lists with one entry
        per episode.
    """
    batch_shape = [1, observation_space_size]
    episode_rewards = []
    episode_lengths = []

    for _ in range(n_episodes):
        # reset() returns a tuple; only its first element (the observation) is used
        observation = np.reshape(env.reset()[0], batch_shape)

        total_reward = 0
        step_count = 0
        terminated = False
        truncated = False

        while not terminated and not truncated:
            action = agent.act(observation)
            raw_observation, reward, terminated, truncated, _info = env.step(action)

            # the reward helper is given the step count *before* this step is counted
            total_reward += enhance_reward_signal(
                raw_observation, reward, terminated, truncated, step_count
            )
            step_count += 1

            observation = np.reshape(raw_observation, batch_shape)

        episode_rewards.append(total_reward)
        episode_lengths.append(step_count)

    return episode_rewards, episode_lengths

In [None]:
# random agent
class RandomAgent:
    """Baseline agent that ignores the observation and picks a sampled action."""

    def __init__(self, env):
        # keep the environment so act() can sample from its action space
        self.env = env

    def act(self, state: np.array) -> int:
        """Return ``env.action_space.sample()``; ``state`` is not used."""
        sampled_action = self.env.action_space.sample()
        return sampled_action

In [None]:
def enhance_reward_signal(next_state, reward, done, truncated, steps):
    """Replace the environment reward with a shaped one where a rule applies.

    Rules, checked in this order:
      * ``done``      -> -100 (the episode ended in failure)
      * ``truncated`` -> 100  (the episode was cut off, e.g. by a time limit)
      * first observation value outside [-0.2, 0.2] -> -5
        (presumably the cart position in CartPole - verify against the
        environment's observation layout)
      * otherwise the incoming ``reward`` is returned unchanged.

    ``steps`` is accepted for interface compatibility but is not used.
    """
    if done:
        return -100
    if truncated:
        return 100

    first_component = next_state[0]
    if first_component > 0.2 or first_component < -0.2:
        # drifting away from the centre on either side is penalised equally
        return -5

    return reward

In [None]:
def record_random_agent():
    """Play five CartPole episodes with a random agent and report step statistics.

    A fresh ``CartPole-v1`` environment (``rgb_array`` rendering) is wrapped in
    ``rl.RenderFrame`` writing to ``./output``; after evaluation the wrapper's
    ``play()`` is called (presumably to show the recorded episodes) and the
    median / std / mean of the episode lengths are printed.

    Returns:
        ``(rewards, steps)`` as produced by ``evaluate``.
    """
    recorded_env = rl.RenderFrame(
        gym.make('CartPole-v1', render_mode="rgb_array"),
        "./output",
    )
    n_observations = recorded_env.observation_space.shape[0]

    # baseline: actions are sampled, the observation is ignored
    baseline_agent = RandomAgent(recorded_env)
    rewards, steps = evaluate(baseline_agent, recorded_env, 5, n_observations)

    recorded_env.play()

    # summary statistics of the episode lengths
    step_median = np.median(steps)
    step_mean = np.mean(steps)
    step_std = np.std(steps)

    print(f'median steps = {step_median}')
    print(f'std steps    = {step_std}')
    print(f'mean steps   = {step_mean}')

    return rewards, steps

In [None]:
def record_trained_agent(agent, env):
    """Play a single episode with ``agent`` inside a ``rl.RenderFrame`` wrapper.

    ``env`` is wrapped with ``rl.RenderFrame(env, "./output")``, the given
    (trained) agent is evaluated for one episode, and the wrapper's ``play()``
    is called afterwards. The evaluation results are discarded; nothing is
    returned.
    """
    recorded_env = rl.RenderFrame(env, "./output")

    # one evaluation episode of the supplied agent
    evaluate(agent, recorded_env, 1, recorded_env.observation_space.shape[0])

    recorded_env.play()

In [None]:
# Run the random-agent baseline; `rewards` and `steps` hold its per-episode
# results as returned by record_random_agent().
rewards, steps = record_random_agent()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Histogram of the per-episode step counts in `steps`.
fig, ax = plt.subplots(figsize = (10, 4))
# draw on the axes created above explicitly instead of relying on pyplot's current axes
pd.Series(steps).plot(kind='hist', bins=100, ax=ax)
ax.set_title("Steps")
ax.set_xlabel("steps per episode")

plt.show()

In [None]:
import gc
from keras.callbacks import Callback

class MemoryClear(Callback):
    """Keras callback that runs Python's garbage collector after every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Call ``gc.collect()``; ``epoch`` and ``logs`` are not used."""
        gc.collect()

In [None]:
# Single module-level callback instance; DeepQAgent.replay passes it to model.fit.
memory_clear_callback = MemoryClear()

class DeepQAgent:
    """Deep Q-learning agent with a replay memory and a dense Keras Q-network.

    The agent chooses actions epsilon-greedily (``act``), stores transitions
    (``add_to_memory``) and fits its network on random mini-batches drawn from
    the memory (``replay``). A single network is used both to predict the
    current Q-values and to bootstrap the targets.
    """

    def __init__(self, env, memory_size, gamma, exploration_decay, layer_size, batch_size, exploration_min, learning_rate, extra_intermediate_layers = 0, exploration_rate = 1.0):
        """Create the agent and build its Q-network.

        Args:
            env: environment; its ``observation_space.shape[0]`` and
                ``action_space.n`` define the network's input and output
                sizes, and ``action_space.sample()`` supplies random actions.
            memory_size: maximum number of transitions kept in the replay memory.
            gamma: discount rate applied to the best next-state Q-value.
            exploration_decay: factor the exploration rate is multiplied by
                after every learning step.
            layer_size: width of every hidden layer.
            batch_size: number of transitions sampled per learning step.
            exploration_min: lower bound the exploration rate decays towards.
            learning_rate: learning rate of the Adam optimizer.
            extra_intermediate_layers: number of hidden layers added on top of
                the two default ones.
            exploration_rate: initial probability of taking a random action.
        """
        self.__env = env
        # sizes come from the environment instead of being hard-coded to CartPole's 4 / 2
        self.__state_size = int(env.observation_space.shape[0])         # number of values in one observation (network input)
        self.__action_size = int(env.action_space.n)                    # number of discrete actions (network output)
        self.__memory = deque(maxlen=memory_size)                       # memory for storing our experiences
        self.__layer_size_1 = layer_size                                # nn layer 1 width
        self.__layer_size_2 = layer_size                                # nn layer 2 width (also used for the extra layers)
        self.__extra_intermediate_layers = extra_intermediate_layers    # number of extra hidden layers
        self.__learning_rate = learning_rate                            # learning rate
        self.__model = self._build_model()                              # our model (needs the sizes and learning rate set above)
        self.__gamma = gamma                                            # discount rate
        self.__exploration_rate = exploration_rate                      # exploration rate
        self.__exploration_min = exploration_min                        # min value for exploration
        self.__exploration_decay = exploration_decay                    # decay in the exploration rate (moving from exploration to exploitation)
        self.__step_counter = 0                                         # step counter (not used by the methods of this class)
        self.__sample_batch_size = batch_size                           # how many samples we take for a learning step
        self.__uuid = str(uuid.uuid4())                                 # unique id for the model

    def add_to_memory(self, state, action, reward, next_state, done):
        """Append one transition ``(state, action, reward, next_state, done)`` to the memory."""
        self.__memory.append((state, action, reward, next_state, done))

    def disable_exploration(self):
        """Set the exploration rate and its minimum to 0 so ``act`` always uses the model."""
        self.__exploration_rate = 0
        self.__exploration_min = 0

    def get_agent_uuid(self):
        """Return the unique id string generated for this agent."""
        return self.__uuid

    def save_model(self):
        """Save the network to ``model-<uuid>.keras`` (path relative to the working directory)."""
        self.__model.save(f'model-{self.__uuid}.keras')

    def replay(self):
        """Learn from one random mini-batch of stored transitions.

        Does nothing while the memory holds no more than ``batch_size``
        transitions. Otherwise the Q-value of the action actually taken is
        replaced by ``reward`` (terminal transition) or
        ``reward + gamma * max(Q(next_state))`` (non-terminal), the network is
        fitted on these targets for one epoch, and the exploration rate is
        decayed without dropping below ``exploration_min``.
        """
        batch_size = self.__sample_batch_size

        if len(self.__memory) <= batch_size:
            # not enough data in the memory yet to learn from
            return

        # take a random batch of samples from the memory and split it by field
        mini_batch = random.sample(self.__memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*mini_batch)

        # stored states may carry a leading batch axis of 1; flatten each to (state_size,)
        current_state = np.array([np.reshape(s, self.__state_size) for s in states], dtype=float)
        next_state = np.array([np.reshape(s, self.__state_size) for s in next_states], dtype=float)
        action = np.asarray(actions, dtype=int)
        reward = np.asarray(rewards, dtype=float)
        done = np.asarray(dones, dtype=bool)

        # q-values for the current states; these become the training targets
        target = self.__model.predict(current_state, verbose=0)

        # q-values for the next states (after taking the action)
        next_q_values = self.__model.predict(next_state, verbose=0)

        # Bellman update for the action that was taken; terminal transitions use the reward only
        best_next_q = np.amax(next_q_values, axis=1)
        target[np.arange(batch_size), action] = np.where(
            done, reward, reward + self.__gamma * best_next_q
        )

        # fit the network towards the updated targets
        self.__model.fit(current_state, target, batch_size=batch_size, epochs=1, verbose=0, callbacks=[memory_clear_callback])

        # decay the exploration rate, never going below the configured minimum
        if self.__exploration_rate > self.__exploration_min:
            self.__exploration_rate = max(
                self.__exploration_min,
                self.__exploration_rate * self.__exploration_decay,
            )

    def act(self, state: np.array) -> int:
        """Choose an action epsilon-greedily.

        With probability ``exploration_rate`` a random action is sampled from
        the environment's action space; otherwise the index of the highest
        predicted Q-value for ``state`` is returned as a Python ``int``.
        """
        if np.random.rand() < self.__exploration_rate:
            return self.__env.action_space.sample()

        # get a model prediction and pick the output index with the highest value
        actions = self.__model.predict(state, verbose=0)
        return int(np.argmax(actions[0]))

    def _build_model(self):
        """Build and compile the Q-network.

        Layout: two ReLU hidden layers, then ``extra_intermediate_layers``
        further ReLU hidden layers, then a linear output with one unit per
        action. Compiled with MSE loss and the legacy Adam optimizer.
        """
        model = Sequential()
        model.add(Dense(self.__layer_size_1, input_dim=self.__state_size, activation='relu'))
        model.add(Dense(self.__layer_size_2, activation='relu'))
        # exactly `extra_intermediate_layers` additional hidden layers (0 adds none)
        for _ in range(self.__extra_intermediate_layers):
            model.add(Dense(self.__layer_size_2, activation='relu'))
        model.add(Dense(self.__action_size, activation='linear'))
        model.compile(loss='mse', optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=self.__learning_rate))
        return model

In [None]:
# train loop for the deep q agent
def train(agent, env, observation_space_size, n_episodes):
    """Train ``agent`` on ``env`` for ``n_episodes`` episodes.

    Every step: the agent acts, the environment reward is passed through
    ``enhance_reward_signal``, the transition is stored with
    ``agent.add_to_memory`` and ``agent.replay()`` is called. An episode ends
    when the environment reports ``done`` or ``truncated``; only ``done`` is
    stored as the terminal flag of the transition.

    Args:
        agent: object exposing ``act``, ``add_to_memory`` and ``replay``.
        env: environment with ``reset()`` / ``step(action)``.
        observation_space_size: number of values in one observation.
        n_episodes: number of training episodes.
    """
    state_shape = [1, observation_space_size]

    for episode in range(n_episodes):
        # reset() returns a tuple; its first element is the observation
        state = np.reshape(env.reset()[0], state_shape)

        print(f'new train episode {episode} on : {datetime.datetime.now()}')

        # run the garbage collector and reset Keras' state between episodes
        gc.collect()
        keras.backend.clear_session()

        steps = 0
        episode_finished = False

        while not episode_finished:
            steps += 1

            action = agent.act(state)
            raw_next_state, reward, done, truncated, _info = env.step(action)

            # the reward helper gets the un-reshaped observation and the current step count
            reward = enhance_reward_signal(raw_next_state, reward, done, truncated, steps)

            if done:
                print(f'ended training episode with mistake and reward {reward} and step count {steps}')
            elif truncated:
                print(f'ended training episode with success and reward {reward} and step count {steps}')

            next_state = np.reshape(raw_next_state, state_shape)

            # remember the transition, then learn from the memory if possible
            agent.add_to_memory(state, action, reward, next_state, done)
            agent.replay()

            state = next_state
            episode_finished = done or truncated

In [None]:
# create an environment
env = gym.make('CartPole-v1',render_mode = "rgb_array")

# fetch properties from environment
observation_space_size = env.observation_space.shape[0]
action_space_size = env.action_space.n

print(f'observation space size : {observation_space_size}')
print(f'action space size      : {action_space_size}')

# TODO FILL IN
p_memory_size = 500                          # value from 500 - 5000
p_gamma = 0.9                                # value from 0.9 - 0.99
p_batch_size = 16                            # value from either [16,32,64,128]
p_exploration_decay = 0.9                    # value from 0.9 - 0.99
p_layer_size = 16                            # value from either [16,32,64,128,256]
p_exploration_min = 0.1                      # value from 0.0001 - 0.2
p_learning_rate = 0.001                      # value from 0.0001 - 0.001
p_extra_intermediate_layers = 0              # value from 0 - 1
# number of training episodes; this was previously assigned twice (150, then 50)
# and only the last assignment took effect. Suggested ranges noted were
# 150 - 500 and 100 - 1000.
p_episodes = 50

deep_agent = DeepQAgent(
    env,
    memory_size = p_memory_size,
    gamma = p_gamma,
    exploration_decay = p_exploration_decay,
    layer_size = p_layer_size,
    batch_size = p_batch_size,
    exploration_min = p_exploration_min,
    learning_rate = p_learning_rate,
    extra_intermediate_layers = p_extra_intermediate_layers,
)

print('train loop started')

# reset all
env.reset()

# train loop
train(deep_agent,env, observation_space_size,n_episodes=p_episodes)

# disable exploration and only rely on the model for decisions
deep_agent.disable_exploration()

print('evaluation started')
rewards, steps = evaluate(deep_agent, env, 50, observation_space_size)

# evaluation results
median_steps = np.median(steps)
mean_steps = np.mean(steps)
std_steps = np.std(steps)

median_reward = np.median(rewards)
mean_reward = np.mean(rewards)
std_reward= np.std(rewards)


print(f'median reward = {median_reward}')
print(f'std reward    = {std_reward}')
print(f'mean reward   = {mean_reward}')

print(f'median steps = {median_steps}')
print(f'std steps    = {std_steps}')
print(f'mean steps   = {mean_steps}')

# record the agent
record_trained_agent(deep_agent,env)

# reset the state generated by Keras
keras.backend.clear_session()