In [37]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gymnasium as gym

In [45]:
class PolicyNetwork(nn.Module):
    """Small fully connected network mapping an observation to an action vector.

    The network is ``Linear -> ReLU -> Linear -> ReLU -> Linear``; the final
    layer has no activation, so outputs are unbounded.

    Args:
        input_dim: Size of the observation vector fed to the first layer.
        hidden_dim_1: Width of the first hidden layer.
        hidden_dim_2: Width of the second hidden layer.
        action_dim: Size of the output vector.
        state_dict: Optional mapping of parameter names to tensors. When it is
            truthy (non-empty), it is loaded into the freshly built layers.
    """

    def __init__(self, input_dim, hidden_dim_1=64, hidden_dim_2=64, action_dim=3, state_dict=None):
        super().__init__()

        # The attribute names fc1/fc2/fc3 are also the key prefixes of the
        # state dict, so they must stay stable for saved checkpoints to load.
        self.fc1 = nn.Linear(input_dim, hidden_dim_1)
        self.fc2 = nn.Linear(hidden_dim_1, hidden_dim_2)
        self.fc3 = nn.Linear(hidden_dim_2, action_dim)

        if not state_dict:
            return
        self.load_state_dict(state_dict)

    def forward(self, x):
        """Return the raw output of the last layer for input ``x``."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

# NES Agent class
class NESAgent:
    """Evolution-strategy trainer for a ``PolicyNetwork`` on a Gymnasium-style env.

    Each iteration perturbs the parent policy's parameters with Gaussian noise
    to create a population, scores every member over one episode, and moves the
    parent along the sum of the perturbations weighted by normalized reward.

    Attributes:
        env: Environment exposing ``reset()``, ``step(action)`` and ``close()``.
        population_size: Number of perturbed children generated per iteration.
        noise_std: Standard deviation of the Gaussian parameter noise.
        learning_rate: Multiplier applied to the weighted noise sum.
        input_dim / output_dim: First dimension of the observation / action
            space shapes (assumes both spaces expose a 1-D ``shape``).
        policy_net: The parent network that training updates in place.
        best_net / best_reward: Best-scoring network seen by ``evaluate`` and
            its single-episode return.
    """

    def __init__(self, env, population_size=100, noise_std=0.1, learning_rate=0.1):
        """Build the parent policy sized from ``env``'s observation/action spaces."""
        self.env = env
        self.population_size = population_size
        self.noise_std = noise_std
        self.learning_rate = learning_rate

        # Initialize policy network
        self.input_dim = env.observation_space.shape[0]
        self.output_dim = env.action_space.shape[0]
        self.policy_net = PolicyNetwork(input_dim=self.input_dim, action_dim=self.output_dim)

        # Parameters are updated manually, never by autograd.
        for param in self.policy_net.parameters():
            param.requires_grad = False

        self.best_net = self.policy_net
        # Start at -inf so the first evaluated policy always becomes the best,
        # including in environments whose returns are zero or negative.
        self.best_reward = float('-inf')

    def reproduce(self, parent):
        """Return ``population_size`` new networks: ``parent`` plus Gaussian noise.

        Args:
            parent: Network whose named parameters are perturbed.

        Returns:
            List of ``PolicyNetwork`` instances, one per population member.
        """
        population = []
        with torch.no_grad():
            for _ in range(self.population_size):
                child_params = {
                    name: param + torch.randn_like(param) * self.noise_std
                    for name, param in parent.named_parameters()
                }
                population.append(
                    PolicyNetwork(
                        input_dim=self.input_dim,
                        action_dim=self.output_dim,
                        state_dict=child_params,
                    )
                )
        return population

    def evaluate(self, policy_network):
        """Run one episode with ``policy_network`` and return its total reward.

        Also records the network in ``best_net`` / ``best_reward`` when its
        return strictly exceeds the best seen so far. The raw network output is
        passed to ``env.step`` without clipping to the action-space bounds.
        """
        total_reward = 0
        state, _ = self.env.reset()
        done, truncated = False, False

        # Rollouts never need gradients; skip building the autograd graph.
        with torch.no_grad():
            while not (done or truncated):
                observation = torch.as_tensor(state, dtype=torch.float32)
                action = policy_network(observation)
                state, reward, done, truncated, _ = self.env.step(action.detach().numpy())
                total_reward += reward

        if total_reward > self.best_reward:
            self.best_reward = total_reward
            self.best_net = policy_network
        return total_reward

    def update_parameters(self, parent, population, evaluations):
        """Move ``parent`` in place along the reward-weighted sum of child noise.

        Args:
            parent: Network to update.
            population: Children produced from ``parent`` (same parameter names).
            evaluations: One score per child, in the same order as ``population``.

        Raises:
            ValueError: If ``evaluations`` and ``population`` differ in length.
        """
        evaluations = np.asarray(evaluations, dtype=np.float64)
        if len(evaluations) != len(population):
            raise ValueError(
                f"expected {len(population)} evaluations, got {len(evaluations)}"
            )
        if len(population) == 0:
            return  # nothing to learn from; avoids mean/std of an empty array

        # Normalize evaluations (zero mean, unit variance; epsilon guards std == 0)
        normalized = (evaluations - evaluations.mean()) / (evaluations.std() + 1e-8)

        # Build each child's state dict once instead of once per parameter.
        child_states = [child.state_dict() for child in population]

        # NOTE(review): the step is the un-averaged sum over the population (it
        # is not divided by population size or noise_std), so the effective
        # step size grows with population_size -- confirm this is intended.
        with torch.no_grad():  # in-place update must not be tracked by autograd
            for name, param in parent.named_parameters():
                update = torch.zeros_like(param)
                for weight, child_state in zip(normalized, child_states):
                    # Noise is recovered as child parameter minus parent parameter.
                    update += (child_state[name] - param) * float(weight)
                param += self.learning_rate * update

    def train(self, num_iterations=1000, target_reward=1000, save_path='best.pt'):
        """Run up to ``num_iterations`` generations, stopping early on success.

        Args:
            num_iterations: Maximum number of generations.
            target_reward: Once ``best_reward`` reaches this value, the best
                network's weights are saved and training stops.
            save_path: File the best network's state dict is written to.

        The checkpoint is only written when ``target_reward`` is reached; a run
        that exhausts ``num_iterations`` first saves nothing.
        """
        parent = self.policy_net
        for _ in range(num_iterations):
            # Generate a population and evaluate
            population = self.reproduce(parent)
            evaluations = np.array([self.evaluate(child) for child in population])

            # Update parameters based on population evaluations
            self.update_parameters(parent, population, evaluations)

            if self.best_reward >= target_reward:
                torch.save(self.best_net.state_dict(), save_path)
                break

    def test(self, path='best.pt'):
        """Load weights from ``path`` into ``self.net``, play one episode, close env.

        The environment is closed even if the rollout raises.
        """
        self.net = PolicyNetwork(input_dim=self.input_dim, action_dim=self.output_dim)
        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        self.net.load_state_dict(torch.load(path))

        try:
            state, _ = self.env.reset()
            done, truncated = False, False
            with torch.no_grad():
                while not (done or truncated):
                    observation = torch.as_tensor(state, dtype=torch.float32)
                    action = self.net(observation)
                    state, _, done, truncated, _ = self.env.step(action.detach().numpy())
        finally:
            self.env.close()

