# RL, Gymnasium, Q-Learning - Cart Pole

# 1. Setup and Requirements 

In [1]:
import gymnasium as gym
import numpy as np
import random
import pickle

# 2. CartPole class

In [2]:
class CartPole:
    """
        Wrapper class for CartPole environment

        Attributes:
            _env: The Gym environment for the Cart Pole game.
            _curr_state (np.array): The current state of the environment.
            _isTerminated (bool): Flag indicating whether the current episode has ended.
    """
    def __init__(self, is_learning = False):
        """
        Initializes the CartPole environment

        Args:
            is_learning (bool): Flag to determine if the environment is for learning or visualization.
        """
        # Define whether we want to visualize
        if is_learning:
            self._env = gym.make('CartPole-v1')
        else:
            self._env = gym.make('CartPole-v1', render_mode = "human")
        self._currState = self._env.reset()[0]
        self._isTerminated = False


    def digitize_state(self, state):
        """
        Digitizes the continuous state into discrete values for Q-table.
        
        Args:
            state (np.array): The current state of the environment.

        Returns:
            list: A list representing the digitized state.
        """
        pos_space = np.linspace(-2.4, 2.4, 10)
        vel_space = np.linspace(-4, 4, 10)
        ang_space = np.linspace(-.2095, .2095, 10)
        ang_vel_space = np.linspace(-4, 4, 10)
        
        new_state_p = np.digitize(state[0], pos_space)
        new_state_v = np.digitize(state[1], vel_space)
        new_state_a = np.digitize(state[2], ang_space)
        new_state_av= np.digitize(state[3], ang_vel_space)
        new_state_dig = [new_state_p, new_state_v, new_state_a, new_state_av]
        return new_state_dig

    def do_action(self, action):
       """
        Performs a step in the environment. Gets the values for Observation, reward and checks if the game is over

        Args:
            action (int): an action passed to the environment
        Returns:
            new_state: Discrete state after the action is taken
            reward: Reward basing on the taken action
       """
       new_state, reward, self._isTerminated, _, _ = self._env.step(action)       
       # Update the current state
       self._currState = new_state
       return self.digitize_state(new_state), reward
    
    def reset_env(self):
        """ Resets the environment """
        self._currState = self._env.reset()[0]
        self._isTerminated = False

    def get_current_state(self):
        """ Gets the discrete state of the environment """
        return self.digitize_state(self._currState)
    
    def get_action_space(self):
        """Returns the size of the action space"""
        return self._env.action_space.n
    
    def is_game_over(self):
        """ Returns boolean determining if game is over"""
        return self._isTerminated

# 3. Q Learning Agent

