Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove RocksDb writer thread and disable WAL #1456

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/storage-query-datafusion/src/idempotency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn get_idempotency_key() {
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
let mut engine = tc
.run_in_scope("mock-query-engine", None, MockQueryEngine::create())
.await;

Expand Down Expand Up @@ -96,6 +96,4 @@ async fn get_idempotency_key() {
)
)
);

shutdown.await;
}
4 changes: 1 addition & 3 deletions crates/storage-query-datafusion/src/journal/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn get_entries() {
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
let mut engine = tc
.run_in_scope("mock-query-engine", None, MockQueryEngine::create())
.await;

Expand Down Expand Up @@ -127,6 +127,4 @@ async fn get_entries() {
)
)
);

shutdown.await;
}
19 changes: 5 additions & 14 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions};
use restate_types::identifiers::{DeploymentId, ServiceRevision};
use restate_types::invocation::ServiceType;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;

#[derive(Default, Clone, Debug)]
Expand Down Expand Up @@ -87,35 +86,27 @@ impl MockQueryEngine {
+ Debug
+ Clone
+ 'static,
) -> (Self, impl Future<Output = ()>) {
) -> Self {
// Prepare Rocksdb
task_center().run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let worker_options = WorkerOptions::default();
let (rocksdb, writer) = RocksDBStorage::open(
let rocksdb = RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");
let (signal, watch) = drain::channel();
let writer_join_handle = writer.run(watch);

let query_engine = Self(
Self(
rocksdb.clone(),
QueryContext::from_options(&QueryEngineOptions::default(), rocksdb, status, schemas)
.unwrap(),
);

// Return shutdown future
(query_engine, async {
signal.drain().await;
writer_join_handle.await.unwrap().unwrap();
})
)
}

pub async fn create() -> (Self, impl Future<Output = ()>) {
pub async fn create() -> Self {
Self::create_with(MockStatusHandle::default(), MockSchemas::default()).await
}

Expand Down
4 changes: 1 addition & 3 deletions crates/storage-query-datafusion/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn query_sys_invocation() {
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
let mut engine = tc
.run_in_scope(
"mock-query-engine",
None,
Expand Down Expand Up @@ -117,6 +117,4 @@ async fn query_sys_invocation() {
}
))
);

shutdown.await;
}
8 changes: 1 addition & 7 deletions crates/storage-rocksdb/benches/basic_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ async fn writing_to_rocksdb(worker_options: WorkerOptions) {
//
// setup
//
let (mut rocksdb, writer) = RocksDBStorage::open(
let mut rocksdb = RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");

let (signal, watch) = drain::channel();
let writer_join_handler = writer.run(watch);

//
// write
//
Expand All @@ -45,9 +42,6 @@ async fn writing_to_rocksdb(worker_options: WorkerOptions) {
}
txn.commit().await.unwrap();
}

signal.drain().await;
writer_join_handler.await.unwrap().unwrap();
}

