#### INFO
implementirani so okolje, prey, predator, rewards, trace, observation 6 sosedov, animacija, full map view, LR ampak brez algoritma, passive in active forces

niso vsi parametri kt so velikost zemljevida in hitrost enaki kt v articlu, ampak mislm da to nima veze? se pa lahko poprav.


In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib.lines import Line2D
import random

In [109]:
class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.buffer = []
        self.capacity = capacity

    def add(self, experience):
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)


In [110]:
class ActorNetwork(nn.Module):
    def __init__(self, input_dim, action_dim, hidden_dim=128):
        super(ActorNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return torch.tanh(self.output(x))  # Actions are scaled between -1 and 1

In [111]:
class CriticNetwork(nn.Module):
    def __init__(self, input_dim, action_dim, hidden_dim=128):
        super(CriticNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, 1)

    def forward(self, x, a):
        x = torch.relu(self.fc1(torch.cat([x, a], dim=1)))
        x = torch.relu(self.fc2(x))
        return self.output(x)

In [112]:
class SharedAgentNetwork:
    def __init__(self, input_dim, action_dim, hidden_dim=128, is_actor=True):
        if is_actor:
            self.actor = ActorNetwork(input_dim, action_dim, hidden_dim)
            self.target_actor = ActorNetwork(input_dim, action_dim, hidden_dim)
            self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=0.001)
            self.update_target_network(self.actor, self.target_actor, tau=1.0)
        else:
            self.critic = CriticNetwork(input_dim, action_dim, hidden_dim)
            self.target_critic = CriticNetwork(input_dim, action_dim, hidden_dim)
            self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=0.001)
            self.update_target_network(self.critic, self.target_critic, tau=1.0)

    def update_target_network(self, local_model, target_model, tau=1.0):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

    def select_action(self, observation, noise_scale=0.1):  # Reduced noise_scale for smoother actions
        observation = torch.FloatTensor(observation).unsqueeze(0)
        action = self.actor(observation).detach().numpy()[0]
        action += noise_scale * np.random.randn(action.shape[0])  # Add exploration noise
        return np.clip(action, -1.0, 1.0)


In [113]:
class Agent:
    def __init__(self, position, speed, max_angular_velocity, agent_type, env):
        self.position = np.array(position, dtype=float)
        self.speed = speed
        self.type = agent_type
        self.heading_angle = np.random.uniform(0, 2 * np.pi)
        self.heading_vector = np.array([np.cos(self.heading_angle), np.sin(self.heading_angle)])
        self.position_history = [self.position.copy()]
        self.max_angular_velocity = max_angular_velocity
        self.radius = 2.0 if agent_type == 'predator' else 1.0  # Size of the agent
        self.is_captured = False
        self.env = env  # Reference to the environment

    def move(self, action, dt=0.1):
        # Extract scalar from action array
        delta_heading = action[0]
        
        # Collision avoidance adjustment
        collision_avoidance_angle = self.compute_collision_avoidance(dt)
        
        # Combine action with collision avoidance
        delta_heading += collision_avoidance_angle
        
        # Clip action to max angular velocity
        max_delta_heading = self.max_angular_velocity * dt
        delta_heading = np.clip(delta_heading, -max_delta_heading, max_delta_heading)
        
        # Update heading angle
        self.heading_angle += delta_heading
        self.heading_angle = self.heading_angle % (2 * np.pi)  # Keep within [0, 2π)
        self.heading_vector = np.array([np.cos(self.heading_angle), np.sin(self.heading_angle)])
        
        # Update position
        self.position += self.heading_vector * self.speed * dt
        
        # Apply boundary conditions
        self.apply_boundary_conditions(self.env.width, self.env.height)
        
        # Update position history
        self.position_history.append(self.position.copy())
        if len(self.position_history) > 10:
            self.position_history.pop(0)

    def compute_collision_avoidance(self, dt):
        avoidance_vector = np.zeros(2)
        for other_agent in self.env.agents:
            if other_agent is not self:
                distance_vector = self.position - other_agent.position
                distance_vector = self.env.apply_periodic_adjustment(distance_vector)
                distance = np.linalg.norm(distance_vector)
                # Increase min_distance to account for the size of the plotted shapes
                min_distance = (self.radius + other_agent.radius) * 2  # Multiply by 2 for visual representation
                if distance < min_distance:
                    # Compute repulsion force inversely proportional to distance
                    avoidance_force = (distance_vector / (distance + 1e-5)) * (1.0 / (distance + 1e-5))
                    avoidance_vector += avoidance_force
        if np.linalg.norm(avoidance_vector) > 0:
            # Compute the angle of the avoidance vector
            avoidance_angle = np.arctan2(avoidance_vector[1], avoidance_vector[0])
            # Compute angle difference from current heading
            angle_diff = (avoidance_angle - self.heading_angle + np.pi) % (2 * np.pi) - np.pi
            # Limit the adjustment to avoid abrupt changes
            max_avoidance_angle = self.max_angular_velocity * dt
            collision_avoidance_angle = np.clip(angle_diff, -max_avoidance_angle, max_avoidance_angle)
            return collision_avoidance_angle
        else:
            return 0.0

    def apply_boundary_conditions(self, width, height):
        # Wrap-around boundary conditions
        self.position[0] = self.position[0] % width
        self.position[1] = self.position[1] % height


