Skip to content

Commit

Permalink
Implemented list and query limit/sorting
Browse files Browse the repository at this point in the history
Closes #11
  • Loading branch information
ecton committed Nov 9, 2021
1 parent f019898 commit 904bce8
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 41 deletions.
14 changes: 13 additions & 1 deletion crates/bonsaidb-client/src/client/remote_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{marker::PhantomData, ops::Deref, sync::Arc};

use async_trait::async_trait;
use bonsaidb_core::{
connection::{AccessPolicy, Connection, QueryKey, Range},
connection::{AccessPolicy, Connection, QueryKey, Range, Sort},
custom_api::CustomApi,
document::Document,
networking::{DatabaseRequest, DatabaseResponse, Request, Response},
Expand Down Expand Up @@ -119,6 +119,8 @@ impl<DB: Schema, A: CustomApi> Connection for RemoteDatabase<DB, A> {
async fn list<C: Collection, R: Into<Range<u64>> + Send>(
&self,
ids: R,
order: Sort,
limit: Option<usize>,
) -> Result<Vec<Document<'static>>, bonsaidb_core::Error> {
match self
.client
Expand All @@ -127,6 +129,8 @@ impl<DB: Schema, A: CustomApi> Connection for RemoteDatabase<DB, A> {
request: DatabaseRequest::List {
collection: C::collection_name()?,
ids: ids.into(),
order,
limit,
},
})
.await?
Expand All @@ -142,6 +146,8 @@ impl<DB: Schema, A: CustomApi> Connection for RemoteDatabase<DB, A> {
async fn query<V: View>(
&self,
key: Option<QueryKey<V::Key>>,
order: Sort,
limit: Option<usize>,
access_policy: AccessPolicy,
) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
where
Expand All @@ -158,6 +164,8 @@ impl<DB: Schema, A: CustomApi> Connection for RemoteDatabase<DB, A> {
.ok_or(bonsaidb_core::Error::CollectionNotFound)?
.view_name()?,
key: key.map(|key| key.serialized()).transpose()?,
order,
limit,
access_policy,
with_docs: false,
},
Expand All @@ -179,6 +187,8 @@ impl<DB: Schema, A: CustomApi> Connection for RemoteDatabase<DB, A> {
async fn query_with_docs<V: View>(
&self,
key: Option<QueryKey<V::Key>>,
order: Sort,
limit: Option<usize>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedDocument<V::Key, V::Value>>, bonsaidb_core::Error>
where
Expand All @@ -195,6 +205,8 @@ impl<DB: Schema, A: CustomApi> Connection for RemoteDatabase<DB, A> {
.ok_or(bonsaidb_core::Error::CollectionNotFound)?
.view_name()?,
key: key.map(|key| key.serialized()).transpose()?,
order,
limit,
access_policy,
with_docs: true,
},
Expand Down
3 changes: 2 additions & 1 deletion crates/bonsaidb-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ sha2 = "0.9"
futures = { version = "0.3" }
tokio = { version = "1", features = ["time"], optional = true }
num-traits = "0.2"
actionable = "0.1.0-rc.1"
actionable = "0.1.0-rc.2"
custodian-password = { git = "https://github.com/khonsulabs/custodian.git", branch = "main", default-features = false, features = [
"blake3",
] }
anyhow = { version = "1", optional = true }
serde_json = { version = "1", optional = true }
bincode = { version = "1", optional = true }
serde_cbor = { version = "0.11", optional = true }
itertools = "0.10"

[dev-dependencies]
hex-literal = "0.3"
Expand Down
146 changes: 139 additions & 7 deletions crates/bonsaidb-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use custodian_password::{
ClientConfig, ClientFile, ClientRegistration, ExportKey, RegistrationFinalization,
RegistrationRequest, RegistrationResponse,
};
use futures::{future::BoxFuture, Future, FutureExt};
use serde::{Deserialize, Serialize};

use crate::{
Expand Down Expand Up @@ -87,6 +88,8 @@ pub trait Connection: Send + Sync {
async fn list<C: schema::Collection, R: Into<Range<u64>> + Send>(
&self,
ids: R,
order: Sort,
limit: Option<usize>,
) -> Result<Vec<Document<'static>>, Error>;

/// Removes a `Document` from the database.
Expand Down Expand Up @@ -122,6 +125,8 @@ pub trait Connection: Send + Sync {
async fn query<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
order: Sort,
limit: Option<usize>,
access_policy: AccessPolicy,
) -> Result<Vec<Map<V::Key, V::Value>>, Error>
where
Expand All @@ -132,6 +137,8 @@ pub trait Connection: Send + Sync {
async fn query_with_docs<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
order: Sort,
limit: Option<usize>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedDocument<V::Key, V::Value>>, Error>
where
Expand Down Expand Up @@ -250,11 +257,101 @@ where

/// Retrieves all documents matching `ids`. Documents that are not found
/// are not returned, but no error will be generated.
pub async fn list<R: Into<Range<u64>> + Send>(
&self,
ids: R,
) -> Result<Vec<Document<'static>>, Error> {
self.connection.list::<Cl, R>(ids).await
pub fn list<R: Into<Range<u64>> + Send>(&self, ids: R) -> List<'_, Cn, Cl, R> {
List {
state: ListState::Pending(Some(ListBuilder {
collection: self,
range: ids,
sort: Sort::Ascending,
limit: None,
})),
}
}
}

struct ListBuilder<'a, Cn, Cl, R> {
collection: &'a Collection<'a, Cn, Cl>,
range: R,
sort: Sort,
limit: Option<usize>,
}

enum ListState<'a, Cn, Cl, R> {
Pending(Option<ListBuilder<'a, Cn, Cl, R>>),
Executing(BoxFuture<'a, Result<Vec<Document<'static>>, Error>>),
}

/// Executes [`Connection::List`] when awaited. Also offers methods to customize
/// the options for the operation.
pub struct List<'a, Cn, Cl, R> {
state: ListState<'a, Cn, Cl, R>,
}

impl<'a, Cn, Cl, R> List<'a, Cn, Cl, R>
where
R: Into<Range<u64>> + Send + 'a + Unpin,
{
fn builder(&mut self) -> &mut ListBuilder<'a, Cn, Cl, R> {
if let ListState::Pending(Some(builder)) = &mut self.state {
builder
} else {
unreachable!("Attempted to use after retrieving the result")
}
}

/// Queries the view in ascending order.
pub fn ascending(mut self) -> Self {
self.builder().sort = Sort::Ascending;
self
}

/// Queries the view in descending order.
pub fn descending(mut self) -> Self {
self.builder().sort = Sort::Descending;
self
}

/// Sets the maximum number of results to return.
pub fn limit(mut self, maximum_results: usize) -> Self {
self.builder().limit = Some(maximum_results);
self
}
}

impl<'a, Cn, Cl, R> Future for List<'a, Cn, Cl, R>
where
Cn: Connection,
Cl: schema::Collection,
R: Into<Range<u64>> + Send + 'a + Unpin,
{
type Output = Result<Vec<Document<'static>>, Error>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match &mut self.state {
ListState::Executing(future) => future.as_mut().poll(cx),
ListState::Pending(builder) => {
let ListBuilder {
collection,
range,
sort,
limit,
} = builder.take().unwrap();

let future = async move {
collection
.connection
.list::<Cl, R>(range, sort, limit)
.await
}
.boxed();

self.state = ListState::Executing(future);
self.poll(cx)
}
}
}
}

