## 1. Configuration (matches `main.py`)

> This notebook mirrors the behavior of `main.py`, but is split into inspectable steps so you can tweak parameters and rerun parts interactively.

In [None]:
# Configuration (mirrors main.py)
NUM_AGENTS = 10
STREAM_NAME = "agent_stream"
REDIS_HOST = "localhost"
REDIS_PORT = 6379
RUN_DURATION_SECONDS = 120

# Runtime switches
USE_LOCAL_LLM = True
ENABLE_STANCE_WORKER = False
STANCE_BATCH_SIZE = 5
STANCE_BATCH_INTERVAL = 30

# Embedding-based context + topology (requires OPENAI_API_KEY)
ENABLE_EMBEDDING_CONTEXT = True
ROLLING_STORE_MAX_ITEMS = 2000
CONTEXT_TOP_K = 8
PROFILE_WINDOW_SIZE = 50
PROFILE_SEED_WEIGHT = 5.0

# Topology logging cadence
TOPOLOGY_LOG_INTERVAL_S = 5.0

# Prompt template used by `main.py` (fixed stance sentence is authoritative)
initial_prompt_template = (
   "You are participating in a social-media-style discussion about {topic}." \
   "The sentence {unique_prompt} is your fixed stance and is authoritative and exhaustive. Write entirely from the worldview, assumptions, tone, values, and constraints it defines; it fully determines what you believe, how you speak, and what claims you are willing to make." \
   "Produce a short, attention-grabbing post that hooks readers, makes a clear and strong claim aligned with that grounding, and invites engagement (likes, replies, shares)." \
   "Be concise, bold, and evocative. Use a distinct memorable opening line, assertive language, and a direct call-to-action every time. Emulate authentic social media posts." \
   "Make sure posts are distinct, do not copy formatting and language of previous posts, instead contradict any claims that oppose your fixed stance"
   "Do not introduce outside viewpoints, neutral framing, balance, or meta-commentary. Do not soften or qualify claims unless explicitly required by the authoritative sentence. Never refer to yourself as an agent, AI, or participant in a debate."
 )

# Initial instruction to Agent 1 (matches `main.py`)
STARTER_USER_INSTRUCTION = (
    "Write the first viral post that kicks off a heated comment thread about this topic. "
    "Make a strong claim, then invite replies."
 )

print(
    "Configuration:\n"
    f"  NUM_AGENTS={NUM_AGENTS}\n"
    f"  STREAM_NAME={STREAM_NAME!r}\n"
    f"  REDIS_HOST={REDIS_HOST!r}:{REDIS_PORT}\n"
    f"  RUN_DURATION_SECONDS={RUN_DURATION_SECONDS}\n"
    f"  USE_LOCAL_LLM={USE_LOCAL_LLM}\n"
    f"  ENABLE_STANCE_WORKER={ENABLE_STANCE_WORKER} (batch_size={STANCE_BATCH_SIZE}, interval={STANCE_BATCH_INTERVAL}s)\n"
    f"  ENABLE_EMBEDDING_CONTEXT={ENABLE_EMBEDDING_CONTEXT} (top_k={CONTEXT_TOP_K}, rolling_max={ROLLING_STORE_MAX_ITEMS})\n"
    f"  PROFILE_WINDOW_SIZE={PROFILE_WINDOW_SIZE} PROFILE_SEED_WEIGHT={PROFILE_SEED_WEIGHT}\n"
    f"  TOPOLOGY_LOG_INTERVAL_S={TOPOLOGY_LOG_INTERVAL_S}\n"
 )

Configuration: 3 agents, stream='agent_stream', duration=60s


## 2. Imports (Jupyter async-friendly)

Notes on coroutines in Jupyter:
- Prefer top-level `await` over `asyncio.run(...)` (which can error in notebooks).
- This notebook applies `nest_asyncio` so you can rerun cells without event-loop conflicts.

In [None]:
import asyncio
import os
import nest_asyncio
from dotenv import load_dotenv

# Allow re-entrant event loops (helps with reruns)
nest_asyncio.apply()
load_dotenv()