In [114]:
class Prey(Agent):
    def __init__(self, position, env, **kwargs):
        super(Prey, self).__init__(position, speed=20.0, max_angular_velocity=2.0, agent_type='prey', env=env)
        self.perception_range = np.sqrt(env.width**2 + env.height**2)
    
    def get_observation(self):
        # Get up to six nearest predators
        predators = self.env.find_nearby_agents(self, agent_type='predator', max_agents=6)
        predator_obs = []
        for predator in predators:
            relative_pos = predator.position - self.position
            relative_pos = self.env.apply_periodic_adjustment(relative_pos)
            # Normalize relative position by perception range
            relative_pos /= self.perception_range
            predator_obs.extend(relative_pos)
        # Pad with zeros if fewer than 6 predators
        while len(predator_obs) < 6 * 2:
            predator_obs.extend([0.0, 0.0])
        
        # Get up to six nearest prey allies (excluding self)
        allies = self.env.find_nearby_agents(self, agent_type='prey', max_agents=6, exclude_self=True)
        ally_obs = []
        for ally in allies:
            relative_pos = ally.position - self.position
            relative_pos = self.env.apply_periodic_adjustment(relative_pos)
            relative_pos /= self.perception_range
            ally_obs.extend(relative_pos)
        # Pad with zeros if fewer than 6 allies
        while len(ally_obs) < 6 * 2:
            ally_obs.extend([0.0, 0.0])
        
        # Include self information (heading angle)
        observation = predator_obs + ally_obs + [np.cos(self.heading_angle), np.sin(self.heading_angle)]
        return np.array(observation)


    def move(self, dt=0.1):
        # Get observation
        observation = self.get_observation()

        # Select action using shared actor network
        action = self.env.prey_actor.select_action(observation)

        # Move agent
        super().move(action, dt)

        # Reward calculation
        self.is_captured = False
        for agent in self.env.agents:
            if agent.type == 'predator':
                distance_vector = self.position - agent.position
                distance_vector = self.env.apply_periodic_adjustment(distance_vector)
                distance = np.linalg.norm(distance_vector)
                if distance < (self.radius + agent.radius):
                    self.is_captured = True
                    break

        if self.is_captured:
            reward = -100.0  # Penalty for being caught
        else:
            reward = 1.0  # Survival reward

        # Movement cost
        reward -= 0.1 * abs(action[0])

        # Next observation
        next_observation = self.get_observation()

        # Store experience in replay buffer
        env.replay_buffer_prey.add((observation, action, reward, next_observation, self.is_captured))


