In [1]:
import random
import numpy as np
import pygame
import matplotlib.pyplot as plt
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import time

# Define the Policy Network
class SharedPolicyNetwork(nn.Module):
    def __init__(self, input_size, action_size):
        super(SharedPolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.action_head = nn.Linear(128, action_size)
        self.value_head = nn.Linear(128, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        action_logits = self.action_head(x)
        state_value = self.value_head(x)
        return action_logits, state_value

class PPOAgent:
    def __init__(self, input_size, action_size, lr=3e-4, gamma=0.99, epsilon=0.2, lambda_=0.95, ppo_epochs=4, batch_size=64):
        self.action_size = action_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.lambda_ = lambda_
        self.ppo_epochs = ppo_epochs
        self.batch_size = batch_size

        self.policy_net = SharedPolicyNetwork(input_size, action_size)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)

        # Memory for PPO
        self.memory = {
            'observations': [],
            'actions': [],
            'log_probs': [],
            'rewards': [],
            'dones': [],
            'values': []
        }

    def select_action(self, observation):
        observation = torch.tensor(observation, dtype=torch.float32)
        action_logits, state_value = self.policy_net(observation)
        action_probs = torch.softmax(action_logits, dim=-1)
        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob, state_value

    def store_experience(self, observation, action, log_prob, reward, done, value):
        self.memory['observations'].append(observation)
        self.memory['actions'].append(action)
        self.memory['log_probs'].append(log_prob)
        self.memory['rewards'].append(reward)
        self.memory['dones'].append(done)
        self.memory['values'].append(value)

    def compute_gae(self, rewards, dones, values):
        advantages = []
        returns = []
        gae = 0
        next_value = 0
        for i in reversed(range(len(rewards))):
            mask = 1 - dones[i]
            delta = rewards[i] + self.gamma * next_value * mask - values[i]
            gae = delta + self.gamma * self.lambda_ * gae * mask
            advantages.insert(0, gae)
            next_value = values[i]
            returns.insert(0, gae + values[i])
        return torch.tensor(returns), torch.tensor(advantages)

    def update_policy(self):
        rewards = self.memory['rewards']
        dones = self.memory['dones']
        values = self.memory['values']
        log_probs_old = torch.stack(self.memory['log_probs']).detach()
        observations = torch.tensor(self.memory['observations'], dtype=torch.float32)
        actions = torch.tensor(self.memory['actions'])
        returns, advantages = self.compute_gae(rewards, dones, values)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        for _ in range(self.ppo_epochs):
            action_logits, state_values = self.policy_net(observations)
            action_probs = torch.softmax(action_logits, dim=-1)
            dist = Categorical(action_probs)
            log_probs_new = dist.log_prob(actions)
            ratio = torch.exp(log_probs_new - log_probs_old)

            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = nn.MSELoss()(state_values.squeeze(), returns)
            loss = actor_loss + 0.5 * critic_loss

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Clear memory after update
        for key in self.memory.keys():
            self.memory[key] = []

class SugarscapeEnvironment:
    def __init__(self, width, height, num_agents, cell_size=10, show_sugar_levels=True,
                 show_broadcast_radius=True, show_agent_paths=True, broadcast_radius=5):
        self.width = width
        self.height = height
        self.num_agents = num_agents
        self.cell_size = cell_size
        self.show_sugar_levels = show_sugar_levels
        self.show_broadcast_radius = show_broadcast_radius
        self.show_agent_paths = show_agent_paths
        self.broadcast_radius = broadcast_radius

        self.params = {
            'max_sugar': 5,
            'growth_rate': 1,
            'sugar_peak_frequency': 0.04,
            'sugar_peak_spread': 6,
            'job_center_duration': (40, 100),
            'vision_range': 1,
            'message_expiry': 15,
            'max_relay_messages': 10,
            'gamma': 0.99,
            'lambda': 0.95,
            'epsilon': 0.2,
            'ppo_epochs': 4,
            'batch_size': 64,
            'learning_rate': 3e-4,
        }

        self.job_centers = []
        self.sugar = np.zeros((self.height, self.width), dtype=int)
        self.create_initial_sugar_peaks()
        self.max_sugar_landscape = self.sugar.copy()
        self.agents = self.initialize_agents()
        self.agent_positions = set((agent['x'], agent['y']) for agent in self.agents)
        self.dead_agents = []

        pygame.init()
        self.screen = pygame.display.set_mode((width * cell_size, height * cell_size))
        pygame.display.set_caption("Sugarscape Simulation - With PPO")
        self.clock = pygame.time.Clock()

        self.font = pygame.font.Font(None, 10)

        self.population_history = []
        self.average_wealth_history = []
        self.gini_coefficient_history = []
        self.timestep = 0

        # Initialize PPO agent
        self.observation_space_size = (self.params['vision_range'] * 2 + 1) ** 2 + 2
        self.action_space_size = 5  # Up, Down, Left, Right, Stay
        self.agent = PPOAgent(self.observation_space_size, self.action_space_size)

    def create_initial_sugar_peaks(self, num_peaks=2):
        for _ in range(num_peaks):
            self.create_job_center()
        self.update_sugar_landscape()

    def create_job_center(self):
        x, y = np.random.randint(0, self.width), np.random.randint(0, self.height)
        duration = np.random.randint(*self.params['job_center_duration'])
        self.job_centers.append({
            'x': x, 'y': y,
            'duration': duration,
            'max_sugar': self.params['max_sugar']
        })

    def update_sugar_landscape(self):
        self.sugar = np.zeros((self.height, self.width))
        for center in self.job_centers:
            x_grid, y_grid = np.meshgrid(np.arange(self.width), np.arange(self.height))
            distance = np.sqrt((x_grid - center['x']) ** 2 + (y_grid - center['y']) ** 2)
            sugar_level = center['max_sugar'] * np.exp(-distance ** 2 / (2 * self.params['sugar_peak_spread'] ** 2))
            self.sugar += sugar_level
        self.sugar = np.clip(self.sugar, 0, self.params['max_sugar'])
        self.sugar = np.round(self.sugar).astype(int)

    def initialize_agents(self):
        agents = []
        available_positions = set((x, y) for x in range(self.width) for y in range(self.height))
        for i in range(self.num_agents):
            if not available_positions:
                break
            x, y = available_positions.pop()
            agents.append(self.create_agent(i, x, y))
        return agents

    def create_agent(self, id, x, y):
        return {
            'id': id, 'x': x, 'y': y,
            'sugar': np.random.randint(40, 80),
            'metabolism': np.random.randint(1, 3),
            'vision': np.random.randint(1, self.params['vision_range'] + 1),
            'broadcast_radius': max(1, int(np.random.normal(self.broadcast_radius, self.broadcast_radius / 3))),
            'messages': deque(maxlen=100),
            'destination': None,
            'done': False
        }

    def get_agent_observation(self, agent):
        x, y = agent['x'], agent['y']
        obs_range = self.params['vision_range']
        sugar_obs = self.sugar[max(0, y - obs_range):min(self.height, y + obs_range + 1),
                               max(0, x - obs_range):min(self.width, x + obs_range + 1)]
        pad_width_x = (max(0, obs_range - x), max(0, x + obs_range + 1 - self.width))
        pad_width_y = (max(0, obs_range - y), max(0, y + obs_range + 1 - self.height))
        sugar_obs = np.pad(sugar_obs, (pad_width_y, pad_width_x), mode='constant', constant_values=0)
        sugar_obs = sugar_obs.flatten()
        agent_features = np.array([agent['sugar'], agent['metabolism']])
        observation = np.concatenate((sugar_obs, agent_features))
        return observation

    def move_agent(self, agent, action):
        x, y = agent['x'], agent['y']
        possible_moves = {
            0: (x, max(0, y - 1)),        # Up
            1: (x, min(self.height - 1, y + 1)),  # Down
            2: (max(0, x - 1), y),        # Left
            3: (min(self.width - 1, x + 1), y),   # Right
            4: (x, y)                     # Stay
        }
        new_x, new_y = possible_moves[action]
        if (new_x, new_y) not in self.agent_positions:
            self.agent_positions.remove((agent['x'], agent['y']))
            agent['x'], agent['y'] = new_x, new_y
            self.agent_positions.add((new_x, new_y))

    def step(self):
        for center in self.job_centers:
            center['duration'] -= 1
        self.job_centers = [center for center in self.job_centers if center['duration'] > 0]
        if np.random.random() < self.params['sugar_peak_frequency']:
            self.create_job_center()
        self.update_sugar_landscape()

        observations = []
        actions = []
        log_probs = []
        rewards = []
        dones = []
        values = []

        for agent in self.agents:
            observation = self.get_agent_observation(agent)
            action, log_prob, value = self.agent.select_action(observation)
            self.move_agent(agent, action)
            collected_sugar = self.sugar[agent['y'], agent['x']]
            agent['sugar'] += collected_sugar
            self.sugar[agent['y'], agent['x']] = 0
            agent['sugar'] -= agent['metabolism']
            reward = collected_sugar - agent['metabolism']  # Reward is net sugar gain
            done = agent['sugar'] <= 0

            self.agent.store_experience(observation, action, log_prob, reward, done, value.item())

            agent['done'] = done  # Update done flag

        alive_agents = []
        for agent in self.agents:
            if agent['done']:
                self.dead_agents.append({'x': agent['x'], 'y': agent['y'], 'death_time': self.timestep})
                self.agent_positions.remove((agent['x'], agent['y']))
            else:
                alive_agents.append(agent)
        self.agents = alive_agents

        self.dead_agents = [agent for agent in self.dead_agents if self.timestep - agent['death_time'] <= 5]

        if len(self.agent.memory['rewards']) >= self.params['batch_size']:
            self.agent.update_policy()

        self.collect_data()
        self.timestep += 1

    def train(self, num_episodes=2000):
        start_time = time.time()
        for episode in range(1, num_episodes + 1):
            self.step()
            if episode % 100 == 0:
                elapsed_time = time.time() - start_time
                estimated_time_per_episode = elapsed_time / episode
                remaining_time = estimated_time_per_episode * (num_episodes - episode)
                print(f"Episode {episode}/{num_episodes}, Estimated remaining time: {remaining_time:.2f} seconds")
                self.agent.update_policy()

    def render(self):
        self.screen.fill((255, 255, 255))

        for y in range(self.height):
            for x in range(self.width):
                sugar_level = self.sugar[y, x]
                color = self.get_color(sugar_level)
                pygame.draw.rect(self.screen, color,
                                 (x * self.cell_size, y * self.cell_size, self.cell_size, self.cell_size))

                if self.show_sugar_levels:
                    sugar_text = self.font.render(f"{sugar_level}", True, (0, 0, 0))
                    text_rect = sugar_text.get_rect(center=(x * self.cell_size + self.cell_size // 2,
                                                            y * self.cell_size + self.cell_size // 2))
                    self.screen.blit(sugar_text, text_rect)

        for dead_agent in self.dead_agents:
            pygame.draw.circle(self.screen, (128, 128, 128),
                               (int(dead_agent['x'] * self.cell_size + self.cell_size / 2),
                                int(dead_agent['y'] * self.cell_size + self.cell_size / 2)),
                               int(self.cell_size / 3))

        for agent in self.agents:
            if self.show_broadcast_radius:
                pygame.draw.circle(self.screen, (200, 200, 200),
                                   (int(agent['x'] * self.cell_size + self.cell_size / 2),
                                    int(agent['y'] * self.cell_size + self.cell_size / 2)),
                                   int(agent['broadcast_radius'] * self.cell_size), 1)

            pygame.draw.circle(self.screen, (255, 0, 0),
                               (int(agent['x'] * self.cell_size + self.cell_size / 2),
                                int(agent['y'] * self.cell_size + self.cell_size / 2)),
                               int(self.cell_size / 3))

            if self.show_agent_paths and agent['destination']:
                pygame.draw.line(self.screen, (0, 255, 0),
                                 (int(agent['x'] * self.cell_size + self.cell_size / 2),
                                  int(agent['y'] * self.cell_size + self.cell_size / 2)),
                                 (int(agent['destination'][0] * self.cell_size + self.cell_size / 2),
                                  int(agent['destination'][1] * self.cell_size + self.cell_size / 2)),
                                 1)

        pygame.display.flip()

    def get_color(self, sugar_level):
        if sugar_level == 0:
            return (255, 255, 255)
        else:
            intensity = sugar_level / self.params['max_sugar']
            return (255, 255, int(255 * (1 - intensity)))

    def collect_data(self):
        population = len(self.agents)
        total_wealth = sum(agent['sugar'] for agent in self.agents)
        average_wealth = total_wealth / population if population > 0 else 0

        self.population_history.append(population)
        self.average_wealth_history.append(average_wealth)
        self.gini_coefficient_history.append(self.calculate_gini_coefficient())

    def calculate_gini_coefficient(self):
        if not self.agents:
            return 0
        wealth_values = sorted(agent['sugar'] for agent in self.agents)
        cumulative_wealth = np.cumsum(wealth_values)
        return (np.sum((2 * np.arange(1, len(wealth_values) + 1) - len(wealth_values) - 1) * wealth_values) /
                (len(wealth_values) * np.sum(wealth_values)))

    def plot_results(self):
        plt.figure(figsize=(15, 5))

        plt.subplot(131)
        plt.plot(self.population_history)
        plt.title('Population over Time')
        plt.xlabel('Timestep')
        plt.ylabel('Population')

        plt.subplot(132)
        plt.plot(self.average_wealth_history)
        plt.title('Average Wealth over Time')
        plt.xlabel('Timestep')
        plt.ylabel('Average Wealth')

        plt.subplot(133)
        plt.plot(self.gini_coefficient_history)
        plt.title('Gini Coefficient over Time')
        plt.xlabel('Timestep')
        plt.ylabel('Gini Coefficient')

        plt.tight_layout()
        plt.show()

    def final_simulation(self, max_timesteps=1000):
        running = True
        while running and self.timestep < max_timesteps:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False

            self.step()
            self.render()
            self.clock.tick(5)

        self.plot_results()

# Train the PPO agent without visualizing the environment
env = SugarscapeEnvironment(width=50, height=50, num_agents=1000, cell_size=10,
                            broadcast_radius=15, show_sugar_levels=False, show_broadcast_radius=False, show_agent_paths=False)
env.train(num_episodes=2000)

# Run the final simulation with rendering
env.final_simulation(max_timesteps=1000)


pygame 2.6.0 (SDL 2.28.4, Python 3.11.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


  observations = torch.tensor(self.memory['observations'], dtype=torch.float32)


RuntimeError: Found dtype Double but expected Float