diff --git a/docs/roadmap/DISTRIBUTED_STATE_ROADMAP.md b/docs/roadmap/DISTRIBUTED_STATE_ROADMAP.md index 65e4039..52a3cca 100644 --- a/docs/roadmap/DISTRIBUTED_STATE_ROADMAP.md +++ b/docs/roadmap/DISTRIBUTED_STATE_ROADMAP.md @@ -15,7 +15,8 @@ Date: May 22, 2026 - Subscription-collapse over N-link cycles: covered by `StateSyncAdapterLinkForwardingTest`. - `state_distributed_admin::link_cycles()` static cycle enumerator. - Slice 6: `state::resolveRemote(hop_budget)` — pull-on-demand remote link resolution composing with delegation + lease expiry. Cross-cluster link tests covering authority transfer, lease expiry invalidation, and sendMessage routing over transparent links without the caller naming a cluster_id. -- Phase 7 (perf + production hardening) and Phase 9 (network analytics) are not started. +- Phase 9 (network analytics & live telemetry) core delivered: `state_node_telemetry` (EWMA, latency histogram, JSON ser/deser, OOB publish), `state_telemetry_aggregator` (cluster rollups, stale detection, routing feedback), admin `to_text()` telemetry section. 31 tests across two new test suites. Remaining: 100-node bench, CBOR encoding, periodic auto-publish timer. +- Phase 7 (perf + production hardening): admin telemetry integration delivered; production benchmarks, delta encoding, and zstd compression not started. ## Purpose @@ -310,10 +311,17 @@ Remaining for Phase 8: - Cluster-agnostic API: `linkTo("data.world.geometry")` and `sendMessage(...)` on the link node must succeed when authority for `data.world.geometry` is local, when it has been delegated to a second cluster mid-test, and after authority moves back — the test never names a `cluster_id`. A regression that requires the caller to know the owning cluster fails this test. - Bench: 1M-path tree with 10k links, including a few cycles, and assert resolver/subscription latency stays bounded. 
-### Phase 9: Network Analytics And Live Telemetry
+### Phase 9: Network Analytics And Live Telemetry (in progress)

 Each node needs a live picture of the cluster's health and shape so operators (and the system itself, for routing decisions) can answer questions like "how big is this tree, how fast can I move a blob to peer X, how many nodes are in cluster Y" without per-query polling. Analytics piggyback on the OOB messaging bus and the existing `state_distributed_metrics` module so they cost no extra connections.

+Delivered so far on `feature/phase9-telemetry`:
+
+- `state_node_telemetry`: per-node sampler with EWMA (configurable half-life) and a power-of-2 latency histogram (22 buckets, O(1) record, p50/p90/p99 queries); `telemetry_snapshot` plain-data struct (35+ fields covering counters, rates, latency percentiles, tree/cluster shape); `sample()` reads shard/transport/bus counters; `publish_snapshot()` serializes to JSON and publishes via the OOB bus on a topic under the `__telemetry.` prefix (`TOPIC_PREFIX`, followed by the cluster id); JSON round-trip serialize/deserialize with no external dependency.
+- `state_telemetry_aggregator`: cluster-level rollup. Subscribes to the `__telemetry.` prefix on the OOB bus, ingests peer snapshots, computes `cluster_telemetry_summary` (aggregated counters, max latencies, summed rates), detects stale peers (configurable threshold, default 3 s, matching the constructor default in `state_telemetry_aggregator.h`), and exposes `evaluate_routing_feedback(policy)`, which returns isolate/release node lists based on p99 latency and outbox drop thresholds, plus `to_text()` for human-readable dumps.
+- `state_distributed_admin` extensions: `attach_telemetry(state_telemetry_aggregator*)`, `telemetry()` accessor, and `to_text()` now appends a `[telemetry]` section when an aggregator is attached.
+- Tests: 18 test cases in `state_node_telemetry_test` (EWMA, histogram, JSON round-trip, sampling, latency recording, rate computation, bus publish) and 13 test cases in `state_telemetry_aggregator_test` (aggregation, stale detection, routing feedback, admin integration). All pass.
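The two sampler primitives listed above, the half-life EWMA and the power-of-2 latency histogram, can be sketched in isolation as follows. This is a minimal illustration and not the libcvc code: `ewma_alpha`, `ewma_step`, `bucket_index`, and `kBucketCount` are hypothetical free-standing names, and the portable shift loop stands in for whatever bit-scan intrinsic the real implementation uses.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>

// Hypothetical helper (not part of libcvc): smoothing weight for a sample
// that arrives dt_ns after the previous one. With dt_ns == half_life_ns the
// weight is 0.5, i.e. the average moves halfway toward the new sample.
inline double ewma_alpha(std::uint64_t dt_ns, std::uint64_t half_life_ns) {
  const double ln2 = 0.693147180559945;
  return 1.0 - std::exp(-static_cast<double>(dt_ns) * ln2 /
                        static_cast<double>(half_life_ns));
}

// Hypothetical helper: one EWMA update step toward `sample`.
inline double ewma_step(double current, double sample, std::uint64_t dt_ns,
                        std::uint64_t half_life_ns) {
  return current + ewma_alpha(dt_ns, half_life_ns) * (sample - current);
}

// Assumed layout: bucket 0 is [0, 2^10) ns, bucket k (1..20) is
// [2^(k+9), 2^(k+10)) ns, and bucket 21 catches everything >= 2^30 ns.
constexpr std::size_t kBucketCount = 22;

// Map a latency in nanoseconds to its bucket using floor(log2(ns)) - 9,
// clamped to the overflow bucket. The loop is a portable floor(log2).
inline std::size_t bucket_index(std::uint64_t ns) {
  if (ns < 1024)
    return 0;
  unsigned log2_floor = 0;
  while (ns >>= 1)
    ++log2_floor;
  const std::size_t idx = static_cast<std::size_t>(log2_floor - 9);
  return idx < kBucketCount - 1 ? idx : kBucketCount - 1;
}

// Small self-check that exercises the bucket edges.
inline void bucket_edges_self_check() {
  assert(bucket_index(1023) == 0);
  assert(bucket_index(1024) == 1);
}
```

Because the bucket index depends only on the position of the highest set bit, recording stays O(1) regardless of the latency value; the cost is that a percentile query can only report a bucket boundary, not an exact latency.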
+ - Per-node sampler (`state_node_telemetry`): - Counters and rolling EWMAs for: bytes-sent / bytes-received per peer and per transport; mutation publish rate; message admit/dedup/drop rates; bounded-queue depths and overflow events; blob bytes uploaded/downloaded with throughput EWMA. - Latency histograms for: local enqueue, replica apply, remote delivery, callback dispatch, blob round-trip, message propagation. Use t-digest or HDR-style buckets so percentiles (p50/p90/p99) are stable. diff --git a/inc/cvc/state_distributed_admin.h b/inc/cvc/state_distributed_admin.h index b3f4fc0..c052394 100644 --- a/inc/cvc/state_distributed_admin.h +++ b/inc/cvc/state_distributed_admin.h @@ -24,6 +24,7 @@ class state_blob_store; class state_cluster_shard; class state_message_bus; class state_peer_registry; +class state_telemetry_aggregator; // ---------------- // cvc::state_distributed_admin @@ -123,11 +124,13 @@ class state_distributed_admin { void attach_peer_registry(state_peer_registry *peers) noexcept; void attach_blob_store(state_blob_store *blobs) noexcept; void attach_message_bus(state_message_bus *bus) noexcept; + void attach_telemetry(state_telemetry_aggregator *tel) noexcept; state_cluster_shard *shard() const noexcept { return _shard; } state_peer_registry *peer_registry() const noexcept { return _peers; } state_blob_store *blob_store() const noexcept { return _blobs; } state_message_bus *message_bus() const noexcept { return _bus; } + state_telemetry_aggregator *telemetry() const noexcept { return _tel; } // Capture a coherent snapshot of all attached subsystems. Each // sub-snapshot is taken independently; callers should not assume @@ -139,8 +142,9 @@ class state_distributed_admin { // stable. Detached subsystems are listed as ": detached". static std::string to_text(const report &r); - // Convenience: snapshot() + to_text(). - std::string to_text() const { return to_text(snapshot()); } + // Convenience: snapshot() + to_text(). 
If a telemetry
+  // aggregator is attached, its summary is appended.
+  std::string to_text() const;

   // Erase every blob in the attached blob store whose digest is NOT
   // in `live_digests`. Returns counts and bytes freed. If no blob
@@ -244,6 +248,7 @@ class state_distributed_admin {
   state_peer_registry *_peers = nullptr;
   state_blob_store *_blobs = nullptr;
   state_message_bus *_bus = nullptr;
+  state_telemetry_aggregator *_tel = nullptr;
 };

 } // namespace CVC_NAMESPACE
diff --git a/inc/cvc/state_node_telemetry.h b/inc/cvc/state_node_telemetry.h
new file mode 100644
index 0000000..ad9a114
--- /dev/null
+++ b/inc/cvc/state_node_telemetry.h
@@ -0,0 +1,253 @@
+/*
+  Copyright 2026 The University of Texas at Austin
+
+  This file is part of libcvc.
+
+  libcvc is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License version 2.1 as published by the Free Software Foundation.
+*/
+
+#ifndef __CVC_STATE_NODE_TELEMETRY_H__
+#define __CVC_STATE_NODE_TELEMETRY_H__
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace CVC_NAMESPACE {
+
+class app;
+class state_cluster_shard;
+class state_message_bus;
+class state_transport;
+
+// ----------------
+// cvc::ewma
+// ----------------
+// Exponentially-weighted moving average with configurable
+// half-life. Thread-safe: every accessor takes an internal mutex.
+//
+struct ewma {
+  // Construct with a half-life in nanoseconds. The default 1 s
+  // means that a sample taken 1 second after the previous one
+  // moves the EWMA halfway toward the new value.
+  explicit ewma(std::uint64_t half_life_ns = 1'000'000'000ULL) noexcept;
+
+  // Record a new sample at `now_ns` (monotonic nanoseconds).
+  void update(double sample, std::uint64_t now_ns) noexcept;
+
+  // Current smoothed value.
+  double value() const noexcept;
+
+  // Reset to zero.
+  void reset() noexcept;
+
+  std::uint64_t half_life_ns() const noexcept { return _half_life_ns; }
+
+private:
+  std::uint64_t _half_life_ns;
+  mutable std::mutex _mu;
+  double _value = 0.0;
+  std::uint64_t _last_ns = 0;
+  bool _initialized = false;
+};
+
+// ----------------
+// cvc::latency_histogram
+// ----------------
+// Fixed-bucket histogram for latency values in nanoseconds.
+// Bucket boundaries are powers of two from 2^10 (1 µs) to
+// 2^30 (1.07 s), plus an overflow bucket.
+//
+// Provides O(1) record and O(bucket_count) percentile queries.
+//
+struct latency_histogram {
+  // 22 buckets: [0, 1µs), [1µs, 2µs), [2µs, 4µs), ...
+  // [537ms, 1.07s), [1.07s, ∞)
+  static constexpr std::size_t BUCKET_COUNT = 22;
+
+  void record(std::uint64_t latency_ns) noexcept;
+  std::uint64_t count() const noexcept;
+
+  // Returns the upper boundary of the bucket that the given
+  // percentile (0.0–1.0) falls into, or 0 if no samples have been
+  // recorded. The overflow bucket reports 2^30.
+  std::uint64_t percentile(double p) const noexcept;
+
+  // p50 / p90 / p99 convenience.
+  std::uint64_t p50() const noexcept { return percentile(0.50); }
+  std::uint64_t p90() const noexcept { return percentile(0.90); }
+  std::uint64_t p99() const noexcept { return percentile(0.99); }
+
+  void reset() noexcept;
+
+  // Raw bucket access for serialization.
+  std::uint64_t bucket(std::size_t i) const noexcept;
+
+private:
+  std::atomic<std::uint64_t> _buckets[BUCKET_COUNT] = {};
+  static std::size_t bucket_index(std::uint64_t ns) noexcept;
+};
+
+// ----------------
+// cvc::telemetry_snapshot
+// ----------------
+// Plain-data snapshot of a single node's telemetry at a point in time.
+// Cheap to copy and safe to serialize.
+//
+struct telemetry_snapshot {
+  std::string node_id;
+  std::string cluster_id;
+  std::uint64_t timestamp_ns = 0; // monotonic clock
+
+  // --- Counters (cumulative) ---
+  std::uint64_t mutations_published = 0;
+  std::uint64_t mutations_applied = 0;
+  std::uint64_t mutations_duplicates = 0;
+  std::uint64_t mutations_rejected = 0;
+  std::uint64_t mutations_conflicts = 0;
+  std::uint64_t messages_admitted = 0;
+  std::uint64_t messages_duplicates = 0;
+  std::uint64_t messages_dispatched = 0;
+  std::uint64_t messages_dropped = 0;
+  std::uint64_t bytes_sent = 0;
+  std::uint64_t bytes_received = 0;
+  std::uint64_t blobs_stored = 0;
+  std::uint64_t blob_bytes = 0;
+  std::uint64_t delegations_routed = 0;
+  std::uint64_t delegations_expired = 0;
+  std::uint64_t quarantined_mutations = 0;
+  std::uint64_t quarantined_messages = 0;
+  std::uint64_t auto_isolations = 0;
+
+  // --- Rates (EWMA, per second) ---
+  double mutation_publish_rate = 0.0;
+  double mutation_apply_rate = 0.0;
+  double message_admit_rate = 0.0;
+
+  // --- Queue depths ---
+  std::uint64_t outbox_depth = 0;
+  std::uint64_t outbox_drops = 0;
+
+  // --- Tree shape ---
+  std::uint64_t path_count = 0;
+  std::uint64_t link_count = 0;
+
+  // --- Latency percentiles (ns) ---
+  std::uint64_t enqueue_p50 = 0;
+  std::uint64_t enqueue_p90 = 0;
+  std::uint64_t enqueue_p99 = 0;
+  std::uint64_t delivery_p50 = 0;
+  std::uint64_t delivery_p90 = 0;
+  std::uint64_t delivery_p99 = 0;
+
+  // --- Peer count ---
+  std::uint64_t peer_count = 0;
+  std::uint64_t slow_peer_count = 0;
+};
+
+// ----------------
+// cvc::state_node_telemetry
+// ----------------
+// Phase 9: per-node telemetry sampler.
+//
+// Collects counters, EWMAs, and latency histograms from attached
+// subsystems (shard, transport, message bus) and produces a
+// telemetry_snapshot on demand. Also supports publishing snapshots
+// onto the OOB message bus for distribution to other cluster nodes;
+// the caller drives the publish period.
+//
+// Typical usage:
+//   state_node_telemetry tel(a, "node-1", "alpha");
+//   tel.attach_shard(shard);
+//   tel.attach_transport(transport);
+//   tel.attach_message_bus(bus);
+//   // ... on a timer:
+//   tel.sample();
+//   tel.publish_snapshot();
+//   // ... for local inspection:
+//   auto snap = tel.snapshot();
+//
+// Threading:
+//   sample() is not thread-safe with itself (call from one thread
+//   or one timer). snapshot() is safe concurrent with sample().
+//   record_*_latency() are lock-free (atomic histogram buckets).
+//
+class state_node_telemetry {
+public:
+  state_node_telemetry(app &ctx, std::string node_id, std::string cluster_id);
+  ~state_node_telemetry();
+
+  state_node_telemetry(const state_node_telemetry &) = delete;
+  state_node_telemetry &operator=(const state_node_telemetry &) = delete;
+
+  // Subsystem attachment (non-owning pointers).
+  void attach_shard(state_cluster_shard *s) noexcept;
+  void attach_transport(state_transport *t) noexcept;
+  void attach_message_bus(state_message_bus *b) noexcept;
+
+  // Capture counters from attached subsystems and update EWMAs.
+  // Call this periodically (e.g. once per second).
+  void sample();
+
+  // Return a snapshot of the latest sampled state.
+  telemetry_snapshot snapshot() const;
+
+  // Record a latency measurement. Callers in the hot path should
+  // bracket their operation with steady_clock and call these.
+  void record_enqueue_latency(std::uint64_t ns) noexcept;
+  void record_delivery_latency(std::uint64_t ns) noexcept;
+
+  // MIME type and topic prefix used when publishing snapshots on
+  // the OOB bus. Only the JSON encoding exists today; a compact
+  // binary (CBOR) encoding is listed as remaining work in the roadmap.
+  static constexpr const char *MIME_TELEMETRY = "application/x-cvc-telemetry+json";
+  static constexpr const char *TOPIC_PREFIX = "__telemetry.";
+
+  // Publish the latest snapshot as an OOB message on the attached
+  // bus (if any). Returns true if the message was admitted.
+ bool publish_snapshot(); + + // Serialize / deserialize a snapshot to/from JSON. + static std::string serialize_json(const telemetry_snapshot &snap); + static bool deserialize_json(const std::string &json, telemetry_snapshot &out); + + const std::string &node_id() const noexcept { return _node_id; } + const std::string &cluster_id() const noexcept { return _cluster_id; } + +private: + app &_ctx; + std::string _node_id; + std::string _cluster_id; + + state_cluster_shard *_shard = nullptr; + state_transport *_transport = nullptr; + state_message_bus *_bus = nullptr; + + // Rate EWMAs (1 s half-life). + ewma _mutation_publish_rate; + ewma _mutation_apply_rate; + ewma _message_admit_rate; + + // Latency histograms. + latency_histogram _enqueue_hist; + latency_histogram _delivery_hist; + + // Previous counter values for rate computation. + std::uint64_t _prev_mutations_published = 0; + std::uint64_t _prev_mutations_applied = 0; + std::uint64_t _prev_messages_admitted = 0; + std::uint64_t _prev_sample_ns = 0; + std::uint64_t _publish_seq = 0; + + mutable std::mutex _snap_mu; + telemetry_snapshot _latest; +}; + +} // namespace CVC_NAMESPACE + +#endif // __CVC_STATE_NODE_TELEMETRY_H__ diff --git a/inc/cvc/state_telemetry_aggregator.h b/inc/cvc/state_telemetry_aggregator.h new file mode 100644 index 0000000..be0b518 --- /dev/null +++ b/inc/cvc/state_telemetry_aggregator.h @@ -0,0 +1,172 @@ +/* + Copyright 2026 The University of Texas at Austin + + This file is part of libcvc. + + libcvc is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. 
+*/ + +#ifndef __CVC_STATE_TELEMETRY_AGGREGATOR_H__ +#define __CVC_STATE_TELEMETRY_AGGREGATOR_H__ + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace CVC_NAMESPACE { + +class state_message; +class state_message_bus; + +// ---------------- +// cvc::routing_feedback_policy +// ---------------- +// Thresholds for telemetry-driven slow-peer isolation. +// +struct routing_feedback_policy { + // Isolate a peer whose delivery latency p99 exceeds this (ns). + // 0 = disabled. + std::uint64_t latency_p99_threshold_ns = 0; + + // Isolate a peer whose outbox drop count exceeds this per sample. + // 0 = disabled. + std::uint64_t outbox_drop_threshold = 0; + + // Minimum number of samples before a peer can be isolated + // (prevents acting on startup noise). + std::uint64_t min_samples = 3; +}; + +// ---------------- +// cvc::cluster_telemetry_summary +// ---------------- +// Aggregated view of the cluster derived from per-node snapshots. +// +struct cluster_telemetry_summary { + std::uint64_t timestamp_ns = 0; + std::size_t node_count = 0; + std::size_t stale_count = 0; // nodes not heard from recently + + // Aggregate counters (sum across nodes). + std::uint64_t total_mutations_published = 0; + std::uint64_t total_mutations_applied = 0; + std::uint64_t total_messages_admitted = 0; + std::uint64_t total_bytes_sent = 0; + std::uint64_t total_bytes_received = 0; + std::uint64_t total_blobs_stored = 0; + std::uint64_t total_blob_bytes = 0; + std::uint64_t total_path_count = 0; + std::uint64_t total_link_count = 0; + + // Worst-case latencies across all nodes. + std::uint64_t max_enqueue_p99 = 0; + std::uint64_t max_delivery_p99 = 0; + + // Aggregate rates (sum across nodes). + double cluster_mutation_publish_rate = 0.0; + double cluster_mutation_apply_rate = 0.0; + double cluster_message_admit_rate = 0.0; +}; + +// ---------------- +// cvc::state_telemetry_aggregator +// ---------------- +// Phase 9: cluster-level telemetry aggregator. 
+// +// Consumes telemetry_snapshot messages from the OOB bus and +// maintains a latest-snapshot-per-node map. On demand, produces +// a cluster_telemetry_summary by aggregating across all known +// nodes. +// +// Multiple aggregators may run in the same cluster; results are +// eventually consistent. Stale detection: a node whose latest +// snapshot is older than 3 × publish_interval is marked stale. +// +// Typical usage: +// state_telemetry_aggregator agg("alpha"); +// agg.attach_bus(bus); // auto-subscribes to __telemetry.* +// // ... on demand: +// auto summary = agg.summarize(); +// for (auto& [id, snap] : agg.peer_snapshots()) { ... } +// +class state_telemetry_aggregator { +public: + // `stale_threshold_ns` defaults to 3 × 1 s = 3 s. + explicit state_telemetry_aggregator(std::string cluster_id, + std::uint64_t stale_threshold_ns = 3'000'000'000ULL); + + ~state_telemetry_aggregator(); + + state_telemetry_aggregator(const state_telemetry_aggregator &) = delete; + state_telemetry_aggregator &operator=(const state_telemetry_aggregator &) = delete; + + // Attach to a message bus and subscribe to __telemetry.* messages. + // Returns true if subscription was installed successfully. + bool attach_bus(state_message_bus &bus); + + // Detach from the bus (unsubscribe). + void detach_bus(); + + // Ingest a telemetry snapshot directly (e.g. from local node + // or from a deserialized message). Thread-safe. + void ingest(const telemetry_snapshot &snap); + + // Produce a cluster-wide summary from all known peer snapshots. + cluster_telemetry_summary summarize() const; + + // Return a copy of all known peer snapshots. + std::unordered_map peer_snapshots() const; + + // Number of known peers (including stale). + std::size_t peer_count() const; + + // Number of peers whose latest snapshot is older than the stale + // threshold. + std::size_t stale_count() const; + + // Mark a peer as no longer tracked (e.g. after explicit removal). 
+ bool remove_peer(const std::string &node_id); + + // Clear all peer snapshots. + void clear(); + + // Human-readable cluster summary text. + std::string to_text() const; + + // Phase 9 routing feedback: evaluate peers against the policy + // and return lists of node_ids that should be isolated or + // released. The caller is responsible for mapping node_ids to + // transport-level peer handles and calling mark_peer_slow / + // clear_peer_slow. + struct routing_feedback_result { + std::vector isolate; // node_ids to mark slow + std::vector release; // node_ids to clear slow + }; + routing_feedback_result evaluate_routing_feedback(const routing_feedback_policy &policy) const; + + const std::string &cluster_id() const noexcept { return _cluster_id; } + std::uint64_t stale_threshold_ns() const noexcept { return _stale_threshold_ns; } + +private: + void on_message(const state_message &msg); + + std::string _cluster_id; + std::uint64_t _stale_threshold_ns; + + state_message_bus *_bus = nullptr; + std::uint64_t _sub_id = 0; + + mutable std::mutex _mu; + std::unordered_map _peers; +}; + +} // namespace CVC_NAMESPACE + +#endif // __CVC_STATE_TELEMETRY_AGGREGATOR_H__ diff --git a/src/cvc/CMakeLists.txt b/src/cvc/CMakeLists.txt index 2e2513d..0ee4ab1 100644 --- a/src/cvc/CMakeLists.txt +++ b/src/cvc/CMakeLists.txt @@ -44,6 +44,8 @@ set(INCLUDE_FILES ../../inc/cvc/state_volume_codec.h ../../inc/cvc/state_brick_manifest.h ../../inc/cvc/state_data_hydrator.h + ../../inc/cvc/state_node_telemetry.h + ../../inc/cvc/state_telemetry_aggregator.h ../../inc/cvc/state_cluster_membership.h ../../inc/cvc/state_hybrid_time.h ../../inc/cvc/state_hash_partition.h @@ -103,6 +105,8 @@ set(SOURCE_FILES state_volume_codec.cpp state_brick_manifest.cpp state_data_hydrator.cpp + state_node_telemetry.cpp + state_telemetry_aggregator.cpp state_cluster_membership.cpp distributed_state_session.cpp state_message.cpp diff --git a/src/cvc/state_distributed_admin.cpp b/src/cvc/state_distributed_admin.cpp 
index de3868a..c1d204c 100644 --- a/src/cvc/state_distributed_admin.cpp +++ b/src/cvc/state_distributed_admin.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,18 @@ void state_distributed_admin::attach_blob_store(state_blob_store *blobs) noexcep void state_distributed_admin::attach_message_bus(state_message_bus *bus) noexcept { _bus = bus; } +void state_distributed_admin::attach_telemetry(state_telemetry_aggregator *tel) noexcept { + _tel = tel; +} + +std::string state_distributed_admin::to_text() const { + std::string result = to_text(snapshot()); + if (_tel) { + result += _tel->to_text(); + } + return result; +} + state_distributed_admin::report state_distributed_admin::snapshot() const { report r; diff --git a/src/cvc/state_node_telemetry.cpp b/src/cvc/state_node_telemetry.cpp new file mode 100644 index 0000000..54e8941 --- /dev/null +++ b/src/cvc/state_node_telemetry.cpp @@ -0,0 +1,420 @@ +/* + Copyright 2026 The University of Texas at Austin + + This file is part of libcvc. + + libcvc is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. 
+*/
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifdef _MSC_VER
+#include
+#endif
+#include
+#include
+#include
+#include
+
+namespace CVC_NAMESPACE {
+
+// ---------------------------------------------------------------
+// ewma
+// ---------------------------------------------------------------
+
+ewma::ewma(std::uint64_t half_life_ns) noexcept : _half_life_ns(half_life_ns) {}
+
+void ewma::update(double sample, std::uint64_t now_ns) noexcept {
+  std::lock_guard lk(_mu);
+  if (!_initialized) {
+    _value = sample;
+    _last_ns = now_ns;
+    _initialized = true;
+    return;
+  }
+  if (now_ns <= _last_ns) {
+    _value = sample;
+    return;
+  }
+  double dt = static_cast<double>(now_ns - _last_ns);
+  double alpha = 1.0 - std::exp(-dt * 0.693147180559945 / static_cast<double>(_half_life_ns));
+  _value += alpha * (sample - _value);
+  _last_ns = now_ns;
+}
+
+double ewma::value() const noexcept {
+  std::lock_guard lk(_mu);
+  return _value;
+}
+
+void ewma::reset() noexcept {
+  std::lock_guard lk(_mu);
+  _value = 0.0;
+  _last_ns = 0;
+  _initialized = false;
+}
+
+// ---------------------------------------------------------------
+// latency_histogram
+// ---------------------------------------------------------------
+
+// Bucket boundaries: [0, 1024), [1024, 2048), [2048, 4096), ...
+// The last bucket catches everything >= 2^30 (~1.07 s).
+std::size_t latency_histogram::bucket_index(std::uint64_t ns) noexcept {
+  if (ns < 1024)
+    return 0;
+  // floor(log2(ns)) - 9, clamped to BUCKET_COUNT - 1.
+#ifdef _MSC_VER
+  unsigned long idx_msb = 0;
+  _BitScanReverse64(&idx_msb, ns);
+  unsigned log2_floor = static_cast<unsigned>(idx_msb);
+#else
+  unsigned leading = static_cast<unsigned>(__builtin_clzll(ns));
+  unsigned log2_floor = 63u - leading;
+#endif
+  std::size_t idx = static_cast<std::size_t>(log2_floor - 9);
+  return idx < BUCKET_COUNT - 1 ?
idx : BUCKET_COUNT - 1; +} + +void latency_histogram::record(std::uint64_t latency_ns) noexcept { + _buckets[bucket_index(latency_ns)].fetch_add(1, std::memory_order_relaxed); +} + +std::uint64_t latency_histogram::count() const noexcept { + std::uint64_t total = 0; + for (std::size_t i = 0; i < BUCKET_COUNT; ++i) + total += _buckets[i].load(std::memory_order_relaxed); + return total; +} + +std::uint64_t latency_histogram::percentile(double p) const noexcept { + std::uint64_t total = count(); + if (total == 0) + return 0; + std::uint64_t target = static_cast(static_cast(total) * p); + if (target >= total) + target = total - 1; + std::uint64_t cumulative = 0; + for (std::size_t i = 0; i < BUCKET_COUNT; ++i) { + cumulative += _buckets[i].load(std::memory_order_relaxed); + if (cumulative > target) { + // Return the upper boundary of this bucket. + if (i == 0) + return 1024ULL; + if (i >= BUCKET_COUNT - 1) + return 1ULL << 30; + return 1ULL << (static_cast(i) + 10); + } + } + return 1ULL << 30; +} + +void latency_histogram::reset() noexcept { + for (std::size_t i = 0; i < BUCKET_COUNT; ++i) + _buckets[i].store(0, std::memory_order_relaxed); +} + +std::uint64_t latency_histogram::bucket(std::size_t i) const noexcept { + if (i >= BUCKET_COUNT) + return 0; + return _buckets[i].load(std::memory_order_relaxed); +} + +// --------------------------------------------------------------- +// state_node_telemetry +// --------------------------------------------------------------- + +static std::uint64_t now_ns() { + using namespace std::chrono; + return static_cast( + duration_cast(steady_clock::now().time_since_epoch()).count()); +} + +state_node_telemetry::state_node_telemetry(app &ctx, std::string node_id, std::string cluster_id) + : _ctx(ctx), _node_id(std::move(node_id)), _cluster_id(std::move(cluster_id)) {} + +state_node_telemetry::~state_node_telemetry() = default; + +void state_node_telemetry::attach_shard(state_cluster_shard *s) noexcept { _shard = s; } +void 
state_node_telemetry::attach_transport(state_transport *t) noexcept { _transport = t; } +void state_node_telemetry::attach_message_bus(state_message_bus *b) noexcept { _bus = b; } + +void state_node_telemetry::record_enqueue_latency(std::uint64_t ns) noexcept { + _enqueue_hist.record(ns); +} + +void state_node_telemetry::record_delivery_latency(std::uint64_t ns) noexcept { + _delivery_hist.record(ns); +} + +void state_node_telemetry::sample() { + auto ts = now_ns(); + telemetry_snapshot snap; + snap.node_id = _node_id; + snap.cluster_id = _cluster_id; + snap.timestamp_ns = ts; + + // Shard counters. + if (_shard) { + snap.mutations_applied = _shard->total_remote_applied(); + snap.mutations_duplicates = _shard->total_remote_duplicates(); + snap.mutations_rejected = _shard->total_remote_rejected(); + snap.mutations_conflicts = _shard->total_conflicts_detected(); + snap.delegations_routed = _shard->total_delegation_routed(); + snap.delegations_expired = _shard->total_delegation_expired(); + } + + // Transport counters. + if (_transport) { + // Try to downcast to inproc for detailed counters. + auto *inproc = dynamic_cast(_transport); + if (inproc) { + snap.mutations_published = inproc->total_published(); + snap.bytes_sent = inproc->total_published(); // proxy: count-based + snap.bytes_received = inproc->total_delivered(); + snap.quarantined_mutations = inproc->total_quarantined_mutations(); + snap.quarantined_messages = inproc->total_quarantined_messages(); + snap.auto_isolations = inproc->total_auto_isolations(); + snap.outbox_drops = + inproc->total_outbox_dropped_newest() + inproc->total_outbox_dropped_oldest(); + snap.slow_peer_count = static_cast(inproc->slow_peers().size()); + } + snap.peer_count = static_cast(_transport->peers().size()); + } + + // Message bus counters. 
+ if (_bus) { + snap.messages_admitted = _bus->total_admitted(); + snap.messages_duplicates = _bus->total_duplicates(); + snap.messages_dispatched = _bus->total_dispatched(); + snap.messages_dropped = _bus->total_dropped(); + } + + // Tree shape — count children of root. + try { + auto &root = state::instance(_ctx); + snap.path_count = static_cast(root.numChildren()); + } catch (...) { + } + + // Rate EWMAs. + if (_prev_sample_ns > 0 && ts > _prev_sample_ns) { + double dt_s = static_cast(ts - _prev_sample_ns) / 1e9; + if (dt_s > 0.0) { + double pub_delta = static_cast(snap.mutations_published - _prev_mutations_published); + _mutation_publish_rate.update(pub_delta / dt_s, ts); + + double app_delta = static_cast(snap.mutations_applied - _prev_mutations_applied); + _mutation_apply_rate.update(app_delta / dt_s, ts); + + double msg_delta = static_cast(snap.messages_admitted - _prev_messages_admitted); + _message_admit_rate.update(msg_delta / dt_s, ts); + } + } + _prev_mutations_published = snap.mutations_published; + _prev_mutations_applied = snap.mutations_applied; + _prev_messages_admitted = snap.messages_admitted; + _prev_sample_ns = ts; + + snap.mutation_publish_rate = _mutation_publish_rate.value(); + snap.mutation_apply_rate = _mutation_apply_rate.value(); + snap.message_admit_rate = _message_admit_rate.value(); + + // Latency histograms. 
+ snap.enqueue_p50 = _enqueue_hist.p50(); + snap.enqueue_p90 = _enqueue_hist.p90(); + snap.enqueue_p99 = _enqueue_hist.p99(); + snap.delivery_p50 = _delivery_hist.p50(); + snap.delivery_p90 = _delivery_hist.p90(); + snap.delivery_p99 = _delivery_hist.p99(); + + { + std::lock_guard lk(_snap_mu); + _latest = snap; + } +} + +telemetry_snapshot state_node_telemetry::snapshot() const { + std::lock_guard lk(_snap_mu); + return _latest; +} + +// --------------------------------------------------------------- +// JSON serialization — lightweight, no external dependency +// --------------------------------------------------------------- + +static void json_kv(std::ostringstream &os, const char *key, std::uint64_t val, bool &first) { + if (!first) + os << ','; + os << '"' << key << '"' << ':' << val; + first = false; +} + +static void json_kv(std::ostringstream &os, const char *key, double val, bool &first) { + if (!first) + os << ','; + os << '"' << key << '"' << ':' << val; + first = false; +} + +static void json_kv(std::ostringstream &os, const char *key, const std::string &val, bool &first) { + if (!first) + os << ','; + os << '"' << key << "\":\"" << val << '"'; + first = false; +} + +std::string state_node_telemetry::serialize_json(const telemetry_snapshot &s) { + std::ostringstream os; + os << '{'; + bool f = true; + json_kv(os, "node_id", s.node_id, f); + json_kv(os, "cluster_id", s.cluster_id, f); + json_kv(os, "timestamp_ns", s.timestamp_ns, f); + json_kv(os, "mutations_published", s.mutations_published, f); + json_kv(os, "mutations_applied", s.mutations_applied, f); + json_kv(os, "mutations_duplicates", s.mutations_duplicates, f); + json_kv(os, "mutations_rejected", s.mutations_rejected, f); + json_kv(os, "mutations_conflicts", s.mutations_conflicts, f); + json_kv(os, "messages_admitted", s.messages_admitted, f); + json_kv(os, "messages_duplicates", s.messages_duplicates, f); + json_kv(os, "messages_dispatched", s.messages_dispatched, f); + json_kv(os, 
"messages_dropped", s.messages_dropped, f); + json_kv(os, "bytes_sent", s.bytes_sent, f); + json_kv(os, "bytes_received", s.bytes_received, f); + json_kv(os, "blobs_stored", s.blobs_stored, f); + json_kv(os, "blob_bytes", s.blob_bytes, f); + json_kv(os, "delegations_routed", s.delegations_routed, f); + json_kv(os, "delegations_expired", s.delegations_expired, f); + json_kv(os, "quarantined_mutations", s.quarantined_mutations, f); + json_kv(os, "quarantined_messages", s.quarantined_messages, f); + json_kv(os, "auto_isolations", s.auto_isolations, f); + json_kv(os, "mutation_publish_rate", s.mutation_publish_rate, f); + json_kv(os, "mutation_apply_rate", s.mutation_apply_rate, f); + json_kv(os, "message_admit_rate", s.message_admit_rate, f); + json_kv(os, "outbox_depth", s.outbox_depth, f); + json_kv(os, "outbox_drops", s.outbox_drops, f); + json_kv(os, "path_count", s.path_count, f); + json_kv(os, "link_count", s.link_count, f); + json_kv(os, "enqueue_p50", s.enqueue_p50, f); + json_kv(os, "enqueue_p90", s.enqueue_p90, f); + json_kv(os, "enqueue_p99", s.enqueue_p99, f); + json_kv(os, "delivery_p50", s.delivery_p50, f); + json_kv(os, "delivery_p90", s.delivery_p90, f); + json_kv(os, "delivery_p99", s.delivery_p99, f); + json_kv(os, "peer_count", s.peer_count, f); + json_kv(os, "slow_peer_count", s.slow_peer_count, f); + os << '}'; + return os.str(); +} + +// Minimal JSON parser for telemetry snapshots — extract key:value +// pairs from a flat JSON object. 
+static bool extract_u64(const std::string &json, const char *key, std::uint64_t &out) { + std::string needle = std::string("\"") + key + "\":"; + auto pos = json.find(needle); + if (pos == std::string::npos) + return false; + pos += needle.size(); + while (pos < json.size() && (json[pos] == ' ' || json[pos] == '"')) + ++pos; + char *end = nullptr; + out = std::strtoull(json.c_str() + pos, &end, 10); + return end != json.c_str() + pos; +} + +static bool extract_double(const std::string &json, const char *key, double &out) { + std::string needle = std::string("\"") + key + "\":"; + auto pos = json.find(needle); + if (pos == std::string::npos) + return false; + pos += needle.size(); + while (pos < json.size() && json[pos] == ' ') + ++pos; + char *end = nullptr; + out = std::strtod(json.c_str() + pos, &end); + return end != json.c_str() + pos; +} + +static bool extract_string(const std::string &json, const char *key, std::string &out) { + std::string needle = std::string("\"") + key + "\":\""; + auto pos = json.find(needle); + if (pos == std::string::npos) + return false; + pos += needle.size(); + auto end = json.find('"', pos); + if (end == std::string::npos) + return false; + out = json.substr(pos, end - pos); + return true; +} + +bool state_node_telemetry::deserialize_json(const std::string &json, telemetry_snapshot &s) { + if (json.empty() || json[0] != '{') + return false; + + extract_string(json, "node_id", s.node_id); + extract_string(json, "cluster_id", s.cluster_id); + extract_u64(json, "timestamp_ns", s.timestamp_ns); + extract_u64(json, "mutations_published", s.mutations_published); + extract_u64(json, "mutations_applied", s.mutations_applied); + extract_u64(json, "mutations_duplicates", s.mutations_duplicates); + extract_u64(json, "mutations_rejected", s.mutations_rejected); + extract_u64(json, "mutations_conflicts", s.mutations_conflicts); + extract_u64(json, "messages_admitted", s.messages_admitted); + extract_u64(json, "messages_duplicates", 
s.messages_duplicates); + extract_u64(json, "messages_dispatched", s.messages_dispatched); + extract_u64(json, "messages_dropped", s.messages_dropped); + extract_u64(json, "bytes_sent", s.bytes_sent); + extract_u64(json, "bytes_received", s.bytes_received); + extract_u64(json, "blobs_stored", s.blobs_stored); + extract_u64(json, "blob_bytes", s.blob_bytes); + extract_u64(json, "delegations_routed", s.delegations_routed); + extract_u64(json, "delegations_expired", s.delegations_expired); + extract_u64(json, "quarantined_mutations", s.quarantined_mutations); + extract_u64(json, "quarantined_messages", s.quarantined_messages); + extract_u64(json, "auto_isolations", s.auto_isolations); + extract_double(json, "mutation_publish_rate", s.mutation_publish_rate); + extract_double(json, "mutation_apply_rate", s.mutation_apply_rate); + extract_double(json, "message_admit_rate", s.message_admit_rate); + extract_u64(json, "outbox_depth", s.outbox_depth); + extract_u64(json, "outbox_drops", s.outbox_drops); + extract_u64(json, "path_count", s.path_count); + extract_u64(json, "link_count", s.link_count); + extract_u64(json, "enqueue_p50", s.enqueue_p50); + extract_u64(json, "enqueue_p90", s.enqueue_p90); + extract_u64(json, "enqueue_p99", s.enqueue_p99); + extract_u64(json, "delivery_p50", s.delivery_p50); + extract_u64(json, "delivery_p90", s.delivery_p90); + extract_u64(json, "delivery_p99", s.delivery_p99); + extract_u64(json, "peer_count", s.peer_count); + extract_u64(json, "slow_peer_count", s.slow_peer_count); + return !s.node_id.empty(); +} + +bool state_node_telemetry::publish_snapshot() { + if (!_bus) + return false; + auto snap = snapshot(); + std::string json = serialize_json(snap); + std::string path = std::string(TOPIC_PREFIX) + _cluster_id + "." 
+ _node_id; + auto msg = state_message::make_text(path, json, MIME_TELEMETRY); + msg.cluster_id = _cluster_id; + msg.origin_node_id = _node_id; + msg.message_id = "tel-" + std::to_string(++_publish_seq); + msg.ttl_hops = 4; + return _bus->admit(msg); +} + +} // namespace CVC_NAMESPACE diff --git a/src/cvc/state_telemetry_aggregator.cpp b/src/cvc/state_telemetry_aggregator.cpp new file mode 100644 index 0000000..0a179ff --- /dev/null +++ b/src/cvc/state_telemetry_aggregator.cpp @@ -0,0 +1,193 @@ +/* + Copyright 2026 The University of Texas at Austin + + This file is part of libcvc. + + libcvc is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. +*/ + +#include +#include +#include +#include +#include +#include +#include + +namespace CVC_NAMESPACE { + +static std::uint64_t agg_now_ns() { + using namespace std::chrono; + return static_cast<std::uint64_t>( + duration_cast<nanoseconds>(steady_clock::now().time_since_epoch()).count()); +} + +state_telemetry_aggregator::state_telemetry_aggregator(std::string cluster_id, + std::uint64_t stale_threshold_ns) + : _cluster_id(std::move(cluster_id)), _stale_threshold_ns(stale_threshold_ns) {} + +state_telemetry_aggregator::~state_telemetry_aggregator() { detach_bus(); } + +bool state_telemetry_aggregator::attach_bus(state_message_bus &bus) { + detach_bus(); + _bus = &bus; + std::string prefix = std::string(state_node_telemetry::TOPIC_PREFIX) + _cluster_id; + _sub_id = bus.subscribe(prefix, [this](const state_message &msg) { on_message(msg); }); + return _sub_id != 0; +} + +void state_telemetry_aggregator::detach_bus() { + if (_bus && _sub_id) { + _bus->unsubscribe(_sub_id); + _sub_id = 0; + } + _bus = nullptr; +} + +void state_telemetry_aggregator::on_message(const state_message &msg) { + if (msg.content_type != state_node_telemetry::MIME_TELEMETRY && + msg.effective_content_type() != state_node_telemetry::MIME_TELEMETRY) + return; + 
+ telemetry_snapshot snap; + if (!state_node_telemetry::deserialize_json(msg.string_value, snap)) + return; + + ingest(snap); +} + +void state_telemetry_aggregator::ingest(const telemetry_snapshot &snap) { + std::lock_guard lk(_mu); + _peers[snap.node_id] = snap; +} + +cluster_telemetry_summary state_telemetry_aggregator::summarize() const { + std::lock_guard lk(_mu); + cluster_telemetry_summary sum; + sum.timestamp_ns = agg_now_ns(); + sum.node_count = _peers.size(); + + for (auto &[id, snap] : _peers) { + if (sum.timestamp_ns > snap.timestamp_ns && + (sum.timestamp_ns - snap.timestamp_ns) > _stale_threshold_ns) { + ++sum.stale_count; + } + sum.total_mutations_published += snap.mutations_published; + sum.total_mutations_applied += snap.mutations_applied; + sum.total_messages_admitted += snap.messages_admitted; + sum.total_bytes_sent += snap.bytes_sent; + sum.total_bytes_received += snap.bytes_received; + sum.total_blobs_stored += snap.blobs_stored; + sum.total_blob_bytes += snap.blob_bytes; + sum.total_path_count += snap.path_count; + sum.total_link_count += snap.link_count; + sum.max_enqueue_p99 = std::max(sum.max_enqueue_p99, snap.enqueue_p99); + sum.max_delivery_p99 = std::max(sum.max_delivery_p99, snap.delivery_p99); + sum.cluster_mutation_publish_rate += snap.mutation_publish_rate; + sum.cluster_mutation_apply_rate += snap.mutation_apply_rate; + sum.cluster_message_admit_rate += snap.message_admit_rate; + } + return sum; +} + +std::unordered_map<std::string, telemetry_snapshot> +state_telemetry_aggregator::peer_snapshots() const { + std::lock_guard lk(_mu); + return _peers; +} + +std::size_t state_telemetry_aggregator::peer_count() const { + std::lock_guard lk(_mu); + return _peers.size(); +} + +std::size_t state_telemetry_aggregator::stale_count() const { + std::lock_guard lk(_mu); + auto now = agg_now_ns(); + std::size_t count = 0; + for (auto &[id, snap] : _peers) { + if (now > snap.timestamp_ns && (now - snap.timestamp_ns) > _stale_threshold_ns) + ++count; + } + return count; +} + +bool 
state_telemetry_aggregator::remove_peer(const std::string &node_id) { + std::lock_guard lk(_mu); + return _peers.erase(node_id) > 0; +} + +void state_telemetry_aggregator::clear() { + std::lock_guard lk(_mu); + _peers.clear(); +} + +std::string state_telemetry_aggregator::to_text() const { + auto sum = summarize(); + std::lock_guard lk(_mu); + + std::ostringstream os; + os << "[telemetry]\n"; + os << " cluster_id: " << _cluster_id << "\n"; + os << " nodes: " << sum.node_count << " (" << sum.stale_count << " stale)\n"; + os << " mutations_published: " << sum.total_mutations_published << "\n"; + os << " mutations_applied: " << sum.total_mutations_applied << "\n"; + os << " messages_admitted: " << sum.total_messages_admitted << "\n"; + os << " bytes_sent: " << sum.total_bytes_sent << "\n"; + os << " bytes_received: " << sum.total_bytes_received << "\n"; + os << " total_paths: " << sum.total_path_count << "\n"; + os << " total_links: " << sum.total_link_count << "\n"; + os << " max_enqueue_p99: " << sum.max_enqueue_p99 << " ns\n"; + os << " max_delivery_p99: " << sum.max_delivery_p99 << " ns\n"; + os << " cluster_mutation_rate: " << sum.cluster_mutation_publish_rate << " /s\n"; + os << " cluster_apply_rate: " << sum.cluster_mutation_apply_rate << " /s\n"; + os << " cluster_message_rate: " << sum.cluster_message_admit_rate << " /s\n"; + + for (auto &[id, snap] : _peers) { + os << " [peer " << id << "]\n"; + os << " cluster: " << snap.cluster_id << "\n"; + os << " mutations: pub=" << snap.mutations_published << " app=" << snap.mutations_applied + << " dup=" << snap.mutations_duplicates << " rej=" << snap.mutations_rejected << "\n"; + os << " messages: adm=" << snap.messages_admitted << " dup=" << snap.messages_duplicates + << " drop=" << snap.messages_dropped << "\n"; + os << " rates: pub=" << snap.mutation_publish_rate << "/s" + << " app=" << snap.mutation_apply_rate << "/s" << " msg=" << snap.message_admit_rate + << "/s\n"; + os << " latency: enq_p99=" << 
snap.enqueue_p99 << "ns" << " del_p99=" << snap.delivery_p99 + << "ns\n"; + os << " peers: " << snap.peer_count << " (slow=" << snap.slow_peer_count << ")\n"; + } + return os.str(); +} + +state_telemetry_aggregator::routing_feedback_result +state_telemetry_aggregator::evaluate_routing_feedback(const routing_feedback_policy &policy) const { + routing_feedback_result result; + std::lock_guard lk(_mu); + + for (auto &[id, snap] : _peers) { + bool should_isolate = false; + + if (policy.latency_p99_threshold_ns > 0 && + snap.delivery_p99 > policy.latency_p99_threshold_ns) { + should_isolate = true; + } + + if (policy.outbox_drop_threshold > 0 && snap.outbox_drops > policy.outbox_drop_threshold) { + should_isolate = true; + } + + if (should_isolate) { + result.isolate.push_back(id); + } else { + // Every peer within both thresholds is listed for release; prior isolation is not tracked here. + // Callers decide whether the peer is actually marked slow. + result.release.push_back(id); + } + } + return result; +} + +} // namespace CVC_NAMESPACE diff --git a/src/cvc/tests/CMakeLists.txt b/src/cvc/tests/CMakeLists.txt index 6e109da..fa5f898 100644 --- a/src/cvc/tests/CMakeLists.txt +++ b/src/cvc/tests/CMakeLists.txt @@ -47,6 +47,8 @@ add_executable(state_delta_codec_test state_delta_codec_test.cpp) add_executable(state_volume_codec_test state_volume_codec_test.cpp) add_executable(state_brick_manifest_test state_brick_manifest_test.cpp) add_executable(state_data_hydrator_test state_data_hydrator_test.cpp) +add_executable(state_node_telemetry_test state_node_telemetry_test.cpp) +add_executable(state_telemetry_aggregator_test state_telemetry_aggregator_test.cpp) add_executable(file_state_blob_store_test file_state_blob_store_test.cpp) add_executable(distributed_state_session_test distributed_state_session_test.cpp) add_executable(state_large_tree_bench_test state_large_tree_bench_test.cpp) @@ -103,6 +105,8 @@ set(TEST_TARGETS state_volume_codec_test state_brick_manifest_test state_data_hydrator_test + 
state_node_telemetry_test + state_telemetry_aggregator_test state_cluster_membership_test file_state_blob_store_test distributed_state_session_test @@ -467,6 +471,13 @@ target_link_libraries(state_volume_codec_test GTest::gtest_main ) +target_link_libraries(state_node_telemetry_test + PRIVATE + cvc + GTest::gtest + GTest::gtest_main +) + target_link_libraries(state_brick_manifest_test PRIVATE cvc @@ -481,6 +492,13 @@ target_link_libraries(state_data_hydrator_test GTest::gtest_main ) +target_link_libraries(state_telemetry_aggregator_test + PRIVATE + cvc + GTest::gtest + GTest::gtest_main +) + target_link_libraries(state_delta_codec_test PRIVATE cvc @@ -631,6 +649,8 @@ target_compile_features(state_delta_codec_test PRIVATE cxx_std_17) target_compile_features(state_volume_codec_test PRIVATE cxx_std_17) target_compile_features(state_brick_manifest_test PRIVATE cxx_std_17) target_compile_features(state_data_hydrator_test PRIVATE cxx_std_17) +target_compile_features(state_node_telemetry_test PRIVATE cxx_std_17) +target_compile_features(state_telemetry_aggregator_test PRIVATE cxx_std_17) target_compile_features(file_state_blob_store_test PRIVATE cxx_std_17) target_compile_features(distributed_state_session_test PRIVATE cxx_std_17) target_compile_features(state_large_tree_bench_test PRIVATE cxx_std_17) @@ -740,6 +760,8 @@ gtest_discover_tests(state_delta_codec_test) gtest_discover_tests(state_volume_codec_test) gtest_discover_tests(state_brick_manifest_test) gtest_discover_tests(state_data_hydrator_test) +gtest_discover_tests(state_node_telemetry_test) +gtest_discover_tests(state_telemetry_aggregator_test) gtest_discover_tests(file_state_blob_store_test) gtest_discover_tests(distributed_state_session_test) gtest_discover_tests(state_large_tree_bench_test) diff --git a/src/cvc/tests/state_node_telemetry_test.cpp b/src/cvc/tests/state_node_telemetry_test.cpp new file mode 100644 index 0000000..9f950e3 --- /dev/null +++ b/src/cvc/tests/state_node_telemetry_test.cpp @@ -0,0 
+1,280 @@ +/* + Copyright 2026 The University of Texas at Austin + + This file is part of libcvc. + + libcvc is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace cvc; + +// ===================================================================== +// ewma +// ===================================================================== + +TEST(EwmaTest, InitializesToFirstSample) { + ewma e(1'000'000'000ULL); // 1 s half-life + e.update(42.0, 1'000'000'000ULL); + EXPECT_DOUBLE_EQ(e.value(), 42.0); +} + +TEST(EwmaTest, DecaysOverTime) { + ewma e(1'000'000'000ULL); + e.update(100.0, 1'000'000'000ULL); + // After one half-life, a new sample of 0 should move the EWMA + // halfway: ~50. + e.update(0.0, 2'000'000'000ULL); + double v = e.value(); + EXPECT_GT(v, 45.0); + EXPECT_LT(v, 55.0); +} + +TEST(EwmaTest, ConvergesAfterManySamples) { + ewma e(100'000'000ULL); // 100 ms half-life + for (int i = 0; i < 100; ++i) { + e.update(10.0, static_cast<std::uint64_t>(i + 1) * 100'000'000ULL); + } + EXPECT_NEAR(e.value(), 10.0, 0.1); +} + +TEST(EwmaTest, ResetClearsState) { + ewma e; + e.update(100.0, 1'000'000'000ULL); + e.reset(); + EXPECT_DOUBLE_EQ(e.value(), 0.0); + e.update(5.0, 2'000'000'000ULL); + EXPECT_DOUBLE_EQ(e.value(), 5.0); +} + +// ===================================================================== +// latency_histogram +// ===================================================================== + +TEST(LatencyHistogramTest, RecordAndCount) { + latency_histogram h; + h.record(500); // < 1 µs bucket + h.record(2000); // ~ 2 µs bucket + h.record(50000); // ~ 32 µs bucket + EXPECT_EQ(h.count(), 3u); +} + +TEST(LatencyHistogramTest, PercentilesMonotonic) { + latency_histogram h; + // Fill with a spread of values. 
+ for (int i = 0; i < 1000; ++i) { + h.record(static_cast<std::uint64_t>(i) * 1000); + } + EXPECT_LE(h.p50(), h.p90()); + EXPECT_LE(h.p90(), h.p99()); +} + +TEST(LatencyHistogramTest, EmptyHistogramReturnsZero) { + latency_histogram h; + EXPECT_EQ(h.p50(), 0u); + EXPECT_EQ(h.p99(), 0u); + EXPECT_EQ(h.count(), 0u); +} + +TEST(LatencyHistogramTest, SingleValueInFirstBucket) { + latency_histogram h; + h.record(500); // < 1024 → bucket 0 + EXPECT_EQ(h.count(), 1u); + EXPECT_EQ(h.p50(), 1024u); // upper boundary of bucket 0 +} + +TEST(LatencyHistogramTest, ResetClearsAll) { + latency_histogram h; + for (int i = 0; i < 100; ++i) + h.record(static_cast<std::uint64_t>(i) * 10000); + EXPECT_GT(h.count(), 0u); + h.reset(); + EXPECT_EQ(h.count(), 0u); +} + +// ===================================================================== +// telemetry_snapshot serialization +// ===================================================================== + +TEST(TelemetrySnapshotTest, JsonRoundTrip) { + telemetry_snapshot orig; + orig.node_id = "node1"; + orig.cluster_id = "alpha"; + orig.timestamp_ns = 123456789; + orig.mutations_published = 100; + orig.mutations_applied = 95; + orig.mutations_duplicates = 3; + orig.mutations_rejected = 2; + orig.mutations_conflicts = 1; + orig.messages_admitted = 50; + orig.messages_dispatched = 48; + orig.messages_dropped = 2; + orig.bytes_sent = 10000; + orig.bytes_received = 9500; + orig.peer_count = 5; + orig.slow_peer_count = 1; + orig.mutation_publish_rate = 42.5; + orig.enqueue_p99 = 8192; + orig.delivery_p99 = 16384; + + std::string json = state_node_telemetry::serialize_json(orig); + EXPECT_FALSE(json.empty()); + EXPECT_NE(json.find("node1"), std::string::npos); + + telemetry_snapshot parsed; + EXPECT_TRUE(state_node_telemetry::deserialize_json(json, parsed)); + EXPECT_EQ(parsed.node_id, "node1"); + EXPECT_EQ(parsed.cluster_id, "alpha"); + EXPECT_EQ(parsed.timestamp_ns, 123456789u); + EXPECT_EQ(parsed.mutations_published, 100u); + EXPECT_EQ(parsed.mutations_applied, 95u); + 
EXPECT_EQ(parsed.peer_count, 5u); + EXPECT_EQ(parsed.slow_peer_count, 1u); + EXPECT_NEAR(parsed.mutation_publish_rate, 42.5, 0.01); + EXPECT_EQ(parsed.enqueue_p99, 8192u); +} + +TEST(TelemetrySnapshotTest, DeserializeRejectsEmpty) { + telemetry_snapshot snap; + EXPECT_FALSE(state_node_telemetry::deserialize_json("", snap)); + EXPECT_FALSE(state_node_telemetry::deserialize_json("not json", snap)); +} + +// ===================================================================== +// state_node_telemetry +// ===================================================================== + +TEST(StateNodeTelemetryTest, SampleWithShard) { + app a; + state_cluster_shard shard(a, "alpha", "node1"); + shard.attach(); + + state_node_telemetry tel(a, "node1", "alpha"); + tel.attach_shard(&shard); + + tel.sample(); + auto snap = tel.snapshot(); + EXPECT_EQ(snap.node_id, "node1"); + EXPECT_EQ(snap.cluster_id, "alpha"); + EXPECT_GT(snap.timestamp_ns, 0u); +} + +TEST(StateNodeTelemetryTest, SampleWithTransport) { + app a; + state_cluster_shard shard(a, "alpha", "node1"); + shard.attach(); + + state_transport_inproc transport; + transport.register_shard(&shard); + + state_node_telemetry tel(a, "node1", "alpha"); + tel.attach_shard(&shard); + tel.attach_transport(&transport); + + tel.sample(); + auto snap = tel.snapshot(); + EXPECT_EQ(snap.peer_count, 0u); // 1 shard but no peers registered +} + +TEST(StateNodeTelemetryTest, SampleWithBus) { + app a; + state_message_bus bus; + + state_node_telemetry tel(a, "node1", "alpha"); + tel.attach_message_bus(&bus); + + // Admit a message to bump the counter. 
+ auto msg = state_message::make_text("test.path", "hello"); + msg.origin_node_id = "remote"; + msg.message_id = "m1"; + bus.admit(msg); + + tel.sample(); + auto snap = tel.snapshot(); + EXPECT_EQ(snap.messages_admitted, 1u); +} + +TEST(StateNodeTelemetryTest, RecordLatency) { + app a; + state_node_telemetry tel(a, "node1", "alpha"); + + for (int i = 0; i < 100; ++i) { + tel.record_enqueue_latency(5000); // 5 µs + tel.record_delivery_latency(20000); // 20 µs + } + + tel.sample(); + auto snap = tel.snapshot(); + EXPECT_GT(snap.enqueue_p50, 0u); + EXPECT_GT(snap.delivery_p50, 0u); +} + +TEST(StateNodeTelemetryTest, RateComputation) { + app a; + state_cluster_shard shard(a, "alpha", "node1"); + shard.attach(); + + state_transport_inproc transport; + transport.register_shard(&shard); + + state_node_telemetry tel(a, "node1", "alpha"); + tel.attach_shard(&shard); + tel.attach_transport(&transport); + + // First sample establishes baseline. + tel.sample(); + auto snap1 = tel.snapshot(); + EXPECT_DOUBLE_EQ(snap1.mutation_publish_rate, 0.0); + + // Wait a tiny bit and sample again — rates should remain 0 + // since no mutations were published. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + tel.sample(); + auto snap2 = tel.snapshot(); + EXPECT_NEAR(snap2.mutation_publish_rate, 0.0, 1.0); +} + +TEST(StateNodeTelemetryTest, PublishSnapshot) { + app a; + state_message_bus bus; + + state_node_telemetry tel(a, "node1", "alpha"); + tel.attach_message_bus(&bus); + + // Subscribe to catch the telemetry message. + bool received = false; + std::string received_json; + bus.subscribe("__telemetry", [&](const state_message &msg) { + received = true; + received_json = msg.string_value; + }); + + tel.sample(); + EXPECT_TRUE(tel.publish_snapshot()); + EXPECT_TRUE(received); + EXPECT_FALSE(received_json.empty()); + + // Parse the received JSON. 
+ telemetry_snapshot parsed; + EXPECT_TRUE(state_node_telemetry::deserialize_json(received_json, parsed)); + EXPECT_EQ(parsed.node_id, "node1"); + EXPECT_EQ(parsed.cluster_id, "alpha"); +} + +TEST(StateNodeTelemetryTest, PublishWithoutBusReturnsFalse) { + app a; + state_node_telemetry tel(a, "node1", "alpha"); + tel.sample(); + EXPECT_FALSE(tel.publish_snapshot()); +} diff --git a/src/cvc/tests/state_telemetry_aggregator_test.cpp b/src/cvc/tests/state_telemetry_aggregator_test.cpp new file mode 100644 index 0000000..0c1da5e --- /dev/null +++ b/src/cvc/tests/state_telemetry_aggregator_test.cpp @@ -0,0 +1,287 @@ +/* + Copyright 2026 The University of Texas at Austin + + This file is part of libcvc. + + libcvc is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace cvc; + +// ===================================================================== +// state_telemetry_aggregator +// ===================================================================== + +TEST(TelemetryAggregatorTest, IngestAndSummarize) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s1; + s1.node_id = "node1"; + s1.cluster_id = "alpha"; + s1.timestamp_ns = 1'000'000'000; + s1.mutations_published = 100; + s1.mutations_applied = 90; + s1.bytes_sent = 5000; + + telemetry_snapshot s2; + s2.node_id = "node2"; + s2.cluster_id = "alpha"; + s2.timestamp_ns = 1'000'000'000; + s2.mutations_published = 200; + s2.mutations_applied = 180; + s2.bytes_sent = 8000; + + agg.ingest(s1); + agg.ingest(s2); + + EXPECT_EQ(agg.peer_count(), 2u); + + auto sum = agg.summarize(); + EXPECT_EQ(sum.node_count, 2u); + EXPECT_EQ(sum.total_mutations_published, 300u); + EXPECT_EQ(sum.total_mutations_applied, 270u); + EXPECT_EQ(sum.total_bytes_sent, 13000u); +} + +TEST(TelemetryAggregatorTest, 
StaleDetection) { + // Use a very short stale threshold for testing. + state_telemetry_aggregator agg("alpha", 50'000'000ULL); // 50 ms + + telemetry_snapshot s; + s.node_id = "old-node"; + s.cluster_id = "alpha"; + // Set timestamp far in the past. + s.timestamp_ns = 1; + agg.ingest(s); + + EXPECT_EQ(agg.stale_count(), 1u); + + auto sum = agg.summarize(); + EXPECT_EQ(sum.stale_count, 1u); +} + +TEST(TelemetryAggregatorTest, RemovePeer) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "node1"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + agg.ingest(s); + + EXPECT_EQ(agg.peer_count(), 1u); + EXPECT_TRUE(agg.remove_peer("node1")); + EXPECT_EQ(agg.peer_count(), 0u); + EXPECT_FALSE(agg.remove_peer("nonexistent")); +} + +TEST(TelemetryAggregatorTest, Clear) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "node1"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + agg.ingest(s); + + agg.clear(); + EXPECT_EQ(agg.peer_count(), 0u); +} + +TEST(TelemetryAggregatorTest, PeerSnapshots) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "node1"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.mutations_published = 42; + agg.ingest(s); + + auto peers = agg.peer_snapshots(); + EXPECT_EQ(peers.size(), 1u); + EXPECT_EQ(peers["node1"].mutations_published, 42u); +} + +TEST(TelemetryAggregatorTest, AttachBusAndReceiveMessage) { + state_message_bus bus; + state_telemetry_aggregator agg("alpha"); + EXPECT_TRUE(agg.attach_bus(bus)); + + // Publish a telemetry message on the bus. 
+ telemetry_snapshot snap; + snap.node_id = "node1"; + snap.cluster_id = "alpha"; + snap.timestamp_ns = 1'000'000'000; + snap.mutations_published = 77; + + std::string json = state_node_telemetry::serialize_json(snap); + auto msg = state_message::make_text("__telemetry.alpha.node1", json, + state_node_telemetry::MIME_TELEMETRY); + msg.origin_node_id = "node1"; + msg.message_id = "tel-1"; + bus.admit(msg); + + EXPECT_EQ(agg.peer_count(), 1u); + auto peers = agg.peer_snapshots(); + EXPECT_EQ(peers["node1"].mutations_published, 77u); + + agg.detach_bus(); +} + +TEST(TelemetryAggregatorTest, ToText) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "node1"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.mutations_published = 100; + agg.ingest(s); + + std::string text = agg.to_text(); + EXPECT_NE(text.find("[telemetry]"), std::string::npos); + EXPECT_NE(text.find("cluster_id: alpha"), std::string::npos); + EXPECT_NE(text.find("nodes: 1"), std::string::npos); + EXPECT_NE(text.find("[peer node1]"), std::string::npos); +} + +TEST(TelemetryAggregatorTest, MaxLatencyAcrossNodes) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s1; + s1.node_id = "node1"; + s1.cluster_id = "alpha"; + s1.timestamp_ns = 1'000'000'000; + s1.enqueue_p99 = 5000; + s1.delivery_p99 = 10000; + + telemetry_snapshot s2; + s2.node_id = "node2"; + s2.cluster_id = "alpha"; + s2.timestamp_ns = 1'000'000'000; + s2.enqueue_p99 = 8000; + s2.delivery_p99 = 3000; + + agg.ingest(s1); + agg.ingest(s2); + + auto sum = agg.summarize(); + EXPECT_EQ(sum.max_enqueue_p99, 8000u); + EXPECT_EQ(sum.max_delivery_p99, 10000u); +} + +// ===================================================================== +// Routing feedback +// ===================================================================== + +TEST(RoutingFeedbackTest, IsolateHighLatencyPeer) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "slow-peer"; + 
s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.delivery_p99 = 500'000'000; // 500 ms + + agg.ingest(s); + + routing_feedback_policy policy; + policy.latency_p99_threshold_ns = 100'000'000; // 100 ms threshold + + auto result = agg.evaluate_routing_feedback(policy); + EXPECT_EQ(result.isolate.size(), 1u); + EXPECT_EQ(result.isolate[0], "slow-peer"); + EXPECT_EQ(result.release.size(), 0u); +} + +TEST(RoutingFeedbackTest, ReleaseHealthyPeer) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "fast-peer"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.delivery_p99 = 1000; // 1 µs + + agg.ingest(s); + + routing_feedback_policy policy; + policy.latency_p99_threshold_ns = 100'000'000; + + auto result = agg.evaluate_routing_feedback(policy); + EXPECT_EQ(result.isolate.size(), 0u); + EXPECT_EQ(result.release.size(), 1u); + EXPECT_EQ(result.release[0], "fast-peer"); +} + +TEST(RoutingFeedbackTest, IsolateOnDrops) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "dropping-peer"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.outbox_drops = 500; + + agg.ingest(s); + + routing_feedback_policy policy; + policy.outbox_drop_threshold = 100; + + auto result = agg.evaluate_routing_feedback(policy); + EXPECT_EQ(result.isolate.size(), 1u); + EXPECT_EQ(result.isolate[0], "dropping-peer"); +} + +TEST(RoutingFeedbackTest, DisabledPolicyReleasesAll) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "any-peer"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.delivery_p99 = 999'999'999; + s.outbox_drops = 999999; + + agg.ingest(s); + + routing_feedback_policy policy; // all thresholds 0 = disabled + + auto result = agg.evaluate_routing_feedback(policy); + EXPECT_EQ(result.isolate.size(), 0u); + EXPECT_EQ(result.release.size(), 1u); +} + +// ===================================================================== +// 
Integration: state_distributed_admin with telemetry +// ===================================================================== + +TEST(AdminTelemetryTest, ToTextIncludesTelemetry) { + state_telemetry_aggregator agg("alpha"); + + telemetry_snapshot s; + s.node_id = "node1"; + s.cluster_id = "alpha"; + s.timestamp_ns = 1'000'000'000; + s.mutations_published = 42; + agg.ingest(s); + + state_distributed_admin admin; + admin.attach_telemetry(&agg); + + std::string text = admin.to_text(); + EXPECT_NE(text.find("[telemetry]"), std::string::npos); + EXPECT_NE(text.find("mutations_published: 42"), std::string::npos); +}
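Review note on the JSON path in this diff: the string overload of `json_kv` writes `node_id` and `cluster_id` verbatim, and `extract_string` stops at the first `"` it finds, so an identifier containing a quote or backslash would likely not survive the round trip. A minimal sketch of an escaping step follows. The helper name `json_escape` is hypothetical and not part of the diff; it only illustrates the standard JSON string escapes and assumes the parser would gain a matching unescape.

```cpp
#include <cstdio>
#include <string>

// Hypothetical helper (not in the diff): escape a string so it can be
// embedded between double quotes in a JSON document.
std::string json_escape(const std::string &in) {
    std::string out;
    out.reserve(in.size() + 2);
    for (unsigned char c : in) {
        switch (c) {
        case '"':  out += "\\\""; break;  // quote      -> \"
        case '\\': out += "\\\\"; break;  // backslash  -> two backslashes
        case '\n': out += "\\n";  break;
        case '\r': out += "\\r";  break;
        case '\t': out += "\\t";  break;
        default:
            if (c < 0x20) {
                // Remaining control characters use the \u00XX form.
                char buf[7];
                std::snprintf(buf, sizeof(buf), "\\u%04x", static_cast<unsigned>(c));
                out += buf;
            } else {
                out += static_cast<char>(c);
            }
        }
    }
    return out;
}
```

If something like this were adopted, the string overload would stream `json_escape(val)` instead of `val`; identifiers such as `node1` pass through unchanged, so the existing round-trip tests should be unaffected.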