Esta notebook contiene bloques de código útiles para realizar Q-learning en el entorno "Pendulum"

In [72]:
import numpy as np
from pendulum_env_extended import PendulumEnvExtended
import random 

In [73]:
# Build the environment from the project-local pendulum_env_extended module.
# NOTE(review): render_mode='rgb_array' presumably makes rendering return image
# arrays instead of opening a window - confirm against PendulumEnvExtended.
env = PendulumEnvExtended(render_mode='rgb_array')

Discretización de los estados

In [74]:
# Bin edges used by np.digitize to discretise each observation component.
# x and y: 10 evenly spaced edges over [-1, 1].
# velocity: 100 evenly spaced edges over [-8, 8].
x_space = np.linspace(start=-1, stop=1, num=10)
y_space = np.linspace(start=-1, stop=1, num=10)
vel_space = np.linspace(start=-8, stop=8, num=100)
x_space

array([-1.        , -0.77777778, -0.55555556, -0.33333333, -0.11111111,
        0.11111111,  0.33333333,  0.55555556,  0.77777778,  1.        ])

Obtener el estado a partir de la observación

In [75]:
def get_state(obs):
    """Convert a continuous observation into discrete bin indices.

    Args:
        obs: A length-3 sequence unpacked as ``(x, y, vel)``.

    Returns:
        A 3-tuple ``(x_bin, y_bin, vel_bin)`` of ``np.digitize`` indices
        computed against ``x_space``, ``y_space`` and ``vel_space``.
    """
    x, y, vel = obs
    # Pair every component with the bin edges it is digitised against.
    component_edges = ((x, x_space), (y, y_space), (vel, vel_space))
    return tuple(np.digitize(value, edges) for value, edges in component_edges)

In [76]:
# Sanity check: discretise a hand-written (x, y, vel) observation and display
# the resulting tuple of bin indices.
state = get_state(np.array([-0.4, 0.2, 0.3]))
state

(3, 6, 52)

Discretización de las acciones

In [77]:
# Discrete action set: 10 evenly spaced values spanning [-2, 2], kept in a
# plain list so that a position in it can serve as the action index.
actions = [value for value in np.linspace(start=-2, stop=2, num=10)]
actions

[-2.0,
 -1.5555555555555556,
 -1.1111111111111112,
 -0.6666666666666667,
 -0.22222222222222232,
 0.22222222222222232,
 0.6666666666666665,
 1.1111111111111107,
 1.5555555555555554,
 2.0]

In [78]:
def get_sample_action():
    """Return one element of ``actions`` chosen uniformly at random."""
    sampled_action = random.choice(actions)
    return sampled_action

Inicialización de la tabla Q

In [79]:
# Zero-initialised Q-table indexed as Q[x_bin, y_bin, vel_bin, action_idx].
# Each state axis has len(edges) + 1 entries because np.digitize returns
# indices in the range 0..len(edges).
Q = np.zeros(
    shape=(
        len(x_space) + 1,
        len(y_space) + 1,
        len(vel_space) + 1,
        len(actions),
    )
)
Q

array([[[[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
    

Obtención de la acción a partir de la tabla Q

In [80]:
def optimal_policy(state, Q):
    """Return the greedy action for ``state`` according to ``Q``.

    Args:
        state: Index (e.g. the tuple produced by ``get_state``) selecting
            the row of action-values in ``Q``.
        Q: Q-table whose last axis runs over the entries of ``actions``.

    Returns:
        The element of ``actions`` whose Q-value is largest; ties resolve to
        the first maximum, as ``np.argmax`` does.
    """
    action_values = Q[state]
    best_idx = np.argmax(action_values)
    return actions[best_idx]

Epsilon-Greedy Policy

In [81]:
def epsilon_greedy_policy(state, Q, epsilon=0.1):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: Discrete state index used to look up ``Q``.
        Q: Q-table passed through to ``optimal_policy``.
        epsilon: Probability of exploring (taking a random action).

    Returns:
        A random action from ``get_sample_action()`` with probability
        ``epsilon``, otherwise the greedy action from ``optimal_policy``.
    """
    # A single Bernoulli(epsilon) draw: 1 -> explore, 0 -> exploit.
    if np.random.binomial(1, epsilon):
        return get_sample_action()
    return optimal_policy(state, Q)

Ejemplo de episodio 

In [82]:
# NOTE(review): this cell repeats the get_state definition from the
# "Obtener el estado a partir de la observación" section; one of the two
# definitions can be dropped.
def get_state(obs):
    """Convert a continuous observation into discrete bin indices.

    Args:
        obs: A length-3 sequence unpacked as ``(x, y, vel)``.

    Returns:
        A 3-tuple ``(x_bin, y_bin, vel_bin)`` of ``np.digitize`` indices
        computed against ``x_space``, ``y_space`` and ``vel_space``.
    """
    x, y, vel = obs
    # Pair every component with the bin edges it is digitised against.
    component_edges = ((x, x_space), (y, y_space), (vel, vel_space))
    return tuple(np.digitize(value, edges) for value, edges in component_edges)

Implementación de Q-learning:

In [83]:
def q_learning(env, num_episodes=5000, alpha=0.1, gamma=0.99, epsilon_start=1.0, epsilon_end=0.1, epsilon_decay=0.999):
    """Train a tabular Q-learning agent on ``env`` and return its Q-table.

    States are discretised with ``get_state`` and actions are drawn from the
    module-level ``actions`` list via ``epsilon_greedy_policy``.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns a 5-tuple
            ``(obs, reward, terminated, truncated, info)``.
            NOTE(review): the last two flags are assumed to follow the
            Gymnasium convention - confirm against PendulumEnvExtended.
        num_episodes: Number of training episodes.
        alpha: Learning rate of the TD update.
        gamma: Discount factor.
        epsilon_start: Exploration probability used in the first episode.
        epsilon_end: Lower bound for the exploration probability.
        epsilon_decay: Multiplicative decay applied to epsilon after every
            episode.

    Returns:
        The learned Q-table, shaped
        ``(len(x_space) + 1, len(y_space) + 1, len(vel_space) + 1, len(actions))``.
    """
    Q = np.zeros((len(x_space) + 1, len(y_space) + 1, len(vel_space) + 1, len(actions)))
    epsilon = epsilon_start

    for episode in range(num_episodes):
        obs, _ = env.reset()
        state = get_state(obs)
        done = False
        total_reward = 0

        while not done:
            action = epsilon_greedy_policy(state, Q, epsilon)
            action_idx = actions.index(action)
            real_action = np.array([action])
            obs, reward, terminated, truncated, _ = env.step(real_action)
            # An episode ends on a true terminal state OR on a time-limit
            # truncation; ignoring the latter can loop forever.
            done = bool(terminated) or bool(truncated)
            next_state = get_state(obs)

            # Bootstrap from the best next action, except on a true terminal
            # transition, where there is no future return to add.
            future_value = 0.0 if terminated else np.max(Q[next_state])
            td_target = reward + gamma * future_value
            td_delta = td_target - Q[state][action_idx]
            Q[state][action_idx] += alpha * td_delta

            state = next_state
            total_reward += reward

        # Decay epsilon once per episode, never going below epsilon_end.
        epsilon = max(epsilon_end, epsilon * epsilon_decay)

        if (episode + 1) % 100 == 0:
            print(f"Episode {episode + 1}/{num_episodes} - Total Reward: {total_reward}, Epsilon: {epsilon}")

    return Q


In [84]:
# Train for 10,000 episodes (q_learning's default is 5,000); the remaining
# hyperparameters repeat the function defaults explicitly. The returned table
# rebinds the module-level Q.
Q = q_learning(env, num_episodes=10000, alpha=0.1, gamma=0.99, epsilon_start=1.0, epsilon_end=0.1, epsilon_decay=0.999)

Episode 100/10000 - Total Reward: -3948.7889837709085, Epsilon: 0.9047921471137096
Episode 200/10000 - Total Reward: -4515.975054817654, Epsilon: 0.818648829478636
Episode 300/10000 - Total Reward: -3357.307566612331, Epsilon: 0.7407070321560997
Episode 400/10000 - Total Reward: -4633.162693526457, Epsilon: 0.6701859060067403
Episode 500/10000 - Total Reward: -3243.1311690856855, Epsilon: 0.6063789448611848
Episode 600/10000 - Total Reward: -2841.259012801881, Epsilon: 0.5486469074854965
Episode 700/10000 - Total Reward: -5272.465468057276, Epsilon: 0.4964114134310989
Episode 800/10000 - Total Reward: -4345.575508943294, Epsilon: 0.4491491486100748
Episode 900/10000 - Total Reward: -5079.414342690447, Epsilon: 0.4063866225452039
Episode 1000/10000 - Total Reward: -4546.537663265384, Epsilon: 0.3676954247709635
Episode 1100/10000 - Total Reward: -3730.808498069549, Epsilon: 0.33268793286240766
Episode 1200/10000 - Total Reward: -4014.8574392482037, Epsilon: 0.3010134290933992
Episode 13

In [85]:
def evaluate_policy(env, Q, num_episodes=100):
    """Roll out the greedy policy derived from ``Q`` and report its mean return.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns a 5-tuple
            ``(obs, reward, terminated, truncated, info)``.
            NOTE(review): the last two flags are assumed to follow the
            Gymnasium convention - confirm against PendulumEnvExtended.
        Q: Q-table indexed by the discrete state from ``get_state``, with the
            last axis running over ``actions``.
        num_episodes: Number of evaluation episodes.

    Returns:
        The mean of the per-episode total rewards (also printed).
    """
    total_rewards = []
    for episode in range(num_episodes):
        obs, _ = env.reset()
        state = get_state(obs)
        done = False
        total_reward = 0

        while not done:
            # Pure exploitation: always take the highest-valued action.
            action_idx = np.argmax(Q[state])
            action = actions[action_idx]
            real_action = np.array([action])
            obs, reward, terminated, truncated, _ = env.step(real_action)
            # Stop on a terminal state OR a time-limit truncation; ignoring
            # the latter can make this loop run forever.
            done = bool(terminated) or bool(truncated)
            state = get_state(obs)
            total_reward += reward

        total_rewards.append(total_reward)

    average_reward = np.mean(total_rewards)
    print(f"Average Reward over {num_episodes} episodes: {average_reward}")
    return average_reward


In [86]:
# Evaluate the greedy policy of the trained Q-table over evaluate_policy's
# default of 100 episodes and keep the mean total reward.
average_reward = evaluate_policy(env, Q)

Average Reward over 100 episodes: -865.8959008248903
