In [4]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Neural Network for DQN
class DQNNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQNNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 24)
        self.fc2 = nn.Linear(24, 48)
        self.fc3 = nn.Linear(48, action_size)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Priority Replay Buffer
class PriorityReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
        self.priorities = deque(maxlen=capacity)

    def add(self, experience, priority):
        self.buffer.append(experience)
        self.priorities.append(priority)

    def sample(self, batch_size):
        probabilities = np.array(self.priorities) / sum(self.priorities)
        indices = np.random.choice(range(len(self.buffer)), batch_size, p=probabilities)
        experiences = [self.buffer[idx] for idx in indices]
        return experiences

    def size(self):
        return len(self.buffer)

    def get_importance_weights(self, b=0.4, epsilon=1e-5):
        sampling_probabilities = np.array(self.priorities) / sum(self.priorities)
        importance_weights = (1 / (len(self.buffer) * sampling_probabilities + epsilon)) ** b
        return importance_weights

# Agent with Target Network
class Agent:
    def __init__(self, state_size, action_size, batch_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = PriorityReplayBuffer(10000)
        self.batch_size = batch_size
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.model = DQNNetwork(state_size, action_size).to(device)
        self.target_model = DQNNetwork(state_size, action_size).to(device)
        self.update_target_model()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.action_values = transform_action_values(action_size)

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def select_action(self, state):
        if np.random.rand() > self.epsilon:
            state = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                q_values = self.model(state)
            action = torch.argmax(q_values).item()
        else:
            action = np.random.randint(self.action_size)

        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)
        return action

    def store_experience(self, state, action, reward, next_state, done):
        priority = self.calculate_priority(state, action, reward, next_state, done)
        self.memory.add((state, action, reward, next_state, done), priority)

    def update_network(self):
        if self.memory.size() < self.batch_size:
            return

        experiences = self.memory.sample(self.batch_size)
        importance_weights = torch.FloatTensor(self.memory.get_importance_weights()).to(device)

        states, actions, rewards, next_states, dones = zip(*experiences)
        states = torch.FloatTensor(states).to(device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(device)
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).to(device)

        current_q_values = self.model(states).gather(1, actions)
        next_q_values = self.target_model(next_states).detach().max(1)[0].unsqueeze(1)
        expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        loss = (current_q_values - expected_q_values).pow(2) * importance_weights
        loss = loss.mean()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def calculate_priority(self, state, action, reward, next_state, done):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
        action = torch.LongTensor([action]).unsqueeze(0).to(device)

        with torch.no_grad():
            current_q = self.model(state)[0][action]
            max_next_q = torch.max(self.target_model(next_state)).item()
            expected_q = reward + self.gamma * max_next_q * (1 - done)

        td_error = abs(current_q - expected_q)
        epsilon = 0.01
        alpha = 0.6
        priority = (td_error + epsilon) ** alpha
        return priority

def transform_action_values(n_actions, min_action=-2.0, max_action=2.0):
    linear_actions = np.linspace(-1, 1, n_actions)
    non_linear_actions = np.sign(linear_actions) * (linear_actions ** 2)
    scaled_actions = min_action + (non_linear_actions + 1) * (max_action - min_action) / 2
    return scaled_actions
    
def run_test_episodes(env, agent, num_episodes=10):
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        for t in range(200):
            env.render()
            action = agent.select_action(state)
            next_state, reward, done, _, _ = env.step(action)
            state = next_state
            total_reward += reward
            if done:
                break
        print(f"Test Episode {episode+1}/{num_episodes}, Total Reward: {total_reward}")
    env.close()

In [5]:

if __name__ == "__main__":
    env = gym.make('Pendulum-v1', g=9.81, render_mode="rgb_array")
    state_size = env.observation_space.shape[0]
    action_size = 15  # Discretize action space into 11 actions

    agent = Agent(state_size, action_size, batch_size=128)

    scores = []  # List to store scores
    num_episodes = 2000  # Number of episodes to train

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        total_custom = 0

        for t in range(200):
            action = agent.select_action(state)
            next_state, reward, done, _, _ = env.step(action)
            state_array = state[0] if isinstance(state, tuple) else state
            next_state_array = next_state[0] if isinstance(next_state, tuple) else next_state

            # Now calculate the custom_reward using the numpy arrays
            custom_reward = 25 * np.exp(-1 * (next_state_array[0] - 1) ** 2 / 0.001) - 100 * np.abs(10 * 0.5 - (10 * 0.5 * next_state_array[0] + 0.5 * 0.3333 * next_state_array[2] ** 2)) + 100 * np.abs(10 * 0.5 - (10 * 0.5 * state_array[0] + 0.5 * 0.3333 * state_array[2] ** 2))
            agent.store_experience(state, action, custom_reward, next_state, done)
            agent.update_network()
            state = next_state
            total_reward += reward
            total_custom += custom_reward

            if done:
                break

        scores.append(total_reward)  # Store score for plotting
        print(f"Episode {episode+1}/{num_episodes}, Total Reward: {total_reward}, Custom Reward: {total_custom}")

        # Update the target network
        if episode % 10 == 0:
            agent.update_target_model()

    env.close()

    # Saving the model
    torch.save(agent.model.state_dict(), 'pendulum_dqn_model.pth')

IndexError: invalid index to scalar variable.

In [None]:
# Plotting the rewards
plt.plot(scores)
plt.title('Training Rewards Over Episodes')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.show()