In [1]:
!pip install pillow



In [None]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.animation import PillowWriter  # Import for saving as GIF

# --- Hyperparameters ---
learning_rate = 0.001  # step size handed to the optimizer
gamma = 0.99  # discount factor applied to future rewards
entropy_coeff = 0.01  # weight of the entropy regularization term in the loss
clip_epsilon = 0.2  # PPO clipping range for the probability ratio

# --- Environment ---
# MountainCar-v0, rendering to RGB arrays, with the new step API switched on.
env = gym.make('MountainCar-v0', render_mode='rgb_array', new_step_api=True)

# --- Policy network dimensions ---
input_shape = env.observation_space.shape[0]  # length of the observation vector
output_shape = env.action_space.n  # number of discrete actions

def build_policy_network():
    """Build the policy model mapping an observation to action probabilities.

    The network is a single 64-unit ReLU hidden layer followed by a softmax
    layer with ``output_shape`` units; its input has ``input_shape`` features.

    Returns:
        An uncompiled ``keras.Model``.
    """
    observation_in = layers.Input(shape=(input_shape,))
    hidden = layers.Dense(64, activation='relu')(observation_in)
    action_distribution = layers.Dense(output_shape, activation='softmax')(hidden)
    return keras.Model(inputs=observation_in, outputs=action_distribution)

# Instantiate the policy and the optimizer that updates its weights.
policy_network = build_policy_network()
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

# Training-run configuration.
# NOTE(review): both limits are very small -- presumably smoke-test values;
# confirm before using this for a real training run.
num_episodes, num_steps = 10, 2  # episodes to run; step cap per episode
episode_rewards = list()  # total (undiscounted) reward of each episode

# Training loop: roll out one episode with the current policy, then apply a
# single clipped-surrogate policy-gradient update computed from that episode.
for episode in range(num_episodes):
    state = env.reset()
    episode_states, episode_actions, episode_rewards_episode, episode_probs = [], [], [], []

    for _ in range(num_steps):
        # Call the model directly instead of predict(): predict() is meant for
        # large batches and prints a progress bar on every call, which is slow
        # and floods the output when invoked once per environment step.
        action_probs = policy_network(state.reshape(1, -1), training=False).numpy()[0]
        action = np.random.choice(output_shape, p=action_probs)
        result = env.step(action)
        next_state, reward = result[0], result[1]

        # A 5-element step result carries separate `terminated` and
        # `truncated` flags, and the episode is over when either is set.
        # A 4-element result only has a single `done` flag.
        done = bool(result[2]) or (len(result) == 5 and bool(result[3]))

        # Store state, action, reward, and the probabilities the action was
        # sampled from (the "old" policy in the PPO ratio below).
        episode_states.append(state)
        episode_actions.append(action)
        episode_rewards_episode.append(reward)
        episode_probs.append(action_probs)

        if done:
            break

        state = next_state

    # Discounted return for every step, accumulated back-to-front. Appending
    # and reversing once avoids the quadratic cost of insert(0, ...).
    discounted_rewards = []
    cumulative_reward = 0.0
    for r in reversed(episode_rewards_episode):
        cumulative_reward = r + gamma * cumulative_reward
        discounted_rewards.append(cumulative_reward)
    # float32 matches the dtype of the network outputs used in the loss.
    discounted_rewards = np.array(discounted_rewards[::-1], dtype=np.float32)

    # Normalize the returns. The epsilon keeps a zero standard deviation
    # (e.g. a one-step episode) from producing NaN advantages.
    discounted_rewards -= discounted_rewards.mean()
    discounted_rewards /= discounted_rewards.std() + 1e-8

    # The normalized returns are used directly as advantages.
    advantages = discounted_rewards

    # Quantities that do not depend on the trainable weights are prepared
    # outside the tape.
    states_batch = np.vstack(episode_states)
    action_masks = tf.one_hot(episode_actions, output_shape)
    old_probs = np.asarray(episode_probs, dtype=np.float32)
    selected_episode_probs = tf.reduce_sum(action_masks * old_probs, axis=1)

    with tf.GradientTape() as tape:
        action_probs = policy_network(states_batch, training=True)
        selected_action_probs = tf.reduce_sum(action_masks * action_probs, axis=1)

        # Per-step entropy of the action distribution; the small constant
        # guards against log(0).
        entropy = -tf.reduce_sum(action_probs * tf.math.log(action_probs + 1e-20), axis=1)

        # PPO clipped surrogate: take the more pessimistic of the unclipped
        # and clipped ratio-weighted advantages.
        ratio = selected_action_probs / selected_episode_probs
        surrogate_objective = tf.minimum(
            advantages * ratio,
            tf.clip_by_value(ratio, 1.0 - clip_epsilon, 1.0 + clip_epsilon) * advantages
        )
        policy_loss = -tf.reduce_mean(surrogate_objective)

        # Negative mean entropy: minimizing it pushes entropy up.
        entropy_loss = -tf.reduce_mean(entropy)

        # Total loss
        total_loss = policy_loss + entropy_coeff * entropy_loss

    # Update policy network
    grads = tape.gradient(total_loss, policy_network.trainable_variables)
    optimizer.apply_gradients(zip(grads, policy_network.trainable_variables))

    episode_rewards.append(sum(episode_rewards_episode))

    if episode % 100 == 0:
        print(f"Episode {episode}, Total Reward: {episode_rewards[-1]:.2f}")

# Visualization of the trained agent, optionally saved as an animated GIF
def visualize_agent(save_video=True):
    """Roll out one greedy episode and optionally save it as an animated GIF.

    The agent always takes the action with the highest policy probability.
    Frames are collected from ``env.render`` before each step, and the rollout
    stops as soon as the environment reports the episode as finished.

    Args:
        save_video: When true (and at least one frame was rendered), write the
            frames to ``trained_agent.gif`` in the current working directory.

    Returns:
        The list of frames collected during the rollout.
    """
    state = env.reset()
    frames = []

    while True:
        rendered = env.render(mode='rgb_array')
        # NOTE(review): some gym releases return a list of frames from
        # render() in 'rgb_array' mode -- confirm for the installed version.
        # Flatten so `frames` always holds individual images.
        if isinstance(rendered, list):
            frames.extend(rendered)
        elif rendered is not None:
            frames.append(rendered)

        # Direct model call: predict() prints a progress bar per call and is
        # needlessly slow for a single observation.
        action_probs = policy_network(state.reshape(1, -1), training=False).numpy()[0]
        action = np.argmax(action_probs)
        result = env.step(action)
        state = result[0]

        # A 5-element step result has separate `terminated` and `truncated`
        # flags. Checking only the first one never ends the loop when the
        # agent fails to reach the goal and the episode is merely truncated.
        done = bool(result[2]) or (len(result) == 5 and bool(result[3]))
        if done:
            break

    if save_video and frames:
        writer = PillowWriter(fps=20)
        fig, ax = plt.subplots(figsize=(8, 6))
        patch = ax.imshow(frames[0])
        plt.axis('off')

        def animate(i):
            patch.set_data(frames[i])
            return (patch,)

        anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)
        # PillowWriter encodes through Pillow, which cannot write MP4, so the
        # animation is saved as a GIF.
        anim.save('trained_agent.gif', writer=writer)
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    return frames

# Roll out the trained agent once (saving the animation), then release the
# environment's resources.
frames = visualize_agent(save_video=True)
env.close()

# Report the mean total reward over all training episodes.
mean_episode_reward = np.mean(episode_rewards)
print(f"Average Reward per Episode: {mean_episode_reward:.2f}")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
