# Building a First AI Agent with Reinforcement Learning
- Explore Gymnasium (the maintained fork of OpenAI Gym)
- Implement the Q-learning algorithm using Gymnasium
- Apply the resulting algorithm to two different tasks

In [None]:
%pip install gymnasium

In [2]:
import gymnasium as gym

import time
import random
from IPython import display
import matplotlib
import matplotlib.pyplot as plt

In [None]:
# Human-readable captions for the six discrete Taxi-v3 actions

action_desc = {
    0 : "Move south (down)",
    1 : "Move north (up)",
    2 : "Move east (right)",
    3 : "Move west (left)",
    4 : "Pickup Passenger",
    5 : "Drop off passenger" 
}

# Create the Taxi environment
env = gym.make('Taxi-v3', render_mode = "rgb_array")

# Initialize the environment and draw the current state
obs = env.reset()[0]
plt.imshow(env.render())
plt.show()

# Loop for 100 steps
for i in range(100):

    # Select random action
    action = env.action_space.sample()

    # Apply the action. Gymnasium's step() returns
    # (observation, reward, terminated, truncated, info).
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

    # Draw the new state
    display.clear_output(wait = True)
    plt.imshow(env.render())


    # Add a caption indicating the current state, action, and reward
    rect = matplotlib.patches.Rectangle((150,0), 250, 75, facecolor = "#999999", edgecolor="#000000")
    ax = plt.gca()
    ax.add_patch(rect)
    plt.text(165, 25, f"State: {obs}")
    plt.text(165, 45, f"Action: {action_desc[action]}")
    plt.text(165, 65, f"Reward: {reward}")
    plt.show()

    # Stepping an environment whose episode has ended is not valid,
    # so start a fresh episode before the next action
    if done:
        obs = env.reset()[0]

# Close the environment
env.close()

How can we learn the correct sequence of actions to perform just by testing out different actions and observing the rewards?
____
We will introduce the concept of Q-learning by looking at a simplified version of the algorithm on a much simpler version of the taxi problem.

In [4]:
def plot_grid_state(s, n, Q):
    """
    Draw the 1-dimensional taxi grid together with the known Q values.

    s : index of the cell the taxi currently occupies
    n : number of cells in the grid (the last one is drawn in green)
    Q : dict keyed by (cell, action); for every cell that has an entry
        for action 0, the values of actions 0 and 1 are printed above it
    """
    cell = 20  # side length of one grid cell, in data units

    figure, axes = plt.subplots()
    figure.set_size_inches(12, 2)

    goal = n - 1
    for col in range(n):
        left = cell * col
        fill = "#00ff00" if col == goal else "#ffffff"
        axes.add_patch(
            matplotlib.patches.Rectangle(
                (left, 0), cell, cell, facecolor=fill, edgecolor="#000000"
            )
        )

        # Annotate only the cells already present in the Q table
        if (col, 0) in Q:
            axes.text(left + 5, 30, "<-- {}".format(Q[(col, 0)]))
            axes.text(left + 5, 24, "{} -->".format(Q[(col, 1)]))

    axes.set_xlim(-5, cell * n + 5)
    axes.set_ylim(-5, 30)
    axes.set_aspect("equal", adjustable='box')
    axes.axis('off')

    # The taxi is a yellow body with two black wheels underneath
    taxi_left = cell * s
    axes.add_patch(
        matplotlib.patches.Rectangle(
            (taxi_left + 3, 5), 14, 7, facecolor="#ffff00", edgecolor="#000000"
        )
    )
    for wheel_offset in (6, 14):
        axes.add_patch(
            matplotlib.patches.Circle(
                (taxi_left + wheel_offset, 5), 2, facecolor="#000000", edgecolor="#000000"
            )
        )

    plt.show()

In [None]:
# Preview the grid: taxi in cell 2 of 15, with no Q values learned yet
plot_grid_state(s=2, n=15, Q={})

The goal is to navigate the taxi along a 1-dimensional grid to the green square at the right end of the grid. There are two actions the taxi can perform: "move right" and "move left".
We incur a cost of 1 unit (or a reward of -1 unit) for every step we take prior to reaching the green square.

## Let's implement simplified Q-learning

In [None]:
# Choose the grid containing 15 cells
n = 15
# Start in the left-most cell
s = 0
# Create an empty Q table. Q[(state, action)] holds the estimated number of
# steps (cost) still needed to reach the goal after taking `action` in
# `state`; action 0 moves left and action 1 moves right.
Q = {}


# Initialize the Q values for the initial state
Q[(s, 0)] = 0
Q[(s, 1)] = 0