from agents.network_agent import NetworkAgent
from agents.llm_service import LLMService
from agents.local_llm import HuggingFaceLLM
from agents.prompt_configs.generate_prompt import PromptGenerator

from controller.stance_analysis.embedding_analyzer import EmbeddingAnalyzer
from controller.stance_analysis.rolling_embedding_store import RollingEmbeddingStore
from controller.stance_analysis.agent_profile_store import AgentProfileStore
from controller.stance_analysis.network_topology import NetworkTopologyTracker

from controller.time_manager import TimeManager
from controller.order_manager import OrderManager
from controller.stance_worker import StanceWorker

from network.cache import RedisCache
from network.stream import RedisStream

from logs.logger import Logger, console_logger
from logs.topology_logger import TopologyLogger

print("Imports successful")
print("OPENAI_API_KEY set:", bool(os.getenv("OPENAI_API_KEY")))

Imports successful!


## 3. Preflight (rerun safety + Redis check)

If you rerun the notebook, it’s important to stop background tasks first (agent consumers, topology loop, logger threads).

In [None]:
# Best-effort cleanup from a previous notebook run
async def _stop_previous_run_if_any():
    # Stop agents
    if "agents" in globals():
        for a in list(globals().get("agents") or []):
            try:
                await a.stop()
            except Exception as exc:
                print(f"Warning: failed stopping agent {getattr(a,'id','?')}: {exc}")
        globals()["agents"] = []

    # Cancel topology task
    topo_task = globals().get("topo_task")
    if topo_task is not None:
        try:
            topo_task.cancel()
            await topo_task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            print(f"Warning: topology task cancel error: {exc}")
        globals()["topo_task"] = None

    # Stop topology logger thread
    topology_logger = globals().get("topology_logger")
    if topology_logger is not None:
        try:
            topology_logger.stop()
        except Exception as exc:
            print(f"Warning: topology_logger.stop() failed: {exc}")
        globals()["topology_logger"] = None

    # Stop stance worker
    stance_worker = globals().get("stance_worker")
    if stance_worker is not None:
        try:
            await stance_worker.stop()
        except Exception as exc:
            print(f"Warning: stance_worker.stop() failed: {exc}")
        globals()["stance_worker"] = None

    # Stop logger thread
    logger = globals().get("logger")
    if logger is not None:
        try:
            await logger.async_stop()
        except Exception as exc:
            print(f"Warning: logger.async_stop() failed: {exc}")
        globals()["logger"] = None

    # Stop LLM service worker
    llm_service = globals().get("llm_service")
    if llm_service is not None:
        try:
            await llm_service.stop()
        except Exception as exc:
            print(f"Warning: llm_service.stop() failed: {exc}")
        globals()["llm_service"] = None

    # Close caches
    for name in ["message_cache", "embed_cache", "topology_cache"]:
        c = globals().get(name)
        if c is not None:
            try:
                await c.close()
            except Exception as exc:
                print(f"Warning: {name}.close() failed: {exc}")
            globals()[name] = None

await _stop_previous_run_if_any()

# Redis connectivity check (optional but useful)
try:
    rs = RedisStream(host=REDIS_HOST, port=REDIS_PORT)
    ok = rs.redis.ping()
    print("Redis ping:", ok)
except Exception as exc:
    print("WARNING: Redis ping failed; start Redis before running agents.")
    print("  Error:", exc)

Shared discussion topic: energy policy
Agent 1 prompt: My experience as a neighbor shows that energy policy is robust and impacts salad availability; even a fox can notice the change....
Agent 2 prompt: In terms of resilience, energy policy should be seen as pragmatic and we should investigate accordingly....
Agent 3 prompt: It's sustainable when engineer interact with energy policy through a edge device in the university....


## 4. Topic + Per-Agent Stance Sentences

All agents share the same topic, but each gets a unique *stance sentence* (the authoritative grounding).

In [None]:
# Shared topic + per-agent stance sentences (same topic; unique wording per agent)
prompt_generator = PromptGenerator()
topic = prompt_generator.get_topic()
agent_prompts = prompt_generator.generate_multiple_prompts(NUM_AGENTS)

