# DMAS - Distributed Multi-Agent System

Production notebook for managing and testing the DMAS stack with mem0/graphiti memory backends.


## Configuration

- Set sensitive keys (for example `OPENAI_API_KEY`) manually inside `.env`.
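- The notebook only updates `MEMORY_BACKEND` and `DMAS_RUN_ID`, and inserts placeholders for any missing required keys. It rewrites `.env` with the keys sorted alphabetically, so comments you add to the file are not preserved.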


In [314]:
import subprocess
import requests
import json
import time
import os
import socket
from datetime import datetime, timezone
from typing import Optional

# Backend selection: "mem0" or "graphiti" (also used as the docker compose profile name)
MEMORY_BACKEND = "mem0"
SUPPORTED_BACKENDS = ("mem0", "graphiti")
if MEMORY_BACKEND not in SUPPORTED_BACKENDS:
    # Fail fast: the value is passed to `docker compose --profile` and written
    # to .env, where a typo would otherwise go unnoticed.
    raise ValueError(
        f"MEMORY_BACKEND must be one of {SUPPORTED_BACKENDS}, got {MEMORY_BACKEND!r}"
    )
# Experiment run identifier written to .env (UTC timestamp, e.g. notebook-20251102-150228)
DMAS_RUN_ID = f"notebook-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"

# Service URLs (host-side ports published by docker compose)
LOCOMO_URL = "http://localhost:8002"
COORDINATOR_URL = "http://localhost:8003"
MEMORY_URL = "http://localhost:8005"
RESPONDER_URL = "http://localhost:8006"

# Paths and required env entries (.env lives in the current working directory)
PROJECT_ROOT = os.path.abspath(".")
ENV_FILE = os.path.join(PROJECT_ROOT, ".env")
REQUIRED_SECRETS = ["OPENAI_API_KEY"]

# Mirror config into current interpreter environment for local runs
os.environ["MEMORY_BACKEND"] = MEMORY_BACKEND
os.environ["DMAS_RUN_ID"] = DMAS_RUN_ID

print("Configuration loaded:")
print(f"  Backend: {MEMORY_BACKEND}")
print(f"  Run ID: {DMAS_RUN_ID}")
print(f"  Project: {PROJECT_ROOT}")


Configuration loaded:
  Backend: mem0
  Run ID: notebook-20251102-150228
  Project: /home/jacobbista/Documents/RM&SW/Experiment/dmas-long-context-memory


## System Functions


In [315]:
def read_env() -> dict:
    """Parse ``ENV_FILE`` into a ``{KEY: value}`` dict of strings.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Each line is split on its first ``=``; key and value are whitespace-
    stripped. Nothing is exported to ``os.environ``. A missing file yields
    an empty dict.
    """
    parsed = {}
    if not os.path.exists(ENV_FILE):
        return parsed
    with open(ENV_FILE, "r", encoding="utf-8") as handle:
        for raw in handle:
            entry = raw.strip()
            if entry and not entry.startswith("#") and "=" in entry:
                name, _, val = entry.partition("=")
                parsed[name.strip()] = val.strip()
    return parsed


def write_env(env: dict) -> None:
    """Overwrite ``ENV_FILE`` with the entries of *env*.

    The file is replaced wholesale: a fixed two-line comment header, then one
    ``KEY=value`` line per entry sorted by key, newline-terminated. Earlier
    comments or ordering are not kept. Existing values survive only because
    callers read them with ``read_env`` and pass them back in.
    """
    header = ["# DMAS Configuration", "# Managed by notebook where noted"]
    entries = [f"{name}={val}" for name, val in sorted(env.items())]
    content = "\n".join(header + entries) + "\n"
    with open(ENV_FILE, "w", encoding="utf-8") as handle:
        handle.write(content)


def update_env_file(backend: str, run_id: str) -> None:
    """Refresh the notebook-managed keys in ``.env`` and save it.

    ``MEMORY_BACKEND`` and ``DMAS_RUN_ID`` are overwritten. Each name in
    ``REQUIRED_SECRETS`` that the file does not define yet gets a
    ``<SET_<NAME>_HERE>`` placeholder. Values already present, including real
    secrets, are carried over unchanged.
    """
    current = read_env()
    current.update({"MEMORY_BACKEND": backend, "DMAS_RUN_ID": run_id})
    for secret_name in REQUIRED_SECRETS:
        if secret_name not in current:
            current[secret_name] = f"<SET_{secret_name}_HERE>"
    write_env(current)
    print(f"‚úÖ .env updated: MEMORY_BACKEND={backend}, DMAS_RUN_ID={run_id}")


def ensure_required_secrets() -> bool:
    """Report whether every key in ``REQUIRED_SECRETS`` has a real value.

    A non-empty value in ``os.environ`` takes precedence; otherwise the value
    from ``.env`` is used. Empty values and ``<SET_...>`` placeholders count as
    missing. Missing keys are printed; nothing is written.

    Returns:
        True when all required secrets are set, False otherwise.
    """
    file_values = read_env()

    def _is_set(name: str) -> bool:
        candidate = os.getenv(name) or file_values.get(name)
        return bool(candidate) and not candidate.startswith("<SET_")

    missing = [name for name in REQUIRED_SECRETS if not _is_set(name)]
    if not missing:
        print("‚úÖ Required secrets present")
        return True

    print("‚ùå Missing secrets in environment/.env:")
    for name in missing:
        print(f"  - {name}")
    print("Add them manually to .env; the notebook will not write secrets.")
    return False

def check_port_conflicts(ports: Optional[dict] = None) -> bool:
    """Check whether the ports the DMAS stack binds are free on localhost.

    A port counts as a conflict when a TCP connection to ``127.0.0.1:<port>``
    succeeds within one second, i.e. something is already listening there.

    Args:
        ports: Optional ``{port: service_name}`` mapping to probe. Defaults to
            the DMAS services plus Qdrant and the Neo4j browser/bolt ports.

    Returns:
        True if none of the probed ports accepts a connection. Otherwise the
        conflicts are printed and False is returned.
    """
    if ports is None:
        ports = {
            8002: "Locomo",
            8003: "Coordinator",
            8005: "Memory",
            8006: "Responder",
            6333: "Qdrant",
            7474: "Neo4j Browser",
            7687: "Neo4j Bolt",
        }

    conflicts = []
    for port, service in ports.items():
        # connect_ex only converts C-level connect() errors into return codes
        # and can still raise. `with` guarantees the socket is closed either way.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            if sock.connect_ex(("127.0.0.1", port)) == 0:
                conflicts.append((port, service))

    if conflicts:
        print("‚ùå Port conflicts detected:")
        for port, service in conflicts:
            print(f"  - Port {port} ({service})")
        return False

    print("‚úÖ All ports available")
    return True


def docker_compose(*args: str) -> subprocess.CompletedProcess:
    """Run ``docker compose <args>`` from ``PROJECT_ROOT`` and echo its output.

    stdout and stderr are captured as text and printed, stdout first, when
    non-empty. The exit status is not checked here; callers inspect
    ``returncode`` on the returned ``CompletedProcess``.
    """
    command = ["docker", "compose"] + list(args)
    completed = subprocess.run(
        command,
        cwd=PROJECT_ROOT,
        text=True,
        capture_output=True,
    )
    for stream in (completed.stdout, completed.stderr):
        if stream:
            print(stream)
    return completed


def docker_up(build: bool = False) -> None:
    """Start the compose services for the ``MEMORY_BACKEND`` profile.

    Runs ``docker compose --profile <backend> up -d --remove-orphans``, with
    ``--build`` appended when *build* is true. The services start detached, so
    this returns before they are healthy.
    """
    print(f"Starting services with profile: {MEMORY_BACKEND}\n")

    build_flag = ["--build"] if build else []
    outcome = docker_compose(
        "--profile", MEMORY_BACKEND, "up", "-d", "--remove-orphans", *build_flag
    )

    if outcome.returncode != 0:
        print(f"\n‚ùå Failed: exit code {outcome.returncode}")
        return
    print("\n‚úÖ Services starting (wait ~60s for healthy status)")


def docker_down() -> None:
    """Stop and remove DMAS containers for both profiles; volumes are kept.

    Both the ``mem0`` and ``graphiti`` profiles are passed, so containers from
    either backend are removed whichever one ``MEMORY_BACKEND`` selects.
    ``docker_compose`` echoes compose's own output.
    """
    print("Stopping services...\n")
    result = docker_compose(
        "--profile",
        "mem0",
        "--profile",
        "graphiti",
        "down",
        "--remove-orphans",
    )
    if result.returncode == 0:
        print("‚úÖ Services stopped")
    else:
        # Report failures explicitly (as docker_up does) instead of ending silently.
        print(f"\n‚ùå Failed: exit code {result.returncode}")


def docker_clean() -> None:
    """Stop services for both profiles and remove their volumes (destructive).

    Same as ``docker_down`` but adds ``-v``, so compose also deletes the
    volumes it manages and any data they hold.
    """
    print("Cleaning all containers and volumes...\n")
    result = docker_compose(
        "--profile",
        "mem0",
        "--profile",
        "graphiti",
        "down",
        "-v",
        "--remove-orphans",
    )
    if result.returncode == 0:
        print("‚úÖ Clean complete")
    else:
        # Report failures explicitly (as docker_up does) instead of ending silently.
        print(f"\n‚ùå Failed: exit code {result.returncode}")


def check_health(url: str, name: str, timeout: int = 5):
    """Probe ``<url>/health`` and print a one-line status for *name*.

    Returns:
        ``(True, payload)`` when the endpoint answers HTTP 200, where
        ``payload`` is the decoded JSON body. ``(False, None)`` for any other
        status code or any exception (connection error, timeout, bad JSON).
    """
    try:
        reply = requests.get(f"{url}/health", timeout=timeout)
        if reply.status_code != 200:
            print(f"‚ùå {name:12} - HTTP {reply.status_code}")
            return False, None
        payload = reply.json()
        print(f"‚úÖ {name:12} - {payload.get('status', 'healthy')}")
        return True, payload
    except Exception as exc:
        print(f"‚ùå {name:12} - {str(exc)[:50]}")
        return False, None


def check_all_services() -> bool:
    """Run ``check_health`` on every DMAS HTTP service and print a verdict.

    Every service is probed (no short-circuit), so each gets its own status
    line before the summary. Returns True only if all of them are healthy.
    """
    print("Checking services...\n")

    targets = (
        ("Locomo", LOCOMO_URL),
        ("Memory", MEMORY_URL),
        ("Coordinator", COORDINATOR_URL),
        ("Responder", RESPONDER_URL),
    )
    healthy = []
    for label, base_url in targets:
        ok, _payload = check_health(base_url, label)
        healthy.append(ok)

    if not all(healthy):
        print("\n‚ùå Some services unhealthy")
        return False
    print("\n‚úÖ All services healthy")
    return True


def memory_stats(timeout: float = 30) -> Optional[dict]:
    """Fetch ``GET /stats`` from the memory service and pretty-print it.

    Args:
        timeout: Seconds to wait for the memory service before giving up.

    Returns:
        The decoded JSON stats, or ``None`` (after printing the error) if the
        request fails, times out, returns a non-2xx status, or is not JSON.
    """
    try:
        # A bounded timeout keeps a wedged service from hanging the cell forever.
        response = requests.get(f"{MEMORY_URL}/stats", timeout=timeout)
        # Treat HTTP errors as failures so callers such as verify_memory_state
        # don't mistake an error body for valid stats.
        response.raise_for_status()
        data = response.json()
    except Exception as e:  # noqa: BLE001 - best-effort notebook helper
        print(f"‚ùå Failed to get stats: {e}")
        return None
    print("üìä Memory Stats:")
    print(json.dumps(data, indent=2))
    return data


def memory_reset(timeout: float = 60) -> Optional[dict]:
    """Clear all stored memory via ``DELETE /reset`` and print the response.

    Args:
        timeout: Seconds to wait for the memory service before giving up.

    Returns:
        The decoded JSON response, or ``None`` (after printing the error) if
        the request fails, times out, returns a non-2xx status, or is not JSON.
    """
    try:
        # A bounded timeout keeps a wedged service from hanging the cell forever.
        response = requests.delete(f"{MEMORY_URL}/reset", timeout=timeout)
        # A failed reset must not be printed as if it succeeded.
        response.raise_for_status()
        data = response.json()
    except Exception as e:  # noqa: BLE001 - best-effort notebook helper
        print(f"‚ùå Failed to reset: {e}")
        return None
    print("üîÑ Memory Reset:")
    print(json.dumps(data, indent=2))
    return data


def verify_memory_state() -> bool:
    """Print memory-service stats and the number of Locomo conversations.

    Returns:
        False if the memory stats could not be fetched, True otherwise. A
        failure to reach Locomo only prints a warning and does not change the
        result.
    """
    print("üîç Memory State Verification\n")

    stats = memory_stats()
    if not stats:
        return False

    try:
        # Bounded timeout and explicit status check: an unreachable or failing
        # Locomo should produce the warning below, not hang or misreport.
        locomo_response = requests.get(f"{LOCOMO_URL}/stats", timeout=30)
        locomo_response.raise_for_status()
        locomo_stats = locomo_response.json()
        print(f"\nüìö Locomo has {locomo_stats['total_conversations']} conversations available")
    except Exception as e:  # noqa: BLE001 - informational only
        print(f"\n‚ö†Ô∏è  Could not get Locomo stats: {e}")

    return True


def _locomo_sample_id(idx):
    """Return Locomo's ``sample_id`` for conversation *idx*, or None on any failure."""
    try:
        locomo_resp = requests.get(
            f"{LOCOMO_URL}/conversations/index/{idx}", timeout=30
        )
        locomo_resp.raise_for_status()
        return locomo_resp.json().get("sample_id")
    except Exception:  # noqa: BLE001 - best effort: a missing id must not block the load
        return None


def _poll_load_job(idx, job_id, sample_id, memory_job_id, poll_interval, max_wait_s):
    """Poll one coordinator load job until it finishes or times out; return its summary entry."""
    # monotonic: elapsed-time measurement is unaffected by wall-clock adjustments
    t0 = time.monotonic()
    last_log_len = 0
    while True:
        if time.monotonic() - t0 > max_wait_s:
            print(f"[load] idx={idx} TIMEOUT after {max_wait_s}s")
            return {
                "index": idx,
                "status": "timeout",
                "sample_id": sample_id,
                "job_id": job_id,
            }

        try:
            st_resp = requests.get(
                f"{COORDINATOR_URL}/load_conversation/status/{job_id}",
                timeout=10,
            )
            st_resp.raise_for_status()
            st = st_resp.json()
            current_memory_job_id = st.get("memory_job_id")
        except Exception as err:  # noqa: BLE001 - treat as transient; retry until timeout
            print(f"[load] idx={idx} polling error: {err}")
            time.sleep(poll_interval)
            continue

        # Announce the memory job only when it first appears or changes,
        # instead of on every poll.
        if current_memory_job_id and current_memory_job_id != memory_job_id:
            print(f"[{idx}] memory job is {current_memory_job_id}")
            memory_job_id = current_memory_job_id

        logs = st.get("logs") or []
        # print only the log lines not shown by an earlier poll
        for line in logs[last_log_len:]:
            print(f"[{idx}] {line}")
        last_log_len = len(logs)

        status = st.get("status")
        if status in ("done", "error"):
            print(f"[load] idx={idx} finished status={status}")
            st["index"] = idx
            st["sample_id"] = sample_id
            return st

        time.sleep(poll_interval)


def load_conversations(indices=None, poll_interval=3, max_wait_s=900):
    """Load Locomo conversations into memory through the coordinator.

    Conversations are processed one at a time. For each index:
      1. look up its ``sample_id`` on Locomo (best effort),
      2. start a job with ``POST /load_conversation/index/<idx>`` on the coordinator,
      3. poll ``/load_conversation/status/<job_id>`` every *poll_interval*
         seconds, echoing new log lines, until the job reports ``done`` or
         ``error`` or *max_wait_s* seconds pass.

    Args:
        indices: Conversation indices to load; defaults to ``0..9``.
        poll_interval: Seconds between status polls (also after a polling error).
        max_wait_s: Per-conversation time limit before recording a timeout.

    Returns:
        One summary dict per index, in order. A finished job yields its final
        status payload with ``index`` and ``sample_id`` added. Otherwise the
        dict has ``status`` ``"error"`` (the job could not be started) or
        ``"timeout"``.
    """
    indices = list(range(10)) if indices is None else list(indices)
    summary = []

    for idx in indices:
        sample_id = _locomo_sample_id(idx)

        try:
            start_resp = requests.post(
                f"{COORDINATOR_URL}/load_conversation/index/{idx}",
                timeout=10,
            )
            start_resp.raise_for_status()
            start_data = start_resp.json()
            job_id = start_data["job_id"]
            memory_job_id = start_data.get("memory_job_id")
        except Exception as err:  # noqa: BLE001 - record and move on to the next index
            print(f"[load] idx={idx} failed to start: {err}")
            summary.append({
                "index": idx,
                "status": "error",
                "sample_id": sample_id,
                "error": str(err),
            })
            continue

        print(f"[load] idx={idx} started job_id={job_id} (memory_job={memory_job_id or 'n/a'}) sample_id={sample_id or 'unknown'}")
        summary.append(
            _poll_load_job(idx, job_id, sample_id, memory_job_id, poll_interval, max_wait_s)
        )

    return summary


def query_memory(prompt: str, conversation_id: Optional[str] = None, limit: Optional[int] = 3):
    """POST *prompt* to the memory service's ``/query`` endpoint.

    ``conversation_id`` is sent only when truthy, and ``limit`` only when it is
    not None. The ``context`` field of the response is printed as a preview.

    Returns:
        The decoded JSON response.

    Raises:
        Re-raises whatever the request raised (connection error, timeout,
        non-2xx status) after printing it.
    """
    body = {"prompt": prompt}
    if conversation_id:
        body["conversation_id"] = conversation_id
    if limit is not None:
        body["limit"] = limit

    try:
        reply = requests.post(f"{MEMORY_URL}/query", json=body, timeout=60)
        reply.raise_for_status()
    except Exception as err:  # noqa: BLE001
        print(f"‚ùå Memory query failed: {err}")
        raise

    data = reply.json()
    preview = data.get("context")
    print("Context preview:\n" + preview if preview else "Context preview: <empty>")
    return data


print("‚úÖ Functions loaded")


‚úÖ Functions loaded


## Phase 1: Setup

Run these cells in order for initial setup.


In [316]:
# Step 1: Write MEMORY_BACKEND and DMAS_RUN_ID into .env (existing secrets are left untouched)
update_env_file(backend=MEMORY_BACKEND, run_id=DMAS_RUN_ID)


‚úÖ .env updated: MEMORY_BACKEND=mem0, DMAS_RUN_ID=notebook-20251102-150228


In [317]:
# Step 2: Check for port conflicts. A port is reported if something on
# 127.0.0.1 already accepts connections on it; the returned bool is the cell output.
check_port_conflicts()


‚úÖ All ports available


True

In [318]:
# Step 3: Verify required secrets before starting services.
# Only reports missing keys (checked in os.environ, then .env); never writes secrets.
ensure_required_secrets()


‚úÖ Required secrets present


True

## Phase 2: Start Services


In [319]:
# Start Docker Compose services for the MEMORY_BACKEND profile, rebuilding
# images first (build=True). Containers start detached; check health next.
docker_up(build=True)


Starting services with profile: mem0

#1 [internal] load local bake definitions
#1 reading from stdin 2.35kB done
#1 DONE 0.0s

#2 [coordinator internal] load build definition from Dockerfile
#2 transferring dockerfile: 476B done
#2 DONE 0.0s

#3 [memory internal] load build definition from Dockerfile
#3 transferring dockerfile: 451B done
#3 DONE 0.0s

#4 [responder internal] load build definition from Dockerfile
#4 transferring dockerfile: 466B done
#4 DONE 0.0s

#5 [locomo internal] load build definition from Dockerfile
#5 transferring dockerfile: 360B done
#5 DONE 0.0s

#6 [responder internal] load metadata for docker.io/library/python:3.11-slim
#6 ...

#7 [locomo internal] load metadata for docker.io/library/python:3.10-slim
#7 DONE 1.3s

#6 [memory internal] load metadata for docker.io/library/python:3.11-slim
#6 DONE 1.3s

#8 [locomo internal] load .dockerignore
#8 transferring context: 2B done
#8 DONE 0.0s

#9 [responder internal] load .dockerignore
#9 transferring context: 2B d

In [320]:
# docker_up starts containers detached; services typically need ~30-60s before
# /health answers. Poll until all are healthy or the wait budget runs out.
HEALTH_WAIT_S = 90   # total wait budget in seconds
HEALTH_POLL_S = 10   # pause before each round of health checks
health_deadline = time.monotonic() + HEALTH_WAIT_S
while True:
    time.sleep(HEALTH_POLL_S)
    services_healthy = check_all_services()
    if services_healthy or time.monotonic() >= health_deadline:
        break

services_healthy


Checking services...

‚úÖ Locomo       - healthy
‚úÖ Memory       - healthy
‚úÖ Coordinator  - healthy
‚úÖ Responder    - healthy

‚úÖ All services healthy


True

## Phase 3: Interact with Services


In [321]:
# Get Locomo dataset stats. The timeout bounds the wait, and an HTTP error
# raises instead of printing an error body as if it were stats.
response = requests.get(f"{LOCOMO_URL}/stats", timeout=30)
response.raise_for_status()
stats = response.json()
print("üìä Locomo Stats:")
print(json.dumps(stats, indent=2))


üìä Locomo Stats:
{
  "total_conversations": 10,
  "total_sessions": 272,
  "total_turns": 5882,
  "total_questions": 1986,
  "conversations_loaded": true,
  "data_file": "/data/locomo10.json"
}


In [322]:
# Get memory stats (printed by the helper; the returned dict is also shown as the cell output)
memory_stats()


üìä Memory Stats:
{
  "status": "success",
  "backend": "mem0",
  "collection": "conversations",
  "qdrant_host": "qdrant",
  "qdrant_port": 6333,
  "note": "Memory is operational. Use Qdrant API directly for detailed stats.",
  "test_query_success": true
}


{'status': 'success',
 'backend': 'mem0',
 'collection': 'conversations',
 'qdrant_host': 'qdrant',
 'qdrant_port': 6333,
 'note': 'Memory is operational. Use Qdrant API directly for detailed stats.',
 'test_query_success': True}

In [None]:
# Load one conversation into memory via the coordinator. This can take several
# minutes; load_conversations gives up on a job after max_wait_s (900s by default).
TARGET_INDEX = 0
load_summary = load_conversations(indices=[TARGET_INDEX])
# Print each outcome explicitly: only a cell's last expression is auto-displayed.
for entry in load_summary:
    print(
        f"[summary] idx={entry.get('index')} status={entry.get('status')} "
        f"sample_id={entry.get('sample_id')}"
    )

sample_id = next(
    (item.get("sample_id") for item in load_summary if item.get("sample_id")),
    None,
)
print(f"Selected sample_id: {sample_id or 'none'}")

sample_prompt = "Summarize the key events in the conversation."


In [323]:
# Preview condensed context via coordinator (raw vs condensed token counts).
# preview_data is bound before the branch so the final bare expression (the
# cell's displayed output) is defined on both paths and never stale.
preview_data = None
if not sample_id:
    print("‚ö†Ô∏è Cannot preview without a conversation_id.")
else:
    preview_payload = {
        "question": sample_prompt,
        "limit": 3,
        "conversation_id": sample_id,
    }

    preview_response = requests.post(
        f"{COORDINATOR_URL}/preview",
        json=preview_payload,
        timeout=60,
    )
    preview_response.raise_for_status()
    preview_data = preview_response.json()

    raw_info = preview_data.get("raw", {})
    condensed_info = preview_data.get("condensed", {})
    memory_info = preview_data.get("memory", {})

    print(f"üîç Preview for: {sample_prompt}")
    print(f"   Conversation ID: {sample_id}")
    print(f"   Memory chunks: {memory_info.get('count', 0)}\n")

    print("üì¶ RAW context:")
    print(f"   Tokens: {raw_info.get('tokens', 0)}")
    print(f"   Chars: {raw_info.get('characters', 0)}")
    raw_ctx = raw_info.get('context') or ''
    print(f"   Text: {raw_ctx[:150]}...\n" if raw_ctx else "   Text: <empty>\n")

    print("üìâ CONDENSED context:")
    print(f"   Model: {condensed_info.get('model', 'N/A')}")
    print(f"   Tokens: {condensed_info.get('tokens', 0)}")
    print(f"   Chars: {condensed_info.get('characters', 0)}")
    ratio = condensed_info.get('compression_ratio')
    # `is not None` so a legitimate 0.0 ratio is still reported
    if ratio is not None:
        print(f"   Compression: {ratio:.1%}")
    summary = condensed_info.get('summary') or ''
    print(f"   Summary: {summary[:150]}..." if summary else "   Summary: <empty>")

preview_data

üîç Preview for: Summarize the key events in the conversation.
   Conversation ID: conv-26
   Memory chunks: 3

üì¶ RAW context:
   Tokens: 93
   Chars: 301
   Text: [session_10 | 8:56 pm on 20 July, 2023] Participated in regular meetings, events, and campaigns with the group

[session_3 | 7:55 pm on 9 June, 2023] ...

üìâ CONDENSED context:
   Model: gpt-4o-mini
   Tokens: 80
   Chars: 330
   Compression: 86.0%
   Summary: Key events in the conversation include:

- On July 20, 2023, the participant engaged in regular meetings, events, and campaigns with the group.
- On J...


In [324]:
# Fetch the first Locomo QA pair for TARGET_INDEX to use as the evaluation question.
qa_question = None
qa_answer = None
# `not sample_id` matches the guard used by the other cells (also rejects "").
if not sample_id:
    print("‚ö†Ô∏è Cannot fetch QA without a sample_id.")
else:
    try:
        qa_response = requests.get(
            f"{LOCOMO_URL}/conversations/index/{TARGET_INDEX}/questions",
            timeout=60,
        )
        qa_response.raise_for_status()
        qa_payload = qa_response.json()
        questions = qa_payload.get("questions") or []
        if not questions:
            print("‚ö†Ô∏è No QA entries found for this conversation.")
        else:
            qa_entry = questions[0]
            qa_question = qa_entry.get("question")
            # Fall back to adversarial_answer when the entry has no plain answer.
            qa_answer = qa_entry.get("answer") or qa_entry.get("adversarial_answer")
            print("Ground-truth question:", qa_question)
            print("Ground-truth answer:", qa_answer)
    except Exception as err:  # noqa: BLE001
        print(f"‚ùå Failed to fetch QA: {err}")



Ground-truth question: When did Caroline go to the LGBTQ support group?
Ground-truth answer: 7 May 2023


In [None]:
# Query memory directly to inspect the context retrieved for the first QA question.
# memory_response is bound up front so the cell's final expression (its
# displayed output) is defined on both paths and never shows a stale result.
memory_response = None
if not sample_id or not qa_question:
    print("‚ö†Ô∏è Missing sample_id or QA question.")
else:
    memory_response = query_memory(qa_question, conversation_id=sample_id, limit=10)

memory_response

Context preview:
[session_14 | 1:33 pm on 25 August, 2023] Caroline is part of the transgender community

[session_8 | 1:51 pm on 15 July, 2023] The support group has made Caroline feel accepted and given her courage to embrace herself.

[session_14 | 1:33 pm on 25 August, 2023] Caroline is putting together an LGBTQ art show next month to showcase her paintings and feature LGBTQ artists.


In [None]:
# Ask the first QA question end-to-end via the coordinator/responder.
# Reset `result` so the comparison cell never sees an answer from a previous run.
result = {}
if not qa_question:
    print("‚ö†Ô∏è No QA question available to ask.")
else:
    question = qa_question
    print(f"Question: {question}\n")

    ask_payload = {"question": question}
    if sample_id:
        ask_payload["conversation_id"] = sample_id
        ask_payload["limit"] = 15

    response = requests.post(
        f"{COORDINATOR_URL}/ask",
        json=ask_payload,
        timeout=60
    )

    try:
        result = response.json()
    except ValueError:
        # Non-JSON body (e.g. an HTML error page): keep a short excerpt instead
        # of crashing, so the comparison cell reports an error.
        result = {
            "status": "error",
            "http_status": response.status_code,
            "body": response.text[:500],
        }
    print("Answer:")
    print(json.dumps(result, indent=2))


In [None]:
# Compare responder answer with ground truth.
# The match flag is a naive, case-insensitive substring check: paraphrased
# answers (e.g. "May 7, 2023" vs "7 May 2023") will report False.
if qa_question and qa_answer and result.get("status") == "success":
    responder_answer = result.get("answer") or ""
    print("Ground truth:", qa_answer)
    print("Responder:", responder_answer)
    # Both values come straight from JSON, so coerce to str before comparing.
    contains_truth = str(qa_answer).strip().lower() in str(responder_answer).lower()
    print("Contains ground truth:", contains_truth)
else:
    print("‚ö†Ô∏è Unable to compute evaluation (missing data or responder error).")

## Phase 4: Memory Management & Evaluation


In [None]:
# Verify memory state: print memory-service stats and Locomo's conversation count
# (returns False only if the memory stats request fails).
verify_memory_state()


In [None]:
# Reset memory (clear all data). Destructive: sends DELETE /reset to the memory service.
memory_reset()


In [None]:
# Verify memory cleared by inspecting the stats after the reset.
# NOTE(review): the stats payload seen earlier reports no item counts, so this
# may not confirm the reset on its own.
memory_stats()


## Phase 5: Shutdown


In [312]:
# Stop services for both profiles (containers removed; data volumes preserved)
docker_down()

Stopping services...

 Container coordinator  Stopping
 Container coordinator  Stopped
 Container coordinator  Removing
 Container coordinator  Removed
 Container memory  Stopping
 Container locomo  Stopping
 Container responder  Stopping
 Container locomo  Stopped
 Container locomo  Removing
 Container locomo  Removed
 Container responder  Stopped
 Container responder  Removing
 Container responder  Removed
 Container memory  Stopped
 Container memory  Removing
 Container memory  Removed
 Container qdrant  Stopping
 Container qdrant  Stopped
 Container qdrant  Removing
 Container qdrant  Removed
 Network dmas-long-context-memory_memory-internal  Removing
 Network dmas-long-context-memory_dmas-network  Removing
 Network dmas-long-context-memory_memory-internal  Removed
 Network dmas-long-context-memory_dmas-network  Removed

‚úÖ Services stopped


In [313]:
# OPTIONAL: Clean everything (removes all volumes)
# Uncomment to use:
# docker_clean()
