# Stage 1

In this step, my goal is to implement a task distribution system with information gathering. Assume I have 5 agents, all of whome have a random chance of finding a task. Once a task is found, the agent will broadcast the task to all other agents. The agents will then decide which agent is best suited to complete the task. The agent that is best suited to complete the task will then be assigned the task.


In [7]:
from typing import Any
import random

def get_info():
    """Simulating information gathering"""

    tasks = list("abcde")
    number_of_tasks = random.randint(1, len(tasks))
    return random.sample(tasks, number_of_tasks)

In [8]:
for i in range(10):
    print(get_info())

['b', 'd', 'c', 'a', 'e']
['e', 'd', 'a']
['c']
['e']
['e']
['a', 'e', 'b', 'd']
['b']
['c', 'e', 'a']
['b']
['a', 'b', 'c', 'd']


### Iteration 1: Information Gathering

In [9]:
class Agent:
    """A basic agent class that can gather information, store it, and be assigned tasks"""
    def __init__(self, id: int):
        self.id = id
        self.captured_info = None
        self.gathered_info = None
        self.assigned_task = None
        
    def get_info(self) -> Any:
        if self.captured_info is None:
            self.captured_info = get_info()
        return self.captured_info
    
    def __str__(self):
        return str(
            {
                "id": self.id,
                "captured_info": self.captured_info,
                "gathered_info": self.gathered_info,
                "assigned_task": self.assigned_task
            }
        )

In [13]:
A = Agent(1)
A.get_info()
print(A)

{'id': 1, 'captured_info': ['e', 'a'], 'gathered_info': None, 'assigned_task': None}


### Iteration 2: Information Sharing

In [14]:
import threading
class Agent:
    """A basic agent class that can gather information, store it, and be assigned tasks
    Now with thread safety!
    Given a list of all other agents, if can share its captured tasks with them.
    """
    
    def __init__(self, id: int):
        self.id = id
        self.captured_info = None
        self.gathered_info = set()  # Set to track known tasks from all agents
        self.assigned_task = None
        self.lock = threading.Lock()

    def get_info(self) -> Any:
        if self.captured_info is None:
            self.captured_info = get_info()
        return self.captured_info
    
    def share_info(self, agent_list):
        """Share captured tasks with other agents."""
        for agent in agent_list:
            if agent.id != self.id:
                with agent.lock:  # Ensure thread-safe updates
                    agent.gathered_info.update(self.captured_info)

    def __str__(self):
        return str(
            {
                "id": self.id,
                "captured_info": self.captured_info,
                "gathered_info": self.gathered_info,
                "assigned_task": self.assigned_task
            }
        )

In [19]:
# Initialization
agent_1 = Agent(1)
agent_2 = Agent(2)

# Information Gathering
agent_1.get_info()
agent_2.get_info()

# Sharing information
agent_1.share_info([agent_2])
agent_2.share_info([agent_1])

# Displaying the agents
print(agent_1)
print(agent_2)

{'id': 1, 'captured_info': ['b'], 'gathered_info': {'a'}, 'assigned_task': None}
{'id': 2, 'captured_info': ['a'], 'gathered_info': {'b'}, 'assigned_task': None}


### Iteration 3: multithreaded task assignment

In [22]:
import threading
import time
import random
from queue import Queue

class Agent(threading.Thread):
    def __init__(self, id: int, result_lock: threading.Lock):
        super().__init__()
        self.id = id
        self.captured_info = set()
        self.assigned_task = None
        self.task_queue = Queue()
        self.result_lock = result_lock
        self.neighbors = []
        
    def get_info(self):
        """Capture initial tasks."""
        tasks = list("abcde")
        number_of_tasks = random.randint(1, len(tasks))
        self.captured_info.update(random.sample(tasks, number_of_tasks))

    def broadcast_info(self):
        """Broadcast task information to neighbors."""
        for neighbor in self.neighbors:
            neighbor.task_queue.put((self.id, self.captured_info))

    def listen_for_info(self):
        """Listen for task updates from neighbors."""
        while not self.task_queue.empty():
            sender_id, tasks = self.task_queue.get()
            print(f"Agent {self.id} received tasks {tasks} from Agent {sender_id}")
            self.captured_info.update(tasks)

    def assign_unique_task(self, assigned_tasks):
        """Choose a unique task."""
        available_tasks = self.captured_info - assigned_tasks
        if available_tasks:
            self.assigned_task = available_tasks.pop()
            assigned_tasks.add(self.assigned_task)

    def run(self):
        """Main agent behavior."""
        # Step 1: Capture tasks
        self.get_info()

        # Step 2: Broadcast initial tasks
        self.broadcast_info()

        # Step 3: Listen and update from neighbors
        time.sleep(0.5)  # Allow time for messages to propagate
        self.listen_for_info()

        # Synchronize task assignment
        with self.result_lock:
            self.assign_unique_task(assigned_tasks)

        print(f"Agent {self.id} assigned task: {self.assigned_task}")

