From 7dec044038793314a4baddce9d0720f01b67e0ec Mon Sep 17 00:00:00 2001 From: Artem Goncharov Date: Sun, 12 Apr 2026 20:17:47 +0200 Subject: [PATCH] feat: Prepare raft storage promotion Formalize the storage service to become usable for the backend drivers. - add ADR for the storage specification - normalize interface - the storage exposes the command interface - prepare for data encryption: - transfer data as msgpack binary (to be encrypted next) - store data is serialized using msgpack and set/get commands use Serialize/Deserialize for the data for automatic serialization. The data in the store will be encrypted next) --- Cargo.lock | 16 + Cargo.toml | 1 + crates/storage/Cargo.toml | 8 +- crates/storage/build.rs | 21 +- crates/storage/proto/identity.proto | 18 -- crates/storage/proto/identity_types.proto | 23 -- crates/storage/proto/raft.proto | 19 +- crates/storage/proto/storage.proto | 13 + crates/storage/proto/storage_types.proto | 14 + crates/storage/src/app.rs | 302 +++++++++++++++--- crates/storage/src/cluster_admin_service.rs | 37 --- crates/storage/src/error.rs | 171 ++++++++++ crates/storage/src/grpc.rs | 2 +- .../storage/src/grpc/cluster_admin_service.rs | 74 ++++- crates/storage/src/grpc/identity_service.rs | 78 ----- crates/storage/src/grpc/raft_service.rs | 32 +- .../storage_service.rs} | 40 ++- crates/storage/src/lib.rs | 9 +- crates/storage/src/network.rs | 153 +++++---- crates/storage/src/proto_impl.rs | 2 +- ...set_request.rs => impl_command_request.rs} | 19 +- crates/storage/src/proto_impl/impl_entry.rs | 8 +- crates/storage/src/store/log_store.rs | 1 + crates/storage/src/store/state_machine.rs | 36 ++- crates/storage/src/store_command.rs | 111 +++++++ crates/storage/src/store_service.rs | 89 ------ crates/storage/src/types.rs | 81 +---- crates/storage/tests/test_cluster.rs | 215 +++++-------- doc/src/SUMMARY.md | 1 + doc/src/adr/0016-raft-storage.md | 135 ++++++++ 30 files changed, 1106 insertions(+), 623 deletions(-) delete mode 100644 
crates/storage/proto/identity.proto delete mode 100644 crates/storage/proto/identity_types.proto create mode 100644 crates/storage/proto/storage.proto create mode 100644 crates/storage/proto/storage_types.proto delete mode 100644 crates/storage/src/cluster_admin_service.rs create mode 100644 crates/storage/src/error.rs delete mode 100644 crates/storage/src/grpc/identity_service.rs rename crates/storage/src/{raft_service.rs => grpc/storage_service.rs} (55%) rename crates/storage/src/proto_impl/{impl_set_request.rs => impl_command_request.rs} (66%) create mode 100644 crates/storage/src/store_command.rs delete mode 100644 crates/storage/src/store_service.rs create mode 100644 doc/src/adr/0016-raft-storage.md diff --git a/Cargo.lock b/Cargo.lock index 8106d3ed..0f251dd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3675,19 +3675,25 @@ version = "0.1.0" dependencies = [ "async-trait", "byteorder", + "dashmap", "eyre", "fjall", "futures", + "http", "openraft", "openstack-keystone-config", "prost", "rand 0.10.1", "rcgen", + "rmp", + "rmp-serde", "rustls", "serde", + "serde_bytes", "serde_json", "tempfile", "thiserror 2.0.18", + "tokio", "tonic", "tonic-prost", "tonic-prost-build", @@ -4937,6 +4943,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "rmp-serde" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" +dependencies = [ + "rmp", + "serde", +] + [[package]] name = "ron" version = "0.12.1" diff --git a/Cargo.toml b/Cargo.toml index d3893d36..fce42756 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ clap = { version = "4.6" } color-eyre = { version = "0.6" } config = { version = "0.15" } criterion = { version = "0.8" } +dashmap = "6.1" derive_builder = { version = "0.20" } dyn-clone = { version = "1.0" } eyre = { version = "0.6" } diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 6141f6e0..a5de3ede 100644 --- 
a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -16,19 +16,25 @@ workspace = true [dependencies] async-trait.workspace = true byteorder.workspace = true +dashmap.workspace = true eyre.workspace = true fjall = { version = "3.1" } futures.workspace = true +http.workspace = true openraft = { workspace = true, features = ["serde", "type-alias"] } openstack-keystone-config = { version = "0.1", path = "../config/" } prost.workspace = true rand.workspace = true +rmp.workspace = true +rmp-serde = "1.3" serde = { workspace = true, features = ["derive"] } +serde_bytes.workspace = true serde_json.workspace = true -tracing.workspace = true thiserror.workspace = true tonic = { workspace = true, features = ["router", "_tls-any"] } tonic-prost.workspace = true +tracing.workspace = true +tokio.workspace = true [dev-dependencies] rcgen = "0.14.7" diff --git a/crates/storage/build.rs b/crates/storage/build.rs index 6321f550..a61c2e81 100644 --- a/crates/storage/build.rs +++ b/crates/storage/build.rs @@ -1,8 +1,8 @@ fn main() -> Result<(), Box> { let proto_files = [ "proto/raft.proto", - "proto/identity_types.proto", - "proto/identity.proto", + "proto/storage_types.proto", + "proto/storage.proto", ]; tonic_prost_build::configure() @@ -23,11 +23,24 @@ fn main() -> Result<(), Box> { "#[derive(Deserialize, Serialize)]", ) .type_attribute("keystone.raft.Vote", "#[derive(Deserialize, Serialize)]") + .type_attribute("keystone.api.Response", "#[derive(Deserialize, Serialize)]") .type_attribute( - "keystone.api.SetRequest", + "keystone.api.CommandRequest", "#[derive(Deserialize, Serialize)]", ) - .type_attribute("keystone.api.Response", "#[derive(Deserialize, Serialize)]") + // .type_attribute( + // "keystone.api.DeleteRequest", + // "#[derive(Deserialize, Serialize)]", + // ) + // .type_attribute( + // "keystone.api.StoreRequest", + // "#[derive(Deserialize, Serialize)]", + // ) + // .type_attribute( + // "keystone.api.StoreRequest.request", + // "#[derive(serde::Deserialize, 
serde::Serialize)]", + // ) + //.type_attribute("keystone.api.Response", "#[derive(Deserialize, Serialize)]") .compile_protos(&proto_files, &["proto"])?; Ok(()) } diff --git a/crates/storage/proto/identity.proto b/crates/storage/proto/identity.proto deleted file mode 100644 index f0d6d4ad..00000000 --- a/crates/storage/proto/identity.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; - -package keystone.api; - -import "google/protobuf/empty.proto"; -import "google/protobuf/wrappers.proto"; -import "identity_types.proto"; - - -// IdentityService provides the key-value store API operations. -service IdentityService { - // Get retrieves the value associated with a given key. - rpc Get(GetRequest) returns (Response) {} - - // Set stores a key-value pair in the distributed store. - rpc Set(SetRequest) returns (Response) {} -} - diff --git a/crates/storage/proto/identity_types.proto b/crates/storage/proto/identity_types.proto deleted file mode 100644 index d1eeb128..00000000 --- a/crates/storage/proto/identity_types.proto +++ /dev/null @@ -1,23 +0,0 @@ -syntax = "proto3"; - -package keystone.api; - -// SetRequest represents a key-value pair to be stored. -message SetRequest { - // Key to store. - string key = 1; - // Value to associate with the key. - string value = 2; -} - -// GetRequest represents a key lookup request. -message GetRequest { - // Key to look up. - string key = 1; -} - -// GetResponse contains the value associated with the requested key. -message Response { - // Retrieved value. - optional string value = 1; -} diff --git a/crates/storage/proto/raft.proto b/crates/storage/proto/raft.proto index 6c6a2632..75d62492 100644 --- a/crates/storage/proto/raft.proto +++ b/crates/storage/proto/raft.proto @@ -3,7 +3,7 @@ syntax = "proto3"; package keystone.raft; import "google/protobuf/empty.proto"; -import "identity_types.proto"; +import "storage_types.proto"; // Node represents a single node in the Raft cluster. 
@@ -30,11 +30,11 @@ message Entry { uint64 term = 1; uint64 index = 2; - // Optional Application data. - keystone.api.SetRequest app_data = 12; - // Optional Membership config. - Membership membership = 13; + Membership membership = 3; + + // Optional Store request. + optional bytes app_data = 4; } // NodeIds is a set of NodeIds. @@ -190,11 +190,11 @@ message AdminResponse { // The log id of the committed log entry. keystone.raft.LogId log_id = 1; - // If the committed log entry is a normal one. - keystone.api.Response data = 2; - // If the committed log entry is a change-membership entry. - keystone.raft.Membership membership = 3; + keystone.raft.Membership membership = 2; + + // If the committed log entry is a normal one. + keystone.api.Response data = 3; } message MetricsResponse { @@ -224,3 +224,4 @@ service ClusterAdminService { // Metrics retrieves cluster metrics and status information. rpc Metrics(google.protobuf.Empty) returns (MetricsResponse) {} } + diff --git a/crates/storage/proto/storage.proto b/crates/storage/proto/storage.proto new file mode 100644 index 00000000..544147c8 --- /dev/null +++ b/crates/storage/proto/storage.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package keystone.api; + +import "google/protobuf/empty.proto"; +import "google/protobuf/wrappers.proto"; +import "storage_types.proto"; + +// StorageService provides the key-value store API operations. +service StorageService { + // Write the command + rpc Command(keystone.api.CommandRequest) returns (keystone.api.Response) {} +} diff --git a/crates/storage/proto/storage_types.proto b/crates/storage/proto/storage_types.proto new file mode 100644 index 00000000..7cd63172 --- /dev/null +++ b/crates/storage/proto/storage_types.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package keystone.api; + +message CommandRequest { + // Store request. + bytes payload = 1; +} + +// Response contains the value associated with the requested key. +message Response { + // Retrieved value. 
+ optional bytes value = 1; +} diff --git a/crates/storage/src/app.rs b/crates/storage/src/app.rs index 9c776eb5..1c255924 100644 --- a/crates/storage/src/app.rs +++ b/crates/storage/src/app.rs @@ -13,24 +13,36 @@ // SPDX-License-Identifier: Apache-2.0 use std::sync::Arc; +use dashmap::DashMap; use openraft::Config; +// use openraft::ReadPolicy; +use openraft::async_runtime::WatchReceiver; +use openraft::errors::{ForwardToLeader, RaftError}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use tokio::sync::watch; use tonic::service::Routes; +use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; use openstack_keystone_config::DistributedStorageConfiguration; -use crate::cluster_admin_service::ClusterAdminService; -use crate::network::Network; -use crate::pb::api::identity_service_server::IdentityServiceServer; +use crate::StoreError; +use crate::grpc::cluster_admin_service::ClusterAdminServiceImpl; +use crate::grpc::raft_service::RaftServiceImpl; +use crate::grpc::storage_service::StorageServiceImpl; +use crate::network::NetworkManager; +use crate::network::init_tls_watcher; use crate::pb::raft::cluster_admin_service_server::ClusterAdminServiceServer; use crate::pb::raft::raft_service_server::RaftServiceServer; -use crate::raft_service::RaftService; -use crate::store_service::StoreService; -use crate::{FjallStateMachine, types::*}; +use crate::protobuf::api::storage_service_client::StorageServiceClient; +use crate::protobuf::api::storage_service_server::StorageServiceServer; +use crate::store_command::*; +use crate::types::*; -/// Build a Raft instance. -pub async fn init_raft( +/// Initialize storage services backed by the raft. +pub async fn init_storage( ks_config: &DistributedStorageConfiguration, -) -> Result<(Raft, Arc), StoreError> { +) -> Result { // Create a configuration for the raft instance. 
let raft_config = Arc::new( Config { @@ -45,63 +57,251 @@ pub async fn init_raft( // Create stores and network let (log_store, sm) = crate::new::(ks_config.path.clone()).await?; let state_machine_store = Arc::new(sm); - let network = Network::new(ks_config)?; - - // Create Raft instance - Ok(( - Raft::new( - ks_config.node_id, - raft_config.clone(), - network, - log_store, - state_machine_store.clone(), - ) - .await?, - state_machine_store, - )) -} + let tls_watcher = init_tls_watcher(ks_config)?; + let network = Arc::new(NetworkManager::new(tls_watcher.clone())?); -/// Initialize storage services backed by the raft. -pub async fn init_storage( - ks_config: &DistributedStorageConfiguration, -) -> Result { // Create Raft instance - let (raft, state_machine_store) = init_raft(ks_config).await?; - - //// Create the management service with raft instance - let internal_service_impl = RaftService::new(raft.clone()); - let cluster_admin_impl = ClusterAdminService::new(raft.clone()); - let store_impl = StoreService::new(raft.clone(), state_machine_store.clone()); + let raft = Raft::new( + ks_config.node_id, + raft_config.clone(), + network.clone(), + log_store, + state_machine_store.clone(), + ) + .await?; Ok(Storage { - admin: Arc::new(cluster_admin_impl), - store: Arc::new(store_impl), - raft: Arc::new(internal_service_impl), + connection_pool: DashMap::new(), + raft, + state_machine_store, + tls_watcher, }) } -/// Distributed storage. -pub struct Storage { - /// Admin service (cluster management). - pub admin: Arc, - /// Identity service (read/write operations). - pub store: Arc, - /// Raft service (voting and sync). - pub raft: Arc, -} - /// Build a tonic `Server` instance for the raft instance. pub async fn get_app_server(storage: &Storage) -> Result { //// The app service uses the default limit since it's user-facing. 
- let raft_service = RaftServiceServer::new(storage.raft.clone()); - let identity_service = IdentityServiceServer::new(storage.store.clone()); - let cluster_admin_service = ClusterAdminServiceServer::new(storage.admin.clone()); + + let raft_svc_impl = RaftServiceImpl::new(storage.raft.clone()); + let cluster_admin_svc_impl = ClusterAdminServiceImpl::new(storage.raft.clone()); + let storage_svc_impl = StorageServiceImpl::new(storage.raft.clone()); + + let raft_service = RaftServiceServer::new(raft_svc_impl); + let cluster_admin_service = ClusterAdminServiceServer::new(cluster_admin_svc_impl); + let storage_service = StorageServiceServer::new(storage_svc_impl); let mut router = Routes::builder(); router .add_service(raft_service) .add_service(cluster_admin_service) - .add_service(identity_service); + .add_service(storage_service); Ok(router.routes()) } + +/// Distributed storage. +pub struct Storage { + /// Raft cluster nodes connection pool. + connection_pool: DashMap, + /// Tls client config watcher. + tls_watcher: watch::Receiver>, + /// Raft instance. + pub raft: Raft, + /// The state machine store for direct reads. + state_machine_store: Arc, +} + +impl Storage { + /// Deletes a value for a given key in the distributed store. + /// + /// # Arguments + /// * `key` - The key. + /// * `keyspace` - Optional keyspace name. 
+ /// + /// # Returns + /// * `Ok(())` - Success after the value is deleted + /// * `Err(StoreError)` - Error if the delete operation fails + pub async fn remove(&self, key: K, keyspace: Option) -> Result<(), StoreError> + where + K: Into, + S: Into, + { + let request = StoreCommand::Delete(DeleteCommand { + key: key.into(), + keyspace: keyspace.map(Into::into).unwrap_or("data".into()), + }); + let payload = crate::pb::api::CommandRequest::try_from(request)?; + match self.raft.client_write(payload.clone()).await { + Ok(_) => {} + Err(RaftError::APIError(ClientWriteError::ForwardToLeader(ForwardToLeader { + leader_id: Some(leader_id), + leader_node: Some(leader_node), + }))) => { + let channel = self.get_or_create_channel(leader_id, leader_node.rpc_addr)?; + + let mut client = StorageServiceClient::new(channel); + client.command(payload).await?; + } + Err(other) => { + return Err(other)?; + } + }; + Ok(()) + } + + /// Gets a value for a given key from the distributed store. + /// + /// # Arguments + /// * `key` - Contains the key to retrieve. + /// * `keyspace` - Optional keyspace name. + /// + /// # Returns + /// * `Ok(value)` - The deserialized value, or `None` when the key is not present + /// * `Err(StoreError)` - Error if the get operation fails + pub async fn get_by_key( + &self, + key: K, + keyspace: Option, + ) -> Result, StoreError> + where + T: DeserializeOwned, + K: AsRef<[u8]>, + S: AsRef, + { + // wait for the node to apply the latest state + // self.raft.ensure_linearizable(ReadPolicy::ReadIndex).await?; + + let ks = match keyspace { + None => self.state_machine_store.data(), + Some(name) => &self.state_machine_store.keyspace(name)?, + }; + let value = ks + .get(&key)? + .map(|x| rmp_serde::from_slice(x.as_ref())) + .transpose()?; + // TODO: at REST decryption would come here + Ok(value) + } + + /// List key value pairs by the prefix. + /// + /// # Arguments + /// * `prefix` - The prefix to query. + /// * `keyspace` - Optional keyspace name. 
+ /// + /// # Returns + /// * `Ok(Vec<(String, Vec)>` - Success response containing the value as bytes + /// * `Err(Status)` - Error status if the operation fails + pub async fn prefix( + &self, + prefix: K, + keyspace: Option, + ) -> Result, StoreError> + where + T: DeserializeOwned, + K: AsRef<[u8]>, + S: AsRef, + { + // wait for the node to apply the latest state + // self.raft.ensure_linearizable(ReadPolicy::ReadIndex).await?; + + let ks = match keyspace { + None => self.state_machine_store.data(), + Some(name) => &self.state_machine_store.keyspace(name)?, + }; + // TODO: at REST decryption would come here + ks.prefix(&prefix) + .map(|item| { + let (key, val) = item.into_inner()?; + Ok(( + String::from_utf8(key.to_vec())?, + rmp_serde::from_slice(val.as_ref())?, + )) + }) + .collect() + } + + /// Sets a value for a given key in the distributed store. + /// + /// # Arguments + /// * `key` - The key. + /// * `value` - The value to set for the key. + /// * `keyspace` - Optional keyspace name. 
+ /// + /// # Returns + /// * `Ok(())` - Success after the value is set + /// * `Err(StoreError)` - Error status if the set operation fails + pub async fn set_value( + &self, + key: K, + value: V, + keyspace: Option, + ) -> Result<(), StoreError> + where + K: Into, + V: Serialize, + S: Into, + { + let key: String = key.into(); + let request = StoreCommand::Set(SetCommand { + key: key.into(), + value: rmp_serde::to_vec(&value)?, + keyspace: keyspace.map(Into::into).unwrap_or("data".into()), + }); + + let payload = crate::pb::api::CommandRequest::try_from(request)?; + match self.raft.client_write(payload.clone()).await { + Ok(_) => {} + Err(RaftError::APIError(ClientWriteError::ForwardToLeader(ForwardToLeader { + leader_id: Some(leader_id), + leader_node: Some(leader_node), + }))) => { + let channel = self.get_or_create_channel(leader_id, leader_node.rpc_addr)?; + + let mut client = StorageServiceClient::new(channel); + client.command(payload).await?; + } + Err(other) => { + return Err(other)?; + } + }; + Ok(()) + } + + /// Get the last log index processed by the node. + pub fn last_log_index(&self) -> Option { + self.raft.metrics().borrow_watched().last_log_index + } + + /// Get the channel to the given node. + /// + /// Get the channel to the node if it is already established or create a new one. This method + /// uses the connection pool. + /// + /// # Arguments + /// * `target` - Node Id. + /// * `addr` - String address of the node. + /// + /// # Returns + /// * `Ok(Channel)` - A channel result. + /// * `Err(StoreError)` - An error if the operation fails. + fn get_or_create_channel(&self, target: u64, addr: String) -> Result { + // 1. Return existing connection if valid + if let Some(channel) = self.connection_pool.get(&target) { + return Ok(channel.clone()); + } + + // 2. 
Otherwise, build it (applying Optional TLS) + let endpoint = if let Some(tls) = &*self.tls_watcher.borrow() { + Endpoint::from_shared(format!("https://{}", addr))?.tls_config(tls.clone())? + } else { + Endpoint::from_shared(format!("http://{}", addr))? + }; + + let channel = endpoint.connect_lazy(); + + // 3. Cache it + self.connection_pool.insert(target, channel.clone()); + Ok(channel) + } +} diff --git a/crates/storage/src/cluster_admin_service.rs b/crates/storage/src/cluster_admin_service.rs deleted file mode 100644 index a2e12d22..00000000 --- a/crates/storage/src/cluster_admin_service.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 -use crate::types::*; - -/// Raft cluster administrative operations. -/// -/// # Responsibilities -/// - Manages the Raft cluster -/// -/// # Protocol Safety -/// This service implements the client-facing API and should validate all inputs -/// before processing them through the Raft consensus protocol. -pub struct ClusterAdminService { - /// The Raft node instance for consensus operations. - pub(crate) raft_node: Raft, -} - -impl ClusterAdminService { - /// Creates a new instance of the API service. - /// - /// # Arguments - /// * `raft_node` - The Raft node instance this service will use. 
- pub fn new(raft_node: Raft) -> Self { - Self { raft_node } - } -} diff --git a/crates/storage/src/error.rs b/crates/storage/src/error.rs new file mode 100644 index 00000000..e83a3636 --- /dev/null +++ b/crates/storage/src/error.rs @@ -0,0 +1,171 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +//use std::io; + +use thiserror::Error; + +use crate::types::*; + +/// Keystone Store error. +#[derive(Error, Debug)] +pub enum StoreError { + /// Database error. + #[error(transparent)] + Fjall { + #[from] + source: fjall::Error, + }, + + #[error(transparent)] + IO { + #[from] + source: std::io::Error, + }, + + #[error(transparent)] + Json { + #[from] + source: serde_json::Error, + }, + + /// Key is already present in the store while the call expects it to be unset. + #[error("key is already set")] + KeyPresent, + + #[error("missing mTLS configuration")] + TlsConfigMissing, + + /// Raft config error. + #[error(transparent)] + RaftConfig { + #[from] + source: openraft::ConfigError, + }, + + /// Raft empty membership data error. + #[error("raft membership information missing")] + RaftEmptyMembership, + + /// Raft initialization error. + #[error(transparent)] + RaftInitError { + #[from] + source: + openraft::errors::RaftError>, + }, + + /// Raft error. + #[error(transparent)] + RaftError { + #[from] + source: openraft::errors::RaftError, + }, + + /// Raft fatal error. 
+ #[error(transparent)] + RaftFatal { + #[from] + source: openraft::errors::Fatal, + }, + + /// Raft leader is unknown. + #[error("raft leader is not known")] + RaftLeaderUnknown, + + /// Raft membership error. + #[error(transparent)] + RaftMembership { + #[from] + source: openraft::errors::MembershipError, + }, + + /// Raft empty membership data error. + #[error("raft required parameter {0} missing")] + RaftMissingParameter(String), + + /// Raft linear read error. + #[error(transparent)] + RaftLinearReadError { + #[from] + source: openraft::errors::RaftError< + TypeConfig, + openraft::errors::LinearizableReadError, + >, + }, + + /// Raft RPC error. + #[error(transparent)] + RaftRPCError { + #[from] + source: openraft::errors::RPCError, + }, + + /// Rmp decode error. + #[error(transparent)] + RmpDecode { + #[from] + source: rmp_serde::decode::Error, + }, + + /// Rmp encode error. + #[error(transparent)] + RmpEncode { + #[from] + source: rmp_serde::encode::Error, + }, + + #[error(transparent)] + Storage { + #[from] + source: openraft::StorageError, + }, + + /// Tonic status error. + #[error(transparent)] + TonicStatus { + #[from] + source: tonic::Status, + }, + + /// Tonic transport error. + #[error(transparent)] + TonicTransport { + #[from] + source: tonic::transport::Error, + }, + + /// URI error. + #[error(transparent)] + Uri { + #[from] + source: http::uri::InvalidUri, + }, + + /// Non UTF8 data. + #[error(transparent)] + Utf8 { + /// The source of the error. 
+ #[from] + source: std::string::FromUtf8Error, + }, + + #[error(transparent)] + Other(#[from] eyre::Report), +} + +impl From for std::io::Error { + fn from(value: StoreError) -> Self { + std::io::Error::other(value.to_string()) + } +} diff --git a/crates/storage/src/grpc.rs b/crates/storage/src/grpc.rs index 94923fb9..51ea9ea0 100644 --- a/crates/storage/src/grpc.rs +++ b/crates/storage/src/grpc.rs @@ -12,5 +12,5 @@ // // SPDX-License-Identifier: Apache-2.0 pub mod cluster_admin_service; -pub mod identity_service; pub mod raft_service; +pub mod storage_service; diff --git a/crates/storage/src/grpc/cluster_admin_service.rs b/crates/storage/src/grpc/cluster_admin_service.rs index d1c6dcc5..6918e226 100644 --- a/crates/storage/src/grpc/cluster_admin_service.rs +++ b/crates/storage/src/grpc/cluster_admin_service.rs @@ -12,7 +12,6 @@ // // SPDX-License-Identifier: Apache-2.0 use std::collections::BTreeMap; -use std::sync::Arc; use openraft::async_runtime::WatchReceiver; use tonic::Request; @@ -20,37 +19,82 @@ use tonic::Response; use tonic::Status; use tracing::trace; -use crate::cluster_admin_service::ClusterAdminService as ClusterAdminServiceImpl; +//use crate::cluster_admin_service::ClusterAdminService as ClusterAdminServiceImpl; +use crate::StoreError; use crate::pb; use crate::protobuf::raft::cluster_admin_service_server::ClusterAdminService; use crate::types::*; +/// Raft cluster administrative operations. +/// +/// # Responsibilities +/// - Manages the Raft cluster +/// +/// # Protocol Safety +/// This service implements the client-facing API and should validate all inputs +/// before processing them through the Raft consensus protocol. +pub struct ClusterAdminServiceImpl { + /// The Raft node instance for consensus operations. + pub(crate) raft_node: Raft, +} + +impl ClusterAdminServiceImpl { + /// Creates a new instance of the API service. + /// + /// # Arguments + /// * `raft_node` - The Raft node instance this service will use. 
+ pub fn new(raft_node: Raft) -> Self { + Self { raft_node } + } + + /// Initializes a new Raft cluster with the specified nodes. + /// + /// # Arguments + /// * `nodes` - Contains the initial set of nodes for the cluster + /// + /// # Returns + /// * Success response + /// * Error if initialization fails + #[tracing::instrument(level = "trace", skip(self))] + pub async fn init_cluster(&self, nodes: Vec) -> Result<(), StoreError> { + // Convert nodes into required format + let nodes_map: BTreeMap = + nodes.into_iter().map(|node| (node.node_id, node)).collect(); + + // Initialize the cluster + Ok(self.raft_node.initialize(nodes_map).await?) + } + + /// Retrieves metrics about the Raft node + pub fn get_metrics(&self) -> Result { + Ok(self.raft_node.metrics().borrow_watched().clone()) + } + + /// Retrieves last log index appended to the node's log. + pub fn get_last_log_index(&self) -> Result, StoreError> { + let metrics = self.get_metrics()?; + Ok(metrics.last_log_index) + } +} + #[tonic::async_trait] -impl ClusterAdminService for Arc { +impl ClusterAdminService for ClusterAdminServiceImpl { /// Initializes a new Raft cluster with the specified nodes. 
/// /// # Arguments /// * `request` - Contains the initial set of nodes for the cluster /// /// # Returns - /// * Success response with initialization details + /// * Success response /// * Error if initialization fails #[tracing::instrument(level = "trace", skip(self))] async fn init(&self, request: Request) -> Result, Status> { trace!("Initializing Raft cluster"); let req = request.into_inner(); - // Convert nodes into required format - let nodes_map: BTreeMap = req - .nodes - .into_iter() - .map(|node| (node.node_id, node)) - .collect(); - // Initialize the cluster let result = self - .raft_node - .initialize(nodes_map) + .init_cluster(req.nodes) .await .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?; @@ -131,7 +175,9 @@ impl ClusterAdminService for Arc { _request: Request<()>, ) -> Result, Status> { trace!("Collecting metrics"); - let metrics = self.raft_node.metrics().borrow_watched().clone(); + let metrics = self + .get_metrics() + .map_err(|e| Status::internal(format!("Failed to write to store: {}", e)))?; let resp = pb::raft::MetricsResponse { membership: Some(metrics.membership_config.membership().clone().into()), other_metrics: metrics.to_string(), diff --git a/crates/storage/src/grpc/identity_service.rs b/crates/storage/src/grpc/identity_service.rs deleted file mode 100644 index 08e85b54..00000000 --- a/crates/storage/src/grpc/identity_service.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 -use std::sync::Arc; - -use tonic::{Request, Response, Status}; -use tracing::debug; - -use crate::pb; -use crate::protobuf::api::Response as PbResponse; -use crate::protobuf::api::identity_service_server::IdentityService; -use crate::store_service::StoreService; - -#[tonic::async_trait] -impl IdentityService for Arc { - /// Sets a value for a given key in the distributed store. - /// - /// # Arguments - /// * `request` - Contains the key and value to set - /// - /// # Returns - /// * `Ok(Response)` - Success response after the value is set - /// * `Err(Status)` - Error status if the set operation fails - #[tracing::instrument(level = "trace", skip(self))] - async fn set( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - debug!("Processing set request for key: {}", req.key.clone()); - - let res = self - .set_value(req.key.clone(), req.value) - .await - .map_err(|e| Status::internal(format!("Failed to write to store: {}", e)))?; - - debug!("Successfully set value for key: {}", req.key); - Ok(Response::new(res.data)) - } - - /// Gets a value for a given key from the distributed store. - /// - /// # Arguments - /// * `request` - Contains the key to retrieve - /// - /// # Returns - /// * `Ok(Response)` - Success response containing the value - /// * `Err(Status)` - Error status if the get operation fails - #[tracing::instrument(level = "trace", skip(self))] - async fn get( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - debug!("Processing get request for key: {}", req.key); - - let value = self - .get_by_key(&req.key) - .await - .map_err(|_| Status::internal(format!("Key not found: {}", req.key)))? 
- .map(String::from_utf8) - .transpose() - .map_err(|e| Status::internal(format!("error while converting the data, {}", e)))?; - - debug!("Successfully retrieved value for key: {}", req.key); - Ok(Response::new(PbResponse { value })) - } -} diff --git a/crates/storage/src/grpc/raft_service.rs b/crates/storage/src/grpc/raft_service.rs index cb441ce8..9a7599d8 100644 --- a/crates/storage/src/grpc/raft_service.rs +++ b/crates/storage/src/grpc/raft_service.rs @@ -12,7 +12,6 @@ // // SPDX-License-Identifier: Apache-2.0 use std::pin::Pin; -use std::sync::Arc; use futures::Stream; use futures::StreamExt; @@ -27,11 +26,38 @@ use crate::protobuf as pb; use crate::protobuf::raft::VoteRequest; use crate::protobuf::raft::VoteResponse; use crate::protobuf::raft::raft_service_server::RaftService; -use crate::raft_service::RaftService as RaftServiceImpl; use crate::types::*; +/// Internal service implementation for Raft protocol communications. +/// This service handles the core Raft consensus protocol operations between +/// cluster nodes. +/// +/// # Responsibilities +/// - Vote requests/responses during leader election +/// - Log replication between nodes +/// - Snapshot installation for state synchronization +/// +/// # Protocol Safety +/// This service implements critical consensus protocol operations and should +/// only be exposed to other trusted Raft cluster nodes, never to external +/// clients. +pub struct RaftServiceImpl { + /// The local Raft node instance that this service operates on + pub(crate) raft_node: Raft, +} + +impl RaftServiceImpl { + /// Creates a new instance of the internal service. + /// + /// # Arguments + /// * `raft_node` - The Raft node instance this service will operate on + pub fn new(raft_node: Raft) -> Self { + Self { raft_node } + } +} + #[tonic::async_trait] -impl RaftService for Arc { +impl RaftService for RaftServiceImpl { /// Handles vote requests during leader election. 
/// /// # Arguments diff --git a/crates/storage/src/raft_service.rs b/crates/storage/src/grpc/storage_service.rs similarity index 55% rename from crates/storage/src/raft_service.rs rename to crates/storage/src/grpc/storage_service.rs index 5f139ca6..b50c3b73 100644 --- a/crates/storage/src/raft_service.rs +++ b/crates/storage/src/grpc/storage_service.rs @@ -11,7 +11,12 @@ // limitations under the License. // // SPDX-License-Identifier: Apache-2.0 -//use crate::protobuf::raft::raft_service_server::RaftService; + +use tonic::{Request, Response, Status}; + +use crate::pb; +use crate::protobuf::api::Response as PbResponse; +use crate::protobuf::api::storage_service_server::StorageService; use crate::types::*; /// Internal service implementation for Raft protocol communications. @@ -27,12 +32,12 @@ use crate::types::*; /// This service implements critical consensus protocol operations and should /// only be exposed to other trusted Raft cluster nodes, never to external /// clients. -pub struct RaftService { +pub struct StorageServiceImpl { /// The local Raft node instance that this service operates on pub(crate) raft_node: Raft, } -impl RaftService { +impl StorageServiceImpl { /// Creates a new instance of the internal service. /// /// # Arguments @@ -41,3 +46,32 @@ impl RaftService { Self { raft_node } } } + +#[tonic::async_trait] +impl StorageService for StorageServiceImpl { + /// Saves a storage modification command. 
+ /// + /// # Arguments + /// * `request` - Contains the key and value to set + /// + /// # Returns + /// * `Ok(Response)` - Success response after the value is set + /// * `Err(Status)` - Error status if the set operation fails + #[tracing::instrument(level = "trace", skip(self))] + async fn command( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let res = self + .raft_node + .client_write(req) + //.set_value(req.key.clone(), req.value, req.keyspace) + .await + .map_err(|e| Status::internal(format!("Failed to write command to store: {}", e)))?; + + //debug!("Successfully set value for key: {}", req.key); + Ok(Response::new(res.data)) + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 7f1d7f6f..8406cb3a 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -32,13 +32,12 @@ pub mod store { pub mod log_store; pub mod state_machine; } -pub mod cluster_admin_service; -pub mod raft_service; -pub mod store_service; +mod error; +pub mod store_command; +pub use error::StoreError; pub use store::log_store::FjallLogStore; pub use store::state_machine::FjallStateMachine; -pub use types::StoreError; pub mod protobuf { pub mod api { @@ -57,7 +56,7 @@ pub use crate::protobuf as pb; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. 
pub TypeConfig: - D = pb::api::SetRequest, + D = pb::api::CommandRequest, R = pb::api::Response, LeaderId = pb::raft::LeaderId, Vote = pb::raft::Vote, diff --git a/crates/storage/src/network.rs b/crates/storage/src/network.rs index 75f7fbd5..67b39a3f 100644 --- a/crates/storage/src/network.rs +++ b/crates/storage/src/network.rs @@ -12,6 +12,7 @@ // // SPDX-License-Identifier: Apache-2.0 use std::future::Future; +use std::sync::Arc; use std::time::Duration; use eyre::WrapErr; @@ -27,99 +28,72 @@ use openraft::network::{ use openraft::raft::{StreamAppendError, StreamAppendResult, TransferLeaderRequest}; use openraft::{AnyError, OptionalSend, RaftNetworkFactory}; use openstack_keystone_config::DistributedStorageConfiguration; +use openstack_keystone_config::TlsConfiguration; +use tokio::sync::watch; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; +use tracing::error; +use crate::StoreError; use crate::protobuf as pb; use crate::protobuf::raft::VoteRequest as PbVoteRequest; use crate::protobuf::raft::VoteResponse as PbVoteResponse; use crate::protobuf::raft::raft_service_client::RaftServiceClient; -use crate::types::NodeId; -use crate::types::TypeConfig; use crate::types::*; /// Network implementation for gRPC-based Raft communication. /// Provides the networking layer for Raft nodes to communicate with each other. -pub struct Network { - tls_ca_cert: Option, - tls_client_identity: Option, +#[derive(Clone)] +pub struct NetworkManager { + /// Tls client config watcher. 
+ tls_config_watcher: watch::Receiver>, } -impl Network { - pub fn new(config: &DistributedStorageConfiguration) -> Result { - if !config.disable_tls { - let tls_config = config - .tls_configuration - .as_ref() - .ok_or(StoreError::TlsConfigMissing)?; - let tls_client_identity = Identity::from_pem( - std::fs::read_to_string(&tls_config.tls_cert_file) - .wrap_err("reading server cert file")?, - std::fs::read_to_string(&tls_config.tls_key_file) - .wrap_err("reading server cert key file")?, - ); - let tls_ca_cert = if let Some(cert_ca) = &tls_config.tls_client_ca_file { - Some(Certificate::from_pem(std::fs::read_to_string(cert_ca)?)) - } else { - None - }; - - Ok(Self { - tls_ca_cert, - tls_client_identity: Some(tls_client_identity), - }) - } else { - Ok(Self { - tls_ca_cert: None, - tls_client_identity: None, - }) - } - } - - fn get_server_tls_config(&self) -> Option { - if let Some(identity) = &self.tls_client_identity { - let mut config = ClientTlsConfig::new().identity(identity.clone()); - if let Some(ca) = &self.tls_ca_cert { - config = config.ca_certificate(ca.clone()); - } - return Some(config); - } - None +impl NetworkManager { + pub fn new( + tls_config_watcher: watch::Receiver>, + ) -> Result { + Ok(Self { tls_config_watcher }) } } /// Implementation of the RaftNetworkFactory trait for creating new network /// connections. This factory creates gRPC client connections to other Raft /// nodes. -impl RaftNetworkFactory for Network { +impl RaftNetworkFactory for Arc { type Network = NetworkConnection; #[tracing::instrument(level = "debug", skip_all)] async fn new_client(&mut self, _: NodeId, node: &Node) -> Self::Network { - NetworkConnection::new(node.clone(), self.get_server_tls_config()) + NetworkConnection::new(node.clone(), self.tls_config_watcher.clone()) } } /// Represents an active network connection to a remote Raft node. /// Handles serialization and deserialization of Raft messages over gRPC. pub struct NetworkConnection { + /// Target node. 
target_node: pb::raft::Node, - tls_config: Option, + /// Watcher of the ClientTlsConfig. + tls_config_watcher: watch::Receiver>, } impl NetworkConnection { /// Creates a new NetworkConnection with the provided gRPC client. - pub fn new(target_node: Node, tls_config: Option) -> Self { + pub fn new( + target_node: Node, + tls_config_watcher: watch::Receiver>, + ) -> Self { NetworkConnection { target_node, - tls_config, + tls_config_watcher, } } /// Creates a gRPC client to the target node. - async fn make_client(&self) -> Result, RPCError> { + pub async fn make_client(&self) -> Result, RPCError> { let server_addr = &self.target_node.rpc_addr; - let ep = if let Some(tls_config) = &self.tls_config { + let ep = if let Some(tls_config) = &*self.tls_config_watcher.borrow() { Channel::builder( format!("https://{}", server_addr) .parse() @@ -183,16 +157,6 @@ impl NetworkConnection { } } -// ============================================================================= -// Sub-trait implementations for NetworkConnection -// ============================================================================= -// -// Instead of implementing RaftNetworkV2 as a monolithic trait, this example -// demonstrates implementing individual sub-traits directly. This approach: -// - Shows exactly which network capabilities are provided -// - Each impl is focused on a single concern -// - gRPC's native bidirectional streaming maps naturally to NetStreamAppend - impl NetStreamAppend for NetworkConnection { fn stream_append<'s, S>( &'s mut self, @@ -319,3 +283,68 @@ impl NetTransferLeader for NetworkConnection { )))) } } + +/// Parse the [TlsConfiguration] into the [ClientTlsConfig]. 
+pub fn load_tls_client_config( + disable_tls: bool, + tls_config: Option<&TlsConfiguration>, +) -> Result, StoreError> { + if !disable_tls { + let tls_config = tls_config.as_ref().ok_or(StoreError::TlsConfigMissing)?; + let identity = Identity::from_pem( + std::fs::read_to_string(&tls_config.tls_cert_file) + .wrap_err("reading server cert file")?, + std::fs::read_to_string(&tls_config.tls_key_file) + .wrap_err("reading server cert key file")?, + ); + let mut tls_client_config = ClientTlsConfig::new().identity(identity); + if let Some(cert_ca) = &tls_config.tls_client_ca_file { + tls_client_config = tls_client_config + .ca_certificate(Certificate::from_pem(std::fs::read_to_string(cert_ca)?)); + }; + Ok(Some(tls_client_config)) + } else { + Ok(None) + } +} + +/// Initialize the [ClientTlsConfig] configuration watcher. +pub fn init_tls_watcher( + ks_config: &DistributedStorageConfiguration, +) -> Result>, StoreError> { + // 1. Initial Load: Try to load the certs once to start with a valid state + let initial_config = + load_tls_client_config(ks_config.disable_tls, ks_config.tls_configuration.as_ref())?; + + // 2. Create the channel + let (tx, rx) = watch::channel(initial_config); + + if !ks_config.disable_tls && ks_config.tls_configuration.is_some() { + // 3. Spawn the File Watcher Task + let config_clone = ks_config.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + + loop { + interval.tick().await; + + // Reload from disk + match load_tls_client_config( + config_clone.disable_tls, + config_clone.tls_configuration.as_ref(), + ) { + Ok(new_config) => { + // If the cert changed, broadcast to all receivers + let _ = tx.send(new_config); + } + Err(e) => { + error!("failed to reload TLS certificates: {:?}", e.to_string()); + } + } + } + }); + } + + // 4. 
Return the Receiver to be cloned into your RaftNetworkFactory + Ok(rx) +} diff --git a/crates/storage/src/proto_impl.rs b/crates/storage/src/proto_impl.rs index a168d7e5..95a7b23e 100644 --- a/crates/storage/src/proto_impl.rs +++ b/crates/storage/src/proto_impl.rs @@ -14,11 +14,11 @@ mod impl_admin_response; mod impl_append_entries_request; mod impl_append_entries_response; +mod impl_command_request; mod impl_entry; mod impl_leader_id; mod impl_log_id; mod impl_membership; -mod impl_set_request; mod impl_snapshot_request; mod impl_vote; mod impl_vote_request; diff --git a/crates/storage/src/proto_impl/impl_set_request.rs b/crates/storage/src/proto_impl/impl_command_request.rs similarity index 66% rename from crates/storage/src/proto_impl/impl_set_request.rs rename to crates/storage/src/proto_impl/impl_command_request.rs index f9849906..252dc76c 100644 --- a/crates/storage/src/proto_impl/impl_set_request.rs +++ b/crates/storage/src/proto_impl/impl_command_request.rs @@ -15,14 +15,21 @@ use std::fmt; use std::fmt::Formatter; +use crate::StoreError; use crate::protobuf as pb; +use crate::store_command::StoreCommand; -impl fmt::Display for pb::api::SetRequest { +impl fmt::Display for pb::api::CommandRequest { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!( - f, - "SetRequest {{ key: {}, value: {} }}", - self.key, self.value - ) + write!(f, "CommandRequest",) + } +} + +impl TryFrom for pb::api::CommandRequest { + type Error = StoreError; + fn try_from(value: StoreCommand) -> Result { + Ok(Self { + payload: value.pack()?, + }) } } diff --git a/crates/storage/src/proto_impl/impl_entry.rs b/crates/storage/src/proto_impl/impl_entry.rs index 524c9850..127759e8 100644 --- a/crates/storage/src/proto_impl/impl_entry.rs +++ b/crates/storage/src/proto_impl/impl_entry.rs @@ -13,13 +13,13 @@ // SPDX-License-Identifier: Apache-2.0 use std::fmt; -//use openraft::Membership; use openraft::EntryPayload; use openraft::alias::LogIdOf; use openraft::entry::RaftEntry; use 
openraft::entry::RaftPayload; use crate::TypeConfig; +use crate::pb::api::CommandRequest; use crate::protobuf as pb; impl fmt::Display for pb::raft::Entry { @@ -42,19 +42,19 @@ impl RaftPayload for pb::raft::Entry { impl RaftEntry for pb::raft::Entry { type CommittedLeaderId = u64; - type D = pb::api::SetRequest; + type D = CommandRequest; type NodeId = u64; type Node = pb::raft::Node; fn new( log_id: LogIdOf, - payload: EntryPayload, + payload: EntryPayload, ) -> Self { let mut app_data = None; let mut membership = None; match payload { EntryPayload::Blank => {} - EntryPayload::Normal(data) => app_data = Some(data), + EntryPayload::Normal(data) => app_data = Some(data.payload), EntryPayload::Membership(m) => membership = Some(m.into()), } diff --git a/crates/storage/src/store/log_store.rs b/crates/storage/src/store/log_store.rs index 844b3811..b8202896 100644 --- a/crates/storage/src/store/log_store.rs +++ b/crates/storage/src/store/log_store.rs @@ -165,6 +165,7 @@ where { for entry in entries { let key = entry.log_id().index.to_be_bytes(); + // TODO: encrypt the payload with AEAD using log_id and term as metadata tracing::debug!("appending {:?}, {:?}", entry.log_id().index, entry); let val = serde_json::to_vec(&entry).map_err(|e| io::Error::other(e.to_string()))?; self.logs diff --git a/crates/storage/src/store/state_machine.rs b/crates/storage/src/store/state_machine.rs index c15de8e3..0bf725a4 100644 --- a/crates/storage/src/store/state_machine.rs +++ b/crates/storage/src/store/state_machine.rs @@ -41,6 +41,7 @@ use serde::Serialize; use crate::StoreError; use crate::TypeConfig; use crate::protobuf as pb; +use crate::store_command::*; const KEY_LAST_APPLIED_LOG: &[u8] = b"last_applied_log"; const KEY_LAST_MEMBERSHIP: &[u8] = b"last_membership"; @@ -83,6 +84,16 @@ impl FjallStateMachine { &self.data } + pub fn db(&self) -> &Arc { + &self.db + } + + pub fn keyspace>(&self, name: S) -> Result { + Ok(self + .db + .keyspace(name.as_ref(), 
KeyspaceCreateOptions::default)?) + } + #[allow(clippy::result_large_err)] #[tracing::instrument(skip(self))] fn get_meta( @@ -336,11 +347,30 @@ impl RaftStateMachine for Arc { let mut last_membership = None; let mut batch = self.db.batch(); + // TODO: https://docs.rs/openraft/latest/openraft/raft/struct.Raft.html#method.client_write while let Some((entry, responder)) = entries.try_next().await? { last_applied_log = Some(entry.log_id()); - let response = if let Some(req) = entry.app_data { - batch.insert(&self.data, req.key.as_bytes(), req.value.as_bytes()); - Some(req.value.clone()) + let response = if let Some(store_req) = entry.app_data { + // Unpack the payload and apply the command + match StoreCommand::unpack(&store_req)? { + StoreCommand::Delete(cmd) => { + let ks = &self + .db + .keyspace(&cmd.keyspace, KeyspaceCreateOptions::default) + .map_err(|e| io::Error::other(e.to_string()))?; + batch.remove(ks, cmd.key.as_bytes()); + None + } + StoreCommand::Set(cmd) => { + let ks = &self + .db + .keyspace(&cmd.keyspace, KeyspaceCreateOptions::default) + .map_err(|e| io::Error::other(e.to_string()))?; + // TODO: at REST encryption would come here + batch.insert(ks, cmd.key.as_bytes(), cmd.value.clone()); + Some(cmd.value) + } + } } else if let Some(mem) = entry.membership { last_membership = Some(StoredMembershipOf::::new( last_applied_log, diff --git a/crates/storage/src/store_command.rs b/crates/storage/src/store_command.rs new file mode 100644 index 00000000..1e2d7e7e --- /dev/null +++ b/crates/storage/src/store_command.rs @@ -0,0 +1,111 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +//! Store modification command + +use serde::{Deserialize, Serialize}; + +use crate::StoreError; + +/// Store modification command. +#[derive(Debug, Deserialize, PartialEq, Serialize)] +pub enum StoreCommand { + /// Delete the entry from the store. + Delete(DeleteCommand), + /// Set the value for the key in the store. + Set(SetCommand), +} + +/// Command to delete the value from the store. +#[derive(Debug, Deserialize, PartialEq, Serialize)] +pub struct DeleteCommand { + /// Key to delete. + pub key: String, + + /// Keyspace of the key. + pub keyspace: String, +} + +/// Command to set the value in the store. +#[derive(Debug, Deserialize, PartialEq, Serialize)] +pub struct SetCommand { + /// Key to set. + pub key: String, + + /// Keyspace of the key. + pub keyspace: String, + + /// Value to set. + #[serde(with = "serde_bytes")] + pub value: Vec, +} + +impl StoreCommand { + /// Pack the [StoreCommand] into the format safe for the Raft log. + /// + /// Serialize the data into the bytes using the MsgPack format. + /// + /// # Returns + /// * `Ok(Vec)` - Bytes vector. + /// * `Err(StoreError)` - Error. + pub fn pack(&self) -> Result, StoreError> { + Ok(rmp_serde::to_vec(self)?) + } + + /// Restore the [StoreCommand] from the log safe data format. + /// + /// Unpack the [StoreCommand] from the bytes array. + /// + /// # Arguments + /// * `value` - The binary data. + /// + /// # Returns + /// * `Ok(StoreCommand)` - Success response. + /// * `Err(StoreError)` - Error if the operation fails. 
+ pub fn unpack(value: &[u8]) -> Result { + Ok(rmp_serde::from_slice(value)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_delete_command() { + let cmd = StoreCommand::Delete(DeleteCommand { + key: "foo".into(), + keyspace: "bar".into(), + }); + let packed = cmd.pack().unwrap(); + let unpacked = StoreCommand::unpack(&packed).unwrap(); + assert_eq!(cmd, unpacked); + } + + #[test] + fn test_set_command() { + let cmd = StoreCommand::Set(SetCommand { + key: "foo".into(), + keyspace: "bar".into(), + value: "value".as_bytes().to_vec(), + }); + let packed = cmd.pack().unwrap(); + let unpacked = StoreCommand::unpack(&packed).unwrap(); + assert_eq!(cmd, unpacked); + if let StoreCommand::Set(cmd) = unpacked { + assert_eq!("value", str::from_utf8(&cmd.value).unwrap()); + } else { + panic!("should be the set command"); + } + } +} diff --git a/crates/storage/src/store_service.rs b/crates/storage/src/store_service.rs deleted file mode 100644 index 82f95a12..00000000 --- a/crates/storage/src/store_service.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 -use std::sync::Arc; - -use crate::types::StateMachineStore; -use crate::types::*; - -/// External API service implementation providing key-value store operations. -/// This service handles client requests for getting and setting values in the -/// distributed store. 
-/// -/// # Responsibilities -/// - Ensure consistency through Raft consensus -/// -/// # Protocol Safety -/// This service implements the client-facing API and should validate all inputs -/// before processing them through the Raft consensus protocol. -pub struct StoreService { - /// The Raft node instance for consensus operations. - pub(crate) raft_node: Raft, - /// The state machine store for direct reads. - pub(crate) state_machine_store: Arc, -} - -impl StoreService { - /// Creates a new instance of the API service. - /// - /// # Arguments - /// * `raft_node` - The Raft node instance this service will use - /// * `state_machine_store` - The state machine store for reading data - pub fn new(raft_node: Raft, state_machine_store: Arc) -> Self { - Self { - raft_node, - state_machine_store, - } - } - - /// Sets a value for a given key in the distributed store. - /// - /// # Arguments - /// * `key` - The key - /// * `value` - The value to set for the key - /// - /// # Returns - /// * `Ok(Response)` - Success response after the value is set - /// * `Err(Status)` - Error status if the set operation fails - pub async fn set_value(&self, key: K, value: V) -> Result - where - K: Into, - V: Into, - { - let res = self - .raft_node - .client_write(crate::pb::api::SetRequest { - key: key.into(), - value: value.into(), - }) - .await?; - Ok(res) - } - - /// Gets a value for a given key from the distributed store. - /// - /// # Arguments - /// * `key` - Contains the key to retrieve - /// - /// # Returns - /// * `Ok(Vec)` - Success response containing the value as bytes - /// * `Err(Status)` - Error status if the get operation fails - pub async fn get_by_key>(&self, key: K) -> Result>, StoreError> { - let value = self - .state_machine_store - .data() - .get(&key)? 
- .map(|x| x.to_vec()); - Ok(value) - } -} diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs index 22f8af94..4fb07ee8 100644 --- a/crates/storage/src/types.rs +++ b/crates/storage/src/types.rs @@ -11,87 +11,10 @@ // limitations under the License. // // SPDX-License-Identifier: Apache-2.0 -use std::io; -use thiserror::Error; - -/// Keystone Store error. -#[derive(Error, Debug)] -pub enum StoreError { - /// Database error. - #[error(transparent)] - Fjall { - #[from] - source: fjall::Error, - }, - - #[error(transparent)] - IO { - #[from] - source: std::io::Error, - }, - - #[error(transparent)] - Json { - #[from] - source: serde_json::Error, - }, - - #[error("missing mTLS configuration")] - TlsConfigMissing, - - /// Raft config error. - #[error(transparent)] - RaftConfig { - #[from] - source: openraft::ConfigError, - }, - - /// Raft empty membership data error. - #[error("raft membership information missing")] - RaftEmptyMembership, - - /// Raft error. - #[error(transparent)] - RaftError { - #[from] - source: openraft::errors::RaftError, - }, - - /// Raft fatal error. - #[error(transparent)] - RaftFatal { - #[from] - source: openraft::errors::Fatal, - }, - - /// Raft membership error. - #[error(transparent)] - RaftMembership { - #[from] - source: openraft::errors::MembershipError, - }, - /// Raft empty membership data error. - #[error("raft required parameter {0} missing")] - RaftMissingParameter(String), - - #[error(transparent)] - Storage { - #[from] - source: openraft::StorageError, - }, - - #[error(transparent)] - Other(#[from] eyre::Report), -} - -impl From for io::Error { - fn from(value: StoreError) -> Self { - io::Error::other(value.to_string()) - } -} // Declare the Raft type with the TypeConfig. // Reference the containing module's type config and re-export it. 
+ pub use crate::TypeConfig; pub type NodeId = u64; @@ -118,7 +41,7 @@ pub type Snapshot = openraft::alias::SnapshotOf; pub type RPCError = openraft::error::RPCError; pub type StreamingError = openraft::errors::StreamingError; pub type ClientWriteError = openraft::errors::ClientWriteError; -//pub type RaftMetrics = openraft::RaftMetrics; +pub type RaftMetrics = openraft::RaftMetrics; pub type VoteRequest = openraft::raft::VoteRequest; pub type VoteResponse = openraft::raft::VoteResponse; diff --git a/crates/storage/tests/test_cluster.rs b/crates/storage/tests/test_cluster.rs index 8fb2e9d1..6b195dbb 100644 --- a/crates/storage/tests/test_cluster.rs +++ b/crates/storage/tests/test_cluster.rs @@ -12,12 +12,11 @@ // // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::uninlined_format_args)] -use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::io::Write; use std::net::IpAddr; -use std::panic::PanicHookInfo; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::thread; use std::time::Duration; @@ -31,103 +30,75 @@ use rcgen::{ }; use tempfile::TempDir; use tonic::transport::{Channel, ClientTlsConfig, Identity, ServerTlsConfig}; -use tracing_subscriber::EnvFilter; use openstack_keystone_config::{DistributedStorageConfiguration, TlsConfiguration}; use openstack_keystone_distributed_storage::TypeConfig; -use openstack_keystone_distributed_storage::app::{get_app_server, init_storage}; +use openstack_keystone_distributed_storage::app::{Storage, get_app_server, init_storage}; +use openstack_keystone_distributed_storage::network::load_tls_client_config; use openstack_keystone_distributed_storage::protobuf as pb; -use openstack_keystone_distributed_storage::protobuf::api::identity_service_client::IdentityServiceClient; use openstack_keystone_distributed_storage::protobuf::raft::cluster_admin_service_client::ClusterAdminServiceClient; -pub fn log_panic(panic: &PanicHookInfo) { - let backtrace = { format!("{:?}", Backtrace::force_capture()) }; - - 
eprintln!("{}", panic); - - if let Some(location) = panic.location() { - tracing::error!( - message = %panic, - backtrace = %backtrace, - panic.file = location.file(), - panic.line = location.line(), - panic.column = location.column(), - ); - eprintln!( - "{}:{}:{}", - location.file(), - location.line(), - location.column() - ); - } else { - tracing::error!(message = %panic, backtrace = %backtrace); - } - - eprintln!("{}", backtrace); -} - /// Set up a cluster of 3 nodes. /// Write to it and read from it. +#[tracing_test::traced_test] #[test] fn test_cluster() { TypeConfig::run(test_cluster_inner()).unwrap(); } -async fn test_cluster_inner() -> Result<()> { - std::panic::set_hook(Box::new(|panic| { - log_panic(panic); - })); - - tracing_subscriber::fmt() - .with_target(true) - .with_thread_ids(true) - .with_level(true) - .with_ansi(false) - .with_env_filter(EnvFilter::from_default_env()) - .init(); +struct InstanceHolder { + pub node_id: u64, + pub config: DistributedStorageConfiguration, + storage_dir: TempDir, + pub storage: Storage, +} + +impl InstanceHolder { + async fn new(node_id: u64, tls_config: Option) -> Result { + let storage_dir = tempfile::TempDir::new().unwrap(); + let config = get_config(node_id, storage_dir.path().to_path_buf(), tls_config); + let storage = init_storage(&config).await?; + Ok(Self { + node_id, + config, + storage_dir, + storage, + }) + } +} +async fn test_cluster_inner() -> Result<()> { let provider = rustls::crypto::aws_lc_rs::default_provider(); rustls::crypto::CryptoProvider::install_default(provider).unwrap(); let certs_dir = TempDir::new()?; let tls_configuration = Some(make_certificates(&certs_dir)?); - let tls_client_config = get_client_tls_config(&tls_configuration)?; + let tls_client_config = load_tls_client_config(false, tls_configuration.as_ref())?; // --- Start 3 raft node in 3 threads. 
+ let instance1 = Arc::new(InstanceHolder::new(1, tls_configuration.clone()).await?); + let instance2 = Arc::new(InstanceHolder::new(2, tls_configuration.clone()).await?); + let instance3 = Arc::new(InstanceHolder::new(3, tls_configuration.clone()).await?); + let instances = vec![instance1.clone(), instance2.clone(), instance3.clone()]; - let certs_h1 = tls_configuration.clone(); - let _h1 = thread::spawn(|| { - let d1 = tempfile::TempDir::new().unwrap(); + let inst1 = instance1.clone(); + let _h1 = thread::spawn(move || { let mut rt = AsyncRuntimeOf::::new(1); - let x = rt.block_on(start_raft_app(&get_config( - 1, - d1.path().to_path_buf(), - certs_h1, - ))); + let x = rt.block_on(start_raft_app(&inst1.config, &inst1.storage)); println!("raft app exit result: {:?}", x); }); - let certs_h2 = tls_configuration.clone(); - let _h2 = thread::spawn(|| { - let d2 = tempfile::TempDir::new().unwrap(); + let inst2 = instance2.clone(); + let _h2 = thread::spawn(move || { let mut rt = AsyncRuntimeOf::::new(1); - let x = rt.block_on(start_raft_app(&get_config( - 2, - d2.path().to_path_buf(), - certs_h2, - ))); + let x = rt.block_on(start_raft_app(&inst2.config, &inst2.storage)); println!("raft app exit result: {:?}", x); }); - let certs_h3 = tls_configuration.clone(); - let _h3 = thread::spawn(|| { - let d3 = tempfile::TempDir::new().unwrap(); + let inst3 = instance3.clone(); + let _h3 = thread::spawn(move || { let mut rt = AsyncRuntimeOf::::new(1); - let x = rt.block_on(start_raft_app(&get_config( - 3, - d3.path().to_path_buf(), - certs_h3, - ))); + let x = rt.block_on(start_raft_app(&inst3.config, &inst3.storage)); println!("raft app exit result: {:?}", x); }); @@ -203,53 +174,62 @@ async fn test_cluster_inner() -> Result<()> { metrics.membership.unwrap().configs ); } - let mut client1 = new_client(get_addr(1), &tls_client_config).await?; println!("=== write `foo=bar`"); { - client1 - .set(pb::api::SetRequest { - key: "foo".to_string(), - value: "bar".to_string(), - }) + // 
Need to try to write to different nodes ensuring the write operation distributes across + // the cluster + instance1 + .storage + .set_value("foo", "bar", None::) .await?; - - // --- Wait for a while to let the replication get done. + instance2 + .storage + .set_value("foo1", "bar1", Some("another_keyspace")) + .await?; + // // --- Wait for a while to let the replication get done. TypeConfig::sleep(Duration::from_millis(1_000)).await; } println!("=== read `foo` on every node"); { - println!("=== read `foo` on node 1"); - { - let got = client1 - .get(pb::api::GetRequest { - key: "foo".to_string(), - }) - .await?; - assert_eq!(Some("bar".to_string()), got.into_inner().value); - } + for instance in &instances { + println!("=== read `foo` on node {}", instance.node_id); + let got: Option = instance.storage.get_by_key("foo", None::).await?; + assert_eq!(Some("bar".to_string()), got); + + let got: Option = instance.storage.get_by_key("foo1", None::).await?; + assert!(got.is_none()); - println!("=== read `foo` on node 2"); - { - let mut client2 = new_client(get_addr(2), &tls_client_config).await?; - let got = client2 - .get(pb::api::GetRequest { - key: "foo".to_string(), - }) + let got: Option = instance + .storage + .get_by_key("foo1", Some("another_keyspace")) .await?; - assert_eq!(Some("bar".to_string()), got.into_inner().value); + assert_eq!(Some("bar1".to_string()), got); } + } - println!("=== read `foo` on node 3"); - { - let mut client3 = new_client(get_addr(3), &tls_client_config).await?; - let got = client3 - .get(pb::api::GetRequest { - key: "foo".to_string(), - }) + println!("=== delete `foo=bar`"); + { + instance3.storage.remove("foo", None::).await?; + + // --- Wait for a while to let the replication get done. 
+ TypeConfig::sleep(Duration::from_millis(1_000)).await; + } + + println!("=== read `foo` on every node"); + { + for instance in &instances { + println!("=== read `foo` on node {}", instance.node_id); + + let got: Option = instance.storage.get_by_key("foo", None::).await?; + assert!(got.is_none()); + + let got: Option = instance + .storage + .get_by_key("foo1", Some("another_keyspace")) .await?; - assert_eq!(Some("bar".to_string()), got.into_inner().value); + assert_eq!(Some("bar1".to_string()), got); } } @@ -280,23 +260,6 @@ async fn test_cluster_inner() -> Result<()> { Ok(()) } -fn get_client_tls_config(config: &Option) -> Result> { - if let Some(tls_config) = &config { - let ca = tonic::transport::Certificate::from_pem(std::fs::read_to_string( - &tls_config.tls_client_ca_file.as_ref().unwrap(), - )?); - let identity = Identity::from_pem( - std::fs::read_to_string(&tls_config.tls_cert_file)?, - std::fs::read_to_string(&tls_config.tls_key_file)?, - ); - return Ok(Some( - ClientTlsConfig::new().identity(identity).ca_certificate(ca), - )); - } else { - Ok(None) - } -} - async fn new_admin_client( addr: String, client_tls_config: &Option, @@ -310,19 +273,6 @@ async fn new_admin_client( Ok(client) } -async fn new_client( - addr: String, - client_tls_config: &Option, -) -> Result> { - let channel = if let Some(tls_config) = client_tls_config { - Channel::builder(format!("https://{}", addr).parse()?).tls_config(tls_config.clone())? - } else { - Channel::builder(format!("http://{}", addr).parse()?) 
- }; - let client = IdentityServiceClient::new(channel.connect().await?); - Ok(client) -} - fn new_node(node_id: u64) -> pb::raft::Node { pb::raft::Node { node_id, @@ -343,9 +293,11 @@ fn get_addr(node_id: u64) -> String { pub async fn start_raft_app( config: &DistributedStorageConfiguration, + storage: &Storage, ) -> Result<(), Box> { let http_addr = config.cluster_addr.clone(); let node_id = config.node_id; + //let storage = init_storage(config).await?; let mut server = tonic::transport::Server::builder(); if !config.disable_tls @@ -367,7 +319,6 @@ pub async fn start_raft_app( server = server.tls_config(tls_config)?; } - let storage = init_storage(config).await?; let server_future = server .add_routes(get_app_server(&storage).await?) .serve(http_addr.parse()?); diff --git a/doc/src/SUMMARY.md b/doc/src/SUMMARY.md index 8db74b49..01ea2dec 100644 --- a/doc/src/SUMMARY.md +++ b/doc/src/SUMMARY.md @@ -25,6 +25,7 @@ - [Federation OIDC: Expiring Group Membership](adr/0013-federation-oidc-expiring-group-membership.md) - [Application Credentials](adr/0014-application-credentials.md) - [Kubernetes Auth](adr/0015-kubernetes-auth.md) + - [Distributed Storage](adr/0016-raft-storage.md) - [Policy enforcement](policy.md) - [Fernet token]() - [Token payloads]() diff --git a/doc/src/adr/0016-raft-storage.md b/doc/src/adr/0016-raft-storage.md new file mode 100644 index 00000000..ff90c30b --- /dev/null +++ b/doc/src/adr/0016-raft-storage.md @@ -0,0 +1,135 @@ +# 16. Distributed Encrypted Storage via Raft and Fjall + +Date: 2026-04-12 + +## Status + +Proposed + +## Context + +The current implementation of keystone requires a storage back-end that provides +high availability, strong consistency for identity assignments, and +industry-leading security for PII and secrets. Traditional SQL databases often +introduce complexity in secret management and lack native "At-Rest" encryption +tied to the application's lifecycle. 
+ +We need a solution that provides: + +- Guaranteed Consistency: Identity changes must be linearizable. + +- Embedded Performance: An embedded LSM-tree avoids external database network + overhead. + +- Cryptographic Sovereignty: Data must be encrypted before it hits the log or + the disk, ensuring a "Zero-Knowledge" storage layer. + +## Decision + +We will implement a distributed storage engine using OpenRaft for consensus and +Fjall as the local State Machine and Log Store. The architecture will follow the +"Vault-style" encryption model. + +1. The Storage Stack + +- Consensus: openraft (Rust) for managing cluster membership and log + replication. + +- LSM-Tree: fjall for high-performance, disk-backed storage of the state + machine. + +- Serialization: rmp-serde (MessagePack) for compact binary representation of + log entries. + +2. The Cryptographic Barrier + +To ensure data is never stored in plain-text on disk: + +- AEAD Encryption: Use AES-256-GCM for all payloads. + +- Log Binding: The Raft Index will be used as Associated Data (AD) for log + entries to prevent replay attacks. + +- Storage Binding: The Primary Key (e.g., UserID) will be used as AD for Fjall + entries to prevent key-substitution attacks. + +- Key Hierarchy: A Master Key (KEK) provided via Environment/HSM will wrap a + volatile Data Encryption Key (DEK) kept in memory. + +3. Data Flow + +- Write Path: + + API receives a request → Serialize to MsgPack → Encrypt → Propose to OpenRaft. + + Apply step: Decrypt using Raft Index → Re-encrypt for storage → Write to + Fjall. + +- Read Path: + + Linearizable Read: Follower queries Leader for ReadIndex → Follower waits for + local apply → Decrypt from Fjall → Return over mTLS. + +## Technical Specifications + +### gRPC Definitions + +The internal Raft communication will use an opaque binary payload to keep the +consensus layer decoupled from the IAM logic.
+ +Protocol Buffers + +```protobuf + +message RaftEntry { + uint64 term = 1; + uint64 index = 2; + + // Optional Membership config. + Membership membership = 3; + + // Optional Store request. + // Layout: [12-byte nonce][ciphertext][16-byte tag] + optional bytes app_data = 4; +} +``` + +### Type Configurations (openraft) + +```rust +openraft::declare_raft_types!( + pub KeystoneConfig: + D = EncryptedBlob, // Vec<u8> wrapper + R = Response, // Ephemeral, plain-text over mTLS + NodeId = u64, + Node = BasicNode, +); +``` + +## Consequences + +### Positive + +- Security: Compromising the disk or the Raft log does not leak user secrets. + +- Performance: Fjall provides SSD-optimized writes and efficient prefix-seeking + for IAM queries. + +- Simplicity: No external dependency on Postgres/MySQL; the binary is + self-contained. Operators can still select the traditional SQL backend + drivers, though. + +### Negative / Risks + +- CPU Overhead: Every write/read involves AES-GCM operations. + +- Operational Complexity: Cluster formation and backup/restore are now part + of operating Keystone. + +- Stale Reads: If not configured correctly, followers might serve stale identity + data unless the ReadIndex protocol is strictly followed. + +## Compliance + +All types that hold secrets must implement the `Zeroize` trait so plain-text +data is wiped from RAM immediately after gRPC transmission.