# Run 15 episodes
# Each episode ends when the taxi reaches the green cell
episodes = 0
while episodes < 15:
    time.sleep(0.1)

    # Display the current state (use n so the drawing follows the grid size)
    display.clear_output(wait = True)
    plot_grid_state(s, n, Q)

    # input()

    # If we reached the goal, count the episode and restart on the left.
    # `continue` re-checks the loop condition, so no extra step is taken
    # after the final episode and the start cell is drawn before moving.
    if s == n-1:
        time.sleep(1)
        episodes += 1
        s = 0
        continue

    # Select the action with the lower estimated cost; break ties randomly
    if Q[(s, 0)] < Q[(s, 1)]:
        a = 0
    elif Q[(s, 0)] > Q[(s, 1)]:
        a = 1
    else:
        a = random.randint(0, 1)

    # Update our position, staying inside the grid
    if a == 0:
        s_next = max(0, s-1)
    else:
        s_next = min(s+1, n-1)

    # Add the next state to the table if not yet in the table
    if (s_next, 0) not in Q:
        Q[(s_next, 0)] = 0
        Q[(s_next, 1)] = 0

    # Update the Q table: one step of cost plus the cheapest way onward
    Qmin = min(Q[(s_next, 0)], Q[(s_next, 1)])

    Q[(s, a)] = 1 + Qmin

    # Set the current state to be the next observed state

    s = s_next

    


--- Each step incurs a cost, and the available options at every step are to move left or to move right.

In [11]:
class QLearner:
    """ This class will allow us to specify a Gymnasium environment and apply
    tabular Q-learning on it.

    The Q-table ``self.Q`` is a dict keyed by ``(observation, action)``.
    The special observation ``"done"`` stands for any terminal state; its
    values are created as 0.0 and never updated, so no future reward is
    bootstrapped past the end of an episode.
    """

    def __init__(self, environment, g = 0.98, a=0.05, e = 0.05):
        """
        environment : id of the Gymnasium environment to create
        g : The discount factor used in our total discounted reward
        a : The learning rate for Q-learning
        e : The epsilon for epsilon-greedy action selection
        """
        self.g = g
        self.a = a
        self.e = e
        self.env = gym.make(environment, render_mode = "rgb_array")

        # Initialize the Q-table with the terminal pseudo-state, whose
        # value is zero for every action
        self.Q = {}
        for i in range(self.env.action_space.n):
            self.Q[("done", i)] = 0.0

    def _ensure_state(self, obs):
        """ Add zero-valued entries for every action of an unseen state. """
        for i in range(self.env.action_space.n):
            self.Q.setdefault((obs, i), 0.0)

    def _best(self, obs):
        """ Return ``(max Q value, greedy action)`` for a state.

        Ties are resolved in favour of the highest action index because
        the comparison is made on ``(value, action)`` tuples.
        """
        return max(
            (self.Q[(obs, i)], i) for i in range(self.env.action_space.n)
        )

    def learn(self, n_steps):
        """ 
        This method is called to run Q-learning for n_steps time steps.

        Episodes are restarted automatically whenever the environment
        reports that the current one has terminated or been truncated.
        """

        # Create local copies of Q-learning parameters
        g = self.g
        a = self.a
        e = self.e
        env = self.env

        # Start a new episode on the first step and after every episode end
        need_reset = True
        obs = None
        for _ in range(n_steps):
            if need_reset:
                obs = env.reset()[0]
                self._ensure_state(obs)

            # Select an action with epsilon-greedy action selection
            if random.random() < e:
                action = env.action_space.sample()
            else:
                _, action = self._best(obs)

            # Apply the selected action and observe the reward and next
            # state. Gymnasium's step() returns
            # (observation, reward, terminated, truncated, info).
            obs_prev = obs
            obs, reward, terminated, truncated, _info = env.step(action)
            need_reset = terminated or truncated

            # Only a true terminal state maps to the zero-valued "done"
            # entry; a truncated episode still bootstraps from the
            # observed next state
            if terminated:
                obs = "done"

            # If the next state is not yet in the Q-table, add it
            self._ensure_state(obs)

            # Update the Q value for the previous state and selected
            # action, smoothing towards the one-step bootstrapped target
            max_q, _ = self._best(obs)
            self.Q[(obs_prev, action)] = (
                (1-a)*self.Q[(obs_prev, action)] + a*(reward + g*max_q)
            )

    def close(self):
        """ Close the underlying environment. """
        self.env.close()

In [12]:
# Run tabular Q-learning on Taxi-v3 for 500,000 steps, then close the
# learner's environment (the learned table stays available as learner.Q)
learner = QLearner(environment="Taxi-v3")
learner.learn(n_steps=500_000)
learner.close()

In [None]:

import matplotlib.patches


