Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/n] Bifrost provider configuration improvements pt1 #1322

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ derive_builder = { workspace = true }
derive_more = { workspace = true }
drain = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
humantime = { workspace = true }
once_cell = { workspace = true }
rocksdb = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { version = "1.13.2", features = ["serde"] }
static_assertions = { workspace = true }
strum = { workspace = true }
Expand Down
7 changes: 4 additions & 3 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ impl BifrostInner {
fn provider_for(&self, kind: ProviderKind) -> &dyn LogletProvider {
self.providers[kind]
.get_or_init(|| {
let provider = crate::loglet::create_provider(kind, &self.opts);
let provider = crate::loglet::create_provider(kind, &self.opts)
.expect("provider is able to get created");
if let Err(e) = provider.start() {
error!("Failed to start loglet provider {}: {}", kind, e);
// todo: Handle provider errors by a graceful system shutdown
Expand Down Expand Up @@ -364,7 +365,7 @@ mod tests {
let memory_provider = MemoryLogletProvider::with_init_delay(delay);

let bifrost_opts = Options {
default_provider: ProviderKind::Memory,
default_provider: ProviderKind::InMemory,
..Options::default()
};
let bifrost_svc = bifrost_opts.build(num_partitions);
Expand All @@ -373,7 +374,7 @@ mod tests {
// Inject out preconfigured memory provider
bifrost
.inner()
.inject_provider(ProviderKind::Memory, memory_provider);
.inject_provider(ProviderKind::InMemory, memory_provider);

// start bifrost service in the background
bifrost_svc.start().await.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ pub enum Error {
#[error(transparent)]
LogStoreError(#[from] LogStoreError),
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
}
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod watchdog;
use std::collections::HashMap;

pub use bifrost::Bifrost;
pub use error::Error;
pub use error::{Error, ProviderError};
pub use options::Options;
pub use read_stream::LogReadStream;
use restate_types::logs::LogId;
Expand Down
33 changes: 14 additions & 19 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use enum_map::Enum;
use restate_types::logs::{Lsn, Payload, SequenceNumber};

use crate::metadata::LogletParams;
use crate::{Error, LogRecord, LsnExt, Options};
use crate::{Error, LogRecord, LsnExt, Options, ProviderError};

/// An enum with the list of supported loglet providers.
/// For each variant we must have a corresponding implementation of the
Expand All @@ -34,35 +34,30 @@ use crate::{Error, LogRecord, LsnExt, Options};
strum_macros::EnumIter,
strum_macros::Display,
)]
#[serde(rename_all = "snake_case")]
pub enum ProviderKind {
#[cfg(any(test, feature = "local_loglet"))]
/// A local rocksdb-backed loglet.
Local,
#[cfg(any(test, feature = "memory_loglet"))]
Memory,
}

pub fn provider_default_config(kind: ProviderKind) -> serde_json::Value {
match kind {
#[cfg(any(test, feature = "local_loglet"))]
ProviderKind::Local => crate::loglets::local_loglet::default_config(),
#[cfg(any(test, feature = "memory_loglet"))]
ProviderKind::Memory => crate::loglets::memory_loglet::default_config(),
}
/// An in-memory loglet, primarily for testing.
InMemory,
}

// why? because if all loglet features are disabled, clippy will complain about options being
// unused.
#[allow(unused_variables)]
pub fn create_provider(kind: ProviderKind, options: &Options) -> Arc<dyn LogletProvider> {
pub fn create_provider(
kind: ProviderKind,
options: &Options,
) -> Result<Arc<dyn LogletProvider>, ProviderError> {
match kind {
#[cfg(any(test, feature = "local_loglet"))]
ProviderKind::Local => crate::loglets::local_loglet::LocalLogletProvider::new(
&options.local_loglet_storage_path(),
&options.providers_config[kind],
),
ProviderKind::Local => Ok(crate::loglets::local_loglet::LocalLogletProvider::new(
options.local.clone(),
)?),
#[cfg(any(test, feature = "memory_loglet"))]
ProviderKind::Memory => crate::loglets::memory_loglet::MemoryLogletProvider::new(),
ProviderKind::InMemory => Ok(crate::loglets::memory_loglet::MemoryLogletProvider::new()?),
}
}

Expand Down Expand Up @@ -105,10 +100,10 @@ pub trait LogletProvider: Send + Sync {
async fn get_loglet(&self, params: &LogletParams) -> Result<Arc<dyn Loglet>, Error>;

// Hook for handling lazy initialization
fn start(&self) -> Result<(), Error>;
fn start(&self) -> Result<(), ProviderError>;

// Hook for handling graceful shutdown
async fn shutdown(&self) -> Result<(), Error> {
async fn shutdown(&self) -> Result<(), ProviderError> {
Ok(())
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Context;
use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DB};
use tracing::{debug, warn};

Expand Down Expand Up @@ -42,7 +40,7 @@ pub struct RocksDbLogStore {
}

impl RocksDbLogStore {
pub fn new(storage_path: &Path, options: &Options) -> anyhow::Result<Self> {
pub fn new(options: &Options) -> Result<Self, LogStoreError> {
let cache = if options.rocksdb_cache_size > 0 {
Some(Cache::new_lru_cache(options.rocksdb_cache_size))
} else {
Expand All @@ -68,8 +66,7 @@ impl RocksDbLogStore {
];
let db_options = db_options(options);

let db = DB::open_cf_descriptors(&db_options, storage_path, cfs)
.context("failed to open rocksdb for local loglet store")?;
let db = DB::open_cf_descriptors(&db_options, &options.path, cfs)?;

Ok(Self { db: Arc::new(db) })
}
Expand Down
4 changes: 0 additions & 4 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ use self::log_store::RocksDbLogStore;
use self::log_store_writer::RocksDbLogWriterHandle;
use self::utils::OffsetWatch;

pub fn default_config() -> serde_json::Value {
serde_json::to_value(Options::default()).expect("default config to be serializable")
}

#[derive(Debug)]
pub struct LocalLoglet {
log_id: u64,
Expand Down
17 changes: 14 additions & 3 deletions crates/bifrost/src/loglets/local_loglet/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::{Path, PathBuf};
use std::time::Duration;

use restate_types::DEFAULT_STORAGE_DIRECTORY;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

#[derive(Debug, Serialize, Deserialize)]
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "options_schema", schemars(rename = "LocalLoglet", default))]
#[builder(default)]
pub struct Options {
pub path: PathBuf,
pub rocksdb_threads: usize,
pub rocksdb_disable_statistics: bool,
pub rocksdb_disable_wal: bool,
Expand All @@ -24,7 +32,9 @@ pub struct Options {
/// write batch on every command.
pub writer_commit_batch_size_threshold: usize,
/// Trigger a commit when the time since the last commit exceeds this threshold.
pub writer_commit_time_interval: Duration,
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "options_schema", schemars(with = "String"))]
pub writer_commit_time_interval: humantime::Duration,
/// The maximum number of write commands that can be queued.
pub writer_queue_len: usize,
/// If true, rocksdb flushes follow writing record batches, otherwise, we
Expand All @@ -35,6 +45,7 @@ pub struct Options {
impl Default for Options {
fn default() -> Self {
Self {
path: Path::new(DEFAULT_STORAGE_DIRECTORY).join("local_loglet"),
rocksdb_threads: 10,
// todo: enable when we have a way to expose the statistics through node-ctrl
rocksdb_disable_statistics: true,
Expand All @@ -43,7 +54,7 @@ impl Default for Options {
rocksdb_max_total_wal_size: 2 * (1 << 30), // 2 GiB
rocksdb_write_buffer_size: 0,
writer_commit_batch_size_threshold: 200,
writer_commit_time_interval: Duration::from_millis(13),
writer_commit_time_interval: Duration::from_millis(13).into(),
writer_queue_len: 200,
flush_wal_on_commit: true,
}
Expand Down
Loading
Loading