Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added server example.
Converted existing "example" to an integration-style test. Also added
ability to reduce_grouped, closes #22.
  • Loading branch information
ecton committed Apr 13, 2021
1 parent 49a98ea commit c14c6d7
Show file tree
Hide file tree
Showing 19 changed files with 494 additions and 90 deletions.
52 changes: 49 additions & 3 deletions client/src/client/remote_database.rs
Expand Up @@ -6,8 +6,8 @@ use pliantdb_core::{
document::Document,
networking::{self, DatabaseRequest, DatabaseResponse, Request, Response},
schema::{
map::{self, MappedDocument},
Collection, Map, Schema, Schematic, View,
map::{self, MappedDocument, MappedValue},
view, Collection, Key, Map, Schema, Schematic, View,
},
transaction::{Executed, OperationResult, Transaction},
};
Expand Down Expand Up @@ -45,7 +45,7 @@ impl<DB: Schema> RemoteDatabase<DB> {
}

#[async_trait]
impl<'a, DB: Schema> Connection<'a> for RemoteDatabase<DB> {
impl<DB: Schema> Connection for RemoteDatabase<DB> {
async fn get<C: Collection>(
&self,
id: u64,
Expand Down Expand Up @@ -189,6 +189,7 @@ impl<'a, DB: Schema> Connection<'a> for RemoteDatabase<DB> {
.name(),
key: key.map(|key| key.serialized()).transpose()?,
access_policy,
grouped: false,
},
})
.await?
Expand All @@ -204,6 +205,51 @@ impl<'a, DB: Schema> Connection<'a> for RemoteDatabase<DB> {
}
}

