AI Programming - SW Lee

# Lab 08: Deep Q Learning Network (a.k.a DQN)
## Exercise: Cart Pole, Lunar Lander

### Prepare Library Packages

In [1]:
#import library
# Check if this code runs in Colab
RunningInCOLAB = 'google.colab' in str(get_ipython()) # If running in colab

if RunningInCOLAB:
    !pip install swig # A library that allows code written in C to run in Python
    !pip install gymnasium # for reinforcement learning
    !pip install gymnasium[box2d] # for box enviroment
    from tqdm.notebook import tqdm # to check progress
else: # if not running in colab
    from tqdm import tqdm # to check progress



In [2]:
#Import various libraries
import os # To use functions supported by the operating system
os.environ["KERAS_BACKEND"] = "tensorflow" # for tensorflow environment

import numpy as np # For useful array uses
import tensorflow as tf # Deep learning library
import keras # Deep learning library with tensorflow
import matplotlib.pyplot as plt #for visualizing

import gymnasium as gym #for reinforcement learning
from gymnasium import wrappers #for recoding video

from collections import deque # to use queue algorithm
import random # for randomizing

In [3]:
# Configure GPU memory growth so TensorFlow allocates GPU memory on demand.
physical_devices = tf.config.list_physical_devices('GPU')  # GPUs visible to TensorFlow
print(physical_devices)
if not physical_devices:
    print('GPU is not detected.')
for gpu_device in physical_devices:
    try:
        # Apply the same setting to every visible GPU, not only the first one.
        tf.config.experimental.set_memory_growth(gpu_device, True)
    except (RuntimeError, ValueError) as err:
        # Typically raised when the cell is re-run after the GPU was already initialised;
        # report the real cause instead of claiming that no GPU exists.
        print('Could not enable memory growth for', gpu_device.name, ':', err)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [4]:
# Display the installed gymnasium version (the recorded output below is from 1.0.0).
gym.__version__# gymnasium library version

'1.0.0'

### Select Environment

**Cart Pole**<br>
https://gymnasium.farama.org/environments/classic_control/cart_pole/

**Lunar Lander**<br>
https://gymnasium.farama.org/environments/box2d/lunar_lander/

In [5]:
# Select the environment to train on.
# Discrete Action Space:    0 for Cartpole, 1 for LunarLander
# Any other value is rejected by the settings cell that follows.

SELECT_ENV = 0

In [6]:
# Per-environment settings selected by SELECT_ENV.
if SELECT_ENV == 0:
    env_name, res_prefix = 'CartPole-v1', 'cart'                # gymnasium id, prefix for saved result files
    max_episodes, max_ep_steps, goal_score = 400, 500, 450      # episode budget, step cap per episode, target evaluation reward
    b_size, h_size = 128, 1000                                  # training batch size, replay memory capacity
    network_type, state_width, state_height, state_depth = 'dense', 0, 0, 0 # not used by the code in this notebook
    kwargs = {'render_mode':'rgb_array'}                        # extra arguments forwarded to gym.make
elif SELECT_ENV == 1:
    env_name, res_prefix = 'LunarLander-v3', 'lunD'             # gymnasium id, prefix for saved result files
    max_episodes, max_ep_steps, goal_score = 400, 1000, 200     # episode budget, step cap per episode, target evaluation reward
    b_size, h_size = 128, 10000                                 # training batch size, replay memory capacity
    network_type, state_width, state_height, state_depth = 'dense', 0, 0, 0 # not used by the code in this notebook
    kwargs = {'continuous':False, 'render_mode':'rgb_array'}    # discrete-action variant, rgb frames for video
else:
    # Raise explicitly: an `assert` would be skipped when Python runs with -O.
    raise ValueError(f'environment selection error: SELECT_ENV={SELECT_ENV!r} (expected 0 or 1)')

def create_env():
    """Build a fresh environment from the module-level `env_name` and `kwargs`."""
    return gym.make(env_name, **kwargs)

In [7]:
#define environment reset and one step
def env_reset(env):
    """Reset `env` and return only the initial state.

    If `env.reset()` returns a tuple, its first element is taken as the state;
    any other return value is passed through unchanged.
    """
    outcome = env.reset()
    if type(outcome) is tuple:
        return outcome[0]
    return outcome

