From 899768a16a4d8e3f8cc3f98d3df2e4494d8ca57f Mon Sep 17 00:00:00 2001
From: Franz Heinzmann
Date: Tue, 7 Nov 2023 17:23:39 +0100
Subject: [PATCH] feat(iroh-sync): Queries and "views" (#1766)

## Description

Implementation of #1667; extends #1701. Includes the following changes after discussions in Discord:

* Remove the notion of views and instead embed that information in the query. If the high-level concept of views makes more sense from an API perspective, we can restore it in the client API. Under the hood, however, the query details differ per view, which is why a single `Query` struct is exposed here.
* Add a query builder with a typestate so that only valid combinations can be expressed (see the usage sketch at the end of this description).
* Add an index to the redb (fs) document store so that queries sorted by key, or filtered by key but not by author, are efficient. This is also what makes it possible to query for the "latest only" entry per key without allocating the full result set.

In the process I refactored the redb store to be safer to use. In particular, I moved the `ouroboros` self-referencing code into a `ranges` module and encapsulated it in an inner type, to keep the self-referencing complications scoped. I also did something I had in mind for a while: add type-safe abstractions around the range-bound constructions used when selecting on redb tables. All this turned out quite nice, IMO.

Also contains the changes from #1772:

* Rename `get_one` to `get_exact`.
* Add a flag to `get_exact` that controls whether empty entries are included.
* Add `get_one` to `iroh::client::Doc` to get a single entry with the same query mechanisms as `get_many`.

Open questions and tasks:

* Naming review of the query builder methods
* Integration of the query parameters in the console

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
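## Usage sketch

A rough sketch of the new query API as defined in `iroh-sync/src/store.rs` in this PR (untested; the `store`, `namespace`, and `author` bindings, the key values, and the import paths are placeholders/assumptions):

```rust
use iroh_sync::store::{Query, SortBy, SortDirection, Store};

// Flat query: all entries by one author under a key prefix,
// sorted by key descending, at most 10 results.
let query = Query::author(author)
    .key_prefix("posts/")
    .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
    .limit(10)
    .build();
for entry in store.get_many(namespace, query)? {
    let entry = entry?;
    // ...
}

// "Latest only" query: a single entry per key, keeping only the newest
// entry if multiple authors wrote to the same key. The typestate makes
// `sort_by` unavailable on this builder: this query kind is always sorted
// by key, so only the direction can be chosen.
let query = Query::single_latest_per_key()
    .sort_direction(SortDirection::Asc)
    .build();
let latest: Vec<_> = store
    .get_many(namespace, query)?
    .collect::<anyhow::Result<_>>()?;

// Point lookup via the renamed `get_exact`; the boolean controls whether
// empty entries (deletion markers) are returned.
let maybe_entry = store.get_exact(namespace, author, "posts/hello", false)?;
```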
--------- Co-authored-by: dignifiedquire --- iroh-sync/src/actor.rs | 41 +- iroh-sync/src/net/codec.rs | 14 +- iroh-sync/src/store.rs | 276 +++++++++- iroh-sync/src/store/fs.rs | 796 ++++++++------------------- iroh-sync/src/store/fs/bounds.rs | 296 ++++++++++ iroh-sync/src/store/fs/migrations.rs | 120 ++++ iroh-sync/src/store/fs/query.rs | 161 ++++++ iroh-sync/src/store/fs/ranges.rs | 278 ++++++++++ iroh-sync/src/store/memory.rs | 359 ++++++------ iroh-sync/src/store/util.rs | 89 +++ iroh-sync/src/sync.rs | 285 ++++++++-- iroh/examples/client.rs | 4 +- iroh/src/client.rs | 33 +- iroh/src/commands/sync.rs | 92 ++-- iroh/src/node.rs | 4 +- iroh/src/rpc_protocol.rs | 29 +- iroh/src/sync_engine/rpc.rs | 32 +- iroh/tests/sync.rs | 10 +- 18 files changed, 2006 insertions(+), 913 deletions(-) create mode 100644 iroh-sync/src/store/fs/bounds.rs create mode 100644 iroh-sync/src/store/fs/migrations.rs create mode 100644 iroh-sync/src/store/fs/query.rs create mode 100644 iroh-sync/src/store/fs/ranges.rs create mode 100644 iroh-sync/src/store/util.rs diff --git a/iroh-sync/src/actor.rs b/iroh-sync/src/actor.rs index d3aee5432d..221a70efe7 100644 --- a/iroh-sync/src/actor.rs +++ b/iroh-sync/src/actor.rs @@ -15,7 +15,7 @@ use tracing::{debug, error, error_span, trace, warn}; use crate::{ ranger::Message, - store::{self, GetFilter, ImportNamespaceOutcome}, + store::{self, ImportNamespaceOutcome, Query}, Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus, ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, SignedEntry, SyncOutcome, @@ -122,13 +122,14 @@ enum ReplicaAction { #[debug("reply")] reply: oneshot::Sender>, }, - GetOne { + GetExact { author: AuthorId, key: Bytes, + include_empty: bool, reply: oneshot::Sender>>, }, GetMany { - filter: GetFilter, + query: Query, reply: flume::Sender>, }, DropReplica { @@ -366,26 +367,31 @@ impl SyncHandle { rx.await? } - // TODO: it would be great if this could be a sync method... pub async fn get_many( &self, namespace: NamespaceId, - filter: GetFilter, + query: Query, reply: flume::Sender>, ) -> Result<()> { - let action = ReplicaAction::GetMany { filter, reply }; + let action = ReplicaAction::GetMany { query, reply }; self.send_replica(namespace, action).await?; Ok(()) } - pub async fn get_one( + pub async fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: Bytes, + include_empty: bool, ) -> Result> { let (reply, rx) = oneshot::channel(); - let action = ReplicaAction::GetOne { author, key, reply }; + let action = ReplicaAction::GetExact { + author, + key, + include_empty, + reply, + }; self.send_replica(namespace, action).await?; rx.await? 
} @@ -595,17 +601,20 @@ impl Actor { let res = self.store.register_useful_peer(namespace, peer); send_reply(reply, res) } - ReplicaAction::GetOne { author, key, reply } => { - send_reply_with(reply, self, move |this| { - this.states.ensure_open(&namespace)?; - this.store.get_one(namespace, author, key) - }) - } - ReplicaAction::GetMany { filter, reply } => { + ReplicaAction::GetExact { + author, + key, + include_empty, + reply, + } => send_reply_with(reply, self, move |this| { + this.states.ensure_open(&namespace)?; + this.store.get_exact(namespace, author, key, include_empty) + }), + ReplicaAction::GetMany { query, reply } => { let iter = self .states .ensure_open(&namespace) - .and_then(|_| self.store.get_many(namespace, filter)); + .and_then(|_| self.store.get_many(namespace, query)); iter_to_channel(reply, iter) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { diff --git a/iroh-sync/src/net/codec.rs b/iroh-sync/src/net/codec.rs index 18876153da..46f67c0fd3 100644 --- a/iroh-sync/src/net/codec.rs +++ b/iroh-sync/src/net/codec.rs @@ -295,8 +295,8 @@ impl BobState { mod tests { use crate::{ actor::OpenOpts, - keys::{AuthorId, NamespaceSecret}, - store::{self, GetFilter, Store}, + store::{self, Query, Store}, + AuthorId, NamespaceSecret, }; use anyhow::Result; use iroh_bytes::Hash; @@ -330,7 +330,7 @@ mod tests { assert_eq!( bob_store - .get_many(bob_replica.id(), GetFilter::All) + .get_many(bob_replica.id(), Query::all(),) .unwrap() .collect::>>() .unwrap() @@ -339,7 +339,7 @@ mod tests { ); assert_eq!( alice_store - .get_many(alice_replica.id(), GetFilter::All) + .get_many(alice_replica.id(), Query::all()) .unwrap() .collect::>>() .unwrap() @@ -396,7 +396,7 @@ mod tests { assert_eq!( bob_store - .get_many(namespace.id(), GetFilter::All) + .get_many(namespace.id(), Query::all()) .unwrap() .collect::>>() .unwrap() @@ -405,7 +405,7 @@ mod tests { ); assert_eq!( alice_store - .get_many(namespace.id(), GetFilter::All) + .get_many(namespace.id(), Query::all()) .unwrap() .collect::>>() .unwrap() @@ -461,7 +461,7 @@ mod tests { fn get_messages(store: &S, namespace: NamespaceId) -> Vec { let mut msgs = store - .get_many(namespace, GetFilter::All) + .get_many(namespace, Query::all()) .unwrap() .map(|entry| { entry.map(|entry| { diff --git a/iroh-sync/src/store.rs b/iroh-sync/src/store.rs index 50b70b4e9c..8121f16406 100644 --- a/iroh-sync/src/store.rs +++ b/iroh-sync/src/store.rs @@ -3,6 +3,7 @@ use std::num::{NonZeroU64, NonZeroUsize}; use anyhow::Result; +use bytes::Bytes; use iroh_bytes::Hash; use rand_core::CryptoRngCore; use serde::{Deserialize, Serialize}; @@ -19,6 +20,7 @@ use crate::{ pub mod fs; pub mod memory; mod pubkeys; +mod util; pub use pubkeys::*; /// Number of [`PeerIdBytes`] objects to cache per document. @@ -126,16 +128,19 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static { fn get_author(&self, author: &AuthorId) -> Result>; /// Get an iterator over entries of a replica. - /// - /// The [`GetFilter`] has several methods of filtering the returned entries. - fn get_many(&self, namespace: NamespaceId, filter: GetFilter) -> Result>; + fn get_many( + &self, + namespace: NamespaceId, + query: impl Into, + ) -> Result>; /// Get an entry by key and author. - fn get_one( + fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: impl AsRef<[u8]>, + include_empty: bool, ) -> Result>; /// Get all content hashes of all replicas in the store. 
@@ -183,35 +188,250 @@ pub enum ImportNamespaceOutcome { NoChange, } -/// Filter a get query onto a namespace -#[derive(Debug, Serialize, Deserialize)] -pub enum GetFilter { - /// No filter, list all entries - All, - /// Filter for exact key match - Key(Vec), - /// Filter for key prefix - Prefix(Vec), - /// Filter by author - Author(AuthorId), - /// Filter by key prefix and author - AuthorAndPrefix(AuthorId, Vec), +/// A query builder for document queries. +#[derive(Debug, Default)] +pub struct QueryBuilder { + kind: K, + filter_author: AuthorFilter, + filter_key: KeyFilter, + limit: Option, + offset: u64, + include_empty: bool, + sort_direction: SortDirection, +} + +impl QueryBuilder { + /// Call to include empty entries (deletion markers). + pub fn include_empty(mut self) -> Self { + self.include_empty = true; + self + } + /// Filter by exact key match. + pub fn key_exact(mut self, key: impl AsRef<[u8]>) -> Self { + self.filter_key = KeyFilter::Exact(key.as_ref().to_vec().into()); + self + } + /// Filter by key prefix. + pub fn key_prefix(mut self, key: impl AsRef<[u8]>) -> Self { + self.filter_key = KeyFilter::Prefix(key.as_ref().to_vec().into()); + self + } + /// Filter by author. + pub fn author(mut self, author: AuthorId) -> Self { + self.filter_author = AuthorFilter::Exact(author); + self + } + /// Set the maximum number of entries to be returned. + pub fn limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + /// Set the offset within the result set from where to start returning results. + pub fn offset(mut self, offset: u64) -> Self { + self.offset = offset; + self + } +} + +/// Query on all entries without aggregation. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct FlatQuery { + sort_by: SortBy, +} + +/// Query that only returns the latest entry for a key which has entries from multiple authors. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SingleLatestPerKeyQuery {} + +impl QueryBuilder { + /// Set the sort for the query. + /// + /// The default is to sort by author, then by key, in ascending order. + pub fn sort_by(mut self, sort_by: SortBy, direction: SortDirection) -> Self { + self.kind.sort_by = sort_by; + self.sort_direction = direction; + self + } + + /// Build the query. + pub fn build(self) -> Query { + Query::from(self) + } +} + +impl QueryBuilder { + /// Set the order direction for the query. + /// + /// Ordering is always by key for this query type. + /// Default direction is ascending. + pub fn sort_direction(mut self, direction: SortDirection) -> Self { + self.sort_direction = direction; + self + } + + /// Build the query. + pub fn build(self) -> Query { + Query::from(self) + } } -impl Default for GetFilter { - fn default() -> Self { - Self::All +impl From> for Query { + fn from(builder: QueryBuilder) -> Query { + Query { + kind: QueryKind::SingleLatestPerKey(builder.kind), + filter_author: builder.filter_author, + filter_key: builder.filter_key, + limit: builder.limit, + offset: builder.offset, + include_empty: builder.include_empty, + sort_direction: builder.sort_direction, + } } } -impl GetFilter { - /// Create a [`GetFilter`] from author and prefix options. 
- pub fn author_prefix(author: Option, prefix: Option>) -> Self { - match (author, prefix) { - (None, None) => Self::All, - (Some(author), None) => Self::Author(author), - (None, Some(prefix)) => Self::Prefix(prefix.as_ref().to_vec()), - (Some(author), Some(prefix)) => Self::AuthorAndPrefix(author, prefix.as_ref().to_vec()), +impl From> for Query { + fn from(builder: QueryBuilder) -> Query { + Query { + kind: QueryKind::Flat(builder.kind), + filter_author: builder.filter_author, + filter_key: builder.filter_key, + limit: builder.limit, + offset: builder.offset, + include_empty: builder.include_empty, + sort_direction: builder.sort_direction, } } } + +/// Note: When using the `SingleLatestPerKey` query kind, the key filter is applied *before* the +/// grouping, the author filter is applied *after* the grouping. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Query { + kind: QueryKind, + filter_author: AuthorFilter, + filter_key: KeyFilter, + limit: Option, + offset: u64, + include_empty: bool, + sort_direction: SortDirection, +} + +impl Query { + /// Query all records. + pub fn all() -> QueryBuilder { + Default::default() + } + /// Query only the latest entry for each key, omitting older entries if the entry was written + /// to by multiple authors. + pub fn single_latest_per_key() -> QueryBuilder { + Default::default() + } + + /// Create a [`Query::all`] query filtered by a single author. + pub fn author(author: AuthorId) -> QueryBuilder { + Self::all().author(author) + } + + /// Create a [`Query::all`] query filtered by a single key. + pub fn key_exact(key: impl AsRef<[u8]>) -> QueryBuilder { + Self::all().key_exact(key) + } + + /// Create a [`Query::all`] query filtered by a key prefix. + pub fn key_prefix(prefix: impl AsRef<[u8]>) -> QueryBuilder { + Self::all().key_prefix(prefix) + } + + /// Get the limit for this query (max. number of entries to emit). + pub fn limit(&self) -> Option { + self.limit + } + + /// Get the offset for this query (number of entries to skip at the beginning). + pub fn offset(&self) -> u64 { + self.offset + } +} + +/// Sort direction +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub enum SortDirection { + /// Sort ascending + #[default] + Asc, + /// Sort descending + Desc, +} + +#[derive(derive_more::Debug, Clone, Serialize, Deserialize)] +enum QueryKind { + #[debug("Flat {{ sort_by: {:?}}}", _0)] + Flat(FlatQuery), + #[debug("SingleLatestPerKey")] + SingleLatestPerKey(SingleLatestPerKeyQuery), +} + +/// Fields by which the query can be sorted +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub enum SortBy { + /// Sort by key, then author. + KeyAuthor, + /// Sort by author, then key. + #[default] + AuthorKey, +} + +/// Key matching. +#[derive(Debug, Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +pub enum KeyFilter { + /// Matches any key. + #[default] + Any, + /// Only keys that are exactly the provided value. + Exact(Bytes), + /// All keys that start with the provided value. + Prefix(Bytes), +} + +impl> From for KeyFilter { + fn from(value: T) -> Self { + KeyFilter::Exact(Bytes::copy_from_slice(value.as_ref())) + } +} + +impl KeyFilter { + /// Test if a key is matched by this [`KeyFilter`]. + pub fn matches(&self, key: &[u8]) -> bool { + match self { + Self::Any => true, + Self::Exact(k) => &k[..] == key, + Self::Prefix(p) => key.starts_with(p), + } + } +} + +/// Author matching. +#[derive(Debug, Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +pub enum AuthorFilter { + /// Matches any author. 
+ #[default] + Any, + /// Matches exactly the provided author. + Exact(AuthorId), +} + +impl AuthorFilter { + /// Test if an author is matched by this [`AuthorFilter`]. + pub fn matches(&self, author: &AuthorId) -> bool { + match self { + Self::Any => true, + Self::Exact(a) => a == author, + } + } +} + +impl From for AuthorFilter { + fn from(value: AuthorId) -> Self { + AuthorFilter::Exact(value) + } +} diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index 7943e5cac7..7e0b1443fd 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -2,20 +2,22 @@ use std::{ cmp::Ordering, - collections::{HashMap, HashSet}, + collections::HashSet, + iter::{Chain, Flatten}, + ops::Bound, path::Path, sync::Arc, }; use anyhow::{anyhow, Result}; +use bytes::Bytes; use derive_more::From; use ed25519_dalek::{SignatureError, VerifyingKey}; use iroh_bytes::Hash; -use ouroboros::self_referencing; use parking_lot::RwLock; use redb::{ - Database, MultimapTableDefinition, Range as TableRange, ReadOnlyTable, ReadTransaction, - ReadableMultimapTable, ReadableTable, StorageError, Table, TableDefinition, TableHandle, + Database, MultimapTableDefinition, ReadOnlyTable, ReadableMultimapTable, ReadableTable, + TableDefinition, }; use crate::{ @@ -23,126 +25,85 @@ use crate::{ ranger::{Fingerprint, Range, RangeEntry}, store::Store as _, sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry}, - AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes, + AuthorId, Capability, CapabilityKind, NamespaceId, PeerIdBytes, }; -use super::{pubkeys::MemPublicKeyStore, ImportNamespaceOutcome, OpenError, PublicKeyStore}; +use super::{pubkeys::MemPublicKeyStore, ImportNamespaceOutcome, OpenError, PublicKeyStore, Query}; -/// Manages the replicas and authors for an instance. 
-#[derive(Debug, Clone)] -pub struct Store { - db: Arc, - open_replicas: Arc>>, - pubkeys: MemPublicKeyStore, -} +mod bounds; +mod migrations; +mod query; +mod ranges; + +use self::bounds::{ByKeyBounds, RecordsBounds}; +use self::query::QueryIterator; +use self::ranges::{TableRange, TableReader}; + +pub use self::ranges::RecordsRange; // Table Definitions -// Authors -// Table -// Key: [u8; 32] # AuthorId -// Value: #[u8; 32] # Author +/// Table: Authors +/// Key: `[u8; 32]` # AuthorId +/// Value: `[u8; 32]` # Author const AUTHORS_TABLE: TableDefinition<&[u8; 32], &[u8; 32]> = TableDefinition::new("authors-1"); -// Namespaces -// Table -// Key: [u8; 32] # NamespaceId -// Value: #[u8; 32] # Namespace +/// Table: Namespaces v1 (replaced by Namespaces v2 in migration ) +/// Key: `[u8; 32]` # NamespaceId +/// Value: `[u8; 32]` # NamespaceSecret const NAMESPACES_TABLE_V1: TableDefinition<&[u8; 32], &[u8; 32]> = TableDefinition::new("namespaces-1"); -// Namespaces -// Table -// Key: [u8; 32] # NamespaceId -// Value: (u8, [u8; 32]) # (CapabilityKind, Capability) +/// Table: Namespaces v2 +/// Key: `[u8; 32]` # NamespaceId +/// Value: `(u8, [u8; 32])` # (CapabilityKind, Capability) const NAMESPACES_TABLE: TableDefinition<&[u8; 32], (u8, &[u8; 32])> = TableDefinition::new("namespaces-2"); -// Records -// Table -// Key: ([u8; 32], [u8; 32], Vec) # (NamespaceId, AuthorId, Key) -// Value: -// (u64, [u8; 32], [u8; 32], u64, [u8; 32]) -// # (timestamp, signature_namespace, signature_author, len, hash) +/// Table: Records +/// Key: `([u8; 32], [u8; 32], &[u8])` +/// # (NamespaceId, AuthorId, Key) +/// Value: `(u64, [u8; 32], [u8; 32], u64, [u8; 32])` +/// # (timestamp, signature_namespace, signature_author, len, hash) const RECORDS_TABLE: TableDefinition = TableDefinition::new("records-1"); - -// Latest by author -// Table -// Key: ([u8; 32], [u8; 32]) # (NamespaceId, AuthorId) -// Value: (u64, Vec) # (Timestamp, Key) -const LATEST_TABLE: TableDefinition = - TableDefinition::new("latest-by-author-1"); -type LatestKey<'a> = (&'a [u8; 32], &'a [u8; 32]); -type LatestValue<'a> = (u64, &'a [u8]); -type LatestTable<'a> = ReadOnlyTable<'a, LatestKey<'static>, LatestValue<'static>>; -type LatestRange<'a> = TableRange<'a, LatestKey<'static>, LatestValue<'static>>; - type RecordsId<'a> = (&'a [u8; 32], &'a [u8; 32], &'a [u8]); +type RecordsIdOwned = ([u8; 32], [u8; 32], Bytes); type RecordsValue<'a> = (u64, &'a [u8; 64], &'a [u8; 64], u64, &'a [u8; 32]); -type RecordsRange<'a> = TableRange<'a, RecordsId<'static>, RecordsValue<'static>>; type RecordsTable<'a> = ReadOnlyTable<'a, RecordsId<'static>, RecordsValue<'static>>; -type DbResult = Result; +/// Table: Latest per author +/// Key: `([u8; 32], [u8; 32])` # (NamespaceId, AuthorId) +/// Value: `(u64, Vec)` # (Timestamp, Key) +const LATEST_PER_AUTHOR_TABLE: TableDefinition = + TableDefinition::new("latest-by-author-1"); +type LatestPerAuthorKey<'a> = (&'a [u8; 32], &'a [u8; 32]); +type LatestPerAuthorValue<'a> = (u64, &'a [u8]); + +/// Table: Records by key +/// Key: `([u8; 32], Vec, [u8; 32]])` # (NamespaceId, Key, AuthorId) +/// Value: `()` +const RECORDS_BY_KEY_TABLE: TableDefinition = + TableDefinition::new("records-by-key-1"); +type RecordsByKeyId<'a> = (&'a [u8; 32], &'a [u8], &'a [u8; 32]); +type RecordsByKeyIdOwned = ([u8; 32], Bytes, [u8; 32]); + +/// Table: Peers per document. +/// Key: `[u8; 32]` # NamespaceId +/// Value: `(u64, [u8; 32])` # ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used. 
+const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> = + MultimapTableDefinition::new("sync-peers-1"); /// Number of seconds elapsed since [`std::time::SystemTime::UNIX_EPOCH`]. Used to register the /// last time a peer was useful in a document. // NOTE: resolution is nanoseconds, stored as a u64 since this covers ~500years from unix epoch, // which should be more than enough type Nanos = u64; -/// Peers stored per document. -/// - Key: [`NamespaceId::as_bytes`] -/// - Value: ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used. -const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> = - MultimapTableDefinition::new("sync-peers-1"); - -/// migration 001: populate the latest table (which did not exist before) -fn migration_001_populate_latest_table( - records_table: &Table, RecordsValue<'static>>, - latest_table: &mut Table, LatestValue<'static>>, -) -> Result<()> { - tracing::info!("Starting migration: 001_populate_latest_table"); - #[allow(clippy::type_complexity)] - let mut heads: HashMap<([u8; 32], [u8; 32]), (u64, Vec)> = HashMap::new(); - let iter = records_table.iter()?; - - for next in iter { - let next = next?; - let (namespace, author, key) = next.0.value(); - let (timestamp, _namespace_sig, _author_sig, _len, _hash) = next.1.value(); - heads - .entry((*namespace, *author)) - .and_modify(|e| { - if timestamp >= e.0 { - *e = (timestamp, key.to_vec()); - } - }) - .or_insert_with(|| (timestamp, key.to_vec())); - } - let len = heads.len(); - for ((namespace, author), (timestamp, key)) in heads { - latest_table.insert((&namespace, &author), (timestamp, key.as_slice()))?; - } - tracing::info!("Migration finished (inserted {} entries)", len); - Ok(()) -} -/// Migrate the namespaces table from V1 to V2. -fn migration_002_namespaces_v2( - namespaces_v1: Table<&[u8; 32], &[u8; 32]>, - namespaces_v2: &mut Table<&[u8; 32], (u8, &[u8; 32])>, -) -> Result<()> { - tracing::info!("Starting migration: 002_namespaces_v2"); - let mut entries = 0; - for res in namespaces_v1.iter()? { - let db_value = res?.1; - let secret_bytes = db_value.value(); - let capability = Capability::Write(NamespaceSecret::from_bytes(secret_bytes)); - let id = capability.id().to_bytes(); - let (raw_kind, raw_bytes) = capability.raw(); - namespaces_v2.insert(&id, (raw_kind, &raw_bytes))?; - entries += 1; - } - tracing::info!("Migration finished ({entries} entries)"); - Ok(()) +/// Manages the replicas and authors for an instance. +#[derive(Debug, Clone)] +pub struct Store { + db: Arc, + open_replicas: Arc>>, + pubkeys: MemPublicKeyStore, } impl Store { @@ -155,51 +116,27 @@ impl Store { // Setup all tables let write_tx = db.begin_write()?; { - let records_table = write_tx.open_table(RECORDS_TABLE)?; - let namespaces_v1_exists = write_tx - .list_tables()? - .any(|handle| handle.name() == NAMESPACES_TABLE_V1.name()); - let mut namespaces_v2 = write_tx.open_table(NAMESPACES_TABLE)?; - let _table = write_tx.open_table(AUTHORS_TABLE)?; - let mut latest_table = write_tx.open_table(LATEST_TABLE)?; + let _table = write_tx.open_table(RECORDS_TABLE)?; + let _table = write_tx.open_table(NAMESPACES_TABLE)?; + let _table = write_tx.open_table(LATEST_PER_AUTHOR_TABLE)?; let _table = write_tx.open_multimap_table(NAMESPACE_PEERS_TABLE)?; - - // migration 001: populate latest table if it was empty before - if latest_table.is_empty()? && !records_table.is_empty()? 
{ - migration_001_populate_latest_table(&records_table, &mut latest_table)?; - } - - // migration 002: update namespaces from V1 to V2 - if namespaces_v1_exists { - let namespaces_v1 = write_tx.open_table(NAMESPACES_TABLE_V1)?; - migration_002_namespaces_v2(namespaces_v1, &mut namespaces_v2)?; - write_tx.delete_table(NAMESPACES_TABLE_V1)?; - } } write_tx.commit()?; + // Run database migrations + migrations::run_migrations(&db)?; + Ok(Store { db: Arc::new(db), open_replicas: Default::default(), pubkeys: Default::default(), }) } - - fn insert_author(&self, author: Author) -> Result<()> { - let write_tx = self.db.begin_write()?; - { - let mut author_table = write_tx.open_table(AUTHORS_TABLE)?; - author_table.insert(author.id().as_bytes(), &author.to_bytes())?; - } - write_tx.commit()?; - - Ok(()) - } } impl super::Store for Store { type Instance = StoreInstance; - type GetIter<'a> = RangeIterator<'a>; + type GetIter<'a> = QueryIterator<'a>; type ContentHashesIter<'a> = ContentHashesIterator<'a>; type LatestIter<'a> = LatestIterator<'a>; type AuthorsIter<'a> = std::vec::IntoIter>; @@ -262,7 +199,12 @@ impl super::Store for Store { } fn import_author(&self, author: Author) -> Result<()> { - self.insert_author(author)?; + let write_tx = self.db.begin_write()?; + { + let mut author_table = write_tx.open_table(AUTHORS_TABLE)?; + author_table.insert(author.id().as_bytes(), &author.to_bytes())?; + } + write_tx.commit()?; Ok(()) } @@ -312,12 +254,18 @@ impl super::Store for Store { if self.open_replicas.read().contains(namespace) { return Err(anyhow!("replica is not closed")); } - let start = range_start(namespace); - let end = range_end(namespace); let write_tx = self.db.begin_write()?; { let mut record_table = write_tx.open_table(RECORDS_TABLE)?; - record_table.drain(start..=end)?; + let bounds = RecordsBounds::namespace(*namespace); + record_table.drain(bounds.as_ref())?; + } + { + let mut table = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; + let bounds = ByKeyBounds::namespace(*namespace); + let _ = table.drain(bounds.as_ref()); + } + { let mut namespace_table = write_tx.open_table(NAMESPACES_TABLE)?; namespace_table.remove(namespace.as_bytes())?; } @@ -328,36 +276,29 @@ impl super::Store for Store { fn get_many( &self, namespace: NamespaceId, - filter: super::GetFilter, + query: impl Into, ) -> Result> { - match filter { - super::GetFilter::All => self.get_all(namespace), - super::GetFilter::Key(key) => self.get_by_key(namespace, key), - super::GetFilter::Prefix(prefix) => self.get_by_prefix(namespace, prefix), - super::GetFilter::Author(author) => self.get_by_author(namespace, author), - super::GetFilter::AuthorAndPrefix(author, prefix) => { - self.get_by_author_and_prefix(namespace, author, prefix) - } - } + QueryIterator::new(&self.db, namespace, query.into()) } - fn get_one( + fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: impl AsRef<[u8]>, + include_empty: bool, ) -> Result> { let read_tx = self.db.begin_read()?; let record_table = read_tx.open_table(RECORDS_TABLE)?; - get_one(&record_table, namespace, author, key) + get_exact(&record_table, namespace, author, key, include_empty) } fn content_hashes(&self) -> Result> { - ContentHashesIterator::create(&self.db) + ContentHashesIterator::new(&self.db) } fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result> { - LatestIterator::create(&self.db, namespace) + LatestIterator::new(&self.db, namespace) } fn register_useful_peer(&self, namespace: NamespaceId, peer: crate::PeerIdBytes) -> Result<()> { @@ -454,129 
+395,21 @@ fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result, + include_empty: bool, ) -> Result> { - let db_key = (namespace.as_ref(), author.as_ref(), key.as_ref()); - let record = record_table.get(db_key)?; - let Some(record) = record else { - return Ok(None); - }; - let (timestamp, namespace_sig, author_sig, len, hash) = record.value(); - // return early if the hash equals the hash of the empty byte range, which we treat as - // delete marker (tombstone). - if hash == Hash::EMPTY.as_bytes() { - return Ok(None); - } - - let record = Record::new(hash.into(), len, timestamp); - let id = RecordIdentifier::new(namespace, author, key); - let entry = Entry::new(id, record); - let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); - let signed_entry = SignedEntry::new(entry_signature, entry); - - Ok(Some(signed_entry)) -} - -impl Store { - fn get_by_key( - &self, - namespace: NamespaceId, - key: impl AsRef<[u8]>, - ) -> Result> { - RangeIterator::namespace( - &self.db, - &namespace, - RangeFilter::Key(key.as_ref().to_vec()), - ) - } - fn get_by_author(&self, namespace: NamespaceId, author: AuthorId) -> Result> { - let author = author.as_bytes(); - let start = (namespace.as_bytes(), author, &[][..]); - let end = prefix_range_end(&start); - RangeIterator::with_range( - &self.db, - |table| match end { - Some(end) => table.range(start..(&end.0, &end.1, &end.2)), - None => table.range(start..), - }, - RangeFilter::None, - ) - } - - fn get_by_author_and_prefix( - &self, - namespace: NamespaceId, - author: AuthorId, - prefix: impl AsRef<[u8]>, - ) -> Result> { - let author = author.as_bytes(); - let start = (namespace.as_bytes(), author, prefix.as_ref()); - let end = prefix_range_end(&start); - RangeIterator::with_range( - &self.db, - |table| match end { - Some(end) => table.range(start..(&end.0, &end.1, &end.2)), - None => table.range(start..), - }, - RangeFilter::None, - ) - } - - fn get_by_prefix( - &self, - namespace: NamespaceId, - prefix: impl AsRef<[u8]>, - ) -> Result> { - RangeIterator::namespace( - &self.db, - &namespace, - RangeFilter::Prefix(prefix.as_ref().to_vec()), - ) - } - - fn get_all(&self, namespace: NamespaceId) -> Result> { - RangeIterator::namespace(&self.db, &namespace, RangeFilter::None) - } + let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref()); + let record = record_table.get(id)?; + Ok(record + .map(|r| into_entry(id, r.value())) + .filter(|entry| include_empty || !entry.is_empty())) } -/// Increment a byte string by one, by incrementing the last byte that is not 255 by one. -/// -/// Returns false if all bytes are 255. -fn increment_by_one(value: &mut [u8]) -> bool { - for char in value.iter_mut().rev() { - if *char != 255 { - *char += 1; - return true; - } else { - *char = 0; - } - } - false -} - -// Get the end point of a prefix range -// -// Increments the last byte of the byte represenation of `prefix` and returns it as an owned tuple -// with the parts of the new [`RecordsId`]. -// Returns `None` if all bytes are equal to 255. -fn prefix_range_end<'a>(prefix: &'a RecordsId<'a>) -> Option<([u8; 32], [u8; 32], Vec)> { - let (mut namespace, mut author, mut prefix) = (*prefix.0, *prefix.1, prefix.2.to_vec()); - if !increment_by_one(&mut prefix) - && !increment_by_one(&mut author) - && !increment_by_one(&mut namespace) - { - // we have all-255 keys, so open-ended range - None - } else { - Some((namespace, author, prefix)) - } -} - -/// [`NamespaceSecret`] specific wrapper around the [`Store`]. 
+/// A wrapper around [`Store`] for a specific [`NamespaceId`] #[derive(Debug, Clone)] pub struct StoreInstance { namespace: NamespaceId, @@ -589,13 +422,6 @@ impl StoreInstance { } } -fn range_start(namespace: &NamespaceId) -> RecordsId { - (namespace.as_bytes(), &[u8::MIN; 32], &[][..]) -} -fn range_end(namespace: &NamespaceId) -> RecordsId { - (namespace.as_bytes(), &[u8::MAX; 32], &[][..]) -} - impl PublicKeyStore for StoreInstance { fn public_key(&self, id: &[u8; 32]) -> std::result::Result { self.store.pubkeys.public_key(id) @@ -604,7 +430,9 @@ impl PublicKeyStore for StoreInstance { impl crate::ranger::Store for StoreInstance { type Error = anyhow::Error; - type RangeIterator<'a> = std::iter::Chain, RangeIterator<'a>>; + type RangeIterator<'a> = + Chain, Flatten>>>; + type ParentIterator<'a> = ParentIterator<'a>; /// Get a the first key (or the default if none is available). fn get_first(&self) -> Result { @@ -612,9 +440,8 @@ impl crate::ranger::Store for StoreInstance { let record_table = read_tx.open_table(RECORDS_TABLE)?; // TODO: verify this fetches all keys with this namespace - let start = range_start(&self.namespace); - let end = range_end(&self.namespace); - let mut records = record_table.range(start..=end)?; + let bounds = RecordsBounds::namespace(self.namespace); + let mut records = record_table.range(bounds.as_ref())?; let Some(record) = records.next() else { return Ok(RecordIdentifier::default()); @@ -626,22 +453,23 @@ impl crate::ranger::Store for StoreInstance { } fn get(&self, id: &RecordIdentifier) -> Result> { - self.store.get_one(id.namespace(), id.author(), id.key()) + self.store + .get_exact(id.namespace(), id.author(), id.key(), true) } fn len(&self) -> Result { let read_tx = self.store.db.begin_read()?; let record_table = read_tx.open_table(RECORDS_TABLE)?; - // TODO: verify this fetches all keys with this namespace - let start = range_start(&self.namespace); - let end = range_end(&self.namespace); - let records = record_table.range(start..=end)?; + let bounds = RecordsBounds::namespace(self.namespace); + let records = record_table.range(bounds.as_ref())?; Ok(records.count()) } fn is_empty(&self) -> Result { - Ok(self.len()? == 0) + let read_tx = self.store.db.begin_read()?; + let record_table = read_tx.open_table(RECORDS_TABLE)?; + Ok(record_table.is_empty()?) 
} fn get_fingerprint(&self, range: &Range) -> Result { @@ -658,27 +486,37 @@ impl crate::ranger::Store for StoreInstance { } fn put(&mut self, e: SignedEntry) -> Result<()> { + let id = e.id(); let write_tx = self.store.db.begin_write()?; { // insert into record table let mut record_table = write_tx.open_table(RECORDS_TABLE)?; let key = ( - &e.id().namespace().to_bytes(), - &e.id().author().to_bytes(), - e.id().key(), + &id.namespace().to_bytes(), + &id.author().to_bytes(), + id.key(), ); - let hash = e.content_hash(); + let hash = e.content_hash(); // let binding is needed let value = ( e.timestamp(), - &e.signature().namespace_signature().to_bytes(), - &e.signature().author_signature().to_bytes(), + &e.signature().namespace().to_bytes(), + &e.signature().author().to_bytes(), e.content_len(), hash.as_bytes(), ); record_table.insert(key, value)?; + // insert into by key index table + let mut idx_by_key = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; + let key = ( + &id.namespace().to_bytes(), + id.key(), + &id.author().to_bytes(), + ); + idx_by_key.insert(key, ())?; + // insert into latest table - let mut latest_table = write_tx.open_table(LATEST_TABLE)?; + let mut latest_table = write_tx.open_table(LATEST_PER_AUTHOR_TABLE)?; let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes()); let value = (e.timestamp(), e.id().key()); latest_table.insert(key, value)?; @@ -691,83 +529,64 @@ impl crate::ranger::Store for StoreInstance { let iter = match range.x().cmp(range.y()) { // identity range: iter1 = all, iter2 = none Ordering::Equal => { - let start = range_start(&self.namespace); - let end = range_end(&self.namespace); // iterator for all entries in replica - let iter = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..=end), - RangeFilter::None, - )?; - // empty iterator, returns nothing - let iter2 = RangeIterator::empty(&self.store.db)?; - iter.chain(iter2) + let bounds = RecordsBounds::namespace(self.namespace); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + chain_none(iter) } // regular range: iter1 = x <= t < y, iter2 = none Ordering::Less => { - let start = range.x().as_byte_tuple(); - let end = range.y().as_byte_tuple(); // iterator for entries from range.x to range.y - let iter = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..end), - RangeFilter::None, - )?; - // empty iterator - let iter2 = RangeIterator::empty(&self.store.db)?; - iter.chain(iter2) - // wrap-around range: iter1 = y <= t, iter2 = x >= t + let start = Bound::Included(range.x().to_byte_tuple()); + let end = Bound::Excluded(range.y().to_byte_tuple()); + let bounds = RecordsBounds::new(start, end); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + chain_none(iter) } + // split range: iter1 = start <= t < y, iter2 = x <= t <= end Ordering::Greater => { - let start = range_start(&self.namespace); - let end = range.y().as_byte_tuple(); - // iterator for entries start to from range.y - let iter = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..end), - RangeFilter::None, - )?; - let start = range.x().as_byte_tuple(); - let end = range_end(&self.namespace); + // iterator for entries from start to range.y + let end = Bound::Excluded(range.y().to_byte_tuple()); + let bounds = RecordsBounds::from_start(&self.namespace, end); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + // iterator for entries from range.x to end - let iter2 = RangeIterator::with_range( - &self.store.db, - |table| 
table.range(start..=end), - RangeFilter::None, - )?; - iter.chain(iter2) + let start = Bound::Included(range.x().to_byte_tuple()); + let bounds = RecordsBounds::to_end(&self.namespace, start); + let iter2 = RecordsRange::with_bounds(&self.store.db, bounds)?; + + iter.chain(Some(iter2).into_iter().flatten()) } }; Ok(iter) } - fn remove(&mut self, k: &RecordIdentifier) -> Result> { + fn remove(&mut self, id: &RecordIdentifier) -> Result> { let write_tx = self.store.db.begin_write()?; - let res = { - let mut records_table = write_tx.open_table(RECORDS_TABLE)?; - let key = (&k.namespace().to_bytes(), &k.author().to_bytes(), k.key()); - let record = records_table.remove(key)?; - record.map(|record| { - let (timestamp, namespace_sig, author_sig, len, hash) = record.value(); - let record = Record::new(hash.into(), len, timestamp); - let entry = Entry::new(k.clone(), record); - let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); - SignedEntry::new(entry_signature, entry) - }) + let (namespace, author, key) = id.as_byte_tuple(); + { + let mut table = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; + let id = (namespace, key, author); + table.remove(id)?; + } + let entry = { + let mut table = write_tx.open_table(RECORDS_TABLE)?; + let id = (namespace, author, key); + let value = table.remove(id)?; + value.map(|value| into_entry(id, value.value())) }; write_tx.commit()?; - Ok(res) + Ok(entry) } fn all(&self) -> Result> { - let iter = RangeIterator::namespace(&self.store.db, &self.namespace, RangeFilter::None)?; - let iter2 = RangeIterator::empty(&self.store.db)?; - Ok(iter.chain(iter2)) + let bounds = RecordsBounds::namespace(self.namespace); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + Ok(chain_none(iter)) } - type ParentIterator<'a> = ParentIterator<'a>; fn prefixes_of(&self, id: &RecordIdentifier) -> Result, Self::Error> { - ParentIterator::create( + ParentIterator::new( &self.store.db, id.namespace(), id.author(), @@ -775,29 +594,18 @@ impl crate::ranger::Store for StoreInstance { ) } - fn prefixed_by(&self, prefix: &RecordIdentifier) -> Result> { - let start = prefix.as_byte_tuple(); - let end = prefix_range_end(&start); - let iter = RangeIterator::with_range( - &self.store.db, - |table| match end { - Some(end) => table.range(start..(&end.0, &end.1, &end.2)), - None => table.range(start..), - }, - RangeFilter::None, - )?; - let iter2 = RangeIterator::empty(&self.store.db)?; - Ok(iter.chain(iter2)) + fn prefixed_by(&self, id: &RecordIdentifier) -> Result> { + let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes()); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + Ok(chain_none(iter)) } fn remove_prefix_filtered( &mut self, - prefix: &RecordIdentifier, + id: &RecordIdentifier, predicate: impl Fn(&Record) -> bool, ) -> Result { - let start = prefix.as_byte_tuple(); - let end = prefix_range_end(&start); - + let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes()); let write_tx = self.store.db.begin_write()?; let count = { let mut table = write_tx.open_table(RECORDS_TABLE)?; @@ -807,10 +615,7 @@ impl crate::ranger::Store for StoreInstance { predicate(&record) }; - let iter = match end { - Some(end) => table.drain_filter(start..(&end.0, &end.1, &end.2), cb)?, - None => table.drain_filter(start.., cb)?, - }; + let iter = table.drain_filter(bounds.as_ref(), cb)?; iter.count() }; write_tx.commit()?; @@ -818,38 +623,36 @@ impl crate::ranger::Store for StoreInstance { } } +fn 
chain_none<'a, I: Iterator + 'a, T>( + iter: I, +) -> Chain>> { + iter.chain(None.into_iter().flatten()) +} + /// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which /// is a prefix of the key passed to the iterator. -#[self_referencing] +#[derive(Debug)] pub struct ParentIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: RecordsTable<'this>, + reader: TableReader<'a, RecordsId<'static>, RecordsValue<'static>>, namespace: NamespaceId, author: AuthorId, key: Vec, } impl<'a> ParentIterator<'a> { - fn create( + fn new( db: &'a Arc, namespace: NamespaceId, author: AuthorId, key: Vec, ) -> anyhow::Result { - let iter = Self::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(RECORDS_TABLE) - .map_err(anyhow::Error::from) - }, + let reader = TableReader::new(db, |tx| tx.open_table(RECORDS_TABLE))?; + Ok(Self { + reader, namespace, author, key, - )?; - Ok(iter) + }) } } @@ -857,49 +660,28 @@ impl Iterator for ParentIterator<'_> { type Item = Result; fn next(&mut self) -> Option { - self.with_mut(|fields| { - while !fields.key.is_empty() { - let entry = get_one( - fields.record_table, - *fields.namespace, - *fields.author, - &fields.key, - ); - fields.key.pop(); - match entry { - Err(err) => return Some(Err(err)), - Ok(Some(entry)) => return Some(Ok(entry)), - Ok(None) => continue, - } + let records_table = self.reader.table(); + while !self.key.is_empty() { + let entry = get_exact(records_table, self.namespace, self.author, &self.key, false); + self.key.pop(); + match entry { + Err(err) => return Some(Err(err)), + Ok(Some(entry)) => return Some(Ok(entry)), + Ok(None) => continue, } - None - }) + } + None } } /// Iterator over all content hashes for the fs store. -#[self_referencing] -pub struct ContentHashesIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: RecordsTable<'this>, - #[covariant] - #[borrows(record_table)] - records: RecordsRange<'this>, -} +#[derive(Debug)] +pub struct ContentHashesIterator<'a>(RecordsRange<'a>); + impl<'a> ContentHashesIterator<'a> { - fn create(db: &'a Arc) -> anyhow::Result { - let iter = Self::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(RECORDS_TABLE) - .map_err(anyhow::Error::from) - }, - |table| table.iter().map_err(anyhow::Error::from), - )?; - Ok(iter) + fn new(db: &'a Arc) -> anyhow::Result { + let range = RecordsRange::new(db, |table| table.iter())?; + Ok(Self(range)) } } @@ -907,44 +689,30 @@ impl Iterator for ContentHashesIterator<'_> { type Item = Result; fn next(&mut self) -> Option { - self.with_mut(|fields| match fields.records.next() { - None => None, - Some(Err(err)) => Some(Err(err.into())), - Some(Ok((_key, value))) => { - let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value.value(); - Some(Ok(Hash::from(hash))) - } + self.0.next_mapped(|_key, value| { + let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value; + Hash::from(hash) }) } } /// Iterator over the latest entry per author. 
-#[self_referencing] -pub struct LatestIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: LatestTable<'this>, - #[covariant] - #[borrows(record_table)] - records: LatestRange<'this>, -} +#[derive(Debug)] +pub struct LatestIterator<'a>( + TableRange<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>, +); + impl<'a> LatestIterator<'a> { - fn create(db: &'a Arc, namespace: NamespaceId) -> anyhow::Result { - let iter = Self::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(LATEST_TABLE) - .map_err(anyhow::Error::from) - }, + fn new(db: &'a Arc, namespace: NamespaceId) -> anyhow::Result { + Ok(Self(TableRange::new( + db, + |tx| tx.open_table(LATEST_PER_AUTHOR_TABLE), |table| { let start = (namespace.as_bytes(), &[u8::MIN; 32]); let end = (namespace.as_bytes(), &[u8::MAX; 32]); - table.range(start..=end).map_err(anyhow::Error::from) + table.range(start..=end) }, - )?; - Ok(iter) + )?)) } } @@ -952,124 +720,29 @@ impl Iterator for LatestIterator<'_> { type Item = Result<(AuthorId, u64, Vec)>; fn next(&mut self) -> Option { - self.with_mut(|fields| match fields.records.next() { - None => None, - Some(Err(err)) => Some(Err(err.into())), - Some(Ok((key, value))) => { - let (_namespace, author) = key.value(); - let (timestamp, key) = value.value(); - Some(Ok((author.into(), timestamp, key.to_vec()))) - } + self.0.next_mapped(|key, value| { + let (_namespace, author) = key; + let (timestamp, key) = value; + (author.into(), timestamp, key.to_vec()) }) } } -#[self_referencing] -pub struct RangeIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: RecordsTable<'this>, - #[covariant] - #[borrows(record_table)] - records: RecordsRange<'this>, - filter: RangeFilter, -} - -impl<'a> RangeIterator<'a> { - fn with_range( - db: &'a Arc, - range: impl for<'this> FnOnce(&'this RecordsTable<'this>) -> DbResult>, - filter: RangeFilter, - ) -> anyhow::Result { - let iter = RangeIterator::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(RECORDS_TABLE) - .map_err(anyhow::Error::from) - }, - |record_table| range(record_table).map_err(anyhow::Error::from), - filter, - )?; - Ok(iter) - } - - fn namespace( - db: &'a Arc, - namespace: &NamespaceId, - filter: RangeFilter, - ) -> anyhow::Result { - let start = range_start(namespace); - let end = range_end(namespace); - Self::with_range(db, |table| table.range(start..=end), filter) - } - - fn empty(db: &'a Arc) -> anyhow::Result { - let start = (&[0u8; 32], &[0u8; 32], &[0u8][..]); - let end = (&[0u8; 32], &[0u8; 32], &[0u8][..]); - Self::with_range(db, |table| table.range(start..end), RangeFilter::None) - } -} - -#[derive(Debug)] -enum RangeFilter { - None, - Prefix(Vec), - Key(Vec), -} - -impl RangeFilter { - fn matches(&self, id: &RecordIdentifier) -> bool { - match self { - RangeFilter::None => true, - RangeFilter::Prefix(ref prefix) => id.key().starts_with(prefix), - RangeFilter::Key(ref key) => id.key() == key, - } - } -} - -impl std::fmt::Debug for RangeIterator<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RangeIterator").finish_non_exhaustive() - } -} - -impl Iterator for RangeIterator<'_> { - type Item = Result; - - fn next(&mut self) -> Option { - self.with_mut(|fields| { - for next in fields.records.by_ref() { - let next = match next { - Ok(next) => next, - Err(err) => return Some(Err(err.into())), - }; - - let (namespace, author, key) = next.0.value(); - let (timestamp, 
namespace_sig, author_sig, len, hash) = next.1.value(); - if hash == Hash::EMPTY.as_bytes() { - continue; - } - let id = RecordIdentifier::new(namespace, author, key); - if fields.filter.matches(&id) { - let record = Record::new(hash.into(), len, timestamp); - let entry = Entry::new(id, record); - let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); - let signed_entry = SignedEntry::new(entry_signature, entry); - - return Some(Ok(signed_entry)); - } - } - None - }) - } +fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry { + let (namespace, author, key) = key; + let (timestamp, namespace_sig, author_sig, len, hash) = value; + let id = RecordIdentifier::new(namespace, author, key); + let record = Record::new(hash.into(), len, timestamp); + let entry = Entry::new(id, record); + let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); + SignedEntry::new(entry_signature, entry) } #[cfg(test)] mod tests { use crate::ranger::Store as _; - use crate::store::{GetFilter, Store as _}; + use crate::store::Store as _; + use crate::NamespaceSecret; use super::*; @@ -1088,10 +761,7 @@ mod tests { replica.hash_and_insert(&key1, &author, b"v1")?; replica.hash_and_insert(&key2, &author, b"v2")?; let res = store - .get_many( - replica.id(), - GetFilter::AuthorAndPrefix(author.id(), vec![255]), - )? + .get_many(replica.id(), Query::author(author.id()).key_prefix([255]))? .collect::>>()?; assert_eq!(res.len(), 2); assert_eq!( @@ -1144,12 +814,14 @@ mod tests { } // get all - let entries = store.get_all(namespace.id())?.collect::>>()?; + let entries = store + .get_many(namespace.id(), Query::all())? + .collect::>>()?; assert_eq!(entries.len(), 5); // get all prefix let entries = store - .get_by_prefix(namespace.id(), "hello-")? + .get_many(namespace.id(), Query::key_prefix("hello-"))? .collect::>>()?; assert_eq!(entries.len(), 5); @@ -1164,7 +836,9 @@ mod tests { } // get latest - let entries = store.get_all(namespace.id())?.collect::>>()?; + let entries = store + .get_many(namespace.id(), Query::all())? + .collect::>>()?; assert_eq!(entries.len(), 0); Ok(()) @@ -1211,7 +885,7 @@ mod tests { // create a copy of our db file with the latest table deleted. let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| { - tx.delete_table(LATEST_TABLE)?; + tx.delete_table(LATEST_PER_AUTHOR_TABLE)?; Ok(()) })?; diff --git a/iroh-sync/src/store/fs/bounds.rs b/iroh-sync/src/store/fs/bounds.rs new file mode 100644 index 0000000000..677929f338 --- /dev/null +++ b/iroh-sync/src/store/fs/bounds.rs @@ -0,0 +1,296 @@ +use std::ops::{Bound, RangeBounds}; + +use bytes::Bytes; + +use crate::{store::KeyFilter, AuthorId, NamespaceId}; + +use super::{RecordsByKeyId, RecordsByKeyIdOwned, RecordsId, RecordsIdOwned}; + +/// Bounds on the records table. 
+/// +/// Supports bounds by author, key +pub struct RecordsBounds(Bound, Bound); + +impl RecordsBounds { + pub fn new(start: Bound, end: Bound) -> Self { + Self(start, end) + } + + pub fn author_key(ns: NamespaceId, author: AuthorId, key_matcher: KeyFilter) -> Self { + let key_is_exact = matches!(key_matcher, KeyFilter::Exact(_)); + let key = match key_matcher { + KeyFilter::Any => Bytes::new(), + KeyFilter::Exact(key) => key, + KeyFilter::Prefix(prefix) => prefix, + }; + let author = author.to_bytes(); + let ns = ns.to_bytes(); + let mut author_end = author; + let mut ns_end = ns; + let mut key_end = key.to_vec(); + + let start = (ns, author, key); + + let end = if key_is_exact { + Bound::Included(start.clone()) + } else if increment_by_one(&mut key_end) { + Bound::Excluded((ns, author, key_end.into())) + } else if increment_by_one(&mut author_end) { + Bound::Excluded((ns, author_end, Bytes::new())) + } else if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, [0u8; 32], Bytes::new())) + } else { + Bound::Unbounded + }; + + Self(Bound::Included(start), end) + } + + pub fn author_prefix(ns: NamespaceId, author: AuthorId, prefix: Bytes) -> Self { + RecordsBounds::author_key(ns, author, KeyFilter::Prefix(prefix)) + } + + pub fn namespace(ns: NamespaceId) -> Self { + Self::new(Self::namespace_start(&ns), Self::namespace_end(&ns)) + } + + pub fn from_start(ns: &NamespaceId, end: Bound) -> Self { + Self::new(Self::namespace_start(ns), end) + } + + pub fn to_end(ns: &NamespaceId, start: Bound) -> Self { + Self::new(start, Self::namespace_end(ns)) + } + + pub fn as_ref(&self) -> (Bound, Bound) { + fn map(id: &RecordsIdOwned) -> RecordsId { + (&id.0, &id.1, &id.2[..]) + } + (map_bound(&self.0, map), map_bound(&self.1, map)) + } + + fn namespace_start(namespace: &NamespaceId) -> Bound { + Bound::Included((namespace.to_bytes(), [0u8; 32], Bytes::new())) + } + + fn namespace_end(namespace: &NamespaceId) -> Bound { + let mut ns_end = namespace.to_bytes(); + if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, [0u8; 32], Bytes::new())) + } else { + Bound::Unbounded + } + } +} + +impl RangeBounds for RecordsBounds { + fn start_bound(&self) -> Bound<&RecordsIdOwned> { + map_bound(&self.0, |s| s) + } + + fn end_bound(&self) -> Bound<&RecordsIdOwned> { + map_bound(&self.1, |s| s) + } +} + +impl From<(Bound, Bound)> for RecordsBounds { + fn from(value: (Bound, Bound)) -> Self { + Self::new(value.0, value.1) + } +} + +/// Bounds for the by-key index table. +/// +/// Supports bounds by key. 
+pub struct ByKeyBounds(Bound, Bound); +impl ByKeyBounds { + pub fn new(ns: NamespaceId, matcher: &KeyFilter) -> Self { + match matcher { + KeyFilter::Any => Self::namespace(ns), + KeyFilter::Exact(key) => { + let start = (ns.to_bytes(), key.clone(), [0u8; 32]); + let end = (ns.to_bytes(), key.clone(), [255u8; 32]); + Self(Bound::Included(start), Bound::Included(end)) + } + KeyFilter::Prefix(ref prefix) => { + let start = Bound::Included((ns.to_bytes(), prefix.clone(), [0u8; 32])); + + let mut ns_end = ns.to_bytes(); + let mut key_end = prefix.to_vec(); + let end = if increment_by_one(&mut key_end) { + Bound::Excluded((ns.to_bytes(), key_end.into(), [0u8; 32])) + } else if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, Bytes::new(), [0u8; 32])) + } else { + Bound::Unbounded + }; + Self(start, end) + } + } + } + + pub fn namespace(ns: NamespaceId) -> Self { + let start = Bound::Included((ns.to_bytes(), Bytes::new(), [0u8; 32])); + let mut ns_end = ns.to_bytes(); + let end = if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, Bytes::new(), [0u8; 32])) + } else { + Bound::Unbounded + }; + Self(start, end) + } + + pub fn as_ref(&self) -> (Bound, Bound) { + fn map(id: &RecordsByKeyIdOwned) -> RecordsByKeyId { + (&id.0, &id.1[..], &id.2) + } + (map_bound(&self.0, map), map_bound(&self.1, map)) + } +} + +impl RangeBounds for ByKeyBounds { + fn start_bound(&self) -> Bound<&RecordsByKeyIdOwned> { + map_bound(&self.0, |s| s) + } + + fn end_bound(&self) -> Bound<&RecordsByKeyIdOwned> { + map_bound(&self.1, |s| s) + } +} + +/// Increment a byte string by one, by incrementing the last byte that is not 255 by one. +/// +/// Returns false if all bytes are 255. +fn increment_by_one(value: &mut [u8]) -> bool { + for char in value.iter_mut().rev() { + if *char != 255 { + *char += 1; + return true; + } else { + *char = 0; + } + } + false +} + +fn map_bound<'a, T, U: 'a>(bound: &'a Bound, f: impl Fn(&'a T) -> U) -> Bound { + match bound { + Bound::Unbounded => Bound::Unbounded, + Bound::Included(t) => Bound::Included(f(t)), + Bound::Excluded(t) => Bound::Excluded(f(t)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn records_bounds() { + let ns = NamespaceId::from(&[255u8; 32]); + + let bounds = RecordsBounds::namespace(ns); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), [0u8; 32], Bytes::new())) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let a = AuthorId::from(&[255u8; 32]); + + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Any); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), Bytes::new())) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let a = AuthorId::from(&[0u8; 32]); + let mut a_end = a.to_bytes(); + a_end[31] = 1; + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Any); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns.to_bytes(), a_end, Default::default())) + ); + + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Prefix(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), vec![1u8].into())) + ); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns.to_bytes(), a.to_bytes(), vec![2u8].into())) + ); + + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Exact(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), vec![1u8].into())) + ); + assert_eq!( + bounds.end_bound(), + Bound::Included(&(ns.to_bytes(), 
a.to_bytes(), vec![1u8].into())) + ); + } + + #[test] + fn by_key_bounds() { + let ns = NamespaceId::from(&[255u8; 32]); + + let bounds = ByKeyBounds::namespace(ns); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), Bytes::new(), [0u8; 32])) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Any); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), Bytes::new(), [0u8; 32])) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Prefix(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![1u8].into(), [0u8; 32])) + ); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns.to_bytes(), vec![2u8].into(), [0u8; 32])) + ); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Prefix(vec![255u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![255u8].into(), [0u8; 32])) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let ns = NamespaceId::from(&[2u8; 32]); + let mut ns_end = ns.to_bytes(); + ns_end[31] = 3u8; + let bounds = ByKeyBounds::new(ns, &KeyFilter::Prefix(vec![255u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![255u8].into(), [0u8; 32])) + ); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns_end, Bytes::new(), [0u8; 32])) + ); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Exact(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![1u8].into(), [0u8; 32])) + ); + assert_eq!( + bounds.end_bound(), + Bound::Included(&(ns.to_bytes(), vec![1u8].into(), [255u8; 32])) + ); + } +} diff --git a/iroh-sync/src/store/fs/migrations.rs b/iroh-sync/src/store/fs/migrations.rs new file mode 100644 index 0000000000..7e6901a792 --- /dev/null +++ b/iroh-sync/src/store/fs/migrations.rs @@ -0,0 +1,120 @@ +use std::collections::HashMap; + +use anyhow::Result; +use redb::{Database, ReadableTable, TableHandle, WriteTransaction}; +use tracing::{debug, info}; + +use crate::{Capability, NamespaceSecret}; + +use super::{ + LATEST_PER_AUTHOR_TABLE, NAMESPACES_TABLE, NAMESPACES_TABLE_V1, RECORDS_BY_KEY_TABLE, + RECORDS_TABLE, +}; + +/// Run all database migrations, if needed. +pub fn run_migrations(db: &Database) -> Result<()> { + run_migration(db, migration_001_populate_latest_table)?; + run_migration(db, migration_002_namespaces_v2)?; + run_migration(db, migration_003_populate_by_key_index)?; + Ok(()) +} + +fn run_migration(db: &Database, f: F) -> Result<()> +where + F: Fn(&WriteTransaction) -> Result, +{ + let name = std::any::type_name::(); + let name = name.split("::").last().unwrap(); + let tx = db.begin_write()?; + debug!("Start migration {name}"); + match f(&tx)? { + MigrateOutcome::Execute(len) => { + tx.commit()?; + info!("Executed migration {name} ({len} rows affected)"); + } + MigrateOutcome::Skip => debug!("Skip migration {name}: Not needed"), + } + Ok(()) +} + +enum MigrateOutcome { + Skip, + Execute(usize), +} + +/// migration 001: populate the latest table (which did not exist before) +fn migration_001_populate_latest_table(tx: &WriteTransaction) -> Result { + let mut latest_table = tx.open_table(LATEST_PER_AUTHOR_TABLE)?; + let records_table = tx.open_table(RECORDS_TABLE)?; + if !latest_table.is_empty()? || records_table.is_empty()? 
+    {
+        return Ok(MigrateOutcome::Skip);
+    }
+
+    #[allow(clippy::type_complexity)]
+    let mut heads: HashMap<([u8; 32], [u8; 32]), (u64, Vec<u8>)> = HashMap::new();
+    let iter = records_table.iter()?;
+
+    for next in iter {
+        let next = next?;
+        let (namespace, author, key) = next.0.value();
+        let (timestamp, _namespace_sig, _author_sig, _len, _hash) = next.1.value();
+        heads
+            .entry((*namespace, *author))
+            .and_modify(|e| {
+                if timestamp >= e.0 {
+                    *e = (timestamp, key.to_vec());
+                }
+            })
+            .or_insert_with(|| (timestamp, key.to_vec()));
+    }
+    let len = heads.len();
+    for ((namespace, author), (timestamp, key)) in heads {
+        latest_table.insert((&namespace, &author), (timestamp, key.as_slice()))?;
+    }
+    Ok(MigrateOutcome::Execute(len))
+}
+
+/// Migrate the namespaces table from V1 to V2.
+fn migration_002_namespaces_v2(tx: &WriteTransaction) -> Result<MigrateOutcome> {
+    let namespaces_v1_exists = tx
+        .list_tables()?
+        .any(|handle| handle.name() == NAMESPACES_TABLE_V1.name());
+    // migration 002: update namespaces from V1 to V2
+    if !namespaces_v1_exists {
+        return Ok(MigrateOutcome::Skip);
+    }
+    let namespaces_v1 = tx.open_table(NAMESPACES_TABLE_V1)?;
+    let mut namespaces_v2 = tx.open_table(NAMESPACES_TABLE)?;
+    let mut entries = 0;
+    for res in namespaces_v1.iter()? {
+        let db_value = res?.1;
+        let secret_bytes = db_value.value();
+        let capability = Capability::Write(NamespaceSecret::from_bytes(secret_bytes));
+        let id = capability.id().to_bytes();
+        let (raw_kind, raw_bytes) = capability.raw();
+        namespaces_v2.insert(&id, (raw_kind, &raw_bytes))?;
+        entries += 1;
+    }
+    tx.delete_table(NAMESPACES_TABLE_V1)?;
+    Ok(MigrateOutcome::Execute(entries))
+}
+
+/// migration 003: populate the by_key index table (which did not exist before)
+fn migration_003_populate_by_key_index(tx: &WriteTransaction) -> Result<MigrateOutcome> {
+    let mut by_key_table = tx.open_table(RECORDS_BY_KEY_TABLE)?;
+    let records_table = tx.open_table(RECORDS_TABLE)?;
+    if !by_key_table.is_empty()? || records_table.is_empty()? {
+        return Ok(MigrateOutcome::Skip);
+    }
+
+    let iter = records_table.iter()?;
+    let mut len = 0;
+    for next in iter {
+        let next = next?;
+        let (namespace, author, key) = next.0.value();
+        let id = (namespace, key, author);
+        by_key_table.insert(id, ())?;
+        len += 1;
+    }
+    Ok(MigrateOutcome::Execute(len))
+}
diff --git a/iroh-sync/src/store/fs/query.rs b/iroh-sync/src/store/fs/query.rs
new file mode 100644
index 0000000000..d21033c755
--- /dev/null
+++ b/iroh-sync/src/store/fs/query.rs
@@ -0,0 +1,161 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use iroh_bytes::Hash;
+use redb::Database;
+
+use crate::{
+    store::{
+        util::{IndexKind, LatestPerKeySelector, SelectorRes},
+        AuthorFilter, KeyFilter, Query,
+    },
+    AuthorId, NamespaceId, SignedEntry,
+};
+
+use super::{
+    bounds::{ByKeyBounds, RecordsBounds},
+    ranges::{RecordsByKeyRange, RecordsRange},
+    RecordsValue,
+};
+
+/// A query iterator for entry queries.
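+///
+/// Depending on the query, this wraps a range over either the records table
+/// (sorted by author, then key) or the by-key index table (sorted by key,
+/// then author), and applies the remaining filters on the fly.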
+#[derive(Debug)]
+pub struct QueryIterator<'a> {
+    range: QueryRange<'a>,
+    query: Query,
+    offset: u64,
+    count: u64,
+}
+
+#[derive(Debug)]
+enum QueryRange<'a> {
+    AuthorKey {
+        range: RecordsRange<'a>,
+        key_filter: KeyFilter,
+    },
+    KeyAuthor {
+        range: RecordsByKeyRange<'a>,
+        author_filter: AuthorFilter,
+        selector: Option<LatestPerKeySelector>,
+    },
+}
+
+impl<'a> QueryIterator<'a> {
+    pub fn new(db: &'a Arc<Database>, namespace: NamespaceId, query: Query) -> Result<Self> {
+        let index_kind = IndexKind::from(&query);
+        let range = match index_kind {
+            IndexKind::AuthorKey { range, key_filter } => {
+                let (bounds, filter) = match range {
+                    // single author: both author and key are selected via the range. therefore
+                    // set `filter` to `Any`.
+                    AuthorFilter::Exact(author) => (
+                        RecordsBounds::author_key(namespace, author, key_filter),
+                        KeyFilter::Any,
+                    ),
+                    // no author set => full table scan with the provided key filter
+                    AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter),
+                };
+                let range = RecordsRange::with_bounds(db, bounds)?;
+                QueryRange::AuthorKey {
+                    range,
+                    key_filter: filter,
+                }
+            }
+            IndexKind::KeyAuthor {
+                range,
+                author_filter,
+                latest_per_key,
+            } => {
+                let bounds = ByKeyBounds::new(namespace, &range);
+                let range = RecordsByKeyRange::with_bounds(db, bounds)?;
+                let selector = latest_per_key.then(LatestPerKeySelector::default);
+                QueryRange::KeyAuthor {
+                    author_filter,
+                    range,
+                    selector,
+                }
+            }
+        };
+
+        Ok(QueryIterator {
+            range,
+            query,
+            offset: 0,
+            count: 0,
+        })
+    }
+}
+
+impl<'a> Iterator for QueryIterator<'a> {
+    type Item = Result<SignedEntry>;
+
+    fn next(&mut self) -> Option<Result<SignedEntry>> {
+        // early-return if we reached the query limit.
+        if let Some(limit) = self.query.limit() {
+            if self.count >= limit {
+                return None;
+            }
+        }
+        loop {
+            let next = match &mut self.range {
+                QueryRange::AuthorKey { range, key_filter } => {
+                    // get the next entry from the query range, filtered by the key and empty filters
+                    range.next_filtered(&self.query.sort_direction, |(_ns, _author, key), value| {
+                        key_filter.matches(key)
+                            && (self.query.include_empty || !value_is_empty(&value))
+                    })
+                }
+
+                QueryRange::KeyAuthor {
+                    range,
+                    author_filter,
+                    selector,
+                } => loop {
+                    // get the next entry from the query range, filtered by the author filter
+                    let next = range
+                        .next_filtered(&self.query.sort_direction, |(_ns, _key, author)| {
+                            author_filter.matches(&(AuthorId::from(author)))
+                        });
+
+                    // early-break if next contains Err
+                    let next = match next.transpose() {
+                        Err(err) => break Some(Err(err)),
+                        Ok(next) => next,
+                    };
+
+                    // push the entry into the selector. if active, only the latest entry
+                    // for each key will be emitted.
+                    let next = match selector {
+                        None => next,
+                        Some(selector) => match selector.push(next) {
+                            SelectorRes::Continue => continue,
+                            SelectorRes::Finished => None,
+                            SelectorRes::Some(res) => Some(res),
+                        },
+                    };
+
+                    // skip the entry if empty and no empty entries requested
+                    if !self.query.include_empty && matches!(&next, Some(e) if e.is_empty()) {
+                        continue;
+                    }
+
+                    break next.map(Result::Ok);
+                },
+            };
+
+            // skip the entry if we didn't get past the requested offset yet.
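+            // (the offset only advances for entries that passed all filters, so it
+            // counts matching entries rather than raw table rows)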
+            if self.offset < self.query.offset() && matches!(next, Some(Ok(_))) {
+                self.offset += 1;
+                continue;
+            }
+
+            self.count += 1;
+            return next;
+        }
+    }
+}
+
+fn value_is_empty(value: &RecordsValue) -> bool {
+    let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value;
+    *hash == Hash::EMPTY.as_bytes()
+}
diff --git a/iroh-sync/src/store/fs/ranges.rs b/iroh-sync/src/store/fs/ranges.rs
new file mode 100644
index 0000000000..48eeefe49e
--- /dev/null
+++ b/iroh-sync/src/store/fs/ranges.rs
@@ -0,0 +1,278 @@
+//! Ranges on [`redb`] tables
+//!
+//! Because the [`redb`] types all contain references, this uses [`ouroboros`] to create
+//! self-referential structs so that we can embed the [`Range`] iterator together with the
+//! [`ReadableTable`] and the [`ReadTransaction`] in a struct for our iterators returned from the
+//! store.
+
+use std::{fmt, sync::Arc};
+
+use ouroboros::self_referencing;
+use redb::{
+    Database, Range, ReadOnlyTable, ReadTransaction, ReadableTable, RedbKey, RedbValue,
+    StorageError, TableError,
+};
+
+use crate::{store::SortDirection, SignedEntry};
+
+use super::{
+    bounds::{ByKeyBounds, RecordsBounds},
+    into_entry, RecordsByKeyId, RecordsId, RecordsValue, RECORDS_BY_KEY_TABLE, RECORDS_TABLE,
+};
+
+/// A [`ReadTransaction`] with a [`ReadOnlyTable`] that can be stored in a struct.
+///
+/// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`] and a [`ReadOnlyTable`]
+/// with self-referencing.
+pub struct TableReader<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static>(
+    TableReaderInner<'a, K, V>,
+);
+
+#[self_referencing]
+struct TableReaderInner<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> {
+    #[debug("ReadTransaction")]
+    read_tx: ReadTransaction<'a>,
+    #[borrows(read_tx)]
+    #[covariant]
+    table: ReadOnlyTable<'this, K, V>,
+}
+
+impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableReader<'a, K, V> {
+    /// Create a new [`TableReader`]
+    pub fn new(
+        db: &'a Arc<Database>,
+        table_fn: impl for<'this> FnOnce(
+            &'this ReadTransaction<'this>,
+        ) -> Result<ReadOnlyTable<'this, K, V>, TableError>,
+    ) -> anyhow::Result<Self> {
+        let reader = TableReaderInner::try_new(db.begin_read()?, |read_tx| {
+            table_fn(read_tx).map_err(anyhow::Error::from)
+        })?;
+        Ok(Self(reader))
+    }
+
+    /// Get a reference to the [`ReadOnlyTable`].
+    pub fn table(&self) -> &ReadOnlyTable<K, V> {
+        self.0.borrow_table()
+    }
+}
+
+impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for TableReader<'a, K, V> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "TableReader({:?})", self.table())
+    }
+}
+
+/// A range reader for a [`ReadOnlyTable`] that can be stored in a struct.
+///
+/// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`], a [`ReadOnlyTable`]
+/// and a [`Range`] together. Useful to build iterators with.
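+///
+/// The wrapped [`Range`] is a double-ended iterator, so it can be advanced from
+/// the front for ascending or from the back for descending iteration order.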
+pub struct TableRange<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static>(
+    TableRangeReaderInner<'a, K, V>,
+);
+
+#[self_referencing]
+struct TableRangeReaderInner<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> {
+    #[debug("ReadTransaction")]
+    read_tx: ReadTransaction<'a>,
+    #[borrows(read_tx)]
+    #[covariant]
+    table: ReadOnlyTable<'this, K, V>,
+    #[covariant]
+    #[borrows(table)]
+    range: Range<'this, K, V>,
+}
+
+impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableRange<'a, K, V> {
+    /// Create a new [`TableRange`]
+    pub fn new<TF, RF>(db: &'a Arc<Database>, table_fn: TF, range_fn: RF) -> anyhow::Result<Self>
+    where
+        TF: for<'s> FnOnce(&'s ReadTransaction<'s>) -> Result<ReadOnlyTable<'s, K, V>, TableError>,
+        RF: for<'s> FnOnce(&'s ReadOnlyTable<'s, K, V>) -> Result<Range<'s, K, V>, StorageError>,
+    {
+        let reader = TableRangeReaderInner::try_new(
+            db.begin_read()?,
+            |tx| table_fn(tx).map_err(anyhow_err),
+            |table| range_fn(table).map_err(anyhow_err),
+        )?;
+        Ok(Self(reader))
+    }
+
+    /// Get a reference to the [`ReadOnlyTable`].
+    pub fn table(&self) -> &ReadOnlyTable<K, V> {
+        self.0.borrow_table()
+    }
+
+    pub fn next_mapped<T>(
+        &mut self,
+        map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
+    ) -> Option<anyhow::Result<T>> {
+        self.0.with_range_mut(|records| {
+            records
+                .next()
+                .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value())))
+        })
+    }
+
+    pub fn next_filtered<T>(
+        &mut self,
+        direction: &SortDirection,
+        filter: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> bool,
+        map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
+    ) -> Option<anyhow::Result<T>> {
+        self.0.with_range_mut(|records| loop {
+            let next = match direction {
+                SortDirection::Asc => records.next(),
+                SortDirection::Desc => records.next_back(),
+            };
+            match next {
+                None => break None,
+                Some(Err(err)) => break Some(Err(err.into())),
+                Some(Ok(res)) => match filter(res.0.value(), res.1.value()) {
+                    false => continue,
+                    true => break Some(Ok(map(res.0.value(), res.1.value()))),
+                },
+            }
+        })
+    }
+}
+
+impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for TableRange<'a, K, V> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "TableRangeReader({:?})", self.table())
+    }
+}
+
+/// An iterator over a range of entries from the records table.
+#[derive(Debug)]
+pub struct RecordsRange<'a>(TableRange<'a, RecordsId<'static>, RecordsValue<'static>>);
+
+impl<'a> RecordsRange<'a> {
+    pub(super) fn new<RF>(db: &'a Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
+    where
+        RF: for<'s> FnOnce(
+            &'s ReadOnlyTable<'s, RecordsId<'static>, RecordsValue<'static>>,
+        ) -> Result<
+            Range<'s, RecordsId<'static>, RecordsValue<'static>>,
+            StorageError,
+        >,
+    {
+        Ok(Self(TableRange::new(
+            db,
+            |tx| tx.open_table(RECORDS_TABLE),
+            range_fn,
+        )?))
+    }
+
+    pub(super) fn with_bounds(
+        db: &'a Arc<Database>,
+        bounds: RecordsBounds,
+    ) -> anyhow::Result<Self> {
+        Self::new(db, |table| table.range(bounds.as_ref()))
+    }
+
+    /// Get the next item in the range.
+    ///
+    /// Omit items for which the `filter` function returns false.
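+    ///
+    /// The `direction` argument decides whether the underlying range is advanced
+    /// from the front (ascending) or from the back (descending).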
+    pub(super) fn next_filtered(
+        &mut self,
+        direction: &SortDirection,
+        filter: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> bool,
+    ) -> Option<anyhow::Result<SignedEntry>> {
+        self.0.next_filtered(direction, filter, into_entry)
+    }
+
+    pub(super) fn next_mapped<T>(
+        &mut self,
+        map: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> T,
+    ) -> Option<anyhow::Result<T>> {
+        self.0.next_mapped(map)
+    }
+}
+
+impl<'a> Iterator for RecordsRange<'a> {
+    type Item = anyhow::Result<SignedEntry>;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.next_mapped(into_entry)
+    }
+}
+
+#[derive(derive_more::Debug)]
+#[debug("RecordsByKeyRange")]
+pub struct RecordsByKeyRange<'a>(RecordsByKeyRangeInner<'a>);
+
+#[self_referencing]
+struct RecordsByKeyRangeInner<'a> {
+    read_tx: ReadTransaction<'a>,
+
+    #[covariant]
+    #[borrows(read_tx)]
+    records_table: ReadOnlyTable<'this, RecordsId<'static>, RecordsValue<'static>>,
+
+    #[covariant]
+    #[borrows(read_tx)]
+    by_key_table: ReadOnlyTable<'this, RecordsByKeyId<'static>, ()>,
+
+    #[borrows(by_key_table)]
+    #[covariant]
+    by_key_range: Range<'this, RecordsByKeyId<'static>, ()>,
+}
+
+impl<'a> RecordsByKeyRange<'a> {
+    pub fn new<RF>(db: &'a Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
+    where
+        RF: for<'s> FnOnce(
+            &'s ReadOnlyTable<'s, RecordsByKeyId<'static>, ()>,
+        ) -> Result<Range<'s, RecordsByKeyId<'static>, ()>, StorageError>,
+    {
+        let inner = RecordsByKeyRangeInner::try_new(
+            db.begin_read()?,
+            |tx| tx.open_table(RECORDS_TABLE).map_err(anyhow_err),
+            |tx| tx.open_table(RECORDS_BY_KEY_TABLE).map_err(anyhow_err),
+            |table| range_fn(table).map_err(Into::into),
+        )?;
+        Ok(Self(inner))
+    }
+
+    pub fn with_bounds(db: &'a Arc<Database>, bounds: ByKeyBounds) -> anyhow::Result<Self> {
+        Self::new(db, |table| table.range(bounds.as_ref()))
+    }
+
+    /// Get the next item in the range.
+    ///
+    /// Omit items for which the `filter` function returns false.
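+    ///
+    /// Matching ids from the by-key index are looked up in the records table, so
+    /// each yielded item is a full [`SignedEntry`].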
+    pub fn next_filtered(
+        &mut self,
+        direction: &SortDirection,
+        filter: impl for<'x> Fn(RecordsByKeyId<'x>) -> bool,
+    ) -> Option<anyhow::Result<SignedEntry>> {
+        self.0.with_mut(|fields| {
+            let by_key_id = loop {
+                let next = match direction {
+                    SortDirection::Asc => fields.by_key_range.next(),
+                    SortDirection::Desc => fields.by_key_range.next_back(),
+                };
+                match next {
+                    Some(Ok(res)) => match filter(res.0.value()) {
+                        false => continue,
+                        true => break res.0,
+                    },
+                    Some(Err(err)) => return Some(Err(err.into())),
+                    None => return None,
+                }
+            };
+
+            let (namespace, key, author) = by_key_id.value();
+            let records_id = (namespace, author, key);
+            let entry = fields.records_table.get(&records_id);
+            match entry {
+                Ok(Some(entry)) => Some(Ok(into_entry(records_id, entry.value()))),
+                Ok(None) => None,
+                Err(err) => Some(Err(err.into())),
+            }
+        })
+    }
+}
+
+fn anyhow_err(err: impl Into<anyhow::Error>) -> anyhow::Error {
+    err.into()
+}
diff --git a/iroh-sync/src/store/memory.rs b/iroh-sync/src/store/memory.rs
index 9a69e6d986..aa29fde1e7 100644
--- a/iroh-sync/src/store/memory.rs
+++ b/iroh-sync/src/store/memory.rs
@@ -7,6 +7,7 @@ use std::{
 };
 
 use anyhow::{anyhow, Result};
+use bytes::Bytes;
 use ed25519_dalek::{SignatureError, VerifyingKey};
 use iroh_bytes::Hash;
 use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
@@ -18,7 +19,11 @@ use crate::{
     AuthorId, Capability, CapabilityKind, NamespaceId, PeerIdBytes, Record,
 };
 
-use super::{pubkeys::MemPublicKeyStore, ImportNamespaceOutcome, OpenError, PublicKeyStore};
+use super::{
+    pubkeys::MemPublicKeyStore,
+    util::{IndexKind, LatestPerKeySelector, SelectorRes},
+    ImportNamespaceOutcome, OpenError, PublicKeyStore, Query, SortDirection,
+};
 
 type SyncPeersCache = Arc<RwLock<HashMap<NamespaceId, lru::LruCache<PeerIdBytes, ()>>>>;
 
@@ -37,18 +42,48 @@ pub struct Store {
     peers_per_doc: SyncPeersCache,
 }
 
-type Rid = (AuthorId, Vec<u8>);
-type Rvalue = SignedEntry;
-type RecordMap = BTreeMap<Rid, Rvalue>;
+type Key = Bytes;
 type ReplicaRecordsOwned = BTreeMap<NamespaceId, RecordMap>;
 
+#[derive(Debug, Default)]
+struct RecordMap {
+    by_author: BTreeMap<(AuthorId, Key), SignedEntry>,
+    by_key: BTreeMap<(Key, AuthorId), ()>,
+}
+
+impl RecordMap {
+    fn insert(&mut self, entry: SignedEntry) {
+        self.by_key
+            .insert((entry.id().key_bytes(), entry.author()), ());
+        self.by_author
+            .insert((entry.author(), entry.id().key_bytes()), entry);
+    }
+    fn remove(&mut self, id: &RecordIdentifier) -> Option<SignedEntry> {
+        let entry = self.by_author.remove(&(id.author(), id.key_bytes()));
+        self.by_key.remove(&(id.key_bytes(), id.author()));
+        entry
+    }
+    fn len(&self) -> usize {
+        self.by_author.len()
+    }
+    fn retain(&mut self, f: impl Fn(&(AuthorId, Key), &mut SignedEntry) -> bool) {
+        self.by_author.retain(|key, value| {
+            let retain = f(key, value);
+            if !retain {
+                self.by_key.remove(&(key.1.clone(), key.0));
+            }
+            retain
+        })
+    }
+}
+
 type LatestByAuthorMapOwned = BTreeMap<AuthorId, (u64, Vec<u8>)>;
 type LatestMapOwned = HashMap<NamespaceId, LatestByAuthorMapOwned>;
 type LatestByAuthorMap<'a> = MappedRwLockReadGuard<'a, LatestByAuthorMapOwned>;
 
 impl super::Store for Store {
     type Instance = ReplicaStoreInstance;
-    type GetIter<'a> = RangeIterator<'a>;
+    type GetIter<'a> = QueryIterator<'a>;
     type ContentHashesIter<'a> = ContentHashesIterator<'a>;
    type AuthorsIter<'a> = std::vec::IntoIter<Result<Author>>;
    type NamespaceIter<'a> = std::vec::IntoIter<Result<(NamespaceId, CapabilityKind)>>;
@@ -134,34 +169,31 @@ impl super::Store for Store {
     fn get_many(
         &self,
         namespace: NamespaceId,
-        filter: super::GetFilter,
+        query: impl Into<Query>,
     ) -> Result<Self::GetIter<'_>> {
-        match filter {
-            super::GetFilter::All => self.get_all(namespace),
-            super::GetFilter::Key(key) => self.get_by_key(namespace, key),
-
super::GetFilter::Prefix(prefix) => self.get_by_prefix(namespace, prefix), - super::GetFilter::Author(author) => self.get_by_author(namespace, author), - super::GetFilter::AuthorAndPrefix(author, prefix) => { - self.get_by_author_and_prefix(namespace, author, prefix) - } - } + let query = query.into(); + let records = self.replica_records.read(); + Ok(QueryIterator::new(records, namespace, query)) } - fn get_one( + fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: impl AsRef<[u8]>, + include_empty: bool, ) -> Result> { let inner = self.replica_records.read(); - - let value = inner - .get(&namespace) - .and_then(|records| records.get(&(author, key.as_ref().to_vec()))); - Ok(match value { + let Some(records) = inner.get(&namespace) else { + return Ok(None); + }; + let entry = records + .by_author + .get(&(author, key.as_ref().to_vec().into())); + Ok(match entry { + Some(entry) if !include_empty && entry.is_empty() => None, + Some(entry) => Some(entry.clone()), None => None, - Some(value) if value.is_empty() => None, - Some(value) => Some(value.clone()), }) } @@ -205,121 +237,6 @@ impl super::Store for Store { } } -impl Store { - fn get_by_key( - &self, - namespace: NamespaceId, - key: impl AsRef<[u8]>, - ) -> Result> { - let records = self.replica_records.read(); - let key = key.as_ref().to_vec(); - let filter = GetFilter::Key { namespace, key }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_by_prefix( - &self, - namespace: NamespaceId, - prefix: impl AsRef<[u8]>, - ) -> Result> { - let records = self.replica_records.read(); - let prefix = prefix.as_ref().to_vec(); - let filter = GetFilter::Prefix { namespace, prefix }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_by_author(&self, namespace: NamespaceId, author: AuthorId) -> Result> { - let records = self.replica_records.read(); - let filter = GetFilter::Author { namespace, author }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_by_author_and_prefix( - &self, - namespace: NamespaceId, - author: AuthorId, - prefix: Vec, - ) -> Result> { - let records = self.replica_records.read(); - let filter = GetFilter::AuthorAndPrefix { - namespace, - author, - prefix, - }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_all(&self, namespace: NamespaceId) -> Result> { - let records = self.replica_records.read(); - let filter = GetFilter::All { namespace }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } -} - -#[derive(Debug)] -enum GetFilter { - /// All entries. - All { namespace: NamespaceId }, - /// Filter by author. - Author { - namespace: NamespaceId, - author: AuthorId, - }, - /// Filter by key only. - Key { - namespace: NamespaceId, - key: Vec, - }, - /// Filter by prefix only. - Prefix { - namespace: NamespaceId, - prefix: Vec, - }, - /// Filter by author and prefix. - AuthorAndPrefix { - namespace: NamespaceId, - prefix: Vec, - author: AuthorId, - }, -} - -impl GetFilter { - fn namespace(&self) -> NamespaceId { - match self { - GetFilter::All { namespace } => *namespace, - GetFilter::Key { namespace, .. } => *namespace, - GetFilter::Prefix { namespace, .. } => *namespace, - GetFilter::Author { namespace, .. } => *namespace, - GetFilter::AuthorAndPrefix { namespace, .. } => *namespace, - } - } -} - /// Iterator over all content hashes in the memory store. 
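 ///
 /// Walks the per-namespace `by_author` maps in turn, yielding each entry's
 /// content hash.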
#[derive(Debug)] pub struct ContentHashesIterator<'a> { @@ -333,7 +250,7 @@ impl<'a> Iterator for ContentHashesIterator<'a> { fn next(&mut self) -> Option { loop { let records = self.records.values().nth(self.namespace_i)?; - match records.values().nth(self.record_i) { + match records.by_author.values().nth(self.record_i) { None => { self.namespace_i += 1; self.record_i = 0; @@ -370,48 +287,126 @@ impl<'a> Iterator for LatestIterator<'a> { /// Iterator over entries in the memory store #[derive(Debug)] -pub struct RangeIterator<'a> { +pub struct QueryIterator<'a> { records: ReplicaRecords<'a>, - filter: GetFilter, - /// Current iteration index. - index: usize, + namespace: NamespaceId, + query: Query, + index: IndexKind, + selector: Option, + // current iterator index + position: usize, + // number of entries returned from the iterator + count: u64, + // number of entries skipped at the beginning + offset: u64, } -impl<'a> Iterator for RangeIterator<'a> { +impl<'a> QueryIterator<'a> { + fn new(records: ReplicaRecords<'a>, namespace: NamespaceId, query: Query) -> Self { + let index = IndexKind::from(&query); + let selector = match index { + IndexKind::KeyAuthor { latest_per_key, .. } if latest_per_key => { + Some(LatestPerKeySelector::default()) + } + _ => None, + }; + + Self { + records, + namespace, + query, + index, + selector, + position: 0, + offset: 0, + count: 0, + } + } +} + +impl<'a> Iterator for QueryIterator<'a> { type Item = Result; fn next(&mut self) -> Option { loop { - let records = self.records.get(&self.filter.namespace())?; - let entry = match self.filter { - GetFilter::All { .. } => records.iter().nth(self.index)?, - GetFilter::Key { ref key, .. } => records - .iter() - .filter(|((_, k), _)| k == key) - .nth(self.index)?, - GetFilter::Prefix { ref prefix, .. } => records - .iter() - .filter(|((_, k), _)| k.starts_with(prefix)) - .nth(self.index)?, - GetFilter::Author { ref author, .. } => records - .iter() - .filter(|((a, _), _)| a == author) - .nth(self.index)?, - GetFilter::AuthorAndPrefix { - ref prefix, - ref author, + if let Some(limit) = self.query.limit() { + if self.count >= limit { + return None; + } + } + + let records = self.records.get(&self.namespace)?; + + let entry = match &self.index { + IndexKind::AuthorKey { range, key_filter } => { + let mut iter = records + .by_author + .iter() + .filter(|(_key, entry)| { + range.matches(&entry.author()) + && key_filter.matches(entry.key()) + && (self.query.include_empty || !entry.is_empty()) + }) + .map(|(_key, entry)| entry); + + let next = match self.query.sort_direction { + SortDirection::Asc => iter.nth(self.position), + SortDirection::Desc => iter.nth_back(self.position), + }; + next.cloned() + } + IndexKind::KeyAuthor { + range, + author_filter, .. 
- } => records - .iter() - .filter(|((a, k), _)| a == author && k.starts_with(prefix)) - .nth(self.index)?, + } => loop { + let mut iter = records + .by_key + .keys() + .flat_map(|k| records.by_author.get(&(k.1, k.0.clone())).into_iter()) + .filter(|entry| { + range.matches(entry.key()) && author_filter.matches(&entry.author()) + }); + let next = match self.query.sort_direction { + SortDirection::Asc => iter.nth(self.position), + SortDirection::Desc => iter.nth_back(self.position), + }; + let next = next.cloned(); + + let next = match self.selector.as_mut() { + None => next, + Some(selector) => match selector.push(next) { + SelectorRes::Continue => { + self.position += 1; + continue; + } + SelectorRes::Finished => None, + SelectorRes::Some(res) => Some(res), + }, + }; + let Some(entry) = next else { + break None; + }; + + // final check for empty entries: if the selector is active, the latest + // entry for a key might be empty, so skip it if no empty entries were + // requested + if !self.query.include_empty && entry.is_empty() { + self.position += 1; + continue; + } else { + break Some(entry); + } + }, }; - self.index += 1; - if entry.1.is_empty() { + + self.position += 1; + self.offset += 1; + if self.offset <= self.query.offset() { continue; - } else { - return Some(Ok(entry.1.clone())); } + self.count += 1; + return entry.map(Result::Ok); } } } @@ -493,7 +488,7 @@ impl Iterator for RecordsIter<'_> { fn next(&mut self) -> Option { let records = self.replica_records.get(&self.namespace)?; - let ((author, key), value) = records.iter().nth(self.i)?; + let ((author, key), value) = records.by_author.iter().nth(self.i)?; let id = RecordIdentifier::new(self.namespace, *author, key); self.i += 1; Some((id, value.clone())) @@ -508,25 +503,27 @@ impl crate::ranger::Store for ReplicaStoreInstance { Ok(self.with_records(|records| { records .and_then(|r| { - r.first_key_value().map(|((author, key), _value)| { - RecordIdentifier::new(self.namespace, *author, key.clone()) - }) + r.by_author + .first_key_value() + .map(|((author, key), _value)| { + RecordIdentifier::new(self.namespace, *author, key.clone()) + }) }) .unwrap_or_default() })) } - fn get(&self, key: &RecordIdentifier) -> Result, Self::Error> { + fn get(&self, id: &RecordIdentifier) -> Result, Self::Error> { Ok(self.with_records(|records| { records.and_then(|r| { - let v = r.get(&(key.author(), key.key().to_vec()))?; + let v = r.by_author.get(&(id.author(), id.key_bytes()))?; Some(v.clone()) }) })) } fn len(&self) -> Result { - Ok(self.with_records(|records| records.map(|v| v.len()).unwrap_or_default())) + Ok(self.with_records(|records| records.map(|v| v.by_author.len()).unwrap_or_default())) } fn is_empty(&self) -> Result { @@ -548,7 +545,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { records.insert(e.author_bytes(), (e.timestamp(), e.key().to_vec())); }); self.with_records_mut_with_default(|records| { - records.insert((e.author_bytes(), e.key().to_vec()), e); + records.insert(e); }); Ok(()) } @@ -567,9 +564,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { fn remove(&mut self, key: &RecordIdentifier) -> Result, Self::Error> { // TODO: what if we are trying to remove with the wrong timestamp? 
- let res = self.with_records_mut(|records| { - records.and_then(|records| records.remove(&(key.author(), key.key().to_vec()))) - }); + let res = self.with_records_mut(|records| records.and_then(|records| records.remove(key))); Ok(res) } @@ -616,7 +611,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { let Some(records) = records else { return Ok(0); }; - let old_len = records.len(); + let old_len = records.by_author.len(); records.retain(|(a, k), v| { !(a == &prefix.author() && k.starts_with(prefix.key()) && predicate(v.entry())) }); diff --git a/iroh-sync/src/store/util.rs b/iroh-sync/src/store/util.rs new file mode 100644 index 0000000000..e967e04bd9 --- /dev/null +++ b/iroh-sync/src/store/util.rs @@ -0,0 +1,89 @@ +//! Utilities useful across different store impls. + +use crate::SignedEntry; + +use super::{AuthorFilter, KeyFilter, Query, QueryKind, SortBy}; + +/// A helper for stores that have by-author and by-key indexes for records. +#[derive(Debug)] +pub enum IndexKind { + AuthorKey { + range: AuthorFilter, + key_filter: KeyFilter, + }, + KeyAuthor { + range: KeyFilter, + author_filter: AuthorFilter, + latest_per_key: bool, + }, +} + +impl From<&Query> for IndexKind { + fn from(query: &Query) -> Self { + match &query.kind { + QueryKind::Flat(details) => match (&query.filter_author, details.sort_by) { + (AuthorFilter::Any, SortBy::KeyAuthor) => IndexKind::KeyAuthor { + range: query.filter_key.clone(), + author_filter: AuthorFilter::Any, + latest_per_key: false, + }, + _ => IndexKind::AuthorKey { + range: query.filter_author.clone(), + key_filter: query.filter_key.clone(), + }, + }, + QueryKind::SingleLatestPerKey(_) => IndexKind::KeyAuthor { + range: query.filter_key.clone(), + author_filter: query.filter_author.clone(), + latest_per_key: true, + }, + } + } +} + +/// Helper to extract the latest entry per key from an iterator that yields [`SignedEntry`] items. +/// +/// Items must be pushed in key-sorted order. +#[derive(Debug, Default)] +pub struct LatestPerKeySelector(Option); + +pub enum SelectorRes { + /// The iterator is finished. + Finished, + /// The selection is not yet finished, keep pushing more items. + Continue, + /// The selection yielded an entry. + Some(SignedEntry), +} + +impl LatestPerKeySelector { + /// Push an entry into the selector. + /// + /// Entries must be sorted by key beforehand. 
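+    ///
+    /// A minimal usage sketch (with a hypothetical `entries` iterator that
+    /// yields `SignedEntry` items in key-sorted order):
+    ///
+    /// ```ignore
+    /// let mut selector = LatestPerKeySelector::default();
+    /// let mut latest = Vec::new();
+    /// // Chain a final `None` to flush the last buffered entry.
+    /// for item in entries.map(Some).chain(std::iter::once(None)) {
+    ///     match selector.push(item) {
+    ///         SelectorRes::Continue => continue,
+    ///         SelectorRes::Finished => break,
+    ///         SelectorRes::Some(entry) => latest.push(entry),
+    ///     }
+    /// }
+    /// ```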
+ pub fn push(&mut self, entry: Option) -> SelectorRes { + let Some(entry) = entry else { + return match self.0.take() { + Some(entry) => SelectorRes::Some(entry), + None => SelectorRes::Finished, + }; + }; + match self.0.take() { + None => { + self.0 = Some(entry); + SelectorRes::Continue + } + Some(last) if last.key() == entry.key() => { + if entry.timestamp() > last.timestamp() { + self.0 = Some(entry); + } else { + self.0 = Some(last); + } + SelectorRes::Continue + } + Some(last) => { + self.0 = Some(entry); + SelectorRes::Some(last) + } + } + } +} diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index 46ab8438e0..935c12c38e 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -824,12 +824,12 @@ impl EntrySignature { } #[cfg(feature = "fs-store")] - pub(crate) fn author_signature(&self) -> &Signature { + pub(crate) fn author(&self) -> &Signature { &self.author_signature } #[cfg(feature = "fs-store")] - pub(crate) fn namespace_signature(&self) -> &Signature { + pub(crate) fn namespace(&self) -> &Signature { &self.namespace_signature } } @@ -939,7 +939,7 @@ impl Debug for RecordIdentifier { impl RangeKey for RecordIdentifier { fn is_prefix_of(&self, other: &Self) -> bool { - other.as_bytes().starts_with(self.as_bytes()) + other.as_ref().starts_with(self.as_ref()) } } @@ -969,9 +969,9 @@ impl RecordIdentifier { out.extend_from_slice(&self.0); } - /// Get this [`RecordIdentifier`] as a byte slices. - pub fn as_bytes(&self) -> &[u8] { - &self.0 + /// Get this [`RecordIdentifier`] as [Bytes]. + pub fn as_bytes(&self) -> Bytes { + self.0.clone() } /// Get this [`RecordIdentifier`] as a tuple of byte slices. @@ -983,11 +983,25 @@ impl RecordIdentifier { ) } + /// Get this [`RecordIdentifier`] as a tuple of bytes. + pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) { + ( + self.0[NAMESPACE_BYTES].try_into().unwrap(), + self.0[AUTHOR_BYTES].try_into().unwrap(), + self.0.slice(KEY_BYTES), + ) + } + /// Get the key of this record. pub fn key(&self) -> &[u8] { &self.0[KEY_BYTES] } + /// Get the key of this record as [`Bytes`]. + pub fn key_bytes(&self) -> Bytes { + self.0.slice(KEY_BYTES) + } + /// Get the [`NamespaceId`] of this record as byte array. pub fn namespace(&self) -> NamespaceId { let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap(); @@ -1001,6 +1015,12 @@ impl RecordIdentifier { } } +impl AsRef<[u8]> for RecordIdentifier { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + impl Deref for SignedEntry { type Target = Entry; fn deref(&self) -> &Self::Target { @@ -1127,7 +1147,7 @@ mod tests { use crate::{ actor::SyncHandle, ranger::{Range, Store as _}, - store::{self, GetFilter, OpenError, Store}, + store::{self, OpenError, Query, SortBy, SortDirection, Store}, }; use super::*; @@ -1172,7 +1192,7 @@ mod tests { for i in 0..10 { let res = store - .get_one(my_replica.id(), alice.id(), format!("/{i}"))? + .get_exact(my_replica.id(), alice.id(), format!("/{i}"), false)? .unwrap(); let len = format!("{i}: hello from alice").as_bytes().len() as u64; assert_eq!(res.entry().record().content_len(), len); @@ -1182,35 +1202,35 @@ mod tests { // Test multiple records for the same key my_replica.hash_and_insert("/cool/path", &alice, "round 1")?; let _entry = store - .get_one(my_replica.id(), alice.id(), "/cool/path")? + .get_exact(my_replica.id(), alice.id(), "/cool/path", false)? .unwrap(); // Second my_replica.hash_and_insert("/cool/path", &alice, "round 2")?; let _entry = store - .get_one(my_replica.id(), alice.id(), "/cool/path")? 
+ .get_exact(my_replica.id(), alice.id(), "/cool/path", false)? .unwrap(); // Get All by author let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(alice.id()))? + .get_many(my_replica.id(), Query::author(alice.id()))? .collect::>()?; assert_eq!(entries.len(), 11); // Get All by author let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(bob.id()))? + .get_many(my_replica.id(), Query::author(bob.id()))? .collect::>()?; assert_eq!(entries.len(), 0); // Get All by key let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Key(b"/cool/path".to_vec()))? + .get_many(my_replica.id(), Query::key_exact(b"/cool/path"))? .collect::>()?; assert_eq!(entries.len(), 1); // Get All let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::All)? + .get_many(my_replica.id(), Query::all())? .collect::>()?; assert_eq!(entries.len(), 11); @@ -1219,24 +1239,24 @@ mod tests { // Get All by author let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(alice.id()))? + .get_many(my_replica.id(), Query::author(alice.id()))? .collect::>()?; assert_eq!(entries.len(), 11); let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(bob.id()))? + .get_many(my_replica.id(), Query::author(bob.id()))? .collect::>()?; assert_eq!(entries.len(), 1); // Get All by key let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Key(b"/cool/path".to_vec()))? + .get_many(my_replica.id(), Query::key_exact(b"/cool/path"))? .collect::>()?; assert_eq!(entries.len(), 2); // Get all by prefix let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Prefix(b"/cool".to_vec()))? + .get_many(my_replica.id(), Query::key_prefix(b"/cool"))? .collect::>()?; assert_eq!(entries.len(), 2); @@ -1244,7 +1264,7 @@ mod tests { let entries: Vec<_> = store .get_many( my_replica.id(), - GetFilter::AuthorAndPrefix(alice.id(), b"/cool".to_vec()), + Query::author(alice.id()).key_prefix(b"/cool"), )? .collect::>()?; assert_eq!(entries.len(), 1); @@ -1252,14 +1272,14 @@ mod tests { let entries: Vec<_> = store .get_many( my_replica.id(), - GetFilter::AuthorAndPrefix(bob.id(), b"/cool".to_vec()), + Query::author(bob.id()).key_prefix(b"/cool"), )? .collect::>()?; assert_eq!(entries.len(), 1); // Get All let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::All)? + .get_many(my_replica.id(), Query::all())? .collect::>()?; assert_eq!(entries.len(), 12); @@ -1491,7 +1511,9 @@ mod tests { replica .insert_entry(entry.clone(), InsertOrigin::Local) .unwrap(); - let res = store.get_one(namespace.id(), author.id(), key)?.unwrap(); + let res = store + .get_exact(namespace.id(), author.id(), key, false)? + .unwrap(); assert_eq!(res, entry); let entry2 = { @@ -1503,7 +1525,9 @@ mod tests { let res = replica.insert_entry(entry2, InsertOrigin::Local); assert!(matches!(res, Err(InsertError::NewerEntryExists))); - let res = store.get_one(namespace.id(), author.id(), key)?.unwrap(); + let res = store + .get_exact(namespace.id(), author.id(), key, false)? 
+ .unwrap(); assert_eq!(res, entry); Ok(()) @@ -1716,9 +1740,18 @@ mod tests { // delete let deleted = replica.delete_prefix(b"foo", &alice)?; assert_eq!(deleted, 2); - assert_eq!(store.get_one(myspace.id(), alice.id(), b"foobar")?, None); - assert_eq!(store.get_one(myspace.id(), alice.id(), b"fooboo")?, None); - assert_eq!(store.get_one(myspace.id(), alice.id(), b"foo")?, None); + assert_eq!( + store.get_exact(myspace.id(), alice.id(), b"foobar", false)?, + None + ); + assert_eq!( + store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?, + None + ); + assert_eq!( + store.get_exact(myspace.id(), alice.id(), b"foo", false)?, + None + ); Ok(()) } @@ -1797,7 +1830,7 @@ mod tests { // insert entry let hash = replica.hash_and_insert(b"foo", &author, b"bar")?; let res = store - .get_many(namespace.id(), GetFilter::All)? + .get_many(namespace.id(), Query::all())? .collect::>(); assert_eq!(res.len(), 1); @@ -1808,7 +1841,7 @@ mod tests { store.close_replica(replica); store.remove_replica(&namespace.id())?; let res = store - .get_many(namespace.id(), GetFilter::All)? + .get_many(namespace.id(), Query::all())? .collect::>(); assert_eq!(res.len(), 0); @@ -1820,7 +1853,7 @@ mod tests { let mut replica = store.new_replica(namespace.clone())?; replica.insert(b"foo", &author, hash, 3)?; let res = store - .get_many(namespace.id(), GetFilter::All)? + .get_many(namespace.id(), Query::all())? .collect::>(); assert_eq!(res.len(), 1); Ok(()) @@ -2134,6 +2167,192 @@ mod tests { Ok(()) } + #[test] + fn test_replica_queries_mem() -> Result<()> { + let store = store::memory::Store::default(); + + test_replica_queries(store)?; + Ok(()) + } + + #[cfg(feature = "fs-store")] + #[test] + fn test_replica_queries_fs() -> Result<()> { + let dbfile = tempfile::NamedTempFile::new()?; + let store = store::fs::Store::new(dbfile.path())?; + test_replica_queries(store)?; + + Ok(()) + } + + fn test_replica_queries(store: S) -> Result<()> { + let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1); + let namespace = NamespaceSecret::new(&mut rng); + let mut replica = store.new_replica(namespace)?; + let namespace = replica.id(); + + let a1 = store.new_author(&mut rng)?; + let a2 = store.new_author(&mut rng)?; + let a3 = store.new_author(&mut rng)?; + println!( + "a1 {} a2 {} a3 {}", + a1.id().fmt_short(), + a2.id().fmt_short(), + a3.id().fmt_short() + ); + + replica.hash_and_insert("hi/world", &a2, "a2")?; + replica.hash_and_insert("hi/world", &a1, "a1")?; + replica.hash_and_insert("hi/moon", &a2, "a1")?; + replica.hash_and_insert("hi", &a3, "a3")?; + + struct QueryTester<'a, S: store::Store> { + store: &'a S, + namespace: NamespaceId, + } + impl<'a, S: store::Store> QueryTester<'a, S> { + fn assert(&self, query: impl Into, expected: Vec<(&'static str, &Author)>) { + let query = query.into(); + let actual = self + .store + .get_many(self.namespace, query.clone()) + .unwrap() + .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author()))) + .collect::>>() + .unwrap(); + let expected = expected + .into_iter() + .map(|(key, author)| (key.to_string(), author.id())) + .collect::>(); + assert_eq!(actual, expected, "query: {query:#?}") + } + } + + let qt = QueryTester { + store: &store, + namespace, + }; + + qt.assert( + Query::all(), + vec![ + ("hi/world", &a1), + ("hi/moon", &a2), + ("hi/world", &a2), + ("hi", &a3), + ], + ); + + qt.assert( + Query::single_latest_per_key(), + vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)], + ); + + qt.assert( + 
Query::single_latest_per_key().sort_direction(SortDirection::Desc), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)], + ); + + qt.assert( + Query::single_latest_per_key().key_prefix("hi/"), + vec![("hi/moon", &a2), ("hi/world", &a1)], + ); + + qt.assert( + Query::single_latest_per_key() + .key_prefix("hi/") + .sort_direction(SortDirection::Desc), + vec![("hi/world", &a1), ("hi/moon", &a2)], + ); + + qt.assert( + Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc), + vec![ + ("hi", &a3), + ("hi/moon", &a2), + ("hi/world", &a1), + ("hi/world", &a2), + ], + ); + + qt.assert( + Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc), + vec![ + ("hi/world", &a2), + ("hi/world", &a1), + ("hi/moon", &a2), + ("hi", &a3), + ], + ); + + qt.assert( + Query::all().key_prefix("hi/"), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)], + ); + + qt.assert( + Query::all().key_prefix("hi/").offset(1).limit(1), + vec![("hi/moon", &a2)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::KeyAuthor, SortDirection::Desc), + vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::KeyAuthor, SortDirection::Desc) + .offset(1) + .limit(1), + vec![("hi/world", &a1)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::AuthorKey, SortDirection::Asc), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::AuthorKey, SortDirection::Desc), + vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)], + ); + + qt.assert( + Query::all() + .sort_by(SortBy::KeyAuthor, SortDirection::Asc) + .limit(2) + .offset(1), + vec![("hi/moon", &a2), ("hi/world", &a1)], + ); + + replica.delete_prefix("hi/world", &a2)?; + + qt.assert( + Query::all(), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)], + ); + + qt.assert( + Query::all().include_empty(), + vec![ + ("hi/world", &a1), + ("hi/moon", &a2), + ("hi/world", &a2), + ("hi", &a3), + ], + ); + + Ok(()) + } + fn assert_keys(store: &S, namespace: NamespaceId, mut expected: Vec>) { expected.sort(); assert_eq!(expected, get_keys_sorted(store, namespace)); @@ -2141,7 +2360,7 @@ mod tests { fn get_keys_sorted(store: &S, namespace: NamespaceId) -> Vec> { let mut res = store - .get_many(namespace, GetFilter::All) + .get_many(namespace, Query::all()) .unwrap() .map(|e| e.map(|e| e.key().to_vec())) .collect::>>() @@ -2157,7 +2376,7 @@ mod tests { key: &[u8], ) -> anyhow::Result { let entry = store - .get_one(namespace, author, key)? + .get_exact(namespace, author, key, true)? .ok_or_else(|| anyhow::anyhow!("not found"))?; Ok(entry) } @@ -2169,7 +2388,7 @@ mod tests { key: &[u8], ) -> anyhow::Result> { let hash = store - .get_one(namespace, author, key)? + .get_exact(namespace, author, key, false)? 
.map(|e| e.content_hash()); Ok(hash) } @@ -2205,7 +2424,7 @@ mod tests { set: &[&str], ) -> Result<()> { for el in set { - store.get_one(*namespace, author.id(), el)?; + store.get_exact(*namespace, author.id(), el, false)?; } Ok(()) } diff --git a/iroh/examples/client.rs b/iroh/examples/client.rs index 45618ed29a..6a332ebe5a 100644 --- a/iroh/examples/client.rs +++ b/iroh/examples/client.rs @@ -8,7 +8,7 @@ use indicatif::HumanBytes; use iroh::node::Node; use iroh_bytes::util::runtime; -use iroh_sync::{store::GetFilter, Entry}; +use iroh_sync::{store::Query, Entry}; use tokio_stream::StreamExt; #[tokio::main] @@ -26,7 +26,7 @@ async fn main() -> anyhow::Result<()> { let key = b"hello".to_vec(); let value = b"world".to_vec(); doc.set_bytes(author, key.clone(), value).await?; - let mut stream = doc.get_many(GetFilter::All).await?; + let mut stream = doc.get_many(Query::all()).await?; while let Some(entry) = stream.try_next().await? { println!("entry {}", fmt_entry(&entry)); let content = doc.read_to_bytes(&entry).await?; diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 684f3cab71..47b75c1f36 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -22,8 +22,7 @@ use iroh_bytes::Hash; use iroh_bytes::{BlobFormat, Tag}; use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, NodeAddr}; use iroh_sync::actor::OpenState; -use iroh_sync::CapabilityKind; -use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; +use iroh_sync::{store::Query, AuthorId, CapabilityKind, Entry, NamespaceId}; use quic_rpc::message::RpcMsg; use quic_rpc::{RpcClient, ServiceConnection}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -36,7 +35,7 @@ use crate::rpc_protocol::{ BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest, CounterStats, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest, - DocDelResponse, DocDropRequest, DocGetManyRequest, DocGetOneRequest, DocImportRequest, + DocDelResponse, DocDropRequest, DocGetExactRequest, DocGetManyRequest, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, @@ -659,33 +658,49 @@ where Ok(removed) } - /// Get the latest entry for a key and author. - pub async fn get_one(&self, author: AuthorId, key: impl Into) -> Result> { + /// Get an entry for a key and author. + /// + /// Optionally also get the entry if it is empty (i.e. a deletion marker). + pub async fn get_exact( + &self, + author: AuthorId, + key: impl AsRef<[u8]>, + include_empty: bool, + ) -> Result> { self.ensure_open()?; let res = self - .rpc(DocGetOneRequest { + .rpc(DocGetExactRequest { author, - key: key.into(), + key: key.as_ref().to_vec().into(), doc_id: self.id(), + include_empty, }) .await??; Ok(res.entry.map(|entry| entry.into())) } /// Get entries. - pub async fn get_many(&self, filter: GetFilter) -> Result>> { + pub async fn get_many( + &self, + query: impl Into, + ) -> Result>> { self.ensure_open()?; let stream = self .0 .rpc .server_streaming(DocGetManyRequest { doc_id: self.id(), - filter, + query: query.into(), }) .await?; Ok(flatten(stream).map_ok(|res| res.entry.into())) } + /// Get a single entry. 
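+    ///
+    /// Returns the first entry matching the query, or `None` if no entry matches.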
+ pub async fn get_one(&self, query: impl Into) -> Result> { + self.get_many(query).await?.next().await.transpose() + } + /// Share this document with peers over a ticket. pub async fn share(&self, mode: ShareMode) -> anyhow::Result { self.ensure_open()?; diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs index 022f8e1b94..c4eb69c031 100644 --- a/iroh/src/commands/sync.rs +++ b/iroh/src/commands/sync.rs @@ -21,7 +21,10 @@ use iroh::{ util::fs::{path_content_info, PathContent}, }; use iroh_bytes::{provider::AddProgress, Hash, Tag}; -use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; +use iroh_sync::{ + store::{Query, SortDirection}, + AuthorId, Entry, NamespaceId, +}; use crate::config::ConsoleEnv; @@ -140,6 +143,12 @@ pub enum DocCommands { author: Option, /// Optional key prefix (parsed as UTF-8 string) prefix: Option, + /// How to sort the entries + #[clap(long, default_value_t=Sorting::Author)] + sort: Sorting, + /// Sort in descending order + #[clap(long)] + desc: bool, /// How to show the contents of the keys. #[clap(short, long, default_value_t=DisplayContentMode::Hash)] mode: DisplayContentMode, @@ -218,6 +227,24 @@ pub enum DocCommands { }, } +#[derive(clap::ValueEnum, Clone, Debug, Default, strum::Display)] +#[strum(serialize_all = "kebab-case")] +pub enum Sorting { + /// Sort by author, then key + #[default] + Author, + /// Sort by key, then author + Key, +} +impl From for iroh_sync::store::SortBy { + fn from(value: Sorting) -> Self { + match value { + Sorting::Author => Self::AuthorKey, + Sorting::Key => Self::KeyAuthor, + } + } +} + #[derive(Debug, Clone, Parser)] pub enum AuthorCommands { /// Set the active author (only works within the Iroh console). @@ -324,22 +351,15 @@ impl DocCommands { } => { let doc = get_doc(iroh, env, doc).await?; let key = key.as_bytes().to_vec(); - let filter = match (author, prefix) { - (None, false) => GetFilter::Key(key), - (None, true) => GetFilter::Prefix(key), - (Some(author), true) => GetFilter::AuthorAndPrefix(author, key), - (Some(author), false) => { - // Special case: Author and key, this means single entry. - let entry = doc - .get_one(author, key) - .await? - .ok_or_else(|| anyhow!("Entry not found"))?; - println!("{}", fmt_entry(&doc, &entry, mode).await); - return Ok(()); - } + let query = Query::all(); + let query = match (author, prefix) { + (None, false) => query.key_exact(key), + (None, true) => query.key_prefix(key), + (Some(author), true) => query.author(author).key_prefix(key), + (Some(author), false) => query.author(author).key_exact(key), }; - let mut stream = doc.get_many(filter).await?; + let mut stream = doc.get_many(query).await?; while let Some(entry) = stream.try_next().await? { println!("{}", fmt_entry(&doc, &entry, mode).await); } @@ -349,11 +369,23 @@ impl DocCommands { prefix, author, mode, + sort, + desc, } => { let doc = get_doc(iroh, env, doc).await?; - let filter = GetFilter::author_prefix(author, prefix); - - let mut stream = doc.get_many(filter).await?; + let mut query = Query::all(); + if let Some(author) = author { + query = query.author(author); + } + if let Some(prefix) = prefix { + query = query.key_prefix(prefix); + } + let direction = match desc { + true => SortDirection::Desc, + false => SortDirection::Asc, + }; + query = query.sort_by(sort.into(), direction); + let mut stream = doc.get_many(query).await?; while let Some(entry) = stream.try_next().await? 
{ println!("{}", fmt_entry(&doc, &entry, mode).await); } @@ -420,8 +452,8 @@ impl DocCommands { let key_str = key.clone(); let key = key.as_bytes().to_vec(); let path: PathBuf = canonicalize_path(&out)?; - let stream = doc.get_many(GetFilter::Key(key)).await?; - let entry = match get_latest(stream).await? { + let mut stream = doc.get_many(Query::key_exact(key)).await?; + let entry = match stream.try_next().await? { None => { println!(""); return Ok(()); @@ -843,23 +875,3 @@ impl ImportProgressBar { self.mp.clear().ok(); } } - -/// Get the latest entry for a key. If `None`, then an entry of the given key -/// could not be found. -async fn get_latest(stream: impl Stream>) -> Result> { - let entry = stream - .try_fold(None, |acc: Option, cur: Entry| async move { - match acc { - None => Ok(Some(cur)), - Some(prev) => { - if cur.timestamp() > prev.timestamp() { - Ok(Some(cur)) - } else { - Ok(Some(prev)) - } - } - } - }) - .await?; - Ok(entry) -} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 64709011ed..a4e95ab49c 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -1561,9 +1561,9 @@ fn handle_rpc_request>( }) .await } - DocGetOne(msg) => { + DocGetExact(msg) => { chan.rpc(msg, handler, |handler, req| async move { - handler.inner.sync.doc_get_one(req).await + handler.inner.sync.doc_get_exact(req).await }) .await } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 797e2a35ed..49cae325da 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -20,9 +20,8 @@ use iroh_net::{ use iroh_sync::{ actor::OpenState, - store::GetFilter, - sync::SignedEntry, - CapabilityKind, {AuthorId, NamespaceId}, + store::Query, + {AuthorId, CapabilityKind, NamespaceId, SignedEntry}, }; use quic_rpc::{ message::{BidiStreaming, BidiStreamingMsg, Msg, RpcMsg, ServerStreaming, ServerStreamingMsg}, @@ -701,8 +700,8 @@ pub struct DocSetHashResponse {} pub struct DocGetManyRequest { /// The document id pub doc_id: NamespaceId, - /// Filter entries by this [`GetFilter`] - pub filter: GetFilter, + /// Query to run + pub query: Query, } impl Msg for DocGetManyRequest { @@ -722,22 +721,24 @@ pub struct DocGetManyResponse { /// Get entries from a document #[derive(Serialize, Deserialize, Debug)] -pub struct DocGetOneRequest { +pub struct DocGetExactRequest { /// The document id pub doc_id: NamespaceId, - /// Key + /// Key matcher pub key: Bytes, - /// Author + /// Author matcher pub author: AuthorId, + /// Whether to include empty entries (prefix deletion markers) + pub include_empty: bool, } -impl RpcMsg for DocGetOneRequest { - type Response = RpcResult; +impl RpcMsg for DocGetExactRequest { + type Response = RpcResult; } -/// Response to [`DocGetOneRequest`] +/// Response to [`DocGetExactRequest`] #[derive(Serialize, Deserialize, Debug)] -pub struct DocGetOneResponse { +pub struct DocGetExactResponse { /// The document entry pub entry: Option, } @@ -866,7 +867,7 @@ pub enum ProviderRequest { DocSet(DocSetRequest), DocSetHash(DocSetHashRequest), DocGet(DocGetManyRequest), - DocGetOne(DocGetOneRequest), + DocGetExact(DocGetExactRequest), DocDel(DocDelRequest), DocStartSync(DocStartSyncRequest), DocLeave(DocLeaveRequest), @@ -911,7 +912,7 @@ pub enum ProviderResponse { DocSet(RpcResult), DocSetHash(RpcResult), DocGet(RpcResult), - DocGetOne(RpcResult), + DocGetExact(RpcResult), DocDel(RpcResult), DocShare(RpcResult), DocStartSync(RpcResult), diff --git a/iroh/src/sync_engine/rpc.rs b/iroh/src/sync_engine/rpc.rs index 78e6f838f6..1ea194e693 100644 --- a/iroh/src/sync_engine/rpc.rs 
+++ b/iroh/src/sync_engine/rpc.rs @@ -10,13 +10,13 @@ use crate::{ rpc_protocol::{ AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse, DocCloseRequest, DocCloseResponse, DocCreateRequest, DocCreateResponse, DocDelRequest, - DocDelResponse, DocDropRequest, DocDropResponse, DocGetManyRequest, DocGetManyResponse, - DocGetOneRequest, DocGetOneResponse, DocImportRequest, DocImportResponse, DocLeaveRequest, - DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, DocOpenResponse, - DocSetHashRequest, DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, - DocShareResponse, DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, - DocStatusResponse, DocSubscribeRequest, DocSubscribeResponse, DocTicket, RpcResult, - ShareMode, + DocDelResponse, DocDropRequest, DocDropResponse, DocGetExactRequest, DocGetExactResponse, + DocGetManyRequest, DocGetManyResponse, DocImportRequest, DocImportResponse, + DocLeaveRequest, DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, + DocOpenResponse, DocSetHashRequest, DocSetHashResponse, DocSetRequest, DocSetResponse, + DocShareRequest, DocShareResponse, DocStartSyncRequest, DocStartSyncResponse, + DocStatusRequest, DocStatusResponse, DocSubscribeRequest, DocSubscribeResponse, DocTicket, + RpcResult, ShareMode, }, sync_engine::SyncEngine, }; @@ -176,7 +176,7 @@ impl SyncEngine { .await?; let entry = self .sync - .get_one(doc_id, author_id, key) + .get_exact(doc_id, author_id, key, false) .await? .ok_or_else(|| anyhow!("failed to get entry after insertion"))?; Ok(DocSetResponse { entry }) @@ -210,14 +210,14 @@ impl SyncEngine { &self, req: DocGetManyRequest, ) -> impl Stream> { - let DocGetManyRequest { doc_id, filter } = req; + let DocGetManyRequest { doc_id, query } = req; let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. self.rt.main().spawn(async move { let tx2 = tx.clone(); - if let Err(err) = sync.get_many(doc_id, filter, tx).await { + if let Err(err) = sync.get_many(doc_id, query, tx).await { tx2.send_async(Err(err)).await.ok(); } }); @@ -227,13 +227,17 @@ impl SyncEngine { }) } - pub async fn doc_get_one(&self, req: DocGetOneRequest) -> RpcResult { - let DocGetOneRequest { + pub async fn doc_get_exact(&self, req: DocGetExactRequest) -> RpcResult { + let DocGetExactRequest { doc_id, author, key, + include_empty, } = req; - let entry = self.sync.get_one(doc_id, author, key).await?; - Ok(DocGetOneResponse { entry }) + let entry = self + .sync + .get_exact(doc_id, author, key, include_empty) + .await?; + Ok(DocGetExactResponse { entry }) } } diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index f9cdf5846e..cb7bf3a105 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -23,7 +23,7 @@ use tracing_subscriber::{prelude::*, EnvFilter}; use iroh_bytes::{util::runtime, Hash}; use iroh_net::derp::DerpMode; use iroh_sync::{ - store::{self, GetFilter}, + store::{self, Query}, AuthorId, ContentStatus, Entry, }; @@ -755,7 +755,7 @@ async fn doc_delete() -> Result<()> { let deleted = doc.del(author, b"foo".to_vec()).await?; assert_eq!(deleted, 1); - let entry = doc.get_one(author, b"foo".to_vec()).await?; + let entry = doc.get_exact(author, b"foo".to_vec(), false).await?; assert!(entry.is_none()); // wait for gc @@ -785,7 +785,7 @@ async fn sync_drop_doc() -> Result<()> { assert!(matches!(ev, Some(Ok(LiveEvent::InsertLocal { .. 
})))); client.docs.drop_doc(doc.id()).await?; - let res = doc.get_one(author, b"foo".to_vec()).await; + let res = doc.get_exact(author, b"foo".to_vec(), true).await; assert!(res.is_err()); let res = doc .set_bytes(author, b"foo".to_vec(), b"bar".to_vec()) @@ -805,9 +805,9 @@ async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { } async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { - let filter = GetFilter::Key(key.to_vec()); + let query = Query::single_latest_per_key().key_exact(key); let entry = doc - .get_many(filter) + .get_many(query) .await? .next() .await