<a href="https://colab.research.google.com/github/lolandy/Q-learning/blob/main/Qlearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
import numpy as np
!pip install gymnasium
import gymnasium as gym
import tensorflow as tf
import time

Collecting gymnasium
  Downloading gymnasium-0.29.0-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.8/953.8 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.0


In [None]:
# Create the FrozenLake environment used by every agent in this notebook.
# is_slippery=False is passed to request the non-slippery variant, and
# render_mode="ansi" so that env.render() produces text (see the commented-out
# render calls in the training cells).
env_name = "FrozenLake-v1"
env = gym.make(env_name, render_mode="ansi", is_slippery=False)
print("Observation space: ", env.observation_space)  # Discrete(16): one state per tile of the grid
print("Action space:", env.action_space)  # Discrete(4): the agent can move up, down, left, or right
# print(type(env.action_space))  outputs: <class 'gymnasium.spaces.discrete.Discrete'>

Observation space:  Discrete(16)
Action space: Discrete(4)


In [None]:
class Agent:  # Agent that makes random choices
    """Baseline agent that picks actions uniformly at random.

    Works with both discrete action spaces (``Discrete``) and continuous ones
    (anything exposing ``low``, ``high`` and ``shape``, e.g. ``Box``).

    Attributes:
        is_discrete: True when the action space is a ``gym.spaces.Discrete``.
        action_size: Number of discrete actions (discrete spaces only).
        action_low / action_high / action_shape: Bounds and shape of the
            action vector (continuous spaces only).
    """

    def __init__(self, env=None):
        """Inspect the environment's action space.

        Args:
            env: Environment whose ``action_space`` is inspected. When omitted,
                the module-level ``env`` variable is used, so existing
                ``Agent()`` / ``super().__init__()`` calls keep working.

        Raises:
            NameError: If ``env`` is omitted and no module-level ``env`` exists.
        """
        if env is None:
            # The parameter shadows the global of the same name, so look the
            # global up explicitly instead of relying on hidden notebook state.
            try:
                env = globals()["env"]
            except KeyError:
                raise NameError(
                    "name 'env' is not defined; pass an environment to Agent(env)"
                ) from None

        action_space = env.action_space
        # isinstance (rather than an exact type comparison) also accepts
        # subclasses of Discrete.
        self.is_discrete = isinstance(action_space, gym.spaces.Discrete)

        if self.is_discrete:
            self.action_size = action_space.n  # .n gets the size of the action space
            print("Action size:", self.action_size)
        else:
            # continuous environment, represented with "Box", a continuous n-dimension space
            # Observation and Action space represented with: Box(low, high, shape)
            self.action_low = action_space.low
            self.action_high = action_space.high
            self.action_shape = action_space.shape  # Ex. (2,) = 1d array with 2 elements. (2,2) = 2d array, etc
            print("Action range:", self.action_low, self.action_high)

    def get_action(self, state):
        """Return a uniformly random action; ``state`` is ignored.

        Returns:
            An int in ``[0, action_size)`` for discrete spaces, otherwise a
            NumPy array of shape ``action_shape`` drawn uniformly between
            ``action_low`` and ``action_high``.
        """
        if self.is_discrete:
            return random.randrange(self.action_size)
        return np.random.uniform(self.action_low, self.action_high, self.action_shape)

# Q-Network

Model structure:
1.   Input layer of states
2.   Dense/output layer of q-values for each action

Basic version:
*   No experience replay
*   No double network (the same network produces both the predictions and the targets)












