Skip to content

Commit

Permalink
Added self support to Reducer
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed May 5, 2022
1 parent df8ddf1 commit d769a29
Show file tree
Hide file tree
Showing 16 changed files with 604 additions and 315 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Transaction IDs and Sequence IDs are now returned as new types,
`TransactionId` and `SequenceId` instead of `u64`.
- The `Root` trait now has a new associated type, `Reducer`. The `Reducer`
trait's methods now take `&self`. These changes allow dynamic reducers to be
used -- for example, in BonsaiDb, the View indexing system needs to call a
specific View's `reduce()` function. Without the ability to have `&self`
passed to the `Reducer`, there was no way to provide any state information to
the `Reducer`.

There are several changes related to this, including:

- `Root` no longer requires `Default`. Instead, a new function,
`Root::default_with`, takes the reducer and is used to instantiate newly
created tree states.
- `Root::tree()` requires that `Reducer` implements `Default`, but
`Root::tree_with_reducer()` can be used to create a `TreeRoot` with a
specific `Reducer` instance.
- `Root::deserialize` now takes an additional parameter: the reducer.

### Fixed

Expand Down
15 changes: 6 additions & 9 deletions benchmarks/benches/blobs/nebari.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::{
};

use nebari::{
io::{fs::StdFile, FileManager, ManagedFile, OpenableFile},
io::fs::StdFile,
tree::{State, TreeFile},
ArcBytes, ChunkCache,
ArcBytes, Context,
};
use tempfile::TempDir;