In [23]:
# Shared resources
NUM_AGENTS = 5
assigned_tasks = set()
result_lock = threading.Lock()

# Create agents
agents = [Agent(id=i, result_lock=result_lock) for i in range(NUM_AGENTS)]

# Link agents as neighbors (fully connected graph)
for i, agent in enumerate(agents):
    agent.neighbors = [a for j, a in enumerate(agents) if i != j]

# Start all agents
for agent in agents:
    agent.start()

# Wait for all agents to complete
for agent in agents:
    agent.join()

# Show the final state
print("\nFinal Agent States:")
for agent in agents:
    print(f"Agent {agent.id}: Assigned Task = {agent.assigned_task}")

Agent 0 received tasks {'a'} from Agent 1Agent 4 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 0
Agent 4 received tasks {'a'} from Agent 1
Agent 4 received tasks {'d', 'c'} from Agent 2
Agent 4 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 3
Agent 4 assigned task: e
Agent 3 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 0
Agent 3 received tasks {'a'} from Agent 1
Agent 3 received tasks {'d', 'c'} from Agent 2
Agent 3 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 4
Agent 3 assigned task: d
Agent 2 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 0
Agent 2 received tasks {'a'} from Agent 1
Agent 2 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 3
Agent 2 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 4
Agent 2 assigned task: c
Agent 1 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 0
Agent 1 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 2
Agent 1 received tasks {'d', 'c', 'b', 'e', 'a'} from Agent 3
Agent 1 received tasks {'d', 'c', 'b', 'e', 

### Iteration 4: Over the network task assignment

In [10]:
import socket
import threading
import json
import random
import time

HOST = "127.0.0.1"
BASE_PORT = 5000
NUM_AGENTS = 5

class Agent(threading.Thread):
    def __init__(self, id: int):
        super().__init__()
        self.id = id
        self.tasks = set()
        self.assigned_task = None
        self.port = BASE_PORT + id
        self.neighbor_ports = [BASE_PORT + i for i in range(NUM_AGENTS) if i != id]
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, self.port))
        self.server_socket.listen()
        self.running = True  # Flag to control the listening loop

    def get_info(self):
        """Capture initial tasks."""
        tasks = list("abc")
        number_of_tasks = random.randint(1, len(tasks))
        self.tasks.update(random.sample(tasks, number_of_tasks))
        print(f"Agent {self.id} initial tasks: {self.tasks}")

    def listen_for_messages(self):
        """Listen for incoming task information."""
        while self.running:
            try:
                conn, _ = self.server_socket.accept()
                with conn:
                    data = conn.recv(1024)
                    if data:
                        message = json.loads(data.decode())
                        print(f"Agent {self.id} received: {message}")
                        self.tasks.update(message["tasks"])
            except socket.timeout:
                continue  # Allow graceful shutdown during blocking accept
            except Exception as e:
                if self.running:
                    print(f"Agent {self.id} error: {e}")
                break

    def send_task_info(self):
        """Send task information to neighbors."""
        message = json.dumps({"id": self.id, "tasks": list(self.tasks)})
        for port in self.neighbor_ports:
            try:
                with socket.create_connection((HOST, port), timeout=1) as sock:
                    sock.sendall(message.encode())
            except (ConnectionRefusedError, socket.timeout):
                print(f"Agent {self.id} failed to connect to Agent at port {port}")

    def assign_unique_task(self, assigned_tasks):
        """Choose a unique task."""
        available_tasks = self.tasks - assigned_tasks
        if available_tasks:
            self.assigned_task = available_tasks.pop()
            assigned_tasks.add(self.assigned_task)
        print(f"Agent {self.id} assigned task: {self.assigned_task}")

    def run(self):
        """Main agent behavior."""
        self.get_info()

        # Start listener thread for incoming messages
        listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
        listener_thread.start()

        # Broadcast task information
        time.sleep(1)  # Allow time for all agents to start
        self.send_task_info()

        # Wait for task propagation
        time.sleep(2)

        # Task assignment phase
        with result_lock:
            self.assign_unique_task(assigned_tasks)

        # Close the socket and stop listening
        self.stop_listening()

    def stop_listening(self):
        """Close the server socket and stop the listening loop."""
        self.running = False
        self.server_socket.close()
        print(f"Agent {self.id}: Server socket closed.")