In [None]:
class Predator(Agent):
    def __init__(self, position, env, **kwargs):
        super(Predator, self).__init__(position, speed=20.5, max_angular_velocity=2.5, agent_type='predator', env=env)
        self.perception_range = np.sqrt(env.width**2 + env.height**2)
    
    def get_observation(self):
        # Get up to six nearest prey
        preys = env.find_nearby_agents(self, agent_type='prey', max_agents=6)
        prey_obs = []
        for prey in preys:
            relative_pos = prey.position - self.position
            relative_pos = env.apply_periodic_adjustment(relative_pos)
            relative_pos /= self.perception_range
            prey_obs.extend(relative_pos)
        # Pad with zeros if fewer than 6 prey
        while len(prey_obs) < 6 * 2:
            prey_obs.extend([0.0, 0.0])
        
        # Get up to six nearest predator allies (excluding self)
        allies = env.find_nearby_agents(self, agent_type='predator', max_agents=6, exclude_self=True)
        ally_obs = []
        for ally in allies:
            relative_pos = ally.position - self.position
            relative_pos = env.apply_periodic_adjustment(relative_pos)
            relative_pos /= self.perception_range
            ally_obs.extend(relative_pos)
        # Pad with zeros if fewer than 6 allies
        while len(ally_obs) < 6 * 2:
            ally_obs.extend([0.0, 0.0])
        
        # Include self information (heading angle)
        observation = prey_obs + ally_obs + [np.cos(self.heading_angle), np.sin(self.heading_angle)]
        print(f"Prey {id(self)} Observation: {observation}")
        return np.array(observation)


    def move(self, dt=0.1):
        # Get observation
        observation = self.get_observation()

        # Select action using shared actor network
        action = env.predator_actor.select_action(observation)

        # Move agent
        super().move(action, dt)

        # Reward calculation
        captured_prey = None
        for agent in env.agents:
            if agent.type == 'prey':
                distance_vector = self.position - agent.position
                distance_vector = env.apply_periodic_adjustment(distance_vector)
                distance = np.linalg.norm(distance_vector)
                if distance < (self.radius + agent.radius):
                    captured_prey = agent
                    break

        if captured_prey:
            reward = 100.0  # Reward for capturing prey
            # Optionally, you can mark prey as captured or respawn it
        else:
            reward = -1.0  # Small penalty for not capturing prey

        # Movement cost
        reward -= 0.1 * abs(action[0])

        # Next observation
        next_observation = self.get_observation()

        # Store experience in replay buffer
        env.replay_buffer_predator.add((observation, action, reward, next_observation, False))


