Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State messaging channel for services to report on async work for better tests #266

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- State messaging channel for all services [#266](https://github.com/p2panda/aquadoggo/pull/266)

### Fixed

- Fix polution of mutably shared schema store during testing [#266](https://github.com/p2panda/aquadoggo/pull/266)
sandreae marked this conversation as resolved.
Show resolved Hide resolved

## [0.4.0]

### Added
Expand Down
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ p2panda-rs = { version = "^0.6.0", features = [
"storage-provider",
] }
serde = { version = "^1.0.144", features = ["derive"] }
serial_test = "0.9.0"
sandreae marked this conversation as resolved.
Show resolved Hide resolved
sqlx = { version = "^0.6.1", features = [
"any",
"postgres",
Expand Down
12 changes: 12 additions & 0 deletions aquadoggo/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use p2panda_rs::operation::OperationId;

use crate::manager::Sender;
use crate::materializer::{Task, TaskInput};

/// Sender for cross-service communication bus.
pub type ServiceSender = Sender<ServiceMessage>;
Expand All @@ -13,3 +14,14 @@ pub enum ServiceMessage {
/// A new operation arrived at the node.
NewOperation(OperationId),
}

/// Messages sent on the service status channel.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ServiceStatusMessage {
/// Message from the http service announcing the GraphQL Schema has been
/// built or re-built due to a new schema being materialized.
GraphQLSchemaBuilt,

/// Message from the materializer service containing a newly completed tasks.
sandreae marked this conversation as resolved.
Show resolved Hide resolved
MaterializerTaskComplete(Task<TaskInput>),
}
5 changes: 5 additions & 0 deletions aquadoggo/src/graphql/client/dynamic_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,15 @@ mod test {
use p2panda_rs::test_utils::fixtures::random_key_pair;
use rstest::rstest;
use serde_json::json;
use serial_test::serial;

use crate::db::stores::test_utils::{
add_document, add_schema, test_db, TestDatabase, TestDatabaseRunner,
};
use crate::test_helpers::graphql_test_client;

#[rstest]
#[serial]
fn single_query(#[from(test_db)] runner: TestDatabaseRunner) {
// Test single query parameter variations.

Expand Down Expand Up @@ -482,6 +484,7 @@ mod test {
}

#[rstest]
#[serial]
#[case::unknown_document_id(
"id: \"00208f7492d6eb01360a886dac93da88982029484d8c04a0bd2ac0607101b80a6634\"",
value!({
Expand Down Expand Up @@ -555,6 +558,7 @@ mod test {
}

#[rstest]
#[serial]
fn collection_query(#[from(test_db)] runner: TestDatabaseRunner) {
// Test collection query parameter variations.

Expand Down Expand Up @@ -602,6 +606,7 @@ mod test {
}

#[rstest]
#[serial]
fn type_name(#[from(test_db)] runner: TestDatabaseRunner) {
// Test availability of `__typename` on all objects.

Expand Down
5 changes: 5 additions & 0 deletions aquadoggo/src/graphql/client/dynamic_types/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use p2panda_rs::schema::{FieldType, SchemaId, SYSTEM_SCHEMAS};
use p2panda_rs::test_utils::fixtures::random_key_pair;
use rstest::rstest;
use serde_json::json;
use serial_test::serial;

use crate::db::stores::test_utils::{add_schema, test_db, TestDatabase, TestDatabaseRunner};
use crate::test_helpers::graphql_test_client;

#[rstest]
#[serial]
#[case(SYSTEM_SCHEMAS[0].id().to_string(), SYSTEM_SCHEMAS[0].description().to_string())]
#[case(SYSTEM_SCHEMAS[1].id().to_string(), SYSTEM_SCHEMAS[1].description().to_string())]
fn system_schema_container_type(
Expand Down Expand Up @@ -71,6 +73,7 @@ fn system_schema_container_type(
}

#[rstest]
#[serial]
fn application_schema_container_type(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(move |mut db: TestDatabase| async move {
let key_pair = random_key_pair();
Expand Down Expand Up @@ -137,6 +140,7 @@ fn application_schema_container_type(#[from(test_db)] runner: TestDatabaseRunner
}

#[rstest]
#[serial]
fn application_schema_fields_type(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(move |mut db: TestDatabase| async move {
let key_pair = random_key_pair();
Expand Down Expand Up @@ -220,6 +224,7 @@ fn application_schema_fields_type(#[from(test_db)] runner: TestDatabaseRunner) {
}

#[rstest]
#[serial]
fn metadata_type(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(move |db: TestDatabase| async move {
let client = graphql_test_client(&db).await;
Expand Down
30 changes: 25 additions & 5 deletions aquadoggo/src/graphql/client/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use p2panda_rs::entry::EncodedEntry;
use p2panda_rs::operation::decode::decode_operation;
use p2panda_rs::operation::traits::Schematic;
use p2panda_rs::operation::{EncodedOperation, OperationId};
use p2panda_rs::Human;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::db::provider::SqlStorage;
Expand Down Expand Up @@ -47,7 +48,7 @@ impl ClientMutationRoot {
let schema = schema_provider
.get(operation.schema_id())
.await
.ok_or_else(|| anyhow!("Schema not found"))?;
.ok_or_else(|| anyhow!("Schema {} not found", operation.schema_id().display()))?;

/////////////////////////////////////
// PUBLISH THE ENTRY AND OPERATION //
Expand Down Expand Up @@ -106,6 +107,7 @@ mod tests {
};
use rstest::{fixture, rstest};
use serde_json::json;
use serial_test::serial;
use tokio::sync::broadcast;

use crate::bus::ServiceMessage;
Expand Down Expand Up @@ -198,6 +200,7 @@ mod tests {
}

#[rstest]
#[serial]
fn publish_entry(
#[from(test_db)]
#[with(0, 0, 0, false, test_schema())]
Expand All @@ -206,7 +209,8 @@ mod tests {
) {
runner.with_db_teardown(move |db: TestDatabase| async move {
let (tx, _rx) = broadcast::channel(120);
let manager = GraphQLSchemaManager::new(db.store, tx, db.context.schema_provider.clone()).await;
let (tx_status, _rx) = broadcast::channel(120);
let manager = GraphQLSchemaManager::new(db.store, tx, tx_status, db.context.schema_provider.clone()).await;
let context = HttpServiceContext::new(manager);

let response = context.schema.execute(publish_request).await;
Expand All @@ -226,6 +230,7 @@ mod tests {
}

#[rstest]
#[serial]
fn sends_message_on_communication_bus(
#[from(test_db)]
#[with(0, 0, 0, false, test_schema())]
Expand All @@ -234,8 +239,14 @@ mod tests {
) {
runner.with_db_teardown(move |db: TestDatabase| async move {
let (tx, mut rx) = broadcast::channel(120);
let manager =
GraphQLSchemaManager::new(db.store, tx, db.context.schema_provider.clone()).await;
let (tx_status, _rx) = broadcast::channel(120);
let manager = GraphQLSchemaManager::new(
db.store,
tx,
tx_status,
db.context.schema_provider.clone(),
)
.await;
let context = HttpServiceContext::new(manager);

context.schema.execute(publish_request).await;
Expand All @@ -253,6 +264,7 @@ mod tests {
}

#[rstest]
#[serial]
fn post_gql_mutation(
#[from(test_db)]
#[with(0, 0, 0, false, test_schema())]
Expand Down Expand Up @@ -291,6 +303,7 @@ mod tests {
}

#[rstest]
#[serial]
#[case::invalid_entry_bytes(
"AB01",
&OPERATION_ENCODED,
Expand Down Expand Up @@ -507,6 +520,7 @@ mod tests {
}

#[rstest]
#[serial]
#[case::backlink_and_skiplink_not_in_db(
&entry_signed_encoded_unvalidated(
8,
Expand Down Expand Up @@ -637,6 +651,7 @@ mod tests {
}

#[rstest]
#[serial]
fn publish_many_entries(
#[from(test_db)]
#[with(0, 0, 0, false, doggo_schema())]
Expand Down Expand Up @@ -726,6 +741,7 @@ mod tests {
}

#[rstest]
#[serial]
fn duplicate_publishing_of_entries(
#[from(test_db)]
#[with(1, 1, 1, false, doggo_schema())]
Expand Down Expand Up @@ -770,6 +786,7 @@ mod tests {
}

#[rstest]
#[serial]
fn publish_unsupported_schema(
#[from(encoded_entry)] entry_with_unsupported_schema: EncodedEntry,
#[from(encoded_operation)] operation_with_unsupported_schema: EncodedOperation,
Expand Down Expand Up @@ -799,7 +816,10 @@ mod tests {
let response = response.json::<serde_json::Value>().await;

for error in response.get("errors").unwrap().as_array().unwrap() {
assert_eq!(error.get("message").unwrap(), "Schema not found")
assert_eq!(
error.get("message").unwrap(),
"Schema venue 8fc78b not found"
)
}
});
}
Expand Down
4 changes: 4 additions & 0 deletions aquadoggo/src/graphql/client/static_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ mod tests {
use async_graphql::{value, Response};
use rstest::rstest;
use serde_json::json;
use serial_test::serial;

use crate::db::stores::test_utils::{test_db, TestDatabase, TestDatabaseRunner};
use crate::test_helpers::graphql_test_client;

#[rstest]
#[serial]
fn next_args_valid_query(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(move |db: TestDatabase| async move {
let client = graphql_test_client(&db).await;
Expand Down Expand Up @@ -93,6 +95,7 @@ mod tests {
})
}
#[rstest]
#[serial]
fn next_args_valid_query_with_document_id(
#[with(1, 1, 1)]
#[from(test_db)]
Expand Down Expand Up @@ -146,6 +149,7 @@ mod tests {
}

#[rstest]
#[serial]
fn next_args_error_response(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(move |db: TestDatabase| async move {
let client = graphql_test_client(&db).await;
Expand Down
3 changes: 3 additions & 0 deletions aquadoggo/src/graphql/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use p2panda_rs::schema::FieldType;
use p2panda_rs::test_utils::fixtures::random_key_pair;
use rstest::rstest;
use serde_json::json;
use serial_test::serial;

use crate::db::stores::test_utils::{
add_document, add_schema, test_db, TestDatabase, TestDatabaseRunner,
Expand All @@ -18,6 +19,7 @@ use crate::test_helpers::graphql_test_client;
// Test querying application documents with scalar fields (no relations) by document id and by view
// id.
#[rstest]
#[serial]
fn scalar_fields(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(&|mut db: TestDatabase| async move {
let key_pair = random_key_pair();
Expand Down Expand Up @@ -91,6 +93,7 @@ fn scalar_fields(#[from(test_db)] runner: TestDatabaseRunner) {
// Test querying application documents across a parent-child relation using different kinds of
// relation fields.
#[rstest]
#[serial]
fn relation_fields(#[from(test_db)] runner: TestDatabaseRunner) {
runner.with_db_teardown(&|mut db: TestDatabase| async move {
let key_pair = random_key_pair();
Expand Down
Loading