def env_step(env, action):
    """Advance `env` by one step and return `(state, reward, done)`.

    When the step result has more than four fields, `done` combines fields 2
    and 3 with `or`; otherwise field 2 alone is used.
    """
    outcome = env.step(action)
    next_state, reward = outcome[0], outcome[1]
    if len(outcome) > 4:
        done = outcome[2] or outcome[3]     # either flag ends the episode
    else:
        done = outcome[2]
    return next_state, reward, done

In [8]:
# Smoke test: create the environment, reset it, and take one step.
# NOTE: the `env` created here is the same object that later cells use for
# replay-memory initialisation, training and evaluation.
env = create_env()
state = env_reset(env)
state, reward, done = env_step(env, env.action_space.sample())# step with a randomly sampled action

### Check and Show Environment Variables

In [9]:
# Derive action / observation metadata from the environment's spaces.
action_shape = env.action_space.shape
action_space_type = type(env.action_space)

if action_space_type==gym.spaces.discrete.Discrete:     # discrete actions
    actn_space = 'DISCRETE'
    action_shape = (1,)
    action_dims = 1
    action_range = env.action_space.n
    num_actions = action_range  # number of actions is action range for DISCRETE actions
    action_batch_shape = (None, action_range) # for replaymemory
elif action_space_type==gym.spaces.box.Box:             # continuous actions
    actn_space = 'CONTINUOUS'
    action_dims = action_shape[0]
    actn_uppr_bound = env.action_space.high[0]
    actn_lowr_bound = env.action_space.low[0]
    action_range = (actn_uppr_bound - actn_lowr_bound) # x0.5 for tanh output
    action_batch_shape = (None, *action_shape) # for replaymemory
    num_actions = action_dims   # number of actions is action dimension for CONTINUOUS actions
else:
    # Raise explicitly: an `assert` would be skipped when Python runs with -O.
    raise TypeError(f'other action space type are not supported: {action_space_type}')

observation_space_type = type(env.observation_space)
observation_shape = env.observation_space.shape

if observation_space_type==gym.spaces.discrete.Discrete:
    observation_shape = (1,)
    num_states = env.observation_space.n
elif observation_space_type==gym.spaces.box.Box:
    num_states = observation_shape[0]
else:
    # Fail here: printing and continuing would leave `num_states` undefined.
    raise TypeError(f'observation space type error: {observation_space_type}')

state_shape = observation_shape
state_batch_shape = (None, *observation_shape) # for replaymemory

value_shape = (1,)
num_values = 1

In [10]:
# Report the action / observation / state / value settings.
for caption, value in (('Action space ', action_space_type),
                       ('Action shape ', action_shape),
                       ('Action dimensions ', action_dims),
                       ('Action range ', action_range)):
    print(caption, value)
if action_space_type==gym.spaces.box.Box:
    # the bounds only exist for continuous (Box) action spaces
    print('Max Value of Action ', actn_uppr_bound)
    print('Min Value of Action ', actn_lowr_bound)
print('Action batch shape ', action_batch_shape)

for caption, value in (('Observation space ', observation_space_type),
                       ('Observation shape ', observation_shape),
                       ('Size of State Space ', num_states),
                       ('State shape ', state_shape),
                       ('State batch shape ', state_batch_shape)):
    print(caption, value)

for caption, value in (('Vallue shape ', value_shape),
                       ('Value dimensions ', num_values)):
    print(caption, value)

Action space  <class 'gymnasium.spaces.discrete.Discrete'>
Action shape  (1,)
Action dimensions  1
Action range  2
Action batch shape  (None, 2)
Observation space  <class 'gymnasium.spaces.box.Box'>
Observation shape  (4,)
Size of State Space  4
State shape  (4,)
State batch shape  (None, 4)
Vallue shape  (1,)
Value dimensions  1


### Define and Initialize The Agent
### **Exercise:** Define Deep Q-network (TensorFlow)

A NN of three fully-connected layers is enough for classic control problems.<br>

