<a href="https://colab.research.google.com/github/mswang12/minDQN/blob/main/minDQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A Minimal Deep Q-Network
We'll show how to code a minimal Deep Q-Network to solve the FrozenLake environment (the deterministic, non-slippery 4x4 map).

## Step 1. Import libraries and set up the environment

In [1]:
# Print the installed TensorFlow and Gymnasium package metadata (version, location, ...).
!pip show tensorflow
!pip show gymnasium

Name: tensorflow
Version: 2.16.1
Summary: TensorFlow is an open source machine learning framework for everyone.
Home-page: https://www.tensorflow.org/
Author: Google Inc.
Author-email: packages@tensorflow.org
License: Apache 2.0
Location: C:\Users\Mike\AppData\Local\Programs\Python\Python312\Lib\site-packages
Requires: tensorflow-intel
Required-by: 
Name: gymnasium
Version: 0.29.1
Summary: A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym).
Home-page: 
Author: 
Author-email: Farama Foundation <contact@farama.org>
License: MIT License
Location: C:\Users\Mike\AppData\Local\Programs\Python\Python312\Lib\site-packages
Requires: cloudpickle, farama-notifications, numpy, typing-extensions
Required-by: 


In [1]:
import gymnasium as gym
import tensorflow as tf
import numpy as np
from tensorflow import keras

from collections import deque
import time
import random

RANDOM_SEED = 5

# Seed every random source this notebook draws from so that a
# Restart & Run All is repeatable:
#   - TensorFlow: weight initialisation and training
#   - NumPy:      the epsilon-greedy draw (np.random.rand)
#   - random:     replay-memory sampling (random.sample)
tf.random.set_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

env = gym.make("FrozenLake-v1", is_slippery=False)
env.reset(seed=RANDOM_SEED)
# The action space keeps its own RNG, separate from the environment's;
# seed it too so env.action_space.sample() is reproducible.
env.action_space.seed(RANDOM_SEED)

print("Action Space: {}".format(env.action_space))
print("State space: {}".format(env.observation_space))

Action Space: Discrete(4)
State space: Discrete(16)


## Step 2. Define the network architecture

In [2]:
# One episode is one full game: from env.reset() until the game ends.
test_episodes = 30
train_episodes = 100

def agent(state_shape, action_shape, learning_rate=0.001):
    """Build and compile the Q-network that maps a state to one Q-value per action.

    e.g. for 4 actions the network output might be [.1, .7, .05, .15].
    The highest value (0.7) is the best estimated Q-value, and its
    index (1) is the greedy action.

    Args:
        state_shape: Shape tuple of a single (un-batched) input state.
        action_shape: Number of discrete actions, i.e. the output width.
        learning_rate: Step size for the Adam optimizer.

    Returns:
        A compiled ``keras.Sequential`` model with Huber loss.
    """
    init = tf.keras.initializers.HeUniform()
    model = keras.Sequential([
        # Declare the input with an explicit Input object; passing
        # `input_shape` to the first Dense layer is deprecated in Keras 3.
        keras.Input(shape=state_shape),
        keras.layers.Dense(24, activation='relu', kernel_initializer=init),
        keras.layers.Dense(12, activation='relu', kernel_initializer=init),
        # Linear output: Q-values are unbounded regression targets.
        keras.layers.Dense(action_shape, activation='linear', kernel_initializer=init),
    ])
    # No 'accuracy' metric: the outputs are regressed Q-values, not class
    # probabilities, so classification accuracy carries no meaning here.
    model.compile(
        loss=tf.keras.losses.Huber(),
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    )
    return model

def get_qs(model, state, step):
    """Return the model's Q-values for a single state.

    Args:
        model: Object exposing ``predict(batch, verbose=...)``.
        state: Array whose first dimension is the feature count; it is
            reshaped to a batch of one before prediction.
        step: Unused; kept so existing callers keep working.

    Returns:
        The first (and only) row of the prediction: one Q-value per action.
    """
    batch_of_one = state.reshape([1, state.shape[0]])
    # verbose=0 matches every other predict call in this notebook and keeps
    # per-call progress bars out of the cell output.
    return model.predict(batch_of_one, verbose=0)[0]


## Step 3. Define the train function using Experience Replay

