## Initial steps

In theory, multistage decision problems can be solved by dynamic programming (DP), a method for determining optimal control solutions using Bellman’s principle. Unfortunately, their exact solution is typically computationally intractable. We address and overcome this hurdle using reinforcement learning and approximate dynamic programming. Both produce suboptimal policies/control inputs with, however, sufficiently adequate performance. We offer different solution strategies which vary in their approach to modelling, approximation and optimisation, depending on the individual problem setup: Firstly, in model-free reinforcement learning, an agent/controller learns optimal behaviour by taking actions/inputs and receiving rewards/costs. Secondly, in a stochastic setting, a Markov Decision Process (MDP) can be used to model a discrete-time stochastic control process for situations with partly random, partly controlled outcomes. Typically, the so-called value function and policy are approximated by neural networks, while the weights are determined using Q- and Temporal Difference Learning. Finally, in a deterministic setting, we can employ model-based finite-horizon reinforcement learning, more commonly referred to as Model Predictive Control (MPC).

Machine learning: Within this framework, three distinct approaches of pertinent interest have emerged, each fitting a different problem category:

Supervised learning is applied to problems which have knowledge of “correct answers”. Suitable models include symbolic (genetic programming), kernel (Gaussian process regression, SVMs, SVRs), and tree-based methods (random forest, gradient-boosted trees), smoothing splines (GAMs), and (deep) neural networks, the latter suitable for a wide array of problems such as regression, online learning, image classification, and time series forecasting.

Semi- and self-supervised learning forces models to learn embeddings on labelled data. Successful examples include vision transformers and graph neural networks (geometric cue learning, deep geometric learning, knowledge graphs).

Unsupervised learning tackles problems like clustering into unknown classes (K-means, SVC) and dimensionality reduction (PCA, LDA).
Selecting and training the right model for the right task allows us to solve problems such as static or dynamic regression, predictive maintenance, and outlier or anomaly detection.

In [1]:
#Download these libraries if you don't have them
import gymnasium as gym
import numpy as np
import pygame

We use the Farama Foundation's [Gymnasium](https://github.com/Farama-Foundation/Gymnasium): it's actually a fork and continuation of the OpenAI Gym. Basically, it provides easy and nice-to-use environments for reinforcement learning, perfect for smaller models.<br>
It has environments for `pygame` games, which are python-based games that are easy to use and understand.

Two we will use are:
- `CartPole-v1`: a classic control problem, where the goal is to balance a pole on a cart. Very common not just in reinforcement learning, but also in control theory.
- `FrozenLake-v1`: a simple grid world environment, where the goal is to reach the goal without falling into a hole (lake).

Gifs of the two:

<div>
    <img src="https://www.gymlibrary.dev/_images/cart_pole.gif" width="600"/>
    <img src="https://gymnasium.farama.org/_images/frozen_lake.gif" width="400"/>
</div>

To get the environment (with states, etc.), we just need to call the `gym.make()` function with the name of the environment.

If we use `render_mode='human'`, we can see the environment in a pop-up window, however this slows learning. We will only use it for demonstration of the final model.

We can close the environment with the `env.close()` function.

## CartPole

In [2]:
env = gym.make("CartPole-v1")
#env = gym.make("CartPole-v1", render_mode = 'human')

observation, info = env.reset(seed=42) #Puts the pole and cart in a fair starting position

Let's see some things about the environment:

In [3]:
observation

array([ 0.0273956 , -0.00611216,  0.03585979,  0.0197368 ], dtype=float32)

The 4 values of the state of the `CartPole-v1` environment are:
- `cart position`: -4.8 to 4.8
- `cart velocity`: Could be any value, derivative of position (-inf to +inf)
- `pole angle`: -41.8 to 41.8
- `pole velocity at tip`: Any value

In [4]:
#Observation space limits
print(env.observation_space) #Could also use: print(env.observation_space.high, env.observation_space.low) #
 
#Action space: tells the type of space (discrete or continuous) and the number of actions possible
print(env.action_space)
 
#All info, includes stuff like env.spec.reward_threshold, env.spec.max_episode_steps, etc.
print(env.spec) #for prettier print: pprint.pprint(vars(env.spec)) 


Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
Discrete(2)
EnvSpec(id='CartPole-v1', entry_point='gymnasium.envs.classic_control.cartpole:CartPoleEnv', reward_threshold=475.0, nondeterministic=False, max_episode_steps=500, order_enforce=True, autoreset=False, disable_env_checker=False, apply_api_compatibility=False, kwargs={}, namespace=None, name='CartPole', version=1, additional_wrappers=(), vector_entry_point='gymnasium.envs.classic_control.cartpole:CartPoleVectorEnv')


An episode terminates under the following conditions:

