In [None]:
import gymnasium as gym
import numpy as np 
# import time
import matplotlib.pyplot as plt

# Q-Learning Process

### Functions

Function that smooths a 1-D series with a centered moving average, used to produce a smoother plot.

In [3]:
def average_window_filter(data, window_size):
    """Smooth a 1-D sequence with a centered moving average.

    The sequence is padded on both sides by repeating its edge values, so the
    result has the same length as the input.

    Parameters
    ----------
    data : array-like
        1-D sequence of numbers (a list or a NumPy array).
    window_size : int
        Width of the averaging window. An even value is increased by one so
        that the window is centered on each sample.

    Returns
    -------
    numpy.ndarray
        Float array with the same length as ``data``.

    Raises
    ------
    ValueError
        If ``window_size`` is negative.
    """
    # Always work in floating point: with an integer input the averages would
    # otherwise be truncated when stored in an integer output array.
    values = np.asarray(data, dtype=float)
    if window_size % 2 == 0:
        window_size += 1
    if window_size < 1:
        raise ValueError("window_size must be non-negative")
    if values.size == 0:
        # np.pad(mode='edge') cannot extend an empty axis; nothing to smooth.
        return values
    pad = window_size // 2
    padded = np.pad(values, (pad, pad), mode='edge')
    # A uniform kernel in 'valid' mode yields exactly one mean per input sample.
    kernel = np.full(window_size, 1.0 / window_size)
    return np.convolve(padded, kernel, mode='valid')

(non optimized) function to generate bin indices for discretization of continuous state variables

In [4]:
def show_which_bin(state, state_space):
    """Discretize an observation into per-dimension bin indices.

    For every component of the observation, the selected bin is the first
    edge that is strictly greater than the value; values at or beyond the
    last edge are assigned to the last bin.

    Parameters
    ----------
    state : tuple or array-like
        Either the observation itself or a tuple whose first element is the
        observation (``env.reset`` returns a tuple while the observation
        taken from ``env.step`` is passed on its own).
    state_space : sequence of 1-D array-like
        ``state_space[i]`` holds the bin edges of observation component ``i``.
        The edges are assumed to be sorted in ascending order.

    Returns
    -------
    disc_state : list
        The bin edge selected for each component.
    indices : list of int
        The index of the selected bin for each component.
    """
    # isinstance also accepts tuple subclasses (e.g. namedtuples).
    observation = state[0] if isinstance(state, tuple) else state
    disc_state = []
    indices = []
    for dim, obs in enumerate(observation):
        edges = np.asarray(state_space[dim])
        # side='right' gives the first position whose edge is > obs; clamp
        # values past the last edge into the last bin.
        bin_idx = min(int(np.searchsorted(edges, obs, side='right')), edges.shape[0] - 1)
        indices.append(bin_idx)
        disc_state.append(edges[bin_idx])
    return disc_state, indices

#### function to implement Q-learning

1. select action with epsilon-greedy 
2. perform the action on the environment and receive new state and reward
3. update Q table with Bellman's equation

Note: epsilon decays after every episode, so the agent increasingly prefers exploitation over exploration as training goes on.