agent_configs = {}  # agent_id -> init_prompt (logged to disk like main.py)
for i in range(NUM_AGENTS):
    agent_id = f"agent_{i+1}"
    init_prompt = initial_prompt_template.format(topic=topic, unique_prompt=agent_prompts[i])
    agent_configs[agent_id] = init_prompt

print(f"Shared discussion topic: {topic}")
print(f"Generated {len(agent_prompts)} unique stance sentences.")
for i in range(min(3, NUM_AGENTS)):
    print(f"- agent_{i+1}: {agent_prompts[i]}")

TimeManager initialized with 3.0s global interval
RedisCache connected to localhost:6379
Logger initialized, writing to: /home/sammli/llm-network/logs/network_logs/log_20260112-011804_3.log


## 5. Initialize Shared Components (matches `main.py`)

This step starts optional local generation, initializes Redis caches, and (optionally) sets up embedding-based context + topology logging.

In [None]:
# Time + Redis caches
time_manager = TimeManager(global_interval=3.0)
message_cache = RedisCache(host=REDIS_HOST, port=REDIS_PORT)

# Optional embedding/topology caches use separate prefixes (mirrors main.py)
embed_cache = RedisCache(host=REDIS_HOST, port=REDIS_PORT, prefix="embed:")
topology_cache = RedisCache(host=REDIS_HOST, port=REDIS_PORT, prefix="topology:")

analysis_lock = asyncio.Lock()

# Local LLM service (optional; can be heavy to load)
llm_service = None
local_llm = None
if USE_LOCAL_LLM:
    console_logger.info("Using local LLM service (quantized HF model).")
    local_llm = HuggingFaceLLM()
    llm_service = LLMService(local_llm)
    await llm_service.start()
    console_logger.info("LLMService started.")
else:
    console_logger.info("USE_LOCAL_LLM=False; will use OpenAI if configured in agents.")

# Embedding-based context + agent profiles + topology (optional)
rolling_store = None
profile_store = None
topology_tracker = None
topology_logger = None

embedding_enabled = ENABLE_EMBEDDING_CONTEXT and bool(os.getenv("OPENAI_API_KEY"))
print("Embedding context enabled:", embedding_enabled)

if embedding_enabled:
    analyzer = EmbeddingAnalyzer(topic)
    rolling_store = RollingEmbeddingStore(
        topic=topic,
        analyzer=analyzer,
        redis_cache=embed_cache,
        max_items=ROLLING_STORE_MAX_ITEMS,
    )
    loaded = await rolling_store.load_from_redis(last_n=min(500, ROLLING_STORE_MAX_ITEMS))
    console_logger.info(f"Loaded {loaded} embedded posts from Redis.")

    # Cold-start: seed the latent space with stable opposing anchors + a few high-contrast seeds
    if loaded == 0:
        seed_texts: list[tuple[str, str]] = []  # (side, text)
        for side, texts in analyzer.anchor_groups.items():
            for t in texts:
                seed_texts.append((side, t))

        seed_texts.extend(
            [
                ("pro", f"Enough dithering. {topic} is non-negotiable — we should expand it now."),
                ("pro", f"If you're against {topic}, you're choosing stagnation. Push it through."),
                ("anti", f"Wake up: {topic} is a harmful mistake. Stop pretending it's 'progress'."),
                ("anti", f"{topic} is a disaster in slow motion. Reject it before it spreads."),
            ]
        )

        for i, (side, text) in enumerate(seed_texts):
            await rolling_store.add(
                text,
                id=f"seed:{i}",
                metadata={"sender_id": "__seed__", "seed": True, "side": side},
                persist=True,
            )
        console_logger.info(f"Seeded rolling store with {len(seed_texts)} synthetic posts.")

    profile_store = AgentProfileStore(
        redis=message_cache.redis,
        window_size=PROFILE_WINDOW_SIZE,
        seed_weight=PROFILE_SEED_WEIGHT,
    )
    topology_tracker = NetworkTopologyTracker(
        topic=topic,
        profile_store=profile_store,
        redis_cache=topology_cache,
        redis_key=f"snapshot:{topic}",
    )
    topology_logger = TopologyLogger()
    console_logger.info(f"Topology snapshots will be written to: {topology_logger.file_path}")
