In [None]:
# @title Setup
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Is this notebook running on Colab or Kaggle?
IS_COLAB = "google.colab" in sys.modules
IS_KAGGLE = "kaggle_secrets" in sys.modules

if IS_COLAB or IS_KAGGLE:
    !apt update && apt install -y libpq-dev libsdl2-dev swig xorg-dev xvfb
    %pip install -U tf-agents pyvirtualdisplay
    %pip install -U gym~=0.21.0
    %pip install -U gym[box2d,atari,accept-rom-license]

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
    print("No GPU was detected. CNNs can be very slow without a GPU.")
    if IS_COLAB:
        print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")
    if IS_KAGGLE:
        print("Go to Settings > Accelerator and select GPU.")

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# To get smooth animations
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "rl"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    """Write the current Matplotlib figure to IMAGES_PATH.

    Args:
        fig_id: File name without extension; also echoed in the progress
            message.
        tight_layout: When true, call plt.tight_layout() before saving.
        fig_extension: File extension, also passed to savefig as the format.
        resolution: DPI passed to savefig.
    """
    file_name = "".join((fig_id, ".", fig_extension))
    target = os.path.join(IMAGES_PATH, file_name)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(target, format=fig_extension, dpi=resolution)

[33m0% [Working][0m            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [Connecting to archive.ubuntu.com (91.189.91.82)] [Connecting to security.ubuntu.com (185.125.190[0m                                                                                                    Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Ign:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy Release
Hit:7 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu

In [None]:
import gym
import keras
import random
import numpy as np
import tensorflow as tf

from tensorflow import keras

from gym import Env
from gym import spaces

In [None]:
# Cell codes stored in the flattened game board.
NOTHING, PLAYER, WIN, LOSE = range(4)

# Action codes understood by the environment's step().
UP, DOWN, LEFT, RIGHT = range(4)

In [None]:
class GameBoardEnv(Env):
    """Grid-world environment on a 6x6 board stored as a flat 36-cell array.

    Each cell holds NOTHING, PLAYER, WIN or LOSE. The player starts at
    index 5, the winning cell is index 15 and the losing cell is index 30.
    Every non-terminal move costs -0.1; moving onto the WIN cell yields +1.0
    and moving onto the LOSE cell yields -1.0, and either one ends the
    episode.
    """

    def __init__(self):
        # custom attribute used to display the reward earned so far
        self.cumulative_reward = 0

        # build the initial board (same layout that reset() produces)
        self._build_board()

        # observation space: 36 cells, each holding a value between 0 and 3
        self.observation_space = spaces.Box(0, 3, [36,], dtype=np.int16)

        # valid actions:
        #   0 = up
        #   1 = down
        #   2 = left
        #   3 = right
        # spaces.Discrete(4) is a shortcut for defining the actions 0-3
        self.action_space = spaces.Discrete(4)

    def _build_board(self):
        """Set the player/win/lose positions and rebuild ``self.state``.

        Shared by __init__ and reset() so both always produce the same
        layout. Previously __init__ carried its own copy of this code, which
        marked the win cell as LOSE and the lose cell as WIN.
        """
        board = [NOTHING] * 36

        self.player_position = 5
        self.win_position = 15
        self.lose_position = 30

        # make sure the player, win, and lose points aren't overlapping each
        # other (a no-op for the fixed positions above, kept as a safeguard
        # in case the positions are changed or randomized)
        while self.win_position == self.player_position:
            self.win_position = random.randrange(0, 36)
        while self.lose_position in (self.win_position, self.player_position):
            self.lose_position = random.randrange(0, 36)

        board[self.player_position] = PLAYER
        board[self.win_position] = WIN
        board[self.lose_position] = LOSE

        # convert the python list into a numpy array whose dtype matches
        # the observation space
        self.state = np.array(board, dtype=np.int16)

    def step(self, action):
        """Move the player one cell and score the move.

        Args:
            action: One of UP, DOWN, LEFT, RIGHT. A move that would leave
                the grid keeps the player in place.

        Returns:
            A ``(state, reward, done, truncated, info)`` tuple; ``truncated``
            is always False and ``info`` is an empty dict. On a terminal move
            the board array is returned without moving the player marker
            onto the WIN/LOSE cell.

        Raises:
            ValueError: if ``action`` is not one of the four action codes.
        """
        # placeholder for debugging information
        info = {}
        done = False
        previous_position = self.player_position

        # move the player, refusing moves that would leave the 6x6 grid:
        # rows are 6 cells apart, columns are the index modulo 6
        if action == UP:
            if (self.player_position - 6) >= 0:
                self.player_position -= 6
        elif action == DOWN:
            if (self.player_position + 6) < 36:
                self.player_position += 6
        elif action == LEFT:
            if (self.player_position % 6) != 0:
                self.player_position -= 1
        elif action == RIGHT:
            if (self.player_position % 6) != 5:
                self.player_position += 1
        else:
            raise ValueError("invalid action")

        # check for win/lose conditions and set the reward
        landed_on = self.state[self.player_position]
        if landed_on == WIN:
            reward = 1.0
            done = True
            self.cumulative_reward += reward
            print(f'Cumulative Reward: {self.cumulative_reward}, Result: WIN')
        elif landed_on == LOSE:
            reward = -1.0
            done = True
            self.cumulative_reward += reward
            print(f'Cumulative Reward: {self.cumulative_reward}, Result: LOSE')

        # ordinary move: charge the step cost and move the player marker
        if not done:
            reward = -0.1
            self.state[previous_position] = NOTHING
            self.state[self.player_position] = PLAYER
            self.cumulative_reward += reward

        return self.state, reward, done, False, info

    def reset(self):
        """Start a new episode and return the initial board array."""
        self.cumulative_reward = 0
        self._build_board()
        return self.state

    def render(self):
        """Rendering is not implemented for this environment."""
        pass

In [None]:
# Start from a fresh Keras session and re-seed TensorFlow and NumPy before
# the environment and model are created.
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

# The single environment instance used by the rest of the notebook.
env = GameBoardEnv()

# reset() returns the initial board observation.
obs = env.reset()

In [None]:
# Network dimensions come straight from the environment's spaces.
input_shape = env.observation_space.shape
n_outputs = env.action_space.n

# Q-network: two ELU hidden layers, then one linear output per action.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(36, activation="elu", input_shape=input_shape))
model.add(tf.keras.layers.Dense(32, activation="elu"))
model.add(tf.keras.layers.Dense(n_outputs))

In [None]:
def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action with an epsilon-greedy policy over the DQN's Q-values.

    Args:
        state: Observation array for a single state (no batch dimension).
        epsilon: Probability of picking a uniformly random action instead of
            the action the model currently rates highest. The default of 0
            is purely greedy.

    Returns:
        The index of the selected action.
    """
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)  # random action
    # add a batch axis for predict(), then drop it again from the result
    Q_values = model.predict(state[np.newaxis], verbose=0)[0]
    return Q_values.argmax()  # optimal action according to the DQN

In [None]:
from collections import deque

# Bounded store of experience tuples appended by play_one_step and sampled
# by sample_experiences; maxlen=2000 caps how many entries are kept.
replay_buffer = deque(maxlen=2000)

In [None]:
def sample_experiences(batch_size):
    """Draw a random batch (with replacement) from the replay buffer.

    Args:
        batch_size: Number of experiences to draw.

    Returns:
        A list of six NumPy arrays, one per experience field:
        [states, actions, rewards, next_states, dones, truncateds].
    """
    picks = np.random.randint(len(replay_buffer), size=batch_size)
    sampled = [replay_buffer[pick] for pick in picks]

    # transpose the batch: one array per field instead of one tuple per step
    columns = []
    for field in range(6):
        column = [transition[field] for transition in sampled]
        columns.append(np.array(column))
    return columns

In [None]:
def play_one_step(env, state, epsilon):
    """Take one policy-chosen action in ``env`` and record the transition.

    Args:
        env: Environment whose step(action) returns
            (next_state, reward, done, truncated, info).
        state: Current observation, passed to the policy and stored.
        epsilon: Exploration rate forwarded to epsilon_greedy_policy.

    Returns:
        The (next_state, reward, done, truncated, info) tuple from the step.
    """
    chosen_action = epsilon_greedy_policy(state, epsilon)
    outcome = env.step(chosen_action)
    next_state, reward, done, truncated, info = outcome

    # store the full transition so training can sample it later
    transition = (state, chosen_action, reward, next_state, done, truncated)
    replay_buffer.append(transition)
    return next_state, reward, done, truncated, info

In [None]:
# Hyperparameters read by training_step (as module-level globals).
batch_size = 35  # experiences sampled per training step
discount_factor = 0.95  # weight applied to the best next-state Q-value
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-2)
loss_fn = tf.keras.losses.mean_squared_error
# Alternative losses that were tried, kept for reference:
# loss_fn = tf.keras.losses.hinge
# loss_fn = tf.keras.losses.categorical_crossentropy

def training_step(batch_size):
    """Run one gradient update of the Q-network on a sampled replay batch.

    Targets are ``reward + discount_factor * max_a Q(next_state, a)``, with
    the bootstrap term dropped for transitions that ended the episode. Only
    the Q-value of the action actually taken contributes to the loss.

    Args:
        batch_size: Number of experiences to sample from the replay buffer.
    """
    (states, actions, rewards, next_states,
     dones, truncateds) = sample_experiences(batch_size)

    # best achievable Q-value from each next state, per the current model
    best_next_Q = model.predict(next_states, verbose=0).max(axis=1)

    # 1.0 where the episode continues, 0.0 where it was done or truncated
    episode_over = dones | truncateds
    still_running = 1.0 - episode_over
    targets = rewards + still_running * discount_factor * best_next_Q
    targets = targets.reshape(-1, 1)

    # one-hot mask selecting the column of the action that was taken
    action_mask = tf.one_hot(actions, n_outputs)

    with tf.GradientTape() as tape:
        predicted = model(states)
        chosen_Q = tf.reduce_sum(predicted * action_mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(targets, chosen_Q))

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

In [None]:
# Start from a fresh episode and fixed seeds before the training run.
obs = env.reset()
# obs, info = env.reset(seed=42)
# env.reset()
np.random.seed(42)
tf.random.set_seed(42)

# Per-episode cumulative rewards, appended by the training loop.
rewards = []
# Episode returns in this environment can be negative (each move costs -0.1),
# so 0 is not a valid lower bound: starting from -inf guarantees that the
# first finished episode is recorded as the best one so far.
best_score = float("-inf")

In [None]:
for episode in range(5):
    obs = env.reset()

    # exploration rate decays linearly with the episode index (floor 0.01);
    # it does not change within an episode, so compute it once up front
    epsilon = max(1 - episode / 500, 0.01)

    # play until the episode ends, capped at 100 steps
    for step in range(100):
        obs, reward, done, truncated, info = play_one_step(env, obs, epsilon)
        if done or truncated:
            break

    print("\rEpisode: {}, Steps: {}, eps: {:.3f}".format(episode, step + 1, epsilon), end="")
    print("\n")

    # track the episode return and remember the weights of the best episode
    rewards.append(env.cumulative_reward)
    if env.cumulative_reward >= best_score:
        best_weights = model.get_weights()
        best_score = env.cumulative_reward

    # NOTE(review): with range(5) above, `episode > 50` is never true, so
    # training_step is not reached in this run -- confirm the intended
    # episode count / warm-up threshold.
    if episode > 50:
        training_step(batch_size)

# model.set_weights(best_weights)

Cumulative Reward: -2.900000000000002, Result: WIN
Episode: 0, Steps: 40, eps: 1.000

Cumulative Reward: -0.30000000000000004, Result: WIN
Episode: 1, Steps: 14, eps: 0.998

Cumulative Reward: -2.0000000000000013, Result: WIN
Episode: 2, Steps: 31, eps: 0.996

Cumulative Reward: -1.600000000000001, Result: WIN
Episode: 3, Steps: 27, eps: 0.994

Cumulative Reward: -1.1000000000000005, Result: WIN
Episode: 4, Steps: 22, eps: 0.992



In [None]:
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt

# Learning curve: cumulative reward collected in each episode.
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(rewards)
ax.set_xlabel("Episode", fontsize=14)
ax.set_ylabel("Sum of rewards", fontsize=14)
# save_fig("dqn_rewards_plot")
plt.show()