In [66]:
import numpy as np

In [67]:
# n-step TD prediction for a 7-state random walk.
# States 0 and 6 end the episode; only arriving in state 6 pays a reward (1).
alpha = 0.5   # step size
gamma = 0.5   # discount factor
n = 3         # number of rewards used before bootstrapping

V = np.zeros(7)
p = np.full((7,2), 0.5)                    # p[s] = action probabilities in state s
rewards = np.array([0,0,0,0,0,0,1])        # reward for arriving in each state
transitions = np.array([                   # transitions[s, a] = successor state
    [0,0], [0,2], [1,3], [2,4], [3,5], [4,6], [6,6],
])

for episode in range(3):
  T = 100000            # stand-in for "unknown" until the episode terminates
  terminal = False
  s_list, r_list = [3], [0]   # S_0 = 3; r_list[0] is a placeholder so r_list[i] is R_i
  t = tau = 0
  while tau != T - 1:
    if t < T:
      # Act from the most recent state and record the outcome.
      s = s_list[-1]
      a = np.random.choice(np.arange(len(p[s])), p=p[s])
      sp = transitions[s,a]
      r = rewards[sp]
      s_list.append(sp)
      r_list.append(r)

      if not terminal and sp in (0, 6):
        T = t + 1
        terminal = True

    # tau is the time step whose state value gets updated now.
    tau = t - n + 1
    if tau >= 0:
      # Discounted sum of R_{tau+1} .. R_{min(tau+n, T)}.
      horizon = np.min([tau+n, T])
      G = np.sum([(gamma**k)*r_list[tau+1+k] for k in range(horizon - tau)])
      if tau + n < T:
        # The episode is still running n steps ahead: bootstrap from V.
        G += (gamma**n) * V[s_list[tau+n]]
      updated_state = s_list[tau]
      V[updated_state] += alpha * (G - V[updated_state])
    t += 1

In [68]:
# Show the state values produced by the n-step TD cell. With that cell's code,
# V stays all-zero whenever none of the three episodes ended in state 6,
# the only state whose arrival reward is non-zero.
V

array([0., 0., 0., 0., 0., 0., 0.])

In [76]:
class CliffEnv(Env):
  """Cliff-walking grid world with 4 rows and 12 columns.

  The agent starts on the bottom-left cell (3, 0); `step` reports the episode
  as finished when the agent stands on the bottom-right cell (3, 11).
  Bottom-row columns 1-10 are the cliff: ending a move there costs -100 and
  puts the agent back on the start cell. Every other step costs -1.

  Actions: 0 = column - 1, 1 = row - 1, 2 = column + 1, 3 = row + 1.
  Moves that would leave the grid are clamped to the border.

  `path` is a 4x12 array; each cell holds the step counter of the latest
  step that ended on it since the last reset (0 if no step ended there).

  NOTE: the base class `Env` has to exist before this cell is run.
  """

  def __init__(self):
    # A new environment is in exactly the state produced by reset().
    self.reset()

  def reset(self):
    """Clear the path and step counter, return to (3, 0) and return that state."""
    self.t = 0
    self.path = np.zeros((4,12))
    self.s = (3,0)
    return self.s

  def step(self, s, a):
    """Apply action `a` from state `s`.

    The move is computed from the `s` argument, not from the stored state.
    An action outside 0-3 leaves the stored state as it was.

    Returns the tuple (s, a, reward, next_state, done).
    """
    if a == 0 or a == 2:
      # Horizontal move: only the column changes.
      col = np.max([s[1]-1,0]) if a == 0 else np.min([s[1]+1,11])
      self.s = (s[0], col)
    elif a == 1 or a == 3:
      # Vertical move: only the row changes.
      row = np.max([s[0]-1,0]) if a == 1 else np.min([s[0]+1,3])
      self.s = (row, s[1])

    fell_off = self.s[0] == 3 and 1 <= self.s[1] <= 10
    if fell_off:
      self.s = (3,0)
    r = -100 if fell_off else -1

    # Stamp the cell the agent ended on with the running step count.
    self.t += 1
    self.path[self.s[0], self.s[1]] = self.t

    reached_goal = self.s == (3,11)
    return s, a, r, self.s, reached_goal

  def get_path(self):
    """Return the 4x12 array of step stamps for the current episode."""
    return self.path

In [109]:
# On-policy n-step SARSA on the cliff grid.
#
# Needs the CliffEnv variant whose step() returns
# (s, a, r, next_state, done).

env = CliffEnv()
Q = np.zeros((4,12,4))   # Q[row, col, action]