In [10]:
def learn(env, Q_table, state_space, EPISODES, EPSILON, LEARNING_RATE, DISCOUNT_FACTOR, verbose=True, EPSILON_DECAY=0.9999):
    """Train a tabular Q-learning agent with an epsilon-greedy policy.

    Each step applies the Q-learning update

        Q(s, a) += LEARNING_RATE * (r + DISCOUNT_FACTOR * max_a' Q(s', a') - Q(s, a))

    where the bootstrap term ``max_a' Q(s', a')`` is dropped when the
    environment reports the transition as terminated.

    Parameters
    ----------
    env : environment
        Must provide ``reset()``, ``step(action)`` returning
        ``(observation, reward, terminated, truncated, info)`` and
        ``action_space.n``.
    Q_table : numpy.ndarray
        Table indexed as ``[action, bin_0, bin_1, ...]``. It is updated
        IN PLACE; pass a copy if the original must be preserved.
    state_space : sequence of 1-D arrays
        Bin edges for each observation component (see ``show_which_bin``).
    EPISODES : int
        Number of training episodes.
    EPSILON : float
        Initial probability of picking a random action.
    LEARNING_RATE : float
        Step size of the Q-value update.
    DISCOUNT_FACTOR : float
        Discount applied to the estimated value of the next state.
    verbose : bool, optional
        If true, print one line per episode.
    EPSILON_DECAY : float, optional
        Factor by which ``EPSILON`` is multiplied after every episode.

    Returns
    -------
    Q_table : numpy.ndarray
        The same array object that was passed in, after training.
    all_rewards : list
        Total reward collected in each episode.
    """
    all_rewards = []
    for episode in range(EPISODES):
        episode_reward = 0
        state = env.reset()
        _, state_indices = show_which_bin(state, state_space)
        # Tuple form lets the table be indexed for any number of state dimensions.
        state_key = tuple(state_indices)
        done = False
        while not done:
            # Epsilon-greedy action selection.
            if np.random.rand() < EPSILON:
                action = np.random.randint(0, env.action_space.n)
            else:
                action = np.argmax(Q_table[(slice(None),) + state_key])

            new_state, reward, terminated, truncated, _ = env.step(action)
            _, next_state_indices = show_which_bin(new_state, state_space)
            next_key = tuple(next_state_indices)

            # No future value after a terminated transition; a truncated
            # episode (e.g. time limit) still bootstraps from the next state.
            future_value = 0.0 if terminated else np.max(Q_table[(slice(None),) + next_key])
            td_error = reward + DISCOUNT_FACTOR * future_value - Q_table[(action,) + state_key]
            Q_table[(action,) + state_key] += LEARNING_RATE * td_error

            # Count every step's reward, including the one that ends the episode.
            episode_reward += reward
            state_key = next_key
            done = terminated or truncated
        if verbose:
            print("episode:", episode, '\treward:', episode_reward, '\tepsilon:', EPSILON)
        EPSILON *= EPSILON_DECAY
        all_rewards.append(episode_reward)
    return Q_table, all_rewards

### Run

create an environment

In [None]:
# Environment used for training (no render mode is requested here).
env = gym.make("CartPole-v1")

# Printed one per line: the number of discrete actions, the shape of a single
# observation, and the observation space itself (presumably its repr lists
# the low/high bounds, shape and dtype).
print(
    env.action_space.n,
    env.observation_space.shape,
    env.observation_space,
    sep="\n",
)

create (discrete) state space and zero Q_table (with all possible states and coupled actions) 

The Q-table holds the Q-values for each state-action pair. For continuous state spaces like CartPole, the state space is discretized into bins (state space discretization). This makes it possible to represent the Q-values in a table. The actions are the two possible movements (left or right).
For CartPole, you could discretize the state space into grids for each of the four state variables (position, velocity, angle, angular velocity).

In [None]:
n_bins = 20

# One row of bin edges per observation component. Position and angle take
# their range from the environment's observation space; the two velocity
# components use hand-picked finite ranges instead.
state_space = np.array([
    np.linspace(env.observation_space.low[0], env.observation_space.high[0], n_bins),   # cart position
    np.linspace(-5, 5, n_bins),                                                         # cart velocity
    np.linspace(env.observation_space.low[2], env.observation_space.high[2], n_bins),   # pole angle (rad)
    np.linspace(-1, 1, n_bins)                                                          # pole angular velocity
])

# One Q-value per (action, bin of each state component). The number of actions
# is read from the environment instead of being hard-coded.
Q_table = np.zeros([env.action_space.n] + state_space.shape[0] * [state_space.shape[1]])
Q_table.shape

set parameters