fn basic_writing_reading_benchmark(c: &mut Criterion) {
Expand Down
43 changes: 16 additions & 27 deletions crates/storage-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ pub mod scan;
pub mod service_status_table;
pub mod state_table;
pub mod timer_table;
mod writer;

use crate::keys::TableKey;
use crate::scan::{PhysicalScan, TableScan};
use crate::writer::{Writer, WriterHandle};
use crate::TableKind::{
Deduplication, Idempotency, Inbox, InvocationStatus, Journal, Outbox, PartitionStateMachine,
ServiceStatus, State, Timers,
Expand All @@ -49,17 +47,12 @@ use rocksdb::PrefixRange;
use rocksdb::ReadOptions;
use std::sync::Arc;

pub use writer::JoinHandle as RocksDBWriterJoinHandle;
pub use writer::Writer as RocksDBWriter;

pub type DB = rocksdb::OptimisticTransactionDB<MultiThreaded>;
type TransactionDB<'a> = rocksdb::Transaction<'a, DB>;

pub type DBIterator<'b> = DBRawIteratorWithThreadMode<'b, DB>;
pub type DBIteratorTransaction<'b> = DBRawIteratorWithThreadMode<'b, rocksdb::Transaction<'b, DB>>;

type WriteBatch = rocksdb::WriteBatchWithTransaction<true>;

// matches the default directory name
const DB_NAME: &str = "db";

Expand Down Expand Up @@ -156,7 +149,6 @@ pub enum BuildError {

pub struct RocksDBStorage {
db: Arc<DB>,
writer_handle: WriterHandle,
key_buffer: BytesMut,
value_buffer: BytesMut,
}
Expand All @@ -165,7 +157,6 @@ impl std::fmt::Debug for RocksDBStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RocksDBStorage")
.field("db", &self.db)
.field("writer_handle", &self.writer_handle)
.field("key_buffer", &self.key_buffer)
.field("value_buffer", &self.value_buffer)
.finish()
Expand All @@ -176,7 +167,6 @@ impl Clone for RocksDBStorage {
fn clone(&self) -> Self {
RocksDBStorage {
db: self.db.clone(),
writer_handle: self.writer_handle.clone(),
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
}
Expand Down Expand Up @@ -231,7 +221,7 @@ impl RocksDBStorage {
pub async fn open(
mut storage_opts: impl Updateable<StorageOptions> + Send + 'static,
updateable_opts: impl Updateable<RocksDbOptions> + Send + 'static,
) -> std::result::Result<(Self, Writer), BuildError> {
) -> std::result::Result<Self, BuildError> {
let cfs = vec![
//
// keyed by partition key + user key
Expand Down Expand Up @@ -272,18 +262,11 @@ impl RocksDBStorage {
.await
.map_err(|_| ShutdownError)??;

let writer = Writer::new(rdb.clone(), storage_opts);
let writer_handle = writer.create_writer_handle();

Ok((
Self {
db: rdb,
writer_handle,
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
},
writer,
))
Ok(Self {
db: rdb,
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
})
}

fn table_handle(&self, table_kind: TableKind) -> Arc<BoundColumnFamily> {
Expand Down Expand Up @@ -341,7 +324,6 @@ impl RocksDBStorage {
db,
key_buffer: &mut self.key_buffer,
value_buffer: &mut self.value_buffer,
writer_handle: &self.writer_handle,
}
}
}
Expand Down Expand Up @@ -406,7 +388,6 @@ pub struct RocksDBTransaction<'a> {
db: Arc<DB>,
key_buffer: &'a mut BytesMut,
value_buffer: &'a mut BytesMut,
writer_handle: &'a WriterHandle,
}

impl<'a> RocksDBTransaction<'a> {
Expand Down Expand Up @@ -446,8 +427,16 @@ impl<'a> Transaction for RocksDBTransaction<'a> {
// writes to RocksDB. However, it is safe to write the WriteBatch for a given partition,
// because there can only be a single writer (the leading PartitionProcessor).
let write_batch = self.txn.get_writebatch();

self.writer_handle.write(write_batch).await
// todo: make async and use configuration to control use of WAL
// Nothing was staged in this transaction, so skip the RocksDB write entirely.
if write_batch.is_empty() {
return Ok(());
}
// We disable WAL since bifrost is our durable distributed log.
let mut opts = rocksdb::WriteOptions::default();
opts.disable_wal(true);
// Pass the options configured above. Handing `write_opt` a fresh
// `WriteOptions::default()` would discard the `disable_wal(true)` setting
// and leave the WAL enabled.
self.db
.write_opt(&write_batch, &opts)
.map_err(|error| StorageError::Generic(error.into()))
}
}

Expand Down
4 changes: 1 addition & 3 deletions crates/storage-rocksdb/tests/idempotency_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const IDEMPOTENCY_ID_3: IdempotencyId =

#[tokio::test]
async fn test_idempotency_key() {
let (mut rocksdb, close) = storage_test_environment().await;
let mut rocksdb = storage_test_environment().await;

// Fill in some data
let mut txn = rocksdb.transaction();
Expand Down Expand Up @@ -114,6 +114,4 @@ async fn test_idempotency_key() {
.unwrap(),
None
);

close.await;
}
19 changes: 4 additions & 15 deletions crates/storage-rocksdb/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use restate_types::invocation::{InvocationTarget, ServiceInvocation, Source, Spa
use restate_types::state_mut::ExternalStateMutation;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::pin::pin;
use tokio_stream::StreamExt;

Expand All @@ -34,7 +33,7 @@ mod state_table_test;
mod timer_table_test;
mod virtual_object_status_table_test;

async fn storage_test_environment() -> (RocksDBStorage, impl Future<Output = ()>) {
async fn storage_test_environment() -> RocksDBStorage {
//
// create a rocksdb storage from options
//
Expand All @@ -46,25 +45,17 @@ async fn storage_test_environment() -> (RocksDBStorage, impl Future<Output = ()>
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let worker_options = WorkerOptions::default();
let (rocksdb, writer) = RocksDBStorage::open(
RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");

let (signal, watch) = drain::channel();
let writer_join_handle = writer.run(watch);

(rocksdb, async {
signal.drain().await;
writer_join_handle.await.unwrap().unwrap();
})
.expect("RocksDB storage creation should succeed")
}

#[tokio::test]
async fn test_read_write() {
let (rocksdb, close) = storage_test_environment().await;
let rocksdb = storage_test_environment().await;

//
// run the tests
Expand All @@ -76,8 +67,6 @@ async fn test_read_write() {
invocation_status_table_test::run_tests(rocksdb.clone()).await;
virtual_object_status_table_test::run_tests(rocksdb.clone()).await;
timer_table_test::run_tests(rocksdb).await;

close.await;
}

pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocation {
Expand Down
4 changes: 1 addition & 3 deletions crates/storage-rocksdb/tests/state_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) {

#[tokio::test]
async fn test_delete_all() {
let (mut rocksdb, close) = storage_test_environment().await;
let mut rocksdb = storage_test_environment().await;

let mut txn = rocksdb.transaction();

Expand Down Expand Up @@ -143,6 +143,4 @@ async fn test_delete_all() {
.await
.expect("should not fail")
.is_some());

close.await;
}
Loading
Loading