# SWARM: Job Selection via Consensus

## Import the libraries

In [None]:
from ipaddress import ip_address, IPv4Address, IPv6Address, IPv4Network, IPv6Network
import ipaddress

from fabrictestbed_extensions.fablib.fablib import FablibManager as fablib_manager

# Instantiate the FABRIC library manager that every later cell talks through.
fablib = fablib_manager()

# Print the active configuration; the trailing ';' keeps Jupyter from also
# echoing the call's return value.
fablib.show_config();

## Define variables

In [None]:
# --- Agent layout ---
name_prefix = "agent"   # agent nodes are named "<name_prefix>-<n>", n starting at 1
node_count = 30         # agent nodes created by the initial slice request
agents_per_node = 1
slice_name = f'MySlice-swarm-single-site-{node_count}'

# Node that hosts the Redis database.
db_node_name = "database"

# --- Node profile shared by every node ---
cores = 8
ram = 8  # alternative value used before: 32
disk = 100
image = "docker_ubuntu_22"
branch = "15-resilience-and-perf-improvements"  # NOTE(review): not referenced by the visible cells
network_name = "fabv4"

## Determine sites

In [None]:
# Ask fablib for one randomly chosen site and report the plan.
sites = fablib.get_random_sites(count=1)
announcement = f'Preparing to create slice "{slice_name}" in site {sites}'
print(announcement)

In [None]:
# Pin the deployment to TACC, overriding the random choice made in the previous cell.
sites = ["TACC"]

## Slice Creation

- **Database Node**
  - Allocate a node to host the Redis database. Ensure this node is connected to the L3 FabNetV4 network to enable communication with the agent nodes.

- **Agent Cluster**
  - Provision the number of nodes specified by `node_count` for deploying Swarm agents. This notebook places every agent on a single site (`sites[0]`); for larger experiments, ideally distribute them across multiple sites.
  - Each agent node should also be connected to the L3 FabNetV4 network to facilitate inter-node communication.

In [None]:
import random

# Build the slice request: one IPv4 L3 network (FabNetV4), the database node and
# `node_count` agent nodes, all placed on sites[0].
slice = fablib.new_slice(name=slice_name)
net = slice.add_l3network(name=network_name, type='IPv4')


def _attach_node(node_name):
    """Add *node_name* on sites[0] with the shared node profile.

    The node gets one NIC_Basic component whose first interface is put in
    manual IP mode and attached to the L3 network ``net``.
    Returns the new node.
    """
    new_node = slice.add_node(name=node_name, site=sites[0], image=image, disk=disk, cores=cores, ram=ram)
    nic_iface = new_node.add_component(model='NIC_Basic', name='nic1').get_interfaces()[0]
    nic_iface.set_mode('manual')
    net.add_interface(nic_iface)
    return new_node


database = _attach_node(db_node_name)

# Agent nodes agent-1 .. agent-<node_count>, on the same L3 network as the database.
for idx in range(node_count):
    agent = _attach_node(f"{name_prefix}-{idx+1}")

# Submit without blocking; provisioning continues in the background.
slice.submit(wait=False)

In [None]:
# Grow the existing slice with additional agent nodes.
slice = fablib.get_slice(slice_name)

# The creation cell submitted with wait=False. FABRIC only accepts a modify
# request once the slice has reached a stable state, so block until it does.
# This returns right away if the slice is already stable.
slice.wait(timeout=1200)

net = slice.get_network(name=network_name)

# How many agents to add on top of the `node_count` created initially.
extra_node_count = 30

# Continue numbering right after the existing agents (agent-<node_count+1>, ...)
# so new names never collide with the ones from the first submit. With
# node_count = 30 this is the same agent-31 .. agent-60 range as before.
for idx in range(node_count, node_count + extra_node_count):
    site = sites[0]
    agent = slice.add_node(name=f"{name_prefix}-{idx+1}", site=site, image=image, disk=disk, cores=cores, ram=ram)
    # Same wiring as the initial agents: one NIC_Basic interface, manual IP mode, on the L3 network.
    iface = agent.add_component(model='NIC_Basic', name='nic1').get_interfaces()[0]
    iface.set_mode('manual')
    net.add_interface(iface)

# Submit the modify request without blocking; the next cell waits for it.
slice.submit(wait=False)

In [None]:
# Refresh the slice, block until provisioning settles (timeout=1200 seconds),
# then until every node is reachable over SSH.
slice = fablib.get_slice(slice_name)
slice.wait(timeout=1200)
slice.wait_ssh()

In [None]:
# Run fablib's post-boot configuration step on the provisioned slice.
slice.post_boot_config()