Expand All @@ -267,6 +364,12 @@ pub struct View<'a, Cn, V: schema::View> {

/// The view's data access policy. The default value is [`AccessPolicy::UpdateBefore`].
pub access_policy: AccessPolicy,

/// The sort order of the query.
pub sort: Sort,

/// The maximum number of results to return.
pub limit: Option<usize>,
}

impl<'a, Cn, V> View<'a, Cn, V>
Expand All @@ -279,6 +382,8 @@ where
connection,
key: None,
access_policy: AccessPolicy::UpdateBefore,
sort: Sort::Ascending,
limit: None,
}
}

Expand Down Expand Up @@ -309,17 +414,35 @@ where
self
}

/// Queries the view in ascending order.
pub fn ascending(mut self) -> Self {
self.sort = Sort::Ascending;
self
}

/// Queries the view in descending order.
pub fn descending(mut self) -> Self {
self.sort = Sort::Descending;
self
}

/// Sets the maximum number of results to return.
pub fn limit(mut self, maximum_results: usize) -> Self {
self.limit = Some(maximum_results);
self
}

/// Executes the query and retrieves the results.
pub async fn query(self) -> Result<Vec<Map<V::Key, V::Value>>, Error> {
self.connection
.query::<V>(self.key, self.access_policy)
.query::<V>(self.key, self.sort, self.limit, self.access_policy)
.await
}