In [11]:
# Shared resources
assigned_tasks = set()
result_lock = threading.Lock()

# Create and start agents
agents = [Agent(id=i) for i in range(NUM_AGENTS)]
for agent in agents:
    agent.start()

# Wait for all agents to complete
for agent in agents:
    agent.join()

print("\nFinal Agent States:")
for agent in agents:
    print(f"Agent {agent.id}: Assigned Task = {agent.assigned_task}")


Agent 0 initial tasks: {'a', 'c', 'b'}
Agent 1 initial tasks: {'a', 'c'}
Agent 2 initial tasks: {'a', 'c', 'b'}
Agent 3 initial tasks: {'c'}
Agent 4 initial tasks: {'a', 'c', 'b'}
Agent 0 received: {'id': 1, 'tasks': ['a', 'c']}Agent 1 received: {'id': 0, 'tasks': ['a', 'c', 'b']}

Agent 0 received: {'id': 3, 'tasks': ['c']}
Agent 0 received: {'id': 2, 'tasks': ['a', 'c', 'b']}
Agent 2 received: {'id': 1, 'tasks': ['a', 'c']}
Agent 2 received: {'id': 0, 'tasks': ['a', 'c', 'b']}
Agent 1 received: {'id': 3, 'tasks': ['c']}
Agent 0 received: {'id': 4, 'tasks': ['a', 'c', 'b']}
Agent 1 received: {'id': 2, 'tasks': ['a', 'c', 'b']}
Agent 1 received: {'id': 4, 'tasks': ['a', 'c', 'b']}
Agent 3 received: {'id': 1, 'tasks': ['a', 'c']}
Agent 2 received: {'id': 3, 'tasks': ['c']}
Agent 3 received: {'id': 0, 'tasks': ['a', 'c', 'b']}
Agent 3 received: {'id': 2, 'tasks': ['a', 'c', 'b']}
Agent 2 received: {'id': 4, 'tasks': ['a', 'c', 'b']}
Agent 4 received: {'id': 1, 'tasks': ['a', 'c']}
Agent 

### Stage 4: Network Discovery

In [12]:
import socket
import threading
import json
import random
import time

HOST = "127.0.0.1"
BASE_PORT = 5000
PORT_RANGE = 10  # Adjust for the number of agents

class Agent(threading.Thread):
    def __init__(self, id: int):
        super().__init__()
        self.id = id
        self.tasks = set()
        self.assigned_task = None
        self.port = BASE_PORT + id
        self.neighbor_ports = []
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, self.port))
        self.server_socket.listen()
        self.server_socket.settimeout(1)
        self.running = True

    def get_info(self):
        """Capture initial tasks."""
        tasks = list("abcde")
        number_of_tasks = random.randint(1, len(tasks))
        self.tasks.update(random.sample(tasks, number_of_tasks))
        print(f"Agent {self.id} initial tasks: {self.tasks}")

    def listen_for_messages(self):
        """Listen for incoming task information."""
        while self.running:
            try:
                conn, _ = self.server_socket.accept()
                with conn:
                    data = conn.recv(1024)
                    if data:
                        message = json.loads(data.decode())
                        print(f"Agent {self.id} received: {message}")
                        self.tasks.update(message["tasks"])
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    print(f"Agent {self.id} error: {e}")
                break

    def send_task_info(self):
        """Send task information to neighbors."""
        message = json.dumps({"id": self.id, "tasks": list(self.tasks)})
        for port in self.neighbor_ports:
            try:
                with socket.create_connection((HOST, port), timeout=1) as sock:
                    sock.sendall(message.encode())
            except (ConnectionRefusedError, socket.timeout):
                print(f"Agent {self.id} failed to connect to Agent at port {port}")

    def discover_neighbors(self):
        """Scan ports within a range to find active neighbors."""
        for port in range(BASE_PORT, BASE_PORT + PORT_RANGE):
            if port != self.port:
                try:
                    with socket.create_connection((HOST, port), timeout=0.5) as sock:
                        self.neighbor_ports.append(port)
                        print(f"Agent {self.id} detected neighbor at port {port}")
                except (ConnectionRefusedError, socket.timeout):
                    continue

    def assign_unique_task(self, assigned_tasks):
        """Choose a unique task."""
        available_tasks = self.tasks - assigned_tasks
        if available_tasks:
            self.assigned_task = available_tasks.pop()
            assigned_tasks.add(self.assigned_task)
        print(f"Agent {self.id} assigned task: {self.assigned_task}")

    def run(self):
        """Main agent behavior."""
        self.get_info()

        # Start listener thread for incoming messages
        listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
        listener_thread.start()

        # Discover neighbors by scanning ports
        time.sleep(1)  # Allow time for agents to start their servers
        self.discover_neighbors()

        # Broadcast task information
        time.sleep(1)
        self.send_task_info()

        # Wait for task propagation
        time.sleep(2)

        # Task assignment phase
        with result_lock:
            self.assign_unique_task(assigned_tasks)

        # Close the socket and stop listening
        self.stop_listening()

    def stop_listening(self):
        """Close the server socket and stop the listening loop."""
        self.running = False
        self.server_socket.close()
        print(f"Agent {self.id}: Server socket closed.")