async fn reduce_grouped<V: View>(
&self,
key: Option<QueryKey<V::Key>>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedValue<V::Key, V::Value>>, pliantdb_core::Error>
where
Self: Sized,
{
match self
.client
.send_request(Request::Database {
database: self.name.to_string(),
request: DatabaseRequest::Reduce {
view: self
.schema
.view::<V>()
.ok_or(pliantdb_core::Error::CollectionNotFound)?
.name(),
key: key.map(|key| key.serialized()).transpose()?,
access_policy,
grouped: true,
},
})
.await?
{
Response::Database(DatabaseResponse::ViewGroupedReduction(values)) => values
.into_iter()
.map(|map| {
Ok(MappedValue {
key: V::Key::from_big_endian_bytes(&map.key).map_err(|err| {
pliantdb_core::Error::Storage(
view::Error::KeySerialization(err).to_string(),
)
})?,
value: serde_cbor::from_slice(&map.value)?,
})
})
.collect::<Result<Vec<_>, pliantdb_core::Error>>(),
Response::Error(err) => Err(err),
other => Err(pliantdb_core::Error::Networking(
pliantdb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
)),
}
}

async fn apply_transaction(
&self,
transaction: Transaction<'static>,
Expand Down
@@ -1,16 +1,17 @@
//! Tests a single server with multiple simultaneous connections.

use pliantdb_client::{url::Url, Client};
use pliantdb_core::{
networking::ServerConnection,
schema::Schema,
test_util::{self, Basic, TestDirectory},
};
use pliantdb_server::Server;
use pliantdb_server::{Configuration, Server};

// This isn't really an example, just a way to run some manual testing
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dir = TestDirectory::new("test-server.pliantdb");
let server = Server::open(dir.as_ref()).await?;
#[tokio::test(flavor = "multi_thread")]
async fn simultaneous_connections() -> anyhow::Result<()> {
let dir = TestDirectory::new("simultaneous-connections.pliantdb");
let server = Server::open(dir.as_ref(), Configuration::default()).await?;
server
.install_self_signed_certificate("test", false)
.await?;
Expand All @@ -33,12 +34,11 @@ async fn main() -> anyhow::Result<()> {
.await
.into_iter()
.collect::<Result<Vec<()>, anyhow::Error>>()?;
println!("Done!");
Ok(())
}

async fn test_one_client(client: Client, database_name: String) -> anyhow::Result<()> {
for _ in 0u32..100 {
for _ in 0u32..50 {
client
.create_database(&database_name, Basic::schema_id())
.await
Expand Down
35 changes: 29 additions & 6 deletions core/src/connection.rs
Expand Up @@ -5,16 +5,21 @@ use serde::{Deserialize, Serialize};

use crate::{
document::{Document, Header},
schema::{self, map::MappedDocument, view, Key, Map},
schema::{
self,
map::MappedDocument,
view::{self, map::MappedValue},
Key, Map,
},
transaction::{self, Command, Operation, OperationResult, Transaction},
Error,
};

/// Defines all interactions with a [`schema::Schema`], regardless of whether it is local or remote.
#[async_trait]
pub trait Connection<'a>: Send + Sync {
pub trait Connection: Send + Sync {
/// Accesses a collection for the connected [`schema::Schema`].
fn collection<C: schema::Collection + 'static>(&'a self) -> Collection<'a, Self, C>
fn collection<'a, C: schema::Collection + 'static>(&'a self) -> Collection<'a, Self, C>
where
Self: Sized,
{
Expand Down Expand Up @@ -95,7 +100,7 @@ pub trait Connection<'a>: Send + Sync {

/// Initializes [`View`] for [`schema::View`] `V`.
#[must_use]
fn view<V: schema::View>(&'a self) -> View<'a, Self, V>
fn view<V: schema::View>(&'_ self) -> View<'_, Self, V>
where
Self: Sized,
{
Expand Down Expand Up @@ -132,6 +137,17 @@ pub trait Connection<'a>: Send + Sync {
where
Self: Sized;

/// Reduces the view entries matching [`View`], reducing the values by each
/// unique key.
#[must_use]
async fn reduce_grouped<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error>
where
Self: Sized;

/// Applies a [`Transaction`] to the [`schema::Schema`]. If any operation in the
/// [`Transaction`] fails, none of the operations will be applied to the
/// [`schema::Schema`].
Expand Down Expand Up @@ -163,7 +179,7 @@ pub struct Collection<'a, Cn, Cl> {

impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
where
Cn: Connection<'a>,
Cn: Connection,
Cl: schema::Collection,
{
/// Creates a new instance using `connection`.
Expand Down Expand Up @@ -200,7 +216,7 @@ pub struct View<'a, Cn, V: schema::View> {
impl<'a, Cn, V> View<'a, Cn, V>
where
V: schema::View,
Cn: Connection<'a>,
Cn: Connection,
{
fn new(connection: &'a Cn) -> Self {
Self {
Expand Down Expand Up @@ -257,6 +273,13 @@ where
.reduce::<V>(self.key, self.access_policy)
.await
}

/// Executes a reduce over the results of the query
pub async fn reduce_grouped(self) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error> {
self.connection
.reduce_grouped::<V>(self.key, self.access_policy)
.await
}
}

/// Filters a [`View`] by key.
Expand Down
11 changes: 9 additions & 2 deletions core/src/networking.rs
Expand Up @@ -10,7 +10,8 @@ use crate::{
schema::{
self, collection,
map::{self},
view, Key,
view::{self, map::MappedValue},
Key,
},
transaction::{Executed, OperationResult, Transaction},
};
Expand Down Expand Up @@ -92,6 +93,10 @@ pub enum DatabaseRequest {
key: Option<QueryKey<Vec<u8>>>,
/// The access policy for the query.
access_policy: AccessPolicy,
/// Whether to return a single value or values grouped by unique key. If
/// true, [`DatabaseResponse::ViewGroupedReduction`] will be returned.
/// If false, [`DatabaseResponse::ViewReduction`] is returned.
grouped: bool,
},
/// Applies a transaction.
ApplyTransaction {
Expand Down Expand Up @@ -150,8 +155,10 @@ pub enum DatabaseResponse {
ViewMappings(Vec<map::Serialized>),
/// Results of [`DatabaseRequest::Query`] when `with_docs` is true.
ViewMappingsWithDocs(Vec<MappedDocument>),
/// Result of [`DatabaseRequest::Reduce`].
/// Result of [`DatabaseRequest::Reduce`] when `grouped` is false.
ViewReduction(Vec<u8>),
/// Result of [`DatabaseRequest::Reduce`] when `grouped` is true.
ViewGroupedReduction(Vec<MappedValue<Vec<u8>, Vec<u8>>>),
/// Results of [`DatabaseRequest::ListExecutedTransactions`].
ExecutedTransactions(Vec<Executed<'static>>),
/// Result of [`DatabaseRequest::LastTransactionId`].
Expand Down
4 changes: 2 additions & 2 deletions core/src/schema/view/map.rs
Expand Up @@ -79,8 +79,8 @@ impl Serialized {
}

/// A key value pair
#[derive(PartialEq, Debug)]
pub struct MappedValue<K: Key, V: Serialize> {
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct MappedValue<K: Key, V> {
/// The key responsible for generating the value
pub key: K,

Expand Down
52 changes: 36 additions & 16 deletions core/src/test_util.rs
Expand Up @@ -376,8 +376,8 @@ macro_rules! define_connection_test_suite {
};
}

pub async fn store_retrieve_update_delete_tests<'a, C: Connection<'a>>(
db: &'a C,
pub async fn store_retrieve_update_delete_tests<C: Connection>(
db: &C,
) -> Result<(), anyhow::Error> {
let original_value = Basic::new("initial_value");
let collection = db.collection::<Basic>();
Expand Down Expand Up @@ -438,15 +438,15 @@ pub async fn store_retrieve_update_delete_tests<'a, C: Connection<'a>>(
Ok(())
}

pub async fn not_found_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), anyhow::Error> {
pub async fn not_found_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
assert!(db.collection::<Basic>().get(1).await?.is_none());

assert!(db.last_transaction_id().await?.is_none());

Ok(())
}

pub async fn conflict_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), anyhow::Error> {
pub async fn conflict_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
let original_value = Basic::new("initial_value");
let collection = db.collection::<Basic>();
let header = collection.push(&original_value).await?;
Expand Down Expand Up @@ -478,7 +478,7 @@ pub async fn conflict_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), anyh
Ok(())
}

pub async fn bad_update_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), anyhow::Error> {
pub async fn bad_update_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
let mut doc = Document::with_contents(1, &Basic::default(), Basic::collection_id())?;
match db.update(&mut doc).await {
Err(Error::DocumentNotFound(collection, id)) => {
Expand All @@ -490,7 +490,7 @@ pub async fn bad_update_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), an
}
}

pub async fn no_update_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), anyhow::Error> {
pub async fn no_update_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
let original_value = Basic::new("initial_value");
let collection = db.collection::<Basic>();
let header = collection.push(&original_value).await?;
Expand All @@ -506,7 +506,7 @@ pub async fn no_update_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), any
Ok(())
}

pub async fn get_multiple_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(), anyhow::Error> {
pub async fn get_multiple_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
let collection = db.collection::<Basic>();
let doc1_value = Basic::new("initial_value");
let doc1 = collection.push(&doc1_value).await?;
Expand Down Expand Up @@ -535,9 +535,7 @@ pub async fn get_multiple_tests<'a, C: Connection<'a>>(db: &'a C) -> Result<(),
Ok(())
}

pub async fn list_transactions_tests<'a, C: Connection<'a>>(
db: &'a C,
) -> Result<(), anyhow::Error> {
pub async fn list_transactions_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
let collection = db.collection::<Basic>();

// create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
Expand Down Expand Up @@ -583,7 +581,7 @@ pub async fn list_transactions_tests<'a, C: Connection<'a>>(
Ok(())
}

pub async fn view_query_tests<'a, C: Connection<'a>>(db: &'a C) -> anyhow::Result<()> {
pub async fn view_query_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
let collection = db.collection::<Basic>();
let a = collection.push(&Basic::new("A")).await?;
let b = collection.push(&Basic::new("B")).await?;
Expand Down Expand Up @@ -647,9 +645,7 @@ pub async fn view_query_tests<'a, C: Connection<'a>>(db: &'a C) -> anyhow::Resul
Ok(())
}

pub async fn unassociated_collection_tests<'a, C: Connection<'a>>(
db: &'a C,
) -> Result<(), anyhow::Error> {
pub async fn unassociated_collection_tests<C: Connection>(db: &C) -> Result<(), anyhow::Error> {
assert!(matches!(
db.collection::<UnassociatedCollection>()
.push(&Basic::default())
Expand All @@ -660,7 +656,7 @@ pub async fn unassociated_collection_tests<'a, C: Connection<'a>>(
Ok(())
}

pub async fn view_update_tests<'a, C: Connection<'a>>(db: &'a C) -> anyhow::Result<()> {
pub async fn view_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
let collection = db.collection::<Basic>();
let a = collection.push(&Basic::new("A")).await?;

Expand Down Expand Up @@ -702,6 +698,21 @@ pub async fn view_update_tests<'a, C: Connection<'a>>(db: &'a C) -> anyhow::Resu
1
);

// Verify reduce_grouped matches our expectations.
assert_eq!(
db.view::<BasicByParentId>().reduce_grouped().await?,
vec![
MappedValue {
key: None,
value: 1,
},
MappedValue {
key: Some(a.id),
value: 1,
},
]
);

// Test updating the record and the view being updated appropriately
let mut doc = db.collection::<Basic>().get(a_child.id).await?.unwrap();
let mut basic = doc.contents::<Basic>()?;
Expand Down Expand Up @@ -730,10 +741,19 @@ pub async fn view_update_tests<'a, C: Connection<'a>>(db: &'a C) -> anyhow::Resu
let all_entries = db.view::<BasicByParentId>().query().await?;
assert_eq!(all_entries.len(), 1);

// Verify reduce_grouped matches our expectations.
assert_eq!(
db.view::<BasicByParentId>().reduce_grouped().await?,
vec![MappedValue {
key: None,
value: 1,
},]
);

Ok(())
}

pub async fn view_access_policy_tests<'a, C: Connection<'a>>(db: &'a C) -> anyhow::Result<()> {
pub async fn view_access_policy_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
let collection = db.collection::<Basic>();
let a = collection.push(&Basic::new("A")).await?;

Expand Down

0 comments on commit c14c6d7

Please sign in to comment.