<a href="https://colab.research.google.com/github/novice108/ITT2025/blob/main/%D0%A2%D0%B0%D0%B1%D0%BB%D0%9F%D0%BE%D1%80%D1%96%D0%B2%D0%BD%D0%90%D0%BB%D0%B3%D0%9F%D1%80%D1%8F%D0%BC%D0%BE%D0%BB%D1%96%D0%BD%D0%A0%D1%83%D1%851000%D0%B0%D0%B3%D0%B5%D0%BD%D1%82%D1%96%D0%B2_ipynb%22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# %% [markdown]
# # Об'єктивне порівняння алгоритмів керування зграями
# **Однакові умови тестування: фіксований час на агента та регульована середня швидкість**

# %%
#!pip install cvxpy osqp numpy matplotlib pandas pyswarm
import numpy as np
import cvxpy as cp
from dataclasses import dataclass, field
from typing import List, Optional
import time
import pandas as pd
import random
from pyswarm import pso  # Додано імпорт pyswarm

# %% [markdown]
# ## 1. Уніфіковані класи агентів

# %%
@dataclass
class PIDController:
    Kp: float = 1.2
    Ki: float = 0.05
    Kd: float = 0.4
    target_speed: float = 2.0
    integral: float = 0.0
    prev_error: float = 0.0
    dt: float = 0.1
    safety_zone: str = "normal"

    @property
    def safety_penalty(self) -> float:
        return {"critical": 0.9, "warning": 0.5, "normal": 0.1}[self.safety_zone]

    def compute(self, current_speed: float) -> float:
        error = self.target_speed - current_speed
        self.integral += error * self.dt
        derivative = (error - self.prev_error) / self.dt
        self.prev_error = error

        if self.safety_zone == "critical":
            return -current_speed * 2.0
        elif self.safety_zone == "warning":
            return self.Kp*error + 0.3*self.Ki*self.integral + 0.9*self.Kd*derivative
        return self.Kp*error + 0.1*self.Ki*self.integral + self.Kd*derivative

@dataclass
class Agent:
    id: int
    position: np.ndarray = field(default_factory=lambda: np.zeros(2))
    velocity: np.ndarray = field(default_factory=lambda: np.zeros(2))
    pid: PIDController = field(default_factory=PIDController)
    radius: float = 1.0
    role: str = "follower"
    max_speed: float = 3.0
    base_target: np.ndarray = field(default_factory=lambda: np.array([100.0, 0.0]))

    def update_safety_zone(self, min_dist: float):
        if min_dist < 1.5*self.radius:
            self.pid.safety_zone = "critical"
        elif min_dist < 3*self.radius:
            self.pid.safety_zone = "warning"
        else:
            self.pid.safety_zone = "normal"

    def apply_speed_limit(self):
        speed = np.linalg.norm(self.velocity)
        if speed > self.max_speed:
            self.velocity = self.velocity * self.max_speed / speed

# %% [markdown]
# ## 2. Стабілізовані алгоритми керування

# %%
class CBFPIDController:
    def __init__(self, target_speed=2.0):
        self.target_speed = target_speed
        self.solver_settings = {'verbose': False, 'max_iter': 500}

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        u = cp.Variable(2)
        target_direction = np.array([self.target_speed, 0.0])
        objective = cp.Minimize(cp.sum_squares(u - target_direction))
        constraints = [
            cp.sum_squares(agent.position - n.position) >= (4*agent.radius)**2
            for n in neighbors
        ] if neighbors else []

        prob = cp.Problem(objective, constraints)
        prob.solve(solver=cp.OSQP, **self.solver_settings)
        return u.value if u.value is not None else target_direction