In [8]:
# Hyperparameters of the Q-learning run.
EPISODES = 50_000        # number of training episodes
EPSILON = 0.5            # initial probability of taking a random action
DISCOUNT_FACTOR = 0.95   # weight of the next state's estimated value
LEARNING_RATE = 0.1      # step size of each Q-value update

run the learning function

In [None]:
# learn() updates the table it receives in place, so train on a copy: re-running
# this cell then always starts from the untouched Q_table instead of resuming
# from an already-trained one.
filled_Q_table, final_results = learn(env, Q_table.copy(), state_space, EPISODES, EPSILON, LEARNING_RATE, DISCOUNT_FACTOR, verbose=True)
final_results_smooth = average_window_filter(final_results, window_size=400)

plot the results

In [None]:
# Raw per-episode rewards together with their smoothed version.
fig, ax = plt.subplots()
ax.plot(final_results, label="episode reward")
ax.plot(final_results_smooth, label="moving average")
ax.set_xlabel("episode")
ax.set_ylabel("total reward")
ax.set_title("Q-learning on CartPole-v1")
ax.legend();

# Run the agent and save the video

In [None]:
import cv2

In [None]:
def run_episodes_and_create_video(env, Q_table, state_space, num_episodes, output_file, fps=30):
    """Run greedy episodes from a Q-table and save the rendered frames as a video.

    Frames are written to the video file as they are rendered instead of
    being collected in memory first.

    Parameters
    ----------
    env : environment
        Must provide ``reset()``, ``step(action)`` and a ``render()`` that
        returns an RGB image array of shape ``(height, width, 3)``.
        The environment is CLOSED before this function returns.
    Q_table : numpy.ndarray
        Table indexed as ``[action, bin_0, bin_1, ...]``.
    state_space : sequence of 1-D arrays
        Bin edges for each observation component (see ``show_which_bin``).
    num_episodes : int
        Number of episodes to record.
    output_file : str
        Path of the video file to write (encoded with the 'mp4v' codec).
    fps : int, optional
        Frame rate of the video.

    Raises
    ------
    ValueError
        If ``env.render()`` returns ``None`` instead of a frame.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = None
    try:
        for episode in range(num_episodes):
            state = env.reset()
            _, state_indices = show_which_bin(state, state_space)
            done = False
            while not done:
                frame = env.render()
                if frame is None:
                    raise ValueError("env.render() returned None; an image frame is required to build the video")
                if writer is None:
                    # The frame size is only known once the first frame exists.
                    height, width, _ = frame.shape
                    writer = cv2.VideoWriter(output_file, fourcc, fps, (width, height))
                # Frames are converted from RGB to BGR channel order before writing.
                writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
                # Always act greedily; tuple indexing works for any number of state dimensions.
                action = np.argmax(Q_table[(slice(None),) + tuple(state_indices)])
                new_state, _, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                _, state_indices = show_which_bin(new_state, state_space)
    finally:
        # Release resources even if rendering or stepping fails.
        if writer is not None:
            writer.release()
        env.close()
    if writer is None:
        print("No frames were rendered; no video was written.")
        return
    print(f"Video saved to {output_file}")

Generate two videos: one for the trained agent and one for an agent driven by a random Q-table.

In [None]:
# run_episodes_and_create_video() closes the environment it is given, so each
# video gets its own freshly created environment.
env = gym.make('CartPole-v1', render_mode="rgb_array")
run_episodes_and_create_video(env, filled_Q_table, state_space, num_episodes=10, output_file="cartpole_output.mp4")

# Baseline for comparison: a table of uniform random values with the same
# shape as the trained Q-table.
random_Q_table = np.random.random(filled_Q_table.shape)
env = gym.make('CartPole-v1', render_mode="rgb_array")
run_episodes_and_create_video(env, random_Q_table, state_space, num_episodes=250, output_file="cartpole_output_random.mp4")