Skip to content

Commit

Permalink
Adding &self support to embedded indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed May 9, 2022
1 parent 0ceeda2 commit 3fd0c47
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 48 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
function previously was returning `State::next_transaction_id()`, which made
the name a little weird. To preserve existing behavior, use
`log.state().next_transaction_id()`.
- `EmbeddedIndex::index` has been moved to a new trait, `Indexer<T>`.
`EmbeddedIndex::Reducer` has been renamed to `EmbeddedIndex::Indexer`, and now
requires both `Reducer` and `Indexer` to be implemented.

This change also adds `&self` to the `index()` function's signature, allowing
the embedded indexer to have a configuration/state.

### Fixed

Expand Down
24 changes: 16 additions & 8 deletions nebari/examples/embedded-indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::convert::Infallible;
use byteorder::BigEndian;
use nanorand::{Pcg64, Rng};
use nebari::{
tree::{EmbeddedIndex, Reducer, Root, ScanEvaluation, Serializable, VersionedTreeRoot},
tree::{
EmbeddedIndex, Indexer, Reducer, Root, ScanEvaluation, Serializable, VersionedTreeRoot,
},
Error,
};

Expand Down Expand Up @@ -55,21 +57,27 @@ pub struct Zeroes(pub u32);

impl EmbeddedIndex for Zeroes {
type Reduced = Self;
type Reducer = ZeroesReducer;
type Indexer = ZeroesIndexer;
}

#[derive(Default, Clone, Debug)]
pub struct ZeroesIndexer;