class BoidsSafeController:
    def __init__(self, target_speed=2.0):
        self.sep_weight = 2.5
        self.target_speed = target_speed

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        if not neighbors:
            return np.array([self.target_speed, 0])

        positions = np.array([n.position for n in neighbors])
        velocities = np.array([n.velocity for n in neighbors])
        dists = np.linalg.norm(positions - agent.position, axis=1)

        agent.update_safety_zone(np.min(dists))
        sep = np.mean((agent.position - positions)/(dists[:, None]**2 + 1e-5), axis=0)
        align = np.mean(velocities, axis=0)
        coh = np.mean(positions, axis=0) - agent.position

        direction = self.sep_weight*sep + align + 0.5*coh
        speed = np.linalg.norm(direction)
        return direction * (self.target_speed / speed) if speed > 0 else np.array([self.target_speed, 0])

class VicsekSafeController:
    def __init__(self, target_speed=2.0, noise=0.05):
        self.noise = noise
        self.target_speed = target_speed

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        if neighbors:
            avg_vel = np.mean([n.velocity for n in neighbors], axis=0)
            speed = np.linalg.norm(avg_vel)
            if speed > 0:
                avg_vel = avg_vel * (self.target_speed / speed)
            return avg_vel + np.random.normal(scale=self.noise, size=2)
        return np.array([self.target_speed, 0])

class PotentialFieldController:
    def __init__(self, target_speed=2.0, alpha=0.5, beta=1.2):
        self.alpha = alpha
        self.beta = beta
        self.target_speed = target_speed

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        attraction = self.alpha * (agent.base_target - agent.position)
        repulsion = np.zeros(2)

        for n in neighbors:
            vec = agent.position - n.position
            dist = np.linalg.norm(vec)
            repulsion += self.beta * vec / (dist**3 + 1e-5)

        direction = attraction + repulsion
        speed = np.linalg.norm(direction)
        return direction * (self.target_speed / speed) if speed > 0 else np.array([self.target_speed, 0])

class LeaderFollowerController:
    def __init__(self, target_speed=2.0):
        self.target_speed = target_speed

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        if agent.role == "leader":
            return np.array([self.target_speed, 0.0])

        leader = next((n for n in neighbors if n.role == "leader"), None)
        if leader:
            direction = leader.position - agent.position
            dist = np.linalg.norm(direction)
            return (direction / dist) * self.target_speed if dist > 0 else np.zeros(2)
        return np.zeros(2)

class PSOController:
    def __init__(self, target_speed=2.0, max_speed=3.0):
        self.target_speed = target_speed
        self.max_speed = max_speed

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        dt = agent.pid.dt

        # Функція придатності для оптимізації PSO
        def fitness(v):
            v_vec = np.array(v)
            new_position = agent.position + v_vec * dt

            # Відстань до цільової точки
            target_dist = np.linalg.norm(new_position - agent.base_target)

            # Відстань до центру сусідів
            if neighbors:
                center = np.mean([n.position for n in neighbors], axis=0)
                center_dist = np.linalg.norm(new_position - center)
            else:
                center_dist = 0.0

            # Штраф за зіткнення
            collision_penalty = 0.0
            for n in neighbors:
                dist = np.linalg.norm(new_position - n.position)
                if dist < 2 * agent.radius:
                    collision_penalty += 1000 * (2 * agent.radius - dist)

            return target_dist + 0.5 * center_dist + collision_penalty

        # Межі для швидкості
        lb = [-self.max_speed, -self.max_speed]
        ub = [self.max_speed, self.max_speed]

        # Оптимізація за допомогою PSO
        v_opt, _ = pso(fitness, lb, ub, swarmsize=10, maxiter=5, debug=False)

        return np.array(v_opt)

class RandomWalkController:
    def __init__(self, target_speed=2.0):
        self.target_speed = target_speed

    def compute_control(self, agent: Agent, neighbors: List[Agent]) -> np.ndarray:
        base = np.random.uniform(-0.3, 0.3, size=2)
        direction = base * (1 - agent.pid.safety_penalty)
        speed = np.linalg.norm(direction)
        return direction * (self.target_speed / speed) if speed > 0 else np.array([self.target_speed, 0])

# %% [markdown]
# ## 3. Уніфікована симуляція з контролем часу та швидкості

