In [2]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import optuna

# Custom CartPole Environment
class CustomCartPole:
    def __init__(self):
        # Constants
        self.gravity = 9.8
        self.mass_cart = 1.0
        self.mass_pole = 0.1
        self.total_mass = self.mass_cart + self.mass_pole
        self.length = 0.5  # Half the length of the pole
        self.pole_mass_length = self.mass_pole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # Time step (20 ms)
        self.theta_threshold_radians = 12 * 2 * np.pi / 360  # 12 degrees
        self.x_threshold = 2.4  # Cart position threshold (meters)

        # State variables
        self.state = None
        self.steps_beyond_done = None

    def reset(self):
        # Reset the state to a random small initial value
        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        # Get the current state
        x, x_dot, theta, theta_dot = self.state

        # Force applied based on action (0: left, 1: right)
        force = self.force_mag if action == 1 else -self.force_mag

        # Dynamics equations
        costheta = np.cos(theta)
        sintheta = np.sin(theta)
        temp = (force + self.pole_mass_length * theta_dot**2 * sintheta) / self.total_mass
        theta_acc = (self.gravity * sintheta - costheta * temp) / \
                    (self.length * (4.0/3.0 - self.mass_pole * costheta**2 / self.total_mass))
        x_acc = temp - self.pole_mass_length * theta_acc * costheta / self.total_mass

        # Update the state using Euler's method
        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * x_acc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * theta_acc
        self.state = (x, x_dot, theta, theta_dot)

        # Check if the episode is done
        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        # Compute the reward
        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state, dtype=np.float32), reward, done, {}

    def render(self):
        x, _, theta, _ = self.state
        print(f"Cart Position: {x:.2f}, Pole Angle: {theta:.2f}")

    def close(self):
        pass

# Q-Network for DQN
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# DQN Agent
class DQNAgent:
    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon_decay=0.995, batch_size=64, buffer_capacity=10000):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.replay_buffer = []
        self.buffer_capacity = buffer_capacity
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = 1.0
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = 0.01

    def act(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            return torch.argmax(self.q_network(state_tensor)).item()

    def store_transition(self, transition):
        if len(self.replay_buffer) >= self.buffer_capacity:
            self.replay_buffer.pop(0)
        self.replay_buffer.append(transition)

    def sample_batch(self):
        indices = np.random.choice(len(self.replay_buffer), self.batch_size)
        batch = [self.replay_buffer[idx] for idx in indices]
        return batch

    def learn(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        batch = self.sample_batch()
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze()
        with torch.no_grad():
            max_next_q_values = self.target_network(next_states).max(1)[0]
            target_q_values = rewards + self.gamma * max_next_q_values * (1 - dones)

        loss = nn.MSELoss()(q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

# Objective Function for Optuna Hyperparameter Optimization
def objective(trial):
    # Suggest hyperparameters for optimization
    lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
    gamma = trial.suggest_uniform('gamma', 0.9, 0.999)
    epsilon_decay = trial.suggest_uniform('epsilon_decay', 0.9, 0.9999)
    batch_size = trial.suggest_int('batch_size', 32, 128)
    buffer_capacity = trial.suggest_int('buffer_capacity', 5000, 20000)

    # Initialize the environment and agent with suggested hyperparameters
    env = CustomCartPole()
    state_dim = 4
    action_dim = 2
    agent = DQNAgent(state_dim, action_dim, lr, gamma, epsilon_decay, batch_size, buffer_capacity)

    num_episodes = 100  # Limit episodes for hyperparameter search
    total_reward = 0

    # Training loop (same as your original code but with early exit if done)
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0

        for t in range(500):  # Max timesteps per episode
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            
            # Store the transition in replay buffer
            agent.store_transition((state, action, reward, next_state, done))

            state = next_state
            total_reward += reward

            # Train the agent
            agent.learn()

            # Exit if the episode ends
            if done:
                break

        # Update the target network after each episode
        agent.update_target_network()

        # Decay epsilon
        agent.epsilon = max(agent.epsilon_min, agent.epsilon * agent.epsilon_decay)

    return total_reward / num_episodes  # Return average reward

# Create the Optuna Study and optimize the objective function
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)  # Number of trials to optimize

# Output the best hyperparameters found by Optuna
print("Best hyperparameters: ", study.best_params)

# After finding the best hyperparameters, retrain the agent with these optimal parameters

# Retrieve the best hyperparameters from the study
best_params = study.best_params
lr = best_params['lr']
gamma = best_params['gamma']
epsilon_decay = best_params['epsilon_decay']
batch_size = best_params['batch_size']
buffer_capacity = best_params['buffer_capacity']

# Initialize the environment and agent with the best hyperparameters
env = CustomCartPole()
state_dim = 4
action_dim = 2
agent = DQNAgent(state_dim, action_dim, lr, gamma, epsilon_decay, batch_size, buffer_capacity)

# Retrain the agent using the best hyperparameters
num_episodes = 500  # Full training with best parameters

for episode in range(num_episodes):  # Loop over episodes
    state = env.reset()  # Reset the environment
    total_reward = 0
    reason_for_termination = None  # To store the reason for termination

    for t in range(500):  # Loop over timesteps (500 max)
        action = agent.act(state)  # Choose an action
        next_state, reward, done, _ = env.step(action)  # Step in the environment

        # Check termination conditions
        cart_position = next_state[0]  # Cart position (x)
        pole_angle = next_state[2]  # Pole angle (theta)

        # Identify the termination reason
        if abs(cart_position) > 2.4:
            reason_for_termination = f"Cart moved out of bounds: {cart_position:.2f}m"
        elif abs(pole_angle) > 0.209:  # 0.209 radians ≈ 12 degrees
            reason_for_termination = f"Pole angle exceeded: {pole_angle:.2f} radians"

        # Store the transition in the replay buffer
        agent.store_transition((state, action, reward, next_state, done))

        # Update the current state and accumulate reward
        state = next_state
        total_reward += reward

        # Train the agent
        agent.learn()

        # Break the loop if the episode ends
        if done:
            break

    # Update the target network
    agent.update_target_network()

    # Decay epsilon for exploration-exploitation trade-off
    agent.epsilon = max(agent.epsilon_min, agent.epsilon * agent.epsilon_decay)

    if reason_for_termination is None:
        print(f"Episode {episode}, Total Reward: {total_reward}, Episode completed successfully")   
    else:
        print(f"Episode {episode}, Total Reward: {total_reward}, Terminated due to: {reason_for_termination}")


  from .autonotebook import tqdm as notebook_tqdm
[I 2025-01-26 00:07:11,448] A new study created in memory with name: no-name-a9163e40-a277-4404-aa62-7771ad9a9be8
  lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
  gamma = trial.suggest_uniform('gamma', 0.9, 0.999)
  epsilon_decay = trial.suggest_uniform('epsilon_decay', 0.9, 0.9999)
[W 2025-01-26 00:07:14,082] Trial 0 failed with parameters: {'lr': 1.4025066060439202e-05, 'gamma': 0.9003009914915834, 'epsilon_decay': 0.9623399224549015, 'batch_size': 69, 'buffer_capacity': 12992} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "c:\Python312\Lib\site-packages\optuna\study\_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "C:\Users\Pratik\AppData\Local\Temp\ipykernel_7868\617576488.py", line 188, in objective
    agent.learn()
  File "C:\Users\Pratik\AppData\Local\Temp\ipykernel_7868\617576488.py", line 142, in learn
    max_n

KeyboardInterrupt: 