In [None]:
class QNAgent(Agent):  # Subclass of random agent
    """Epsilon-greedy Q-learning agent whose Q-function is a single dense layer.

    The state index is one-hot encoded and passed through one ``Dense`` layer
    with ``action_size`` units, so the layer's kernel plays the role of a
    Q-table. No experience replay and no separate target network are used.

    Attributes:
        eps: Current exploration probability; starts at 1.0 and is multiplied
            by 0.99 after every terminated episode.
        discount_rate: Weight applied to the estimated value of the next state.
        learning_rate: Step size handed to the Adam optimizer.
    """

    def __init__(self, env, discount_rate=0.97, learning_rate=0.001):
        """Set up hyperparameters, optimizer, loss and the Q-value layer.

        Args:
            env: Environment with a discrete observation space (``.n`` is read).
            discount_rate: Discount applied to future rewards.
            learning_rate: Learning rate of the Adam optimizer.
        """
        # The base class fills in self.action_size; it is called without
        # arguments, exactly as before.
        super().__init__()
        self.state_size = env.observation_space.n
        print("State size:", self.state_size)

        self.eps = 1.0
        self.discount_rate = discount_rate  # discounts future rewards over current rewards
        self.learning_rate = learning_rate  # rate at which the Q values are nudged

        # 1. optimizer, 2. Loss function, 3. Neural Network (Model)
        # The configured learning rate is now actually passed to the optimizer;
        # previously Adam() always ran with its own default.
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        self.loss_func = tf.keras.losses.MeanSquaredError()
        # Outputs the Q-values of each action given a state
        self.q_state = tf.keras.layers.Dense(units=self.action_size, name="q_table")

    def model(self, state_in):  # forward pass calculations
        """Return the Q-values of every action for ``state_in``.

        Args:
            state_in: Integer state index.

        Returns:
            Tensor of Q-values; for a scalar state index its shape is
            ``(1, action_size)`` because a batch axis is added.
        """
        one_hot_state = tf.one_hot(state_in, depth=self.state_size)
        batched_state = tf.expand_dims(one_hot_state, 0)
        # pass the input tensor through the keras layer
        return self.q_state(batched_state)

    def get_action(self, state):
        """Choose an action with an epsilon-greedy policy."""
        q_state = self.model(state)  # gets the action Q-values for a given state
        action_greedy = np.argmax(q_state)  # index of the max Q-value in the array
        action_random = super().get_action(state)
        # epsilon-greedy policy
        return action_random if random.random() < self.eps else action_greedy

    def train(self, experience):  # update weights and biases of model
        """Run one gradient step on a single transition.

        Args:
            experience: Tuple ``(state, action, next_state, reward, terminated)``.

        Side effects:
            Updates the layer weights, stores ``self.action_in``,
            ``self.q_action`` and ``self.loss``, and decays ``self.eps`` when
            the transition ended the episode.
        """
        state, action, next_state, reward, terminated = experience

        # Value of the best next action; a terminal state has no future value,
        # so the forward pass is skipped in that case.
        max_q_next = 0.0 if terminated else np.max(self.model(next_state))
        self.action_in = tf.one_hot(action, depth=self.action_size)  # formats action choice for model

        # Bellman equation: Q(s,a) = r + discount*maxQ(s',a')
        # Note: q_target is an estimate since q_next is estimated from the model.
        # This causes the moving target problem. It is computed outside the
        # tape so no gradient flows through the target.
        q_target = reward + self.discount_rate * max_q_next

        with tf.GradientTape() as tape:  # record the forward pass for the gradient
            q_state = self.model(state)
            # isolate Q-value of selected action
            self.q_action = tf.reduce_sum(tf.multiply(q_state, self.action_in), axis=1)
            self.loss = self.loss_func(q_target, self.q_action)

        # Explicit gradient step: does not depend on the positional signature
        # of Optimizer.minimize(), which differs between Keras optimizer
        # implementations.
        variables = self.q_state.trainable_variables
        gradients = tape.gradient(self.loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))

        if terminated:  # reduces random exploration as time goes on
            self.eps = self.eps * 0.99

# Instantiate the Q-network agent on the FrozenLake environment with the default
# discount_rate and learning_rate; the constructor prints the action and state sizes.
agent = QNAgent(env)

Action size: 4
State size: 16


In [None]:
# clear_output is imported from the public IPython.display module;
# the IPython.core.display path is deprecated for this name.
from IPython.display import clear_output

