Skip to content

Commit

Permalink
Added access policies to view queries. Closes #13.
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed Mar 28, 2021
1 parent 0f2fa14 commit 5bd8137
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 23 deletions.
33 changes: 32 additions & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ where
/// Parameters to query a `schema::View`.
pub struct View<'a, Cn, V: schema::View> {
connection: &'a Cn,

/// Key filtering criteria.
pub key: Option<QueryKey<V::MapKey>>,

/// The view's data access policy. The default value is [`AccessPolicy::UpdateBefore`].
pub access_policy: AccessPolicy,
}

impl<'a, Cn, V> View<'a, Cn, V>
Expand All @@ -117,6 +121,7 @@ where
Self {
connection,
key: None,
access_policy: AccessPolicy::UpdateBefore,
}
}

Expand All @@ -134,13 +139,19 @@ where
self
}

/// Filters for entries in the view with `keys`.
/// Filters for entries in the view with the range `keys`.
#[must_use]
pub fn with_key_range(mut self, range: Range<V::MapKey>) -> Self {
self.key = Some(QueryKey::Range(range));
self
}

/// Sets the access policy for queries.
pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
self.access_policy = policy;
self
}

/// Executes the query and retrieves the results.
pub async fn query(self) -> Result<Vec<map::Serialized>, Error> {
self.connection.query(self).await
Expand All @@ -158,3 +169,23 @@ pub enum QueryKey<K> {
/// Matches all entries that have keys that are included in the set provided.
Multiple(Vec<K>),
}

/// Changes how the view's outdated data will be treated.
pub enum AccessPolicy {
/// Update any changed documents before returning a response.
UpdateBefore,

/// Return the results, which may be out-of-date, and start an update job in
/// the background. This pattern is useful when you want to ensure you
/// provide consistent response times while ensuring the database is
/// updating in the background.
UpdateAfter,

/// Returns the restuls, which may be out-of-date, and do not start any
/// background jobs. This mode is useful if you're using a view as a cache
/// and have a background process that is responsible for controlling when
/// data is refreshed and updated. While the default `UpdateBefore`
/// shouldn't have much overhead, this option removes all overhead related
/// to view updating from the query.
NoUpdate,
}
56 changes: 36 additions & 20 deletions local/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::HashMap, marker::PhantomData, path::Path, sy
use async_trait::async_trait;
use itertools::Itertools;
use pliantdb_core::{
connection::{Collection, Connection, QueryKey, View},
connection::{AccessPolicy, Collection, Connection, QueryKey, View},
document::{Document, Header},
schema::{self, collection, map, view, Database, Key, Schema},
transaction::{self, ChangedDocument, Command, Operation, OperationResult, Transaction},
Expand Down Expand Up @@ -339,10 +339,13 @@ where
.schema
.view::<V>()
.expect("query made with view that isn't registered with this database");
self.tasks
.update_view_if_needed(view, self)
.await
.map_err_to_core()?;

if matches!(query.access_policy, AccessPolicy::UpdateBefore) {
self.tasks
.update_view_if_needed(view, self)
.await
.map_err_to_core()?;
}

let view_entries = self
.sled
Expand All @@ -353,27 +356,40 @@ where
.map_err(Error::Sled)
.map_err_to_core()?;

let iterator = create_view_iterator(&view_entries, key).map_err_to_core()?;
let entries = iterator
.collect::<Result<Vec<_>, sled::Error>>()
.map_err(Error::Sled)
.map_err_to_core()?;

let mut results = Vec::new();
for (key, entry) in entries {
let entry = bincode::deserialize::<ViewEntry>(&entry)
.map_err(Error::InternalSerialization)
{
let iterator = create_view_iterator(&view_entries, key).map_err_to_core()?;
let entries = iterator
.collect::<Result<Vec<_>, sled::Error>>()
.map_err(Error::Sled)
.map_err_to_core()?;

for entry in entry.mappings {
results.push(map::Serialized {
source: entry.source,
key: key.to_vec(),
value: entry.value,
});
for (key, entry) in entries {
let entry = bincode::deserialize::<ViewEntry>(&entry)
.map_err(Error::InternalSerialization)
.map_err_to_core()?;

for entry in entry.mappings {
results.push(map::Serialized {
source: entry.source,
key: key.to_vec(),
value: entry.value,
});
}
}
}

if matches!(query.access_policy, AccessPolicy::UpdateAfter) {
let storage = self.clone();
tokio::task::spawn(async move {
let view = storage
.schema
.view::<V>()
.expect("query made with view that isn't registered with this database");
storage.tasks.update_view_if_needed(view, &storage).await
});
}

Ok(results)
}
}
Expand Down
55 changes: 53 additions & 2 deletions local/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::{borrow::Cow, time::Duration};

use pliantdb_core::{
connection::Connection,
connection::{AccessPolicy, Connection},
document::Document,
schema::Collection,
test_util::{Basic, BasicByCategory, BasicByParentId, TestDirectory, UnassociatedCollection},
Expand Down Expand Up @@ -307,3 +307,54 @@ async fn view_update() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn view_access_policies() -> anyhow::Result<()> {
let path = TestDirectory::new("view-access-policies");
let db = Storage::<Basic>::open_local(path, &Configuration::default()).await?;
let collection = db.collection::<Basic>()?;
let a = collection.push(&Basic::new("A")).await?;

// Test inserting a record that should match the view, but ask for it to be
// NoUpdate. Verify we get no matches.
collection
.push(
&Basic::new("A.1")
.with_parent_id(a.id)
.with_category("Alpha"),
)
.await?;

let a_children = db
.view::<BasicByParentId>()
.with_key(Some(a.id))
.with_access_policy(AccessPolicy::NoUpdate)
.query()
.await?;
assert_eq!(a_children.len(), 0);

tokio::time::sleep(Duration::from_millis(20)).await;

// Verify the view still have no value, but this time ask for it to be
// updated after returning
let a_children = db
.view::<BasicByParentId>()
.with_key(Some(a.id))
.with_access_policy(AccessPolicy::UpdateAfter)
.query()
.await?;
assert_eq!(a_children.len(), 0);

tokio::time::sleep(Duration::from_millis(20)).await;

// Now, the view should contain the entry.
let a_children = db
.view::<BasicByParentId>()
.with_key(Some(a.id))
.with_access_policy(AccessPolicy::NoUpdate)
.query()
.await?;
assert_eq!(a_children.len(), 1);

Ok(())
}

0 comments on commit 5bd8137

Please sign in to comment.