alpha = 0.9   # step size
gamma = 0.5   # discount factor
eps = 0.1     # exploration probability
n = 3         # number of rewards used before bootstrapping

episodes = 10


def epsilon_greedy(Q, s, eps):
  """Pick an action for state `s`: greedy w.r.t. Q, or uniformly random with probability eps."""
  q_values = Q[s[0], s[1]]
  if np.random.random() > eps:
    return np.argmax(q_values)
  return np.random.choice(len(q_values))


for episode in range(episodes):
  t = 0
  tau = 0
  T = 100000            # stand-in for "unknown" until the episode terminates
  terminal = False

  # The three lists are sliding windows: one entry is popped from the front
  # after every update, so while handling time step tau index 0 holds
  # S_tau / A_tau / R_tau and index k holds the values for time tau + k.
  s_list = [env.reset()]
  a_list = [epsilon_greedy(Q, s_list[-1], eps)]
  r_list = [0]          # placeholder for R_0, which does not exist
  while tau != T - 1:
    if t < T:
      s = s_list[-1]
      a = a_list[-1]
      s, a, r, sp, done = env.step(s, a)
      r_list.append(r)

      if done and not terminal:
        T = t + 1
        terminal = True
      else:
        s_list.append(sp)
        a_list.append(epsilon_greedy(Q, sp, eps))

    # tau is the time step whose state-action value gets updated now.
    tau = t - n + 1
    if tau >= 0:
      # n-step return: G = sum_{i=tau+1}^{min(tau+n, T)} gamma^(i-tau-1) * R_i.
      # R_i sits at window index i - tau. The sum must start at R_{tau+1}
      # (the reward that follows A_tau) and run through R_{min(tau+n, T)};
      # starting one step earlier would credit the reward of the previous
      # action and would leave out the final reward of the episode.
      last = np.min([tau+n, T])
      G = np.sum([(gamma**(i-tau-1))*r_list[i-tau] for i in range(tau+1, last+1)])
      if tau + n < T:
        # The episode is still running n steps ahead: bootstrap from Q.
        G += (gamma**n) * Q[s_list[n][0],s_list[n][1],a_list[n]]
      Q[s_list[0][0],s_list[0][1],a_list[0]] += alpha*(G - Q[s_list[0][0],s_list[0][1],a_list[0]])

      # Slide the windows forward by one time step.
      s_list.pop(0)
      a_list.pop(0)
      r_list.pop(0)
    t += 1

In [110]:
# Print the step-stamp grid that `env` recorded since its last reset,
# i.e. for the most recent episode it ran.
print(env.get_path())

[[ 0.  0.  0.  0.  7.  8.  9. 10. 11. 12. 13. 14.]
 [ 0.  3.  4.  5.  6.  0.  0.  0.  0.  0.  0. 16.]
 [ 1.  2.  0.  0.  0.  0.  0.  0.  0.  0.  0. 17.]
 [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0. 18.]]


In [None]:
# TODO: implement the n-step Tree Backup algorithm (see page 154).

In [None]:
# TODO: implement the off-policy n-step Q(sigma) algorithm (see page 156).

#### Later stuff

In [34]:
# TD(lambda) for v_pi on a 7-state random walk.
# States 0 and 6 end the episode; only arriving in state 6 pays a reward (1).
alpha, gamma, lam = 0.5, 0.5, 0.5   # step size, discount factor, trace decay

V = np.zeros(7)
p = np.ones((7,2)) * 0.5                 # p[s] = action probabilities in state s
rewards = np.array([0]*6 + [1])          # reward for arriving in each state
# transitions[s, a] = successor state: interior states step to s-1 or s+1,
# the two end states only lead back to themselves.
transitions = np.array(
    [[0,0]] + [[state-1, state+1] for state in range(1,6)] + [[6,6]]
)

def update_trace(E_s, update = "acc", alpha = 0.5):
  """Return the new eligibility value for the state that was just visited.

  E_s    -- current trace value of that state
  update -- "replace": the trace becomes 1
            "dutch":   the trace becomes (1 - alpha)*E_s + 1
            any other value (default "acc"): the trace becomes E_s + 1
  alpha  -- only used by the "dutch" rule
  """
  if update == "replace":
    return 1
  if update == "dutch":
    return (1 - alpha)*E_s + 1
  # Accumulating trace is the fallback for every other name.
  return E_s + 1

