In [None]:
import gym

#### Representations

| --> WALL (cannot be passed through; the taxi stays in the same position if it tries to move through a wall)

Yellow --> Taxi Current Location

Blue --> Pick up Location

Purple --> Drop-off Location

Green --> Taxi turns green once the passenger has boarded

Letters --> Locations

In [None]:
# Create the Taxi-v3 environment and keep the object exposed by its `.env`
# attribute instead of the one gym.make() returns.
# NOTE(review): `.env` presumably unwraps gym's step-limit wrapper, so episodes
# only end when the task itself reports done — confirm for the installed gym version.
env = gym.make('Taxi-v3').env # Env is the unified environment interface

Following are the **env** methods that could be quite helpful to us: <br />
- env.reset(): Resets the environment and returns a random initial state
- env.step(action): Step the environment by one timestep. Returns
 - observation: Observation of the environment
 - reward: If your action was beneficial or not
 - done: Indicates if we have successfully picked up and dropped off a passenger, also called one *episode*
 - info: Additional info such as performance and latency for debugging purposes
- env.render(): Renders one frame of the environment

In [None]:
# Start a new episode from a random initial state, then draw the grid for it.
env.reset()
env.render()

+---------+
|R: | : :[34;1mG[0m|
| : |[43m [0m: : |
| : : : : |
| | : | : |
|[35mY[0m| : |B: |
+---------+



In [None]:
# A state is a single integer; env.encode() (used further down) builds it from
# the taxi row, taxi column, passenger index and destination index.
env.observation_space.n # Total number of states

500

##### Actions (6 in total)
0: move south <br />
1: move north <br />
2: move east <br />
3: move west <br />
4: pickup passenger <br />
5: dropoff passenger <br />

In [None]:
# The action ids and their meanings are listed in the table above.
env.action_space.n # Total number of actions

6

In [None]:
# Build the integer id of a hand-picked configuration and force the environment
# into it, so that the grid and the result of a single step can be inspected.
state = env.encode(1, 1, 3, 2) # Taxi row, taxi column, passenger index, destination index
print('State: ', state)
# Overwrite the environment's current state directly instead of calling reset().
env.s = state
env.render()
# Action 3 is "move west"; step() returns (observation, reward, done, info).
print(env.step(3))

State:  134
+---------+
|R: | : :G|
| :[43m [0m| : : |
| : : : : |
| | : | : |
|[35mY[0m| : |[34;1mB[0m: |
+---------+

(114, -1, False, {'prob': 1.0})


In [None]:
# Transition table entry for state 134 (the state built in the previous cell):
# one key per action id, each mapping to a list of possible outcomes.
env.P[134] # Structure of dictionary: {action: [(probability, nextstate, reward, done)]}

{0: [(1.0, 234, -1, False)],
 1: [(1.0, 34, -1, False)],
 2: [(1.0, 134, -1, False)],
 3: [(1.0, 114, -1, False)],
 4: [(1.0, 134, -10, False)],
 5: [(1.0, 134, -10, False)]}

In [None]:
from IPython.display import clear_output
from time import sleep

def print_frames(frames, delay):
    """Replay recorded steps one after another as a simple text animation.

    Args:
        frames: Sequence of dicts, each holding the keys 'frame' (rendered
            grid), 'state', 'action' and 'reward' for one step.
        delay: Seconds to pause after showing each step.
    """
    for step_number, snapshot in enumerate(frames, start=1):
        # Wipe the previous step's output only once the new one is ready,
        # which avoids flicker between frames.
        clear_output(wait=True)
        print(snapshot['frame'])
        print(f"Timestep: {step_number}")
        print(f"State: {snapshot['state']}")
        print(f"Action: {snapshot['action']}")
        print(f"Reward: {snapshot['reward']}")
        sleep(delay)

#### Brute Force algorithm

In [None]:
# Baseline without any learning: keep choosing random actions until the
# passenger has been picked up and dropped off at the right destination.

env.s = 134 # put the environment in the state illustrated above

# Per-run bookkeeping
frames = [] # one record per step, replayed by print_frames below
epochs = 0
penalties = 0
reward = 0
done = False

while True:
    action = env.action_space.sample() # choose a random action
    state, reward, done, info = env.step(action) # apply it and unpack the result

    # A reward of -10 is counted as a penalty
    penalties += int(reward == -10)

    # Keep the rendered grid together with this step's details for the animation
    frames.append(dict(
        frame=env.render(mode='ansi'),
        state=state,
        action=action,
        reward=reward,
    ))

    epochs += 1

    if done:
        break

print_frames(frames, 0.05)
print("Timesteps taken: {}".format(epochs))
print("Penalties incurred: {}".format(penalties))

+---------+
|[34;1mR[0m: | :[43m [0m:G|
| : | : : |
| : : : : |
| | : | : |
|[35mY[0m| : |B: |
+---------+
  (North)

Timestep: 2716
State: 62
Action: 1
Reward: -1


KeyboardInterrupt: ignored

### Using Reinforcement Learning
We are going to use a simple RL algorithm called Q-learning which will give our agent some memory.
We use a Q-table of Q-values (Q stands for "quality"); each entry states the quality of one state-action combination.
Q-values are initialized to arbitrary values, and as the agent exposes itself to the environment and receives different rewards by executing different actions, the Q-values are updated using the equation:

**Q(state, action) = (1−α)Q(state, action) + α( reward + γ*max_a {Q(next state,all actions)} )**

The Q-table is a matrix where we have a row for every state (500) and a column for every action (6). It's first initialized to 0, and then values are updated after training. Note that the Q-table has the same dimensions as the reward table, but it has a completely different purpose.

After enough random exploration of actions, the Q-values tend to converge, giving our agent an action-value function that it can exploit to pick the best action from a given state.

There's a tradeoff between exploration (choosing a random action) and exploitation (choosing actions based on already learned Q-values). We want to prevent the agent from always taking the same route, and possibly overfitting, so we introduce another parameter, ϵ ("epsilon"), to control this during training.

Instead of just selecting the best learned Q-value action, we'll sometimes favor exploring the action space further. Higher epsilon value results in episodes with more penalties (on average) which is obvious because we are exploring and making random decisions.

In [None]:
# ** TRAINING THE AGENT **
import numpy as np
# Q-table: one row per state and one column per action, every entry starting at 0
# (500 x 6 for the sizes shown above).
q_table = np.zeros(shape=(env.observation_space.n, env.action_space.n))
q_table

array([[0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       ...,
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.]])

In [None]:
import random
from IPython.display import clear_output

# Hyper parameters
alpha = 0.1    # learning rate: weight of the new estimate in each Q-update
gamma = 0.6    # discount factor applied to the best Q-value of the next state
epsilon = 0.1  # probability of taking a random (exploratory) action

# Number of training episodes
num_episodes = 50000

# For plotting metrics
all_epochs = []     # timesteps taken in each episode
all_penalties = []  # number of -10 rewards received in each episode
randoms = 0         # total count of exploratory (random) actions

for i in range(1, num_episodes + 1):
    # Reset the environment so that `state` is the state the environment is
    # actually in. Merely computing a state id with env.encode() leaves the
    # environment wherever the previous episode ended, and the Q-update would
    # then be credited to the wrong state. Random starts also let the agent
    # learn values for every state it may be evaluated from.
    state = env.reset()

    epochs, penalties, reward = 0, 0, 0
    done = False

    while not done:
        if random.uniform(0, 1) < epsilon:
            randoms += 1
            action = env.action_space.sample() # Explore action space
        else:
            action = np.argmax(q_table[state]) # Exploit learned values

        next_state, reward, done, info = env.step(action)

        old_value = q_table[state, action]
        next_max = np.max(q_table[next_state])

        # Q-learning update: blend the old estimate with the observed reward
        # plus the discounted best value reachable from the next state.
        new_value = (1-alpha)*old_value + alpha*(reward + gamma*next_max)
        q_table[state, action] = new_value

        if reward == -10:
            penalties += 1

        state = next_state
        epochs += 1

    # Record per-episode metrics so they can be plotted afterwards.
    all_epochs.append(epochs)
    all_penalties.append(penalties)

    # Report progress once per 100 episodes; doing this inside the step loop
    # would clear and reprint the output on every single step.
    if i % 100 == 0:
        clear_output(wait=True)
        print('Episode: {}'.format(i))

print("Training finished. \n")



Episode: 49900
Training finished. 



In [None]:
"""Evaluate agent's performance after Q-learning"""

total_epochs, total_penalties = 0, 0
episodes = 100

# Upper bound on steps per episode. The greedy policy is deterministic, so if
# the Q-table has not converged for some start state the agent can cycle
# forever; the cap guarantees this cell always finishes.
# NOTE(review): 200 is chosen to match the step limit usually associated with
# Taxi-v3 — adjust if a different budget is wanted.
max_steps = 200
timeouts = 0  # episodes that hit the cap without finishing

for _ in range(episodes):
    state = env.reset()

    epochs, penalties, reward = 0, 0, 0
    done = False

    while not done and epochs < max_steps:
        # Always take the action with the highest learned Q-value (no exploration).
        action = np.argmax(q_table[state])
        state, reward, done, info = env.step(action)

        if reward == -10:
            penalties += 1

        epochs += 1

    if not done:
        timeouts += 1

    total_penalties += penalties
    total_epochs += epochs

print(f"Results after {episodes} episodes:")
print(f"Average timesteps per episode: {total_epochs / episodes}")
print(f"Average penalties per episode: {total_penalties / episodes}")
print(f"Episodes stopped at the {max_steps}-step cap: {timeouts}")
print(q_table[134])

KeyboardInterrupt: ignored

In [None]:
# Replay the learned greedy policy from the illustration state and animate it.
state = env.encode(1, 1, 3, 2) # Taxi row, taxi column, passenger index, destination index
print('State: ', state)
env.s = state  # force the environment into that state so it matches `state`

# Upper bound on steps: the greedy policy is deterministic, so with a Q-table
# that has not converged the loop below could otherwise never end.
# NOTE(review): 200 is chosen to match the step limit usually associated with
# Taxi-v3 — adjust if a different budget is wanted.
max_steps = 200

epochs = 0
penalties, reward = 0, 0
frames = [] # for animation
done = False

while not done and epochs < max_steps:
    # Always take the action with the highest learned Q-value.
    action = np.argmax(q_table[state])
    state, reward, done, info = env.step(action)

    if reward == -10:
        penalties += 1

    # Put each rendered frame into dict for animation
    frames.append({
        'frame': env.render(mode='ansi'),
        'state': state,
        'action': action,
        'reward': reward
        }
    )

    epochs += 1

print_frames(frames, 0.5)
if not done:
    print("Stopped after {} steps without completing the episode.".format(max_steps))
print("Timesteps taken: {}".format(epochs))
print("Penalties incurred: {}".format(penalties))

State:  134
