Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/roadmap/DISTRIBUTED_STATE_ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ Date: May 22, 2026
- Subscription-collapse over N-link cycles: covered by `StateSyncAdapterLinkForwardingTest`.
- `state_distributed_admin::link_cycles()` static cycle enumerator.
- Slice 6: `state::resolveRemote(hop_budget)` — pull-on-demand remote link resolution composing with delegation + lease expiry. Cross-cluster link tests covering authority transfer, lease expiry invalidation, and sendMessage routing over transparent links without the caller naming a cluster_id.
- Phase 7 (perf + production hardening) and Phase 9 (network analytics) are not started.
- Phase 9 (network analytics & live telemetry) core delivered: `state_node_telemetry` (EWMA, latency histogram, JSON ser/deser, OOB publish), `state_telemetry_aggregator` (cluster rollups, stale detection, routing feedback), admin `to_text()` telemetry section. 31 tests across two new test suites. Remaining: 100-node bench, CBOR encoding, periodic auto-publish timer.
- Phase 7 (perf + production hardening): admin telemetry integration delivered; production benchmarks, delta encoding, and zstd compression not started.

## Purpose

Expand Down Expand Up @@ -310,10 +311,17 @@ Remaining for Phase 8:
- Cluster-agnostic API: `linkTo("data.world.geometry")` and `sendMessage(...)` on the link node must succeed when authority for `data.world.geometry` is local, when it has been delegated to a second cluster mid-test, and after authority moves back — the test never names a `cluster_id`. A regression that requires the caller to know the owning cluster fails this test.
- Bench: 1M-path tree with 10k links, including a few cycles, and assert resolver/subscription latency stays bounded.

### Phase 9: Network Analytics And Live Telemetry
### Phase 9: Network Analytics And Live Telemetry (in progress)

Each node needs a live picture of the cluster's health and shape so operators (and the system itself, for routing decisions) can answer questions like "how big is this tree, how fast can I move a blob to peer X, how many nodes are in cluster Y" without per-query polling. Analytics piggyback on the OOB messaging bus and the existing `state_distributed_metrics` module so they cost no extra connections.

Delivered so far on `feature/phase9-telemetry`:

- `state_node_telemetry` — per-node sampler with EWMA (configurable half-life), power-of-2 latency histogram (22 buckets, O(1) record, p50/p90/p99 queries), `telemetry_snapshot` POD struct (35+ fields covering counters, rates, latency percentiles, tree/cluster shape), `sample()` reads shard/transport/bus counters, `publish_snapshot()` serializes to JSON and publishes via OOB bus on `__telemetry.<cluster>.<node>` topic, JSON round-trip serialize/deserialize with no external dependency.
- `state_telemetry_aggregator` — cluster-level rollup: subscribes to `__telemetry.<cluster_id>` on the OOB bus, ingests peer snapshots, computes `cluster_telemetry_summary` (aggregated counters, max latencies, summed rates), stale-peer detection (configurable threshold, default 5 s), `evaluate_routing_feedback(policy)` returning isolate/release node lists based on p99 latency and outbox drop thresholds, `to_text()` for human-readable dumps.
- `state_distributed_admin` extensions — `attach_telemetry(state_telemetry_aggregator*)`, `telemetry()` accessor, `to_text()` now appends a `[telemetry]` section when an aggregator is attached.
- Tests — 18 test cases in `state_node_telemetry_test` (EWMA, histogram, JSON round-trip, sampling, latency recording, rate computation, bus publish), 13 test cases in `state_telemetry_aggregator_test` (aggregation, stale detection, routing feedback, admin integration). All pass.

- Per-node sampler (`state_node_telemetry`):
- Counters and rolling EWMAs for: bytes-sent / bytes-received per peer and per transport; mutation publish rate; message admit/dedup/drop rates; bounded-queue depths and overflow events; blob bytes uploaded/downloaded with throughput EWMA.
- Latency histograms for: local enqueue, replica apply, remote delivery, callback dispatch, blob round-trip, message propagation. Use t-digest or HDR-style buckets so percentiles (p50/p90/p99) are stable.
Expand Down
9 changes: 7 additions & 2 deletions inc/cvc/state_distributed_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class state_blob_store;
class state_cluster_shard;
class state_message_bus;
class state_peer_registry;
class state_telemetry_aggregator;