else:
    console_logger.info("Embedding-based context disabled (missing OPENAI_API_KEY or ENABLE_EMBEDDING_CONTEXT=0).")

# Redis stream helper (used for cleanup)
redis_stream = RedisStream(host=REDIS_HOST, port=REDIS_PORT)

# Logging
logger = Logger(num_agents=NUM_AGENTS)
logger.log_agent_configs(agent_configs)
console_logger.info(f"Logging publishes to: {logger.file_path}")
print("Logger file:", logger.file_path)

Created agent_1
Created agent_2
Created agent_3

Total agents created: 3


## 6. Create Agents + OrderManager

This wires together `NetworkAgent` + `OrderManager` using the same parameters as `main.py` (including optional embedding context).

In [None]:
# Create agents (constructor args match `main.py`)
agents = []
for i in range(NUM_AGENTS):
    agent_id = f"agent_{i+1}"
    init_prompt = agent_configs[agent_id]
    agent = NetworkAgent(
        id=agent_id,
        init_prompt=init_prompt,
        topic=topic,
        stream_name=STREAM_NAME,
        stream_group=f"group_{i+1}",
        redis_host=REDIS_HOST,
        redis_port=REDIS_PORT,
        time_manager=time_manager,
        order_manager=None,  # injected after OrderManager is created
        message_cache=message_cache,
        logger=logger,
        llm_service=llm_service,
        local_llm=local_llm,
        rolling_store=rolling_store,
        profile_store=profile_store,
        topology_tracker=topology_tracker,
        analysis_lock=analysis_lock,
        context_top_k=CONTEXT_TOP_K,
    )
    agents.append(agent)

print(f"Created {len(agents)} agents")

# OrderManager (mirrors main.py signature, including profile_store)
order_manager = OrderManager(
    agents=agents,
    message_cache=message_cache,
    profile_store=profile_store,
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT,
)

for agent in agents:
    agent.order_manager = order_manager
print("OrderManager injected into all agents")

OrderManager initialized
OrderManager injected into all agents


## 7. Start Agents (and optional background workers)

This starts each agent’s Redis consumer loop. If topology tracking is enabled, it also starts a periodic snapshot logger (like `main.py`).

In [None]:
# Optional stance worker (off by default, mirrors `main.py`)
stance_worker = None
if ENABLE_STANCE_WORKER:
    stance_worker = StanceWorker(
        topic=topic,
        stream_name=STREAM_NAME,
        group_name="stance_group",
        consumer_name="stance_consumer",
        redis_host=REDIS_HOST,
        redis_port=REDIS_PORT,
        batch_size=STANCE_BATCH_SIZE,
        batch_interval=STANCE_BATCH_INTERVAL,
        local_llm=local_llm,
        llm_service=llm_service,
        use_openai=bool(os.getenv("OPENAI_API_KEY")),
    )
    await stance_worker.start()
    console_logger.info("StanceWorker started.")
else:
    print("ENABLE_STANCE_WORKER=False (skipping)")

# Start agents
print("Starting agents...")
for agent in agents:
    await agent.start()
print(f"All {NUM_AGENTS} agents are running.")

# Periodic topology logging (if enabled)
topo_task = None
if topology_tracker and topology_logger:
    async def _topology_loop():
        while True:
            try:
                agent_ids = [a.id for a in agents]
                snap = await topology_tracker.maybe_update(agent_ids, force=True)
                if snap is not None:
                    topology_logger.log_snapshot(snap)
            except Exception as exc:
                console_logger.info(f"Topology snapshot failed (continuing): {exc}")
            await asyncio.sleep(float(TOPOLOGY_LOG_INTERVAL_S))

    topo_task = asyncio.create_task(_topology_loop())
    print("Topology loop started. Writing to:", topology_logger.file_path)
else:
    print("Topology logging disabled (no topology_tracker/topology_logger).")

[INFO] 2026-01-12 01:18:12,159 - Consumer group 'group_1' already exists for stream 'agent_stream'.
[INFO] 2026-01-12 01:18:12,160 - Agent agent_1 is consuming from stream 'agent_stream' as part of group 'group_1'.


Starting agents...