In [None]:
# Refresh the slice and tabulate its nodes (';' suppresses Jupyter's echo of the return value).
slice = fablib.get_slice(slice_name)
slice.list_nodes();

In [None]:
# Tabulate the slice's networks (';' suppresses Jupyter's echo of the return value).
slice.list_networks();

In [None]:
slice = fablib.get_slice(slice_name)

# Snapshot nodes, networks and their interfaces once. Interface lookups are
# expensive in fablib as the slice grows (the original note says they involve
# SSH), so later cells reuse these caches instead of querying again.

# Nodes, plus a name -> node index.
nodes = slice.get_nodes()
node_by_name = {node_obj.get_name(): node_obj for node_obj in nodes}

# Networks, plus a name -> network index.
networks = slice.get_networks()
nw_by_name = {nw_obj.get_name(): nw_obj for nw_obj in networks}

# Interface lists, keyed by node name and by network name respectively.
node_ifaces = {node_obj.get_name(): node_obj.get_interfaces() for node_obj in nodes}
nw_ifaces = {nw_obj.get_name(): nw_obj.get_interfaces() for nw_obj in networks}

In [None]:
# Copy the node_tools helper directory to every node and mark its shell scripts executable.
for target in nodes:
    target.upload_directory("node_tools", ".")
    target.execute("cd node_tools && chmod +x *.sh")

In [None]:
# Helper: next host IP generator for a subnet
def host_iter(ipnet):
    """Return an iterator over the usable host addresses of *ipnet*.

    ``ipnet.hosts()`` already skips the network and broadcast addresses.
    The result is wrapped in ``iter()`` because, for a single-address network
    (/32 for IPv4), ``hosts()`` returns a list rather than an iterator.
    Callers consume the result with ``next()``, which a list does not support.

    Args:
        ipnet: an ``ipaddress`` network object (e.g. ``IPv4Network``).

    Returns:
        An iterator yielding the network's host addresses in ascending order.
    """
    return iter(ipnet.hosts())

# Assign static IPs network by network: walk the subnet's usable hosts in order
# and hand one address to each attached interface.
assigned_ip = {}  # (network name, node name) -> assigned IP as a string

for nw_name, nw in nw_by_name.items():
    subnet = nw.get_subnet()
    hiter = host_iter(subnet)
    # The first usable host is never handed out. Presumably it is the network
    # gateway that is passed as -g below. TODO confirm against nw.get_gateway().
    next(hiter)
    for iface in nw_ifaces[nw_name]:
        ip = next(hiter)
        owner = iface.get_node()
        node_name = owner.get_name()
        print(f"Configuring IP on {node_name} for nw {nw_name}")
        # Use the subnet's actual prefix length instead of assuming /24.
        cmd = (
            "sudo node_tools/setup-netplan-multihomed.sh "
            f"-i {iface.get_physical_os_interface_name()} "
            f"-a {ip}/{subnet.prefixlen} -g {nw.get_gateway()}"
        )
        print(cmd)
        owner.execute(cmd)
        assigned_ip[(nw_name, node_name)] = str(ip)

In [None]:
# Generate a passphrase-less RSA key pair for root on every node.
# The `test -f` guard makes the cell safe to re-run. Without it, ssh-keygen
# would hit its overwrite prompt on nodes that already have a key.
# /root/.ssh is created first in case it does not exist yet.
keygen_cmd = (
    "sudo sh -c '"
    "mkdir -p /root/.ssh && chmod 700 /root/.ssh; "
    'test -f /root/.ssh/id_rsa || ssh-keygen -t rsa -N "" -f /root/.ssh/id_rsa'
    "'"
)
for node in nodes:
    node.execute(keygen_cmd, quiet=True, output_file=f"{node.get_name()}.log")

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from ipaddress import IPv4Network

# ------------------------------------
# 1) Collect SSH pubkeys in parallel
# ------------------------------------
def read_pubkey(node):
    """Fetch root's RSA public key from *node*.

    Runs ``sudo cat /root/.ssh/id_rsa.pub`` quietly on the node.

    Returns:
        ``(node_name, key)`` where *key* is the command's stdout with
        surrounding whitespace removed (an empty string if nothing was printed).
    """
    stdout, _stderr = node.execute("sudo cat /root/.ssh/id_rsa.pub", quiet=True)
    node_name = node.get_name()
    return node_name, stdout.strip()

# Gather every node's public key concurrently (at most 16 worker threads).
# key_map: node name -> stripped public key text.
workers = min(16, len(nodes) or 1)
with ThreadPoolExecutor(max_workers=workers) as pool:
    key_map = dict(pool.map(read_pubkey, nodes))