**Parameters for layer definition are:**<br>
hiddens = (unit # for layer1, unit # for layer2),<br>
act_fn: activation function,<br>
out_fn: activation function for output layer, <br>
init_fn: kernel initialization function

In [11]:
#define DQNet function
def DQNet(hiddens, act_fn, out_fn, init_fn):    # hiddens = (layer1 units, layer2 units, ...)
    """Build the fully-connected Q-network.

    Args:
        hiddens: sequence with the number of units of each hidden layer
            (the lab uses two entries; any length is accepted).
        act_fn: activation of the hidden layers.
        out_fn: activation of the output layer.
        init_fn: kernel initializer used by every Dense layer.

    Returns:
        A Keras model named 'q_net' that maps an input of shape `state_shape`
        to `num_actions` outputs.
    """
    inputs = keras.Input(shape=state_shape)  # input layer

    ### START CODE HERE ###

    # Use the same `keras` namespace as Input/Model so all parts of the model
    # come from one Keras implementation.
    hidden = inputs
    for units in hiddens:                    # one fully connected layer per entry
        hidden = keras.layers.Dense(units=units, activation=act_fn, kernel_initializer=init_fn)(hidden)

    outputs = keras.layers.Dense(units=num_actions, activation=out_fn, kernel_initializer=init_fn)(hidden) # output layer

    ###  END CODE HERE  ###

    model = keras.Model(inputs=inputs, outputs=outputs, name='q_net')
    return model

def build_DQNet():
    """Return a DQNet with two 32-unit ReLU hidden layers, a linear output and he_uniform initialisation."""
    return DQNet(hiddens=(32,32), act_fn='relu', out_fn='linear', init_fn='he_uniform')

In [12]:
# Build a throw-away network, print its layer summary, then delete it.
test_model = build_DQNet()
test_model.summary()
del test_model

**Model Summary:**

```
┏--------------------------------------┳-----------------------------┳-----------------┓
┃ Layer (type)                         ┃ Output Shape                ┃         Param # ┃
┡--------------------------------------╇-----------------------------╇-----------------┩
│ input_layer (InputLayer)             │ (None, 4 or 8)              │               0 │
├--------------------------------------┼-----------------------------┼-----------------┤
│ dense (Dense)                        │ (None, 32)                  │      160 or 288 │
├--------------------------------------┼-----------------------------┼-----------------┤
│ dense_1 (Dense)                      │ (None, 32)                  │           1,056 │
├--------------------------------------┼-----------------------------┼-----------------┤
│ dense_2 (Dense)                      │ (None, 2 or 4)              │       66 or 132 │
└--------------------------------------┴-----------------------------┴-----------------┘
```

The `policy` function in `Agent_Net` is only used to pick an action for a SINGLE state, not for training, so set `training=False` if necessary.
Since the network takes input in batch format (i.e., `[batch,data]`), the input state should be given as `state[None,...]` and the result should be read from element `[0,...]` of the output.

### **Exercise:** Define Policy Functions

In [13]:
#agent(Q network, target network)
class Agent_Net:
    """DQN agent that holds a policy Q-network and a target Q-network."""

    def __init__(self):
        self.policy_q = build_DQNet()                       # network that is trained and used to act
        self.target_q = build_DQNet()                       # network that provides the bootstrap targets
        self.target_update()                                # start with identical weights

    def policy(self, state, epsilon, exploring):
        """Choose an action for one state.

        With `exploring` true the choice is epsilon-greedy: a uniformly random
        action is taken unless a random draw exceeds `epsilon`. With `exploring`
        false the action with the largest Q-value is always taken.
        Returns the action as a numpy value.
        """
        state_input = tf.convert_to_tensor(state[None,...], dtype=tf.float32)   # add the batch axis

        ### START CODE HERE ###

        take_random = False
        if exploring:
            # the random draw is made only in exploring mode
            take_random = not (tf.random.uniform(()) > epsilon)

        if take_random:                                     # explore: uniform random action
            action = tf.random.uniform((), minval=0, maxval=action_range, dtype=tf.int64)
        else:                                               # exploit: action with maximum Q-value
            q_values = self.policy_q(state_input)
            action = tf.math.argmax(q_values[0])

        ###  END CODE HERE  ###

        return action.numpy()

    def target_update(self):
        """Copy the policy network's weights into the target network."""
        policy_weights = self.policy_q.get_weights()
        self.target_q.set_weights(policy_weights)
        return

In [14]:
# Self-check for the policy exercise: the epsilon-greedy logic is run against a
# stand-in Q-function, and the printed actions are compared by eye with the
# expected output listed below this cell.
def test_policy(exploring):
    """Run the policy logic once with a random stand-in network and return the action tensor.

    The order of the random calls matters: the expected output was produced
    with exactly this sequence under a fixed seed.
    """

    epsilon = 0.1
    state_input = tf.random.uniform((1,3)) # 1x3 state input
    action_range = 7                       # local value; shadows the module-level action_range

    def policy_q(x):    # stand-in for the policy network: random values, one row per input row
        x = tf.random.uniform((tf.shape(x)[0],action_range))
        return x

    ### START CODE HERE ###

    if exploring:                                                                       # if e-greedy policy
        if tf.random.uniform(()) > epsilon:                                             #  exploit if random>epsilon
            action_q = policy_q(state_input)                                            #   get Q-values for the input state
            action = tf.math.argmax(action_q[0])                                        #   pick the action with the maximum Q-value
        else:                                                                           #  explore else
            action = tf.random.uniform((),minval=0,maxval=action_range,dtype=tf.int64)  #   random action
    else:                                                                               # else greedy policy (exploitation)
        action_q = policy_q(state_input)                                                #  get Q-values for the input state
        action = tf.math.argmax(action_q[0])                                            #  pick the action with the maximum Q-value

    ###  END CODE HERE  ###

    return action

tf.random.set_seed(2) # fixed seed so the printed actions are reproducible
for _ in range(10): print(test_policy(True).numpy(), ' ', end='')   # ten e-greedy actions
for _ in range(10): print(test_policy(False).numpy(), ' ', end='')  # ten greedy actions

5  0  6  5  1  3  1  4  5  0  3  1  6  0  1  2  0  3  6  4  

**Expected Outputs:**

```
5  0  6  5  1  3  1  4  5  0  3  1  6  0  1  2  0  3  6  4
```

### Define and Initialize Replay Memory

The replay memory (or replay buffer) is implemented with `deque`, which maintains a fixed maximum number of elements by automatically discarding the oldest one.<br>
The inputs of the `put_experience` function come from the environment, and the outputs of the `get_batch` function are fed to the NN. Therefore the outputs must be tensors.

In [15]:
#define replay memory
class ReplayMemory:
    """Bounded experience buffer backed by a `deque` of fixed maximum length."""

    def __init__(self, memory_size):
        self.num_episodes = 0                           # episode counter, updated by the caller
        self.experiences = deque(maxlen=memory_size)    # oldest entries drop out once full

    def put_experience(self, experience):
        """Store one `(state, action, next_state, reward, not_done)` experience."""
        # unpacking also checks that the experience has exactly five fields
        state, action, next_state, reward, not_done = experience
        record = (state, action, next_state, reward, not_done)
        self.experiences.append(record)
        return

    def get_batch(self, num_samples):
        """Draw `num_samples` stored experiences at random, without replacement.

        Returns a tuple of tensors `(states, actions, next_states, rewards,
        not_dones)`; actions are int32 and everything else is float32.
        """
        drawn = random.sample(self.experiences, num_samples)

        # split the drawn records into one list per field
        states = [record[0] for record in drawn]
        actions = [record[1] for record in drawn]
        next_states = [record[2] for record in drawn]
        rewards = [record[3] for record in drawn]
        not_dones = [record[4] for record in drawn]

        return (tf.convert_to_tensor(states, dtype=tf.float32),
                tf.convert_to_tensor(actions, dtype=tf.int32),
                tf.convert_to_tensor(next_states, dtype=tf.float32),
                tf.convert_to_tensor(rewards, dtype=tf.float32),
                tf.convert_to_tensor(not_dones, dtype=tf.float32))

### **Exercise:** Initialize the replay memory

A single experience consists of `(state, action, next_state, reward, not_done)`. Be careful with **`not_done`**.

In [16]:
#initialize replay memory
def init_memory(mem, env, agent, num_samples):
    """Fill `mem` with `num_samples` experiences gathered with fully random actions.

    The agent's policy is called with epsilon=1.0 in exploring mode, and the
    environment is reset whenever an episode finishes.
    """
    state = env_reset(env)                          # start from a fresh episode
    for _ in range(num_samples):

        ### START CODE HERE ###

        action = agent.policy(state, epsilon=1.0, exploring=True)
        next_state, reward, done = env_step(env, action)
        # the stored flag is `not done`
        mem.put_experience((state, action, next_state, reward, not done))
        if done:
            state = env_reset(env)                  # episode finished: start a new one
        else:
            state = next_state

        ###  END CODE HERE  ###

    return

In [17]:
# Self-check for init_memory: fill a 4-slot memory under fixed seeds and print
# parts of sampled batches for comparison with the expected output.
tf.random.set_seed(2)
keras.utils.set_random_seed(2)
test_agent = Agent_Net()            # agent with freshly built networks
test_mem = ReplayMemory(4)          # tiny replay memory
test_env = create_env()             # separate environment for this test
test_state = test_env.reset(seed=3) # seeded reset
init_memory(test_mem, test_env, test_agent, 4)

# The same five prints are used for either environment: only the first four
# state components are shown. Each line draws its own batch.
print(test_mem.get_batch(4)[0][0][:4].numpy())  # first sampled state
print(test_mem.get_batch(4)[1].numpy())         # actions
print(test_mem.get_batch(4)[2][0][:4].numpy())  # first sampled next state
print(test_mem.get_batch(4)[3].numpy())         # rewards
print(test_mem.get_batch(4)[4].numpy())         # not_done flags

# clean up the test objects
del test_agent, test_mem
test_env.close()

[-0.04072088  0.18846463 -0.00277539 -0.32736924]
[1 0 1 0]
[-0.02550636 -0.00617038 -0.02836518 -0.04544564]
[1. 1. 1. 1.]
[1. 1. 1. 1.]


**Expected Outputs:**

```
[-0.04072088  0.18846463 -0.00277539 -0.32736924]
[1 0 1 0]
[-0.02550636 -0.00617038 -0.02836518 -0.04544564]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
```
or
```
[-0.01505842  1.4191642  -0.76619244  0.17035668]
[1 0 1 0]
[-0.03796177  1.427054   -0.7755038   0.08992036]
[-1.0618883 -1.0290995 -1.9180926 -1.4797792]
[1. 1. 1. 1.]
```

### Learning Procedures

DQN trains the Q-network to minimize the difference:

$$ \delta_t = (r_{t} + \gamma \max_{\hat{a}_{t+1}} Q_T(s_{t+1}, \hat{a}_{t+1}; \theta_T)) - Q_P(s_t, a_t; \theta_P) $$

where $Q_T$ is a target network and $Q_P$ policy network.

Then the loss function is

$$ L(\theta_P) = \delta_t^2 $$

### **Exercise:** Define ONE step of Training Loop and Evaluation Loop

The following test uses the same code as the `dqn_train` function, except for the final training call (`model.fit()`).

In [18]:
#for test
def test_training():
    """Compute DQN training labels on random data with stand-in networks.

    The body between the START/END markers is the same label computation used
    in `dqn_train`. The order of the random calls below matters, because the
    reference values in the next cell were produced under a fixed seed.
    """
    state_b = tf.random.uniform((3,4))                                      # batch of 3 states
    action_b = tf.random.uniform((3,), minval=0, maxval=7, dtype=tf.int32)  # 3 action indices in [0, 7)
    next_state_b = tf.random.uniform((3,4))                                 # batch of 3 next states
    reward_b = tf.random.uniform((3,))
    not_done_b = tf.random.uniform((3,))                                    # random floats here, not 0/1 flags
    gamma = tf.random.uniform((1,))
    action_range = 7                                                        # local value; shadows the module-level action_range

    class test_agent:
        def __init__(self):
            pass
        def policy_q(self, x):
            return tf.reduce_sum(x, axis=-1, keepdims=True) # stand-in for the policy network: row sum, shape (b,1)
        def target_q(self, x):
            return tf.reduce_sum(x, axis=-1, keepdims=True) # stand-in for the target network: row sum, shape (b,1)

    agent = test_agent()

    ### START CODE HERE ###

    # get Q-values with current (think of WHY!!) policy
    curr_q = agent.policy_q(state_b) # current Q-value estimates

    # get Q-values with target policy,
    # and then find the max Q value with it (b,)
    next_q = agent.target_q(next_state_b) # target-network Q-values
    max_next_q = tf.reduce_max(next_q,axis=-1) # max over the last axis, which removes that axis

    # calculate target reward (b,); the bootstrap term is scaled by not_done
    target_reward = reward_b + gamma * max_next_q * not_done_b

    # make one-hot actions (b,a) to filter out other actions
    action_v = tf.one_hot(action_b,depth=action_range)

    # make ground truth labels for training (b,a):
    # the taken action gets the target value, the other entries keep curr_q
    label_q = curr_q + (tf.expand_dims(target_reward,-1) - curr_q) * action_v
    ###  END CODE HERE  ###

    return label_q

In [19]:
# Check the training-label computation against reference values.
tf.random.set_seed(2)   # fixed seed: `ans` below is only valid for this seed

# Reference labels, one row per sample and one column per action.
ans = np.array([[1.1889474 , 1.1889474 , 1.1889474 , 1.1889474 , 1.1889474 , 1.1889474 , 0.40853113],
                [2.713482  , 2.713482  , 2.4351463 , 2.713482  , 2.713482  , 2.713482  , 2.713482  ],
                [3.3669732 , 1.4427134 , 3.3669732 , 3.3669732 , 3.3669732 , 3.3669732 , 3.3669732 ]])
res = test_training()
print('Training lable test passed.') if np.allclose(res,ans) else print('Training lable test failed.')

Training lable test passed.


In [20]:
# train network
def dqn_train(agent, batch, config):
    """Run one DQN training call on a sampled batch and return its loss.

    Args:
        agent: object exposing the `policy_q` and `target_q` networks.
        batch: `(states, actions, next_states, rewards, not_dones)` tensors.
        config: object whose `gamma` attribute is the discount rate.

    Returns:
        The last 'loss' entry reported by `policy_q.fit`.
    """
    states, actions, next_states, rewards, not_dones = batch
    discount = config.gamma

    ### START CODE HERE ###

    # Q-values of the current (policy) network
    q_now = agent.policy_q(states)

    # best next-state Q-value according to the target network (b,)
    q_next_best = tf.reduce_max(agent.target_q(next_states), axis=-1)

    # target value (b,); the bootstrap term is scaled by the not_done flag
    td_target = rewards + discount * q_next_best * not_dones

    # one-hot mask (b,a) selecting the action that was taken
    taken_mask = tf.one_hot(actions, depth=action_range)

    # labels (b,a): target value for the taken action, current Q elsewhere
    q_labels = q_now + (tf.expand_dims(td_target, -1) - q_now) * taken_mask

    # training with model.fit()
    # NOTE(review): no batch_size is passed, so fit() uses its own default - confirm this is intended.
    fit_logs = agent.policy_q.fit(states, q_labels, epochs=1, verbose=0)

    ###  END CODE HERE  ###

    return fit_logs.history['loss'][-1]

In [21]:
# Evaluation after training
def evaluate_policy(env, agent, num_avg):
    """Play `num_avg` episodes with the greedy policy and return the mean episode reward."""
    episodes_to_play = num_avg
    total_reward = 0.0
    for _ in range(episodes_to_play):
        state = env_reset(env)
        episode_reward, done = 0.0, False
        while not done:                                 # run until the episode finishes

            ### START CODE HERE ###

            action = agent.policy(state,0,False)        # exploring=False: always greedy
            state, reward, done = env_step(env,action)  # advance; the result is the next state

            ###  END CODE HERE  ###

            episode_reward += reward
        total_reward += episode_reward

    return total_reward / episodes_to_play

### Define Epsilon Function

This is an example of an exponentially decaying epsilon function. One of the easiest epsilon decay schemes is simply to multiply epsilon by 0.9 repeatedly. You can define your own epsilon function.

In [22]:
# Exploration parameters for epsilon greedy strategy
class Epsilon:
    """Exponentially decaying exploration probability for the e-greedy policy.

    The k-th call to `get_epsilon` (k = 0, 1, 2, ...) returns
    explore_stop + (explore_start - explore_stop) * exp(-decay_rate * k).
    """

    def __init__(self, max_episodes, decay_speed=1.0):
        self.episode_cnt = 0                            # number of get_epsilon calls so far
        self.explore_start = 1.0                        # exploration probability at start
        self.explore_stop = 0.01                        # minimum exploration probability
        self.decay_rate = decay_speed/max_episodes      # exponent scale per call

    def get_epsilon(self):
        """Return the current epsilon and advance the internal counter by one."""
        span = self.explore_start - self.explore_stop
        decay = tf.math.exp(-self.decay_rate * self.episode_cnt)   # 1 at the first call, shrinking afterwards
        eps = self.explore_stop + span * decay
        self.episode_cnt += 1
        return eps

### Define and Initialize Hyperparameters


In [23]:
#set hyperparameter
class configuration:
    """Training hyperparameters."""
    def __init__(self):
        # discount rate, learning rate
        self.gamma, self.lr = 0.99, 2e-4

config = configuration()

In [24]:
# Training setup: step budget, batch and memory sizes, agent, replay memory,
# epsilon schedule and optimizer.
max_steps = max_episodes * max_ep_steps     # upper bound on simulation steps
batch_size = b_size
memory_size = h_size

agent = Agent_Net()

memD = ReplayMemory(memory_size)
init_memory(memD, env, agent, memory_size)  # pre-fill the whole memory with random-action experiences
epsF = Epsilon(max_episodes, 10.0)          # epsilon schedule with decay_speed=10
# Take the optimizer from the same `keras` package as the model it is compiled into.
opt = keras.optimizers.Adam(learning_rate=config.lr, clipvalue=2.0)

agent.policy_q.compile(optimizer=opt, loss='mse', jit_compile=False) # compile the policy network for fit()

### Define Main Training Loop


### **Exercise:** Complete Main Training Loop

In [25]:
# Main training loop: one environment step and one dqn_train call per
# iteration, with logging, target-network update and evaluation at each
# episode end.
# per-episode training logs
logs = keras.callbacks.History()
logs.history.update({'pi_loss':[]})     # mean training loss per episode
logs.history.update({'ereward':[]})     # training reward per episode
logs.history.update({'e-steps':[]})     # steps per episode
logs.history.update({'vreward':[]})     # evaluation reward after each episode

# variables for simulation
num_episodes = 0
val_episodes = 2            # number of evaluation episodes used for the exit check

# variables for episode logging
pi_loss = 0.0
# running sums for the current episode
loss_sum = 0.0
epis_steps = 0
epis_reward = 0.0
eval_reward = -float('inf') # no evaluation has been run yet

# initialize training variables
epsilon = 1.0
next_state = None
done = True                 # forces an environment reset on the first iteration

pbar = tqdm(range(max_steps), bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}') # progress bar

for sim_steps in pbar:

    ### START CODE HERE ###

    state = env_reset(env) if done else next_state                        # get the current state
    action = agent.policy(state,epsilon,exploring=True)                   # find an action with e-greedy
    next_state, reward, done = env_step(env, action)                      # take action and observe outcomes

    experience = (state, action, next_state, reward, not done)            # pack observations into a new experience
    memD.put_experience(experience)                                       # put a new experience to replay buffer

    batch = memD.get_batch(batch_size)                                    # get a new batch from replay buffer
    step_pi_loss = dqn_train(agent, batch, config)                        # train DQN for a step

    ###  END CODE HERE  ###

    loss_sum += step_pi_loss                                # accumulate policy loss for a step
    epis_reward += reward                                   # accumulate reward for a step
    epis_steps += 1                                         # increase the number of steps for an episode

    # episode termination conditions
    # (applied after the experience above was stored with its original flag)
    if epis_steps>max_ep_steps: done = True

    # summarize episode
    if done:
        agent.target_update()                               # update target network whenever episode ends
        memD.num_episodes += 1                              # increase number of episode simulated
        epsilon = epsF.get_epsilon()                        # update decay epsilon value

        pi_loss = loss_sum / epis_steps                     # average policy loss for an episode

        # refresh the progress bar before the evaluation starts; it still shows the previous eval_reward
        pbar.set_postfix({'episode':num_episodes, 'loss':step_pi_loss, 'reward':eval_reward, 'steps':epis_steps, 'evaluating':val_episodes})
        eval_reward = evaluate_policy(env, agent, 1)        # evaluate policy one time (uses the training env)

        # append the episode summary to the logs
        logs.history['pi_loss'].extend([pi_loss])
        logs.history['ereward'].extend([epis_reward])
        logs.history['e-steps'].extend([epis_steps])
        logs.history['vreward'].extend([eval_reward])

        # reset the per-episode accumulators
        loss_sum = 0.0
        epis_reward = 0.0
        epis_steps = 0
        num_episodes += 1

    else: pass

    pbar.set_postfix({'episode':num_episodes, 'loss':step_pi_loss, 'reward':eval_reward, 'steps':epis_steps})

    # conditions to stop simulation:
    # a single evaluation above the goal triggers a val_episodes-episode average,
    # and training stops only if that average is also above the goal
    if eval_reward > goal_score:
        eval_reward = evaluate_policy(env, agent, val_episodes) # evaluate policy multiple times
    if eval_reward > goal_score: break
    if num_episodes > max_episodes: break

print('episodes:{0:5d}, loss:{1:7.5f}, val_reward {2:4.2f}'.format(num_episodes, pi_loss, eval_reward))
print('total steps:', sim_steps+1)


  0%|          | 0/200000 [00:00<?, ?it/s]

KeyboardInterrupt: 

### Plot Training Histories

In [None]:
# plot loss and accuracy
def plot_graphs(log_history, log_labels, graph_labels, graph_colors=('b-','g-')):
    """Plot the selected training histories side by side, one subplot per label.

    Args:
        log_history: mapping from log name to a sequence of per-episode values.
        log_labels: keys of `log_history` to plot, one subplot each.
        graph_labels: legend text for each plotted key.
        graph_colors: matplotlib format strings; reused cyclically when there
            are more graphs than colours.

    Returns:
        None. Nothing is drawn when `log_labels` is empty.
    """
    num_graphs = len(log_labels)
    if num_graphs == 0:
        return
    plt.figure(figsize=(5*num_graphs,4))
    for i in range(num_graphs):
        plt.subplot(1,num_graphs,i+1)
        line_format = graph_colors[i % len(graph_colors)]   # cycle instead of raising IndexError
        plt.plot(log_history[log_labels[i]], line_format, label=graph_labels[i])
        plt.xlabel('episodes')
        plt.legend()
    plt.show()
    return

# Plot the per-episode training loss next to the per-episode evaluation reward.
log_labels    = ['pi_loss', 'vreward']  # keys of logs.history to plot
label_strings = ['loss', 'reward']      # legend text for each key
label_colors  = ['b-', 'g-'] # blue and green
plot_graphs(logs.history, log_labels, label_strings, label_colors)

### Evaluate the Agent

Since a single evaluation run often takes some time, the agent is evaluated one episode at a time here so that the progress bar shows the progress.

In [None]:
# Final evaluation: average the greedy-policy reward over `evaluate_episodes` episodes.
evaluate_episodes = 20
sum_episode_rewards = 0.0
pbar = tqdm(range(evaluate_episodes))

for i in pbar:
    sum_episode_rewards += evaluate_policy(env, agent, 1) # one episode per call, so the bar advances per episode

env.close() # the training environment is closed here

print('Evaluation Result:',  sum_episode_rewards/evaluate_episodes)

## Show How The Agent Works

In [None]:
# Record one greedy episode to ./gym-results/ using a fresh environment.
env = create_env()
env = wrappers.RecordVideo(env, video_folder='./gym-results/', name_prefix=res_prefix)

eval_reward = evaluate_policy(env, agent, 1) # play a single episode through the recording wrapper

print('Sample Total Reward:', eval_reward)

env.close()

In [None]:
#show video
from IPython.display import HTML
from base64 import b64encode

def show_video(video_path, video_width = 320):
  """Return an HTML <video> element with the file at `video_path` embedded as a base64 data URL.

  Args:
    video_path: path of the video file to embed.
    video_width: width attribute of the video element.
  """
  # Read-only mode is enough; the context manager closes the file handle.
  with open(video_path, "rb") as video_fp:
    video_file = video_fp.read()
  video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}" # inline the bytes as a data URL
  return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

# Display the first recorded episode; the path combines the folder and name prefix passed to RecordVideo.
show_video('./gym-results/' + res_prefix + '-episode-0.mp4')

(c) 2024 SW Lee