In [1]:
import gymnasium as gym
import numpy as np

In [2]:
#env = gym.make("CartPole-v1", render_mode="human")
env = gym.make("CartPole-v1")
obs = env.reset()
#Carts horizontal position (0.0 = center), velocity (positive means right), the angle of the pole (0.0 = vertical), angular velocity (positive means clockwise)
obs

(array([-0.02291424, -0.02671412, -0.04872506, -0.04269291], dtype=float32),
 {})

In [3]:
# env.render()
env.action_space

Discrete(2)

In [4]:
action = 1
obs, reward, done, truncated, info = env.step(action)
obs, reward, done, truncated, info

(array([-0.02344852,  0.16907144, -0.04957892, -0.3503422 ], dtype=float32),
 1.0,
 False,
 False,
 {})

In [5]:
def basic_policy(observation):
    cart_position, cart_velocity, pole_angle, pole_angular_velocity = observation
    if pole_angle < 0:
        return 0  # Move left
    else:
        return 1  # Move right

In [6]:
totals = []
for episode in range(500):
    observation, info = env.reset()
    episode_rewards = 0
    for step in range(200):
        action = basic_policy(observation)
        observation, reward, done, truncated, info = env.step(action)
        episode_rewards += reward
        if done or truncated:
            break
    totals.append(episode_rewards)

import numpy as np
print("Mean, stddev, Min, Max:", np.mean(totals), np.std(totals), np.min(totals), np.max(totals))
env.close()


Mean, stddev, Min, Max: 41.24 8.727565525391373 24.0 68.0


In [7]:
import tensorflow as tf
from tensorflow import keras

n_inputs = 4 # CartPole observation space == env.observation_space.shape[0]

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="relu", input_shape=[n_inputs]),
    keras.layers.Dense(1, activation="sigmoid")])


2025-11-17 16:02:07.336144: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-17 16:02:07.408406: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-11-17 16:02:08.888851: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
2025-11-17 16:02:10.347573: E external/local_xla/xla/stream_executor/cuda/cuda_platform.cc:51] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)


In [8]:
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        # probability of taking action 0 (left)
        left_proba = model(obs[np.newaxis])
        #print("Left proba:", left_proba.numpy())
        # action is 0 (left) with probability left_proba
        action = (tf.random.uniform([1,1]) > left_proba)
        #print("Action:", action.numpy())
        # target is 1 if action is 0 (left), else 0
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        # compute the loss. 
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    # Get the gradients of the loss w.r.t. the model's trainable variables
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, truncated, info = env.step(int(action[0,0].numpy()))
    return obs, reward, done, grads

In [9]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs, info = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done or truncated:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads
    

In [10]:
def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    for step in range(len(rewards)-2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

In [11]:
def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = [discount_rewards(rewards, discount_factor) for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]

In [12]:
discount_rewards([10, 0, -50], discount_factor=0.8)

array([-22, -40, -50])

In [13]:
discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_factor=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [14]:
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.BinaryCrossentropy()

In [16]:
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95  

for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(env, n_episodes_per_update, n_max_steps, model, loss_fn)
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
    
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))