In [13]:
if __name__ == "__main__":
    assigned_tasks = set()
    result_lock = threading.Lock()
    agents = [Agent(i) for i in range(PORT_RANGE)]

    for agent in agents:
        agent.start()

    for agent in agents:
        agent.join()


Agent 0 initial tasks: {'a', 'd', 'e', 'c', 'b'}
Agent 1 initial tasks: {'e', 'a', 'd'}
Agent 2 initial tasks: {'d', 'e', 'b'}
Agent 3 initial tasks: {'c'}
Agent 4 initial tasks: {'a', 'd'}
Agent 5 initial tasks: {'e', 'c', 'b'}
Agent 6 initial tasks: {'a', 'd', 'e', 'c', 'b'}
Agent 7 initial tasks: {'e', 'a', 'c', 'd'}
Agent 8 initial tasks: {'e', 'a'}
Agent 9 initial tasks: {'e', 'c', 'b', 'd'}
Agent 4 detected neighbor at port 5000
Agent 3 detected neighbor at port 5000
Agent 1 detected neighbor at port 5000
Agent 0 detected neighbor at port 5001
Agent 6 detected neighbor at port 5000
Agent 5 detected neighbor at port 5000
Agent 0 detected neighbor at port 5002
Agent 1 detected neighbor at port 5002
Agent 7 detected neighbor at port 5000
Agent 2 detected neighbor at port 5000
Agent 8 detected neighbor at port 5000
Agent 5 detected neighbor at port 5001
Agent 9 detected neighbor at port 5000
Agent 4 detected neighbor at port 5001
Agent 3 detected neighbor at port 5001
Agent 2 detecte

### Stage 6: Adding Logging because the output is getting out of hand

In [3]:
import socket
import threading
import json
import random
import time
import logging

HOST = "127.0.0.1"
BASE_PORT = 5000
PORT_RANGE = 10  # Adjust for the number of agents

logger = logging.getLogger()

# Console handler for CRITICAL logs only
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.CRITICAL)  # Print only CRITICAL logs to the console
console_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)

# Logging setup
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("agent_logs.log"),
    ]
)