# Main training loop
# Create the environment (no render_mode is requested here), wrap it in an
# NESAgent with a population of 10 children per generation, and train for at
# most 1000 generations. train() stops early and writes 'best.pt' once the
# agent's best single-episode reward reaches 1000; if that never happens, no
# checkpoint file is written.
env = gym.make('InvertedPendulum-v5')
agent = NESAgent(env, population_size=10)
agent.train(num_iterations=1000)


In [36]:
def test(path='best.pt', env_id='InvertedPendulum-v5'):
    """Replay a saved policy for one episode in a window-rendered environment.

    Args:
        path: Checkpoint file holding the policy's state dict.
        env_id: Gymnasium environment id to create with ``render_mode='human'``.

    The environment is created here and handed to ``NESAgent.test``, which
    closes it once the episode has finished.
    """
    env = gym.make(env_id, render_mode='human')
    model = NESAgent(env=env)
    model.test(path=path)


if __name__ == '__main__':
    test()

In [43]:
import time

In [46]:
# Main experiment loop with different learning rates and population sizes
env = gym.make('InvertedPendulum-v5')

# Define possible values for learning rate and population size
learning_rates = [0.01, 0.05, 0.1, 0.2]
population_sizes = [10, 50, 100, 200]

# Reward at which NESAgent.train stops early (matches its default threshold).
TARGET_REWARD = 1000

# Store results for analysis
results = []

# Loop over combinations of learning rate and population size
try:
    for lr in learning_rates:
        for pop_size in population_sizes:
            print(f"Running experiment with learning rate={lr} and population size={pop_size}")

            agent = NESAgent(env, population_size=pop_size, learning_rate=lr)

            # perf_counter is monotonic, so the measurement cannot be skewed
            # by wall-clock adjustments during a long run.
            start_time = time.perf_counter()
            agent.train(num_iterations=1000)
            elapsed_time = time.perf_counter() - start_time

            results.append({
                'learning_rate': lr,
                'population_size': pop_size,
                # Total training time; it is only a true "time to 1000" when
                # 'reached_target' is True.
                'time_to_1000': elapsed_time,
                'best_reward': agent.best_reward,
                'reached_target': agent.best_reward >= TARGET_REWARD,
            })
finally:
    # The environment is created in this cell, so release it here as well.
    env.close()

# Print the results
for result in results:
    print(f"LR: {result['learning_rate']}, Population Size: {result['population_size']}, Time to 1000: {result['time_to_1000']:.2f}s, Best Reward: {result['best_reward']}, Reached 1000: {result['reached_target']}")

Running experiment with learning rate=0.01 and population size=10
Running experiment with learning rate=0.01 and population size=50
Running experiment with learning rate=0.01 and population size=100
Running experiment with learning rate=0.01 and population size=200
Running experiment with learning rate=0.05 and population size=10
Running experiment with learning rate=0.05 and population size=50
Running experiment with learning rate=0.05 and population size=100
Running experiment with learning rate=0.05 and population size=200
Running experiment with learning rate=0.1 and population size=10
Running experiment with learning rate=0.1 and population size=50
Running experiment with learning rate=0.1 and population size=100
Running experiment with learning rate=0.1 and population size=200
Running experiment with learning rate=0.2 and population size=10
Running experiment with learning rate=0.2 and population size=50
Running experiment with learning rate=0.2 and population size=100
Running ex