In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

In [2]:
# Build the CartPole environment that the agents below interact with.
env = gym.make('CartPole-v1')

# Environment characteristics: keep handles to both spaces for later cells.
observation_space = env.observation_space
action_space = env.action_space

# Describe the four observation components and the bounds/shape/dtype
# reported by the space object (one printed line per argument).
print(
    "Observation space:",
    "0- cart position",
    "1- cart velocity",
    "2- pole angle (radians)",
    "3- pole angular velocity",
    f"low bounds: {observation_space.low}, ",
    f"high bounds: {observation_space.high}, ",
    f"shape: {observation_space.shape}, ",
    f"type: {observation_space.dtype}",
    sep="\n",
)

# Describe the discrete actions; the leading "\n" leaves a blank separator line.
print(
    f"\nAction space: {action_space.n}",
    "0- push cart to the left",
    "1- push cart to the right",
    sep="\n",
)

Observation space:
0- cart position
1- cart velocity
2- pole angle (radians)
3- pole angular velocity
low bounds: [-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], 
high bounds: [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], 
shape: (4,), 
type: float32

Action space: 2
0- push cart to the left
1- push cart to the right


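An episode ends when the pole angle exceeds ±12 degrees (about ±0.2095 rad, the pole is considered to have fallen), when the cart position exceeds ±2.4 (the cart leaves the frame), or when the episode reaches the time limit (500 steps for CartPole-v1; 200 for CartPole-v0). Separately, the environment is considered solved when the average return over 100 consecutive episodes reaches a threshold (475 for CartPole-v1; 195 for CartPole-v0); this notebook uses an average of 195 as its early-stopping target during training.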

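Calling `env.step(action)` returns five values: `observation`, the new state of the environment (cart position on the x-axis, cart velocity, pole angle, and pole angular velocity); `reward`, the reward for that step (1 for every step taken, including the terminating step, and 0 if the environment is stepped again after termination); `terminated` and `truncated`, two flags indicating that the episode ended because a failure condition was met or because the time limit was reached; and `info`, a dictionary used for debugging.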

In [3]:
#define the agents
class RandomAgent:
    """Baseline agent that picks actions without looking at the environment state.

    Parameters
    ----------
    action_space : object exposing a ``sample()`` method
        The space that actions are drawn from (e.g. ``env.action_space``).
    """

    def __init__(self, action_space):
        self.action_space = action_space

    def act(self):
        """Return one action obtained from ``self.action_space.sample()``."""
        sampled_action = self.action_space.sample()
        return sampled_action
    
class LearningAgent:
    """Tabular Q-learning agent for CartPole over a discretized observation space.

    Each of the four observation components is bucketed with ``np.digitize``
    against 10 evenly spaced bin edges, giving 11 buckets per component. The
    Q-table therefore has shape ``(11, 11, 11, 11, n_actions)``.

    Parameters
    ----------
    env : environment object
        Must expose ``action_space.n``, ``reset() -> (observation, info)``,
        ``step(action) -> (observation, reward, terminated, truncated, info)``
        and ``close()``.
    alpha : float
        Learning rate of the Q-learning update.
    epsilon : float
        Initial exploration probability; decays linearly to 0 over ``episodes``.
    gamma : float
        Discount factor.
    episodes : int
        Number of episodes run by ``train()`` and by ``test()``.
    is_random : bool
        When True the agent always acts randomly (random baseline).
    total_episodes_trained : int
        Offset added to the episode number in progress messages.
    render : bool
        Stored for reference only; this class never reads it.

    Attributes
    ----------
    q_table : np.ndarray
        Action-value estimates indexed by ``discrete_state + (action,)``.
    train_rewards, test_rewards : list
        Per-episode total rewards of the most recent ``train()`` / ``test()`` run.
    """

    # Early-stopping criterion used by train(): mean reward over the last
    # SOLVED_WINDOW episodes must reach SOLVED_MEAN_REWARD.
    SOLVED_MEAN_REWARD = 195
    SOLVED_WINDOW = 100

    def __init__(self, env, alpha = 0.1, epsilon = 0.9, gamma = 0.99, episodes = 1000, is_random = False, total_episodes_trained = 0, render = False):
        self.env = env
        self.alpha = alpha
        self.epsilon = epsilon
        self.gamma = gamma
        self.episodes = episodes
        # Linear decay that reaches 0 after `episodes` episodes; guard against
        # episodes == 0, which would otherwise divide by zero.
        self.epsilon_decay = epsilon / episodes if episodes > 0 else 0.0
        self.render = render
        self.is_random = is_random
        self.total_episodes_trained = total_episodes_trained
        self.n_actions = int(env.action_space.n)

        # Bin edges per observation component, built once and shared by
        # digitize_state() and _create_q_table():
        # cart position, cart velocity, pole angle (rad), pole angular velocity.
        self._bins = (
            np.linspace(-2.4, 2.4, 10),
            np.linspace(-4, 4, 10),
            np.linspace(-0.2095, 0.2095, 10),
            np.linspace(-4, 4, 10),
        )
        self.q_table = self._create_q_table()

        self.train_rewards = []
        self.test_rewards = []

    def digitize_state(self, state):
        """Map a raw observation to a tuple of four bucket indices.

        Each index is ``np.digitize(component, edges)``, i.e. an integer in
        ``0..len(edges)``; values outside the edges fall in the first or last
        bucket.
        """
        return tuple(
            int(np.digitize(value, edges))
            for value, edges in zip(state, self._bins)
        )

    def _create_q_table(self):
        """Return a zero-initialized Q-table with one extra bucket per component
        (for values beyond the outermost bin edges) and one slot per action."""
        state_dims = tuple(len(edges) + 1 for edges in self._bins)
        return np.zeros(state_dims + (self.n_actions,))

    def act(self, state, greedy = False):
        """Select an action for an already discretized state.

        Parameters
        ----------
        state : sequence of 4 ints
            Discretized state as returned by ``digitize_state``.
        greedy : bool
            When True, skip epsilon exploration and return the action with the
            highest Q-value (used for evaluation). ``is_random`` still forces
            a random action.

        Returns
        -------
        int
            Index of the chosen action.
        """
        explore = self.is_random or (
            not greedy and np.random.uniform(0, 1) < self.epsilon
        )
        if explore:
            return int(np.random.randint(0, self.n_actions))
        return int(np.argmax(self.q_table[tuple(state)]))

    def _update_q(self, state, action, reward, next_state, terminated):
        """Apply one Q-learning update:
        q(s,a) += alpha * (r + gamma * max_a' q(s',a') - q(s,a)).

        No future value is bootstrapped when the transition terminated the
        episode, because there is no successor state to act from.
        """
        max_next_q = 0.0 if terminated else np.max(self.q_table[tuple(next_state)])
        index = tuple(state) + (action,)
        td_target = reward + self.gamma * max_next_q
        self.q_table[index] += self.alpha * (td_target - self.q_table[index])

    def _run_episode(self, learn):
        """Play one episode and return its total reward.

        With ``learn=True`` actions are epsilon-greedy and the Q-table is
        updated after every step; otherwise actions are greedy and the table
        is left untouched.
        """
        observation, _ = self.env.reset()
        state = self.digitize_state(observation)
        ep_reward = 0
        done = False
        while not done:
            action = self.act(state, greedy=not learn)
            observation, reward, terminated, truncated, _ = self.env.step(action)
            next_state = self.digitize_state(observation)
            # An episode ends on failure (terminated) or on the time limit
            # (truncated); both must stop the loop.
            done = terminated or truncated
            if learn:
                self._update_q(state, action, reward, next_state, terminated)
            ep_reward += reward
            state = next_state
        return ep_reward

    def train(self):
        """Train with epsilon-greedy Q-learning for up to ``self.episodes`` episodes.

        Epsilon is decayed after every episode (never below 0). Progress is
        printed every 100 episodes. Training stops early once the mean reward
        over the last ``SOLVED_WINDOW`` episodes reaches ``SOLVED_MEAN_REWARD``.
        Per-episode rewards are kept in ``self.train_rewards``. The environment
        is closed at the end.
        """
        self.train_rewards = []
        for episode in range(self.episodes):
            self.train_rewards.append(self._run_episode(learn=True))

            # Clamp so accumulated float error cannot push epsilon below zero.
            self.epsilon = max(0.0, self.epsilon - self.epsilon_decay)
            mean_reward = np.mean(self.train_rewards[-self.SOLVED_WINDOW:])

            if episode % 100 == 0:
                print(
                    f"Episode: {episode + self.total_episodes_trained} Epsilon: {self.epsilon:0.2f}  Mean Rewards {mean_reward:0.1f}"
                )

            # Require a full window so a few lucky early episodes cannot
            # trigger the stop.
            window_full = len(self.train_rewards) >= self.SOLVED_WINDOW
            if window_full and mean_reward >= self.SOLVED_MEAN_REWARD:
                print(f"Mean rewards: {mean_reward} - no need to train model longer")
                break
        self.env.close()

    def test(self):
        """Evaluate the current Q-table for ``self.episodes`` episodes.

        Actions are greedy (no epsilon exploration) unless ``is_random`` is
        set. Progress is printed every 100 episodes and per-episode rewards
        are kept in ``self.test_rewards``. The environment is closed at the end.
        """
        self.test_rewards = []
        for episode in range(self.episodes):
            self.test_rewards.append(self._run_episode(learn=False))
            mean_reward = np.mean(self.test_rewards[-100:])
            if episode % 100 == 0:
                print(
                    f"Episode: {episode + self.total_episodes_trained} Mean Reward: {mean_reward:0.1f}"
                )

        # close the environment
        self.env.close()

       
  


In [None]:
# Build the Q-learning agent on the CartPole environment created above.
# NOTE: `render` is only stored on the agent; the class never reads it.
agent = LearningAgent(
    env,
    alpha=0.05,     # learning rate
    epsilon=0.99,   # initial exploration probability
    gamma=0.99,     # discount factor
    episodes=1000,  # number of episodes for train() and test()
    render=True,
)
# Run the training loop; progress is printed every 100 episodes.
agent.train()

In [None]:
# Evaluate the trained agent: plays `agent.episodes` episodes using the learned
# Q-table and prints the running mean reward every 100 episodes.
agent.test()