total_reward = 0  # cumulative reward over all episodes
# training cycle
for ep in range(100):
    state = env.reset()[0]  # reset() returns (observation, info)
    terminated = truncated = False
    # An episode ends when the environment terminates OR truncates it (e.g. a
    # step limit). Ignoring `truncated` would keep stepping an environment
    # that needs a reset.
    while not (terminated or truncated):
        action = agent.get_action(state)
        # get effects of a chosen action
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only `terminated` is passed on: after a truncation the next state is
        # not terminal, so the agent should still bootstrap from it.
        agent.train((state, action, next_state, reward, terminated))
        state = next_state  # move on to next state after training is complete
        total_reward += reward

        # visuals
        print("s:", state, "a:", action)
        print("Episode: {}, Total reward: {}, Eps: {}".format(ep, total_reward, agent.eps))
        #print(env.render())
        #print(agent.q_state.get_weights())
        #time.sleep(0.05)
        clear_output(wait=True)

s: 15 a: 2
Episode: 99, Total reward: 87.0, Eps: 0.04904089407128576


8/14/2023: Model converges at epoch 3, total reward = 87, Eps = 0.049

# Q-Table

In [None]:
class QAgent(Agent):  # Subclass of random agent
    """Tabular epsilon-greedy Q-learning agent.

    Q-values live in ``self.q_table`` with one row per state and one column per
    action. Exploration starts fully random (``eps = 1.0``) and shrinks by a
    factor of 0.99 every time an episode terminates.
    """

    def __init__(self, env, discount_rate=0.97, learning_rate=0.1):
        """Read the state count from ``env`` and create the Q-table."""
        super().__init__()
        n_states = env.observation_space.n
        self.state_size = n_states
        print("State size:", self.state_size)

        # Hyperparameters: exploration rate, reward discount, update step size.
        self.eps = 1.0
        self.discount_rate = discount_rate
        self.learning_rate = learning_rate
        self.build_model()

    def build_model(self):
        """(Re)initialise the Q-table with small random values."""
        table_shape = (self.state_size, self.action_size)
        self.q_table = np.random.random(table_shape) * 1e-4

    def get_action(self, state):
        """Pick an action for ``state`` using the epsilon-greedy rule."""
        best_known = np.argmax(self.q_table[state])  # column with the largest Q-value
        exploratory = super().get_action(state)      # uniformly random action
        if random.random() < self.eps:
            return exploratory
        return best_known

    def train(self, experience):
        """Apply one Q-learning update for a single transition.

        ``experience`` is ``(state, action, next_state, reward, terminated)``.
        """
        state, action, next_state, reward, terminated = experience

        # A terminal next state contributes no future value.
        next_values = self.q_table[next_state]
        best_next = 0.0 if terminated else np.max(next_values)

        # Bellman target: r + discount * max_a' Q(s', a')
        target = reward + self.discount_rate * best_next

        # Move the stored value a fraction of the way towards the target:
        # q(s,a) <- q(s,a) + learning_rate * (target - q(s,a))
        current = self.q_table[state, action]
        td_error = target - current
        self.q_table[state, action] = current + self.learning_rate * td_error

        # Explore less once an episode has finished.
        if terminated:
            self.eps *= 0.99

# Rebind `agent` to the tabular Q-learning agent (default discount_rate and
# learning_rate); the constructor prints the action and state sizes.
agent = QAgent(env)

Action size: 4
State size: 16


In [None]:
# clear_output is imported from the public IPython.display module;
# the IPython.core.display path is deprecated for this name.
from IPython.display import clear_output

total_reward = 0  # cumulative reward over all episodes
for ep in range(100):  # training cycle
    state = env.reset()[0]  # reset() returns (observation, info)
    terminated = trunc = False
    # Stop on termination OR truncation (e.g. a step limit); previously `trunc`
    # was captured but never checked, so a truncated episode kept stepping an
    # environment that needs a reset.
    while not (terminated or trunc):
        action = agent.get_action(state)
        next_state, reward, terminated, trunc, info = env.step(action)
        # Only `terminated` is passed on: after a truncation the next state is
        # not terminal, so the agent should still bootstrap from it.
        agent.train((state, action, next_state, reward, terminated))
        state = next_state
        total_reward += reward

        print("s:", state, "a:", action)
        print("Episode: {}, Total reward: {}, Eps: {}".format(ep, total_reward, agent.eps))
        # print(env.render())
        # print(agent.q_table)
        # time.sleep(0.05)
        clear_output(wait=True)

s: 15 a: 2
Episode: 99, Total reward: 95.0, Eps: 0.04904089407128576
