Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ jobs:
- name: libcvc-release
kind: libcvc
build_type: Release
- name: libcvc-debug-grpc
kind: libcvc
build_type: Debug
enable_grpc: true
- name: volrover3-debug
kind: volrover3
build_type: Debug
Expand Down Expand Up @@ -167,6 +171,10 @@ jobs:
if [ "${{ matrix.build_type }}" = "Debug" ]; then
sudo apt-get install -y --no-install-recommends lcov
fi
if [ "${{ matrix.enable_grpc }}" = "true" ]; then
sudo apt-get install -y --no-install-recommends \
libgrpc++-dev libprotobuf-dev protobuf-compiler-grpc
fi

# CUDA toolkit so the shipped artifacts can offload to NVIDIA GPUs
# at runtime when an NVIDIA driver is present. cudart is linked
Expand Down Expand Up @@ -292,6 +300,10 @@ jobs:
coverage=ON
fi
fi
grpc=OFF
if [ "${{ matrix.enable_grpc }}" = "true" ]; then
grpc=ON
fi
cmake -B build -G Ninja \
-DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
-DCMAKE_PREFIX_PATH="$extra_prefix" \
Expand All @@ -302,7 +314,8 @@ jobs:
-DDISABLE_CGAL=OFF \
-DCVC_BUILD_VOLROVER3=${{ matrix.kind == 'volrover3' && 'ON' || 'OFF' }} \
-DCVC_ENABLE_MESHER=ON \
-DCVC_ENABLE_SDF=ON
-DCVC_ENABLE_SDF=ON \
-DCVC_ENABLE_GRPC=$grpc

- name: Build
run: cmake --build build --parallel
Expand Down Expand Up @@ -540,14 +553,14 @@ jobs:
- name: Install dependencies (libcvc)
if: matrix.kind == 'libcvc'
run: |
brew install cmake ninja boost hdf5 fftw gsl imagemagick cgal log4cplus
brew install cmake ninja boost hdf5 fftw gsl imagemagick cgal log4cplus zstd
echo "BOOST_ROOT=$(brew --prefix boost)" >> $GITHUB_ENV
echo "CMAKE_PREFIX_PATH=$(brew --prefix boost):$(brew --prefix hdf5)" >> $GITHUB_ENV

- name: Install dependencies (volrover3)
if: matrix.kind == 'volrover3'
run: |
brew install cmake ninja boost hdf5 fftw gsl imagemagick qt@6 vtk cgal
brew install cmake ninja boost hdf5 fftw gsl imagemagick qt@6 vtk cgal zstd
echo "BOOST_ROOT=$(brew --prefix boost)" >> $GITHUB_ENV
echo "CMAKE_PREFIX_PATH=$(brew --prefix boost):$(brew --prefix qt@6):$(brew --prefix hdf5)" >> $GITHUB_ENV
echo "$(brew --prefix qt@6)/bin" >> $GITHUB_PATH
Expand Down
181 changes: 181 additions & 0 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,184 @@ cmake -B build -DCMAKE_PREFIX_PATH=/path/to/libcvc-3.1.0
cmake --build build
./build/my_app
```

---

## 6. Distributed state

libcvc includes an optional replicated state layer that synchronises an
in-process key-value tree across multiple nodes in a cluster. The layer
is assembled from composable pieces — a shard, a transport, interest
filters, conflict resolution, blob storage — but the easiest entry point
is `distributed_state_session`, which wires everything together from a
single config struct.

### 6.1 Quick start

```cpp
#include <cvc/app.h>
#include <cvc/distributed_state_session.h>
#include <cvc/state.h>