# ---------------------------------------------------
# 2) Append other nodes' pubkeys to each authorized_keys
#    (parallel + here-doc; idempotent-ish by dedupe)
# ---------------------------------------------------
def write_keys(node):
    """Add every *other* node's root public key to root's authorized_keys on *node*.

    Keys are taken from the module-level ``key_map`` (node name -> public key).
    The node's own key and empty entries are skipped. If nothing remains, no
    command is run.

    The new keys are merged with the existing file via ``sort -u``, so re-runs
    do not create duplicate lines. This also sorts the existing lines of
    authorized_keys.
    """
    my_name = node.get_name()
    ssh_keys_block = "\n".join(
        k for nn, k in key_map.items() if nn != my_name and k
    ).strip()
    if not ssh_keys_block:
        return

    # The whole script is single-quoted for `bash -lc`, so it assumes the keys
    # contain no single quotes. Standard "type base64 user@host" keys do not.
    # The staging file is truncated (`>`), not appended to. With `>>`, a file
    # left behind by an earlier aborted run would leak stale keys into the merge.
    script = r"""sudo bash -lc '
set -e
mkdir -p /root/.ssh
touch /root/.ssh/authorized_keys
cat <<"EOF" > /root/.ssh/authorized_keys.__tmp
{keys}
EOF
cat /root/.ssh/authorized_keys /root/.ssh/authorized_keys.__tmp | sort -u > /root/.ssh/authorized_keys.__new
mv /root/.ssh/authorized_keys.__new /root/.ssh/authorized_keys
rm -f /root/.ssh/authorized_keys.__tmp
chmod 700 /root/.ssh
chmod 600 /root/.ssh/authorized_keys
'""".format(keys=ssh_keys_block)
    node.execute(script, quiet=True)

# Push the peer keys to all nodes in parallel (at most 16 worker threads).
# Calling result() re-raises any exception raised by write_keys. Merely
# iterating as_completed() would silently drop failures and leave some nodes
# without their peers' keys.
with ThreadPoolExecutor(max_workers=min(16, len(nodes) or 1)) as pool:
    futures = [pool.submit(write_keys, n) for n in nodes]
    for f in as_completed(futures):
        f.result()

In [None]:
# ---------------------------------------------------
# 3) Build /etc/hosts from the assigned_ip map (reuses the IPs assigned above
#    instead of querying interface addresses again).
#    For each node: add all peers' IPs on the node's networks.
# ---------------------------------------------------


def _network_names(ifaces):
    """Return the distinct names of the networks that *ifaces* are attached to.

    Interfaces whose get_network() returns None (not attached to any network)
    are skipped rather than raising AttributeError.
    """
    names = set()
    for iface in ifaces:
        nw = iface.get_network()
        if nw is not None:
            names.add(nw.get_name())
    return list(names)


# node name -> names of the networks it is attached to (from the cached node_ifaces).
nets_per_node = {nname: _network_names(ifs) for nname, ifs in node_ifaces.items()}


def build_hosts_block_for(node_name: str) -> str:
    """Render the /etc/hosts lines that *node_name* should contain.

    Every node attached to at least one of *node_name*'s networks (including
    *node_name* itself) gets one ``"<ip> <hostname>"`` line, provided an IP was
    recorded for it in ``assigned_ip``.

    Networks are visited in alphabetical order of their names. When a peer
    shares several networks with this node, the IP on the alphabetically first
    network is used.

    Returns:
        The lines sorted by hostname and joined with newlines (no trailing
        newline). Returns an empty string if there are no entries.
    """
    chosen = {}  # peer hostname -> IP
    for nw_name in sorted(nets_per_node[node_name]):
        for iface in nw_ifaces[nw_name]:
            peer = iface.get_node().get_name()
            if peer not in chosen:
                ip = assigned_ip.get((nw_name, peer))
                if ip:
                    chosen[peer] = ip

    return "\n".join(f"{chosen[host]} {host}" for host in sorted(chosen))

# One rendered hosts block per node, keyed by node name.
hosts_blocks = {n.get_name(): build_hosts_block_for(n.get_name()) for n in nodes}

# Append each node's entries to its /etc/hosts. A line is only added when an
# identical line is not already present, so re-running this cell does not pile
# up duplicates. Nodes with nothing to add are skipped, so no blank line is
# written. The quoted "EOF" heredoc keeps the inner shell from expanding the
# host lines.
for n in nodes:
    host_info = hosts_blocks[n.get_name()]
    if not host_info:
        continue
    script = f"""sudo bash -c '
while IFS= read -r line; do
    grep -qxF -- "$line" /etc/hosts || echo "$line" >> /etc/hosts
done <<"EOF"
{host_info}
EOF
'"""
    n.execute(script)