// ----------------
// cvc::state_distributed_admin
Expand Down Expand Up @@ -123,11 +124,13 @@ class state_distributed_admin {
void attach_peer_registry(state_peer_registry *peers) noexcept;
void attach_blob_store(state_blob_store *blobs) noexcept;
void attach_message_bus(state_message_bus *bus) noexcept;
void attach_telemetry(state_telemetry_aggregator *tel) noexcept;

state_cluster_shard *shard() const noexcept { return _shard; }
state_peer_registry *peer_registry() const noexcept { return _peers; }
state_blob_store *blob_store() const noexcept { return _blobs; }
state_message_bus *message_bus() const noexcept { return _bus; }
state_telemetry_aggregator *telemetry() const noexcept { return _tel; }

// Capture a coherent snapshot of all attached subsystems. Each
// sub-snapshot is taken independently; callers should not assume
Expand All @@ -139,8 +142,9 @@ class state_distributed_admin {
// stable. Detached subsystems are listed as "<name>: detached".
static std::string to_text(const report &r);

// Convenience: snapshot() + to_text().
std::string to_text() const { return to_text(snapshot()); }
// Convenience: snapshot() + to_text(). If a telemetry
// aggregator is attached, its summary is appended.
std::string to_text() const;

// Erase every blob in the attached blob store whose digest is NOT
// in `live_digests`. Returns counts and bytes freed. If no blob
Expand Down Expand Up @@ -244,6 +248,7 @@ class state_distributed_admin {
state_peer_registry *_peers = nullptr;
state_blob_store *_blobs = nullptr;
state_message_bus *_bus = nullptr;
state_telemetry_aggregator *_tel = nullptr;
};

} // namespace CVC_NAMESPACE
Expand Down
253 changes: 253 additions & 0 deletions inc/cvc/state_node_telemetry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
Copyright 2026 The University of Texas at Austin

This file is part of libcvc.

libcvc is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License version 2.1 as published by the Free Software Foundation.
*/

#ifndef __CVC_STATE_NODE_TELEMETRY_H__
#define __CVC_STATE_NODE_TELEMETRY_H__

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cvc/namespace.h>
#include <mutex>
#include <string>
#include <vector>

namespace CVC_NAMESPACE {

class app;
class state_cluster_shard;
class state_message_bus;
class state_transport;

// ----------------
// cvc::ewma
// ----------------
// Exponentially-weighted moving average with configurable
// half-life. Thread-safe via double-wide CAS on 64-bit platforms.
//
struct ewma {
// Construct with a half-life in nanoseconds. The default 1 s
// means that a sample taken 1 second after the previous one
// moves the EWMA halfway toward the new value.
explicit ewma(std::uint64_t half_life_ns = 1'000'000'000ULL) noexcept;

// Record a new sample at `now_ns` (monotonic nanoseconds).
void update(double sample, std::uint64_t now_ns) noexcept;

// Current smoothed value.
double value() const noexcept;

// Reset to zero.
void reset() noexcept;

std::uint64_t half_life_ns() const noexcept { return _half_life_ns; }

private:
std::uint64_t _half_life_ns;
mutable std::mutex _mu;
double _value = 0.0;
std::uint64_t _last_ns = 0;
bool _initialized = false;
};

// ----------------
// cvc::latency_histogram
// ----------------
// Fixed-bucket histogram for latency values in nanoseconds.
// Bucket boundaries are powers of two from 2^10 (1 µs) to
// 2^30 (1.07 s), plus an overflow bucket.
//
// Provides O(1) record and O(bucket_count) percentile queries.
//
struct latency_histogram {
// 21 buckets: [0, 1µs), [1µs, 2µs), [2µs, 4µs), ...
// [512ms, 1.07s), [1.07s, ∞)
static constexpr std::size_t BUCKET_COUNT = 22;

void record(std::uint64_t latency_ns) noexcept;
std::uint64_t count() const noexcept;

// Returns the bucket boundary that the given percentile (0.0–1.0)
// falls into, or 0 if no samples have been recorded.
std::uint64_t percentile(double p) const noexcept;

// p50 / p90 / p99 convenience.
std::uint64_t p50() const noexcept { return percentile(0.50); }
std::uint64_t p90() const noexcept { return percentile(0.90); }
std::uint64_t p99() const noexcept { return percentile(0.99); }

void reset() noexcept;

// Raw bucket access for serialization.
std::uint64_t bucket(std::size_t i) const noexcept;

private:
std::atomic<std::uint64_t> _buckets[BUCKET_COUNT] = {};
static std::size_t bucket_index(std::uint64_t ns) noexcept;
};

// ----------------
// cvc::telemetry_snapshot
// ----------------
// POD snapshot of a single node's telemetry at a point in time.
// Cheap to copy and safe to serialize.
//
struct telemetry_snapshot {
std::string node_id;
std::string cluster_id;
std::uint64_t timestamp_ns = 0; // monotonic clock

// --- Counters (cumulative) ---
std::uint64_t mutations_published = 0;
std::uint64_t mutations_applied = 0;
std::uint64_t mutations_duplicates = 0;
std::uint64_t mutations_rejected = 0;
std::uint64_t mutations_conflicts = 0;
std::uint64_t messages_admitted = 0;
std::uint64_t messages_duplicates = 0;
std::uint64_t messages_dispatched = 0;
std::uint64_t messages_dropped = 0;
std::uint64_t bytes_sent = 0;
std::uint64_t bytes_received = 0;
std::uint64_t blobs_stored = 0;
std::uint64_t blob_bytes = 0;
std::uint64_t delegations_routed = 0;
std::uint64_t delegations_expired = 0;
std::uint64_t quarantined_mutations = 0;
std::uint64_t quarantined_messages = 0;
std::uint64_t auto_isolations = 0;

// --- Rates (EWMA, per second) ---
double mutation_publish_rate = 0.0;
double mutation_apply_rate = 0.0;
double message_admit_rate = 0.0;

// --- Queue depths ---
std::uint64_t outbox_depth = 0;
std::uint64_t outbox_drops = 0;

// --- Tree shape ---
std::uint64_t path_count = 0;
std::uint64_t link_count = 0;

// --- Latency percentiles (ns) ---
std::uint64_t enqueue_p50 = 0;
std::uint64_t enqueue_p90 = 0;
std::uint64_t enqueue_p99 = 0;
std::uint64_t delivery_p50 = 0;
std::uint64_t delivery_p90 = 0;
std::uint64_t delivery_p99 = 0;

// --- Peer count ---
std::uint64_t peer_count = 0;
std::uint64_t slow_peer_count = 0;
};

// ----------------
// cvc::state_node_telemetry
// ----------------
// Phase 9: per-node telemetry sampler.
//
// Collects counters, EWMAs, and latency histograms from attached
// subsystems (shard, transport, message bus, blob store) and
// produces a telemetry_snapshot on demand. Also supports periodic
// publishing of snapshots onto the OOB message bus for distribution
// to other cluster nodes.
//
// Typical usage:
// state_node_telemetry tel(a, "node-1", "alpha");
// tel.attach_shard(shard);
// tel.attach_transport(transport);
// tel.attach_message_bus(bus);
// // ... on a timer:
// tel.sample();
// tel.publish_delta();
// // ... for local inspection:
// auto snap = tel.snapshot();
//
// Threading:
// sample() is not thread-safe with itself (call from one thread
// or one timer). snapshot() is safe concurrent with sample().
// record_*_latency() are lock-free (atomic histogram buckets).
//
class state_node_telemetry {
public:
state_node_telemetry(app &ctx, std::string node_id, std::string cluster_id);
~state_node_telemetry();

state_node_telemetry(const state_node_telemetry &) = delete;
state_node_telemetry &operator=(const state_node_telemetry &) = delete;

// Subsystem attachment (non-owning pointers).
void attach_shard(state_cluster_shard *s) noexcept;
void attach_transport(state_transport *t) noexcept;
void attach_message_bus(state_message_bus *b) noexcept;

// Capture counters from attached subsystems and update EWMAs.
// Call this periodically (e.g. once per second).
void sample();

// Return a snapshot of the latest sampled state.
telemetry_snapshot snapshot() const;

// Record a latency measurement. Callers in the hot path should
// bracket their operation with steady_clock and call these.
void record_enqueue_latency(std::uint64_t ns) noexcept;
void record_delivery_latency(std::uint64_t ns) noexcept;

// Serialize the latest snapshot into a state_message suitable
// for publishing on the OOB bus. Uses JSON for debug builds,
// a compact binary format for release.
static constexpr const char *MIME_TELEMETRY = "application/x-cvc-telemetry+json";
static constexpr const char *TOPIC_PREFIX = "__telemetry.";

// Publish the latest snapshot as an OOB message on the attached
// bus (if any). Returns true if the message was admitted.
bool publish_snapshot();

// Serialize / deserialize a snapshot to/from JSON.
static std::string serialize_json(const telemetry_snapshot &snap);
static bool deserialize_json(const std::string &json, telemetry_snapshot &out);

const std::string &node_id() const noexcept { return _node_id; }
const std::string &cluster_id() const noexcept { return _cluster_id; }

private:
app &_ctx;
std::string _node_id;
std::string _cluster_id;

state_cluster_shard *_shard = nullptr;
state_transport *_transport = nullptr;
state_message_bus *_bus = nullptr;

// Rate EWMAs (1 s half-life).
ewma _mutation_publish_rate;
ewma _mutation_apply_rate;
ewma _message_admit_rate;

// Latency histograms.
latency_histogram _enqueue_hist;
latency_histogram _delivery_hist;

// Previous counter values for rate computation.
std::uint64_t _prev_mutations_published = 0;
std::uint64_t _prev_mutations_applied = 0;
std::uint64_t _prev_messages_admitted = 0;
std::uint64_t _prev_sample_ns = 0;
std::uint64_t _publish_seq = 0;

mutable std::mutex _snap_mu;
telemetry_snapshot _latest;
};

} // namespace CVC_NAMESPACE

#endif // __CVC_STATE_NODE_TELEMETRY_H__
Loading
Loading