Expand Down Expand Up @@ -37,14 +37,11 @@ impl<B: NebariBenchmark> SimpleBench for InsertBlobs<B> {
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let tempfile = TempDir::new()?;
let manager = <<StdFile as ManagedFile>::Manager as Default>::default();
let file = manager.append(tempfile.path().join("tree"))?;
let state = State::initialized(file.id(), None);
let tree = TreeFile::<B::Root, StdFile>::new(
file,
state,
let tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
&Context::default(),
None,
Some(ChunkCache::new(100, 160_384)),
)?;

Ok(Self {
Expand Down
36 changes: 13 additions & 23 deletions benchmarks/benches/logs/nebari.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use nebari::{
io::{fs::StdFile, FileManager, ManagedFile, OpenableFile},
io::fs::StdFile,
tree::{Modification, Operation, State, TreeFile},
ArcBytes, ChunkCache, Context,
};
Expand Down Expand Up @@ -39,14 +39,11 @@ impl<B: NebariBenchmark> SimpleBench for InsertLogs<B> {
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let tempfile = TempDir::new()?;
let manager = <<StdFile as ManagedFile>::Manager as Default>::default();
let file = manager.append(tempfile.path().join("tree"))?;
let state = State::initialized(file.id(), None);
let tree = TreeFile::<B::Root, StdFile>::new(
file,
state,
let tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
&Context::default(),
None,
Some(ChunkCache::new(100, 160_384)),
)?;

Ok(Self {
Expand Down Expand Up @@ -106,14 +103,11 @@ impl<B: NebariBenchmark> SimpleBench for ReadLogs<B> {
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = TempDir::new().unwrap();
let manager = <<StdFile as ManagedFile>::Manager as Default>::default();
let file = manager.append(tempfile.path().join("tree")).unwrap();
let state = State::initialized(file.id(), None);
let mut tree = TreeFile::<B::Root, StdFile>::new(
file,
state,
let mut tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
&Context::default(),
None,
Some(ChunkCache::new(2000, 160_384)),
)
.unwrap();

Expand Down Expand Up @@ -198,17 +192,13 @@ impl<B: NebariBenchmark> SimpleBench for ScanLogs<B> {
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = TempDir::new().unwrap();
let manager = <<StdFile as ManagedFile>::Manager as Default>::default();
let file = manager.append(tempfile.path().join("tree")).unwrap();
let state = State::initialized(file.id(), None);
let mut tree = TreeFile::<B::Root, StdFile>::new(
file,
state,
let mut tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
&Context::default(),
None,
Some(ChunkCache::new(2000, 160_384)),
)
.unwrap();

config.for_each_database_chunk(1_000_000, |chunk| {
tree.modify(Modification {
transaction_id: None,
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benches/nebari-bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub trait BenchConfig: Display {

pub trait NebariBenchmark {
const BACKEND: &'static str;
type Root: Root;
type Root: Root + Default;
}

pub struct VersionedBenchmark;
Expand Down
3 changes: 1 addition & 2 deletions fuzz/fuzz_targets/compare_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ fuzz_target!(|batches: Vec<BTreeSet<u16>>| {
let context = Context::default();
let file = NamedTempFile::new().unwrap();
let mut tree =
TreeFile::<Unversioned, StdFile>::write(&file, State::new(None, None), &context, None)
.unwrap();
TreeFile::<Unversioned, StdFile>::write(&file, State::default(), &context, None).unwrap();

let mut oracle = BTreeMap::new();
let ops = batches.iter().map(|b| b.len()).sum::<usize>();
Expand Down
24 changes: 15 additions & 9 deletions nebari/examples/embedded-indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ fn main() -> Result<(), Error> {
pub struct Zeroes(pub u32);

impl EmbeddedIndex for Zeroes {
type Reduced = Self;
type Reducer = ZeroesReducer;

fn index(_key: &nebari::ArcBytes<'_>, value: Option<&nebari::ArcBytes<'static>>) -> Self {
Self(
value
Expand All @@ -63,24 +66,27 @@ impl EmbeddedIndex for Zeroes {
}
}

impl Reducer<Self> for Zeroes {
fn reduce<'a, Indexes, IndexesIter>(indexes: Indexes) -> Self
/// Reducer used by the `Zeroes` embedded index (its `EmbeddedIndex::Reducer`
/// type). It is a unit struct, so it carries no state of its own.
#[derive(Default, Clone, Debug)]
pub struct ZeroesReducer;

impl Reducer<Zeroes> for ZeroesReducer {
fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> Zeroes
where
Indexes: IntoIterator<Item = &'a Self, IntoIter = IndexesIter> + ExactSizeIterator,
IndexesIter: Iterator<Item = &'a Self> + ExactSizeIterator + Clone,
Indexes: IntoIterator<Item = &'a Zeroes, IntoIter = IndexesIter> + ExactSizeIterator,
IndexesIter: Iterator<Item = &'a Zeroes> + ExactSizeIterator + Clone,
{
Self(indexes.into_iter().map(|i| i.0).sum())
Zeroes(indexes.into_iter().map(|i| i.0).sum())
}

fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(values: ReducedIndexes) -> Self
fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(&self, values: ReducedIndexes) -> Zeroes
where
Self: 'a,
ReducedIndexes:
IntoIterator<Item = &'a Self, IntoIter = ReducedIndexesIter> + ExactSizeIterator,
ReducedIndexesIter: Iterator<Item = &'a Self> + ExactSizeIterator + Clone,
IntoIterator<Item = &'a Zeroes, IntoIter = ReducedIndexesIter> + ExactSizeIterator,
ReducedIndexesIter: Iterator<Item = &'a Zeroes> + ExactSizeIterator + Clone,
{
// TODO change reduce to an iterator too
Self::reduce(values)
self.reduce(values)
}
}

Expand Down
58 changes: 40 additions & 18 deletions nebari/src/roots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use crate::{
io::{fs::StdFileManager, FileManager, ManagedFile},
transaction::{LogEntry, ManagedTransaction, TransactionId, TransactionManager},
tree::{
self, root::AnyTreeRoot, state::AnyTreeState, EmbeddedIndex, KeySequence, Modification,
Operation, Reducer, ScanEvaluation, SequenceId, State, TransactableCompaction, TreeFile,
TreeRoot, VersionedTreeRoot,
self,
root::{AnyReducer, AnyTreeRoot},
state::AnyTreeState,
EmbeddedIndex, KeySequence, Modification, Operation, ScanEvaluation, SequenceId, State,
TransactableCompaction, TreeFile, TreeRoot, VersionedTreeRoot,
},
vault::AnyVault,
ArcBytes, ChunkCache, ErrorKind,
Expand Down Expand Up @@ -107,11 +109,12 @@ impl<File: ManagedFile> Roots<File> {
if !path.exists() {
self.context().file_manager.append(&path)?;
}
let state = self.tree_state(root.name.clone());
let state = self.tree_state(root.clone());
Ok(Tree {
roots: self.clone(),
state,
vault: root.vault,
reducer: root.reducer,
name: root.name,
})
}
Expand Down Expand Up @@ -144,8 +147,8 @@ impl<File: ManagedFile> Roots<File> {
Ok(names)
}

fn tree_state<Root: tree::Root>(&self, name: impl Into<Cow<'static, str>>) -> State<Root> {
self.tree_states(&[Root::tree(name)])
fn tree_state<Root: tree::Root>(&self, root: TreeRoot<Root, File>) -> State<Root> {
self.tree_states(&[root])
.into_iter()
.next()
.unwrap()
Expand Down Expand Up @@ -417,7 +420,7 @@ impl<Root: tree::Root, File: ManagedFile> AnyTransactionTree<File> for Transacti

impl<File: ManagedFile, Index> TransactionTree<VersionedTreeRoot<Index>, File>
where
Index: Clone + Reducer<Index> + EmbeddedIndex + Debug + 'static,
Index: Clone + EmbeddedIndex + Debug + 'static,
{
/// Returns the latest sequence id.
pub fn current_sequence_id(&self) -> SequenceId {
Expand Down Expand Up @@ -572,7 +575,7 @@ impl<Root: tree::Root, File: ManagedFile> TransactionTree<Root, File> {
pub fn reduce<'keys, KeyRangeBounds>(
&mut self,
range: &'keys KeyRangeBounds,
) -> Result<Root::ReducedIndex, Error>
) -> Result<Option<Root::ReducedIndex>, Error>
where
KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
{
Expand Down Expand Up @@ -717,6 +720,7 @@ impl<M: FileManager> Config<M> {
// Every field below is duplicated field-by-field by the manual `Clone` impl.
pub struct Tree<Root: tree::Root, File: ManagedFile> {
/// Clone of the `Roots` handle this tree was created from.
roots: Roots<File>,
/// State for this tree, obtained from `Roots::tree_state` at creation.
state: State<Root>,
/// Type-erased reducer taken from the `TreeRoot` this tree was created from.
/// It is downcast back to `Root::Reducer` (panicking via `unwrap()` if the
/// concrete type does not match) when a transaction begins and when the
/// default state is built.
reducer: Arc<dyn AnyReducer>,
/// Optional vault taken from the `TreeRoot`; when present it is copied onto
/// the root built for a new transaction.
vault: Option<Arc<dyn AnyVault>>,
/// Name of the tree, passed to `Root::tree_with_reducer` when a transaction
/// begins.
name: Cow<'static, str>,
}
Expand All @@ -727,6 +731,7 @@ impl<Root: tree::Root, File: ManagedFile> Clone for Tree<Root, File> {
roots: self.roots.clone(),
state: self.state.clone(),
vault: self.vault.clone(),
reducer: self.reducer.clone(),
name: self.name.clone(),
}
}
Expand Down Expand Up @@ -765,7 +770,14 @@ impl<Root: tree::Root, File: ManagedFile> Tree<Root, File> {
}

fn begin_transaction(&self) -> Result<ExecutingTransaction<File>, Error> {
let mut root = Root::tree(self.name.clone());
let reducer = self
.reducer
.as_ref()
.as_any()
.downcast_ref::<Root::Reducer>()
.unwrap()
.clone();
let mut root = Root::tree_with_reducer(self.name.clone(), reducer);
if let Some(vault) = &self.vault {
root.vault = Some(vault.clone());
}
Expand Down Expand Up @@ -970,18 +982,14 @@ impl<Root: tree::Root, File: ManagedFile> Tree<Root, File> {
pub fn reduce<'keys, KeyRangeBounds>(
&self,
range: &'keys KeyRangeBounds,
) -> Result<Root::ReducedIndex, Error>
) -> Result<Option<Root::ReducedIndex>, Error>
where
KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
{
catch_compaction_and_retry(move || {
let mut tree = match self.open_for_read() {
Ok(tree) => tree,
Err(err) if err.kind.is_file_not_found() => {
return Ok(<Root::ReducedIndex as Reducer<Root::Index>>::reduce(
std::iter::empty(),
))
}
Err(err) if err.kind.is_file_not_found() => return Ok(None),
Err(err) => return Err(err),
};

Expand Down Expand Up @@ -1074,7 +1082,18 @@ impl<Root: tree::Root, File: ManagedFile> AnyTreeRoot<File> for Tree<Root, File>
}

fn default_state(&self) -> Box<dyn AnyTreeState> {
Box::new(State::<Root>::default())
Box::new(State::<Root>::new(
None,
None,
Root::default_with(
self.reducer
.as_ref()
.as_any()
.downcast_ref::<Root::Reducer>()
.unwrap()
.clone(),
),
))
}

fn begin_transaction(
Expand Down Expand Up @@ -1109,7 +1128,7 @@ impl<Root: tree::Root, File: ManagedFile> AnyTreeRoot<File> for Tree<Root, File>

impl<File: ManagedFile, Index> Tree<VersionedTreeRoot<Index>, File>
where
Index: EmbeddedIndex + Reducer<Index> + Clone + Debug + 'static,
Index: EmbeddedIndex + Clone + Debug + 'static,
{
/// Scans the tree for keys that are contained within `range`. If `forwards`
/// is true, scanning starts at the lowest sort-order key and scans forward.
Expand Down Expand Up @@ -1480,7 +1499,10 @@ mod tests {
compact_test::<Unversioned, _>(AnyFileManager::memory());
}

fn compact_test<R: Root, M: FileManager>(file_manager: M) {
fn compact_test<R: Root, M: FileManager>(file_manager: M)
where
R::Reducer: Default,
{
const OPERATION_COUNT: usize = 256;
const WORKER_COUNT: usize = 4;
let tempdir = tempdir().unwrap();
Expand Down
Loading

0 comments on commit d769a29

Please sign in to comment.