#-------------------------------
# Dump the etc hosts
#-------------------------------

import json
print("ETC Hosts:", json.dumps(hosts_blocks, indent=2))

In [None]:
# Upload push_swarmagents.sh to the database node and run it as root with
# node_count as its only argument. output_file is set to "<db node name>.log".
# Indexing (rather than .get) fails with a clear KeyError if the node is missing,
# instead of an AttributeError on None.
# NOTE(review): node_count is the initial agent count only; agents added by the
# slice-extension cell are not counted here. Confirm this is intended.
database = node_by_name[db_node_name]
database.upload_file("push_swarmagents.sh", "push_swarmagents.sh")
stdout, stderr = database.execute(
    f"chmod +x push_swarmagents.sh && sudo ./push_swarmagents.sh {node_count}",
    quiet=True,
    output_file=f"{database.get_name()}.log",
)

In [None]:
# Upload and run install.sh on every cached node (the database node included).
# Each node's output_file is "<node name>.log".
for host in nodes:
    host.upload_file("install.sh", "install.sh")
    host.execute(
        "chmod +x install.sh && ./install.sh",
        quiet=True,
        output_file=f"{host.get_name()}.log",
    )

## Running SWARM-MULTI Consensus Setup

### Setup redis node

In [None]:
# Start only the `redis` service of the SwarmAgents docker compose project on the
# database node, detached. Indexing raises a clear KeyError if the node is missing.
db_node = node_by_name[db_node_name]
stdout, stderr = db_node.execute('sudo bash -c "cd /root/SwarmAgents && docker compose up -d redis"', quiet=True)

### Setup agents

In [None]:
# Install SwarmAgents' Python dependencies on every node (database included).
# Order per node: requirements, a protobuf==3.20.3 pin, then requirements again.
# NOTE(review): the second requirements pass presumably repairs packages
# affected by the protobuf pin. Confirm the ordering is intentional.
pip_steps = (
    'sudo bash -c "cd /root/SwarmAgents && pip3.11 install -r requirements.txt"',
    'sudo bash -c "cd /root/SwarmAgents && pip3.11 install protobuf==3.20.3"',
    'sudo bash -c "cd /root/SwarmAgents && pip3.11 install -r requirements.txt"',
)
for n in nodes:
    for step in pip_steps:
        stdout, stderr = n.execute(step, quiet=True)

## Trigger consensus from the database node

In [None]:
# Look up the database node; indexing raises a clear KeyError if it is missing (instead of yielding None).
db_node = node_by_name[db_node_name]

In [None]:
# Run one batch experiment from the database node: 30 resource agents in a
# hierarchical topology, 100 jobs, Redis at host "database", results under
# /root/SwarmAgents/run-h-30-100.
# NOTE(review): --agents is fixed at 30 even if more agent nodes were provisioned.
batch_cmd = (
    "cd /root/SwarmAgents && ./batch_tests_v2.py --runs 1 --base-out run-h-30-100 "
    "--mode remote --agent-type resource --agents 30 --topology hierarchical "
    "--hierarchical-level1-agent-type resource --jobs 100 --db-host database "
    "--job-interval 120 --jobs-per-interval 1"
)
stdout, stderr = db_node.execute(f'sudo bash -c "{batch_cmd}"')

In [None]:
# Pack the run's output directory into /tmp/run-h-30-100.tgz on the database node for download.
archive_cmd = 'sudo bash -c "cd /root/SwarmAgents && tar -zcf /tmp/run-h-30-100.tgz run-h-30-100/"'
stdout, stderr = db_node.execute(archive_cmd)

In [None]:
# Copy /tmp/run-h-30-100.tgz from the database node to ./run-h-30-100.tgz locally
# (fablib argument order: local path first, then remote path).
db_node.download_file("run-h-30-100.tgz", "/tmp/run-h-30-100.tgz")

In [None]:
# IPython shell escape: unpack the downloaded archive, recreating ./run-h-30-100/ whose images are shown below.
!tar -zxvf run-h-30-100.tgz

**Parent Agents - LLM**

**Children Agents - Heuristic**

![Topology](./run-h-30-100/run01/hierarchical_topology.png)

![Latency comparison by hierarchy level](./run-h-30-100/run01/latency_comparison_by_hierarchy_level.png)

### Delete the Slice

In [None]:
# Left commented out to avoid deleting the slice by accident; uncomment to release its resources once the results are saved.
#slice.delete()