In [1]:
import gymnasium as gym
import sklearn
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

In [2]:
env = gym.make('CartPole-v1', render_mode='rgb_array')

In [3]:
obs, info = env.reset(seed=42)

In [4]:
def plot_environments(env, fig_size=(5, 4)):
    plt.figure(figsize=fig_size)
    img = env.render()
    plt.imshow(img)
    plt.axis('off')
    return img

#plot_environments(env)
#plt.show()

In [5]:
env.action_space

Discrete(2)

In [6]:
action = 1
obs, reward, done, truncated, info = env.step(action)
obs

array([ 0.02727336,  0.18847767,  0.03625453, -0.26141977], dtype=float32)

In [7]:
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1

totals = []
for episode in range(500):
    episode_reward = 0
    obs, info = env.reset(seed=episode)
    for step in range(200):
        action=basic_policy(obs)
        obs, reward, done, truncated, info = env.step(action)
        episode_reward += reward
        if done or truncated:
            break
    totals.append(episode_reward)

In [8]:
print(np.mean(totals),np.std(totals), np.min(totals), np.max(totals))

41.698 8.389445512070509 24.0 63.0


In [9]:
tf.random.set_seed(42)

model = keras.Sequential([
    keras.layers.Dense(5, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid')
])

In [10]:
def play_one_step(env: gym.Env, obs, model: keras.Sequential, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, terminated, truncated, info = env.step(int(action))
    if terminated:
        env.reset(seed=42)
    return obs, reward, terminated, truncated, grads
    

In [11]:
def play_multiple_episodes(env:gym.Env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs, info = env.reset()
        for step in range(n_max_steps):
            obs, reward, terminated, truncated, grads = play_one_step(
                env, obs, model, loss_fn
            )
            current_rewards.append(reward)
            current_grads.append(grads)
            if terminated or truncated:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)

    return all_rewards, all_grads

In [12]:
def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = [discount_rewards(rewards, discount_factor) 
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return[(discount_reward - reward_mean)/ reward_std 
           for discount_reward in all_discounted_rewards]

In [13]:
n_iterations = 150
n_episodes_per_iterations = 10
n_max_steps = 200
discount_factor = 0.95

optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy

In [14]:
'''
for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_iterations, n_max_steps, model, loss_fn
    )

    total_rewards = sum(map(sum, all_rewards))
    print(f"\rIteration: {iteration + 1}/{n_iterations},"
          f" mean rewards: {total_rewards / n_episodes_per_iterations:.1f}", end="")


    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
    
    all_means_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                for step, final_reward in enumerate(final_rewards)], 
            axis=0
        )
        all_means_grads.append(mean_grads)

    optimizer.apply_gradients(zip(all_means_grads, model.trainable_variables))
'''

'\nfor iteration in range(n_iterations):\n    all_rewards, all_grads = play_multiple_episodes(\n        env, n_episodes_per_iterations, n_max_steps, model, loss_fn\n    )\n\n    total_rewards = sum(map(sum, all_rewards))\n    print(f"\rIteration: {iteration + 1}/{n_iterations},"\n          f" mean rewards: {total_rewards / n_episodes_per_iterations:.1f}", end="")\n\n\n    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)\n    \n    all_means_grads = []\n    for var_index in range(len(model.trainable_variables)):\n        mean_grads = tf.reduce_mean(\n            [final_reward * all_grads[episode_index][step][var_index]\n             for episode_index, final_rewards in enumerate(all_final_rewards)\n                for step, final_reward in enumerate(final_rewards)], \n            axis=0\n        )\n        all_means_grads.append(mean_grads)\n\n    optimizer.apply_gradients(zip(all_means_grads, model.trainable_variables))\n'

In [15]:
import joblib

#joblib.dump(model, "my_model.pkl")
model = joblib.load("my_model.pkl")

In [16]:
np.random.seed(42)

transition_probabilities = [ # shape=[s, s']
        [0.7, 0.2, 0.0, 0.1],  # from s0 to s0, s1, s2, s3
        [0.0, 0.0, 0.9, 0.1],  # from s1 to s0, s1, s2, s3
        [0.0, 1.0, 0.0, 0.0],  # from s2 to s0, s1, s2, s3
        [0.0, 0.0, 0.0, 1.0]]  # from s3 to s0, s1, s2, s3

n_max_steps = 1000  # to avoid blocking in case of an infinite loop
terminal_states = [3]

def run_chain(start_state):
    current_state = start_state
    for step in range(n_max_steps):
        print(current_state, end=" ")
        if current_state in terminal_states:
            break
        current_state = np.random.choice(
            range(len(transition_probabilities)),
            p=transition_probabilities[current_state]
        )
    else: 
        print("...", end="")

    print()

for idx in range(10):
    print(f"Run #{idx + 1}: ", end="")
    run_chain(start_state=0)

Run #1: 0 0 3 
Run #2: 0 1 2 1 2 1 2 1 2 1 3 
Run #3: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
Run #4: 0 3 
Run #5: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
Run #6: 0 1 3 
Run #7: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
Run #8: 0 0 0 1 2 1 2 1 3 
Run #9: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
Run #10: 0 0 0 1 2 1 3 


In [29]:
transition_probabilities = [
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
    [None, [0.8, 0.1, 0.1], None]
]
rewards = [
    [[10, 0, 0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
    [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, -50.0]],
    [[0.0, 0.0, 0.0], [40.0, 0.0, 0.0], [0.0, 0.0, 0.0]]
]
possible_action = [[0, 1, 2], [0, 2], [1]]

In [20]:
Q_values = np.full((3, 3), -np.inf)
for state, action in enumerate(possible_action):
    Q_values[state, action] = 0.0

In [25]:
gamma = 0.95

for iteration in range(50):
    Q_prev = Q_values.copy()
    for s in range(3):
        for a in possible_action[s]:
            Q_values[s, a] = np.sum([
                transition_probabilities[s][a][sp] 
                * (reward[s][a][sp] + gamma * np.max(Q_prev[sp]))
            for sp in range(3)])

In [26]:
np.argmax(Q_values, axis=1)

array([0, 2, 1], dtype=int64)

In [30]:
def step(state, action):
    probas = transition_probabilities[state][action]
    next_state = np.random.choice([0, 1, 2], p=probas)
    reward = rewards[state][action][next_state]
    return next_state, reward

def exploration_policy(state):
    return np.random.choice(possible_action[state])

In [None]:
alpha0 = 0.05  #  initial learning rate
decay = 0.05  #  learning rate decay
gamma = 0.9  #  discount factor
state = 0  #  initial state

for iteration in range(10000):
    action = exploration_policy(state)
    next_state , reward = step(state, action)
    next_value = np.max(Q_values[next_state])
    alpha = alpha / (1 + iteration * decay)
    Q_values[state, action] *= alpha
    Q_values[state, action] += alpha * (reward + gamma * next_value)
    state = next_state