fn index(_key: &nebari::ArcBytes<'_>, value: Option<&nebari::ArcBytes<'static>>) -> Self {
Self(
impl Indexer<Zeroes> for ZeroesIndexer {
fn index(
&self,
_key: &nebari::ArcBytes<'_>,
value: Option<&nebari::ArcBytes<'static>>,
) -> Zeroes {
Zeroes(
value
.map(|bytes| bytes.iter().filter(|&b| b as char == '0').count())
.unwrap_or_default() as u32,
)
}
}

#[derive(Default, Clone, Debug)]
pub struct ZeroesReducer;

impl Reducer<Zeroes> for ZeroesReducer {
impl Reducer<Zeroes> for ZeroesIndexer {
fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> Zeroes
where
Indexes: IntoIterator<Item = &'a Zeroes, IntoIter = IndexesIter> + ExactSizeIterator,
Expand Down
6 changes: 6 additions & 0 deletions nebari/src/tree/btree_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ pub trait Reducer<Index, ReducedIndex = Index>: Debug + Clone + Send + Sync {
ReducedIndexesIter: Iterator<Item = &'a ReducedIndex> + ExactSizeIterator + Clone;
}

/// Creates an `Index` from a key and value.
pub trait Indexer<Index>: Debug + Send + Sync {
/// Index the key and value.
fn index(&self, key: &ArcBytes<'_>, value: Option<&ArcBytes<'static>>) -> Index;
}

impl<Index> Reducer<Index, ()> for () {
fn reduce<'a, Indexes, IndexesIter>(&self, _indexes: Indexes) -> Self
where
Expand Down
26 changes: 13 additions & 13 deletions nebari/src/tree/by_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ where
}

#[derive(Clone, Default, Debug)]
pub struct ByIdReducer<EmbeddedReducer>(pub EmbeddedReducer);
pub struct ByIdIndexer<EmbeddedIndexer>(pub EmbeddedIndexer);

impl<EmbeddedReducer, EmbeddedIndex, EmbeddedStats>
impl<EmbeddedIndexer, EmbeddedIndex, EmbeddedStats>
Reducer<VersionedByIdIndex<EmbeddedIndex>, ByIdStats<EmbeddedStats>>
for ByIdReducer<EmbeddedReducer>
for ByIdIndexer<EmbeddedIndexer>
where
EmbeddedReducer: Reducer<EmbeddedIndex, EmbeddedStats>,
EmbeddedIndex: super::EmbeddedIndex<Reducer = EmbeddedReducer, Reduced = EmbeddedStats>,
EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
EmbeddedIndex: super::EmbeddedIndex<Indexer = EmbeddedIndexer, Reduced = EmbeddedStats>,
{
fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ByIdStats<EmbeddedStats>
where
Expand Down Expand Up @@ -192,12 +192,12 @@ where
}
}

impl<EmbeddedReducer, EmbeddedIndex, EmbeddedStats>
impl<EmbeddedIndexer, EmbeddedIndex, EmbeddedStats>
Reducer<UnversionedByIdIndex<EmbeddedIndex>, ByIdStats<EmbeddedStats>>
for ByIdReducer<EmbeddedReducer>
for ByIdIndexer<EmbeddedIndexer>
where
EmbeddedReducer: Reducer<EmbeddedIndex, EmbeddedStats>,
EmbeddedIndex: super::EmbeddedIndex<Reducer = EmbeddedReducer, Reduced = EmbeddedStats>,
EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
EmbeddedIndex: super::EmbeddedIndex<Indexer = EmbeddedIndexer, Reduced = EmbeddedStats>,
{
fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ByIdStats<EmbeddedStats>
where
Expand Down Expand Up @@ -226,18 +226,18 @@ where
}
}

impl<EmbeddedReducer> ByIdReducer<EmbeddedReducer> {
impl<EmbeddedIndexer> ByIdIndexer<EmbeddedIndexer> {
fn reduce<'a, EmbeddedIndex, EmbeddedStats, Id, Indexes, IndexesIter>(
&self,
values: Indexes,
) -> ByIdStats<EmbeddedStats>
where
Id: IdIndex<EmbeddedIndex> + 'a,
EmbeddedIndex:
super::EmbeddedIndex<Reducer = EmbeddedReducer, Reduced = EmbeddedStats> + 'a,
super::EmbeddedIndex<Indexer = EmbeddedIndexer, Reduced = EmbeddedStats> + 'a,
Indexes: IntoIterator<Item = &'a Id, IntoIter = IndexesIter> + ExactSizeIterator,
IndexesIter: Iterator<Item = &'a Id> + ExactSizeIterator + Clone,
EmbeddedReducer: Reducer<EmbeddedIndex, EmbeddedStats>,
EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
{
let values = values.into_iter();
let (alive_keys, deleted_keys, total_indexed_bytes) = values
Expand Down Expand Up @@ -279,7 +279,7 @@ impl<EmbeddedReducer> ByIdReducer<EmbeddedReducer> {
+ ExactSizeIterator,
ReducedIndexesIter:
Iterator<Item = &'a ByIdStats<EmbeddedStats>> + ExactSizeIterator + Clone,
EmbeddedReducer: Reducer<EmbeddedIndex, EmbeddedStats>,
EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
{
let values = values.into_iter();
ByIdStats {
Expand Down
14 changes: 7 additions & 7 deletions nebari/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ mod versioned;
pub(crate) const DEFAULT_MAX_ORDER: usize = 1000;

pub use self::{
btree_entry::{BTreeEntry, BTreeNode, KeyOperation, Reducer},
btree_entry::{BTreeEntry, BTreeNode, Indexer, KeyOperation, Reducer},
by_id::{ByIdStats, UnversionedByIdIndex, VersionedByIdIndex},
by_sequence::{BySequenceIndex, BySequenceStats, SequenceId},
interior::{Interior, Pointer},
Expand Down Expand Up @@ -1803,10 +1803,7 @@ pub trait EmbeddedIndex: Serializable + Clone + Debug + Send + Sync + 'static {
/// The reduced representation of this index.
type Reduced: Serializable + Clone + Debug + Send + Sync + 'static;
/// The reducer that reduces arrays of `Self` or `Self::Reduced` into `Self::Reduced`.
type Reducer: Reducer<Self, Self::Reduced>;

/// Index the key and value.
fn index(key: &ArcBytes<'_>, value: Option<&ArcBytes<'static>>) -> Self;
type Indexer: Indexer<Self> + Reducer<Self, Self::Reduced>;
}

/// A type that can be serialized and deserialized.
Expand All @@ -1821,8 +1818,11 @@ pub trait Serializable: Send + Sync + Sized + 'static {

impl EmbeddedIndex for () {
type Reduced = Self;
type Reducer = Self;
fn index(_key: &ArcBytes<'_>, _value: Option<&ArcBytes<'static>>) -> Self {}
type Indexer = Self;
}

impl Indexer<()> for () {
fn index(&self, _key: &ArcBytes<'_>, _value: Option<&ArcBytes<'static>>) -> Self {}
}

impl Serializable for () {
Expand Down
21 changes: 9 additions & 12 deletions nebari/src/tree/unversioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use crate::{
roots::AbortError,
transaction::TransactionId,
tree::{
btree_entry::{KeyOperation, ModificationContext, NodeInclusion, ScanArgs},
by_id::ByIdReducer,
btree_entry::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs},
by_id::ByIdIndexer,
copy_chunk, dynamic_order,
versioned::ChangeResult,
BTreeNode, EmbeddedIndex, PageHeader, Reducer, Root,
BTreeNode, PageHeader, Root,
},
vault::AnyVault,
ArcBytes, ChunkCache, ErrorKind,
Expand Down Expand Up @@ -78,16 +78,15 @@ where
) -> Result<(), Error> {
modification.reverse()?;

let total_keys = self
.by_id_root
.stats::<ByIdReducer<<Index as EmbeddedIndex>::Reducer>>(self.reducer())
.total_keys()
+ modification.keys.len() as u64;
let total_keys =
self.by_id_root.stats(self.reducer()).total_keys() + modification.keys.len() as u64;
let by_id_order = dynamic_order(total_keys, max_order);
let minimum_children = by_id_order / 2 - 1;
let minimum_children =
minimum_children.min(usize::try_from(total_keys).unwrap_or(usize::MAX));

let reducer = self.reducer.clone();

while !modification.keys.is_empty() {
match self.by_id_root.modify(
&mut modification,
Expand All @@ -107,7 +106,7 @@ where
Ok(KeyOperation::Set(UnversionedByIdIndex {
value_length,
position,
embedded: Index::index(key, Some(value)),
embedded: reducer.0.index(key, Some(value)),
}))
} else {
Ok(KeyOperation::Remove)
Expand Down Expand Up @@ -142,13 +141,11 @@ where
impl<EmbeddedIndex> Root for UnversionedTreeRoot<EmbeddedIndex>
where
EmbeddedIndex: Clone + super::EmbeddedIndex + 'static,
ByIdReducer<EmbeddedIndex::Reducer>:
Reducer<UnversionedByIdIndex<EmbeddedIndex>, ByIdStats<EmbeddedIndex::Reduced>>,
{
const HEADER: PageHeader = PageHeader::UnversionedHeader;
type Index = UnversionedByIdIndex<EmbeddedIndex>;
type ReducedIndex = ByIdStats<EmbeddedIndex::Reduced>;
type Reducer = ByIdReducer<EmbeddedIndex::Reducer>;
type Reducer = ByIdIndexer<EmbeddedIndex::Indexer>;

fn default_with(reducer: Self::Reducer) -> Self {
Self {
Expand Down
18 changes: 10 additions & 8 deletions nebari/src/tree/versioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
roots::AbortError,
transaction::TransactionId,
tree::{
btree_entry::{KeyOperation, ModificationContext, NodeInclusion, ScanArgs},
by_id::ByIdReducer,
btree_entry::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs},
by_id::ByIdIndexer,
by_sequence::{BySequenceReducer, SequenceId},
copy_chunk, dynamic_order,
key_entry::KeyEntry,
Expand Down Expand Up @@ -57,20 +57,20 @@ where
pub by_id_root:
BTreeEntry<VersionedByIdIndex<EmbeddedIndex>, ByIdStats<EmbeddedIndex::Reduced>>,

reducer: ByIdReducer<EmbeddedIndex::Reducer>,
reducer: ByIdIndexer<EmbeddedIndex::Indexer>,
}
impl<EmbeddedIndex> Default for VersionedTreeRoot<EmbeddedIndex>
where
EmbeddedIndex: super::EmbeddedIndex + Clone + Debug + 'static,
EmbeddedIndex::Reducer: Default,
EmbeddedIndex::Indexer: Default,
{
fn default() -> Self {
Self {
transaction_id: TransactionId(0),
sequence: SequenceId(0),
by_sequence_root: BTreeEntry::default(),
by_id_root: BTreeEntry::default(),
reducer: ByIdReducer(<EmbeddedIndex::Reducer as Default>::default()),
reducer: ByIdIndexer(<EmbeddedIndex::Indexer as Default>::default()),
}
}
}
Expand All @@ -92,7 +92,7 @@ pub enum Children<Index, ReducedIndex> {
impl<EmbeddedIndex> VersionedTreeRoot<EmbeddedIndex>
where
EmbeddedIndex: super::EmbeddedIndex + Clone + Debug + 'static,
ByIdReducer<EmbeddedIndex::Reducer>:
ByIdIndexer<EmbeddedIndex::Indexer>:
Reducer<VersionedByIdIndex<EmbeddedIndex>, ByIdStats<EmbeddedIndex::Reduced>>,
{
fn modify_sequence_root(
Expand Down Expand Up @@ -165,6 +165,8 @@ where
let by_id_minimum_children =
by_id_minimum_children.min(usize::try_from(total_id_records).unwrap_or(usize::MAX));

let reducer = self.reducer.clone();

while !modification.keys.is_empty() {
match self.by_id_root.modify(
&mut modification,
Expand All @@ -185,7 +187,7 @@ where
} else {
(0, 0)
};
let embedded = EmbeddedIndex::index(key, value);
let embedded = reducer.0.index(key, value);
changes.current_sequence = changes
.current_sequence
.next_sequence()
Expand Down Expand Up @@ -249,7 +251,7 @@ where
const HEADER: PageHeader = PageHeader::VersionedHeader;
type Index = VersionedByIdIndex<EmbeddedIndex>;
type ReducedIndex = ByIdStats<EmbeddedIndex::Reduced>;
type Reducer = ByIdReducer<EmbeddedIndex::Reducer>;
type Reducer = ByIdIndexer<EmbeddedIndex::Indexer>;

fn default_with(reducer: Self::Reducer) -> Self {
Self {
Expand Down

0 comments on commit 3fd0c47

Please sign in to comment.