<a href="https://colab.research.google.com/github/wko1014/RL_Study/blob/main/notes/tmpeaifeaTemporal_Difference_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import APIs
import gym
import numpy as np
import random

# APIs for animation
from IPython.display import clear_output
from time import sleep

In [None]:
# Create the Taxi environment used by every cell below.
# `.env` takes the environment object nested inside what gym.make returns
# (presumably to bypass gym's episode step limit — TODO confirm for the
# installed gym version).
env = gym.make('Taxi-v2').env
# Start an episode so there is a state to draw, then show the initial grid.
env.reset()
env.render()

+---------+
|[43mR[0m: | : :[35mG[0m|
| : : : : |
| : : : : |
| | : | : |
|Y| : |[34;1mB[0m: |
+---------+



In [None]:
%%time
# On-policy TD control: the Sarsa algorithm.
# Hyperparameters: step size, exploration rate, discount factor.
alpha, epsilon, gamma = .4, .1, .9
num_states = env.observation_space.n
num_actions = env.action_space.n
num_iterations = 10000

# Action-value table, one row per state and one column per action.
Q = np.zeros([num_states, num_actions])


def _sarsa_pick_action(state):
  """Return an epsilon-greedy action for `state` under the current Q table."""
  if np.random.rand() < epsilon:
    return env.action_space.sample()  # explore: random action
  return np.argmax(Q[state, :])  # exploit: best known action


# One pass of the outer loop is one episode.
for episode in range(num_iterations):
  S = env.reset()
  A = _sarsa_pick_action(S)

  done = False
  while not done:
    S_prime, R, done, info = env.step(int(A))

    # Sarsa bootstraps from the action that will actually be taken next.
    A_prime = _sarsa_pick_action(S_prime)
    td_target = R + gamma * Q[S_prime, A_prime]
    Q[S, A] += alpha * (td_target - Q[S, A])

    S, A = S_prime, A_prime

CPU times: user 3.02 s, sys: 2.2 ms, total: 3.03 s
Wall time: 3.03 s


In [None]:
# Roll out the greedy policy from the Q table learned with Sarsa and record
# every step so it can be replayed as an animation.

# Initialize
state = env.reset()

epochs, penalties, reward = 0, 0, 0
# Safety cap on the rollout length. A Q table that is not fully trained can
# make the greedy policy cycle between states, and the loop below would then
# never end (presumably `env` has no step limit of its own — TODO confirm).
max_steps = 200
# For animation
animation_sarsa = []
done = False

while not done and epochs < max_steps:
  action = np.argmax(Q[state, :])
  state, reward, done, info = env.step(action)

  # Taxi rewards an illegal pickup/dropoff with -10; count those as penalties.
  if reward == -10:
    penalties += 1

  animation_sarsa.append({"frame": env.render(mode="ansi"), "state":state,
                    "action":action, "reward":reward})
  epochs += 1

if done:
  print("The agent used {} timesteps for delivery.".format(epochs))
else:
  print("The agent did not complete the delivery within {} timesteps.".format(max_steps))
print("The agent got {} penalties.".format(penalties))

The agent used 10 timesteps for delivery.
The agent got 0 penalties.


In [None]:
%%time
# Helper that replays recorded frames in the cell output.
def animating(frames, time_per_frame):
  """Print each recorded frame in turn, sleeping `time_per_frame` seconds after each.

  Every element of `frames` is indexed with the keys "frame" (an object that
  exposes `.getvalue()`), "state", "action" and "reward".
  """
  for i, frame in enumerate(frames):
    rendered = frame["frame"].getvalue()
    # Replace the previously shown frame with the current one.
    clear_output(wait=True)
    print(rendered)
    print(f"Timesteps: {i}")
    print(f"State: {frame['state']}")
    print(f"Action: {frame['action']}")
    print(f"Reward: {frame['reward']}")
    sleep(time_per_frame)


# Replay the Sarsa rollout at two frames per second.
animating(animation_sarsa, 0.5)

+---------+
|R: | : :G|
| : : : : |
| : : : : |
| | : | : |
|[35m[34;1m[43mY[0m[0m[0m| : |B: |
+---------+
  (Dropoff)

Timesteps: 9
State: 410
Action: 5
Reward: 20
CPU times: user 32.1 ms, sys: 10.1 ms, total: 42.2 ms
Wall time: 5.03 s


In [None]:
%%time
# Off-policy TD control: the Q-learning algorithm.
# Hyperparameters: step size, exploration rate, discount factor.
alpha, epsilon, gamma = .4, .1, .9
num_states = env.observation_space.n
num_actions = env.action_space.n
num_iterations = 1000

# Action-value table, one row per state and one column per action.
Q = np.zeros([num_states, num_actions])

# One pass of the outer loop is one episode.
for episode in range(num_iterations):
  S = env.reset()

  done = False
  while not done:
    # Behaviour policy: epsilon-greedy with respect to the current Q table.
    explore = np.random.rand() < epsilon
    A = env.action_space.sample() if explore else np.argmax(Q[S, :])

    S_prime, R, done, info = env.step(int(A))

    # Q-learning bootstraps from the best action value in the next state,
    # regardless of which action the behaviour policy will take there.
    best_next = np.max(Q[S_prime, :])
    Q[S, A] += alpha * (R + gamma * best_next - Q[S, A])

    S = S_prime

CPU times: user 782 ms, sys: 0 ns, total: 782 ms
Wall time: 782 ms


In [None]:
# Roll out the greedy policy from the Q table learned with Q-learning and
# record every step so it can be replayed as an animation.

# Initialize
state = env.reset()

epochs, penalties, reward = 0, 0, 0
# Safety cap on the rollout length. A Q table that is not fully trained can
# make the greedy policy cycle between states, and the loop below would then
# never end (presumably `env` has no step limit of its own — TODO confirm).
max_steps = 200
# For animation
animation_Q_learn = []
done = False

while not done and epochs < max_steps:
  action = np.argmax(Q[state, :])
  state, reward, done, info = env.step(action)

  # Taxi rewards an illegal pickup/dropoff with -10; count those as penalties.
  if reward == -10:
    penalties += 1

  animation_Q_learn.append({"frame": env.render(mode="ansi"), "state":state,
                    "action":action, "reward":reward})
  epochs += 1

if done:
  print("The agent used {} timesteps for delivery.".format(epochs))
else:
  print("The agent did not complete the delivery within {} timesteps.".format(max_steps))
print("The agent got {} penalties.".format(penalties))

The agent used 15 timesteps for delivery.
The agent got 0 penalties.


In [None]:
%%time
# Replay the Q-learning rollout at two frames per second.
# `animating` is already defined in the Sarsa animation cell earlier in this
# notebook; redefining an identical copy here would only shadow it, so the
# existing helper is reused.
animating(animation_Q_learn, 0.5)

+---------+
|R: | : :G|
| : : : : |
| : : : : |
| | : | : |
|Y| : |[35m[34;1m[43mB[0m[0m[0m: |
+---------+
  (Dropoff)

Timesteps: 14
State: 475
Action: 5
Reward: 20
CPU times: user 47.2 ms, sys: 17.5 ms, total: 64.7 ms
Wall time: 7.54 s


In [None]:
%%time
# Expected Sarsa: like Sarsa, but the TD target uses the expectation of
# Q(S', .) under the policy instead of the value of one sampled next action.
# Define hyperparameters
alpha = .4
epsilon = .1
gamma = .9
num_states = env.observation_space.n
num_actions = env.action_space.n
num_iterations = 100

# Initialize
Q = np.zeros([num_states, num_actions])

# Loop for each episode
for episode in range(num_iterations):
  S = env.reset() # Initialize S

  # Loop for each step of episode
  done = False
  while not done:
    # Behaviour policy: epsilon-greedy with respect to the current Q table.
    if np.random.rand() < epsilon:
      A = env.action_space.sample() # take a random action
    else:
      A = np.argmax(Q[S, :])

    # Take the step first: the expectation below must be taken over the
    # action values of the state we actually land in.
    S_prime, R, done, info = env.step(int(A))

    # Probabilities of the same epsilon-greedy policy in S_prime. Exploration
    # samples uniformly over all actions, so every action gets
    # epsilon / num_actions and the greedy one gets the remaining 1 - epsilon.
    policy = np.full(num_actions, epsilon / num_actions)
    policy[np.argmax(Q[S_prime, :])] += 1 - epsilon
    expected_q = np.dot(policy, Q[S_prime, :])

    Q[S, A] += alpha * (R + gamma * expected_q - Q[S, A])
    S = S_prime

In [None]:
%%time
# Double Q-learning: two action-value tables, where one selects the best next
# action and the other evaluates it.
# Define hyperparameters
alpha = .4
epsilon = .1
gamma = .9
num_states = env.observation_space.n
num_actions = env.action_space.n
num_iterations = 1000

# Initialize
Q1 = np.zeros([num_states, num_actions])
Q2 = np.zeros([num_states, num_actions])

# Loop for each episode
for episode in range(num_iterations):
  S = env.reset() # Initialize S

  # Loop for each step of episode
  done = False
  while not done:
    # Behaviour policy: epsilon-greedy with respect to Q1 + Q2.
    if np.random.rand() < epsilon:
      A = env.action_space.sample() # take a random action
    else:
      # Only the row for the current state is needed, so add just that row
      # instead of building the full Q1 + Q2 table on every step.
      A = np.argmax(Q1[S, :] + Q2[S, :])
    S_prime, R, done, info = env.step(int(A))

    # Flip a coin to decide which table is updated on this step.
    if np.random.rand() < 0.5:
      # Q1 selects the next action, Q2 evaluates it.
      best_next = np.argmax(Q1[S_prime, :])
      Q1[S, A] += alpha * (R + gamma * Q2[S_prime, best_next] - Q1[S, A])
    else:
      # Q2 selects the next action, Q1 evaluates it.
      best_next = np.argmax(Q2[S_prime, :])
      Q2[S, A] += alpha * (R + gamma * Q1[S_prime, best_next] - Q2[S, A])
    S = S_prime