In [3]:
import gymnasium as gym
import numpy as np
import random
from gymnasium.wrappers import RecordVideo
from IPython.display import HTML
from base64 import b64encode

In [64]:
def test_policy_video(policy_table, name, video_width=500, max_steps=50):
    """Roll out ``policy_table`` on Taxi-v3, record the episode and return it as inline HTML.

    Args:
        policy_table: Sequence indexed by state whose entries are action
            indices (each entry is cast with ``int`` before being sent to
            the environment).
        name: Prefix for the recorded file; the clip is read back from
            ``./video/<name>-step-0.mp4``.
        video_width: Value of the ``width`` attribute of the ``<video>`` tag.
        max_steps: Safety cap on the rollout; the loop stops as soon as the
            number of steps taken exceeds this value.

    Returns:
        An ``IPython.display.HTML`` object embedding the clip as a base64 data URL.
    """
    video_path = './video/' + name + '-step-0.mp4'

    env_test = gym.make('Taxi-v3', render_mode='rgb_array')
    env_test = RecordVideo(env=env_test, video_folder="./video", name_prefix=name, step_trigger=lambda x: True,
                           disable_logger=True)

    # Release the environment even if the rollout raises part-way through.
    try:
        state, info = env_test.reset()
        done = False
        truncated = False
        steps = 0

        env_test.start_video_recorder()

        # Stop on either end-of-episode signal, or when the step cap is hit.
        while not (done or truncated):
            action = policy_table[state]
            state, reward, done, truncated, info = env_test.step(int(action))
            env_test.render()
            steps += 1
            if steps > max_steps:
                break

        env_test.close_video_recorder()
    finally:
        env_test.close()

    # Read-only access is enough here; the context manager closes the handle.
    with open(video_path, "rb") as video_handle:
        video_file = video_handle.read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

## Defining Taxi Environment

In [4]:
class Taxi:
    """Container for the Taxi-v3 environment, the learning tables and the hyper-parameters.

    Args:
        learning_rate: Step size used by the Q-learning update.
        discount_rate: Discount factor (gamma).
        epsilon: Initial exploration threshold for epsilon-greedy action selection.
        decay_rate: Rate at which epsilon is decreased during Q-learning.
        num_iter: Number of iterations used by the planning algorithms.
        num_episodes: Number of episodes (one episode is one game from start to end).
        max_steps: Upper bound on the number of steps in each episode.
        num_evaluate_steps: Number of sweeps used to evaluate a policy
            (only used by policy iteration).
    """

    def __init__(self,
                 learning_rate=0.9,
                 discount_rate=0.8,
                 epsilon=1.0,
                 decay_rate=0.005,
                 num_iter=200,
                 num_episodes=10000,
                 max_steps=99,
                 num_evaluate_steps=20):
        self.env = gym.make('Taxi-v3')  # initializing the environment
        self.state_size = self.env.observation_space.n  # size of the state space
        self.action_size = self.env.action_space.n  # size of the action space
        self.vtable = np.zeros(self.state_size)  # state-value table
        # Policy table: one action index per state. Stored as integers because
        # the entries are used as dictionary keys / environment actions.
        self.ptable = np.zeros(self.state_size, dtype=int)
        # Q-value table: one value for each (state, action) pair.
        self.qtable = np.zeros((self.state_size, self.action_size))
        self.learning_rate = learning_rate  # learning rate used in the Q-learning algorithm
        self.discount_rate = discount_rate  # discount factor (gamma)
        self.epsilon = epsilon  # randomness threshold of the epsilon-greedy action choice
        self.decay_rate = decay_rate  # decay rate for decreasing epsilon during training
        self.num_iter = num_iter  # number of iterations used by the algorithms
        self.num_episodes = num_episodes  # number of episodes
        self.max_steps = max_steps  # maximum number of steps in each episode
        self.num_evaluate_steps = num_evaluate_steps  # sweeps used to evaluate a given policy




## 1. Value Iteration

### 1.1. Initializing Parameters

In [5]:
# Fresh agent for value iteration: 200 sweeps with a discount factor of 0.8.
# num_evaluate_steps is stored as well, but it is only meant for policy iteration.
taxi = Taxi(num_iter=200, num_evaluate_steps=20, discount_rate=0.8)

### 1.2. Finding Optimal State Values

