<a href="https://colab.research.google.com/github/kuds/rl-atari-breakout/blob/main/%5BAtari%20Breakout%5D%20Model-Based%20Reinforcement%20Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/958.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m952.3/958.1 kB[0m [31m61.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m25.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [9]:
import gymnasium
import platform
import torch
import numpy
from importlib.metadata import version
from datetime import datetime
import google.colab.drive

# Load the CartPole-v1 environment
env = gymnasium.make("CartPole-v1")

In [11]:
print(f"Python Version: {platform.python_version()}")
print(f"Torch Version: {version('torch')}")
print(f"Is Cuda Available: {torch.cuda.is_available()}")
print(f"Cuda Version: {torch.version.cuda}")
print(f"Gymnasium Version: {version('gymnasium')}")
print(f"Numpy Version: {version('numpy')}")

Python Version: 3.10.12
Torch Version: 2.5.0+cu121
Is Cuda Available: False
Cuda Version: 12.1
Gymnasium Version: 1.0.0
Numpy Version: 1.26.4


In [24]:
def collect_data(env, num_episodes=1000):
    data = []
    for _ in range(num_episodes):
        state = env.reset()
        terminated = False
        truncated = False
        while not (terminated or truncated):
            action = env.action_space.sample()
            next_state, reward, terminated, truncated, info = env.step(action)
            data.append((state, action, reward, next_state))
            if(terminated or truncated):
              print(next_state)
            state = next_state
    return data

# Collect data

data = collect_data(env)
print(len(data))

[-0.15219986 -0.37948948  0.21644448  0.9168774 ]
[ 0.04517109  0.4043905  -0.2095816  -1.3276857 ]
[-0.08746042 -1.6123929   0.21588443  2.5496356 ]
[ 0.14857228  0.8113916  -0.21894036 -1.4850429 ]
[ 0.14648964  0.97737485 -0.22315826 -1.8016258 ]
[-0.16182984 -0.59033585  0.21936722  1.1976805 ]
[-0.09295858 -0.20970203  0.22971687  0.8560988 ]
[ 0.12848571  0.95662886 -0.21001379 -1.6860118 ]
[-0.14429647 -0.38552988  0.22942144  0.9630001 ]
[-0.09768817 -0.96545666  0.21648502  1.7041638 ]
[ 0.1705447   0.64149266 -0.21682613 -1.2238789 ]
[ 1.0570644   1.458888   -0.21277562 -1.2173722 ]
[ 0.149978    0.8060962  -0.23365204 -1.6813605 ]
[-0.02437592 -0.10684087 -0.21026905 -1.0648534 ]
[-0.08971665 -0.43760103  0.21675503  0.96174294]
[ 0.06371882  0.39235127 -0.21000776 -1.2331458 ]
[-0.06745196  0.7938261  -0.2171724  -2.0613604 ]
[ 0.14470123  0.04168209 -0.21757118 -0.441529  ]
[0.0239886  0.37329143 0.21533604 0.45981959]
[ 0.15823434  0.73919386 -0.2104919  -1.3581455 ]
[-0.

In [22]:
print(data[0][0])

(array([ 0.00603946,  0.03967333,  0.00213784, -0.01986235], dtype=float32), {})


In [18]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define the neural network for the dynamics model
class DynamicsModel(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DynamicsModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, state_dim)  # Predict next state
        self.reward = nn.Linear(128, 1)       # Predict reward

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        next_state = self.fc3(x)
        reward = self.reward(x)
        return next_state, reward


In [19]:
# Initialize model and optimizer
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
model = DynamicsModel(state_dim, action_dim)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Convert collected data to tensors for training
states = torch.tensor([d[0] for d in data], dtype=torch.float32)
actions = torch.tensor([d[1] for d in data], dtype=torch.float32).unsqueeze(1)
next_states = torch.tensor([d[3] for d in data], dtype=torch.float32)
rewards = torch.tensor([d[2] for d in data], dtype=torch.float32).unsqueeze(1)

# Train the dynamics model
for epoch in range(100):
    optimizer.zero_grad()
    predicted_next_states, predicted_rewards = model(states, actions)
    loss = criterion(predicted_next_states, next_states) + criterion(predicted_rewards, rewards)
    loss.backward()
    optimizer.step()

print("Model training complete!")


ValueError: expected sequence of length 4 at dim 2 (got 0)

In [23]:
[d[0] for d in data]

[(array([ 0.00603946,  0.03967333,  0.00213784, -0.01986235], dtype=float32),
  {}),
 array([ 0.00683293, -0.15547922,  0.00174059,  0.27349433], dtype=float32),
 array([ 0.00372334, -0.35062596,  0.00721048,  0.56672573], dtype=float32),
 array([-0.00328917, -0.5458483 ,  0.018545  ,  0.8616715 ], dtype=float32),
 array([-0.01420614, -0.35098374,  0.03577843,  0.57487684], dtype=float32),
 array([-0.02122582, -0.15638115,  0.04727596,  0.29367638], dtype=float32),
 array([-0.02435344, -0.35214412,  0.05314949,  0.6008867 ], dtype=float32),
 array([-0.03139632, -0.54796773,  0.06516723,  0.9098259 ], dtype=float32),
 array([-0.04235568, -0.3537854 ,  0.08336374,  0.6383163 ], dtype=float32),
 array([-0.04943138, -0.5499647 ,  0.09613007,  0.95604396], dtype=float32),
 array([-0.06043068, -0.74623895,  0.11525095,  1.2773148 ], dtype=float32),
 array([-0.07535546, -0.9426261 ,  0.14079724,  1.60375   ], dtype=float32),
 array([-0.09420798, -1.1391054 ,  0.17287225,  1.9368104 ], dtype=f

In [None]:
def mpc_action_selection(model, current_state, num_simulations=100, horizon=10):
    best_action = None
    best_reward = -np.inf

    for _ in range(num_simulations):
        simulated_state = current_state
        total_reward = 0
        for _ in range(horizon):
            action = np.random.choice([0, 1])  # Random action sampling for now
            action_tensor = torch.tensor([action], dtype=torch.float32).unsqueeze(0)
            state_tensor = torch.tensor(simulated_state, dtype=torch.float32).unsqueeze(0)
            next_state, reward = model(state_tensor, action_tensor)
            total_reward += reward.item()
            simulated_state = next_state.detach().numpy()[0]

        if total_reward > best_reward:
            best_reward = total_reward
            best_action = action

    return best_action


In [None]:
def evaluate_model_based_agent(env, model, num_episodes=10):
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        total_reward = 0
        while not done:
            action = mpc_action_selection(model, state)
            state, reward, done, _ = env.step(action)
            total_reward += reward
        print(f"Episode {episode + 1}: Total Reward: {total_reward}")

# Evaluate the agent
evaluate_model_based_agent(env, model)