int main() {
cvc::app ctx;

cvc::distributed_state_config cfg;
cfg.cluster_id = "my_cluster";
cfg.node_id = "node_1";
cfg.transport = cvc::transport_kind::ipc;
cfg.listen_address = "/tmp/my_cluster_node1.sock";
cfg.seeds = {"/tmp/my_cluster_node2.sock"};

auto session = cvc::distributed_state_session::join(ctx, cfg);

// Write a value — it replicates to every peer in the cluster.
cvc::state::instance(ctx)("scene.title").value(std::string("Hello"));

// Read a value (may have arrived from a remote peer).
std::string title = cvc::state::instance(ctx)("scene.title").value();

session->stop(); // graceful shutdown (also called by destructor)
}
```

### 6.2 Configuration reference (`distributed_state_config`)

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `cluster_id` | `string` | *(required)* | Unique cluster name shared by all peers |
| `node_id` | `string` | *(required)* | Unique node name within the cluster |
| `root_path` | `string` | `""` | Subtree prefix to replicate (`""` = whole tree) |
| `transport` | `transport_kind` | `inproc` | `inproc`, `ipc`, or `grpc` |
| `listen_address` | `string` | `""` | Socket path (IPC) or `host:port` (gRPC) |
| `seeds` | `vector<string>` | `{}` | Peer endpoints to connect to on startup |
| `mounts` | `vector<distributed_state_mount>` | `{}` | Per-path replication modes |
| `pump_interval_ms` | `uint32` | `10` | Background replication interval (0 = no pump thread) |
| `max_inline_payload_bytes` | `uint32` | `65536` | Values larger than this go to the blob store |
| `blob_store_path` | `string` | `""` | Filesystem path for blob persistence (`""` = memory only) |
| `snapshot_on_join` | `bool` | `false` | Request a full snapshot from the first seed on join |
| `enforce_authority` | `bool` | `false` | Reject mutations owned by a different cluster |
| `enforce_write_policy` | `bool` | `false` | Consult write-policy before applying remote writes |
| `resolve_conflicts` | `bool` | `false` | Track and resolve concurrent writes (last-writer-wins) |
| `enforce_delegation` | `bool` | `false` | Respect delegation boundaries |
| `enforce_interest` | `bool` | `false` | Drop inbound mutations outside the interest set |

**gRPC-only TLS / auth fields** (ignored for `inproc` / `ipc`):

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `tls_server_cert_pem` | `string` | `""` | PEM server certificate |
| `tls_server_key_pem` | `string` | `""` | PEM server private key |
| `tls_root_ca_pem` | `string` | `""` | PEM CA certificate for peer verification |
| `tls_require_client_auth` | `bool` | `false` | Require mutual TLS |
| `require_tls` | `bool` | `false` | Throw if TLS cert/key are missing (safety guard) |
| `auth_expected_token` | `string` | `""` | Bearer token expected from inbound peers |
| `auth_outbound_token` | `string` | `""` | Bearer token sent to outbound peers |

> **Note:** identifiers (`cluster_id`, `node_id`) must follow C identifier
> rules — alphanumeric plus underscores. Hyphens are not allowed.

### 6.3 Transports

**In-process (`inproc`)** — Multiple shards in one process, no network.
Useful for tests and multi-tree applications.

**IPC (Unix domain sockets)** — Same-host, multi-process replication.
Full-duplex, automatic reconnection.

```cpp
cfg.transport = cvc::transport_kind::ipc;
cfg.listen_address = "/tmp/cluster.sock";
cfg.seeds = {"/tmp/peer1.sock", "/tmp/peer2.sock"};
```

**gRPC (network)** — Cross-host clustering with optional TLS and bearer-token
auth. Only available when libcvc is built with `-DCVC_ENABLE_GRPC=ON`.

```cpp
cfg.transport = cvc::transport_kind::grpc;
cfg.listen_address = "0.0.0.0:9999";
cfg.seeds = {"peer1.example.com:9999"};
cfg.tls_server_cert_pem = load_file("server.crt");
cfg.tls_server_key_pem = load_file("server.key");
cfg.tls_root_ca_pem = load_file("ca.crt");
```

### 6.4 Reading and writing state

State nodes are addressed by dot-separated paths and accessed through the
`cvc::state` API:

```cpp
auto &root = cvc::state::instance(ctx);

// Write (any type convertible to string)
root("scene.camera.fov").value(std::string("90"));
root("scene.camera.fov").value(90);

// Read
std::string fov = root("scene.camera.fov").value();
int fov_i = root("scene.camera.fov").value<int>();

// Block until a value arrives
std::string val = root("data.result").wait_for_value<std::string>();

// Metadata
std::string name = root("scene.camera").name(); // "camera"
std::string full = root("scene.camera").fullName(); // "scene.camera"
```

### 6.5 Interest filters

In large clusters a node can limit which paths it mirrors by registering
prefix-based interest filters:

```cpp
auto &shard = session->shard();
shard.add_interest("scene.geometry"); // receive scene.geometry.*
shard.add_interest("scene.metadata");
shard.set_enforce_interest(true); // drop everything else

// Query
bool ok = shard.path_is_of_interest("scene.geometry.mesh"); // true
auto all = shard.interests();
shard.remove_interest("scene.geometry");
```

### 6.6 Snapshots and conflict resolution

**Snapshots** let a new node catch up to the current cluster state:

```cpp
// Automatic on join:
cfg.snapshot_on_join = true;

