In [2]:
import time

from SwarmEnvironment import *
from SwarmAgent import *
import numpy
# ------------------------------
# 4. Training Loop
# ------------------------------
# Seed NumPy's global RNG so environment randomness is repeatable.
# NOTE(review): only NumPy is seeded here; if Agent/SwarmEnv draw from another
# RNG (e.g. torch or `random`), seed those too for full reproducibility.
numpy.random.seed(20)
manual_selected_device = "cpu"

env = SwarmEnv(n_agents=20, space_size=50)
env.set_random_goals()
# One shared agent drives every swarm member; each member observes 9 values.
agent = Agent(state_dim=9, action_dim=7, device=manual_selected_device)

agent.gamma = 0.95    # Q-learning discount factor (gamma)
agent.epsilon = 1.0   # exploration rate; 1.0 means fully random actions
agent.batch_size = 64

epsilon_decay = 0.9   # multiplicative epsilon decay applied once per episode
epsilon_min = 0.05    # floor for epsilon

env.goal_reward = 2.0
env.collision_reward = -5.0
env.distance_reward_factor = 2.0

training_steps = 1000   # number of training episodes
episodes_length = 200   # maximum environment steps per episode

for episode in range(training_steps):
    observations = env.reset()
    total_reward = 0.0

    for step in range(episodes_length):
        # Batched action selection for all swarm members at once.
        actions = agent.select_multiple_actions(observations)

        # Environment step (expects actions as a list or array).
        next_observations, rewards, done, _ = env.step(actions)

        # Store one transition per swarm member.
        for i in range(env.n_agents):
            agent.store(observations[i], actions[i], rewards[i], next_observations[i])

        # Train DQN.
        agent.train_step()

        # Move to next step; accumulate the per-step reward averaged over members.
        observations = next_observations
        # Use the `numpy` name imported by this cell; `np` is not imported here.
        total_reward += numpy.mean(rewards)

        # End early if environment finishes.
        if done:
            break

    # Once per episode: sync the target network and decay exploration.
    agent.update_target()
    agent.epsilon = max(epsilon_min, agent.epsilon * epsilon_decay)
    print(f"Episode {episode}, average total reward {total_reward:.2f}, eps {agent.epsilon:.2f}")

using device :  cpu
Episode 0, average total reward -0.07, eps 0.90
Episode 1, average total reward 12.69, eps 0.81
Episode 2, average total reward 20.42, eps 0.73
Episode 3, average total reward 30.18, eps 0.66
Episode 4, average total reward 46.71, eps 0.59
Episode 5, average total reward 41.64, eps 0.53
Episode 6, average total reward 49.62, eps 0.48
Episode 7, average total reward 52.33, eps 0.43
Episode 8, average total reward 59.24, eps 0.39
Episode 9, average total reward 61.19, eps 0.35
Episode 10, average total reward 57.21, eps 0.31
Episode 11, average total reward 59.33, eps 0.28
Episode 12, average total reward 57.20, eps 0.25
Episode 13, average total reward 63.32, eps 0.23
Episode 14, average total reward 58.21, eps 0.21
Episode 15, average total reward 62.10, eps 0.19
Episode 16, average total reward 61.67, eps 0.17
Episode 17, average total reward 64.63, eps 0.15
Episode 18, average total reward 65.77, eps 0.14
Episode 19, average total reward 61.13, eps 0.12
Episode 20

In [22]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import animation
from IPython.display import display, clear_output

def _build_swarm_scene(env, goals, n_agents, title):
    """Create a 3D figure holding goal markers, an empty agent scatter and one line per agent."""
    fig = plt.figure(figsize=(6, 6))
    ax = fig.add_subplot(111, projection='3d')
    ax.set_xlim(0, env.space_size)
    ax.set_ylim(0, env.space_size)
    ax.set_zlim(0, env.space_size)
    ax.set_xlabel("X-axis")
    ax.set_ylabel("Y-axis")
    ax.set_zlabel("Z-axis")
    ax.set_title(title)

    scat = ax.scatter([], [], [], c='blue', s=50, label='Agents')
    ax.scatter(goals[:, 0], goals[:, 1], goals[:, 2],
               c='red', s=100, marker='*', label='Goals')
    lines = [ax.plot([], [], [], 'gray', linestyle='--', linewidth=1)[0]
             for _ in range(n_agents)]
    ax.legend()
    return fig, ax, scat, lines