# Human-readable captions for the six discrete Taxi-v3 actions
action_desc = {
    0 : "Move south (down)",
    1 : "Move north (up)",
    2 : "Move east (right)",
    3 : "Move west (left)",
    4 : "Pickup Passenger",
    5 : "Drop off passenger" 
}

# Create the Taxi environment
env = gym.make('Taxi-v3', render_mode = "rgb_array")

# Initialize the environment and draw the current state
obs = env.reset()[0]
plt.imshow(env.render())
plt.show()


for i in range(200):

    # Follow the greedy policy from the learned Q-table; fall back to a
    # random action for states that never appeared during training
    if (obs, 0) not in learner.Q:
        action = env.action_space.sample()
    else:
        _, action = max(
            (learner.Q[(obs, act)], act) for act in range(env.action_space.n)
        )

    # Gymnasium's step() returns
    # (observation, reward, terminated, truncated, info)
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

    display.clear_output(wait=True)
    plt.imshow(env.render())

    # Add a caption indicating the current state, action, and reward
    rect = matplotlib.patches.Rectangle(
        (150, 0), 
        250,
        75,
        facecolor="#999999",
        edgecolor="#000000"
    )
    ax = plt.gca()
    ax.add_patch(rect)
    plt.text(165, 25, f"State: {obs}")
    plt.text(165, 45, f"Action: {action_desc[action]}")
    plt.text(165, 65, f"Reward: {reward}")

    plt.show()

    # Start a new episode once the current one has ended for any reason
    if done:
        obs = env.reset()[0]

    time.sleep(0.5)
env.close()

We can see that the learned policy now solves the taxi problem optimally in each run.
It is important to note that Q-learning knows nothing a priori about the overall objective of the task. It simply selects actions and observes the resulting immediate rewards and next states. By building up the Q table, we encode a policy that is capable of performing the task.

___
We can apply the Q-learning algorithm that we implemented to an entirely different task and still learn a policy for performing that task skillfully.

Let's apply Q-learning to the Blackjack environment in Gymnasium. In this environment, a player plays a single hand of Blackjack against the dealer.

In [None]:
# Create a Blackjack environment that renders frames as image arrays
env = gym.make("Blackjack-v1", render_mode = "rgb_array")
# reset() returns a tuple; element 0 is the initial observation
obs = env.reset()[0]

# Show the rendered starting hand, then print the raw observation
# NOTE(review): per the Gymnasium docs the observation should be a
# (player sum, dealer card, usable ace) tuple -- confirm against the output
plt.imshow(env.render())
plt.show()
print(obs)

# This environment is only used for the preview above
env.close()

In [16]:
# Run tabular Q-learning on Blackjack-v1 for 500,000 steps, then close the
# learner's environment (the learned table stays available as learner.Q)
learner = QLearner(environment="Blackjack-v1")
learner.learn(n_steps=500_000)
learner.close()

To evaluate the policy that we learned, we will play a fixed number of hands against the dealer and measure the fraction of hands that we win.

In [20]:
def run_simualtion(n_games, policy_func, **kwargs):
    """ 
    Here we provide a reusable function for evaluating Blackjack policies.
    For a given policy, this function will run n_games using the provided
    policy function and return the fraction of games won.

    n_games     : number of hands to play; must be a positive integer
    policy_func : callable invoked as policy_func(obs, **kwargs) that
                  returns the action to take for the observation
    **kwargs    : extra keyword arguments forwarded to policy_func

    Returns the number of hands that ended with a positive reward divided
    by n_games. Raises ValueError if n_games is not positive.
    """
    if n_games <= 0:
        raise ValueError("n_games must be a positive integer")

    env = gym.make("Blackjack-v1", render_mode = "rgb_array")

    wins = 0
    try:
        for _ in range(n_games):

            obs = env.reset()[0]

            done = False

            while not done:
                action = policy_func(obs, **kwargs)
                # Gymnasium's step() returns
                # (observation, reward, terminated, truncated, info)
                obs, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

            # The reward of the final step decides the hand
            if reward > 0:
                wins += 1
    finally:
        # Release the environment even if the policy raises
        env.close()
    return wins/n_games

def Q_policy(state, Q):
    """
    This function implements a Blackjack policy from
    a given Q function.

    state : the current observation, used as the first element of Q's keys
    Q     : dict keyed by (state, action) for the two actions 0 and 1

    Returns the action with the larger Q value (action 1 on a tie). For a
    state that is not in Q, one of the two actions is chosen uniformly at
    random, without relying on any environment object defined elsewhere.
    """

    if (state, 0) not in Q:
        return random.randint(0, 1)
    elif Q[(state, 0)] > Q[(state, 1)]:
        return 0
    else:
        return 1

In [None]:
# Play 50,000 hands with the learned Q-table and report the win fraction
run_simualtion(n_games=50_000, policy_func=Q_policy, Q=learner.Q)