* Pole angle absolute value becomes greater than 24° (+-24°) == +-0.4188 radians. Meaning: the pole is not standing upright enough.
* Cart position absolute value is greater than 4.8 units, meaning the cart is too far from the center ("would fall off")
* If the number of steps in an episode is greater than 500 (v1).

Let's do an example run: for 200 iterations (episodes), we will take a random action and see how the environment behaves. If we get into an ending possition (terminated episode), we just reset the environment and continue.

Watch the pop-up window!

In [5]:
env = gym.make("CartPole-v1", render_mode = 'human')

observation, info = env.reset(seed=42)
for _ in range(200):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

env.render()


We can close the environment anytime with `env.close()`. Before we close it: we can just make an action ourselves:

In [6]:
env.step(0) #0: Move left, 1: move right

(array([-0.13156797, -1.3455839 ,  0.02740329,  1.5653288 ], dtype=float32),
 1.0,
 False,
 False,
 {})

In [7]:
env.close()

What is the Q-function, what are Q-values?

The Q-function is a function that takes a state (of the environment) and an action and returns a value; the Q-value of a state and action: this is the expected reward you're supposted to get in the long run by choosing this function. We do not have these Q-values, we have to learn them by letting the algorithm play the game and learn it by itself. This value is the expected reward of taking that action in that state, and then following the optimal policy.

The Q-function is defined as:

$$Q(s, a) = r + \gamma \max_{a'} Q(s', a')$$

The discounted total reward on the long run from one state, from time $t$: $R_t = r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + ... = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1}$. Typically, $0 \leq \gamma \leq 1$, because this way, the sum converges.

This way, we could define the Q-function this way:

$$Q(s_t, a_t) = \mathbb{E}[R_t|s_t, a_t] $$

the expected value of the total reward, given that we are in state $s_t$ and take action $a_t$.





What are the steps of Q-learning?

1. Create the Q-table (initialize it, e.g. with zeros).<br>
    - What would be the size? Simply said it is the amount of states times the amount of actions. However, we have "infinite amount of states" (because the state space is continuous), so we have to discretize somewhat the state space. For position, angle this is easy, for velocity we will limit the velocities to a certain range: -4 to 4.<br>
    We bin each state into 30 bins, so we have 30^4 states. This is a lot, but doesn't seem to cause an issue in performance.

2. Start an iteration of episodes. <br>
 2.1. Reset the environment.<br>
 2.2. Choose an action (e.g. epsilon-greedy).<br>
    -The way we choose the action is the "epsilon-greedy" method: with probability epsilon, we choose a random action, otherwise we choose the action with the highest Q-value (expected) for the current state.<br>
 2.3. Take the action and observe the reward and the next state.<br>
 2.4. Update the Q-value of the state-action pair.<br>
    - This is done with the Q-learning update rule: Q(s, a) = Q(s, a) + alpha * (reward + gamma * max(Q(s', a')) - Q(s, a)).<br>
    - Alpha is the learning rate, gamma is the discount factor (of future rewards), s is the current state, a is the action, s' is the next state, a' is the action in the next state.<br>
    - The Q-value of the next state is the maximum Q-value of the next state (we learn the optimal policy).<br>
 2.5. Two cases:
    - If the state returns a termination, skip to the next episode.<br>
    - If not, go to the next state, and repeat the process.<br>
This is done until the episode ends.

3. We finish with a well-updated Q-table, that we can use to test the model in practice.


In [8]:
env = gym.make("CartPole-v1")
num_bins = 30
state_space_bins = [np.linspace(-4.8, 4.8, num_bins),
                    np.linspace(-4, 4, num_bins),
                    np.linspace(-0.418, 0.418, num_bins),
                    np.linspace(-4, 4, num_bins)]

The Q-table size: states x actions = 30^4 x 2

In [9]:
[num_bins]*env.observation_space.shape[0] + [env.action_space.n]

[30, 30, 30, 30, 2]

Let's define the functions for the model. (**Warning**: We purposefully make a mistake in the model for educational purposes)

Let's go row by row in the learning function:
1) We create the Q-table, initialize it with zeros
2) We start the iteration of episodes
3) We reset the environment. We get the state, then discretize it (`np.digitize`)
4) We iterate through the steps of the episode (time steps)
5) We choose an action (epsilon-greedy)
6) Make the action, gather the new state, reward, and if the episode is done (truncated and info are irrelevant for us) + discretize the new state
7) Get the old Q-value, calculate the new Q-value, and update the Q-table
8) If we terminated, or finished with steps, go to the next episode, otherwise go to the next state and repeat from 5).
9) Repeat from 3) until we finish the iterations of episodes.
10) Return the Q-table.

Alpha down below is for the learning rate, it isn't necessary but because it helps distribute the value gathered be higher when we are closer to the middle of the screen, it helps the model stay more in the middle of the screen. (Gamma is as said above, the discount factor of future rewards).

