<a href="https://colab.research.google.com/github/kcngdominic/rl-gym/blob/main/Gym_CartPole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
# Import necessary libraries
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pdb
import random

  and should_run_async(code)


In [7]:
# Define the neural network model
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)



In [8]:
# Define the replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, transition):
        if len(self.memory) < self.capacity:
            self.memory.append(transition)
        else:
            self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)



In [14]:
class Transition:
    def __init__(self, state, action, next_state, reward, done):
        self.state = state
        self.action = action
        self.next_state = next_state
        self.reward = reward
        self.done = done


In [9]:
# Initialize environment and hyperparameters
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
epsilon_decay = 0.995
gamma = 0.99
learning_rate = 0.001
target_update = 10
buffer_capacity = 10000
batch_size = 64

# Initialize model, optimizer, and replay buffer
policy_net = QNetwork(state_size, action_size)
target_net = QNetwork(state_size, action_size)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
replay_buffer = ReplayBuffer(buffer_capacity)



  deprecation(
  deprecation(


In [10]:
# Define exploration-exploitation strategy (epsilon-greedy)
def select_action(state, epsilon):
    if np.random.rand() < epsilon:
        return env.action_space.sample()  # Exploration
    else:
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1).item()  # Exploitation



In [15]:
# Training loop
num_episodes = 1000
epsilon = 1.0
for episode in range(num_episodes):
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32).view(1, -1)

    total_reward = 0
    while True:
        # Select and perform an action
        action = select_action(state, epsilon)
        next_state, reward, done, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32).view(1, -1)
        reward = torch.tensor([reward], dtype=torch.float32)

        # Store the transition in the replay buffer
        replay_buffer.push((state, action, next_state, reward, done))

        # Move to the next state
        state = next_state
        total_reward += reward.item()

        # Sample and optimize the model
        if len(replay_buffer.memory) > batch_size:
            transitions = replay_buffer.sample(batch_size)
            batch = Transition(*zip(*transitions))
            non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), dtype=torch.bool)
            non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
            state_batch = torch.cat(batch.state)
            action_batch = torch.cat(batch.action)
            reward_batch = torch.cat(batch.reward)

            # Compute Q values
            current_q_values = policy_net(state_batch).gather(1, action_batch)

            # Compute target Q values
            next_q_values = torch.zeros(batch_size)
            next_q_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
            target_q_values = (next_q_values * gamma) + reward_batch

            # Compute loss and optimize the model
            loss = F.smooth_l1_loss(current_q_values, target_q_values.view(-1, 1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update target network
        if episode % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())

        if done:
            break

    # Decay epsilon for exploration-exploitation trade-off
    epsilon *= epsilon_decay
    epsilon = max(0.01, epsilon)  # Ensure epsilon doesn't go below a minimum value

    # Print episode information
    if episode % 10 == 0:
        print(f"Episode {episode}, Total Reward: {total_reward}")



TypeError: expected Tensor as element 0 in argument 0, but got int

In [None]:
# Test the trained model
test_episodes = 10
for _ in range(test_episodes):
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32).view(1, -1)

    total_reward = 0
    while True:
        # Select the best action according to the policy network
        action = policy_net(state).max(1)[1].view(1, 1).item()

        # Perform the selected action
        next_state, reward, done, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32).view(1, -1)

        # Move to the next state
        state = next_state
        total_reward += reward

        if done:
            print(f"Test Episode, Total Reward: {total_reward}")
            break