// Manual:
auto entries = shard.snapshot("scene"); // prefix filter
```

**Conflict resolution** uses deterministic last-writer-wins ordering
(lexicographic `node_id` + sequence number):

```cpp
shard.set_resolve_conflicts(true);

// Inspect recent conflicts:
auto conflicts = shard.recent_conflicts(64);
for (auto &c : conflicts)
std::cout << c.path << ": winner=" << c.winner_node_id << "\n";
```

### 6.7 Session lifecycle

```cpp
auto session = cvc::distributed_state_session::join(ctx, cfg);

session->is_running(); // true while active
session->status(); // replica health snapshot
session->shard(); // access the underlying shard
session->transport(); // access the transport
session->blob_store(); // access the blob store

session->stop(); // graceful shutdown
```
40 changes: 40 additions & 0 deletions inc/cvc/distributed_state_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
#define __CVC_DISTRIBUTED_STATE_SESSION_H__

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cvc/namespace.h>
#include <cvc/state_blob_store.h>
#include <cvc/state_cluster_shard.h>
#include <cvc/state_compression_registry.h>
#include <cvc/state_data_hydrator.h>
#include <cvc/state_distributed_admin.h>
#include <cvc/state_transport.h>
#include <cvc/state_transport_inproc.h>
Expand Down Expand Up @@ -84,9 +86,34 @@ struct distributed_state_config {
bool resolve_conflicts = false;
bool enforce_interest = false;

// TLS / auth (gRPC only — ignored for inproc / ipc).
std::string tls_server_cert_pem;
std::string tls_server_key_pem;
std::string tls_root_ca_pem;
bool tls_require_client_auth = false;
bool require_tls = false; // when true, session throws if TLS certs are missing
std::string auth_expected_token;
std::string auth_outbound_token;

// Tuning.
std::uint32_t max_inline_payload_bytes = 65536;
std::string blob_store_path; // empty = memory-only blob store
bool snapshot_on_join = false; // request full snapshot from first seed on join

std::uint32_t pump_interval_ms = 10; // background pump loop interval (0 = no pump thread)
};

// ----------------
// Snapshot of current replica health.
// ----------------
struct replica_status {
bool running = false;
std::size_t peer_count = 0;
std::uint64_t local_sequence = 0;
std::uint64_t pump_cycles = 0;
std::uint64_t pending_hydrations = 0;
};

// ----------------
// cvc::distributed_state_session
// ----------------
Expand Down Expand Up @@ -138,6 +165,16 @@ class distributed_state_session {
state_transport &transport() noexcept { return *_transport; }
state_blob_store &blob_store() noexcept { return *_blob_store; }
state_distributed_admin &admin() noexcept { return *_admin; }
state_data_hydrator &hydrator() noexcept { return *_hydrator; }

// Wait until the blob at `path` has been hydrated, or timeout
// expires. Returns the hydration status.
state_data_hydrator::hydration_status
wait_for_data(const std::string &path,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0));

// Get a snapshot of the replica health.
replica_status status() const;

// Diagnostics.
const std::string &cluster_id() const noexcept { return _config.cluster_id; }
Expand All @@ -157,6 +194,9 @@ class distributed_state_session {
std::unique_ptr<state_transport> _transport;
std::unique_ptr<memory_state_blob_store> _blob_store;
std::unique_ptr<state_distributed_admin> _admin;
std::unique_ptr<state_data_hydrator> _hydrator;

app *_app_ctx = nullptr;

std::thread _pump_thread;
std::atomic<bool> _running{false};
Expand Down
10 changes: 10 additions & 0 deletions inc/cvc/state_brick_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ struct state_brick_manifest {
std::vector<std::size_t> bricks_in_region(std::uint64_t lo_x, std::uint64_t lo_y,
std::uint64_t lo_z, std::uint64_t hi_x,
std::uint64_t hi_y, std::uint64_t hi_z) const;

// A half-space plane Ax+By+Cz+D >= 0.
struct plane {
double a, b, c, d;
};

// Query: return indices of chunks whose AABB is NOT completely
// outside all 6 frustum planes (conservative — false positives
// are possible). Each plane's positive half-space is "inside".
std::vector<std::size_t> bricks_in_frustum(const plane planes[6]) const;
};

// ----------------
Expand Down
1 change: 1 addition & 0 deletions inc/cvc/state_change_journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct state_mutation {
std::string tree_id;
std::string origin_node_id;
std::uint64_t sequence = 0;
std::uint64_t hlc_time = 0; // packed hybrid logical clock (0 = not set)
std::string mutation_id;
std::string path;
state_mutation_op op = state_mutation_op::set_value;
Expand Down
Loading
Loading