def _draw_swarm_frame(scat, lines, pos, goals):
    """Move the agent markers to `pos` and redraw each agent-to-goal line."""
    # Assigning `_offsets3d` is how the original code repositions the 3D scatter points.
    scat._offsets3d = (pos[:, 0], pos[:, 1], pos[:, 2])
    for i, line in enumerate(lines):
        line.set_data([pos[i, 0], goals[i, 0]], [pos[i, 1], goals[i, 1]])
        line.set_3d_properties([pos[i, 2], goals[i, 2]])


def visualize_swarm(agent, env, steps=50, save=False, interval=10):
    """
    Roll out the agent in the environment and visualize swarm movement in 3D.

    Parameters
    ----------
    agent : object providing ``select_action(observation)``.
    env : environment providing ``reset()``, ``step(actions)``, ``positions``,
        ``goal`` and ``space_size``.
    steps : int
        Maximum number of environment steps to roll out; the rollout stops
        early when ``env.step`` reports ``done``.
    save : bool
        - save=False → real-time live animation (in Jupyter)
        - save=True  → export 4 gifs: normal, xy, xz, yz
    interval : int
        Delay between frames in milliseconds.

    Returns
    -------
    None
    """
    # ---- Roll out the policy and record positions (initial state included) ----
    obs = env.reset()
    positions_history = [env.positions.copy()]

    for _ in range(steps):
        actions = [agent.select_action(o) for o in obs]
        obs, _, done, _ = env.step(actions)
        positions_history.append(env.positions.copy())
        if done:
            break

    positions_history = np.array(positions_history)  # shape: [T, n_agents, 3]
    n_steps, n_agents, _ = positions_history.shape
    # Frame 0 is the reset state, so the last frame index equals the number of
    # environment steps taken. Using it as the denominator lets the counter
    # actually reach "k/k" instead of stopping at "(T-1)/T".
    last_step = n_steps - 1

    # Normalize goal shape: a single goal is shared by every agent.
    goals = np.atleast_2d(env.goal)
    if goals.shape[0] == 1:
        goals = np.repeat(goals, n_agents, axis=0)

    # ----------------------------
    # Helper to make and save GIFs
    # ----------------------------
    def make_animation(view_name, elev, azim):
        fig, ax, scat, lines = _build_swarm_scene(
            env, goals, n_agents, f"3D Swarm Movement ({view_name})"
        )
        ax.view_init(elev=elev, azim=azim)

        def init():
            scat._offsets3d = ([], [], [])
            for line in lines:
                line.set_data([], [])
                line.set_3d_properties([])
            return [scat, *lines]

        def update(frame):
            _draw_swarm_frame(scat, lines, positions_history[frame], goals)
            ax.set_title(f"3D Swarm Movement ({view_name}) - Step {frame}/{last_step}")
            return [scat, *lines]

        ani = animation.FuncAnimation(
            fig, update, frames=n_steps, init_func=init,
            interval=interval, blit=False
        )

        filename = f"swarm_simulation_{view_name.lower()}.gif"
        ani.save(filename, writer='pillow')
        print(f"✅ Saved {filename}")

        # Release the figure so repeated exports do not accumulate open figures.
        plt.close(fig)

    # ------------------------------------------------
    # A) SAVE FOUR VIEWS AS GIFS
    # ------------------------------------------------
    if save:
        make_animation("normal", elev=30, azim=45)
        make_animation("xy", elev=90, azim=-90)
        make_animation("xz", elev=0, azim=-90)
        make_animation("yz", elev=0, azim=0)
        return

    # ------------------------------------------------
    # B) LIVE DISPLAY (REAL-TIME UPDATE in Jupyter)
    # ------------------------------------------------
    plt.ion()  # Turn on interactive mode
    try:
        fig, ax, scat, lines = _build_swarm_scene(
            env, goals, n_agents, "3D Swarm Movement (Live)"
        )

        for frame in range(n_steps):
            _draw_swarm_frame(scat, lines, positions_history[frame], goals)
            ax.set_title(f"3D Swarm Movement (Live) - Step {frame}/{last_step}")
            display(fig)
            # wait=True defers the clear until the next output arrives.
            clear_output(wait=True)
            plt.pause(interval / 1000.0)
    finally:
        # Restore non-interactive mode even if the loop is interrupted.
        plt.ioff()

    plt.show()


In [23]:
  # Roll out up to 500 steps with the `agent`/`env` objects and export the four GIF views.
  visualize_swarm(agent, env, steps=500, save=True)

✅ Saved swarm_simulation_normal.gif
✅ Saved swarm_simulation_xy.gif
✅ Saved swarm_simulation_xz.gif
✅ Saved swarm_simulation_yz.gif