In [3]:
class Q_learning:
    """
        Implementation of Q-learning algorhitm for the CartPole environment.

        Attributes:
            _env (cartPoleEnv): Cart Pole env
            _gamma (float):   The discount factor
            _alpha (float): The learning rate.
            _epsilon (float): The exploration rate.
            _episodes (int): The number of episodes for training
            _is_learning (bool): Flag indicating whether the agent is in learning mode.
            _Q_table (np.array): The Q-table, stores state-action values
    """
    def __init__(self, env, gamma, alpha, epsilon, episodes, isLearning = True):
        """
            Initializes Q-learning agent.

            Works in two ways. When isLearning flag is set True,
            it initializes Q-table as a empty np.array, else it tries to load it from file.
            Args:
                env (cartPoleEnv): The Cart Pole environment.
                gamma (float): The discount factor.
                alpha (float): The learning rate.
                epsilon (float): The exploration rate.
                episodes (int): The number of episodes for training.
                isLearning (bool): Flag to determine if the agent is in learning mode.
        """
        self._env = env
        self._gamma = gamma
        self._alpha = alpha
        self._epsilon = epsilon
        self._episodes = episodes
        self._isLearning = isLearning
        self._decayRate = epsilon / episodes

        if self._isLearning:
            print(f'Learning mode on: training agent on alpha: {self._alpha}, gamma: {self._gamma}, epsilon : {self._epsilon}, with {self._episodes} episodes')
        else:
            print('Visualization mode on')

        # Initialize Q_Table
        if self._isLearning: 
            # State is given as continuous set of variables
            # we need to cut it into pieces to be able to learn
            # The limits here are the limits for our game to be over
            pos_space = np.linspace(-2.4, 2.4, 10)
            vel_space = np.linspace(-4, 4, 10)
            ang_space = np.linspace(-.2095, .2095, 10) #value in rad
            ang_vel_space = np.linspace(-4, 4, 10)
            self.Q_table = np.zeros((len(pos_space)+1, len(vel_space)+1, 
                                    len(ang_space)+1, len(ang_vel_space)+1, self._env.get_action_space())) #11x11x11x11x2
        else:
            #Load the model
            f = open('Q_table.pkl', 'rb')
            self.Q_table = pickle.load(f)
            f.close()

    def policy(self, state):
        """ 
        Epsilon Greedy Policy

        Function works in two modes:
            If isLearning is True, decides on random whether to choose random action or
            the best action according to the Q_table. The higher epsilon, the higher chance of getting random results
            When isLearning is set to False, policy only chooses the values basing on the Q_table.
        
        Args:
            state: Discrete state of the environment
        """
        if self._isLearning and np.random.random() < self._epsilon:
            # Choose an action at random with probability epsilon
            return random.choice([0,1]) # only two actions - left or right
        else:
            # Choose the best action accordin to Q_table with probability 1-epsilon
            return np.argmax(self.Q_table[state[0], state[1], state[2], state[3], :])

    def apply(self):
        """
        Executes Q-learning algorhithm over a specified number of episodes.

        This method runs the Q-learning algorithm, updating the Q-table based on the interactions
        with the environment. It implements an epsilon-greedy policy for action selection and applies 
        temporal difference learning for updating the Q-table.
        Additionally, the method also handles epsilon decay.
         
        For exploration over time and prints out the progress every 100 episodes.

        The method performs the following steps in each episode:
        - Interacts with the environment to obtain states, rewards, and new states.
        - Updates the Q-table using the temporal difference
        - Applies epsilon decay to gradually shift from exploration to exploitation.
        - Tracks and logs the rewards for each episode.

        At the end of the training, the updated Q-table is saved to a file (if in learning mode), 
        and the average reward across all episodes is calculated and printed to the output.
        """

        total_episode_rewards = []  # Rewards of all runs
        for episode in range(self._episodes):
            episode_rewards = [] # rewards for each episode
            rewards = 0
            while not self._env.is_game_over():
                # get the current state
                curr_state = self._env.get_current_state()
                action = self.policy(curr_state)
                next_state, reward = self._env.do_action(action)
                # Choose maximum Q-value for next state
                max_next_value = np.max(self.Q_table[next_state[0], next_state[1], next_state[2], next_state[3], :])
                # Temporal difference update TODO improve readability
                self.Q_table[curr_state[0], curr_state[1], curr_state[2], curr_state[3], action] = self.Q_table[curr_state[0], curr_state[1], curr_state[2], curr_state[3], action] +\
                self._alpha * ( reward + self._gamma * max_next_value -  self.Q_table[curr_state[0], curr_state[1], curr_state[2], curr_state[3], action]) 
                rewards += reward

            # Reset before new episode
            self._env.reset_env()

            # Epsilon Decay rate 
            self._epsilon = self._epsilon - self._decayRate
            
            # Get episode  rewards
            total_episode_rewards.append(rewards)
            mean_rewards = np.mean(total_episode_rewards[len(total_episode_rewards)-100:])
            
            if not self._isLearning:
                # Display results after each episode
                print(f'Episode: {episode} Rewards: {rewards}')
            else:
                # For every 100 display rewards
                if episode % 100 == 0:
                    print(f'Episode: {episode} Rewards: {rewards}  Epsilon: {self._epsilon:0.2f}  Mean Rewards {mean_rewards:0.1f}')
                    total_episode_rewards.append(np.sum(episode_rewards))
            
            # Threshold for rewards
            if mean_rewards >= 1000:
                print(f' Mean rewards: {mean_rewards} - no need to train model longer')
                break
        
        # Save Q table to file
        if self._isLearning:
            f = open('Q_table.pkl','wb')
            pickle.dump(self.Q_table, f)
            f.close()

        # Calculate the mean
        print("Average reward after all episodes: ", np.mean(total_episode_rewards))