class Agent(threading.Thread):
    def __init__(self, id: int):
        super().__init__()
        self.id = id
        self.tasks = set()
        self.assigned_task = None
        self.port = BASE_PORT + id
        self.neighbor_ports = []
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, self.port))
        self.server_socket.listen()
        self.server_socket.settimeout(1)
        self.running = True

    def get_info(self):
        """Capture initial tasks."""
        tasks = list("abcde")
        number_of_tasks = random.randint(1, len(tasks))
        self.tasks.update(random.sample(tasks, number_of_tasks))
        logging.critical(f"Agent {self.id} initial tasks gathered: {self.tasks}")

    def listen_for_messages(self):
        """Listen for incoming task information."""
        while self.running:
            try:
                conn, _ = self.server_socket.accept()
                with conn:
                    data = conn.recv(1024)
                    if data:
                        message = json.loads(data.decode())
                        logging.info(f"Agent {self.id} received: {message}")
                        self.tasks.update(message["tasks"])
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    logging.error(f"Agent {self.id} error: {e}")
                break

    def send_task_info(self):
        """Send task information to neighbors."""
        message = json.dumps({"id": self.id, "tasks": list(self.tasks)})
        for port in self.neighbor_ports:
            try:
                with socket.create_connection((HOST, port), timeout=1) as sock:
                    sock.sendall(message.encode())
                    logging.info(f"Agent {self.id} sent tasks to Agent at port {port}")
            except (ConnectionRefusedError, socket.timeout):
                logging.warning(f"Agent {self.id} failed to connect to Agent at port {port}")

    def discover_neighbors(self):
        """Scan ports within a range to find active neighbors."""
        for port in range(BASE_PORT, BASE_PORT + PORT_RANGE):
            if port != self.port:
                try:
                    with socket.create_connection((HOST, port), timeout=0.5) as sock:
                        self.neighbor_ports.append(port)
                        logging.info(f"Agent {self.id} detected neighbor at port {port}")
                except (ConnectionRefusedError, socket.timeout):
                    continue

    def assign_unique_task(self, assigned_tasks):
        """Choose a unique task."""
        available_tasks = self.tasks - assigned_tasks
        if available_tasks:
            self.assigned_task = available_tasks.pop()
            assigned_tasks.add(self.assigned_task)
            logging.critical(f"Agent {self.id} assigned unique task: {self.assigned_task}")
        else:
            logging.warning(f"Agent {self.id} found no available unique task")

    def run(self):
        """Main agent behavior."""
        self.get_info()

        # Start listener thread for incoming messages
        listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
        listener_thread.start()

        # Discover neighbors by scanning ports
        time.sleep(1)  # Allow time for agents to start their servers
        self.discover_neighbors()

        # Broadcast task information
        time.sleep(1)
        self.send_task_info()

        # Wait for task propagation
        time.sleep(2)

        # Task assignment phase
        with result_lock:
            self.assign_unique_task(assigned_tasks)

        # Close the socket and stop listening
        self.stop_listening()

    def stop_listening(self):
        """Close the server socket and stop the listening loop."""
        self.running = False
        self.server_socket.close()
        logging.info(f"Agent {self.id}: Server socket closed.")


In [4]:
# Shared resources
assigned_tasks = set()
result_lock = threading.Lock()

agents = [Agent(i) for i in range(PORT_RANGE)]

for agent in agents:
    agent.start()

for agent in agents:
    agent.join()

2025-01-28 20:02:12,142 - CRITICAL - Agent 0 initial tasks gathered: {'a', 'd', 'c', 'e'}
2025-01-28 20:02:12,142 - CRITICAL - Agent 0 initial tasks gathered: {'a', 'd', 'c', 'e'}
2025-01-28 20:02:12,144 - CRITICAL - Agent 1 initial tasks gathered: {'c'}
2025-01-28 20:02:12,144 - CRITICAL - Agent 2 initial tasks gathered: {'a', 'd', 'c'}
2025-01-28 20:02:12,144 - CRITICAL - Agent 1 initial tasks gathered: {'c'}
2025-01-28 20:02:12,144 - CRITICAL - Agent 3 initial tasks gathered: {'a', 'd'}
2025-01-28 20:02:12,145 - CRITICAL - Agent 4 initial tasks gathered: {'d'}
2025-01-28 20:02:12,160 - CRITICAL - Agent 5 initial tasks gathered: {'c', 'd', 'e'}
2025-01-28 20:02:12,144 - CRITICAL - Agent 2 initial tasks gathered: {'a', 'd', 'c'}
2025-01-28 20:02:12,161 - CRITICAL - Agent 6 initial tasks gathered: {'a', 'd', 'e'}
2025-01-28 20:02:12,144 - CRITICAL - Agent 3 initial tasks gathered: {'a', 'd'}
2025-01-28 20:02:12,163 - CRITICAL - Agent 7 initial tasks gathered: {'a', 'e'}
2025-01-28 20:0

### Stage 7: Dockerized Agents

Making Dockerfile and Docker-Compose and stuff of stage 6 in the work dir
1. Create a network using:
```bash
docker network create agent_network
```
2. Create Volume for Logs:
```bash
docker volume create agent_logs
```
3. ```bash
docker-compose up --build
```