[INFO] 2026-01-12 01:18:13,204 - Agent agent_1 started and is listening for messages.
[INFO] 2026-01-12 01:18:13,207 - Consumer group 'group_2' already exists for stream 'agent_stream'.
[INFO] 2026-01-12 01:18:13,208 - Agent agent_2 is consuming from stream 'agent_stream' as part of group 'group_2'.
[INFO] 2026-01-12 01:18:14,213 - Agent agent_2 started and is listening for messages.
[INFO] 2026-01-12 01:18:14,218 - Consumer group 'group_3' already exists for stream 'agent_stream'.
[INFO] 2026-01-12 01:18:14,218 - Agent agent_3 is consuming from stream 'agent_stream' as part of group 'group_3'.
[INFO] 2026-01-12 01:18:16,234 - Agent agent_3 started and is listening for messages.


All 3 agents are now running!


[INFO] 2026-01-12 01:18:22,303 - Agent agent_2 is generating response
[INFO] 2026-01-12 01:18:22,304 - Agent agent_3 skipped (designated responder is None).
[INFO] 2026-01-12 01:18:28,223 - Agent agent_2 designating next responder: agent_1
[INFO] 2026-01-12 01:18:28,224 - Message 1768202308224-0 published to stream 'agent_stream'.
[INFO] 2026-01-12 01:18:28,225 - Agent agent_1 ignored its own message 1768202302300-0.
[INFO] 2026-01-12 01:18:28,226 - Agent agent_3 skipped (designated responder is agent_1).
[DEBUG] 2026-01-12 01:18:28,228 - Agent agent_2 published message.
[INFO] 2026-01-12 01:18:29,341 - Agent agent_2 ignored its own message 1768202308224-0.
[INFO] 2026-01-12 01:18:29,343 - Agent agent_1 is generating response
[INFO] 2026-01-12 01:18:35,984 - Agent agent_1 designating next responder: agent_2
[INFO] 2026-01-12 01:18:35,987 - Message 1768202315986-0 published to stream 'agent_stream'.
[INFO] 2026-01-12 01:18:35,990 - Agent agent_3 skipped (designated responder is agent_2)

In [None]:
# Kick off the conversation with an initial LLM-generated post (matches `main.py`)
print("Agent 1 generating kickoff post...")
initial_message = await agents[0].generate_response(STARTER_USER_INSTRUCTION)
print("Agent 1 kickoff message:\n")
print(initial_message)

await agents[0].publish_message(initial_message)
print("Kickoff message published.")

[INFO] 2026-01-12 01:18:22,299 - Agent agent_1 designating next responder: agent_2
[INFO] 2026-01-12 01:18:22,300 - Message 1768202302300-0 published to stream 'agent_stream'.
[DEBUG] 2026-01-12 01:18:22,307 - Agent agent_1 published message.


Agent 1 starting conversation: Let's begin our discussion about energy policy. What are your initial thoughts on this subject?
Initial message published!


In [None]:
# Let the network run
print(f"Running for {RUN_DURATION_SECONDS} seconds...")
await asyncio.sleep(int(RUN_DURATION_SECONDS))
print("Run complete.")

Running conversation for 60 seconds...
Conversation complete!


## 8. Cleanup (important for reruns)

Stops consumers and background tasks, flushes/cleans Redis stream groups (optional), and closes Redis connections.