# Ten episodes of backward-view TD(lambda), each starting in the middle state.
for t in range(10):
  E = np.zeros(7)          # eligibility traces start from zero every episode
  s = 3
  while s not in (0, 6):   # states 0 and 6 end the episode
    a = np.random.choice(np.arange(len(p[s])), p=p[s])
    sp = transitions[s,a]
    r = rewards[sp]

    # One-step TD error for the transition s -> sp.
    delta = r + gamma*V[sp] - V[s]
    E[s] = update_trace(E[s])

    # Every state moves in proportion to its trace, then all traces decay.
    V += alpha*delta*E
    E *= gamma*lam

    s = sp

In [35]:
# Show the state values produced by the TD(lambda) cell. Entries 0 and 6
# belong to the terminal states; that loop never raises their traces, so they stay 0.
V

array([0.        , 0.00475336, 0.01966991, 0.12810141, 0.36533486,
       0.89675832, 0.        ])

In [36]:
class Env:
  """Minimal environment base class; subclasses supply the real `step`."""

  def step(self):
    """Placeholder that does nothing and returns None."""

class Agent:
  """Minimal agent base class; subclasses supply the real `action` and `update`."""

  def action(self):
    """Placeholder that does nothing and returns None."""

  def update(self):
    """Placeholder that does nothing and returns None."""

In [None]:
class CliffEnv(Env):
  """4x12 cliff-walking grid; this variant's `step` returns no done flag.

  Start cell is (3, 0). Bottom-row columns 1-10 are the cliff: ending a move
  there costs -100 and returns the agent to the start cell; every other step
  costs -1. Callers have to detect the end of an episode themselves
  (the driver cells compare the state with (3, 11)).

  Actions: 0 = column - 1, 1 = row - 1, 2 = column + 1, 3 = row + 1,
  clamped to the grid border.
  """

  def __init__(self):
    self.t = 0                      # steps taken since the last reset
    self.path = np.zeros((4,12))    # step stamp of the latest visit per cell
    self.s = (3,0)                  # current (row, col)

  def reset(self):
    """Start a new episode: clear path and counter, return the start state."""
    self.t = 0
    self.path = np.zeros((4,12))
    self.s = (3,0)
    return self.s

  def step(self, s, a):
    """Apply action `a` from state `s` and return (s, a, reward, next_state).

    The move is computed from the `s` argument. An action outside 0-3
    keeps whatever state the environment had stored.
    """
    row, col = s[0], s[1]
    if a == 0:
      landed = (row, np.max([col-1,0]))
    elif a == 1:
      landed = (np.max([row-1,0]), col)
    elif a == 2:
      landed = (row, np.min([col+1,11]))
    elif a == 3:
      landed = (np.min([row+1,3]), col)
    else:
      landed = self.s
    self.s = landed

    r = -1
    on_cliff = self.s[0] == 3 and 1 <= self.s[1] <= 10
    if on_cliff:
      # Falling off: large penalty and back to the start cell.
      r = -100
      self.s = (3,0)

    self.t += 1
    self.path[self.s[0], self.s[1]] = self.t

    return s, a, r, self.s

  def get_path(self):
    """Return the 4x12 array of step stamps for the current episode."""
    return self.path