In [3]:
def train(env, replay_memory, model, target_model, done,
          learning_rate=0.7, discount_factor=0.618,
          min_replay_size=1000, batch_size=64 * 2):
    """Run one experience-replay update of the main network.

    A mini-batch of transitions is sampled from ``replay_memory``. For each
    one the Q-value of the action taken is blended towards the Bellman target
    ``reward + discount_factor * max_a' Q_target(next_state, a')`` (just
    ``reward`` for terminal transitions). The main model is then fitted on
    the updated Q-vectors.

    Args:
        env: Unused; kept for call compatibility.
        replay_memory: Sequence of ``[state, action, reward, next_state, done]``.
        model: Main network; provides current Q-values and is fitted.
        target_model: Target network; provides next-state Q-values.
        done: Unused; each transition carries its own done flag.
        learning_rate: Blend factor between the old Q-value and the target
            (this is the tabular-style Q-learning rate, not the optimizer's).
        discount_factor: Discount applied to the bootstrapped future value.
        min_replay_size: No training happens until the memory holds at
            least this many transitions.
        batch_size: Number of transitions sampled per update.

    Returns:
        None. Returns early, without fitting, while the memory is too small.
    """
    # Also wait for a full batch: random.sample raises if asked for more
    # items than the population holds.
    if len(replay_memory) < max(min_replay_size, batch_size):
        return

    mini_batch = random.sample(replay_memory, batch_size)

    states = np.array([transition[0] for transition in mini_batch])
    actions = np.asarray([transition[1] for transition in mini_batch], dtype=np.intp)
    rewards = np.asarray([transition[2] for transition in mini_batch], dtype=float)
    next_states = np.array([transition[3] for transition in mini_batch])
    terminal = np.asarray([transition[4] for transition in mini_batch], dtype=bool)

    current_qs_list = model.predict(states, verbose=0)
    future_qs_list = target_model.predict(next_states, verbose=0)

    # Bellman target: bootstrap from the target network unless the
    # transition ended the episode.
    bootstrapped = rewards + discount_factor * np.max(future_qs_list, axis=1)
    bellman_targets = np.where(terminal, rewards, bootstrapped)

    # Only the Q-value of the action actually taken moves; every other
    # action keeps the network's current estimate (zero error for those).
    target_qs = np.array(current_qs_list)
    rows = np.arange(len(mini_batch))
    target_qs[rows, actions] = (
        (1 - learning_rate) * target_qs[rows, actions]
        + learning_rate * bellman_targets
    )

    model.fit(states, target_qs, batch_size=batch_size, verbose=0, shuffle=True)


## Step 4. Run the Deep Q-Network Algorithm

In [4]:
def one_hot_encode(position, num_states):
    """Return a length-``num_states`` float vector with a 1 at ``position``.

    Every other entry is 0.
    """
    one_hot = np.zeros(num_states)
    one_hot[position] = 1
    return one_hot

In [33]:

def main():
    """Train the DQN on the module-level ``env`` and save both networks.

    Runs ``train_episodes`` episodes with epsilon-greedy exploration. Every
    transition is pushed into a replay memory; the main network is trained
    every 4 environment steps (and at episode end), and the target network
    is synced with the main network at the end of an episode once at least
    100 steps have passed since the previous sync. The main and target
    models are written to 'frozen-dqn-m1.keras' and 'frozen-dqn-t1.keras'.
    """
    epsilon = 1  # Epsilon-greedy starts at 1: every step is random at first
    max_epsilon = 1  # You can't explore more than 100% of the time
    min_epsilon = 0.01  # At a minimum, we'll always explore 1% of the time
    decay = 0.01

    # 1. Initialize the Target and Main models
    num_states = env.observation_space.n
    shape = (num_states,)

    # Main Model (trained every 4 steps)
    model = agent(shape, env.action_space.n)
    model.summary()

    # Target Model (synced from the main model roughly every 100 steps)
    target_model = agent(shape, env.action_space.n)
    target_model.set_weights(model.get_weights())

    replay_memory = deque(maxlen=50_000)

    steps_to_update_target_model = 0

    for episode in range(train_episodes):
        total_training_rewards = 0
        observation, _ = env.reset()
        episode_over = False
        while not episode_over:
            steps_to_update_target_model += 1

            random_number = np.random.rand()
            # 2. Explore using the Epsilon Greedy Exploration Strategy

            # The network consumes one-hot vectors, not raw state indices.
            encoded_observation = one_hot_encode(observation, num_states)

            if random_number <= epsilon:
                # Explore
                action = env.action_space.sample()
            else:
                # Exploit best known action; model input dims are (batch, num_states)
                encoded_observation_reshaped = np.reshape(encoded_observation, (1, num_states))
                predicted = model.predict(encoded_observation_reshaped, verbose=0).flatten()
                action = np.argmax(predicted)

            new_observation, reward, terminated, truncated, info = env.step(action)
            # The episode also ends on truncation (time limit), not only on a
            # terminal state; otherwise this loop could step a finished episode
            # forever.
            episode_over = terminated or truncated

            # Encode the state we arrived in, not the one we left.
            encoded_new_observation = one_hot_encode(new_observation, num_states)

            # Store `terminated` (not `episode_over`): a truncated episode did
            # not reach a terminal state, so its next-state value should still
            # be bootstrapped.
            replay_memory.append([encoded_observation, action, reward, encoded_new_observation, terminated])

            # 3. Update the Main Network using the Bellman Equation
            if steps_to_update_target_model % 4 == 0 or episode_over:
                train(env, replay_memory, model, target_model, terminated)

            observation = new_observation
            total_training_rewards += reward

        # Note: the value labelled "n steps" in this message is the episode index.
        print('Total training rewards: {} after n steps = {} with final reward = {}'.format(total_training_rewards, episode, reward))

        if steps_to_update_target_model >= 100:
            print('Copying main network weights to the target network weights')
            target_model.set_weights(model.get_weights())
            steps_to_update_target_model = 0

        # Exponentially anneal exploration from max_epsilon towards min_epsilon.
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay * episode)
    env.close()
    model.save('frozen-dqn-m1.keras')
    target_model.save('frozen-dqn-t1.keras')