In [None]:
async def cleanup(*, cleanup_stream: bool = True, clear_caches: bool = True):
    print("Shutting down...")

    # Stop agents first (prevents publishes while tearing down Redis groups)
    for agent in agents:
        try:
            await agent.stop()
        except Exception as exc:
            print(f"Warning: failed stopping {agent.id}: {exc}")

    # Stop logger thread
    try:
        await logger.async_stop()
    except Exception as exc:
        print("Warning: logger stop failed:", exc)

    # Stop topology loop + logger
    global topo_task
    if topo_task is not None:
        topo_task.cancel()
        try:
            await topo_task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            print("Warning: topo_task cancel error:", exc)
        topo_task = None

    if topology_logger is not None:
        try:
            topology_logger.stop()
        except Exception as exc:
            print("Warning: topology_logger.stop failed:", exc)

    # Stop stance worker
    if stance_worker is not None:
        try:
            await stance_worker.stop()
        except Exception as exc:
            print("Warning: stance_worker.stop failed:", exc)

    # Optionally destroy consumer groups + delete stream key
    if cleanup_stream:
        try:
            await redis_stream.cleanup_stream(STREAM_NAME, num_groups=NUM_AGENTS)
        except Exception as exc:
            print("Warning: Redis stream cleanup error:", exc)

    # Optionally clear cache keys (matches main.py behavior)
    if clear_caches:
        try:
            await message_cache.clear_all()
        except Exception as exc:
            print("Warning: message_cache.clear_all failed:", exc)

    # Close caches
    try:
        await message_cache.close()
    except Exception as exc:
        print("Warning: message_cache.close failed:", exc)

    try:
        if rolling_store is not None:
            await embed_cache.close()
    except Exception as exc:
        print("Warning: embed_cache.close failed:", exc)

    try:
        await topology_cache.close()
    except Exception as exc:
        print("Warning: topology_cache.close failed:", exc)

    # Stop local LLM queue worker
    try:
        if llm_service is not None:
            await llm_service.stop()
    except Exception as exc:
        print("Warning: llm_service.stop failed:", exc)

    print("Done.")

# Cleanup
await cleanup(cleanup_stream=True, clear_caches=True)

Shutting down...
Done!


## 9. Inspect Results (optional)

After running the conversation, you can inspect the logged messages and cached data.

In [None]:
# Read the log file (written by Logger)
with open(logger.file_path, "r", encoding="utf-8") as f:
    logs_text = f.read()
print("=== Agent Publish Logs ===")
print(logs_text[:8000])
if len(logs_text) > 8000:
    print(f"... (truncated, total chars={len(logs_text)})")

=== Agent Publish Logs ===
[2026-01-12 01:18:22] PUBLISH agent=agent_1 | Let's begin our discussion about energy policy. What are your initial thoughts on this subject?
[2026-01-12 01:18:28] PUBLISH agent=agent_2 | Energy policy must be viewed through a pragmatic lens, particularly when considering resilience. Here’s why this approach is crucial:

1. **Adaptability in Uncertain Times**: Energy systems face increasing pressures from climate change, geopolitical instability, and economic fluctuations. A pragmatic energy policy focuses on flexibility and adaptability, allowing us to respond rapidly to unforeseen challenges, whether they are natural disasters or market disruptions.

2. **Diversification of Energy Sources**: By investigating a variety of energy sources—renewables, nuclear, and fossil fuels—we can create a more resilient energy infrastructure. This diversification mitigates risks associated with over-reliance on a single source, ensuring continuous energy supply during crise

In [None]:
async def inspect_cache(last_n: int = 5):
    """Inspect cached messages for each agent."""
    cache = RedisCache(host=REDIS_HOST, port=REDIS_PORT)
    print("=== Cached Messages per Agent ===")
    for agent in agents:
        raw_items = await cache.get_responses(agent.id, last_n=last_n)
        print(f"\n{agent.id} (last {last_n} messages):")
        for item in raw_items:
            if isinstance(item, (bytes, bytearray)):
                item = item.decode("utf-8", errors="replace")
            print(f"  - {item}")
    await cache.close()

await inspect_cache(last_n=5)

=== Cached Messages per Agent ===

agent_1 (last 5 messages):
  - b'{"sender_id": "agent_1", "content": "I appreciate your perspective on pragmatic energy policy, but I firmly believe that energy policies have a direct and profound impact on everyday life, particularly regarding the availability of fresh produce like salads. Here\xe2\x80\x99s how I support my statement:\\n\\n1. **Local Agriculture Resilience**: Energy policies that prioritize renewable sources can empower local farming. For instance, solar energy can keep greenhouses operational year-round, ensuring a steady supply of salad greens despite seasonal changes. This contributes to consistent availability in local markets.\\n\\n2. **Transportation and Distribution**: Energy policies influence fuel costs, directly affecting how quickly and efficiently fresh produce reaches consumers. A strong energy policy that promotes sustainable transport options can improve distribution networks, ensuring salads are consistently available