
events streaming improvements #14

Open
michalkucharczyk wants to merge 10 commits into main from mku-events-improvements

Conversation


michalkucharczyk commented Mar 4, 2026

Breaking WS API changes

Subscribe filter renamed Node → Nodes:

// Before — single node only
{"type":"Subscribe","filter":{"type":"Node","node_id":"abc123"}}

// After — single node
{"type":"Subscribe","filter":{"type":"Nodes","node_ids":["abc123"]}}

// After — multiple nodes
{"type":"Subscribe","filter":{"type":"Nodes","node_ids":["abc123","def456"]}}

// After — all events (recommended; uses the main broadcast channel)
{"type":"Subscribe","filter":{"type":"All"}}

// After — all node channels (wildcard, for StreamMap perf evaluation)
{"type":"Subscribe","filter":{"type":"Nodes","node_ids":["*"]}}
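A client-side helper covering the shapes above might look like this. The wire format matches the examples; the helper itself (`subscribe_message`) is hypothetical and not part of this PR:

```python
import json

def subscribe_message(node_ids=None):
    """Build a Subscribe frame in the new filter shape.

    node_ids=None        -> {"type": "All"} (main broadcast channel)
    node_ids=["abc123"]  -> {"type": "Nodes", "node_ids": ["abc123"]}
    node_ids=["*"]       -> wildcard over all node channels
    """
    if node_ids is None:
        flt = {"type": "All"}
    else:
        flt = {"type": "Nodes", "node_ids": list(node_ids)}
    return json.dumps({"type": "Subscribe", "filter": flt})
```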

New batch message type:

Events can now be batched, up to 100 per WS frame:

{"type":"batch","data":[{event1},{event2},...],"timestamp":"2025-..."}

Single events are still sent unwrapped as {"type":"event",...}. Clients must handle both shapes.
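A minimal client-side dispatcher that handles both shapes could be sketched like this (illustrative only; `iter_events` is not part of the PR):

```python
import json

def iter_events(frame: str):
    """Yield individual events from a WS text frame.

    Handles both the unwrapped single-event shape and the new
    batch envelope (up to 100 events per frame).
    """
    msg = json.loads(frame)
    if msg.get("type") == "batch":
        yield from msg["data"]
    elif msg.get("type") == "event":
        yield msg
    # other message types (errors, acks) would be handled elsewhere
```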

--no-database mode:

Minimal /api/ws + /api/health router for WS-only streaming without a DB. Stats/metrics/alerts disabled. Mainly for streaming performance evaluation.

Optimizations

WS sender: Lagged handling

Previously, a RecvError::Lagged from the broadcast channel killed the WS connection — the client had to reconnect and lost all context. Now lagged events are counted and skipped, the connection stays alive, and the client keeps receiving. Debug logs report lagged_total so slow clients are visible without being punished.
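The idea can be modeled with plain sequence numbers: when the consumer sees a gap (events overwritten in the ring), it adds the gap size to a counter and keeps consuming instead of disconnecting. The real code catches tokio's `broadcast::error::RecvError::Lagged(n)`; this stand-alone model is ours, not the PR's:

```python
class LagTolerantConsumer:
    """Count missed events instead of dropping the connection."""

    def __init__(self):
        self.next_seq = 0       # next sequence number we expect
        self.lagged_total = 0   # reported in debug logs

    def on_event(self, seq, payload):
        if seq > self.next_seq:
            # events [next_seq, seq) were overwritten before we read them
            self.lagged_total += seq - self.next_seq
        self.next_seq = seq + 1
        return payload          # connection stays alive, client keeps receiving
```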

WS sender: opportunistic batching

After receiving one event, the WS handler drains up to 100 more via try_recv() (non-blocking) and sends them in a single frame. Measured 1.6M ev/s vs 158K unbatched. Also helps drain broadcast_lag spikes faster (observed up to 524K with batch=10).
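The drain loop has the same shape in any language: one blocking receive, then non-blocking receives until the channel is empty or the batch is full. A Python sketch of the pattern (ours, standing in for the handler's `try_recv()` loop):

```python
import queue

def drain_batch(q, first, limit=100):
    """Opportunistically drain up to `limit` events into one batch.

    `first` came from a blocking receive; the rest are taken with
    get_nowait(), the non-blocking analogue of try_recv().
    """
    batch = [first]
    while len(batch) < limit:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break               # channel drained; send what we have
    return batch
```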

Dedicated ingestion runtimes (default: 8)

With a single tokio runtime, 1024 TCP ingestion tasks starved WS client tasks — the WS sender couldn't get scheduled often enough to drain its broadcast receiver, causing broadcast_lag to climb and events to be lost. Ingestion now runs on 8 dedicated single-thread tokio runtimes (--ingestion-threads, SO_REUSEPORT), freeing the main runtime for WS clients, API, and the aggregator.
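The SO_REUSEPORT part of that setup means each ingestion runtime binds its own listening socket to the same port, and the kernel load-balances incoming connections across them (Linux/BSD behavior). A Python sketch of one such listener, to illustrate the mechanism rather than the PR's Rust code:

```python
import socket

def reuseport_listener(port=0):
    """One of N listening sockets sharing a port via SO_REUSEPORT.

    Each per-runtime listener calls this with the same port; the kernel
    distributes accepted connections among them.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    s.bind(("127.0.0.1", port))
    s.listen()
    return s
```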

Aggregator bottleneck fixes

Three bottlenecks were hit at 600K+ events/s with WS subscribers connected:

  1. Lock contention on node_channels HashMap — broadcast_event() took a write lock on RwLock&lt;HashMap&gt; for every event. Replaced with an actor pattern: the aggregator owns the HashMap, WS handlers send commands via mpsc with oneshot replies. The lock is removed entirely.

  2. Aggregator busy-loop — try_recv() in a tight loop burned CPU when idle. Replaced with select! over the batch channel + command channel.

  3. JSON serialization in the single aggregator — building WS envelopes (RawValue parse + serde) for every event at 600K/s caused mpsc_lag to grow to 272K; the aggregator couldn't keep up. Serialization moved to the 8 ingestion runtimes (~75K ev/s each), so the aggregator now does pure routing. Result: mpsc_lag stays at 0.
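The actor pattern in fix 1 can be sketched as follows: one thread owns the map outright and serves requests over an mpsc-style command queue, replying on per-request oneshot queues, so no lock on the map exists at all. Names here are ours; the PR's Rust aggregator follows the same shape with tokio mpsc/oneshot:

```python
import queue
import threading

class ChannelRegistry:
    """Actor owning the node -> channel map; no shared lock."""

    def __init__(self):
        self.cmds = queue.Queue()
        self._channels = {}   # touched only by the actor thread
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            op, node_id, reply = self.cmds.get()
            if op == "get":
                reply.put(self._channels.setdefault(node_id, []))
            elif op == "stop":
                return

    def get_channel(self, node_id):
        reply = queue.Queue(maxsize=1)          # oneshot-style reply
        self.cmds.put(("get", node_id, reply))
        return reply.get()
```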