# 4. Training the Agent

In [6]:
def main():
    gamma = 0.7 # Discount rate
    alpha = 0.1 # Learning rate
    epsilon = 0.5 # How much we want to explore 
    episodes = 1000 # Number of episodes ; experimented with 5000, 10 000 and 30 000

    isLearning = True # Set to True to train the agent

    cart_pole = CartPole(isLearning)
    agent = Q_learning(cart_pole, gamma, alpha, epsilon, episodes, isLearning)
    agent.apply()

if __name__ == "__main__":
    main()

Learning mode on: training agent on alpha: 0.1, gamma: 0.7, epsilon : 0.5, with 1000 episodes
Episode: 0 Rewards: 10.0  Epsilon: 0.50  Mean Rewards 10.0
Episode: 100 Rewards: 21.0  Epsilon: 0.45  Mean Rewards 21.8
Episode: 200 Rewards: 26.0  Epsilon: 0.40  Mean Rewards 20.4
Episode: 300 Rewards: 17.0  Epsilon: 0.35  Mean Rewards 24.6
Episode: 400 Rewards: 20.0  Epsilon: 0.30  Mean Rewards 26.9
Episode: 500 Rewards: 25.0  Epsilon: 0.25  Mean Rewards 29.1
Episode: 600 Rewards: 25.0  Epsilon: 0.20  Mean Rewards 27.7
Episode: 700 Rewards: 35.0  Epsilon: 0.15  Mean Rewards 30.7
Episode: 800 Rewards: 37.0  Epsilon: 0.10  Mean Rewards 32.9
Episode: 900 Rewards: 32.0  Epsilon: 0.05  Mean Rewards 35.0
Average reward after all episodes:  28.114851485148513


# 5. Visualize

In [8]:
# Copy of previous cell with human render visualisation
def main():
    gamma = 0.7 # Discount rate
    alpha = 0.1 # Learning rate
    epsilon = 0.5 # How much we want to explore 
    episodes = 1000 # Number of episodes ; experimented with 5000, 10 000 and 30 000

    isLearning = False # Set to False to test the trained agent

    cart_pole = CartPole(isLearning)
    agent = Q_learning(cart_pole, gamma, alpha, epsilon, episodes, isLearning)
    agent.apply()

if __name__ == "__main__":
    main()

Visualization mode on
Episode: 0 Rewards: 35.0
Episode: 1 Rewards: 37.0
Episode: 2 Rewards: 40.0
Episode: 3 Rewards: 40.0
Episode: 4 Rewards: 36.0
Episode: 5 Rewards: 34.0
Episode: 6 Rewards: 36.0
Episode: 7 Rewards: 41.0
Episode: 8 Rewards: 31.0
Episode: 9 Rewards: 42.0
Episode: 10 Rewards: 32.0
Episode: 11 Rewards: 30.0
Episode: 12 Rewards: 34.0
Episode: 13 Rewards: 42.0
Episode: 14 Rewards: 38.0
Episode: 15 Rewards: 29.0
Episode: 16 Rewards: 29.0
Episode: 17 Rewards: 35.0
Episode: 18 Rewards: 40.0
Episode: 19 Rewards: 39.0
Episode: 20 Rewards: 34.0
Episode: 21 Rewards: 33.0
Episode: 22 Rewards: 36.0
Episode: 23 Rewards: 31.0
Episode: 24 Rewards: 32.0
Episode: 25 Rewards: 32.0
Episode: 26 Rewards: 33.0
Episode: 27 Rewards: 34.0
Episode: 28 Rewards: 39.0
Episode: 29 Rewards: 38.0
Episode: 30 Rewards: 38.0
Episode: 31 Rewards: 44.0
Episode: 32 Rewards: 41.0
Episode: 33 Rewards: 41.0
Episode: 34 Rewards: 37.0
Episode: 35 Rewards: 41.0
Episode: 36 Rewards: 36.0
Episode: 37 Rewards: 41.0
