# Module 02: Exploring NATS Messaging

> **Goal**: Set up a subscriber to see what messages Dynamo publishes to NATS and understand the event flow.

---

## What is NATS?

NATS is a lightweight publish-subscribe messaging system. Think of it like Kafka or RabbitMQ, but simpler:

| Concept | Description |
|---------|-------------|
| **Subject** | Topic name (like `dynamo.kvcache.update`) |
| **Publisher** | Sends messages to a subject |
| **Subscriber** | Receives messages from a subject |
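| **Wildcard `*`** | Matches exactly one token (e.g. `dynamo.*.update`) |
| **Wildcard `>`** | Matches one or more trailing tokens; `>` on its own matches every subject |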
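
The wildcard rules are easy to misremember, so here is a small pure-Python matcher that follows them. `*` stands for exactly one dot-separated token, and `>` stands for one or more trailing tokens. `subject_matches` is a hypothetical helper for experimenting offline. It is not part of nats-py, because the server performs this matching itself when you subscribe.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches a NATS-style `pattern`.

    `*` matches exactly one token; `>` matches one or more trailing tokens
    and is only valid as the last token of the pattern.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the final pattern token and must consume at least one token
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # pattern is longer than the subject
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Every pattern token was consumed; the subject must not have leftovers
    return len(p_tokens) == len(s_tokens)


for pattern in [">", "dynamo.>", "dynamo.*", "dynamo.*.update"]:
    # Prints True, True, False, True for these four patterns
    print(f"{pattern:18} -> {subject_matches(pattern, 'dynamo.kvcache.update')}")
```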

In Dynamo, NATS is used for **async events** (not inference requests):
- KV cache state changes
- Worker coordination
- Metrics/telemetry

---

## Step 1: Check That NATS Is Running

First, let's verify NATS is up and check its current state.

```python
import urllib.request
import json

def check_nats():
    """Check NATS server status and show current connections"""
    print("=" * 60)
    print("NATS SERVER STATUS")
    print("=" * 60)

    # Health check
    try:
        with urllib.request.urlopen('http://localhost:8222/healthz', timeout=5) as resp:
            print(f"\n✓ NATS is healthy: {resp.read().decode()}")
    except Exception as e:
        print(f"\n✗ NATS not responding: {e}")
        print("\nStart NATS with: docker run -d --name dynamo-nats -p 4222:4222 -p 8222:8222 nats:latest -js -m 8222")
        return

    # Server info
    with urllib.request.urlopen('http://localhost:8222/varz', timeout=5) as resp:
        varz = json.loads(resp.read())
        print(f"\nServer Info:")
        print(f"  Version:      {varz.get('version')}")
        print(f"  Connections:  {varz.get('connections', 0)}")
        print(f"  Messages In:  {varz.get('in_msgs', 0)}")
        print(f"  Messages Out: {varz.get('out_msgs', 0)}")

    # Current connections (who's connected?)
    with urllib.request.urlopen('http://localhost:8222/connz?subs=true', timeout=5) as resp:
        connz = json.loads(resp.read())
        print(f"\nActive Connections ({connz.get('num_connections', 0)} total):")
        for conn in connz.get('connections', []):
            name = conn.get('name', 'unnamed')
            ip = conn.get('ip', '?')
            subs = conn.get('subscriptions_list', [])
            print(f"\n  📌 {name}")
            print(f"     IP: {ip}")
            print(f"     Msgs In/Out: {conn.get('in_msgs', 0)} / {conn.get('out_msgs', 0)}")
            if subs:
                print(f"     Subscriptions: {subs[:5]}{'...' if len(subs) > 5 else ''}")

check_nats()
```

```text
NATS SERVER STATUS

✓ NATS is healthy: {"status":"ok"}

Server Info:
  Version:      2.12.4
  Connections:  0
  Messages In:  0
  Messages Out: 0

Active Connections (0 total):
```
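
The `/varz` message and byte counters are cumulative for the whole server. You can therefore check whether a single action produced NATS traffic by taking a snapshot before and after it and comparing the two. The sketch below does this with the same standard-library calls used above.

A few notes on the sketch:

* `varz_snapshot` and `traffic_delta` are hypothetical helper names.
* `in_bytes`/`out_bytes` are the byte counterparts that the same endpoint reports.
* Any other client publishing during the interval is counted too, so treat the delta as an upper bound.

```python
import json
import urllib.request

# Cumulative server-wide counters exposed by the NATS /varz monitoring endpoint
COUNTERS = ("in_msgs", "out_msgs", "in_bytes", "out_bytes")


def varz_snapshot(url: str = "http://localhost:8222/varz") -> dict:
    """Read the current counter values from a running NATS server."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        varz = json.loads(resp.read())
    return {key: varz.get(key, 0) for key in COUNTERS}


def traffic_delta(before: dict, after: dict) -> dict:
    """How much each counter grew between two snapshots."""
    return {key: after.get(key, 0) - before.get(key, 0) for key in COUNTERS}


# Usage against a live server (hypothetical workflow):
# before = varz_snapshot()
# ...send one inference request to the frontend...
# print(traffic_delta(before, varz_snapshot()))
```

If every value in the delta is zero, nothing passed through this NATS server while the request ran.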


---

## Step 2: Install NATS Python Client

```python
# Install nats-py
!pip install -q nats-py
print("✓ nats-py installed")
```

```text
✓ nats-py installed
```


---

## Step 3: Monitor NATS While Running Inference

Let's start a subscriber AND send an inference request at the same time to see what messages flow.

```python
import asyncio
import json
import threading
import time
import urllib.request
from datetime import datetime

import nats

# Store messages
inference_messages = []
inference_result = None

def send_inference():
    """Send an inference request in a separate thread"""
    global inference_result
    time.sleep(1)  # Give the subscriber a head start

    print("\n🚀 Sending inference request...")

    payload = {
        "model": "Qwen/Qwen3-0.6B",
        "messages": [{"role": "user", "content": "Say hello in 5 words."}],
        "max_tokens": 30
    }

    try:
        req = urllib.request.Request(
            "http://localhost:8000/v1/chat/completions",
            data=json.dumps(payload).encode(),
            headers={'Content-Type': 'application/json'}
        )
        with urllib.request.urlopen(req, timeout=60) as resp:
            inference_result = json.loads(resp.read())
            print(f"\n✓ Inference complete: {inference_result['choices'][0]['message']['content'][:50]}...")
    except Exception as e:
        inference_result = {"error": str(e)}
        print(f"\n✗ Inference failed: {e}")

async def monitor_during_inference():
    """Monitor NATS while sending inference"""
    global inference_messages
    inference_messages = []

    print("=" * 60)
    print("MONITORING NATS DURING INFERENCE")
    print("=" * 60)

    async def on_message(msg):
        ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        try:
            data = msg.data.decode('utf-8')
        except UnicodeDecodeError:
            data = f"<binary {len(msg.data)} bytes>"

        inference_messages.append({
            'time': ts,
            'subject': msg.subject,
            'data': data
        })
        preview = f"{data[:80]}..." if len(data) > 80 else data
        print(f"[{ts}] 📨 {msg.subject}: {preview}")

    # Connect and subscribe to every subject
    nc = await nats.connect("nats://localhost:4222")
    sub = await nc.subscribe(">", cb=on_message)
    await nc.flush()  # Round-trip to the server so the subscription is registered
    print("✓ Subscriber ready, waiting for messages...\n")

    # Start inference in background thread
    thread = threading.Thread(target=send_inference)
    thread.start()

    # Monitor for 15 seconds (enough time for inference)
    await asyncio.sleep(15)

    # Cleanup
    await sub.unsubscribe()
    await nc.drain()
    await asyncio.to_thread(thread.join)  # Wait for the request without blocking the event loop

    # Results
    print("\n" + "=" * 60)
    print("RESULTS")
    print("=" * 60)
    print(f"\nTotal NATS messages during inference: {len(inference_messages)}")

    if inference_messages:
        print("\nMessages captured:")
        for m in inference_messages:
            print(f"  [{m['time']}] {m['subject']}")
    else:
        print("\n⚠️  No NATS messages captured during inference.")
        print("\nThis is EXPECTED in a single-worker setup!")
        print("")
        print("Why? Dynamo uses NATS for multi-worker coordination:")
        print("  • KV cache sharing between workers")
        print("  • Disaggregated prefill/decode coordination")
        print("  • Load balancing signals")
        print("")
        print("In a single worker setup, there's no need for cross-worker events.")
        print("The inference path goes directly: Frontend → Worker via TCP/gRPC")

# Run it (top-level await works in Jupyter)
await monitor_during_inference()
```

```text
MONITORING NATS DURING INFERENCE
✓ Subscriber ready, waiting for messages...


🚀 Sending inference request...

✓ Inference complete: <think>
Okay, the user wants me to say hello in fi...

RESULTS

Total NATS messages during inference: 0

⚠️  No NATS messages captured during inference.

This is EXPECTED in a single-worker setup!

Why? Dynamo uses NATS for multi-worker coordination:
  • KV cache sharing between workers
  • Disaggregated prefill/decode coordination
  • Load balancing signals

In a single worker setup, there's no need for cross-worker events.
The inference path goes directly: Frontend → Worker via TCP/gRPC
```
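
The cell above coordinates the subscriber and the request with a background thread and a one-second sleep. A possible alternative, assuming Python 3.9+ for `asyncio.to_thread`, works in three steps:

1. Register the subscription first.
2. Run the blocking request in a worker thread from inside the same coroutine.
3. Tally messages per subject with `collections.Counter`, which stays readable once several workers start publishing.

`capture_during` is a hypothetical helper. It takes a `subscribe` callable so that it can be exercised without a live server.

```python
import asyncio
from collections import Counter
from typing import Any, Awaitable, Callable, Tuple


async def capture_during(
    subscribe: Callable[[Callable[[Any], Awaitable[None]]], Awaitable[Any]],
    action: Callable[[], Any],
    settle_s: float = 2.0,
) -> Tuple[Any, Counter]:
    """Run a blocking `action` while counting messages per subject.

    `subscribe(cb)` must register `cb` and return an object with an async
    `unsubscribe()` method (a nats-py Subscription fits).
    """
    per_subject: Counter = Counter()

    async def on_message(msg: Any) -> None:
        per_subject[msg.subject] += 1

    # Subscribe first, so no sleep-based handshake is needed before the action
    sub = await subscribe(on_message)
    try:
        # The blocking call runs in a worker thread; the event loop stays free
        # to dispatch on_message while it is in flight
        result = await asyncio.to_thread(action)
        # Leave a short window for events published after the response returns
        await asyncio.sleep(settle_s)
    finally:
        await sub.unsubscribe()
    return result, per_subject


# Usage in the notebook (needs a live server; send_inference is the function above):
# import nats
# nc = await nats.connect("nats://localhost:4222")
#
# async def subscribe_all(cb):
#     sub = await nc.subscribe(">", cb=cb)
#     await nc.flush()  # round-trip so the server has registered the subscription
#     return sub
#
# _, counts = await capture_during(subscribe_all, send_inference)
# print(counts.most_common())
# await nc.drain()
```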


---

## Step 4: Check JetStream (Persistent Messages)

NATS JetStream can persist messages in **streams**. Let's see if Dynamo uses any.

```python
import urllib.request
import json

print("=" * 60)
print("NATS JETSTREAM STATUS")
print("=" * 60)

# JetStream overview; accounts/streams/config ask for per-stream details in the response
try:
    url = 'http://localhost:8222/jsz?accounts=true&streams=true&config=true'
    with urllib.request.urlopen(url, timeout=5) as resp:
        jsz = json.loads(resp.read())

        enabled = not jsz.get('disabled', False)
        print(f"\nJetStream Enabled: {'✓' if enabled else '✗'}")
        print(f"Streams:   {jsz.get('streams', 0)}")
        print(f"Consumers: {jsz.get('consumers', 0)}")
        print(f"Messages:  {jsz.get('messages', 0)}")
        print(f"Bytes:     {jsz.get('bytes', 0)}")

        # List streams if any
        account_details = jsz.get('account_details', [])
        for account in account_details:
            streams = account.get('stream_detail', [])
            if streams:
                print(f"\nStreams in account '{account.get('name', 'default')}':")
                for stream in streams:
                    print(f"  📦 {stream.get('name')}")
                    print(f"     Subjects: {stream.get('config', {}).get('subjects', [])}")
                    print(f"     Messages: {stream.get('state', {}).get('messages', 0)}")

        if jsz.get('streams', 0) == 0:
            print("\n⚠️  No JetStream streams configured.")
            print("")
            print("Dynamo may use:")
            print("  • Pure pub/sub (no persistence) for real-time events")
            print("  • JetStream only in certain configurations")
            print("  • Streams created dynamically when needed")

except Exception as e:
    print(f"\n✗ Could not query JetStream: {e}")
```

```text
NATS JETSTREAM STATUS

JetStream Enabled: ✓
Streams:   0
Consumers: 0
Messages:  0
Bytes:     0

⚠️  No JetStream streams configured.

Dynamo may use:
  • Pure pub/sub (no persistence) for real-time events
  • JetStream only in certain configurations
  • Streams created dynamically when needed
```
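
To work with stream details programmatically, the JSON walk in the cell above can be pulled into a function. The sketch assumes the same `/jsz` field names that the cell reads: `account_details`, `stream_detail`, `config.subjects` and `state.messages`.

Per the NATS monitoring documentation, a stream's `config` appears only when the query sets `config=true`, so empty subject lists usually mean that flag is missing. Verify this against your server version. `stream_rows` and `fetch_jsz` are hypothetical names.

```python
import json
import urllib.request
from typing import List, Tuple

Row = Tuple[str, str, List[str], int]  # (account, stream, subjects, messages)


def stream_rows(jsz: dict) -> List[Row]:
    """Flatten the account/stream details of a /jsz response into rows."""
    rows: List[Row] = []
    for account in jsz.get("account_details", []):
        for stream in account.get("stream_detail", []):
            rows.append((
                account.get("name", "default"),
                stream.get("name", "?"),
                # config is only present when the query includes config=true
                stream.get("config", {}).get("subjects", []),
                stream.get("state", {}).get("messages", 0),
            ))
    return rows


def fetch_jsz(base: str = "http://localhost:8222") -> dict:
    """Query /jsz with the flags needed for per-stream details."""
    url = f"{base}/jsz?accounts=true&streams=true&config=true"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read())


# Usage against a live server:
# for account, name, subjects, msgs in stream_rows(fetch_jsz()):
#     print(f"{account}/{name}: {msgs} msgs on {subjects}")
```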


---

## Understanding NATS in Dynamo

### Message Flow Architecture

The subjects below are illustrative examples of the kind of events involved; none of them appeared in the single-worker run above.

```
┌──────────────────────────────────────────────────────────────┐
│                     NATS Message Bus                         │
│                                                              │
│   Subjects:                                                  │
│     dynamo.kvcache.{worker_id}.update  (cache block added)   │
│     dynamo.kvcache.{worker_id}.evict   (cache block removed) │
│     dynamo.router.rebalance            (load balancing)      │
│                                                              │
└──────────────────────────┬───────────────────────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
         ▼                 ▼                 ▼
   ┌──────────┐     ┌──────────┐     ┌──────────┐
   │ Worker 1 │     │ Worker 2 │     │  Router  │
   │ (Prefill)│     │ (Decode) │     │          │
   └──────────┘     └──────────┘     └──────────┘
```

### When NATS is Used

| Scenario | NATS Messages |
|----------|---------------|
| Single worker (basic setup) | **Minimal** - no coordination needed |
| Multiple workers | KV cache eviction events |
| Disaggregated prefill/decode | KV cache transfer notifications |
| KV-aware routing | Cache state for routing decisions |
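
Here is a toy model of the "KV-aware routing" row, showing what a router could do with cache events. It keeps a per-worker set of cached block IDs and sends each request to the worker that already holds the longest prefix of that request's blocks.

Everything here is hypothetical: the event dictionary format, `KvIndex` and the scoring rule are illustrations only. They are not Dynamo's wire format or router implementation, and a real router would also weigh worker load.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Set


class KvIndex:
    """Toy router-side view of which KV cache blocks each worker holds.

    Hypothetical event shape (not Dynamo's wire format):
        {"worker": "w1", "kind": "stored" | "removed", "blocks": [block_id, ...]}
    """

    def __init__(self) -> None:
        self.blocks: Dict[str, Set[int]] = defaultdict(set)

    def apply(self, event: dict) -> None:
        """Update the index from one cache event (e.g. one NATS message payload)."""
        held = self.blocks[event["worker"]]
        if event["kind"] == "stored":
            held.update(event["blocks"])
        elif event["kind"] == "removed":
            held.difference_update(event["blocks"])

    def prefix_hits(self, worker: str, prefix_blocks: List[int]) -> int:
        """Count leading blocks of the request that the worker already caches."""
        hits = 0
        for block in prefix_blocks:
            if block not in self.blocks[worker]:
                break
            hits += 1
        return hits

    def best_worker(self, prefix_blocks: List[int]) -> Optional[str]:
        """Worker with the longest cached prefix; ties go to the first worker seen."""
        if not self.blocks:
            return None
        return max(self.blocks, key=lambda w: self.prefix_hits(w, prefix_blocks))


index = KvIndex()
index.apply({"worker": "w1", "kind": "stored", "blocks": [1, 2, 3]})
index.apply({"worker": "w2", "kind": "stored", "blocks": [1, 9]})
print(index.best_worker([1, 2, 3, 4]))  # w1: three leading blocks cached
```

The index can only stay current if every eviction event arrives. That is why these cache-state events matter once more than one worker is serving.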

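### Why No Messages in a Single-Worker Setup?

Step 1 reported zero client connections on the NATS server, so in this run neither the frontend nor the worker was talking to NATS. The inference request went over a **direct TCP** connection instead: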

```
Client ──HTTP──▶ Frontend ──TCP/gRPC──▶ Worker
                     │                    │
                     └──── etcd ◀─────────┘
                       (discovery)
```

In this setup NATS carries **async events**, not request/response traffic, so no message broker sits on the inference path.

---

## Key Takeaways

1. **NATS is a pub/sub messaging system** - components publish events, others subscribe
2. **Use `>` to subscribe to everything** - great for debugging
3. **Single-worker = minimal NATS traffic** - events are for multi-worker coordination
4. **JetStream = persistent messages** - Dynamo may use this for durable events
5. **Inference did not touch NATS here** - the frontend reached the worker over direct TCP

### Command-Line Tools

```bash
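# Monitor all NATS messages from a terminal. The nats:latest image ships only the
# server, not the `nats` CLI, so run the CLI from natsio/nats-box (host networking: Linux)
docker run --rm -it --network host natsio/nats-box nats sub '>' --server=nats://localhost:4222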

# Or use nats-cli locally
nats sub '>' --server=nats://localhost:4222

# Publish a test message
nats pub test.hello "world" --server=nats://localhost:4222
```