diff --git a/CHANGELOG.md b/CHANGELOG.md index 80b0c3bf9..499d7db22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Introduce `identify` and `rendezvous` network protocols / behaviours [#304](https://github.com/p2panda/aquadoggo/pull/304) + ### Added - Introduce libp2p networking service and configuration [#282](https://github.com/p2panda/aquadoggo/pull/282) @@ -19,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Implement API changes to p2panda-rs storage traits, new and breaking db migration [#268](https://github.com/p2panda/aquadoggo/pull/268) - Move all test utils into one module [#275](https://github.com/p2panda/aquadoggo/pull/275) - Use new version of `async-graphql` for dynamic schema generation [#287](https://github.com/p2panda/aquadoggo/pull/287) +- Restructure `graphql` module [#307](https://github.com/p2panda/aquadoggo/pull/307) - Removed replication service for now, preparing for new replication protocol [#296](https://github.com/p2panda/aquadoggo/pull/296) ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 9e4c9c8c3..8087be9a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -202,6 +213,7 @@ dependencies = [ "aquadoggo", "clap", "env_logger", + "libp2p", "tokio", ] @@ -613,6 +625,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bimap" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc0455254eb5c6964c4545d8bac815e1a1be4f3afe0ae695ea539c12d728d44b" + [[package]] name = "bincode" version = "1.3.3" @@ -1002,6 +1020,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-bigint" version = "0.4.9" @@ -1808,7 +1832,16 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.6", +] + +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.3", ] [[package]] @@ -1817,7 +1850,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" dependencies = [ - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -1887,6 +1920,12 @@ dependencies = [ "serde", ] +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hkdf" version = "0.12.3" @@ -2072,7 +2111,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", "serde", ] @@ -2217,11 +2256,15 @@ dependencies = [ "instant", "libp2p-core", "libp2p-dns", + "libp2p-gossipsub", + "libp2p-identify", "libp2p-identity", + "libp2p-kad", "libp2p-mdns", "libp2p-metrics", "libp2p-ping", "libp2p-quic", + "libp2p-rendezvous", "libp2p-swarm", "libp2p-tcp", "libp2p-webrtc", @@ -2251,6 +2294,7 @@ dependencies = [ "quick-protobuf", "rand 0.8.5", "rw-stream-sink", + "serde", "smallvec", "thiserror", "unsigned-varint", @@ -2271,6 +2315,59 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "libp2p-gossipsub" +version = "0.44.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "708235886ca7c8f3792a8683ef81f9b7fb0a952bbd15fe5038b7610689a88390" +dependencies = [ + "asynchronous-codec", + "base64 0.21.0", + "byteorder", + "bytes", + "fnv", + "futures", + "hex_fmt", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "prometheus-client", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "regex", + "serde", + "sha2 0.10.6", + "smallvec", + "thiserror", + "unsigned-varint", + "wasm-timer", +] + +[[package]] +name = "libp2p-identify" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40d1da1f75baf824cfdc80f6aced51f7cbf8dc14e32363e9443570a80d4ee337" +dependencies = [ + "asynchronous-codec", + "either", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "lru", + "quick-protobuf", + "quick-protobuf-codec", + "smallvec", + "thiserror", + "void", +] + [[package]] name = "libp2p-identity" version = "0.1.0" @@ -2286,10 +2383,40 @@ dependencies = [ "prost-build", "quick-protobuf", "rand 0.8.5", + "serde", "thiserror", "zeroize", ] +[[package]] +name = "libp2p-kad" +version = "0.43.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bc57e02d7ad49a63792370f24b829af38f34982ff56556e59e4cb325a4dbf6b" +dependencies = [ + "arrayvec 0.7.2", + "asynchronous-codec", + "bytes", + "either", + "fnv", + "futures", + "futures-timer", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "quick-protobuf", + "rand 0.8.5", + "serde", + "sha2 0.10.6", + "smallvec", + "thiserror", + "uint", + "unsigned-varint", + "void", +] + [[package]] name = "libp2p-mdns" version = "0.43.0" @@ -2317,6 +2444,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a42ec91e227d7d0dafa4ce88b333cdf5f277253873ab087555c92798db2ddd46" dependencies = [ "libp2p-core", + "libp2p-identify", "libp2p-ping", "libp2p-swarm", "prometheus-client", @@ -2384,6 +2512,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "libp2p-rendezvous" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633f2dc23d63ad04955642f3025e740a943da4deb79b252b5fcf882208164467" +dependencies = [ + "asynchronous-codec", + "bimap", + "futures", + "futures-timer", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "thiserror", + "void", +] + [[package]] name = "libp2p-swarm" version = "0.42.0" @@ -2537,6 +2687,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lru" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -2689,6 +2848,8 @@ dependencies = [ "core2", "digest 0.10.6", "multihash-derive", + "serde", + "serde-big-array", "sha2 0.10.6", "unsigned-varint", ] @@ -3988,6 +4149,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd31f59f6fe2b0c055371bb2f16d7f0aa7d8881676c04a55b1596d1a17cd10a4" +dependencies = [ + "serde", +] + [[package]] name = "serde-wasm-bindgen" version = "0.4.5" @@ -4233,7 +4403,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105" dependencies = [ - "ahash", + "ahash 0.7.6", "atoi", "base64 0.13.1", "bitflags", @@ -4805,6 +4975,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e79c4d996edb816c91e4308506774452e55e95c3c9de07b6729e17e15a5ef81" +[[package]] +name = "uint" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76f64bba2c53b04fcab63c01a7d7427eadc821e3bc48c34dc9ba29c501164b52" +dependencies = [ + "byteorder", + "crunchy", + "hex", + "static_assertions 1.1.0", +] + [[package]] name = "unicode-bidi" version = "0.3.11" @@ -5055,6 +5237,21 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.61" diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index 790d58476..773ed19af 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -7,6 +7,7 @@ authors = [ "pietgeursen ", "sandreae ", "sophiiistika ", + "glyph ", ] description = "Embeddable p2p network node" license = "AGPL-3.0-or-later" @@ -38,6 +39,9 @@ libp2p = { version = "^0.51.0", features = [ "quic", "ping", "mdns", + "identify", + "rendezvous", + "serde", ] } lipmaa-link = "^0.2.2" log = "^0.4.17" diff --git a/aquadoggo/src/graphql/mod.rs b/aquadoggo/src/graphql/mod.rs index b7ca3aa7d..6c24b48e1 100644 --- a/aquadoggo/src/graphql/mod.rs +++ b/aquadoggo/src/graphql/mod.rs @@ -2,9 +2,11 @@ pub mod constants; pub mod mutations; +pub mod queries; pub mod scalars; mod schema; -mod schema_builders; +#[cfg(test)] +mod tests; pub mod types; pub mod utils; diff --git a/aquadoggo/src/graphql/queries/all_documents.rs b/aquadoggo/src/graphql/queries/all_documents.rs new file mode 100644 index 000000000..13184ea14 --- /dev/null +++ b/aquadoggo/src/graphql/queries/all_documents.rs @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_graphql::dynamic::{Field, FieldFuture, Object, TypeRef}; +use dynamic_graphql::FieldValue; +use log::debug; +use p2panda_rs::document::traits::AsDocument; +use p2panda_rs::schema::Schema; +use p2panda_rs::storage_provider::traits::DocumentStore; + +use crate::db::SqlStore; +use crate::graphql::constants; +use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar}; + +/// Adds GraphQL query for getting all documents of a certain p2panda schema to the root query +/// object. +/// +/// The query follows the format `all_`. +pub fn build_all_documents_query(query: Object, schema: &Schema) -> Object { + let schema_id = schema.id().clone(); + query.field( + Field::new( + format!("{}{}", constants::QUERY_ALL_PREFIX, schema_id), + TypeRef::named_list(schema_id.to_string()), + move |ctx| { + // Take ownership of the schema id in the resolver. + let schema_id = schema_id.clone(); + + debug!( + "Query to {}{} received", + constants::QUERY_ALL_PREFIX, + schema_id + ); + + FieldFuture::new(async move { + // Fetch all queried documents and compose the field value, a list of document + // id / view id tuples, which will bubble up the query tree. + + let store = ctx.data_unchecked::(); + let documents: Vec = store + .get_documents_by_schema(&schema_id) + .await? + .iter() + .map(|document| { + FieldValue::owned_any(( + Some(DocumentIdScalar::from(document.id())), + None::, + )) + }) + .collect(); + + // Pass the list up to the children query fields. + Ok(Some(FieldValue::list(documents))) + }) + }, + ) + .description(format!("Get all {} documents.", schema.name())), + ) +} + +#[cfg(test)] +mod test { + use async_graphql::{value, Response}; + use p2panda_rs::identity::KeyPair; + use p2panda_rs::schema::FieldType; + use p2panda_rs::test_utils::fixtures::random_key_pair; + use rstest::rstest; + use serde_json::json; + + use crate::test_utils::{add_document, add_schema, graphql_test_client, test_runner, TestNode}; + + #[rstest] + fn collection_query(#[from(random_key_pair)] key_pair: KeyPair) { + // Test collection query parameter variations. + test_runner(move |mut node: TestNode| async move { + // Add schema to node. + let schema = add_schema( + &mut node, + "schema_name", + vec![("bool", FieldType::Boolean)], + &key_pair, + ) + .await; + + // Publish document on node. + add_document( + &mut node, + schema.id(), + vec![("bool", true.into())], + &key_pair, + ) + .await; + + // Configure and send test query. + let client = graphql_test_client(&node).await; + let query = format!( + r#"{{ + collection: all_{type_name} {{ + fields {{ bool }} + }}, + }}"#, + type_name = schema.id(), + ); + + let response = client + .post("/graphql") + .json(&json!({ + "query": query, + })) + .send() + .await; + + let response: Response = response.json().await; + + let expected_data = value!({ + "collection": value!([{ "fields": { "bool": true, } }]), + }); + assert_eq!(response.data, expected_data, "{:#?}", response.errors); + }); + } +} diff --git a/aquadoggo/src/graphql/schema_builders/document.rs b/aquadoggo/src/graphql/queries/document.rs similarity index 51% rename from aquadoggo/src/graphql/schema_builders/document.rs rename to aquadoggo/src/graphql/queries/document.rs index f036e230d..edf827463 100644 --- a/aquadoggo/src/graphql/schema_builders/document.rs +++ b/aquadoggo/src/graphql/queries/document.rs @@ -1,71 +1,13 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use async_graphql::dynamic::{Field, FieldFuture, InputValue, Object, TypeRef}; -use dynamic_graphql::{Error, FieldValue, ScalarValue}; +use async_graphql::dynamic::{Field, FieldFuture, InputValue, Object, ResolverContext, TypeRef}; +use async_graphql::Error; +use dynamic_graphql::{FieldValue, ScalarValue}; use log::debug; -use p2panda_rs::storage_provider::traits::DocumentStore; -use p2panda_rs::{document::traits::AsDocument, schema::Schema}; +use p2panda_rs::schema::Schema; -use crate::db::SqlStore; use crate::graphql::constants; use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar}; -use crate::graphql::types::DocumentMeta; -use crate::graphql::utils::{downcast_id_params, fields_name, get_document_from_params}; - -/// Build a GraphQL object type for a p2panda schema. -/// -/// Contains resolvers for both `fields` and `meta`. The former simply passes up the query -/// arguments to it's children query fields. The latter retrieves the document being queried and -/// already constructs and returns the `DocumentMeta` object. -pub fn build_document_schema(schema: &Schema) -> Object { - let document_fields_name = fields_name(schema.id()); - Object::new(schema.id().to_string()) - // The `fields` field of a document, passes up the query arguments to it's children. - .field(Field::new( - constants::FIELDS_FIELD, - TypeRef::named(document_fields_name), - move |ctx| { - FieldFuture::new(async move { - // Here we just pass up the root query parameters to be used in the fields resolver - let params = downcast_id_params(&ctx); - Ok(Some(FieldValue::owned_any(params))) - }) - }, - )) - // The `meta` field of a document, resolves the `DocumentMeta` object. - .field(Field::new( - constants::META_FIELD, - TypeRef::named(constants::DOCUMENT_META), - move |ctx| { - FieldFuture::new(async move { - let store = ctx.data_unchecked::(); - - // Downcast the parameters passed up from the parent query field - let (document_id, document_view_id) = downcast_id_params(&ctx); - // Get the whole document - let document = - get_document_from_params(store, &document_id, &document_view_id).await?; - - // Construct `DocumentMeta` and return it. We defined the document meta - // type and already registered it in the schema. It's derived resolvers - // will handle field selection. - let field_value = match document { - Some(document) => { - let document_meta = DocumentMeta { - document_id: document.id().into(), - view_id: document.view_id().into(), - }; - Some(FieldValue::owned_any(document_meta)) - } - None => Some(FieldValue::NULL), - }; - - Ok(field_value) - }) - }, - )) - .description(schema.description().to_string()) -} /// Adds GraphQL query for getting a single p2panda document, selected by its document id or /// document view id to the root query object. @@ -78,43 +20,12 @@ pub fn build_document_query(query: Object, schema: &Schema) -> Object { schema_id.to_string(), TypeRef::named(schema_id.to_string()), move |ctx| { - let schema_id = schema_id.clone(); FieldFuture::new(async move { - // Parse arguments - let mut document_id = None; - let mut document_view_id = None; - for (name, id) in ctx.field().arguments()?.into_iter() { - match name.as_str() { - constants::DOCUMENT_ID_ARG => { - document_id = Some(DocumentIdScalar::from_value(id)?); - } - constants::DOCUMENT_VIEW_ID_ARG => { - document_view_id = Some(DocumentViewIdScalar::from_value(id)?) - } - _ => (), - } - } + // Validate the received arguments. + let args = validate_args(&ctx)?; - // Check a valid combination of arguments was passed - match (&document_id, &document_view_id) { - (None, None) => { - return Err(Error::new("Must provide either `id` or `viewId` argument")) - } - (Some(_), Some(_)) => { - return Err(Error::new("Must only provide `id` or `viewId` argument")) - } - (Some(id), None) => { - debug!("Query to {} received for document {}", schema_id, id); - } - (None, Some(id)) => { - debug!( - "Query to {} received for document at view id {}", - schema_id, id - ); - } - }; - // Pass them up to the children query fields - Ok(Some(FieldValue::owned_any((document_id, document_view_id)))) + // Pass them up to the children query fields. + Ok(Some(FieldValue::owned_any(args))) }) }, ) @@ -133,49 +44,43 @@ pub fn build_document_query(query: Object, schema: &Schema) -> Object { ) } -/// Adds GraphQL query for getting all documents of a certain p2panda schema to the root query -/// object. -/// -/// The query follows the format `all_`. -pub fn build_all_document_query(query: Object, schema: &Schema) -> Object { - let schema_id = schema.id().clone(); - query.field( - Field::new( - format!("{}{}", constants::QUERY_ALL_PREFIX, schema_id), - TypeRef::named_list(schema_id.to_string()), - move |ctx| { - let schema_id = schema_id.clone(); - FieldFuture::new(async move { - debug!( - "Query to {}{} received", - constants::QUERY_ALL_PREFIX, - schema_id - ); - - // Access the store. - let store = ctx.data_unchecked::(); +fn validate_args( + ctx: &ResolverContext, +) -> Result<(Option, Option), Error> { + // Parse arguments + let schema_id = ctx.field().name(); + let mut document_id = None; + let mut document_view_id = None; + for (name, id) in ctx.field().arguments()?.into_iter() { + match name.as_str() { + constants::DOCUMENT_ID_ARG => { + document_id = Some(DocumentIdScalar::from_value(id)?); + } + constants::DOCUMENT_VIEW_ID_ARG => { + document_view_id = Some(DocumentViewIdScalar::from_value(id)?) + } + _ => (), + } + } - // Fetch all documents of the schema this endpoint serves and compose the - // field value (a list) which will bubble up the query tree. - let documents: Vec = store - .get_documents_by_schema(&schema_id) - .await? - .iter() - .map(|document| { - FieldValue::owned_any(( - Some(DocumentIdScalar::from(document.id())), - None::, - )) - }) - .collect(); + // Check a valid combination of arguments was passed + match (&document_id, &document_view_id) { + (None, None) => return Err(Error::new("Must provide either `id` or `viewId` argument")), + (Some(_), Some(_)) => { + return Err(Error::new("Must only provide `id` or `viewId` argument")) + } + (Some(id), None) => { + debug!("Query to {} received for document {}", schema_id, id); + } + (None, Some(id)) => { + debug!( + "Query to {} received for document at view id {}", + schema_id, id + ); + } + }; - // Pass the list up to the children query fields. - Ok(Some(FieldValue::list(documents))) - }) - }, - ) - .description(format!("Get all {} documents.", schema.name())), - ) + Ok((document_id, document_view_id)) } #[cfg(test)] @@ -343,56 +248,6 @@ mod test { }); } - #[rstest] - fn collection_query(#[from(random_key_pair)] key_pair: KeyPair) { - // Test collection query parameter variations. - test_runner(move |mut node: TestNode| async move { - // Add schema to node. - let schema = add_schema( - &mut node, - "schema_name", - vec![("bool", FieldType::Boolean)], - &key_pair, - ) - .await; - - // Publish document on node. - add_document( - &mut node, - schema.id(), - vec![("bool", true.into())], - &key_pair, - ) - .await; - - // Configure and send test query. - let client = graphql_test_client(&node).await; - let query = format!( - r#"{{ - collection: all_{type_name} {{ - fields {{ bool }} - }}, - }}"#, - type_name = schema.id(), - ); - - let response = client - .post("/graphql") - .json(&json!({ - "query": query, - })) - .send() - .await; - - let response: Response = response.json().await; - - let expected_data = value!({ - "collection": value!([{ "fields": { "bool": true, } }]), - }); - assert_eq!(response.data, expected_data, "{:#?}", response.errors); - }); - } - #[rstest] fn type_name(#[from(random_key_pair)] key_pair: KeyPair) { // Test availability of `__typename` on all objects. diff --git a/aquadoggo/src/graphql/queries/mod.rs b/aquadoggo/src/graphql/queries/mod.rs new file mode 100644 index 000000000..ddf59833a --- /dev/null +++ b/aquadoggo/src/graphql/queries/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +mod all_documents; +mod document; +mod next_args; + +pub use all_documents::build_all_documents_query; +pub use document::build_document_query; +pub use next_args::build_next_args_query; diff --git a/aquadoggo/src/graphql/schema_builders/next_args.rs b/aquadoggo/src/graphql/queries/next_args.rs similarity index 77% rename from aquadoggo/src/graphql/schema_builders/next_args.rs rename to aquadoggo/src/graphql/queries/next_args.rs index b8db5a0d7..eedd83fcb 100644 --- a/aquadoggo/src/graphql/schema_builders/next_args.rs +++ b/aquadoggo/src/graphql/queries/next_args.rs @@ -1,11 +1,10 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use async_graphql::dynamic::{Field, FieldFuture, InputValue, Object, TypeRef}; +use async_graphql::dynamic::{Field, FieldFuture, InputValue, Object, ResolverContext, TypeRef}; +use async_graphql::Error; use dynamic_graphql::{FieldValue, ScalarValue}; use log::debug; use p2panda_rs::api; -use p2panda_rs::document::DocumentViewId; -use p2panda_rs::identity::PublicKey; use crate::db::SqlStore; use crate::graphql::constants; @@ -20,30 +19,17 @@ pub fn build_next_args_query(query: Object) -> Object { TypeRef::named(constants::NEXT_ARGS), |ctx| { FieldFuture::new(async move { - let mut args = ctx.field().arguments()?.into_iter().map(|(_, value)| value); - let store = ctx.data::()?; - - // Convert and validate passed parameters. - let public_key: PublicKey = - PublicKeyScalar::from_value(args.next().unwrap())?.into(); - let document_view_id: Option = match args.next() { - Some(value) => { - let document_view_id = DocumentViewIdScalar::from_value(value)?.into(); - debug!( - "Query to nextArgs received for public key {} and document at view {}", - public_key, document_view_id - ); - Some(document_view_id) - } - None => { - debug!("Query to nextArgs received for public key {}", public_key); - None - } - }; + // Get and validate arguments. + let (public_key, document_view_id) = validate_args(&ctx)?; + let store = ctx.data_unchecked::(); // Calculate next entry's arguments. - let (backlink, skiplink, seq_num, log_id) = - api::next_args(store, &public_key, document_view_id.as_ref()).await?; + let (backlink, skiplink, seq_num, log_id) = api::next_args( + store, + &public_key.into(), + document_view_id.map(|id| id.into()).as_ref(), + ) + .await?; let next_args = NextArguments { log_id: log_id.into(), @@ -68,6 +54,40 @@ pub fn build_next_args_query(query: Object) -> Object { ) } +/// Validate and return the arguments passed to next_args. +fn validate_args( + ctx: &ResolverContext, +) -> Result<(PublicKeyScalar, Option), Error> { + let mut args = ctx.field().arguments()?.into_iter().map(|(_, value)| value); + + // Convert and validate passed parameters. + let public_key = PublicKeyScalar::from_value(args.next().unwrap())?; + let document_view_id = match args.next() { + Some(value) => match value { + async_graphql::Value::Null => None, + async_graphql::Value::String(_) => Some(value), + _ => panic!("Unexpected value type received for viewId in nextArgs"), + }, + None => None, + }; + let document_view_id = match document_view_id { + Some(value) => { + let document_view_id = DocumentViewIdScalar::from_value(value)?; + debug!( + "Query to nextArgs received for public key {} and document at view {}", + public_key, document_view_id + ); + Some(document_view_id) + } + None => { + debug!("Query to nextArgs received for public key {}", public_key); + None + } + }; + + Ok((public_key, document_view_id)) +} + #[cfg(test)] mod tests { use async_graphql::{value, Response}; diff --git a/aquadoggo/src/graphql/schema.rs b/aquadoggo/src/graphql/schema.rs index 2e3756556..39928603e 100644 --- a/aquadoggo/src/graphql/schema.rs +++ b/aquadoggo/src/graphql/schema.rs @@ -13,16 +13,14 @@ use tokio::sync::Mutex; use crate::bus::ServiceSender; use crate::db::SqlStore; use crate::graphql::mutations::{MutationRoot, Publish}; +use crate::graphql::queries::{ + build_all_documents_query, build_document_query, build_next_args_query, +}; use crate::graphql::scalars::{ DocumentIdScalar, DocumentViewIdScalar, EncodedEntryScalar, EncodedOperationScalar, EntryHashScalar, LogIdScalar, PublicKeyScalar, SeqNumScalar, }; -use crate::graphql::schema_builders::{ - build_all_document_query, build_document_field_schema, build_document_query, - build_document_schema, build_next_args_query, -}; -use crate::graphql::types::{DocumentMeta, NextArguments}; -use crate::graphql::utils::fields_name; +use crate::graphql::types::{Document, DocumentFields, DocumentMeta, NextArguments}; use crate::schema::SchemaProvider; /// Returns GraphQL API schema for p2panda node. @@ -66,17 +64,10 @@ pub async fn build_root_schema( // documents they describe. for schema in all_schema { // Construct the document fields object which will be named `Field`. - let schema_field_name = fields_name(schema.id()); - let mut document_schema_fields = Object::new(&schema_field_name); - - // For every field in the schema we create a type with a resolver. - for (name, field_type) in schema.fields().iter() { - document_schema_fields = - build_document_field_schema(document_schema_fields, name.to_string(), field_type); - } + let document_schema_fields = DocumentFields::build(&schema); // Construct the document schema which has "fields" and "meta" fields. - let document_schema = build_document_schema(&schema); + let document_schema = Document::build(&schema); // Register a schema and schema fields type for every schema. schema_builder = schema_builder @@ -89,7 +80,7 @@ pub async fn build_root_schema( root_query = build_document_query(root_query, &schema); // Add a query for retrieving all documents of a certain schema. - root_query = build_all_document_query(root_query, &schema); + root_query = build_all_documents_query(root_query, &schema); } // Add next args to the query object. diff --git a/aquadoggo/src/graphql/schema_builders/document_field.rs b/aquadoggo/src/graphql/schema_builders/document_field.rs deleted file mode 100644 index 63613ce17..000000000 --- a/aquadoggo/src/graphql/schema_builders/document_field.rs +++ /dev/null @@ -1,106 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later - -use async_graphql::dynamic::{Field, FieldFuture, Object, TypeRef}; -use dynamic_graphql::FieldValue; -use p2panda_rs::document::traits::AsDocument; -use p2panda_rs::operation::OperationValue; -use p2panda_rs::schema::FieldType; - -use crate::db::SqlStore; -use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar}; -use crate::graphql::utils::{downcast_id_params, get_document_from_params, gql_scalar}; - -/// Get the GraphQL type name for a p2panda field type. -/// -/// GraphQL types for relations use the p2panda schema id as their name. -fn graphql_type(field_type: &FieldType) -> TypeRef { - match field_type { - p2panda_rs::schema::FieldType::Boolean => TypeRef::named(TypeRef::BOOLEAN), - p2panda_rs::schema::FieldType::Integer => TypeRef::named(TypeRef::INT), - p2panda_rs::schema::FieldType::Float => TypeRef::named(TypeRef::FLOAT), - p2panda_rs::schema::FieldType::String => TypeRef::named(TypeRef::STRING), - p2panda_rs::schema::FieldType::Relation(schema_id) => TypeRef::named(schema_id.to_string()), - p2panda_rs::schema::FieldType::RelationList(schema_id) => { - TypeRef::named_list(schema_id.to_string()) - } - p2panda_rs::schema::FieldType::PinnedRelation(schema_id) => { - TypeRef::named(schema_id.to_string()) - } - p2panda_rs::schema::FieldType::PinnedRelationList(schema_id) => { - TypeRef::named_list(schema_id.to_string()) - } - } -} - -/// Build a graphql schema object for each field of a document. -/// -/// Contains a resolver which accesses the actual value of the field from the store when queries -/// are resolved. -pub fn build_document_field_schema( - document_fields: Object, - name: String, - field_type: &FieldType, -) -> Object { - // The type of this field. - let field_type = field_type.clone(); - let graphql_type = graphql_type(&field_type); - - // Define the field and create a resolver. - document_fields.field(Field::new(name.clone(), graphql_type, move |ctx| { - let store = ctx.data_unchecked::(); - let name = name.clone(); - - FieldFuture::new(async move { - // Parse the bubble up message. - let (document_id, document_view_id) = downcast_id_params(&ctx); - - // Get the whole document from the store. - let document = - match get_document_from_params(store, &document_id, &document_view_id).await? { - Some(document) => document, - None => return Ok(FieldValue::NONE), - }; - - // Get the field this query is concerned with. - match document.get(&name).unwrap() { - // Relation fields are expected to resolve to the related document so we pass - // along the document id which will be processed through it's own resolver. - OperationValue::Relation(rel) => Ok(Some(FieldValue::owned_any(( - Some(DocumentIdScalar::from(rel.document_id())), - None::, - )))), - // Relation lists are handled by collecting and returning a list of all document - // id's in the relation list. Each of these in turn are processed and queries - // forwarded up the tree via their own respective resolvers. - OperationValue::RelationList(rel) => { - let mut fields = vec![]; - for document_id in rel.iter() { - fields.push(FieldValue::owned_any(( - Some(DocumentIdScalar::from(document_id)), - None::, - ))); - } - Ok(Some(FieldValue::list(fields))) - } - // Pinned relation behaves the same as relation but passes along a document view id. - OperationValue::PinnedRelation(rel) => Ok(Some(FieldValue::owned_any(( - None::, - Some(DocumentViewIdScalar::from(rel.view_id())), - )))), - // Pinned relation lists behave the same as relation lists but pass along view ids. - OperationValue::PinnedRelationList(rel) => { - let mut fields = vec![]; - for document_view_id in rel.iter() { - fields.push(FieldValue::owned_any(( - None::, - Some(DocumentViewIdScalar::from(document_view_id)), - ))); - } - Ok(Some(FieldValue::list(fields))) - } - // All other fields are simply resolved to their scalar value. - value => Ok(Some(FieldValue::value(gql_scalar(value)))), - } - }) - })) -} diff --git a/aquadoggo/src/graphql/schema_builders/mod.rs b/aquadoggo/src/graphql/schema_builders/mod.rs deleted file mode 100644 index f8c2f73ac..000000000 --- a/aquadoggo/src/graphql/schema_builders/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later - -mod document; -mod document_field; -mod next_args; -#[cfg(test)] -mod tests; - -pub use document::{build_all_document_query, build_document_query, build_document_schema}; -pub use document_field::build_document_field_schema; -pub use next_args::build_next_args_query; diff --git a/aquadoggo/src/graphql/schema_builders/tests.rs b/aquadoggo/src/graphql/tests.rs similarity index 100% rename from aquadoggo/src/graphql/schema_builders/tests.rs rename to aquadoggo/src/graphql/tests.rs diff --git a/aquadoggo/src/graphql/types/document.rs b/aquadoggo/src/graphql/types/document.rs new file mode 100644 index 000000000..4d4a3e0ba --- /dev/null +++ b/aquadoggo/src/graphql/types/document.rs @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_graphql::dynamic::{Field, FieldFuture, Object, TypeRef}; +use dynamic_graphql::FieldValue; +use p2panda_rs::schema::Schema; + +use crate::graphql::constants; +use crate::graphql::types::DocumentMeta; +use crate::graphql::utils::{downcast_document_id_arguments, fields_name}; + +/// GraphQL object which represents a document type which contains `fields` and `meta` fields. A +/// type is added to the root GraphQL schema for every document, as these types are not known at +/// compile time we make use of the `async-graphql ` `dynamic` module. +/// +/// See `DocumentFields` and `DocumentMeta` to see the shape of the children field types. +pub struct Document; + +impl Document { + /// Build a GraphQL object type from a p2panda schema. + /// + /// Contains resolvers for both `fields` and `meta`. The former simply passes up the query + /// arguments to it's children query fields. The latter calls the `resolve` method defined on + /// `DocumentMeta` type. + pub fn build(schema: &Schema) -> Object { + let document_fields_name = fields_name(schema.id()); + Object::new(schema.id().to_string()) + // The `fields` field of a document, passes up the query arguments to it's children. + .field(Field::new( + constants::FIELDS_FIELD, + TypeRef::named(document_fields_name), + move |ctx| { + FieldFuture::new(async move { + // Here we just pass up the root query parameters to be used in the fields resolver + let params = downcast_document_id_arguments(&ctx); + Ok(Some(FieldValue::owned_any(params))) + }) + }, + )) + // The `meta` field of a document, resolves the `DocumentMeta` object. + .field(Field::new( + constants::META_FIELD, + TypeRef::named(constants::DOCUMENT_META), + move |ctx| FieldFuture::new(async move { DocumentMeta::resolve(ctx).await }), + )) + .description(schema.description().to_string()) + } +} diff --git a/aquadoggo/src/graphql/types/document_fields.rs b/aquadoggo/src/graphql/types/document_fields.rs new file mode 100644 index 000000000..171c24ce6 --- /dev/null +++ b/aquadoggo/src/graphql/types/document_fields.rs @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_graphql::dynamic::{Field, FieldFuture, Object, ResolverContext}; +use async_graphql::Error; +use dynamic_graphql::FieldValue; +use p2panda_rs::document::traits::AsDocument; +use p2panda_rs::operation::OperationValue; +use p2panda_rs::schema::Schema; + +use crate::db::SqlStore; +use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar}; +use crate::graphql::utils::{ + downcast_document_id_arguments, fields_name, get_document_from_params, gql_scalar, graphql_type, +}; + +/// GraphQL object which represents the fields of a document document type as described by it's +/// p2panda schema. A type is added to the root GraphQL schema for every document, as these types +/// are not known at compile time we make use of the `async-graphql ` `dynamic` module. +pub struct DocumentFields; + +impl DocumentFields { + /// Build the fields of a document from the related p2panda schema. Constructs an object which + /// can then be added to the root GraphQL schema. + pub fn build(schema: &Schema) -> Object { + // Construct the document fields object which will be named `Field`. + let schema_field_name = fields_name(schema.id()); + let mut document_schema_fields = Object::new(&schema_field_name); + + // For every field in the schema we create a type with a resolver. + for (name, field_type) in schema.fields().iter() { + document_schema_fields = document_schema_fields.field(Field::new( + name, + graphql_type(field_type), + move |ctx| FieldFuture::new(async move { Self::resolve(ctx).await }), + )); + } + + document_schema_fields + } + + /// Resolve a document field value as a graphql `FieldValue`. If the value is a relation, then + /// the relevant document id or document view id is determined and passed along the query chain. + /// If the value is a simple type (meaning it is also a query leaf) then it is directly resolved. + /// + /// Requires a `ResolverContext` to be passed into the method. + async fn resolve(ctx: ResolverContext<'_>) -> Result>, Error> { + let store = ctx.data_unchecked::(); + let name = ctx.field().name(); + + // Parse the bubble up message. + let (document_id, document_view_id) = downcast_document_id_arguments(&ctx); + + // Get the whole document from the store. + let document = + match get_document_from_params(store, &document_id, &document_view_id).await? { + Some(document) => document, + None => return Ok(FieldValue::NONE), + }; + + // Get the field this query is concerned with. + match document.get(name).unwrap() { + // Relation fields are expected to resolve to the related document so we pass + // along the document id which will be processed through it's own resolver. + OperationValue::Relation(rel) => Ok(Some(FieldValue::owned_any(( + Some(DocumentIdScalar::from(rel.document_id())), + None::, + )))), + // Relation lists are handled by collecting and returning a list of all document + // id's in the relation list. Each of these in turn are processed and queries + // forwarded up the tree via their own respective resolvers. + OperationValue::RelationList(rel) => { + let mut fields = vec![]; + for document_id in rel.iter() { + fields.push(FieldValue::owned_any(( + Some(DocumentIdScalar::from(document_id)), + None::, + ))); + } + Ok(Some(FieldValue::list(fields))) + } + // Pinned relation behaves the same as relation but passes along a document view id. + OperationValue::PinnedRelation(rel) => Ok(Some(FieldValue::owned_any(( + None::, + Some(DocumentViewIdScalar::from(rel.view_id())), + )))), + // Pinned relation lists behave the same as relation lists but pass along view ids. + OperationValue::PinnedRelationList(rel) => { + let mut fields = vec![]; + for document_view_id in rel.iter() { + fields.push(FieldValue::owned_any(( + None::, + Some(DocumentViewIdScalar::from(document_view_id)), + ))); + } + Ok(Some(FieldValue::list(fields))) + } + // All other fields are simply resolved to their scalar value. + value => Ok(Some(FieldValue::value(gql_scalar(value)))), + } + } +} diff --git a/aquadoggo/src/graphql/types/document_meta.rs b/aquadoggo/src/graphql/types/document_meta.rs index eb230dd5d..f522af507 100644 --- a/aquadoggo/src/graphql/types/document_meta.rs +++ b/aquadoggo/src/graphql/types/document_meta.rs @@ -1,8 +1,13 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use dynamic_graphql::SimpleObject; +use async_graphql::dynamic::ResolverContext; +use async_graphql::Error; +use dynamic_graphql::{FieldValue, SimpleObject}; +use p2panda_rs::document::traits::AsDocument; +use crate::db::SqlStore; use crate::graphql::scalars::{DocumentIdScalar, DocumentViewIdScalar}; +use crate::graphql::utils::{downcast_document_id_arguments, get_document_from_params}; /// The meta fields of a document. #[derive(SimpleObject)] @@ -13,3 +18,34 @@ pub struct DocumentMeta { #[graphql(name = "viewId")] pub view_id: DocumentViewIdScalar, } + +impl DocumentMeta { + /// Resolve `DocumentMeta` as a graphql `FieldValue`. + /// + /// Requires a `ResolverContext` to be passed into the method. + pub async fn resolve(ctx: ResolverContext<'_>) -> Result>, Error> { + let store = ctx.data_unchecked::(); + + // Downcast the parameters passed up from the parent query field + let (document_id, document_view_id) = downcast_document_id_arguments(&ctx); + + // Get the whole document + let document = get_document_from_params(store, &document_id, &document_view_id).await?; + + // Construct `DocumentMeta` and return it. We defined the document meta + // type and already registered it in the schema. It's derived resolvers + // will handle field selection. + let field_value = match document { + Some(document) => { + let document_meta = Self { + document_id: document.id().into(), + view_id: document.view_id().into(), + }; + Some(FieldValue::owned_any(document_meta)) + } + None => Some(FieldValue::NULL), + }; + + Ok(field_value) + } +} diff --git a/aquadoggo/src/graphql/types/mod.rs b/aquadoggo/src/graphql/types/mod.rs index 23a50c49d..6f3422f04 100644 --- a/aquadoggo/src/graphql/types/mod.rs +++ b/aquadoggo/src/graphql/types/mod.rs @@ -1,7 +1,11 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +mod document; +mod document_fields; mod document_meta; mod next_arguments; +pub use document::Document; +pub use document_fields::DocumentFields; pub use document_meta::DocumentMeta; pub use next_arguments::NextArguments; diff --git a/aquadoggo/src/graphql/utils.rs b/aquadoggo/src/graphql/utils.rs index 5d9b93fd8..f5bf87362 100644 --- a/aquadoggo/src/graphql/utils.rs +++ b/aquadoggo/src/graphql/utils.rs @@ -1,10 +1,10 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use async_graphql::dynamic::ResolverContext; +use async_graphql::dynamic::{ResolverContext, TypeRef}; use async_graphql::Value; use p2panda_rs::document::{DocumentId, DocumentViewId}; use p2panda_rs::operation::OperationValue; -use p2panda_rs::schema::SchemaId; +use p2panda_rs::schema::{FieldType, SchemaId}; use p2panda_rs::storage_provider::error::DocumentStorageError; use p2panda_rs::storage_provider::traits::DocumentStore; @@ -31,9 +31,33 @@ pub fn gql_scalar(operation_value: &OperationValue) -> Value { } } +/// Get the GraphQL type name for a p2panda field type. +/// +/// GraphQL types for relations use the p2panda schema id as their name. +pub fn graphql_type(field_type: &FieldType) -> TypeRef { + match field_type { + p2panda_rs::schema::FieldType::Boolean => TypeRef::named(TypeRef::BOOLEAN), + p2panda_rs::schema::FieldType::Integer => TypeRef::named(TypeRef::INT), + p2panda_rs::schema::FieldType::Float => TypeRef::named(TypeRef::FLOAT), + p2panda_rs::schema::FieldType::String => TypeRef::named(TypeRef::STRING), + p2panda_rs::schema::FieldType::Relation(schema_id) => TypeRef::named(schema_id.to_string()), + p2panda_rs::schema::FieldType::RelationList(schema_id) => { + TypeRef::named_list(schema_id.to_string()) + } + p2panda_rs::schema::FieldType::PinnedRelation(schema_id) => { + TypeRef::named(schema_id.to_string()) + } + p2panda_rs::schema::FieldType::PinnedRelationList(schema_id) => { + TypeRef::named_list(schema_id.to_string()) + } + } +} + /// Downcast document id and document view id from parameters passed up the query fields and /// retrieved via the `ResolverContext`. -pub fn downcast_id_params( +/// +/// We unwrap internally here as we expect validation to have occured in the query resolver. +pub fn downcast_document_id_arguments( ctx: &ResolverContext, ) -> (Option, Option) { ctx.parent_value @@ -42,7 +66,7 @@ pub fn downcast_id_params( .to_owned() } -/// Helper for getting a document from score by either the document id or document view id. +/// Helper for getting a document from the store by either the document id or document view id. pub async fn get_document_from_params( store: &SqlStore, document_id: &Option, diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs new file mode 100644 index 000000000..779aaacee --- /dev/null +++ b/aquadoggo/src/network/behaviour.rs @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::Result; +use libp2p::identity::Keypair; +use libp2p::swarm::behaviour::toggle::Toggle; +use libp2p::swarm::NetworkBehaviour; +use libp2p::{identify, mdns, ping, rendezvous, PeerId}; +use log::debug; + +use crate::network::config::NODE_NAMESPACE; +use crate::network::NetworkConfiguration; + +/// Network behaviour for the aquadoggo node. +#[derive(NetworkBehaviour)] +pub struct Behaviour { + /// Automatically discover peers on the local network. + pub mdns: Toggle, + + /// Respond to inbound pings and periodically send outbound ping on every established + /// connection. + pub ping: Toggle, + + /// Register with a rendezvous server and query remote peer addresses. + pub rendezvous_client: Toggle, + + /// Serve as a rendezvous point for remote peers to register their external addresses + /// and query the addresses of other peers. + pub rendezvous_server: Toggle, + + /// Periodically exchange information between peer on an established connection. This + /// is useful for learning the external address of the local node from a remote peer. + pub identify: Toggle, +} + +impl Behaviour { + /// Generate a new instance of the composed network behaviour according to + /// the network configuration context. + pub fn new( + network_config: &NetworkConfiguration, + peer_id: PeerId, + key_pair: Keypair, + ) -> Result { + let public_key = key_pair.public(); + + // Create an mDNS behaviour with default configuration if the mDNS flag is set + let mdns = if network_config.mdns { + debug!("mDNS network behaviour enabled"); + Some(mdns::Behaviour::new(Default::default(), peer_id)?) + } else { + None + }; + + // Create a ping behaviour with default configuration if the ping flag is set + let ping = if network_config.ping { + debug!("Ping network behaviour enabled"); + Some(ping::Behaviour::default()) + } else { + None + }; + + // Create a rendezvous client behaviour with default configuration if the rendezvous client + // flag is set + let rendezvous_client = if network_config.rendezvous_client { + debug!("Rendezvous client network behaviour enabled"); + Some(rendezvous::client::Behaviour::new(key_pair)) + } else { + None + }; + + // Create a rendezvous server behaviour with default configuration if the rendezvous server + // flag is set + let rendezvous_server = if network_config.rendezvous_server { + debug!("Rendezvous server network behaviour enabled"); + Some(rendezvous::server::Behaviour::new( + rendezvous::server::Config::default(), + )) + } else { + None + }; + + // Create an identify server behaviour with default configuration if either the rendezvous + // client or server flag is set + let identify = if network_config.rendezvous_client || network_config.rendezvous_server { + debug!("Identify network behaviour enabled"); + Some(identify::Behaviour::new(identify::Config::new( + format!("{NODE_NAMESPACE}/1.0.0"), + public_key, + ))) + } else { + None + }; + + Ok(Self { + mdns: mdns.into(), // Convert the `Option` into a `Toggle` + ping: ping.into(), + rendezvous_client: rendezvous_client.into(), + rendezvous_server: rendezvous_server.into(), + identify: identify.into(), + }) + } +} diff --git a/aquadoggo/src/network/config.rs b/aquadoggo/src/network/config.rs index 63fe04c61..6cde0adc4 100644 --- a/aquadoggo/src/network/config.rs +++ b/aquadoggo/src/network/config.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use anyhow::Result; use libp2p::identity::Keypair; use libp2p::swarm::ConnectionLimits; +use libp2p::{Multiaddr, PeerId}; use log::info; use serde::{Deserialize, Serialize}; @@ -13,6 +14,9 @@ use crate::network::identity::Identity; /// Key pair file name. const KEY_PAIR_FILE_NAME: &str = "private-key"; +/// The namespace used by the `identify` network behaviour. +pub const NODE_NAMESPACE: &str = "aquadoggo"; + /// Network config for the node. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct NetworkConfiguration { @@ -71,7 +75,23 @@ pub struct NetworkConfiguration { pub quic_port: u16, /// The addresses of remote peers to replicate from. - pub remote_peers: Vec, + pub remote_peers: Vec, + + /// Rendezvous client behaviour enabled. + /// + /// Connect to a rendezvous point, register the local node and query addresses of remote peers. + pub rendezvous_client: bool, + + /// Rendezvous server behaviour enabled. + /// + /// Serve as a rendezvous point for peer discovery, allowing peer registration and queries. + pub rendezvous_server: bool, + + /// Address of a rendezvous server in the form of a multiaddress. + pub rendezvous_address: Option, + + /// Peer ID of a rendezvous server. + pub rendezvous_peer_id: Option, } impl Default for NetworkConfiguration { @@ -89,6 +109,10 @@ impl Default for NetworkConfiguration { ping: false, quic_port: 2022, remote_peers: Vec::new(), + rendezvous_client: false, + rendezvous_server: false, + rendezvous_address: None, + rendezvous_peer_id: None, } } } diff --git a/aquadoggo/src/network/mod.rs b/aquadoggo/src/network/mod.rs index f62375b9d..520d3146b 100644 --- a/aquadoggo/src/network/mod.rs +++ b/aquadoggo/src/network/mod.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +mod behaviour; mod config; mod identity; mod service; diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index bfdb93f1d..b8c9b4fe8 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -5,57 +5,20 @@ use std::convert::TryInto; use anyhow::Result; use futures::StreamExt; use libp2p::core::muxing::StreamMuxerBox; +use libp2p::multiaddr::Protocol; use libp2p::ping::Event; -use libp2p::swarm::behaviour::toggle::Toggle; -use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; -use libp2p::{mdns, ping, quic, Multiaddr, PeerId, Transport}; +use libp2p::swarm::{AddressScore, SwarmBuilder, SwarmEvent}; +use libp2p::{identify, mdns, quic, rendezvous, Multiaddr, PeerId, Transport}; use log::{debug, info, warn}; use crate::bus::ServiceSender; use crate::context::Context; use crate::manager::{ServiceReadySender, Shutdown}; +use crate::network::behaviour::{Behaviour, BehaviourEvent}; +use crate::network::config::NODE_NAMESPACE; use crate::network::NetworkConfiguration; -/// Network behaviour for the aquadoggo node. -#[derive(NetworkBehaviour)] -struct Behaviour { - /// Automatically discover peers on the local network. - mdns: Toggle, - - /// Respond to inbound pings and periodically send outbound ping on every established - /// connection. - ping: Toggle, -} - -impl Behaviour { - /// Generate a new instance of the composed network behaviour according to - /// the network configuration context. - fn new(network_config: &NetworkConfiguration, peer_id: PeerId) -> Result { - // Create an mDNS behaviour with default configuration if the mDNS flag is set - let mdns = if network_config.mdns { - debug!("mDNS network behaviour enabled"); - Some(mdns::Behaviour::new(Default::default(), peer_id)?) - } else { - None - }; - - // Create a ping behaviour with default configuration if the ping flag is set - let ping = if network_config.ping { - debug!("Ping network behaviour enabled"); - Some(ping::Behaviour::default()) - } else { - None - }; - - Ok(Self { - mdns: mdns.into(), // Convert the `Option` into a `Toggle` - ping: ping.into(), - }) - } -} - -/// Network service that configures and deploys a network swarm over QUIC transports, -/// with mDNS provided for peer discovery on the local network. +/// Network service that configures and deploys a network swarm over QUIC transports. /// /// The swarm listens for incoming connections, dials remote nodes, manages /// connections and executes predefined network behaviours. @@ -88,7 +51,7 @@ pub async fn network_service( // Instantiate the custom network behaviour with default configuration // and the libp2p peer ID - let behaviour = Behaviour::new(&network_config, peer_id)?; + let behaviour = Behaviour::new(&network_config, peer_id, key_pair)?; // Initialise a swarm with QUIC transports, our composed network behaviour // and the default configuration parameters @@ -103,13 +66,20 @@ pub async fn network_service( // Listen for incoming connection requests over the QUIC transport swarm.listen_on(quic_multiaddr)?; - // Dial the peer identified by the multi-address given in the `--remote-node-addresses` if given - if let Some(addr) = network_config.remote_peers.get(0) { - let remote: Multiaddr = addr.parse()?; - swarm.dial(remote)?; + // Dial each peer identified by the multi-address provided via `--remote-node-addresses` if given + for addr in network_config.remote_peers.clone() { + swarm.dial(addr)? + } + + // Dial the peer identified by the multi-address provided via `--rendezvous_address` if given + if let Some(addr) = network_config.rendezvous_address.clone() { + swarm.dial(addr)?; } - // Spawn a task logging swarm events + // Create a cookie holder for the identify service + let mut cookie = None; + + // Spawn a task to handle swarm events let handle = tokio::spawn(async move { loop { match swarm.select_next_some().await { @@ -140,13 +110,36 @@ pub async fn network_service( } => { debug!("ConnectionClosed: {peer_id} {endpoint:?} {num_established} {cause:?}") } + SwarmEvent::ConnectionEstablished { peer_id, .. } + // Match on a connection with the rendezvous server + if network_config.rendezvous_client + // Should be safe to unwrap rendezvous_peer_id because the CLI parser ensures + // it's provided if rendezvous_client is set to true + && network_config.rendezvous_peer_id.unwrap() == peer_id => + { + if let Some(rendezvous_client) = + swarm.behaviour_mut().rendezvous_client.as_mut() + { + debug!( + "Connected to rendezvous point, discovering nodes in '{NODE_NAMESPACE}' namespace ..." + ); + + rendezvous_client.discover( + Some(rendezvous::Namespace::from_static(NODE_NAMESPACE)), + None, + None, + network_config + .rendezvous_peer_id + .expect("Rendezvous server peer ID was provided"), + ); + } + } SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established, .. } => debug!("ConnectionEstablished: {peer_id} {endpoint:?} {num_established}"), - SwarmEvent::Dialing(peer_id) => info!("Dialing: {peer_id}"), SwarmEvent::ExpiredListenAddr { listener_id, @@ -179,7 +172,105 @@ pub async fn network_service( SwarmEvent::OutgoingConnectionError { peer_id, error } => { warn!("OutgoingConnectionError: {peer_id:?} {error:?}") } - }; + SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(event)) => match event { + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + } => { + debug!("Registered for namespace '{namespace}' at rendezvous point {rendezvous_node} for the next {ttl} seconds") + } + rendezvous::client::Event::Discovered { + registrations, + cookie: new_cookie, + .. + } => { + debug!("Rendezvous point responded with peer registration data"); + + cookie.replace(new_cookie); + + for registration in registrations { + for address in registration.record.addresses() { + let peer = registration.record.peer_id(); + debug!("Discovered peer {peer} at {address}"); + + let p2p_suffix = Protocol::P2p(*peer.as_ref()); + let address_with_p2p = if !address + .ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) + { + address.clone().with(p2p_suffix) + } else { + address.clone() + }; + + swarm.dial(address_with_p2p).unwrap(); + } + } + } + rendezvous::client::Event::RegisterFailed(error) => { + warn!("Failed to register with rendezvous point: {error}"); + } + other => debug!("Unhandled rendezvous client event: {other:?}"), + }, + SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event { + rendezvous::server::Event::PeerRegistered { peer, registration } => { + debug!( + "Peer {peer} registered for namespace '{}'", + registration.namespace + ); + } + rendezvous::server::Event::DiscoverServed { + enquirer, + registrations, + } => { + debug!( + "Served peer {enquirer} with {} registrations", + registrations.len() + ); + } + other => debug!("Unhandled rendezvous server event: {other:?}"), + }, + SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => { + match event { + identify::Event::Received { peer_id, info } => { + debug!("Received identify information from peer {peer_id}"); + debug!( + "Peer {peer_id} reported local external address: {}", + info.observed_addr + ); + + swarm.add_external_address(info.observed_addr, AddressScore::Infinite); + + // Only attempt registration if the local node is running as a rendezvous client + if network_config.rendezvous_client { + // Once `identify` information is received from a remote peer, the external + // address of the local node is known and registration with the rendezvous + // server can be carried out. + + // We call `as_mut()` on the rendezvous client network behaviour in + // order to get a mutable reference out of the `Toggle` + if let Some (rendezvous_client) = swarm.behaviour_mut().rendezvous_client.as_mut() { + rendezvous_client.register( + rendezvous::Namespace::from_static(NODE_NAMESPACE), + network_config + .rendezvous_peer_id + .expect("Rendezvous server peer ID was provided"), + None, + ); + } + } + } + identify::Event::Sent { peer_id } | identify::Event::Pushed { peer_id } => { + debug!( + "Sent identification information of the local node to peer {peer_id}" + ) + } + identify::Event::Error { peer_id, error } => { + warn!("Failed to identify the remote peer {peer_id}: {error}") + } + } + } + } } }); diff --git a/aquadoggo_cli/Cargo.toml b/aquadoggo_cli/Cargo.toml index 4be9c9d0c..41fd26e10 100644 --- a/aquadoggo_cli/Cargo.toml +++ b/aquadoggo_cli/Cargo.toml @@ -7,6 +7,7 @@ authors = [ "pietgeursen ", "sandreae ", "sophiiistika ", + "glyph ", ] license = "AGPL-3.0-or-later" repository = "https://github.com/p2panda/aquadoggo" @@ -23,6 +24,7 @@ anyhow = "^1.0.62" tokio = { version = "^1.20.1", features = ["full"] } env_logger = "^0.9.0" clap = { version = "^4.1.8", features = ["derive"] } +libp2p = "^0.51.0" [dependencies.aquadoggo] version = "~0.4.0" diff --git a/aquadoggo_cli/README.md b/aquadoggo_cli/README.md index 3fe881943..203b05439 100644 --- a/aquadoggo_cli/README.md +++ b/aquadoggo_cli/README.md @@ -4,6 +4,8 @@ Node server with GraphQL API for the p2panda network. ## Usage +When running the node as a rendezvous client (`--rendezvous-client true`) both the rendezvous address and peer ID must be provided. + ``` Options: -d, --data-dir @@ -28,6 +30,26 @@ Options: [possible values: true, false] + -C, --rendezvous-client + Enable rendezvous client to facilitate peer discovery via a rendezvous server, false by default + + [possible values: true, false] + + -S, --rendezvous-server + Enable rendezvous server to facilitate peer discovery for remote peers, false by default + + [possible values: true, false] + + --rendezvous-address + The IP address of a rendezvous server in the form of a multiaddress. + + eg. --rendezvous-address "/ip4/127.0.0.1/udp/12345/quic-v1" + + --rendezvous-peer-id + The peer ID of a rendezvous server in the form of an Ed25519 key encoded as a raw base58btc multihash. + + eg. --rendezvous-peer-id "12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA" + -h, --help Print help (see a summary with '-h') diff --git a/aquadoggo_cli/src/main.rs b/aquadoggo_cli/src/main.rs index f80a424f3..5d032860e 100644 --- a/aquadoggo_cli/src/main.rs +++ b/aquadoggo_cli/src/main.rs @@ -5,7 +5,9 @@ use std::convert::{TryFrom, TryInto}; use anyhow::Result; use aquadoggo::{Configuration, NetworkConfiguration, Node}; -use clap::Parser; +use clap::error::ErrorKind as ClapErrorKind; +use clap::{CommandFactory, Parser}; +use libp2p::{Multiaddr, PeerId}; #[derive(Parser, Debug)] #[command(name = "aquadoggo Node", version)] @@ -25,7 +27,7 @@ struct Cli { /// URLs of remote nodes to replicate with. #[arg(short, long)] - remote_node_addresses: Vec, + remote_node_addresses: Vec, /// Enable mDNS for peer discovery over LAN (using port 5353), true by default. #[arg(short, long)] @@ -34,6 +36,49 @@ struct Cli { /// Enable ping for connected peers (send and receive ping packets), true by default. #[arg(long)] ping: Option, + + /// Enable rendezvous client to facilitate peer discovery via a rendezvous server, false by default. + #[arg(short = 'C', long)] + rendezvous_client: Option, + + /// Enable rendezvous server to facilitate peer discovery for remote peers, false by default. + #[arg(short = 'S', long)] + rendezvous_server: Option, + + /// The IP address of a rendezvous server in the form of a multiaddress. + /// + /// eg. --rendezvous-address "/ip4/127.0.0.1/udp/12345/quic-v1" + #[arg(long)] + rendezvous_address: Option, + + /// The peer ID of a rendezvous server in the form of an Ed25519 key encoded as a raw + /// base58btc multihash. + /// + /// eg. --rendezvous-peer-id "12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA" + #[arg(long)] + rendezvous_peer_id: Option, +} + +impl Cli { + // Run custom validators on parsed CLI input + fn validate(self) -> Self { + // Ensure rendezvous server address and peer ID are both provided if + // rendezvous client mode has been set to `true`. Both values are required + // to dial the rendezvous server. + if let Some(true) = self.rendezvous_client { + if self.rendezvous_address.is_none() || self.rendezvous_peer_id.is_none() { + // Print a help message about the missing value(s) and exit + Cli::command() + .error( + ClapErrorKind::MissingRequiredArgument, + "'--rendezvous-address' and '--rendezvous-peer-id' must both be provided if '--rendezvous-client true'", + ) + .exit() + } + } + + self + } } impl TryFrom for Configuration { @@ -48,6 +93,10 @@ impl TryFrom for Configuration { ping: cli.ping.unwrap_or(true), quic_port: cli.quic_port.unwrap_or(2022), remote_peers: cli.remote_node_addresses, + rendezvous_client: cli.rendezvous_client.unwrap_or(false), + rendezvous_server: cli.rendezvous_server.unwrap_or(false), + rendezvous_address: cli.rendezvous_address, + rendezvous_peer_id: cli.rendezvous_peer_id, ..NetworkConfiguration::default() }; @@ -59,8 +108,10 @@ impl TryFrom for Configuration { async fn main() { env_logger::init(); - // Parse command line arguments and load configuration - let cli = Cli::parse(); + // Parse command line arguments and run custom validators + let cli = Cli::parse().validate(); + + // Load configuration parameters and apply defaults let config = cli.try_into().expect("Could not load configuration"); // Start p2panda node in async runtime