In [1]:
import gym
import time
from math import sin, cos, radians, log10
from sklearn.preprocessing import KBinsDiscretizer
import numpy as np
from ipywidgets import IntProgress
from IPython.display import display
from lib.cartpoles import CartPoleSystem, CartPolesEnv
from lib.colors import Colors
from matplotlib import pyplot as plt

In [2]:
# Constants handed to the system and environment constructors below.
dt = 0.01
g = 9.81

# Build the cart-pole system from positional configuration tuples.
# NOTE(review): the meaning of each tuple field is defined by
# lib.cartpoles.CartPoleSystem and is not visible in this notebook — confirm
# against that class before changing any value.
system = CartPoleSystem(
    (0.0, 0.5, 0.05, -0.8, 0.8, Colors.red),
    (0.05, 0.05, 0.01, 0.5, 0.05, -24.0, 24.0, Colors.black),
    [
        # Presumably one tuple per pole; the first field is 10 degrees
        # converted to radians.
        (radians(10), 0.2, 0.2, 0.005, Colors.green),
    ],
    g,
    "rk4"
)

env = CartPolesEnv(system, dt, g)
# Last expression of the cell: shown as the cell output.
env.observation_space.shape

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


(4,)

In [3]:
# Discretization settings: bins per observation dimension, bins per action
# dimension, and the velocity magnitude used as a finite bound when binning.
n_obs_bins = 11
n_action_bins = 3
max_velocity = 5

# Start from the bounds reported by the environment ...
obs_low = np.array(env.observation_space.low)
obs_high = np.array(env.observation_space.high)

# ... then keep only the first bound as-is and substitute hand-picked limits
# for the remaining entries: +/- max_velocity for the second entry, followed by
# a (0 .. 2*pi, +/- max_velocity) pair repeated once per pole.
obs_low = np.concatenate((
    np.array([obs_low[0], -max_velocity]),
    np.tile([0, -max_velocity], system.num_poles),
))
obs_high = np.concatenate((
    np.array([obs_high[0], max_velocity]),
    np.tile([radians(360), max_velocity], system.num_poles),
))

# Action bounds are taken from the environment unchanged.
action_low = np.array(env.action_space.low)
action_high = np.array(env.action_space.high)

print("Low", obs_low)
print("High", obs_high)

Low [-0.80000001 -5.          0.         -5.        ]
High [0.80000001 5.         6.28318531 5.        ]


In [4]:
def refine_state(state):
    """Pre-processing hook for raw observations.

    Currently a pass-through: the very same object that was given is returned.
    """
    return state

def obs_discretizer(obs) -> tuple[int,...]:
    """Convert a continuous observation into a tuple of integer bin indices.

    A ``KBinsDiscretizer`` using the "uniform" strategy with ``n_obs_bins``
    bins is fitted on the two rows ``obs_low`` and ``obs_high`` each time the
    function is called, and ``obs`` is then transformed with it.
    """
    binner = KBinsDiscretizer(n_bins=n_obs_bins, encode="ordinal", strategy="uniform")
    binner.fit([obs_low, obs_high])
    # transform() works on a batch; wrap the single observation and unwrap it.
    bin_row = binner.transform([obs])[0]
    return tuple(int(idx) for idx in bin_row)

def action_discretizer(action) -> tuple[int,...]:
    """Convert a continuous action into a tuple of integer bin indices.

    A ``KBinsDiscretizer`` using the "uniform" strategy with ``n_action_bins``
    bins is fitted on ``action_low`` / ``action_high`` on every call.
    """
    binner = KBinsDiscretizer(n_bins=n_action_bins, encode="ordinal", strategy="uniform")
    binner.fit([action_low, action_high])
    # transform() works on a batch; wrap the single action and unwrap it.
    bin_row = binner.transform([action])[0]
    return tuple(int(idx) for idx in bin_row)

def action_undiscretizer(action):
    """Map a tuple of action bin indices back to a continuous action.

    Uses the same discretizer configuration as ``action_discretizer`` (fitted
    on ``action_low`` / ``action_high`` on every call) and returns the first
    row produced by its ``inverse_transform``.
    """
    binner = KBinsDiscretizer(n_bins=n_action_bins, encode="ordinal", strategy="uniform")
    binner.fit([action_low, action_high])
    continuous_batch = binner.inverse_transform([action])
    return continuous_batch[0]

In [5]:
# Q-table layout: one axis of n_obs_bins entries per observation dimension,
# followed by one axis of n_action_bins entries per action dimension.
Q_shape = (n_obs_bins,) * obs_low.shape[0] + (n_action_bins,) * action_low.shape[0]

# Initialise every entry with a random value drawn uniformly from [-2, 0).
Q_table = np.random.uniform(low=-2, high=0, size=Q_shape)
print("Shape", Q_table.shape)

# Flip to True to overwrite 'q_table.npy' with this freshly initialised table.
save = False

if save:
    with open('q_table.npy', 'wb') as f:
        np.save(f, Q_table)
    print("Successfully saved Q_table")

Shape (11, 11, 11, 11, 3)