In [116]:
class PredatorPreyEnvironment:
    def __init__(self, width=100, height=100):
        self.width = width
        self.height = height
        self.agents = []

        # Replay buffers
        self.replay_buffer_prey = ReplayBuffer(capacity=100000)
        self.replay_buffer_predator = ReplayBuffer(capacity=100000)

        # Discount factor
        self.gamma = 0.99

        # Soft update parameter
        self.tau = 0.01

        # Shared networks for prey
        self.prey_actor = SharedAgentNetwork(input_dim=26, action_dim=1)
        self.prey_critic = SharedAgentNetwork(input_dim=26, action_dim=1, is_actor=False)
        
        self.predator_actor = SharedAgentNetwork(input_dim=26, action_dim=1)
        self.predator_critic = SharedAgentNetwork(input_dim=26, action_dim=1, is_actor=False)

    def add_agent(self, agent):
        self.agents.append(agent)

    def update(self, dt=0.1):
        # Update each agent
        for agent in self.agents[:]:
            agent.move(dt=dt)
        
        # After moving, check for overlapping agents and adjust positions
        for i, agent in enumerate(self.agents):
            for j in range(i + 1, len(self.agents)):
                other_agent = self.agents[j]
                distance_vector = agent.position - other_agent.position
                distance_vector = self.apply_periodic_adjustment(distance_vector)
                distance = np.linalg.norm(distance_vector)
                min_distance = (agent.radius + other_agent.radius) * 1.5  # Adjust as needed
                if distance < min_distance:
                    # Adjust the position slightly to avoid overlap
                    direction = distance_vector
                    if np.linalg.norm(direction) == 0:
                        direction = np.random.uniform(-1, 1, 2)  # Random direction if they are exactly on top
                    else:
                        direction = direction / np.linalg.norm(direction)
                    shift = (min_distance - distance) / 2
                    agent.position += direction * shift
                    other_agent.position -= direction * shift

                    # Apply boundary conditions after adjustment
                    agent.apply_boundary_conditions(self.width, self.height)
                    other_agent.apply_boundary_conditions(self.width, self.height)
        
        # Train agents
        self.train_agents()

    def train_agents(self, batch_size=64):
        # Train prey agents
        if len(self.replay_buffer_prey.buffer) >= batch_size:
            batch = self.replay_buffer_prey.sample(batch_size)
            self.update_shared_agent(batch, agent_type='prey')

        # Train predator agents
        if len(self.replay_buffer_predator.buffer) >= batch_size:
            batch = self.replay_buffer_predator.sample(batch_size)
            self.update_shared_agent(batch, agent_type='predator')

    def update_shared_agent(self, batch, agent_type):
        # Extract batch components
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states)
        actions = torch.FloatTensor(actions)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        dones = torch.BoolTensor(dones).unsqueeze(1)

        # Select shared networks
        if agent_type == 'prey':
            actor = self.prey_actor.actor
            target_actor = self.prey_actor.target_actor
            critic = self.prey_critic.critic
            target_critic = self.prey_critic.target_critic
            actor_optimizer = self.prey_actor.actor_optimizer
            critic_optimizer = self.prey_critic.critic_optimizer
        else:
            actor = self.predator_actor.actor
            target_actor = self.predator_actor.target_actor
            critic = self.predator_critic.critic
            target_critic = self.predator_critic.target_critic
            actor_optimizer = self.predator_actor.actor_optimizer
            critic_optimizer = self.predator_critic.critic_optimizer

        # Critic update
        with torch.no_grad():
            next_actions = target_actor(next_states)
            q_next = target_critic(next_states, next_actions)
            q_target = rewards + self.gamma * q_next * (~dones)

        q_values = critic(states, actions)
        critic_loss = nn.MSELoss()(q_values, q_target)

        critic_optimizer.zero_grad()
        critic_loss.backward()
        critic_optimizer.step()

        # Actor update
        actor_loss = -critic(states, actor(states)).mean()

        actor_optimizer.zero_grad()
        actor_loss.backward()
        actor_optimizer.step()

        # Soft update target networks
        self.soft_update(actor, target_actor)
        self.soft_update(critic, target_critic)

    def soft_update(self, local_model, target_model):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(self.tau * local_param.data + (1.0 - self.tau) * target_param.data)

    def apply_periodic_adjustment(self, vector):
        # Adjust vector components for periodic boundary conditions
        adjusted_vector = vector.copy()
        half_width = self.width / 2.0
        half_height = self.height / 2.0
        if adjusted_vector[0] > half_width:
            adjusted_vector[0] -= self.width
        elif adjusted_vector[0] < -half_width:
            adjusted_vector[0] += self.width
        if adjusted_vector[1] > half_height:
            adjusted_vector[1] -= self.height
        elif adjusted_vector[1] < -half_height:
            adjusted_vector[1] += self.height
        return adjusted_vector

    def find_nearby_agents(self, agent, agent_type, max_agents=6, exclude_self=False):
        agents = []
        distances = []
        for other_agent in self.agents:
            if other_agent.type == agent_type and (not exclude_self or other_agent is not agent):
                distance_vector = other_agent.position - agent.position
                distance_vector = self.apply_periodic_adjustment(distance_vector)
                distance = np.linalg.norm(distance_vector)
                distances.append((distance, other_agent))
        # Sort by distance
        distances.sort(key=lambda x: x[0])
        # Return up to max_agents
        nearby_agents = [item[1] for item in distances[:max_agents]]
        return nearby_agents

    def adjust_positions_for_traces(self, positions):
        adjusted_positions = [positions[0]]
        for i in range(1, len(positions)):
            prev = adjusted_positions[-1]
            curr = positions[i].copy()
            dx = curr[0] - prev[0]
            dy = curr[1] - prev[1]

            # Adjust for wrapping in x-direction
            if dx > self.width / 2:
                curr[0] -= self.width
            elif dx < -self.width / 2:
                curr[0] += self.width

            # Adjust for wrapping in y-direction
            if dy > self.height / 2:
                curr[1] -= self.height
            elif dy < -self.height / 2:
                curr[1] += self.height

            adjusted_positions.append(curr)
        return np.array(adjusted_positions)

    def get_agent_shape(self, agent):
        direction = agent.heading_vector
        direction = direction / np.linalg.norm(direction)
        perp_direction = np.array([-direction[1], direction[0]])
        shape_scale_factor = 2.0  # Match this with the collision avoidance
        size = agent.radius * shape_scale_factor  # Scale the size based on agent's radius
        front = agent.position + direction * size
        left = agent.position - direction * size * 0.5 + perp_direction * size * 0.5
        right = agent.position - direction * size * 0.5 - perp_direction * size * 0.5
        return [front, left, right]
    
    def animate(self, steps=100):
        fig, ax = plt.subplots(figsize=(5, 5))
        ax.set_xlim(0, self.width)
        ax.set_ylim(0, self.height)
        ax.set_xticks([])
        ax.set_yticks([])

        # Initialize a dictionary to keep track of agent patches and traces
        agent_artists = {}

        # Initialize the agents' patches and traces
        for agent in self.agents:
            if agent.type == 'prey':
                triangle = plt.Polygon(self.get_agent_shape(agent), closed=True, color='b')
                line, = ax.plot([], [], color='b', alpha=0.2)
            elif agent.type == 'predator':
                triangle = plt.Polygon(self.get_agent_shape(agent), closed=True, color='orange', alpha=0.8)
                line, = ax.plot([], [], color='orange', alpha=0.2)
            agent_artists[agent] = {'patch': triangle, 'trace': line}
            ax.add_patch(triangle)

        def update(frame):
            self.update(dt=0.05)  # Reduced dt for smoother motion

            # Update positions and traces
            for agent in self.agents:
                artist = agent_artists.get(agent)
                if artist:
                    patch = artist['patch']
                    trace = artist['trace']

                    patch.set_xy(self.get_agent_shape(agent))
                    # Update the trace with the agent's position history
                    positions = np.array(agent.position_history)
                    # Adjust positions for toroidal wrapping
                    positions = self.adjust_positions_for_traces(positions)
                    trace.set_data(positions[:, 0], positions[:, 1])
                else:
                    # Handle new agents (if any)
                    if agent.type == 'prey':
                        triangle = plt.Polygon(self.get_agent_shape(agent), closed=True, color='b')
                        line, = ax.plot([], [], color='b', alpha=0.2)
                    elif agent.type == 'predator':
                        triangle = plt.Polygon(self.get_agent_shape(agent), closed=True, color='orange', alpha=0.8)
                        line, = ax.plot([], [], color='orange', alpha=0.2)
                    agent_artists[agent] = {'patch': triangle, 'trace': line}
                    ax.add_patch(triangle)

            # Remove agents that are no longer in the environment
            agents_to_remove = [agent for agent in agent_artists if agent not in self.agents]
            for agent in agents_to_remove:
                artist = agent_artists.pop(agent)
                artist['patch'].remove()
                artist['trace'].remove()

            return []

        ani = animation.FuncAnimation(fig, update, frames=steps, blit=False, interval=50, repeat=True)  # Set repeat=True
        plt.close(fig)  # Close the figure to prevent additional static display
        return HTML(ani.to_jshtml())

In [117]:
# Create environment instance
width = 70
env = PredatorPreyEnvironment(width, width)

# Add prey agents
num_prey = 15
for _ in range(num_prey):
    initial_position = np.random.uniform(0, width, 2)
    prey = Prey(position=initial_position, env=env)
    env.add_agent(prey)

# Add predator agents
num_predators = 3
for _ in range(num_predators):
    initial_position = np.random.uniform(0, width, 2)
    predator = Predator(position=initial_position, env=env)
    env.add_agent(predator)

# Run the simulation
for episode in range(1):  # Number of episodes
    for step in range(200):  # Steps per episode
        env.update(dt=0.1)

    # Optionally, print progress
    print(f"Episode {episode + 1} completed.")

# Animate the environment
animation_html = env.animate(steps=200)
animation_html


Episode 1 completed.