In [6]:
def value_iteration(vtable, num_iter, discount_rate):
    """Run in-place value-iteration sweeps over every state of the global ``taxi``.

    The transition model is ``taxi.env.unwrapped.P[state][action]``, a
    collection of ``(probability, next_state, reward, terminated)`` tuples.

    Args:
        vtable: State-value array; it is updated in place.
        num_iter: Number of full sweeps over the state space.
        discount_rate: Discount factor (gamma).

    Returns:
        None. The result is written into ``vtable``.
    """
    # Read the table from the unwrapped environment: reaching attributes
    # through the wrapper stack is deprecated in Gymnasium.
    transition_model = taxi.env.unwrapped.P
    for _ in range(num_iter):
        for state in range(taxi.state_size):
            temp_action = np.zeros(taxi.action_size)
            for action in range(taxi.action_size):
                transitions = transition_model[state][action]
                # A single bare transition tuple is wrapped so the loop below
                # can always iterate over a list of transitions.
                if isinstance(transitions, tuple):
                    transitions = [transitions]
                action_expected_value = 0
                for prob, next_state, reward, terminated in transitions:
                    # Nothing follows a terminal transition, so its successor
                    # value must not be bootstrapped.
                    future = 0 if terminated else discount_rate * vtable[next_state]
                    action_expected_value += prob * (reward + future)
                temp_action[action] = action_expected_value
            vtable[state] = np.max(temp_action)  # best action value for this state


### 1.3. Extracting The Optimal Policy

In [7]:
def optimal_policy_extraction(vtable, ptable, num_iter, discount_rate):
    """Run value iteration, then write the greedy policy for the result into ``ptable``.

    Args:
        vtable: State-value array, passed to ``value_iteration`` and then
            read for the one-step look-ahead.
        ptable: Policy array; entry ``s`` receives the index of the best action.
        num_iter: Number of value-iteration sweeps.
        discount_rate: Discount factor (gamma).

    Returns:
        None. Both tables are modified in place.
    """
    value_iteration(vtable, num_iter, discount_rate)
    # Read the table from the unwrapped environment: reaching attributes
    # through the wrapper stack is deprecated in Gymnasium.
    transition_model = taxi.env.unwrapped.P
    for state in range(taxi.state_size):
        temp_action = np.zeros(taxi.action_size)
        for action in range(taxi.action_size):
            transitions = transition_model[state][action]
            # A single bare transition tuple is wrapped so the loop below
            # can always iterate over a list of transitions.
            if isinstance(transitions, tuple):
                transitions = [transitions]
            action_expected_value = 0
            for prob, next_state, reward, terminated in transitions:
                # A terminal transition contributes its reward only.
                future = 0 if terminated else discount_rate * vtable[next_state]
                action_expected_value += prob * (reward + future)
            temp_action[action] = action_expected_value
        ptable[state] = np.argmax(temp_action)  # greedy action (first index on ties)

### 1.4. Running The Algorithm