In [None]:
class CliffAgent(Agent):
  """Tabular epsilon-greedy agent with eligibility traces for the 4x12 cliff grid.

  Q and E are arrays indexed [row, col, action] with four actions per cell.

  Parameters
  eps             -- probability of picking a uniformly random action
  alpha           -- step size (also used by the "dutch" trace rule)
  delta           -- stored on the instance; not read by any method here
  gamma           -- discount factor
  lam             -- trace decay
  update_method   -- "sarsa" (SARSA(lambda)) or "q" (Watkins's Q(lambda))
  E_update_method -- "accumulate", "dutch" or "replace"

  Raises ValueError for an unknown update_method or E_update_method.
  """

  def __init__(self, eps=0.1, alpha=0.9, delta=1, gamma=0.5, lam=0.5, update_method="q", E_update_method="accumulate"):
    self.eps = eps
    self.alpha = alpha
    self.delta = delta
    self.gamma = gamma
    self.lam = lam
    self.Q = np.zeros((4,12,4))
    self.E = np.zeros((4,12,4))

    update_method_map = {
        "sarsa": self.sarsa_update,
        "q": self.q_update
    }
    # Fail here rather than with "'NoneType' object is not callable" on the
    # first call to update().
    if update_method not in update_method_map:
      raise ValueError(
          "unknown update_method %r; expected one of %s"
          % (update_method, sorted(update_method_map)))
    self.update_method = update_method_map[update_method]

    E_update_method_map = {
        "accumulate": self.accumulate,
        "dutch": self.dutch,
        "replace": self.replace
    }
    if E_update_method not in E_update_method_map:
      raise ValueError(
          "unknown E_update_method %r; expected one of %s"
          % (E_update_method, sorted(E_update_method_map)))
    self.E_update_method = E_update_method_map[E_update_method]

  def reset_E(self):
    """Replace the trace array with zeros and return the new array."""
    self.E = np.zeros((4,12,4))
    return self.E

  def action(self, s):
    """Epsilon-greedy action for state `s` = (row, col)."""
    if np.random.random() > self.eps:
      return np.argmax(self.Q[s[0],s[1]])
    else:
      return np.random.choice(len(self.Q[s[0],s[1]]))

  def update(self, s, a, r, sp):
    """Learn from the transition (s, a, r, sp); returns (sp, next_action)."""
    return self.update_method(s, a, r, sp)

  def update_E(self, s, a):
    """Raise the trace of (s, a) using the configured trace rule."""
    self.E[s[0],s[1],a] = self.E_update_method(s, a)

  # Helper functions

  def sarsa_update(self, s, a, r, sp):
    """SARSA(lambda) step: bootstrap from the action actually chosen in `sp`."""
    ap = self.action(sp)
    error = r + self.gamma * self.Q[sp[0], sp[1], ap] - self.Q[s[0], s[1], a]
    self.update_E(s, a)
    # Every entry moves in proportion to its trace, then all traces decay.
    self.Q += self.alpha * error * self.E
    self.E *= self.gamma * self.lam
    return sp, ap

  def q_update(self, s, a, r, sp):
    """Watkins's Q(lambda) step.

    The target bootstraps from the greedy action in the NEXT state `sp`.
    Traces decay while the chosen next action is greedy and are cut to zero
    after an exploratory action.
    """
    ap = self.action(sp)
    q_next = self.Q[sp[0], sp[1]]
    # The greedy action must come from sp; taking it from s would pair
    # Q[sp] with an action index that has nothing to do with sp.
    a_star = np.argmax(q_next)
    if q_next[ap] == q_next[a_star]:
      # The chosen action ties for the maximum, so it counts as greedy.
      a_star = ap
    error = r + self.gamma * q_next[a_star] - self.Q[s[0], s[1], a]
    self.update_E(s, a)
    self.Q += self.alpha * error * self.E
    if ap == a_star:
      self.E *= self.gamma * self.lam
    else:
      self.E[:] = 0
    return sp, ap

  def accumulate(self, s, a):
    """Accumulating trace: current trace of (s, a) plus one."""
    return self.E[s[0],s[1],a] + 1

  def dutch(self, s, a):
    """Dutch trace: (1 - alpha) times the current trace of (s, a), plus one."""
    return (1 - self.alpha) * self.E[s[0],s[1],a] + 1

  def replace(self, s, a):
    """Replacing trace: always one."""
    return 1

  def getQ(self):
    """Return the action-value array."""
    return self.Q

In [None]:
# SARSA(lambda) for cliff problem
# Uses the CliffEnv variant whose step() returns (s, a, r, next_state).
env = CliffEnv()
sarsa_agent = CliffAgent(update_method="sarsa")
T = 50   # number of training episodes

for t in range(T):
  # Each episode starts with fresh traces, the start state and a first action.
  E = sarsa_agent.reset_E()
  s = env.reset()
  a = sarsa_agent.action(s)
  while not (s == (3,11)):
    s, a, r, sp = env.step(s, a)
    # update() learns from the transition and also picks the next action.
    sp, ap = sarsa_agent.update(s, a, r, sp)
    s, a = sp, ap

# env has not been reset since the last episode, so this is that episode's path.
print(env.get_path())

[[ 3.  4.  5.  6.  7.  8.  9. 10. 11. 12. 13. 14.]
 [ 2.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0. 15.]
 [ 1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0. 16.]
 [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0. 17.]]


In [None]:
# Q(lambda) for cliff problem
# Uses the CliffEnv variant whose step() returns (s, a, r, next_state).
env = CliffEnv()
q_agent = CliffAgent(update_method="q")
T = 50   # number of training episodes

for t in range(T):
  # Each episode starts with fresh traces, the start state and a first action.
  E = q_agent.reset_E()
  s = env.reset()
  a = q_agent.action(s)
  while not (s == (3,11)):
    s, a, r, sp = env.step(s, a)
    # update() learns from the transition and also picks the next action.
    sp, ap = q_agent.update(s, a, r, sp)
    s, a = sp, ap

# env has not been reset since the last episode, so this is that episode's path.
print(env.get_path())

[[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 13.]
 [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0. 14.]]