/// Executes the query and retrieves the results with the associated `Document`s.
pub async fn query_with_docs(self) -> Result<Vec<MappedDocument<V::Key, V::Value>>, Error> {
self.connection
.query_with_docs::<V>(self.key, self.access_policy)
.query_with_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
.await
}

Expand All @@ -338,6 +461,15 @@ where
}
}

/// A sort order.
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub enum Sort {
/// Sort ascending (A -> Z).
Ascending,
/// Sort descending (Z -> A).
Descending,
}

/// Filters a [`View`] by key.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum QueryKey<K> {
Expand Down
2 changes: 1 addition & 1 deletion crates/bonsaidb-core/src/kv/implementation/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
if let BuilderState::Pending(Some(options)) = &mut self.state {
options
} else {
panic!("Attempted to use after retrieving the result")
unreachable!("Attempted to use after retrieving the result")
}
}

Expand Down
10 changes: 9 additions & 1 deletion crates/bonsaidb-core/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};

use crate::{
admin::Database,
connection::{AccessPolicy, QueryKey, Range},
connection::{AccessPolicy, QueryKey, Range, Sort},
document::Document,
kv::{KeyOperation, Output},
schema::{self, view::map, CollectionName, MappedValue, NamedReference, ViewName},
Expand Down Expand Up @@ -171,6 +171,10 @@ pub enum DatabaseRequest {
collection: CollectionName,
/// The range of ids to list.
ids: Range<u64>,
/// The order for the query into the collection.
order: Sort,
/// The maximum number of results to return.
limit: Option<usize>,
},
/// Queries a view.
#[cfg_attr(feature = "actionable-traits", actionable(protection = "simple"))]
Expand All @@ -179,6 +183,10 @@ pub enum DatabaseRequest {
view: ViewName,
/// The filter for the view.
key: Option<QueryKey<Vec<u8>>>,
/// The order for the query into the view.
order: Sort,
/// The maximum number of results to return.
limit: Option<usize>,
/// The access policy for the query.
access_policy: AccessPolicy,
/// If true, [`DatabaseResponse::ViewMappingsWithDocs`] will be
Expand Down
29 changes: 25 additions & 4 deletions crates/bonsaidb-core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
time::{Duration, Instant},
};

use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::{
Expand Down Expand Up @@ -1015,6 +1016,15 @@ pub async fn list_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
let one_doc = collection.list(doc1.id..doc2.id).await?;
assert_eq!(one_doc.len(), 1);

let limited = collection
.list(doc1.id..=doc2.id)
.limit(1)
.descending()
.await?;
assert_eq!(limited.len(), 1);
let limited = limited[0].contents::<Basic>()?;
assert_eq!(limited.value, doc2_contents.value);

Ok(())
}

Expand Down Expand Up @@ -1113,14 +1123,25 @@ pub async fn view_query_tests<C: Connection>(db: &C) -> anyhow::Result<()> {

let has_parent = db
.view::<BasicByParentId>()
// TODO range is tough because there's no single structure that works
// here. RangeBounds is a trait. We'll need to use something else, but
// my quick search doesn't find a serde-compatible library already
// written. This should be an inclusive range
.with_key_range(Some(0)..=Some(u64::MAX))
.query()
.await?;
assert_eq!(has_parent.len(), 3);
// Verify the result is sorted ascending
assert!(has_parent
.windows(2)
.all(|window| window[0].key <= window[1].key));

// Test limiting and descending order
let last_with_parent = db
.view::<BasicByParentId>()
.with_key_range(Some(0)..=Some(u64::MAX))
.descending()
.limit(1)
.query()
.await?;
assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
assert_eq!(last_with_parent[0].key, has_parent[2].key);

let items_with_categories = db.view::<BasicByCategory>().query().await?;
assert_eq!(items_with_categories.len(), 3);
Expand Down
Loading

0 comments on commit 904bce8

Please sign in to comment.