In [6]:
def policy(state):
    """Greedy policy: pick the action index with the highest Q-value.

    ``state`` indexes the global ``Q_table``; the argmax of the selected
    entries is returned wrapped in a 1-tuple.
    """
    best_action = np.argmax(Q_table[state])
    return (best_action,)

In [7]:
def new_Q_value(reward: float, new_state, discount_factor=1.0) -> float:
    """Return the learning target ``reward + discount_factor * max Q(new_state)``.

    The maximum is taken over the entries of the global ``Q_table`` selected
    by ``new_state``.
    """
    best_next_value = np.max(Q_table[new_state])
    return reward + discount_factor * best_next_value

In [8]:
# Q-learning training loop.
# `read` resumes from the table stored in 'q_table.npy'; `save` writes
# checkpoints to the same file every `save_every` episodes and once more at
# the end of training.
read = True
save = True

if read:
    with open('q_table.npy', 'rb') as f:
        Q_table = np.load(f)
    print("Successfully read Q_table")


alpha = learning_rate = 0.1
gamma = discount = 0.95
epsilon = exploration_rate = 1

n_episodes = 10000
show_every = 100
save_every = 100

# Epsilon is lowered by a fixed step after each episode whose index lies in
# [start_epsilon_decay, end_epsilon_decay].
start_epsilon_decay = 1
end_epsilon_decay = 4000
epsilon_decay_value = epsilon/(end_epsilon_decay-start_epsilon_decay)

progress = IntProgress(min=0, max=n_episodes) # instantiate the bar
display(progress) # display the bar

ep_rewards = []
aggr_ep_rewards = {
    "ep": [],
    "avg": [],
    "min": [],
    "max": []
}

for e in range(n_episodes):
    progress.value += 1
    obs, _ = env.reset()
    done = False
    state = obs_discretizer(refine_state(obs))

    ep_reward = 0

    while not done:
        # Epsilon-greedy action selection: start from the greedy action and
        # replace it with a random bin with probability epsilon.
        action = policy(state)
        exploring = False

        if np.random.random() < epsilon:
            action = (np.random.randint(0, n_action_bins),)
            exploring = True

        action_continuous = action_undiscretizer(action)
        # `msg` is indexed with "won" below, so it is treated as a mapping;
        # the fifth value returned by step() is discarded.
        obs, reward, done, msg, _ = env.step(action_continuous)
        ep_reward += reward

        new_state = obs_discretizer(refine_state(obs))

        if not done:
            # Bootstrap from the best action available in the NEXT state
            # (maximum over the whole action axis, not a single entry) ...
            Q_future_max = np.max(Q_table[new_state])
            # ... and blend it into the value of the (state, action) pair that
            # was actually taken, i.e. the entry that is written back below.
            Q_current = Q_table[state + action]

            Q_new = (1-alpha) * Q_current + alpha * (reward + gamma * Q_future_max)
            Q_table[state + action] = Q_new
        # NOTE(review): no update is made on the transition that ends the
        # episode, so the final reward never reaches the table — confirm this
        # is intended.

        state = new_state

        if not e % show_every:
            env.render([
                "",
                f"Episode: {e}",
                f"Reward: {reward}",
                f"Exploration rate: {round(epsilon, 3)}",
                f"Exploring: {exploring}"
            ])
            time.sleep(dt)

        if msg["won"]:
            print(f"Won! on episode {e}")

    if end_epsilon_decay >= e >= start_epsilon_decay:
        # The inclusive window holds one more step than the divisor used for
        # epsilon_decay_value, so clamp to keep epsilon from going negative.
        epsilon = max(0.0, epsilon - epsilon_decay_value)

    ep_rewards.append(ep_reward)

    if not e % save_every:
        # Statistics over (at most) the last `save_every` episodes.
        recent_rewards = ep_rewards[-save_every:]
        average_reward = sum(recent_rewards)/len(recent_rewards)
        min_reward = min(recent_rewards)
        max_reward = max(recent_rewards)
        aggr_ep_rewards["ep"].append(e)
        aggr_ep_rewards["avg"].append(average_reward)
        aggr_ep_rewards["min"].append(min_reward)
        aggr_ep_rewards["max"].append(max_reward)

        print(f"Episode: {e}, avg: {average_reward}, min: {min_reward} max: {max_reward}")

        if save:
            with open('q_table.npy', 'wb') as f:
                np.save(f, Q_table)
            print("Successfully saved Q_table")

env.close()

plt.plot(aggr_ep_rewards["ep"], aggr_ep_rewards["avg"], label="avg")
plt.plot(aggr_ep_rewards["ep"], aggr_ep_rewards["min"], label="min")
plt.plot(aggr_ep_rewards["ep"], aggr_ep_rewards["max"], label="max")
plt.legend(loc=4)
plt.show()

if save:
    with open('q_table.npy', 'wb') as f:
        np.save(f, Q_table)
    print("Successfully saved Q_table")

Successfully read Q_table


IntProgress(value=0, max=10000)

Episode: 0, avg: -630.0, min: -630 max: -630
Successfully saved Q_table
