<a href="https://colab.research.google.com/github/victorsenoguchi/MAC0425/blob/main/Q_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initial Imports

In [1]:
import numpy as np
import itertools
import random

# Q-Learning Class

In [2]:
class Q_learning():
 
  def __init__(self, S, A, Q, R, N, initial, terminal): # Starts an object of the class.
    self.S = S                # List with states.
    self.A = A                # Dictionary with a list of actions for each state.
    self.R = R                # Dictionary with the reward for each state.
    self.Q = Q                # Dictionary with the quality for each state and action.
    self.N = N                # Dictionary with the number of visits for each state and action.
    self.initial = initial    # List of possibles initial states.
    self.terminal = terminal  # List of possibles terminal states.
 
  def go(self, s, a): # Return the next state given the current state and an action.
    saux = np.asarray(s)
    if a == "west":
      saux = saux + np.asarray([[-1,0],[0,1],[0,-1]])
    elif a == "north": 
      saux = saux + np.asarray([[0,1],[1,0],[-1,0]])
    elif a == "east":
      saux = saux + np.asarray([[1,0],[0,1],[0,-1]])
    elif a == "south":
      saux = saux + np.asarray([[0,-1],[1,0],[-1,0]]) 
    idx = np.random.choice(np.arange(3), p = [0.8,0.1,0.1])
    s_prime = tuple(saux[idx])
    return s_prime if s_prime in self.S else s
 
  def f(self, s, a): # Aversion to boredom function.
    return 1 if self.N[s,a] == 0 else self.Q[s,a] / self.N[s,a]
 
  def alpha(self, s, a):  
    return 1 if self.N[s,a] == 0 else 1 / self.N[s, a]
 
  def show_policy(self): # Print the current policy.
    policy = dict()
    for s in S:
      idx = np.argmax([self.f(s, a) for a in self.A[s]])
      policy[s] = self.A[s][idx]  
    for j in range(3):
      print(end = "|")
      for i in range(4):
        if policy[(i,2-j)] == "west": print("\u2190", end = "|")
        elif policy[(i,2-j)] == "north": print("\u2191", end = "|")
        elif policy[(i,2-j)] == "east": print("\u2192", end = "|")
        elif policy[(i,2-j)] == "south": print("\u2193", end = "|")
      print()
    print("\n")
 
  def fit(self, random_state = None, epsilon = 1e-3, max_epochs = 1e6, gamma = 1): # Train the model.
    if random_state is not None:
      np.random.seed(random_state)
    epoch = 0
    converged = False
    while epoch < max_epochs and not converged:
      delta = 0
      # Initial State.
      s = random.choice(self.initial)
      # Reward of the initial state.
      r = self.R[s]
      # Index of the initial state.
      idx = np.argmax([self.f(s, a_prime) for a_prime in self.A[s]])       
      # First action.
      a = self.A[s][idx]
      while s not in self.terminal:
        # Next State.
        s_prime = self.go(s, a)
        # Next Reward.
        r_prime = self.R[s_prime]
        self.N[s, a] += 1
        # Old value of quality in state s and action a.
        Q_old = self.Q[s, a]  
        self.Q[s, a] += self.alpha(s, a) * (r + gamma * max([self.Q[s_prime, a_prime] for a_prime in self.A[s_prime]]) - self.Q[s, a])
        # New value of quality in state s and action a.
        Q_new = self.Q[s, a]
        # Calculates the greatest variation between an old quality and a new one at the time. 
        delta = max(delta, np.abs(Q_new - Q_old))
        # Updates current status and current reward.
        s, r = s_prime, r_prime
        idx = np.argmax([self.f(s, a_prime) for a_prime in self.A[s]])       
        # Next Action.
        a = self.A[s][idx]
        #self.show_policy()
      # Updates the quality of the terminal state.
      for a in ["north", "east", "south", "west"]:
        self.Q[s, a] = self.R[s]
      # Checks whether the algorithm has converged.
      if delta <= epsilon: converged = True
      #print("\u0394Q = {}\n".format(delta))
      epoch += 1
 
    if converged:    
      print("The Q-Learning algorithm took {} epochs to converge.\n".format(epoch))
    else:
      print("The Q-Learning algorithm took {} epochs and did not converged.\n".format(max_epochs))
    print("The final policy was")
    self.show_policy()

In [3]:
def generate_states(x, y): 
  x = [i for i in range(x)]
  y = [i for i in range(y)]
  S = list(itertools.product(x, y))
  return S
 
def generate_actions(S):
  A = dict()
  for s in S:
    actions = []
    if (s[0], s[1]+1) in S:
      actions.append("north")
    if (s[0]+1, s[1]) in S:
      actions.append("east")
    if (s[0], s[1]-1) in S:
      actions.append("south")
    if (s[0]-1, s[1]) in S:
      actions.append("west")
    A[s] = actions
  return A

In [4]:
# Reproducibility Seed.
seed = 31
 
# x axis.
x = 4
# y axis.
y = 3

# States.
S = generate_states(x, y)
# Actions.
A = generate_actions(S)
 
# Dictionary with rewards.
R = {s: -1 for s in S}
R[(1,1)], R[(3,2)] = -100, 100
 
# Dictionaries with the quality and number of visits.
Q, N = dict(), dict()
for s in S:
  Q.update({(s, a): 0 for a in A[s]})
  N.update({(s, a): 0 for a in A[s]})
 
# List with possibles initial states.
initial_states = [(0,0)]
# List with possibles terminal states.
terminal_states = [(3,2)]
 
# Discount Factor.
gamma = 1
 
model = Q_learning(S, A, Q, R, N, initial_states, terminal_states)
model.fit(random_state = seed)

The Q-Learning algorithm took 13041 epochs to converge.

The final policy was
|→|→|→|↓|
|↑|↑|↑|↑|
|↑|→|↑|↑|


