In [7]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from stable_baselines3 import PPO

# Step 1: Environment Setup
env = gym.make('CartPole-v1')

In [8]:
# Step 2: Environment Simulation to collect data
def collect_random_data(env, num_episodes=100):
    data = []
    for _ in range(num_episodes):
        obs = env.reset()
        done = False
        while not done:
            action = env.action_space.sample()
            step_result = env.step(action)
            next_obs, reward, done = step_result[:3]
            info = step_result[3] if len(step_result) > 3 else {}
            data.append((obs, action, reward, next_obs, done))
            obs = next_obs
    return data

data = collect_random_data(env)

In [9]:
obs, action, reward, next_obs, done=data[0]
obs[0]

array([ 0.00731177, -0.01372139, -0.00953324,  0.02445259], dtype=float32)

In [10]:
# Step 3: Model Learning (Learn the dynamics of the environment)
class DynamicsModel(nn.Module):
    def __init__(self, obs_size, action_size):
        super(DynamicsModel, self).__init__()
        self.fc1 = nn.Linear(obs_size + action_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, obs_size + 1)  # Predict next_obs and reward

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

obs_size = env.observation_space.shape[0]
action_size = env.action_space.n

dynamics_model = DynamicsModel(obs_size, action_size)
optimizer = optim.Adam(dynamics_model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

def train_dynamics_model(data, model, optimizer, loss_fn, epochs=10):
    for epoch in range(epochs):
        losses = []
        for obs, action, reward, next_obs, done in data:
            obs = np.array(obs[0]) if isinstance(obs, tuple) else np.array(obs)
            next_obs = np.array(next_obs[0]) if isinstance(next_obs, tuple) else np.array(next_obs)
            action_onehot = np.zeros(action_size)
            action_onehot[action] = 1
            input_data = np.concatenate([obs, action_onehot])
            input_tensor = torch.FloatTensor(input_data).unsqueeze(0)
            target_tensor = torch.FloatTensor(np.concatenate([next_obs, [reward]])).unsqueeze(0)

            optimizer.zero_grad()
            prediction = model(input_tensor)
            loss = loss_fn(prediction, target_tensor)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {np.mean(losses)}")

train_dynamics_model(data, dynamics_model, optimizer, loss_fn)

Epoch 1/10, Loss: 0.013302328201539043
Epoch 2/10, Loss: 0.0023811238704294574
Epoch 3/10, Loss: 0.0013715574794935985
Epoch 4/10, Loss: 0.0013444001109254057
Epoch 5/10, Loss: 0.0008250438280797269
Epoch 6/10, Loss: 0.0008949748313825888
Epoch 7/10, Loss: 0.0009210502039504032
Epoch 8/10, Loss: 0.0007900766803542896
Epoch 9/10, Loss: 0.0008016386640197829
Epoch 10/10, Loss: 0.0009191531613953992


In [11]:

# Step 4: Policy Learning (Using PPO from Stable Baselines3)
model = PPO('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=10000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 23.3     |
|    ep_rew_mean     | 23.3     |
| time/              |          |
|    fps             | 5817     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 25.5        |
|    ep_rew_mean          | 25.5        |
| time/                   |             |
|    fps                  | 4011        |
|    iterations           | 2           |
|    time_elapsed         | 1           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.007885848 |
|    clip_fraction        | 0.079       |
|    clip_range           | 0.2         |
|    entropy_loss         | -0.687      |
|    explained_variance   | -0.00449    |
|    learning_rate        | 0.

<stable_baselines3.ppo.ppo.PPO at 0x167511150>

In [15]:
# Step 5: Visualization
def visualize_policy(env, model, num_episodes=5):
    for episode in range(num_episodes):
        obs = env.reset()
        obs, _ = obs if isinstance(obs, tuple) else (obs, {})
        done = False
        total_reward = 0
        while not done:
            env.render()
            action, _states = model.predict(obs)
            obs, reward, done, info = env.step(action)[:4]
            obs, _ = obs if isinstance(obs, tuple) else (obs, {})
            total_reward += reward
        print(f"Episode {episode + 1}: Total Reward: {total_reward}")

visualize_policy(env, model)

Episode 1: Total Reward: 148.0
Episode 2: Total Reward: 126.0
Episode 3: Total Reward: 122.0
Episode 4: Total Reward: 198.0
Episode 5: Total Reward: 196.0