### Stage 8: One Agent per Container

In [None]:
import threading
import socket
import json
import random
import time
import logging
import os

# Get the environment variables for the container's ID and other settings
HOST = os.getenv("HOST", "0.0.0.0")
BASE_PORT = int(os.getenv("BASE_PORT", 5000))
PORT_RANGE = int(os.getenv("PORT_RANGE", 10))  # Number of agents to run
AGENT_ID = int(os.getenv("AGENT_ID", 0))  # Get agent ID from environment variable

# Logging setup to write to a shared volume
log_file = "/logs/agent_logs.log"
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
)


class Agent:
    def __init__(self, id: int):
        self.id = id
        self.tasks = set()
        self.assigned_task = None
        self.port = BASE_PORT + id
        self.neighbor_ports = []
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, self.port))
        self.server_socket.listen()
        self.server_socket.settimeout(1)
        self.running = True

    def get_info(self):
        """Capture initial tasks."""
        tasks = list("abcde")
        number_of_tasks = random.randint(1, len(tasks))
        self.tasks.update(random.sample(tasks, number_of_tasks))
        logging.critical(f"Agent {self.id} initial tasks gathered: {self.tasks}")

    def listen_for_messages(self):
        """Listen for incoming task information."""
        while self.running:
            try:
                conn, _ = self.server_socket.accept()
                with conn:
                    data = conn.recv(1024)
                    if data:
                        message = json.loads(data.decode())
                        logging.info(f"Agent {self.id} received: {message}")
                        self.tasks.update(message["tasks"])
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    logging.error(f"Agent {self.id} error: {e}")
                break

    def send_task_info(self):
        """Send task information to neighbors."""
        message = json.dumps({"id": self.id, "tasks": list(self.tasks)})
        for port in self.neighbor_ports:
            try:
                with socket.create_connection((HOST, port), timeout=1) as sock:
                    sock.sendall(message.encode())
                    logging.info(f"Agent {self.id} sent tasks to Agent at port {port}")
            except (ConnectionRefusedError, socket.timeout):
                logging.warning(
                    f"Agent {self.id} failed to connect to Agent at port {port}"
                )

    def discover_neighbors(self):
        """Scan ports within a range to find active neighbors."""
        for port in range(BASE_PORT, BASE_PORT + PORT_RANGE):
            if port != self.port:
                try:
                    with socket.create_connection((HOST, port), timeout=0.5) as sock:
                        self.neighbor_ports.append(port)
                        logging.info(
                            f"Agent {self.id} detected neighbor at port {port}"
                        )
                except (ConnectionRefusedError, socket.timeout):
                    continue

    def assign_unique_task(self, assigned_tasks):
        """Choose a unique task."""
        available_tasks = self.tasks - assigned_tasks
        if available_tasks:
            self.assigned_task = available_tasks.pop()
            assigned_tasks.add(self.assigned_task)
            logging.critical(
                f"Agent {self.id} assigned unique task: {self.assigned_task}"
            )
        else:
            logging.warning(f"Agent {self.id} found no available unique task")

    def run(self):
        """Main agent behavior."""
        self.get_info()

        # Start listener thread for incoming messages
        listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
        listener_thread.start()

        # Discover neighbors by scanning ports
        time.sleep(1)  # Allow time for agents to start their servers
        self.discover_neighbors()

        # Network discovery phase: wait for 10 seconds
        time.sleep(10)
        logging.critical(f"Agent {self.id} completed network discovery.")

        # Broadcast task information
        self.send_task_info()

        # Wait for task propagation
        time.sleep(2)

        # Task assignment phase
        with result_lock:
            self.assign_unique_task(assigned_tasks)

        # Output gathered information and assigned task
        logging.critical(
            f"Agent {self.id} info gathered: {self.tasks}, assigned task: {self.assigned_task}"
        )

        # Close the socket and stop listening
        self.stop_listening()

    def stop_listening(self):
        """Close the server socket and stop the listening loop."""
        self.running = False
        self.server_socket.close()
        logging.info(f"Agent {self.id}: Server socket closed.")


# Shared resources
assigned_tasks = set()
result_lock = threading.Lock()

if __name__ == "__main__":
    agent = Agent(id=AGENT_ID)  # Use the agent ID from environment
    agent.run()


### Stage 9: Task uniqueness using some auction-like algorithm