In [10]:
def choose_action_epsilon_greedy(state, q_table, env, epsilon):
    if np.random.uniform(0, 1) <= epsilon:
        action = env.action_space.sample()  #Explore
    else:
        action = np.argmax(q_table[state])  #Exploit
    return action


def q_learning_continuous(env, state_space_bins, alpha=0.5, gamma=0.95, epsilon=0.1, episode_count=5000, time_steps=200, q_table=None):
    num_bins = [len(bins) for bins in state_space_bins]
    if q_table is None:
        q_table = np.zeros(num_bins + [env.action_space.n])

    for i_episode in range(episode_count):
        state = env.reset()[0]
        state = tuple([np.digitize(s, bins)-1 for s, bins in zip(state, state_space_bins)]) #-1 to avoid 1-30 indexing

        for t in range(time_steps):
            action = choose_action_epsilon_greedy(state, q_table, env, epsilon)
            next_state, reward, done, truncated, info = env.step(action)
            next_state = tuple([np.digitize(s, bins)-1 for s, bins in zip(next_state, state_space_bins)]) #-1 to avoid 1-30 indexing
            
            old_value = q_table[state + (action,)]
            next_max = np.max(q_table[next_state])
            new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max) #If done, we just get the reward without future rewards
            q_table[state + (action,)] = new_value

            if done:
                break
            state = next_state

    return q_table

Also prepare to showcase our model on the display:

In [11]:
def showcase_model_continuous(env, q_table, state_space_bins, limit=1000):
    state = env.reset()
    state = state[0] if isinstance(state, tuple) else state
    state = tuple([np.digitize(s, bins) for s, bins in zip(state, state_space_bins)])
    i = 0
    while i < limit:
        action = np.argmax(q_table[state])
        next_state, reward, done, truncated, info = env.step(action)
        next_state = next_state[0] if isinstance(next_state, tuple) else next_state
        next_state = tuple([np.digitize(s, bins) for s, bins in zip(next_state, state_space_bins)])

        env.render()

        if done:
            state = env.reset()
            state = state[0] if isinstance(state, tuple) else state
            state = tuple([np.digitize(s, bins) for s, bins in zip(state, state_space_bins)])
        else:
            state = next_state
        i += 1

    env.close()

In [12]:
env = gym.make("CartPole-v1")
q_table200 = q_learning_continuous(env, state_space_bins, episode_count=200)

In [15]:
env = gym.make("CartPole-v1", render_mode = 'human')
showcase_model_continuous(env, q_table200, state_space_bins)

Whaaaat... Our model is terrible! Why is that? Is it just that we didn't train it enough?

It could be an assumption, but if we try with 1 million episodes the model still does the same thing. The Q-table didn't change after the first few episodes. Why is that?

Well, we need to think deeply how does the model update the Q-table. ...

Let's re-do the updating part of the Q-table:

In [17]:
def q_learning_continuous(env, state_space_bins, alpha=0.5, gamma=0.95, epsilon=0.1, episode_count=5000, time_steps=200, q_table=None):
    num_bins = [len(bins) for bins in state_space_bins]
    if q_table is None:
        q_table = np.zeros(num_bins + [env.action_space.n])

    for i_episode in range(episode_count):
        state = env.reset()[0]
        state = tuple([np.digitize(s, bins)-1 for s, bins in zip(state, state_space_bins)]) #-1 to avoid 1-30 indexing

        for t in range(time_steps):
            action = choose_action_epsilon_greedy(state, q_table, env, epsilon)
            next_state, reward, done, truncated, info = env.step(action)
            next_state = tuple([np.digitize(s, bins)-1 for s, bins in zip(next_state, state_space_bins)]) #-1 to avoid 1-30 indexing
            
            old_value = q_table[state + (action,)]
            
            if done:
                next_max = 0
            else:
                next_max = np.max(q_table[next_state])

            new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max) #If done, we just get the reward without future rewards
            q_table[state + (action,)] = new_value

            if done:
                break
            state = next_state

    return q_table

In [None]:
env = gym.make("CartPole-v1")
q_table200 = q_learning_continuous(env, state_space_bins, episode_count=200)

In [None]:
env = gym.make("CartPole-v1", render_mode = 'human')
showcase_model_continuous(env, q_table200, state_space_bins)

In [None]:
def evaluate_q_table_continuous(env, q_table, state_space_bins, episodes=1000):
    total_reward = 0
    for _ in range(episodes):
        state = env.reset()[0]
        state = tuple([np.digitize(s, bins) for s, bins in zip(state, state_space_bins)])
        done = False
        while not done:
            action = np.argmax(q_table[state])
            next_state, reward, done, _, __ = env.step(action)
            next_state = tuple([np.digitize(s, bins) for s, bins in zip(next_state, state_space_bins)])
            if reward == 0:
                print(f"Next state: {next_state}, reward: {reward}, done: {done}")
            total_reward += reward
            state = next_state
    average_reward = total_reward / episodes
    return average_reward