# %%
def balanced_simulation(agents: List[Agent], controller, target_speed: float, steps: int = 100):
    collisions = 0
    total_control_time = 0.0

    # Налаштування цільової швидкості
    if hasattr(controller, 'target_speed'):
        controller.target_speed = target_speed
    for agent in agents:
        agent.pid.target_speed = target_speed

    for _ in range(steps):
        positions = np.array([a.position for a in agents])

        for i, agent in enumerate(agents):
            dists = np.linalg.norm(positions - agent.position, axis=1)
            neighbor_indices = np.where((dists > 0) & (dists < 5*agent.radius))[0]
            neighbors = [agents[j] for j in neighbor_indices]

            start_time = time.perf_counter()
            control = controller.compute_control(agent, neighbors)
            total_control_time += time.perf_counter() - start_time

            agent.velocity = control
            agent.apply_speed_limit()
            agent.position += agent.velocity * agent.pid.dt

        # Перевірка зіткнень на оновлених позиціях
        new_positions = np.array([a.position for a in agents])
        dist_matrix = np.linalg.norm(new_positions[:, None] - new_positions, axis=2)
        np.fill_diagonal(dist_matrix, np.inf)
        collisions += np.sum(dist_matrix < 2*agents[0].radius) // 2

    # Розрахунок реальної середньої швидкості
    avg_speed = np.mean([np.linalg.norm(a.velocity) for a in agents])
    time_per_agent = total_control_time / (len(agents) * steps)

    return {
        "collisions": collisions,
        "avg_speed": avg_speed,
        "time_per_agent": time_per_agent
    }

# %% [markdown]
# ## 4. Порівняльне тестування алгоритмів

# %%
if __name__ == "__main__":
    # Параметри тестування
    TARGET_SPEED = 28.0
    AGENT_COUNT = 1000
    SIM_STEPS = 200

    # Створення агентів
    agents = [
        Agent(
            id=i,
            position=np.array([i*8.0, 0.]),
            base_target=np.array([200.0, 0.0])
        ) for i in range(AGENT_COUNT)
    ]
    agents[0].role = "leader"

    # Ініціалізація контролерів
    controllers = {
        "CBF-PID": CBFPIDController(target_speed=TARGET_SPEED),
        "Boids-Safe": BoidsSafeController(target_speed=TARGET_SPEED),
        "Vicsek-Safe": VicsekSafeController(target_speed=TARGET_SPEED),
        "Potential-Field": PotentialFieldController(target_speed=TARGET_SPEED),
        "Leader-Follower": LeaderFollowerController(target_speed=TARGET_SPEED),
        "PSO-Safe": PSOController(target_speed=TARGET_SPEED),
        "Random-Walk": RandomWalkController(target_speed=TARGET_SPEED)
    }

    # Виконання тестів
    results = []
    for name, controller in controllers.items():
        agents_copy = [
            Agent(
                id=a.id,
                position=a.position.copy(),
                velocity=a.velocity.copy(),
                radius=a.radius,
                max_speed=a.max_speed,
                base_target=a.base_target.copy()
            ) for a in agents
        ]
        agents_copy[0].role = "leader"

        res = balanced_simulation(
            agents_copy,
            controller,
            target_speed=TARGET_SPEED,
            steps=SIM_STEPS
        )

        results.append({
            "Algorithm": name,
            "Collisions": res["collisions"],
            "Avg Speed": f"{res['avg_speed']:.2f} m/s",
            "Time per Agent": f"{res['time_per_agent']*1e6:.2f} μs"
        })

    # Вивід результатів
    results_df = pd.DataFrame(results)
    print("\n" + "="*70)
    print(f"Результати тестування ({AGENT_COUNT} агентів, {SIM_STEPS} кроків)")
    print("="*70)
    print(results_df)

[1;30;43mПоказано результат, скорочений до останніх рядків (5000).[0m
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum iterations reached --> 5
Stopping search: maximum ite