In [8]:
# Run value iteration followed by greedy policy extraction; both tables are updated in place.
optimal_policy_extraction(taxi.vtable, taxi.ptable, taxi.num_iter, taxi.discount_rate)
optimal_policy = taxi.ptable.copy()  # snapshot of the policy: `taxi` is rebound in the later sections

  logger.warn(


### 1.5. Testing

In [70]:
# Record one episode driven by the value-iteration policy and display the clip.
test_policy_video(taxi.ptable, 'value-iteration')

  logger.warn(


## 2. Policy Iteration

### 2.1. Initialize Parameters

In [71]:
# Fresh agent for policy iteration: 200 improvement rounds, each preceded by
# 20 evaluation sweeps, with a discount factor of 0.8.
taxi = Taxi(num_iter=200, num_evaluate_steps=20, discount_rate=0.8)

### 2.2. Policy Evaluation

In [72]:
def evaluate(num_iter, discount_rate):
    """Evaluate the current policy ``taxi.ptable`` with in-place iterative sweeps.

    Args:
        num_iter: Number of full sweeps over the state space.
        discount_rate: Discount factor (gamma).

    Returns:
        A new array holding the estimated value of every state under the
        policy stored in the global ``taxi.ptable``.
    """
    # Read the table from the unwrapped environment: reaching attributes
    # through the wrapper stack is deprecated in Gymnasium.
    transition_model = taxi.env.unwrapped.P
    vtable = np.zeros(taxi.state_size)
    for _ in range(num_iter):
        for state in range(taxi.state_size):
            # The policy table may hold floats; use a real integer as the key.
            action = int(taxi.ptable[state])
            transitions = transition_model[state][action]
            # A single bare transition tuple is wrapped so the loop below
            # can always iterate over a list of transitions.
            if isinstance(transitions, tuple):
                transitions = [transitions]
            action_expected_value = 0
            for prob, next_state, reward, terminated in transitions:
                # A terminal transition contributes its reward only.
                future = 0 if terminated else discount_rate * vtable[next_state]
                action_expected_value += prob * (reward + future)
            vtable[state] = action_expected_value
    return vtable

### 2.3. Policy Improvement

In [73]:
def improvement(ptable, num_iter, num_evaluate_steps, discount_rate):
    """Policy iteration: alternate policy evaluation and greedy improvement.

    Args:
        ptable: Policy array, improved in place. ``evaluate`` always reads
            the global ``taxi.ptable``, so pass that same array here for the
            evaluation step to see the improved policy.
        num_iter: Maximum number of evaluation/improvement rounds.
        num_evaluate_steps: Number of sweeps used by each evaluation.
        discount_rate: Discount factor (gamma).

    Returns:
        None. The result is written into ``ptable``.
    """
    # Read the table from the unwrapped environment: reaching attributes
    # through the wrapper stack is deprecated in Gymnasium.
    transition_model = taxi.env.unwrapped.P
    for _ in range(num_iter):
        previous_policy = np.copy(ptable)
        vtable = evaluate(num_evaluate_steps, discount_rate)
        for state in range(taxi.state_size):
            temp_action = np.zeros(taxi.action_size)
            for action in range(taxi.action_size):
                transitions = transition_model[state][action]
                # A single bare transition tuple is wrapped so the loop below
                # can always iterate over a list of transitions.
                if isinstance(transitions, tuple):
                    transitions = [transitions]
                action_expected_value = 0
                for prob, next_state, reward, terminated in transitions:
                    # A terminal transition contributes its reward only.
                    future = 0 if terminated else discount_rate * vtable[next_state]
                    action_expected_value += prob * (reward + future)
                temp_action[action] = action_expected_value
            ptable[state] = np.argmax(temp_action)  # improving the policy
        # Evaluation and improvement are deterministic, so an unchanged policy
        # would be reproduced by every remaining round; stop early.
        if np.array_equal(ptable, previous_policy):
            break

### 2.4. Running The Algorithm

In [74]:
# Run policy iteration; taxi.ptable is improved in place.
improvement(taxi.ptable, taxi.num_iter, taxi.num_evaluate_steps, taxi.discount_rate)

  logger.warn(


### 2.5. Testing

In [77]:
# Record one episode driven by the policy-iteration policy and display the clip.
test_policy_video(taxi.ptable, 'policy-iteration')

  logger.warn(


## 3. Q-learning

### 3.1. Initializing Parameters

In [78]:
# Fresh agent for Q-learning: 10000 episodes of at most 99 steps, learning rate 0.9,
# discount factor 0.8, initial epsilon 1.0 and epsilon decay rate 0.005.
taxi = Taxi(num_episodes=10000, max_steps=99, learning_rate=0.9, discount_rate=0.8, epsilon=1.0, decay_rate=0.005)

### 3.2. Training

In [79]:

def q_learing_train(qtable, num_episodes, max_steps, learning_rate, discount_rate, epsilon, decay_rate):
    """Train ``qtable`` in place with tabular epsilon-greedy Q-learning on ``taxi.env``.

    Args:
        qtable: Array indexed as ``qtable[state][action]``; updated in place.
        num_episodes: Number of training episodes.
        max_steps: Maximum number of steps taken in one episode.
        learning_rate: Step size of the Q-value update.
        discount_rate: Discount factor (gamma).
        epsilon: Initial probability of taking a random action.
        decay_rate: Exponential decay rate applied to epsilon across episodes.

    Returns:
        None. The learned values are written into ``qtable``.
    """
    initial_epsilon = epsilon
    for episode in range(num_episodes):
        state, info = taxi.env.reset()
        for _ in range(max_steps):
            # epsilon-greedy: explore with probability epsilon, otherwise exploit
            if random.uniform(0, 1) < epsilon:
                action = taxi.env.action_space.sample()
            else:
                action = np.argmax(qtable[state])
            new_state, reward, done, truncated, info = taxi.env.step(action)  # doing one step
            # No future value is bootstrapped from a terminal transition.
            future = 0.0 if done else discount_rate * np.max(qtable[new_state])
            qtable[state][action] = (1 - learning_rate) * qtable[state][action] + learning_rate * (reward + future)
            state = new_state
            # The episode is over on termination or on truncation.
            if done or truncated:
                break
        # Decay relative to the caller's starting value instead of restarting from 1.0.
        epsilon = initial_epsilon * np.exp(-decay_rate * episode)

### 3.3. Running The Algorithm

In [80]:
# Train the Q-table in place with the hyper-parameters stored on `taxi`.
q_learing_train(taxi.qtable, taxi.num_episodes, taxi.max_steps, taxi.learning_rate, taxi.discount_rate, taxi.epsilon,
                taxi.decay_rate)

### 3.4. Extracting The Policy

In [81]:
# Greedy policy: for every state keep the action with the largest learned Q-value
# (row-wise argmax, written into the existing policy table).
taxi.ptable[:] = np.argmax(taxi.qtable, axis=1)

### 3.5. Testing

In [85]:
# Record one episode driven by the Q-learning policy and display the clip.
test_policy_video(taxi.ptable, 'Q-learning')

  logger.warn(


## 4. Direct Evaluation (Monte Carlo)

In this section, the policy is given and you should just determine the value of each state based on the given policy. Then you can evaluate states, based on the optimal policy (you found in the previous sections) and then compare the 2 matrices.

### 4.1. Initializing Parameters

In [86]:
# Fresh agent for Monte Carlo evaluation: 10000 episodes of at most 99 steps, discount factor 0.8.
taxi = Taxi(num_episodes=10000, max_steps=99, discount_rate=0.8)

### 4.2. Training

In [17]:
def monte_carlo(ptable, num_episodes, max_steps, discount_rate):
    """Estimate state values of a fixed policy by every-visit Monte Carlo on ``taxi.env``.

    Args:
        ptable: Policy indexed by state; entries are cast with ``int`` before
            being sent to the environment.
        num_episodes: Number of sampled episodes.
        max_steps: Maximum number of steps per episode. An episode cut off
            here contributes the return of the rewards observed so far.
        discount_rate: Discount factor (gamma).

    Returns:
        Array with the average discounted return observed from each state;
        states that were never visited keep the value 0.
    """
    returns_sum = np.zeros(taxi.state_size)  # total return credited to each state
    visits = np.zeros(taxi.state_size)  # number of returns credited to each state
    for _ in range(num_episodes):
        state, info = taxi.env.reset()
        visited = []  # states an action was taken from, in order
        rewards = []  # reward received after each of those states
        for _ in range(max_steps):
            new_state, reward, done, truncated, info = taxi.env.step(int(ptable[state]))
            visited.append(state)
            rewards.append(reward)
            # Advance, so that the next action comes from the policy entry of
            # the state the agent is actually in.
            state = new_state
            if done or truncated:
                break
        # Walk the episode backwards, accumulating G_t = r_{t+1} + gamma * G_{t+1}.
        discounted_return = 0.0
        for past_state, reward in zip(reversed(visited), reversed(rewards)):
            discounted_return = reward + discount_rate * discounted_return
            returns_sum[past_state] += discounted_return
            visits[past_state] += 1
    vtable = np.zeros(taxi.state_size)
    seen = visits > 0  # average only over states that received a return
    vtable[seen] = returns_sum[seen] / visits[seen]
    return vtable

In [19]:
# Evaluate the policy saved after value iteration (`optimal_policy`) with Monte Carlo sampling.
# NOTE(review): this cell depends on `optimal_policy` from section 1.4 and on the `taxi`
# created in 4.1; the execution counts suggest the cells were run out of order, so
# re-run the notebook top to bottom before trusting the output.
optimal_policy_state_values = monte_carlo(optimal_policy, taxi.num_episodes, taxi.max_steps, taxi.discount_rate)

### 4.3. Testing

In [None]:
# Show the Monte Carlo estimate of the state values (one entry per state).
print(optimal_policy_state_values)