# Run the training loop when this code is executed as the main program.
if __name__ == '__main__':
    main()


Total training rewards: 0.0 after n steps = 0 with final reward = 0.0
Total training rewards: 0.0 after n steps = 1 with final reward = 0.0
Total training rewards: 0.0 after n steps = 2 with final reward = 0.0
Total training rewards: 1.0 after n steps = 3 with final reward = 1.0
Total training rewards: 0.0 after n steps = 4 with final reward = 0.0
Total training rewards: 0.0 after n steps = 5 with final reward = 0.0
Total training rewards: 0.0 after n steps = 6 with final reward = 0.0
Total training rewards: 0.0 after n steps = 7 with final reward = 0.0
Total training rewards: 0.0 after n steps = 8 with final reward = 0.0
Total training rewards: 0.0 after n steps = 9 with final reward = 0.0
Total training rewards: 0.0 after n steps = 10 with final reward = 0.0
Copying main network weights to the target network weights
Total training rewards: 0.0 after n steps = 11 with final reward = 0.0
Total training rewards: 0.0 after n steps = 12 with final reward = 0.0
Total training rewards: 0.0 

In [5]:
# Re-create the environment with on-screen rendering for the evaluation run.
env = gym.make("FrozenLake-v1", is_slippery=False, render_mode='human')

# NOTE(review): main() saves 'frozen-dqn-m1.keras' / 'frozen-dqn-t1.keras', but this
# cell loads "frozen3.h5", which nothing visible in this notebook writes — confirm
# which checkpoint is intended so the notebook reproduces from a fresh kernel.
model = keras.models.load_model("frozen3.h5", compile=False)
# Loaded with compile=False, so loss/optimizer are attached again here.
model.compile(loss=tf.keras.losses.Huber(), optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), metrics=['accuracy'])
observation, _ = env.reset()

# Filled in by the evaluation loop: observations visited and total reward.
appendedObservations = []
rewards = 0


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [16]:
# Tabulate the network's Q-values for every state, plus the greedy action
# (argmax) each state would choose.
num_states = env.observation_space.n
Q = []
path = []
for state_index in range(num_states):
    state_batch = np.reshape(one_hot_encode(state_index, num_states), (1, num_states))
    q_values = model.predict(state_batch, verbose=0).flatten()
    Q.append(q_values)
    path.append(np.argmax(q_values))
print(np.array(Q))
print(path)

[[ 5.14015555e-04 -2.94614583e-05  5.46924770e-04  4.06913459e-04]
 [-1.14575028e-04 -1.26855448e-04  3.54112126e-05 -1.57371163e-04]
 [ 1.05373561e-04  2.87745148e-04  1.22266728e-03  9.02876258e-04]
 [-2.24821270e-04 -1.35004520e-04  5.18907327e-04  6.32204115e-04]
 [-6.76214695e-04 -3.96173447e-04  2.02290248e-04  1.67340040e-04]
 [ 6.48468062e-02 -3.40053067e-02  3.01965401e-02  5.19545078e-02]
 [ 1.75508969e-02  2.30151564e-02 -7.67231174e-03  1.47161409e-02]
 [ 1.47692919e-01  1.75758541e-01  2.53255844e-01  1.25647023e-01]
 [ 8.40142369e-04 -1.84486248e-03 -2.24866625e-03  1.35175139e-03]
 [-4.63977456e-04  1.86327845e-04  1.28791248e-03  2.48268247e-04]
 [ 6.50396049e-02  1.80935934e-02  6.49545807e-03  7.36918300e-02]
 [-8.29583853e-02  9.12303254e-02  1.50025517e-01 -1.39209047e-01]
 [ 3.14221203e-01  1.19832985e-01  4.35661376e-01  4.22758982e-02]
 [-2.14725733e-04 -2.62713246e-03 -2.21437030e-03  2.89447606e-04]
 [ 6.28897905e-01  6.40989184e-01  1.00650847e+00  5.65553069e

In [7]:

# Roll out the greedy policy for at most 200 steps, continuing from the
# current `observation`, and accumulate the reward collected.
try:
    for i in range(200):
        print(i)
        encoded = one_hot_encode(observation, env.observation_space.n)
        encoded_reshaped = np.reshape(encoded, (1, env.observation_space.n))
        predicted = model.predict(encoded_reshaped, verbose=0).flatten()
        action = np.argmax(predicted)
        new_observation, reward, terminated, truncated, info = env.step(action)
        appendedObservations.append(new_observation)
        observation = new_observation
        rewards += reward
        # Pause so each rendered move is visible.
        time.sleep(1)
        # Stop on truncation (time limit) as well as on a terminal state;
        # the environment must be reset before it is stepped again.
        if terminated or truncated:
            break
    print(rewards)
finally:
    # Close the render window even if the cell is interrupted mid-rollout.
    env.close()

0
1
2
3
4
5
6
7


KeyboardInterrupt: 

: 