In [3]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dense, Activation
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import activations
from tqdm import tqdm

def NN(env):

  input_size = env.observation_space.shape[0] + 1
  output_size = env.action_space.n - 1
  model = Sequential() 
  model.add(Dense(32, input_shape=(input_size,)))
  model.add(Activation(activations.relu))
  model.add(Dense(32, input_shape=(32,)))
  model.add(Activation(activations.relu))
  model.add(Dense(32, input_shape=(32,)))
  model.add(Activation(activations.relu))
  model.add(Dense(output_size))

  return model

def get_action(state, model, e):

    state1 = np.pad(state, (0,1), mode='constant', constant_values=(0)).reshape(1,-1)
    state2 = np.pad(state, (0,1), mode='constant', constant_values=(1)).reshape(1,-1)
    return np.random.choice([np.argmax([model(state1)[0], model(state2)[0]]), 0, 1],\
                            p=[1 - e, e / 2. , e / 2.])

def q_hat(model, state, action):
    
    inp = np.pad(state, (0,1), mode='constant', constant_values=(action)).reshape(1,-1)

    return model(inp)

# Based on algorithm 10.1
def train(env, model, optimizer, loss, n_per_episode=100, n_episodes = 1024, e = 0.1):

  for _ in tqdm(range(n_episodes)):
    s = env.reset()
    a = get_action(s, model, e)
    for i in range(n_per_episode):
      sn, reward, done, info = env.step(a)
      if done:
        with tf.GradientTape() as tape:
          err = loss(reward, q_hat(model,s, a))
        grads = tape.gradient(err, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        break
      else:
        an = get_action(sn, model, e)

        with tf.GradientTape() as tape:
          err = loss(reward + q_hat(model, sn, an), q_hat(model, s, a))
        grads = tape.gradient(err, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        s = sn
        a = an
  return model


env = gym.make("CartPole-v0")
model = NN(env)
optimizer = SGD(learning_rate=0.01)
loss = tf.keras.losses.MeanSquaredError()
agent = train(env, model, optimizer, loss, n_per_episode=100, n_episodes=1024, e=0.05)

100%|██████████| 1024/1024 [06:56<00:00,  2.46it/s]


In [4]:
# Based on OpenAI specifications considered solved when the average reward is greater than or equal to
#195.0 over 100 consecutive trials.

avg_reward = 0
max_num_episodes = 200
trials = 100
for episodes in tqdm(range(trials)):
  obs = env.reset()
  for reward in range(max_num_episodes):
    action = get_action(obs, agent, e=0.0)
    obs, reward, done, info = env.step(action) 
    avg_reward += reward
    if done:
      break

print(f'Average reward: {avg_reward / trials}')
env.close()

100%|██████████| 100/100 [00:31<00:00,  3.16it/s]

Average reward: 200.0



