Schema provider and task (#166)
* Add schema service

* Add schema task

* Fix docstring

* Rename to SchemaProvider and add to node context

* Remove temp file helper from this branch

* Remove doctest that uses private items

* Rewrite schema provider without storage provider

* Add comment

* Reset p2panda-rs dep

* Remove custom error type

* Allow dead code for now

* Fix url

* Add test

* Remove unused import

* Add default impl

* Init env_logger when tests start

* Consistent naming and default use

* Update p2panda-rs api

* Add task logging

* Kick off schema task

* Log all the things

* Don't rely on all schemas to be in the provider

* Add tests

* Remove unused import

* Remove unused imports

* Update api after merge

* Consistent logging

* Update changelog

* Add test for new functionality in dependency task

* Move to unreleased section in CHANGELOG

* Minor clean ups

* Bring back receiver to make cov tests work

* Move serde_json back to dev dependencies

* Improve error logging in worker (#194)

* Add an error message to worker error types

* Use Display for QueueItem and TaskInput

* Fix test

* Add entry to CHANGELOG.md

* Use Tokio Mutex

* Update docstrings

* Refactor schema task tests

* Fix comments

* Remove unused dependencies

* Handle document view being deleted while processing task

* Less verbose logging from tasks

The task module that emits a log line is already displayed at the beginning of
the log line as the log source, so tasks don't need to repeat their own name in
every message (see the sketch at the end of this list).

* Formatting and docstrings

* Replace helper function with store call

* Rewrite test using test node

* Improve test fidelity

* Rename module

* Remove unused method

* Extend tests

* Update docstring

* Fix branch for upstream changes

* Oops
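
For illustration only (not part of this commit; the crate and module names below are made up): env_logger's default line format already prefixes every message with the emitting module path, which is why task log messages can stay short.

mod schema_task {
    pub fn run() {
        // With env_logger's default format this prints something like:
        // [2022-07-28T12:00:00Z DEBUG demo::schema_task] working on task input
        log::debug!("working on task input");
    }
}

fn main() {
    // e.g. RUST_LOG=debug cargo run
    env_logger::init();
    schema_task::run();
}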

Co-authored-by: Andreas Dzialocha <x12@adz.garden>
Co-authored-by: Andreas Dzialocha <adzialocha@users.noreply.github.com>
Co-authored-by: Sam Andreae <contact@samandreae.com>
4 people committed Jul 28, 2022
1 parent bbc55d5 commit 4d2e672
Showing 18 changed files with 886 additions and 117 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,10 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- GraphQL replication service gets and verifies new entries and inserts them into the db [#137](https://github.com/p2panda/aquadoggo/pull/137)
- Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166)

### Changed

- Refactor scalars and replication API, replace `graphql-client` with `gql_client` [#184](https://github.com/p2panda/aquadoggo/pull/184)
- Give error types of worker a string for better debugging [#194](https://github.com/p2panda/aquadoggo/pull/194)
- Bump `p2panda-rs` which now supports log ids starting from `0` [#207](https://github.com/p2panda/aquadoggo/pull/207)

### Fixed
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions aquadoggo/Cargo.toml
@@ -54,6 +54,7 @@ tower-http = { version = "^0.3.4", default-features = false, features = [
triggered = "^0.1.2"

[dev-dependencies]
ctor = "^0.1.22"
ciborium = "0.2.0"
env_logger = "^0.9.0"
hex = "0.4.3"
16 changes: 12 additions & 4 deletions aquadoggo/src/context.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;

use crate::config::Configuration;
use crate::db::provider::SqlStorage;
use crate::schema::SchemaProvider;

/// Inner data shared across all services.
#[derive(Debug)]
@@ -14,11 +15,18 @@ pub struct Data {

/// Storage provider with database connection pool.
pub store: SqlStorage,

/// Schema provider gives access to system and application schemas.
pub schema_provider: SchemaProvider,
}

impl Data {
pub fn new(store: SqlStorage, config: Configuration) -> Self {
Self { config, store }
pub fn new(store: SqlStorage, config: Configuration, schema_provider: SchemaProvider) -> Self {
Self {
config,
store,
schema_provider,
}
}
}

@@ -28,8 +36,8 @@ pub struct Context(pub Arc<Data>);

impl Context {
/// Returns a new instance of `Context`.
pub fn new(store: SqlStorage, config: Configuration) -> Self {
Self(Arc::new(Data::new(store, config)))
pub fn new(store: SqlStorage, config: Configuration, schema_provider: SchemaProvider) -> Self {
Self(Arc::new(Data::new(store, config, schema_provider)))
}
}

114 changes: 113 additions & 1 deletion aquadoggo/src/db/provider.rs
@@ -1,9 +1,11 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use async_trait::async_trait;
use p2panda_rs::document::DocumentId;
use p2panda_rs::document::{DocumentId, DocumentViewId};
use p2panda_rs::hash::Hash;
use p2panda_rs::operation::VerifiedOperation;
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::errors::OperationStorageError;
use p2panda_rs::storage_provider::traits::StorageProvider;
use sqlx::query_scalar;

@@ -71,3 +73,113 @@ impl StorageProvider<StorageEntry, StorageLog, VerifiedOperation> for SqlStorage
Ok(hash)
}
}

impl SqlStorage {
/// Returns the schema id for a document view.
///
/// Returns `None` if this document view is not found.
pub async fn get_schema_by_document_view(
&self,
view_id: &DocumentViewId,
) -> StorageProviderResult<Option<SchemaId>> {
let result: Option<String> = query_scalar(
"
SELECT
schema_id
FROM
document_views
WHERE
document_view_id = $1
",
)
.bind(view_id.as_str())
.fetch_optional(&self.pool)
.await
.map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?;

// Unwrap because we expect no invalid schema ids in the db.
Ok(result.map(|id_str| id_str.parse().unwrap()))
}
}

#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use std::str::FromStr;

use p2panda_rs::document::{DocumentView, DocumentViewFields, DocumentViewId};
use p2panda_rs::entry::{LogId, SeqNum};
use p2panda_rs::identity::Author;
use p2panda_rs::operation::{AsOperation, OperationId};
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore};
use p2panda_rs::test_utils::constants::SCHEMA_ID;
use p2panda_rs::test_utils::fixtures::random_document_view_id;
use rstest::rstest;

use crate::db::stores::test_utils::{test_db, TestDatabase, TestDatabaseRunner};
use crate::db::traits::DocumentStore;

/// Inserts a `DocumentView` into the db and returns its view id.
async fn insert_document_view(db: &TestDatabase) -> DocumentViewId {
let author = Author::try_from(db.test_data.key_pairs[0].public_key().to_owned()).unwrap();
let entry = db
.store
.get_entry_at_seq_num(&author, &LogId::new(0), &SeqNum::new(1).unwrap())
.await
.unwrap()
.unwrap();
let operation_id: OperationId = entry.hash().into();
let document_view_id: DocumentViewId = operation_id.clone().into();
let document_view = DocumentView::new(
&document_view_id,
&DocumentViewFields::new_from_operation_fields(
&operation_id,
&entry.operation().fields().unwrap(),
),
);
let result = db
.store
.insert_document_view(&document_view, &SchemaId::from_str(SCHEMA_ID).unwrap())
.await;

assert!(result.is_ok());
document_view_id
}

#[rstest]
fn test_get_schema_for_view(
#[from(test_db)]
#[with(1, 1, 1)]
runner: TestDatabaseRunner,
) {
runner.with_db_teardown(|db: TestDatabase| async move {
let document_view_id = insert_document_view(&db).await;
let result = db
.store
.get_schema_by_document_view(&document_view_id)
.await;

assert!(result.is_ok());
assert_eq!(result.unwrap().unwrap().name(), "venue");
});
}

#[rstest]
fn test_get_schema_for_missing_view(
random_document_view_id: DocumentViewId,
#[from(test_db)]
#[with(1, 1, 1)]
runner: TestDatabaseRunner,
) {
runner.with_db_teardown(|db: TestDatabase| async move {
let result = db
.store
.get_schema_by_document_view(&random_document_view_id)
.await;

assert!(result.is_ok());
assert!(result.unwrap().is_none());
});
}
}
6 changes: 3 additions & 3 deletions aquadoggo/src/db/stores/test_utils.rs
@@ -340,9 +340,9 @@ pub fn with_db_manager_teardown<F: AsyncTestFnWithManager + Send + Sync + 'stati

/// Fixture for constructing a storage provider instance backed by a pre-populated database.
///
/// Returns a `TestDatabaseRunner` which allows to bootstrap a safe async test environment
/// connecting to a database. It makes sure the runner disconnects properly from the connection
/// pool after the test succeeded or even failed.
/// Returns a `TestDatabaseRunner` that bootstraps a safe async test environment connecting to a
/// database. It makes sure the runner disconnects properly from the connection pool after the test
/// succeeded or even failed.
///
/// Passed parameters define what the database should contain. The first entry in each log contains
/// a valid CREATE operation; following entries contain duplicate UPDATE operations. If the
24 changes: 24 additions & 0 deletions aquadoggo/src/lib.rs
@@ -24,10 +24,34 @@ mod manager;
mod materializer;
mod node;
mod replication;
mod schema;

#[cfg(test)]
mod test_helpers;

pub use crate::config::Configuration;
pub use crate::replication::ReplicationConfiguration;
pub use node::Node;

/// Init env_logger before the test suite runs to handle logging outputs.
///
/// We output log information using the `log` crate. In itself this doesn't print
/// out any logging information; library users can capture and handle the emitted
/// logs using a log handler. Here we use `env_logger` to handle logs emitted
/// while running our tests.
///
/// This will also capture and output any logs emitted from our dependencies. This behaviour
/// can be customised at runtime, e.g. with `RUST_LOG=aquadoggo=info cargo t -- --nocapture` or
/// `RUST_LOG=sqlx=debug cargo t -- --nocapture`.
///
/// The `ctor` crate is used to define a global constructor function. This function will be run
/// before any of the tests.
#[cfg(test)]
#[ctor::ctor]
fn init() {
// If the `RUST_LOG` env var is not set, skip initialisation as we don't want
// to see any logs.
if std::env::var("RUST_LOG").is_ok() {
let _ = env_logger::builder().is_test(true).try_init();
}
}
16 changes: 8 additions & 8 deletions aquadoggo/src/manager.rs
@@ -71,14 +71,14 @@ impl Drop for Signal {
}
}

// Service manager for orchestration of long-running concurrent processes.
//
// This manager offers a message bus between services for cross-service communication. It also
// sends a shutdown signal to allow services to react to it gracefully.
//
// Stopped services (because of a panic, error or successful return) will send an exit signal which
// can be subscribed to via the `on_exit` method. Usually stopped services indicate system failure
// and it is recommended to stop the application when this events occurs.
/// Service manager for orchestration of long-running concurrent processes.
///
/// This manager offers a message bus between services for cross-service communication. It also
/// sends a shutdown signal to allow services to react to it gracefully.
///
/// Stopped services (because of a panic, error or successful return) will send an exit signal which
/// can be subscribed to via the `on_exit` method. Usually stopped services indicate system failure
/// and it is recommended to stop the application when this event occurs.
pub struct ServiceManager<D, M>
where
D: Clone + Send + Sync + 'static,
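
A minimal, self-contained sketch of the pattern this docstring describes (a broadcast bus shared by services plus a shutdown signal they can react to gracefully), written with plain tokio channels. It is not aquadoggo's actual `ServiceManager` API; apart from `on_exit`, all names below are assumptions.

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
enum Message {
    NewOperation,
}

// Sketch only: illustrates the bus + shutdown pattern, not the real ServiceManager.
#[tokio::main]
async fn main() {
    // Message bus shared between services and a separate shutdown channel.
    let (bus_tx, _) = broadcast::channel::<Message>(16);
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    // A "service": handles bus messages until the shutdown signal arrives.
    let mut bus_rx = bus_tx.subscribe();
    let service = tokio::spawn(async move {
        loop {
            tokio::select! {
                Ok(msg) = bus_rx.recv() => println!("service received {:?}", msg),
                _ = shutdown_rx.recv() => break,
            }
        }
    });

    bus_tx.send(Message::NewOperation).unwrap();
    shutdown_tx.send(()).unwrap();
    service.await.unwrap();
}
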
18 changes: 18 additions & 0 deletions aquadoggo/src/materializer/input.rs
@@ -1,5 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::fmt::Display;

use p2panda_rs::document::{DocumentId, DocumentViewId};

/// Input of every task worker containing all information we need to process.
@@ -26,3 +28,19 @@ impl TaskInput {
}
}
}

impl Display for TaskInput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let document_id = match &self.document_id {
Some(id) => format!("{}", id),
None => "-".to_string(),
};

let view_id = match &self.document_view_id {
Some(view_id) => format!("{}", view_id),
None => "-".to_string(),
};

write!(f, "<TaskInput {}/{}>", document_id, view_id)
}
}
25 changes: 21 additions & 4 deletions aquadoggo/src/materializer/service.rs
@@ -151,6 +151,7 @@ mod tests {
use crate::db::stores::test_utils::{send_to_store, test_db, TestDatabase, TestDatabaseRunner};
use crate::db::traits::DocumentStore;
use crate::materializer::{Task, TaskInput};
use crate::schema::SchemaProvider;
use crate::Configuration;

use super::materializer_service;
@@ -183,7 +184,11 @@
.is_none());

// Prepare arguments for service
let context = Context::new(db.store.clone(), Configuration::default());
let context = Context::new(
db.store.clone(),
Configuration::default(),
SchemaProvider::default(),
);
let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve
@@ -252,7 +257,11 @@
.unwrap();

// Prepare arguments for service
let context = Context::new(db.store.clone(), Configuration::default());
let context = Context::new(
db.store.clone(),
Configuration::default(),
SchemaProvider::default(),
);
let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve
@@ -312,7 +321,11 @@
.to_owned();

// Prepare arguments for service
let context = Context::new(db.store.clone(), Configuration::default());
let context = Context::new(
db.store.clone(),
Configuration::default(),
SchemaProvider::default(),
);
let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve
@@ -434,7 +447,11 @@
// Prepare empty database
runner.with_db_teardown(|db: TestDatabase| async move {
// Prepare arguments for service
let context = Context::new(db.store.clone(), Configuration::default());
let context = Context::new(
db.store.clone(),
Configuration::default(),
SchemaProvider::default(),
);
let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve