diff --git a/Cargo.lock b/Cargo.lock index a95dde4b8..9f30132cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,7 @@ dependencies = [ "crossbeam-queue", "directories", "envy", + "futures", "hex", "http", "hyper", @@ -104,6 +105,7 @@ dependencies = [ "p2panda-rs", "rand 0.8.5", "reqwest", + "rstest", "serde", "serde_json", "sqlformat", @@ -319,9 +321,9 @@ dependencies = [ [[package]] name = "async-process" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6" +checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c" dependencies = [ "async-io", "blocking", @@ -482,16 +484,16 @@ dependencies = [ "tokio", "tokio-tungstenite", "tower", - "tower-http 0.3.0", + "tower-http 0.3.2", "tower-layer", "tower-service", ] [[package]] name = "axum-core" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bdc19781b16e32f8a7200368a336fa4509d4b72ef15dd4e41df5290855ee1e6" +checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17" dependencies = [ "async-trait", "bytes 1.1.0", @@ -1178,7 +1180,7 @@ dependencies = [ "base16ct", "crypto-bigint 0.3.2", "der 0.5.1", - "ff 0.11.0", + "ff 0.11.1", "generic-array 0.14.5", "group 0.11.0", "rand_core 0.6.3", @@ -1287,9 +1289,9 @@ dependencies = [ [[package]] name = "ff" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2958d04124b9f27f175eaeb9a9f383d026098aa837eadd8ba22c11f13a05b9e" +checksum = "131655483be284720a17d74ff97592b8e76576dc25563148601df2d7c9080924" dependencies = [ "rand_core 0.6.3", "subtle", @@ -1548,7 +1550,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5ac374b108929de78460075f3dc439fa66df9d8fc77e8f12caa5165fcf0c89" dependencies = [ - "ff 0.11.0", + "ff 0.11.1", "rand_core 0.6.3", "subtle", ] @@ -1731,9 +1733,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes 1.1.0", "fnv", @@ -1917,9 +1919,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.124" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" +checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" [[package]] name = "libm" @@ -1956,9 +1958,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", "value-bag", @@ -1995,9 +1997,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memoffset" @@ -2131,9 +2133,9 @@ dependencies = [ [[package]] name = "num-integer" -version = 
"0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ "autocfg 1.1.0", "num-traits", @@ -2152,9 +2154,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg 1.1.0", "libm", @@ -2245,7 +2247,7 @@ dependencies = [ [[package]] name = "p2panda-rs" version = "0.3.0" -source = "git+https://github.com/p2panda/p2panda?branch=main#e612a63f8d8522541bf441ca0830589ccbc0197e" +source = "git+https://github.com/p2panda/p2panda?branch=aquadoggo-wip#cad5659271c6af3bd4d3ae1496a9db477c6bb193" dependencies = [ "arrayvec 0.5.2", "async-trait", @@ -2308,7 +2310,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ "lock_api", - "parking_lot_core 0.9.2", + "parking_lot_core 0.9.3", ] [[package]] @@ -2327,9 +2329,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ "cfg-if", "libc", @@ -2790,6 +2792,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rstest" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d912f35156a3f99a66ee3e11ac2e0b3f34ac85a07e05263d05a7e2c8810d616f" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "rust_crypto" version = "0.1.0" @@ -2811,6 +2826,15 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustls" version = "0.19.1" @@ -2865,11 +2889,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "semver" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cb243bdfdb5936c8dc3c45762a19d12ab4550cdc753bc247637d4ec35a040fd" + [[package]] name = "serde" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" dependencies = [ "serde_derive", ] @@ -2888,9 +2918,9 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" +checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54" dependencies = [ "serde", ] @@ -2907,9 +2937,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" dependencies = [ "proc-macro2", "quote", @@ -2918,9 +2948,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" dependencies = [ "itoa", "ryu", @@ -2929,9 +2959,9 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98d0516900518c29efa217c298fa1f4e6c6ffc85ae29fd7f4ee48f176e1a9ed5" +checksum = "a2ad84e47328a31223de7fed7a4f5087f2d6ddfe586cf3ca25b7a165bc0a5aed" dependencies = [ "proc-macro2", "quote", @@ -3310,9 +3340,9 @@ checksum = "734676eb262c623cec13c3155096e08d1f8f29adce39ba17948b18dad1e54142" [[package]] name = "syn" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" +checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52" dependencies = [ "proc-macro2", "quote", @@ -3371,18 +3401,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", @@ -3428,9 +3458,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b" +checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc" dependencies = [ "bytes 1.1.0", "libc", @@ -3544,9 +3574,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79dd37121c38240c4b4fe6520332406218bbf876f2f690fe9e406020189366fd" +checksum = "e980386f06883cf4d0578d6c9178c81f68b45d77d00f2c2c1bc034b3439c2c56" dependencies = [ "bitflags", "bytes 1.1.0", @@ -3702,9 +3732,9 @@ checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" [[package]] name = "unicode_categories" @@ -3757,9 +3787,9 @@ dependencies = [ [[package]] name = "value-bag" -version = "1.0.0-alpha.8" +version = "1.0.0-alpha.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79923f7731dc61ebfba3633098bf3ac533bbd35ccd8c57e7088d9a5eebe0263f" +checksum = 
"2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" dependencies = [ "ctor", "version_check", @@ -3970,9 +4000,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ "windows_aarch64_msvc", "windows_i686_gnu", @@ -3983,33 +4013,33 @@ dependencies = [ [[package]] name = "windows_aarch64_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" [[package]] name = "windows_i686_gnu" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" [[package]] name = "windows_i686_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" [[package]] name = "windows_x86_64_gnu" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" [[package]] name = "windows_x86_64_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" [[package]] name = "winreg" diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index 4cd3a3fb5..95d779165 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -23,6 +23,7 @@ bamboo-rs-core-ed25519-yasmf = "0.1.1" crossbeam-queue = "0.3.5" directories = "3.0.2" envy = "0.4.2" +futures = "0.3.17" hex = "0.4.3" jsonrpc-v2 = { version = "0.10.1", features = [ "easy-errors", @@ -32,7 +33,8 @@ log = "0.4.14" openssl-probe = "0.1.4" # We can not publish the `aquadoggo` crate yet, since `p2panda-rs` is an # unpublished dependency. 
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", branch = "main" } +# @TODO: This points at a WIP branch of p2panda-rs which is used while things are still slightly unstable +p2panda-rs = { git = "https://github.com/p2panda/p2panda", branch = "aquadoggo-wip" } rand = "0.8.4" serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.67" @@ -59,6 +61,7 @@ reqwest = { version = "0.11.9", default-features = false, features = [ "json", "stream", ] } +rstest = "0.12.0" tower-service = "0.3.1" hyper = "0.14.17" http = "0.2.6" diff --git a/aquadoggo/migrations/20220424125654_create-operations.sql b/aquadoggo/migrations/20220424125654_create-operations.sql new file mode 100644 index 000000000..1283ab073 --- /dev/null +++ b/aquadoggo/migrations/20220424125654_create-operations.sql @@ -0,0 +1,32 @@ +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS operations_v1 ( + author VARCHAR(64) NOT NULL, + document_id VARCHAR(64) NOT NULL, + operation_id VARCHAR(64) NOT NULL UNIQUE, + entry_hash VARCHAR(68) NOT NULL UNIQUE, + action VARCHAR(16) NOT NULL, + schema_id TEXT NOT NULL, + previous_operations TEXT NULL, + -- FOREIGN KEY(entry_hash) REFERENCES entries(entry_hash), + PRIMARY KEY (operation_id) +); + +-- With the above "previous_operations" column in the operations_v1 table we technically no longer need this +-- relation table. Can we remove it or are there reasons it's useful? +CREATE TABLE IF NOT EXISTS previous_operations_v1 ( + parent_operation_id VARCHAR(64) NOT NULL, + child_operation_id VARCHAR(64) NOT NULL, + FOREIGN KEY(parent_operation_id) REFERENCES operations_v1(operation_id), + FOREIGN KEY(child_operation_id) REFERENCES operations_v1(operation_id) +); + +CREATE TABLE IF NOT EXISTS operation_fields_v1 ( + operation_id VARCHAR(64) NOT NULL, + name VARCHAR(128) NOT NULL, + field_type TEXT NOT NULL, + value BLOB NULL, + FOREIGN KEY(operation_id) REFERENCES operations_v1(operation_id) +); + +CREATE INDEX idx_operation_fields_v1 ON operation_fields_v1 (operation_id, name); diff --git a/aquadoggo/migrations/20220502013246_create-documents.sql b/aquadoggo/migrations/20220502013246_create-documents.sql new file mode 100644 index 000000000..086fe76d7 --- /dev/null +++ b/aquadoggo/migrations/20220502013246_create-documents.sql @@ -0,0 +1,21 @@ +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS document_view_fields ( + document_view_id VARCHAR(64) NOT NULL, + operation_id VARCHAR(64) NOT NULL, + name VARCHAR(128) NOT NULL +); + +CREATE INDEX idx_document_view_fields ON document_view_fields (document_view_id, operation_id, name); + +CREATE TABLE IF NOT EXISTS document_views ( + document_view_id VARCHAR(64) NOT NULL UNIQUE, + schema_id TEXT NOT NULL, + PRIMARY KEY (document_view_id) +); + +CREATE TABLE IF NOT EXISTS documents ( + document_id VARCHAR(64) NOT NULL UNIQUE, + document_view_id VARCHAR(64) NOT NULL, + PRIMARY KEY (document_id) +); diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs new file mode 100644 index 000000000..0fd81c507 --- /dev/null +++ b/aquadoggo/src/db/errors.rs @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +/// `OperationStore` errors. +#[derive(thiserror::Error, Debug)] +pub enum OperationStorageError { + /// Catch all error which implementers can use for passing their own errors up the chain. + #[error("Ahhhhh!!!!: {0}")] + Custom(String), +} + +/// `DocumentStore` errors.
+#[derive(thiserror::Error, Debug)] +pub enum DocumentViewStorageError { + /// Catch all error which implementers can use for passing their own errors up the chain. + #[error("Ahhhhh!!!!: {0}")] + Custom(String), +} diff --git a/aquadoggo/src/db/mod.rs b/aquadoggo/src/db/mod.rs index 39e4a47f3..22f0e6bea 100644 --- a/aquadoggo/src/db/mod.rs +++ b/aquadoggo/src/db/mod.rs @@ -5,8 +5,12 @@ use sqlx::any::{Any, AnyPool, AnyPoolOptions}; use sqlx::migrate; use sqlx::migrate::MigrateDatabase; +pub mod errors; pub mod models; -pub mod store; +pub mod provider; +pub mod stores; +pub mod traits; +pub mod utils; /// Re-export of generic connection pool type. pub type Pool = AnyPool; diff --git a/aquadoggo/src/db/models/entry.rs b/aquadoggo/src/db/models/entry.rs index 112609316..dfe5580f4 100644 --- a/aquadoggo/src/db/models/entry.rs +++ b/aquadoggo/src/db/models/entry.rs @@ -1,30 +1,17 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use async_trait::async_trait; +use p2panda_rs::entry::{decode_entry, Entry, EntrySigned}; +use p2panda_rs::operation::OperationEncoded; +use p2panda_rs::storage_provider::errors::ValidationError; +use p2panda_rs::Validate; use serde::Serialize; use sqlx::FromRow; -use sqlx::{query, query_as, query_scalar}; - -use p2panda_rs::document::DocumentId; -use p2panda_rs::entry::{decode_entry, Entry as P2PandaEntry, EntrySigned, LogId, SeqNum}; -use p2panda_rs::hash::Hash; -use p2panda_rs::identity::Author; -use p2panda_rs::operation::{Operation, OperationEncoded}; -use p2panda_rs::schema::SchemaId; -use p2panda_rs::storage_provider::errors::{EntryStorageError, ValidationError}; -use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore, StorageProvider}; -use p2panda_rs::Validate; - -use crate::db::models::Log; -use crate::db::store::SqlStorage; -use crate::errors::StorageProviderResult; -use crate::rpc::{EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse}; /// Struct representing the actual SQL row of `Entry`. /// /// We store the u64 integer values of `log_id` and `seq_num` as strings since not all database /// backend support large numbers. -#[derive(FromRow, Debug, Serialize, Clone)] +#[derive(FromRow, Debug, Serialize, Clone, PartialEq)] #[serde(rename_all = "camelCase")] pub struct EntryRow { /// Public key of the author. @@ -49,14 +36,8 @@ pub struct EntryRow { pub seq_num: String, } -impl AsRef for EntryRow { - fn as_ref(&self) -> &Self { - self - } -} - impl EntryRow { - fn entry_decoded(&self) -> P2PandaEntry { + pub fn entry_decoded(&self) -> Entry { // Unwrapping as validation occurs in `EntryWithOperation`. 
decode_entry(&self.entry_signed(), self.operation_encoded().as_ref()).unwrap() } @@ -70,62 +51,6 @@ impl EntryRow { } } -/// Implement `AsStorageEntry` trait for `Entry` -impl AsStorageEntry for EntryRow { - type AsStorageEntryError = EntryStorageError; - - fn new( - entry_signed: &EntrySigned, - operation_encoded: &OperationEncoded, - ) -> Result { - let entry = decode_entry(entry_signed, Some(operation_encoded)) - .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - - Ok(Self { - author: entry_signed.author().as_str().into(), - entry_bytes: entry_signed.as_str().into(), - entry_hash: entry_signed.hash().as_str().into(), - log_id: entry.log_id().as_u64().to_string(), - payload_bytes: Some(operation_encoded.as_str().to_string()), - payload_hash: entry_signed.payload_hash().as_str().into(), - seq_num: entry.seq_num().as_u64().to_string(), - }) - } - - fn author(&self) -> Author { - Author::new(self.author.as_ref()).unwrap() - } - - fn hash(&self) -> Hash { - self.entry_signed().hash() - } - - fn entry_bytes(&self) -> Vec { - self.entry_signed().to_bytes() - } - - fn backlink_hash(&self) -> Option { - self.entry_decoded().backlink_hash().cloned() - } - - fn skiplink_hash(&self) -> Option { - self.entry_decoded().skiplink_hash().cloned() - } - - fn seq_num(&self) -> SeqNum { - *self.entry_decoded().seq_num() - } - - fn log_id(&self) -> LogId { - *self.entry_decoded().log_id() - } - - fn operation(&self) -> Operation { - let operation_encoded = self.operation_encoded().unwrap(); - Operation::from(&operation_encoded) - } -} - impl Validate for EntryRow { type Error = ValidationError; @@ -139,227 +64,8 @@ impl Validate for EntryRow { } } -/// Trait which handles all storage actions relating to `Entries`. -#[async_trait] -impl EntryStore for SqlStorage { - /// Insert an entry into storage. - async fn insert_entry(&self, entry: EntryRow) -> Result { - let rows_affected = query( - " - INSERT INTO - entries ( - author, - entry_bytes, - entry_hash, - log_id, - payload_bytes, - payload_hash, - seq_num - ) - VALUES - ($1, $2, $3, $4, $5, $6, $7) - ", - ) - .bind(entry.author().as_str()) - .bind(entry.entry_signed().as_str()) - .bind(entry.hash().as_str()) - .bind(entry.log_id().as_u64().to_string()) - .bind(entry.operation_encoded().unwrap().as_str()) - .bind(entry.operation_encoded().unwrap().hash().as_str()) - .bind(entry.seq_num().as_u64().to_string()) - .execute(&self.pool) - .await - .map_err(|e| EntryStorageError::Custom(e.to_string()))? - .rows_affected(); - - Ok(rows_affected == 1) - } - - /// Returns entry at sequence position within an author's log. - async fn entry_at_seq_num( - &self, - author: &Author, - log_id: &LogId, - seq_num: &SeqNum, - ) -> Result, EntryStorageError> { - let entry_row = query_as::<_, EntryRow>( - " - SELECT - author, - entry_bytes, - entry_hash, - log_id, - payload_bytes, - payload_hash, - seq_num - FROM - entries - WHERE - author = $1 - AND log_id = $2 - AND seq_num = $3 - ", - ) - .bind(author.as_str()) - .bind(log_id.as_u64().to_string()) - .bind(seq_num.as_u64().to_string()) - .fetch_optional(&self.pool) - .await - .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - - Ok(entry_row) - } - - /// Returns the latest Bamboo entry of an author's log. 
- async fn latest_entry( - &self, - author: &Author, - log_id: &LogId, - ) -> Result, EntryStorageError> { - let entry_row = query_as::<_, EntryRow>( - " - SELECT - author, - entry_bytes, - entry_hash, - log_id, - payload_bytes, - payload_hash, - seq_num - FROM - entries - WHERE - author = $1 - AND log_id = $2 - ORDER BY - seq_num DESC - LIMIT - 1 - ", - ) - .bind(author.as_str()) - .bind(log_id.as_u64().to_string()) - .fetch_optional(&self.pool) - .await - .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - - Ok(entry_row) - } - - /// Return vector of all entries of a given schema - async fn by_schema(&self, schema: &SchemaId) -> Result, EntryStorageError> { - let entries = query_as::<_, EntryRow>( - " - SELECT - entries.author, - entries.entry_bytes, - entries.entry_hash, - entries.log_id, - entries.payload_bytes, - entries.payload_hash, - entries.seq_num - FROM - entries - INNER JOIN logs - ON (entries.log_id = logs.log_id - AND entries.author = logs.author) - WHERE - logs.schema = $1 - ", - ) - .bind(schema.as_str()) - .fetch_all(&self.pool) - .await - .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - - Ok(entries) - } -} - -/// All other methods needed to be implemented by a p2panda `StorageProvider` -#[async_trait] -impl StorageProvider for SqlStorage { - type EntryArgsResponse = EntryArgsResponse; - type EntryArgsRequest = EntryArgsRequest; - type PublishEntryResponse = PublishEntryResponse; - type PublishEntryRequest = PublishEntryRequest; - - /// Returns the related document for any entry. - /// - /// Every entry is part of a document and, through that, associated with a specific log id used - /// by this document and author. This method returns that document id by looking up the log - /// that the entry was stored in. - async fn get_document_by_entry( - &self, - entry_hash: &Hash, - ) -> StorageProviderResult> { - let result: Option = query_scalar( - " - SELECT - logs.document - FROM - logs - INNER JOIN entries - ON (logs.log_id = entries.log_id - AND logs.author = entries.author) - WHERE - entries.entry_hash = $1 - ", - ) - .bind(entry_hash.as_str()) - .fetch_optional(&self.pool) - .await?; - - // Unwrap here since we already validated the hash - let hash = result.map(|str| { - Hash::new(&str) - .expect("Corrupt hash found in database") - .into() - }); - - Ok(hash) - } -} - -#[cfg(test)] -mod tests { - use p2panda_rs::entry::LogId; - use p2panda_rs::hash::Hash; - use p2panda_rs::identity::Author; - use p2panda_rs::schema::SchemaId; - use p2panda_rs::storage_provider::traits::EntryStore; - - use crate::db::store::SqlStorage; - use crate::test_helpers::initialize_db; - - const TEST_AUTHOR: &str = "1a8a62c5f64eed987326513ea15a6ea2682c256ac57a418c1c92d96787c8b36e"; - - #[tokio::test] - async fn latest_entry() { - let pool = initialize_db().await; - let storage_provider = SqlStorage { pool }; - - let author = Author::new(TEST_AUTHOR).unwrap(); - let log_id = LogId::new(1); - - let latest_entry = storage_provider - .latest_entry(&author, &log_id) - .await - .unwrap(); - assert!(latest_entry.is_none()); - } - - #[tokio::test] - async fn entries_by_schema() { - let pool = initialize_db().await; - let storage_provider = SqlStorage { pool }; - - let schema = SchemaId::new_application( - "venue", - &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), - ); - - let entries = storage_provider.by_schema(&schema).await.unwrap(); - assert!(entries.len() == 0); +impl AsRef for EntryRow { + fn as_ref(&self) -> &Self { + self } } diff --git a/aquadoggo/src/db/models/log.rs 
b/aquadoggo/src/db/models/log.rs index a4f4382cd..88696eeb7 100644 --- a/aquadoggo/src/db/models/log.rs +++ b/aquadoggo/src/db/models/log.rs @@ -1,19 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::str::FromStr; - -use async_trait::async_trait; use sqlx::FromRow; -use sqlx::{query, query_scalar}; - -use p2panda_rs::document::DocumentId; -use p2panda_rs::entry::LogId; -use p2panda_rs::identity::Author; -use p2panda_rs::schema::SchemaId; -use p2panda_rs::storage_provider::errors::LogStorageError; -use p2panda_rs::storage_provider::traits::{AsStorageLog, LogStore}; - -use crate::db::store::SqlStorage; /// Tracks the assigment of an author's logs to documents and records their schema. /// @@ -23,7 +10,7 @@ use crate::db::store::SqlStorage; /// We store the u64 integer values of `log_id` as a string here since not all database backends /// support large numbers. #[derive(FromRow, Debug, Clone)] -pub struct Log { +pub struct LogRow { /// Public key of the author. pub author: String, @@ -36,357 +23,3 @@ pub struct Log { /// SchemaId which identifies the schema for operations in this log. pub schema: String, } - -impl AsStorageLog for Log { - fn new(author: &Author, schema: &SchemaId, document: &DocumentId, log_id: &LogId) -> Self { - Self { - author: author.as_str().to_string(), - log_id: log_id.as_u64().to_string(), - document: document.as_str().to_string(), - schema: schema.as_str(), - } - } - - fn author(&self) -> Author { - Author::new(&self.author).unwrap() - } - fn id(&self) -> LogId { - LogId::from_str(&self.log_id).unwrap() - } - fn document_id(&self) -> DocumentId { - let document_id: DocumentId = self.document.parse().unwrap(); - document_id - } - fn schema_id(&self) -> SchemaId { - SchemaId::new(&self.schema).unwrap() - } -} - -/// Trait which handles all storage actions relating to `Log`s. -#[async_trait] -impl LogStore for SqlStorage { - /// Insert a log into storage. - async fn insert_log(&self, log: Log) -> Result { - let rows_affected = query( - " - INSERT INTO - logs (author, log_id, document, schema) - VALUES - ($1, $2, $3, $4) - ", - ) - .bind(log.author().as_str()) - .bind(log.id().as_u64().to_string()) - .bind(log.document_id().as_str()) - .bind(log.schema_id().as_str()) - .execute(&self.pool) - .await - .map_err(|e| LogStorageError::Custom(e.to_string()))? - .rows_affected(); - - Ok(rows_affected == 1) - } - - /// Get a log from storage - async fn get( - &self, - author: &Author, - document_id: &DocumentId, - ) -> Result, LogStorageError> { - let result: Option = query_scalar( - " - SELECT - log_id - FROM - logs - WHERE - author = $1 - AND document = $2 - ", - ) - .bind(author.as_str()) - .bind(document_id.as_str()) - .fetch_optional(&self.pool) - .await - .map_err(|e| LogStorageError::Custom(e.to_string()))?; - - // Wrap u64 inside of `P2PandaLog` instance - let log_id: Option = - result.map(|str| str.parse().expect("Corrupt u64 integer found in database")); - - Ok(log_id) - } - - /// Determines the next unused log_id of an author. 
- async fn next_log_id(&self, author: &Author) -> Result { - // Get all log ids from this author - let mut result: Vec = query_scalar( - " - SELECT - log_id - FROM - logs - WHERE - author = $1 - ", - ) - .bind(author.as_str()) - .fetch_all(&self.pool) - .await - .map_err(|e| LogStorageError::Custom(e.to_string()))?; - - // Convert all strings representing u64 integers to `LogId` instances - let mut log_ids: Vec = result - .iter_mut() - .map(|str| str.parse().expect("Corrupt u64 integer found in database")) - .collect(); - - // The log id selection below expects log ids in sorted order. We can't easily use SQL - // for this because log IDs are stored as `VARCHAR`, which doesn't sort numbers correctly. - // A good solution would not require reading all existing log ids to find the next - // available one. See this issue: https://github.com/p2panda/aquadoggo/issues/67 - log_ids.sort(); - - // Find next unused document log by comparing the sequence of known log ids with an - // sequence of subsequent log ids until we find a gap. - let mut next_log_id = LogId::default(); - - for log_id in log_ids.iter() { - // Success! Found unused log id - if next_log_id != *log_id { - break; - } - - // Otherwise, try next possible log id - next_log_id = next_log_id.next().unwrap(); - } - - Ok(next_log_id) - } -} - -#[cfg(test)] -mod tests { - use std::convert::TryFrom; - - use p2panda_rs::document::DocumentViewId; - use p2panda_rs::entry::{sign_and_encode, Entry as P2PandaEntry, LogId, SeqNum}; - use p2panda_rs::hash::Hash; - use p2panda_rs::identity::{Author, KeyPair}; - use p2panda_rs::operation::{Operation, OperationEncoded, OperationFields, OperationValue}; - use p2panda_rs::schema::SchemaId; - use p2panda_rs::storage_provider::traits::{ - AsStorageEntry, AsStorageLog, EntryStore, LogStore, StorageProvider, - }; - - use crate::db::models::{EntryRow, Log}; - use crate::db::store::SqlStorage; - use crate::test_helpers::{initialize_db, random_entry_hash}; - - const TEST_AUTHOR: &str = "58223678ab378f1b07d1d8c789e6da01d16a06b1a4d17cc10119a0109181156c"; - - #[tokio::test] - async fn initial_log_id() { - let pool = initialize_db().await; - - let author = Author::new(TEST_AUTHOR).unwrap(); - - let storage_provider = SqlStorage { pool }; - - let log_id = storage_provider - .find_document_log_id(&author, None) - .await - .unwrap(); - - assert_eq!(log_id, LogId::new(1)); - } - - #[tokio::test] - async fn prevent_duplicate_log_ids() { - let pool = initialize_db().await; - let storage_provider = SqlStorage { pool }; - - let author = Author::new(TEST_AUTHOR).unwrap(); - let document = Hash::new(&random_entry_hash()).unwrap(); - let schema = - SchemaId::new_application("venue", &Hash::new(&random_entry_hash()).unwrap().into()); - - let log = Log::new(&author, &schema, &document.clone().into(), &LogId::new(1)); - assert!(storage_provider.insert_log(log).await.is_ok()); - - let log = Log::new(&author, &schema, &document.into(), &LogId::new(1)); - assert!(storage_provider.insert_log(log).await.is_err()); - } - - #[tokio::test] - async fn with_multi_hash_schema_id() { - let pool = initialize_db().await; - let storage_provider = SqlStorage { pool }; - - let author = Author::new(TEST_AUTHOR).unwrap(); - let document = Hash::new(&random_entry_hash()).unwrap(); - let schema = SchemaId::new_application( - "venue", - &DocumentViewId::new(&[ - Hash::new(&random_entry_hash()).unwrap().into(), - Hash::new(&random_entry_hash()).unwrap().into(), - ]), - ); - - let log = Log::new(&author, &schema, &document.into(), &LogId::new(1)); - 
- assert!(storage_provider.insert_log(log).await.is_ok()); - } - - #[tokio::test] - async fn selecting_next_log_id() { - let pool = initialize_db().await; - let key_pair = KeyPair::new(); - let author = Author::try_from(*key_pair.public_key()).unwrap(); - let schema = SchemaId::new_application( - "venue", - &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), - ); - - let storage_provider = SqlStorage { pool }; - - let log_id = storage_provider - .find_document_log_id(&author, None) - .await - .unwrap(); - - // We expect to be given the next log id when asking for a possible log id for a new - // document by the same author - assert_eq!(log_id, LogId::default()); - - // Starting with an empty db, we expect to be able to count up from 1 and expect each - // inserted document's log id to be euqal to the count index - for n in 1..12 { - let doc = Hash::new_from_bytes(vec![1, 2, n]).unwrap().into(); - - let log_id = storage_provider - .find_document_log_id(&author, None) - .await - .unwrap(); - assert_eq!(LogId::new(n.into()), log_id); - let log = Log::new(&author, &schema, &doc, &log_id); - storage_provider.insert_log(log).await.unwrap(); - } - } - - #[tokio::test] - async fn document_log_id() { - let pool = initialize_db().await; - - // Create a new document - // TODO: use p2panda-rs test utils once available - let key_pair = KeyPair::new(); - let author = Author::try_from(*key_pair.public_key()).unwrap(); - let log_id = LogId::new(1); - let schema = SchemaId::new_application( - "venue", - &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), - ); - let seq_num = SeqNum::new(1).unwrap(); - let mut fields = OperationFields::new(); - fields - .add("test", OperationValue::Text("Hello".to_owned())) - .unwrap(); - let operation = Operation::new_create(schema.clone(), fields).unwrap(); - let operation_encoded = OperationEncoded::try_from(&operation).unwrap(); - let entry = P2PandaEntry::new(&log_id, Some(&operation), None, None, &seq_num).unwrap(); - let entry_encoded = sign_and_encode(&entry, &key_pair).unwrap(); - - let storage_provider = SqlStorage { pool }; - - // Expect database to return nothing yet - assert_eq!( - storage_provider - .get_document_by_entry(&entry_encoded.hash()) - .await - .unwrap(), - None - ); - - let entry = EntryRow::new(&entry_encoded.clone(), &operation_encoded).unwrap(); - - // Store entry in database - assert!(storage_provider.insert_entry(entry).await.is_ok()); - - let log = Log::new( - &author, - &schema, - &entry_encoded.hash().into(), - &LogId::new(1), - ); - - // Store log in database - assert!(storage_provider.insert_log(log).await.is_ok()); - - // Expect to find document in database. The document hash should be the same as the hash of - // the entry which referred to the `CREATE` operation. 
- assert_eq!( - storage_provider - .get_document_by_entry(&entry_encoded.hash()) - .await - .unwrap(), - Some(entry_encoded.hash().into()) - ); - - // We expect to find this document in the default log - assert_eq!( - storage_provider - .find_document_log_id(&author, Some(&entry_encoded.hash().into())) - .await - .unwrap(), - LogId::default() - ); - } - - #[tokio::test] - async fn log_ids() { - let pool = initialize_db().await; - - // Mock author - let author = Author::new(TEST_AUTHOR).unwrap(); - - // Mock schema - let schema = - SchemaId::new_application("venue", &Hash::new(&random_entry_hash()).unwrap().into()); - - // Mock four different document hashes - let document_first = Hash::new(&random_entry_hash()).unwrap(); - let document_second = Hash::new(&random_entry_hash()).unwrap(); - let document_third = Hash::new(&random_entry_hash()).unwrap(); - let document_forth = Hash::new(&random_entry_hash()).unwrap(); - - let storage_provider = SqlStorage { pool }; - - // Register two log ids at the beginning - let log_1 = Log::new(&author, &schema, &document_first.into(), &LogId::new(1)); - let log_2 = Log::new(&author, &schema, &document_second.into(), &LogId::new(2)); - - storage_provider.insert_log(log_1).await.unwrap(); - storage_provider.insert_log(log_2).await.unwrap(); - - // Find next free log id and register it - let log_id = storage_provider.next_log_id(&author).await.unwrap(); - assert_eq!(log_id, LogId::new(3)); - - let log_3 = Log::new(&author, &schema, &document_third.into(), &log_id); - - storage_provider.insert_log(log_3).await.unwrap(); - - // Find next free log id and register it - let log_id = storage_provider.next_log_id(&author).await.unwrap(); - assert_eq!(log_id, LogId::new(4)); - - let log_4 = Log::new(&author, &schema, &document_forth.into(), &log_id); - - storage_provider.insert_log(log_4).await.unwrap(); - - // Find next free log id - let log_id = storage_provider.next_log_id(&author).await.unwrap(); - assert_eq!(log_id, LogId::new(5)); - } -} diff --git a/aquadoggo/src/db/models/mod.rs b/aquadoggo/src/db/models/mod.rs index a8dfd7301..8b9a6fc28 100644 --- a/aquadoggo/src/db/models/mod.rs +++ b/aquadoggo/src/db/models/mod.rs @@ -1,7 +1,5 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -mod entry; -mod log; - -pub use self::log::Log; -pub use entry::EntryRow; +pub mod entry; +pub mod log; +pub mod operation; diff --git a/aquadoggo/src/db/models/operation.rs b/aquadoggo/src/db/models/operation.rs new file mode 100644 index 000000000..5ea5f8002 --- /dev/null +++ b/aquadoggo/src/db/models/operation.rs @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use sqlx::FromRow; + +/// A struct representing a single operation row as it is inserted in the database. +#[derive(FromRow, Debug)] +pub struct OperationRow { + /// The author of this operation. + pub author: String, + + /// The id of the document this operation is part of. + pub document_id: String, + + /// The id of this operation. + pub operation_id: String, + + /// The action type this operation is performing. + pub action: String, + + /// The hash of the entry this operation is associated with. + pub entry_hash: String, + + /// The id of the schema this operation follows. + pub schema_id: String, + + /// The previous operations of this operation concatenated into string format with `_` separator. + pub previous_operations: String, +} + +/// A struct representing a single previous operation relation row as it is inserted in the database.
+#[derive(FromRow, Debug)] +pub struct PreviousOperationRelationRow { + /// The parent in this operation relation. This is the operation + /// being appended to; it lies nearer the root in a graph structure. + parent_operation_id: String, + + /// The child in this operation relation. This is the operation + /// which has a dependency on the parent; it lies nearer the tip/leaves + /// in a graph structure. + child_operation_id: String, +} + +/// A struct representing a single operation field row as it is inserted in the database. +#[derive(FromRow, Debug)] +pub struct OperationFieldRow { + /// The id of the operation this field was published on. + pub operation_id: String, + + /// The name of this field. + pub name: String, + + /// The type of this field. + pub field_type: String, + + /// The actual value contained in this field. + pub value: String, +} + diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs new file mode 100644 index 000000000..c85c21e52 --- /dev/null +++ b/aquadoggo/src/db/provider.rs @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use p2panda_rs::document::DocumentId; +use p2panda_rs::hash::Hash; +use p2panda_rs::storage_provider::traits::StorageProvider; +use sqlx::query_scalar; + +use crate::db::models::entry::EntryRow; +use crate::db::models::log::LogRow; +use crate::db::Pool; +use crate::errors::StorageProviderResult; +use crate::rpc::{EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse}; + +pub struct SqlStorage { + pub(crate) pool: Pool, +} + +/// All other methods which need to be implemented by a p2panda `StorageProvider`. +#[async_trait] +impl StorageProvider for SqlStorage { + type EntryArgsResponse = EntryArgsResponse; + type EntryArgsRequest = EntryArgsRequest; + type PublishEntryResponse = PublishEntryResponse; + type PublishEntryRequest = PublishEntryRequest; + + /// Returns the related document for any entry. + /// + /// Every entry is part of a document and, through that, associated with a specific log id used + /// by this document and author. This method returns that document id by looking up the log + /// that the entry was stored in.
+ async fn get_document_by_entry( + &self, + entry_hash: &Hash, + ) -> StorageProviderResult<Option<DocumentId>> { + let result: Option<String> = query_scalar( + " + SELECT + logs.document + FROM + logs + INNER JOIN entries + ON (logs.log_id = entries.log_id + AND logs.author = entries.author) + WHERE + entries.entry_hash = $1 + ", + ) + .bind(entry_hash.as_str()) + .fetch_optional(&self.pool) + .await?; + + // Unwrap here since we already validated the hash + let hash = result.map(|str| { + Hash::new(&str) + .expect("Corrupt hash found in database") + .into() + }); + + Ok(hash) + } +} diff --git a/aquadoggo/src/db/store.rs b/aquadoggo/src/db/store.rs deleted file mode 100644 index 70437c1ca..000000000 --- a/aquadoggo/src/db/store.rs +++ /dev/null @@ -1,23 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later - -use async_trait::async_trait; -use sqlx::{query, query_as, query_scalar}; - -use p2panda_rs::document::DocumentId; -use p2panda_rs::entry::SeqNum; -use p2panda_rs::hash::Hash; -use p2panda_rs::schema::SchemaId; -use p2panda_rs::storage_provider::errors as p2panda_errors; -use p2panda_rs::storage_provider::traits::{ - AsStorageEntry, AsStorageLog, EntryStore, LogStore, StorageProvider, -}; -use p2panda_rs::{entry::LogId, identity::Author}; - -use crate::db::models::{EntryRow, Log}; -use crate::db::Pool; -use crate::errors::StorageProviderResult; -use crate::rpc::{EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse}; - -pub struct SqlStorage { - pub(crate) pool: Pool, -} diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs new file mode 100644 index 000000000..ef1ecf857 --- /dev/null +++ b/aquadoggo/src/db/stores/document.rs @@ -0,0 +1,331 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::btree_map::Iter; + +use async_trait::async_trait; +use futures::future::try_join_all; +use p2panda_rs::document::{DocumentView, DocumentViewHash, DocumentViewId}; +use p2panda_rs::operation::OperationValue; +use p2panda_rs::schema::SchemaId; +use sqlx::{query, query_as}; + +use crate::db::errors::DocumentViewStorageError; +use crate::db::models::operation::OperationFieldRow; +use crate::db::provider::SqlStorage; +use crate::db::traits::{ + AsStorageDocumentView, DocumentStore, DocumentViewFields, FieldIds, FieldName, +}; +use crate::db::utils::parse_operation_fields; + +/// Aquadoggo struct which will implement the `AsStorageDocumentView` trait. +#[derive(Debug, Clone)] +pub struct DocumentViewStorage { + document_view: DocumentView, + field_ids: FieldIds, + schema_id: SchemaId, +} + +impl DocumentViewStorage { + pub fn new(document_view: &DocumentView, field_ids: &FieldIds, schema_id: &SchemaId) -> Self { + Self { + document_view: document_view.clone(), + field_ids: field_ids.clone(), + schema_id: schema_id.clone(), + } + } +} + +impl AsStorageDocumentView for DocumentViewStorage { + type AsStorageDocumentViewError = DocumentViewStorageError; + + fn id(&self) -> DocumentViewId { + self.document_view.id().clone() + } + + fn iter(&self) -> Iter<FieldName, OperationValue> { + self.document_view.iter() + } + + fn get(&self, key: &str) -> Option<&OperationValue> { + self.document_view.get(key) + } + + fn schema_id(&self) -> SchemaId { + self.schema_id.clone() + } + + fn field_ids(&self) -> FieldIds { + self.field_ids.clone() + } +} + +#[async_trait] +impl DocumentStore for SqlStorage { + /// Insert a document_view into the db.
Requires that all relevant operations are already in + /// the db as this method only creates relations between document view fields and their current + /// values (last updated operation value). + /// + /// QUESTION: Is this too implementation-specific? It assumes quite a lot about the db + /// structure and others may wish to structure things differently. + async fn insert_document_view( + &self, + document_view_id: &DocumentViewId, + field_ids: &FieldIds, + schema_id: &SchemaId, + ) -> Result<bool, DocumentViewStorageError> { + // OBSERVATIONS: + // - we need to know which operation was LWW for each field. + // - this is different from knowing the document view id, which is + // just the tip(s) of the graph, and will likely not contain a value + // for every field. + // - we could record the operation id for each value when we build the + // document + // - alternatively we could do some dynamic "reverse" graph traversal + // starting from the document view id. This would require + // implementing some new traversal logic (maybe it is already underway + // somewhere? I remember @cafca working on this a while ago). + // - we could also pass in a full list of already sorted operations, + // these are already stored on `Document` so could be re-used from + // there. + + // Insert document view field relations into the db + let field_relations_inserted = try_join_all(field_ids.iter().map(|field| { + query( + " + INSERT INTO + document_view_fields ( + document_view_id, + operation_id, + name + ) + VALUES + ($1, $2, $3) + ", + ) + .bind(document_view_id.as_str()) + .bind(field.1.as_str().to_owned()) + .bind(field.0.as_str()) + .execute(&self.pool) + })) + .await + .map_err(|e| DocumentViewStorageError::Custom(e.to_string()))? + .iter() + .try_for_each(|result| { + if result.rows_affected() == 1 { + Ok(()) + } else { + Err(DocumentViewStorageError::Custom(format!( + "Incorrect rows affected: {}", + result.rows_affected() + ))) + } + }) + .is_ok(); + + // Insert the document view into the db + let document_view_inserted = query( + " + INSERT INTO + document_views ( + document_view_id, + schema_id + ) + VALUES + ($1, $2) + ", + ) + .bind(document_view_id.as_str()) + .bind(schema_id.as_str()) + .execute(&self.pool) + .await + .map_err(|e| DocumentViewStorageError::Custom(e.to_string()))? + .rows_affected() + == 1; + + Ok(field_relations_inserted && document_view_inserted) + } + + /// Get a document view from the database by its id. + /// + /// Currently returns a map of document view fields as FieldName -> OperationValue. + /// This can be specified more precisely later.
+ async fn get_document_view_by_id( + &self, + id: &DocumentViewId, + ) -> Result<DocumentViewFields, DocumentViewStorageError> { + // Store the document view id in its hashed form + let document_view_hash = DocumentViewHash::from(id); + + let document_view_field_rows = query_as::<_, OperationFieldRow>( + " + SELECT + document_view_fields.name, + operation_fields_v1.operation_id, + operation_fields_v1.field_type, + operation_fields_v1.value + FROM + document_view_fields + LEFT JOIN operation_fields_v1 + ON + operation_fields_v1.operation_id = document_view_fields.operation_id + AND + operation_fields_v1.name = document_view_fields.name + WHERE + document_view_id = $1 + ", + ) + .bind(document_view_hash.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| DocumentViewStorageError::Custom(e.to_string()))?; + + Ok(parse_operation_fields(document_view_field_rows)) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use p2panda_rs::document::{DocumentId, DocumentViewId}; + use p2panda_rs::identity::Author; + use p2panda_rs::operation::{ + AsOperation, Operation, OperationFields, OperationId, OperationValue, + }; + use p2panda_rs::schema::SchemaId; + use p2panda_rs::test_utils::constants::{DEFAULT_HASH, TEST_SCHEMA_ID}; + + use crate::db::provider::SqlStorage; + use crate::db::stores::operation::OperationStorage; + use crate::db::stores::test_utils::test_operation; + use crate::db::traits::OperationStore; + use crate::test_helpers::initialize_db; + + use super::{DocumentStore, FieldIds}; + + const TEST_AUTHOR: &str = "1a8a62c5f64eed987326513ea15a6ea2682c256ac57a418c1c92d96787c8b36e"; + + #[tokio::test] + async fn insert_document_view() { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + + let operation_id = OperationId::new(DEFAULT_HASH.parse().unwrap()); + let operation = test_operation(); + let document_view_id: DocumentViewId = operation_id.clone().into(); + let schema_id = SchemaId::from_str(TEST_SCHEMA_ID).unwrap(); + + let mut field_ids = FieldIds::new(); + + operation.fields().unwrap().keys().iter().for_each(|key| { + field_ids.insert(key.clone(), operation_id.clone()); + }); + + let result = storage_provider + .insert_document_view(&document_view_id, &field_ids, &schema_id) + .await; + + assert!(result.unwrap()); + } + + #[tokio::test] + async fn get_document_view() { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + let author = Author::new(TEST_AUTHOR).unwrap(); + + // Fake id for our first operation. + let operation_id = OperationId::new(DEFAULT_HASH.parse().unwrap()); + + // Corresponding document id. + let document_id = DocumentId::new(operation_id.clone()); + + // The test CREATE operation which contains all fields. + let operation = test_operation(); + + // The document view now is just this one operation. + let document_view_id: DocumentViewId = operation_id.clone().into(); + + let schema_id = SchemaId::from_str(TEST_SCHEMA_ID).unwrap(); + + // Init the field ids. + let mut field_ids = FieldIds::new(); + + // Right now every field is derived from the CREATE operation, so they will all contain that id. + operation.fields().unwrap().keys().iter().for_each(|key| { + field_ids.insert(key.clone(), operation_id.clone()); + }); + + // Construct a doggo operation for publishing. + let doggo_operation = + OperationStorage::new(&author, &operation, &operation_id, &document_id); + + // Insert the CREATE op.
+ storage_provider + .insert_operation(&doggo_operation) + .await + .unwrap(); + + // Insert the document view, passing in the field_ids created above. + storage_provider + .insert_document_view(&document_view_id, &field_ids, &schema_id) + .await + .unwrap(); + + // Retrieve the document view. + let result = storage_provider + .get_document_view_by_id(&document_view_id) + .await; + + println!("{:#?}", result); + + // Construct an UPDATE operation which only updates one field. + let mut fields = OperationFields::new(); + fields + .add("username", OperationValue::Text("yahoooo".to_owned())) + .unwrap(); + let update_operation = Operation::new_update( + SchemaId::from_str(TEST_SCHEMA_ID).unwrap(), + vec![operation_id], + fields, + ) + .unwrap(); + + // Give it a dummy id. + let update_operation_id = OperationId::new( + "0020cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" + .parse() + .unwrap(), + ); + + // Update the field_ids to include the newly updated operation_id for the "username" field. + field_ids.insert("username".to_string(), update_operation_id.clone()); + let doggo_update_operation = OperationStorage::new( + &author, + &update_operation, + &update_operation_id, + &document_id, + ); + + // Insert the operation. + storage_provider + .insert_operation(&doggo_update_operation) + .await + .unwrap(); + + // Update the document view. + storage_provider + .insert_document_view(&update_operation_id.clone().into(), &field_ids, &schema_id) + .await + .unwrap(); + + // Query the new document view. + // + // It will combine the original fields with the newly updated "username" field and return the complete fields. + let result = storage_provider + .get_document_view_by_id(&update_operation_id.into()) + .await; + + println!("{:#?}", result) + } +} diff --git a/aquadoggo/src/db/stores/entry.rs b/aquadoggo/src/db/stores/entry.rs new file mode 100644 index 000000000..93c5e4d8f --- /dev/null +++ b/aquadoggo/src/db/stores/entry.rs @@ -0,0 +1,584 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use bamboo_rs_core_ed25519_yasmf::lipmaa; +use sqlx::{query, query_as}; + +use p2panda_rs::entry::{decode_entry, EntrySigned, LogId, SeqNum}; +use p2panda_rs::hash::Hash; +use p2panda_rs::identity::Author; +use p2panda_rs::operation::{Operation, OperationEncoded}; +use p2panda_rs::schema::SchemaId; +use p2panda_rs::storage_provider::errors::EntryStorageError; +use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore}; + +use crate::db::models::entry::EntryRow; +use crate::db::provider::SqlStorage; + +// Re-write once https://github.com/pietgeursen/lipmaa-link/pull/3 is merged in lipmaa-link +pub fn get_lipmaa_links_back_to_root(mut n: u64) -> Vec<u64> { + let mut path = Vec::new(); + + // We don't want to include the lipmaa link at position `0` so we + // do the final lipmaa calculation landing on `1` + while n > 1 { + n = lipmaa(n); + path.push(n); + } + + path +} + +/// Implement `AsStorageEntry` trait for `EntryRow`.
+impl AsStorageEntry for EntryRow { + type AsStorageEntryError = EntryStorageError; + + fn new( + entry_signed: &EntrySigned, + operation_encoded: &OperationEncoded, + ) -> Result<Self, Self::AsStorageEntryError> { + let entry = decode_entry(entry_signed, Some(operation_encoded)) + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(Self { + author: entry_signed.author().as_str().into(), + entry_bytes: entry_signed.as_str().into(), + entry_hash: entry_signed.hash().as_str().into(), + log_id: entry.log_id().as_u64().to_string(), + payload_bytes: Some(operation_encoded.as_str().to_string()), + payload_hash: entry_signed.payload_hash().as_str().into(), + seq_num: entry.seq_num().as_u64().to_string(), + }) + } + + fn author(&self) -> Author { + Author::new(self.author.as_ref()).unwrap() + } + + fn hash(&self) -> Hash { + self.entry_signed().hash() + } + + fn entry_bytes(&self) -> Vec<u8> { + self.entry_signed().to_bytes() + } + + fn backlink_hash(&self) -> Option<Hash> { + self.entry_decoded().backlink_hash().cloned() + } + + fn skiplink_hash(&self) -> Option<Hash> { + self.entry_decoded().skiplink_hash().cloned() + } + + fn seq_num(&self) -> SeqNum { + *self.entry_decoded().seq_num() + } + + fn log_id(&self) -> LogId { + *self.entry_decoded().log_id() + } + + fn operation(&self) -> Operation { + let operation_encoded = self.operation_encoded().unwrap(); + Operation::from(&operation_encoded) + } +} + +/// Trait which handles all storage actions relating to `Entries`. +#[async_trait] +impl EntryStore for SqlStorage { + /// Insert an entry into storage. + async fn insert_entry(&self, entry: EntryRow) -> Result<bool, EntryStorageError> { + let rows_affected = query( + " + INSERT INTO + entries ( + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + ) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + ", + ) + .bind(entry.author().as_str()) + .bind(entry.entry_signed().as_str()) + .bind(entry.hash().as_str()) + .bind(entry.log_id().as_u64().to_string()) + .bind(entry.operation_encoded().unwrap().as_str()) + .bind(entry.operation_encoded().unwrap().hash().as_str()) + .bind(entry.seq_num().as_u64().to_string()) + .execute(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))? + .rows_affected(); + + Ok(rows_affected == 1) + } + + async fn get_entry_by_hash(&self, hash: &Hash) -> Result<Option<EntryRow>, EntryStorageError> { + let entry_row = query_as::<_, EntryRow>( + " + SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + entry_hash = $1 + ", + ) + .bind(hash.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entry_row) + } + + /// Returns entry at sequence position within an author's log. + async fn entry_at_seq_num( + &self, + author: &Author, + log_id: &LogId, + seq_num: &SeqNum, + ) -> Result<Option<EntryRow>, EntryStorageError> { + let entry_row = query_as::<_, EntryRow>( + " + SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + author = $1 + AND log_id = $2 + AND seq_num = $3 + ", + ) + .bind(author.as_str()) + .bind(log_id.as_u64().to_string()) + .bind(seq_num.as_u64().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entry_row) + } + + /// Returns the latest Bamboo entry of an author's log.
+ async fn latest_entry( + &self, + author: &Author, + log_id: &LogId, + ) -> Result, EntryStorageError> { + let entry_row = query_as::<_, EntryRow>( + " + SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + author = $1 + AND log_id = $2 + ORDER BY + CAST(seq_num AS INTEGER) DESC + LIMIT + 1 + ", + ) + .bind(author.as_str()) + .bind(log_id.as_u64().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entry_row) + } + + /// Return vector of all entries of a given schema + async fn by_schema(&self, schema: &SchemaId) -> Result, EntryStorageError> { + let entries = query_as::<_, EntryRow>( + " + SELECT + entries.author, + entries.entry_bytes, + entries.entry_hash, + entries.log_id, + entries.payload_bytes, + entries.payload_hash, + entries.seq_num + FROM + entries + INNER JOIN logs + ON (entries.log_id = logs.log_id + AND entries.author = logs.author) + WHERE + logs.schema = $1 + ", + ) + .bind(schema.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entries) + } + + async fn get_next_n_entries_after_seq( + &self, + author: &Author, + log_id: &LogId, + seq_num: &SeqNum, + max_number_of_entries: usize, + ) -> Result>, EntryStorageError> { + let max_seq_num = seq_num.as_u64() as usize + max_number_of_entries - 1; + let entries = query_as::<_, EntryRow>( + " + SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + author = $1 + AND log_id = $2 + AND CAST(seq_num AS INTEGER) BETWEEN $3 and $4 + ORDER BY + CAST(seq_num AS INTEGER) + ", + ) + .bind(author.as_str()) + .bind(log_id.as_u64().to_string()) + .bind(seq_num.as_u64().to_string()) + .bind((max_seq_num as u64).to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(Some(entries)) + } + + async fn get_all_lipmaa_entries_for_entry( + &self, + author: &Author, + log_id: &LogId, + initial_seq_num: &SeqNum, + ) -> Result, EntryStorageError> { + let cert_pool = get_lipmaa_links_back_to_root(initial_seq_num.as_u64()) + .iter() + .map(|seq_num| seq_num.to_string()) + .collect::>() + .join(","); + + // Formatting query string in this way as `sqlx` currently + // doesn't support binding list arguments for IN queries. 
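// As a concrete example (matching the `gets_all_lipmaa_entries_for_entry` test
// below): for an entry at seq_num 20 the certificate pool computed above is
// [19, 18, 17, 13, 4, 1], so `cert_pool` is the string "19,18,17,13,4,1" which
// gets interpolated into the IN clause of the query.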
+ let sql_str = format!( + "SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + author = $1 + AND log_id = $2 + AND CAST(seq_num AS INTEGER) IN ({}) + ORDER BY + CAST(seq_num AS INTEGER) DESC + ", + cert_pool + ); + + let entries = query_as::<_, EntryRow>(sql_str.as_str()) + .bind(author.as_str()) + .bind(log_id.as_u64().to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entries) + } +} + +#[cfg(test)] +mod tests { + use std::convert::TryFrom; + use std::str::FromStr; + + use p2panda_rs::document::DocumentId; + use p2panda_rs::entry::{sign_and_encode, Entry}; + use p2panda_rs::entry::{LogId, SeqNum}; + use p2panda_rs::hash::Hash; + use p2panda_rs::identity::{Author, KeyPair}; + use p2panda_rs::operation::{Operation, OperationEncoded, OperationFields, OperationValue}; + use p2panda_rs::schema::SchemaId; + use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore, StorageProvider}; + use p2panda_rs::test_utils::constants::{DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID}; + + use crate::db::stores::entry::EntryRow; + use crate::db::stores::test_utils::test_db; + use crate::rpc::EntryArgsRequest; + + #[tokio::test] + async fn insert_entry() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); + let log_id = LogId::new(1); + let schema = SchemaId::from_str(TEST_SCHEMA_ID).unwrap(); + + // Derive the document_id by fetching the first entry + let document_id: DocumentId = storage_provider + .entry_at_seq_num(&author, &log_id, &SeqNum::new(1).unwrap()) + .await + .unwrap() + .unwrap() + .hash() + .into(); + + let next_entry_args = storage_provider + .get_entry_args(&EntryArgsRequest { + author: author.clone(), + document: Some(document_id.clone()), + }) + .await + .unwrap(); + + let mut fields = OperationFields::new(); + fields + .add("username", OperationValue::Text("stitch".to_owned())) + .unwrap(); + + let update_operation = Operation::new_update( + schema.clone(), + vec![next_entry_args.entry_hash_backlink.clone().unwrap().into()], + fields.clone(), + ) + .unwrap(); + + let update_entry = Entry::new( + &next_entry_args.log_id, + Some(&update_operation), + next_entry_args.entry_hash_skiplink.as_ref(), + next_entry_args.entry_hash_backlink.as_ref(), + &next_entry_args.seq_num, + ) + .unwrap(); + + let entry_encoded = sign_and_encode(&update_entry, &key_pair).unwrap(); + let operation_encoded = OperationEncoded::try_from(&update_operation).unwrap(); + let doggo_entry = EntryRow::new(&entry_encoded, &operation_encoded).unwrap(); + let result = storage_provider.insert_entry(doggo_entry).await; + + assert!(result.is_ok()) + } + + #[tokio::test] + async fn try_insert_non_unique_entry() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); + let log_id = LogId::new(1); + + let first_entry = storage_provider + .entry_at_seq_num(&author, &log_id, &SeqNum::new(1).unwrap()) + .await + .unwrap() + .unwrap(); + + let duplicate_doggo_entry = EntryRow::new( + &first_entry.entry_signed(), + &first_entry.operation_encoded().unwrap(), + ) + .unwrap(); + let result = storage_provider.insert_entry(duplicate_doggo_entry).await; + + assert_eq!(result.unwrap_err().to_string(), "Error 
occured during `EntryStorage` request in storage provider: error returned from database: UNIQUE constraint failed: entries.author, entries.log_id, entries.seq_num") + } + + #[tokio::test] + async fn latest_entry() { + let storage_provider = test_db(100).await; + + let author_not_in_db = Author::try_from(*KeyPair::new().public_key()).unwrap(); + let log_id = LogId::new(1); + + let latest_entry = storage_provider + .latest_entry(&author_not_in_db, &log_id) + .await + .unwrap(); + assert!(latest_entry.is_none()); + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author_in_db = Author::try_from(*key_pair.public_key()).unwrap(); + + let latest_entry = storage_provider + .latest_entry(&author_in_db, &log_id) + .await + .unwrap(); + assert_eq!(latest_entry.unwrap().seq_num(), SeqNum::new(100).unwrap()); + } + + #[tokio::test] + async fn entries_by_schema() { + let storage_provider = test_db(100).await; + + let schema_not_in_the_db = SchemaId::new_application( + "venue", + &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), + ); + + let entries = storage_provider + .by_schema(&schema_not_in_the_db) + .await + .unwrap(); + assert!(entries.is_empty()); + + let schema_in_the_db = SchemaId::new(TEST_SCHEMA_ID).unwrap(); + + let entries = storage_provider.by_schema(&schema_in_the_db).await.unwrap(); + assert!(entries.len() == 100); + } + + #[tokio::test] + async fn entry_by_seq_num() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + for seq_num in [1, 10, 56, 77, 90] { + let seq_num = SeqNum::new(seq_num).unwrap(); + let entry = storage_provider + .entry_at_seq_num(&author, &LogId::new(1), &seq_num) + .await + .unwrap(); + assert_eq!(entry.unwrap().seq_num(), seq_num) + } + + let wrong_log = LogId::new(2); + let entry = storage_provider + .entry_at_seq_num(&author, &wrong_log, &SeqNum::new(1).unwrap()) + .await + .unwrap(); + assert!(entry.is_none()); + + let author_not_in_db = Author::try_from(*KeyPair::new().public_key()).unwrap(); + let entry = storage_provider + .entry_at_seq_num(&author_not_in_db, &LogId::new(1), &SeqNum::new(1).unwrap()) + .await + .unwrap(); + assert!(entry.is_none()); + + let seq_num_not_in_log = SeqNum::new(1000).unwrap(); + let entry = storage_provider + .entry_at_seq_num(&author_not_in_db, &LogId::new(1), &seq_num_not_in_log) + .await + .unwrap(); + assert!(entry.is_none()) + } + + #[tokio::test] + async fn get_entry_by_hash() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + for seq_num in [1, 11, 32, 45, 76] { + let seq_num = SeqNum::new(seq_num).unwrap(); + let entry = storage_provider + .entry_at_seq_num(&author, &LogId::new(1), &seq_num) + .await + .unwrap() + .unwrap(); + + let entry_hash = entry.hash(); + let entry_by_hash = storage_provider + .get_entry_by_hash(&entry_hash) + .await + .unwrap() + .unwrap(); + assert_eq!(entry, entry_by_hash) + } + + let entry_hash_not_in_db = Hash::new_from_bytes(vec![1, 2, 3]).unwrap(); + let entry = storage_provider + .get_entry_by_hash(&entry_hash_not_in_db) + .await + .unwrap(); + assert!(entry.is_none()) + } + + #[tokio::test] + async fn gets_next_n_entries_after_seq() { + let storage_provider = test_db(50).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let 
author = Author::try_from(*key_pair.public_key()).unwrap(); + + let entries = storage_provider + .get_next_n_entries_after_seq(&author, &LogId::default(), &SeqNum::default(), 20) + .await + .unwrap() + .unwrap(); + for entry in entries.clone() { + assert!(entry.seq_num().as_u64() >= 1 && entry.seq_num().as_u64() <= 20) + } + assert_eq!(entries.len(), 20); + } + + #[tokio::test] + async fn gets_all_lipmaa_entries_for_entry() { + let storage_provider = test_db(50).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + let entries = storage_provider + .get_all_lipmaa_entries_for_entry(&author, &LogId::default(), &SeqNum::new(20).unwrap()) + .await + .unwrap(); + + let cert_pool_seq_nums = entries + .iter() + .map(|entry| entry.seq_num().as_u64()) + .collect::>(); + + assert!(!entries.is_empty()); + assert_eq!(cert_pool_seq_nums, vec![19, 18, 17, 13, 4, 1]); + } +} diff --git a/aquadoggo/src/db/stores/log.rs b/aquadoggo/src/db/stores/log.rs new file mode 100644 index 000000000..e66524132 --- /dev/null +++ b/aquadoggo/src/db/stores/log.rs @@ -0,0 +1,376 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::str::FromStr; + +use async_trait::async_trait; +use p2panda_rs::document::DocumentId; +use p2panda_rs::entry::LogId; +use p2panda_rs::identity::Author; +use p2panda_rs::schema::SchemaId; +use p2panda_rs::storage_provider::errors::LogStorageError; +use p2panda_rs::storage_provider::traits::{AsStorageLog, LogStore}; +use sqlx::{query, query_scalar}; + +use crate::db::models::log::LogRow; +use crate::db::provider::SqlStorage; + +impl AsStorageLog for LogRow { + fn new(author: &Author, schema: &SchemaId, document: &DocumentId, log_id: &LogId) -> Self { + Self { + author: author.as_str().to_string(), + log_id: log_id.as_u64().to_string(), + document: document.as_str().to_string(), + schema: schema.as_str(), + } + } + + fn author(&self) -> Author { + Author::new(&self.author).unwrap() + } + + fn id(&self) -> LogId { + LogId::from_str(&self.log_id).unwrap() + } + + fn document_id(&self) -> DocumentId { + let document_id: DocumentId = self.document.parse().unwrap(); + document_id + } + + fn schema_id(&self) -> SchemaId { + SchemaId::new(&self.schema).unwrap() + } +} + +/// Trait which handles all storage actions relating to `LogRow`s. +#[async_trait] +impl LogStore for SqlStorage { + /// Insert a log into storage. + async fn insert_log(&self, log: LogRow) -> Result { + let rows_affected = query( + " + INSERT INTO + logs ( + author, + log_id, + document, + schema + ) + VALUES + ($1, $2, $3, $4) + ", + ) + .bind(log.author().as_str()) + .bind(log.id().as_u64().to_string()) + .bind(log.document_id().as_str()) + .bind(log.schema_id().as_str()) + .execute(&self.pool) + .await + .map_err(|e| LogStorageError::Custom(e.to_string()))? 
+ .rows_affected(); + + Ok(rows_affected == 1) + } + + /// Get a log from storage + async fn get( + &self, + author: &Author, + document_id: &DocumentId, + ) -> Result, LogStorageError> { + let result: Option = query_scalar( + " + SELECT + log_id + FROM + logs + WHERE + author = $1 + AND document = $2 + ", + ) + .bind(author.as_str()) + .bind(document_id.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| LogStorageError::Custom(e.to_string()))?; + + // Wrap u64 inside of `P2PandaLog` instance + let log_id: Option = + result.map(|str| str.parse().expect("Corrupt u64 integer found in database")); + + Ok(log_id) + } + + /// Determines the next unused log_id of an author. + async fn next_log_id(&self, author: &Author) -> Result { + // Get all log ids from this author + let mut result: Vec = query_scalar( + " + SELECT + log_id + FROM + logs + WHERE + author = $1 + ", + ) + .bind(author.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| LogStorageError::Custom(e.to_string()))?; + + // Convert all strings representing u64 integers to `LogId` instances + let mut log_ids: Vec = result + .iter_mut() + .map(|str| str.parse().expect("Corrupt u64 integer found in database")) + .collect(); + + // The log id selection below expects log ids in sorted order. We can't easily use SQL + // for this because log IDs are stored as `VARCHAR`, which doesn't sort numbers correctly. + // A good solution would not require reading all existing log ids to find the next + // available one. See this issue: https://github.com/p2panda/aquadoggo/issues/67 + log_ids.sort(); + + // Find next unused document log by comparing the sequence of known log ids with an + // sequence of subsequent log ids until we find a gap. + let mut next_log_id = LogId::default(); + + for log_id in log_ids.iter() { + // Success! 
Found unused log id + if next_log_id != *log_id { + break; + } + + // Otherwise, try next possible log id + next_log_id = next_log_id.next().unwrap(); + } + + Ok(next_log_id) + } +} + +#[cfg(test)] +mod tests { + use std::convert::TryFrom; + + use p2panda_rs::document::DocumentViewId; + use p2panda_rs::entry::{sign_and_encode, Entry as P2PandaEntry, LogId, SeqNum}; + use p2panda_rs::hash::Hash; + use p2panda_rs::identity::{Author, KeyPair}; + use p2panda_rs::operation::{Operation, OperationEncoded, OperationFields, OperationValue}; + use p2panda_rs::schema::SchemaId; + use p2panda_rs::storage_provider::traits::{ + AsStorageEntry, AsStorageLog, EntryStore, LogStore, StorageProvider, + }; + + use crate::db::models::entry::EntryRow; + use crate::db::models::log::LogRow; + use crate::db::provider::SqlStorage; + use crate::test_helpers::{initialize_db, random_entry_hash}; + + const TEST_AUTHOR: &str = "58223678ab378f1b07d1d8c789e6da01d16a06b1a4d17cc10119a0109181156c"; + + #[tokio::test] + async fn initial_log_id() { + let pool = initialize_db().await; + let author = Author::new(TEST_AUTHOR).unwrap(); + let storage_provider = SqlStorage { pool }; + + let log_id = storage_provider + .find_document_log_id(&author, None) + .await + .unwrap(); + + assert_eq!(log_id, LogId::new(1)); + } + + #[tokio::test] + async fn prevent_duplicate_log_ids() { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + + let author = Author::new(TEST_AUTHOR).unwrap(); + let document = Hash::new(&random_entry_hash()).unwrap(); + let schema = + SchemaId::new_application("venue", &Hash::new(&random_entry_hash()).unwrap().into()); + + let log = LogRow::new(&author, &schema, &document.clone().into(), &LogId::new(1)); + assert!(storage_provider.insert_log(log).await.is_ok()); + + let log = LogRow::new(&author, &schema, &document.into(), &LogId::new(1)); + assert!(storage_provider.insert_log(log).await.is_err()); + } + + #[tokio::test] + async fn with_multi_hash_schema_id() { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + + let author = Author::new(TEST_AUTHOR).unwrap(); + let document = Hash::new(&random_entry_hash()).unwrap(); + let schema = SchemaId::new_application( + "venue", + &DocumentViewId::new(&[ + Hash::new(&random_entry_hash()).unwrap().into(), + Hash::new(&random_entry_hash()).unwrap().into(), + ]), + ); + + let log = LogRow::new(&author, &schema, &document.into(), &LogId::new(1)); + + assert!(storage_provider.insert_log(log).await.is_ok()); + } + + #[tokio::test] + async fn selecting_next_log_id() { + let pool = initialize_db().await; + let key_pair = KeyPair::new(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + let schema = SchemaId::new_application( + "venue", + &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), + ); + + let storage_provider = SqlStorage { pool }; + + let log_id = storage_provider + .find_document_log_id(&author, None) + .await + .unwrap(); + + // We expect to be given the next log id when asking for a possible log id for a new + // document by the same author + assert_eq!(log_id, LogId::default()); + + // Starting with an empty db, we expect to be able to count up from 1 and expect each + // inserted document's log id to be euqal to the count index + for n in 1..12 { + let doc = Hash::new_from_bytes(vec![1, 2, n]).unwrap().into(); + + let log_id = storage_provider + .find_document_log_id(&author, None) + .await + .unwrap(); + assert_eq!(LogId::new(n.into()), log_id); + let log = LogRow::new(&author, 
&schema, &doc, &log_id); + storage_provider.insert_log(log).await.unwrap(); + } + } + + #[tokio::test] + async fn document_log_id() { + let pool = initialize_db().await; + + // Create a new document + // TODO: use p2panda-rs test utils once available + let key_pair = KeyPair::new(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + let log_id = LogId::new(1); + let schema = SchemaId::new_application( + "venue", + &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), + ); + let seq_num = SeqNum::new(1).unwrap(); + let mut fields = OperationFields::new(); + fields + .add("test", OperationValue::Text("Hello".to_owned())) + .unwrap(); + let operation = Operation::new_create(schema.clone(), fields).unwrap(); + let operation_encoded = OperationEncoded::try_from(&operation).unwrap(); + let entry = P2PandaEntry::new(&log_id, Some(&operation), None, None, &seq_num).unwrap(); + let entry_encoded = sign_and_encode(&entry, &key_pair).unwrap(); + + let storage_provider = SqlStorage { pool }; + + // Expect database to return nothing yet + assert_eq!( + storage_provider + .get_document_by_entry(&entry_encoded.hash()) + .await + .unwrap(), + None + ); + + let entry = EntryRow::new(&entry_encoded.clone(), &operation_encoded).unwrap(); + + // Store entry in database + assert!(storage_provider.insert_entry(entry).await.is_ok()); + + let log = LogRow::new( + &author, + &schema, + &entry_encoded.hash().into(), + &LogId::new(1), + ); + + // Store log in database + assert!(storage_provider.insert_log(log).await.is_ok()); + + // Expect to find document in database. The document hash should be the same as the hash of + // the entry which referred to the `CREATE` operation. + assert_eq!( + storage_provider + .get_document_by_entry(&entry_encoded.hash()) + .await + .unwrap(), + Some(entry_encoded.hash().into()) + ); + + // We expect to find this document in the default log + assert_eq!( + storage_provider + .find_document_log_id(&author, Some(&entry_encoded.hash().into())) + .await + .unwrap(), + LogId::default() + ); + } + + #[tokio::test] + async fn log_ids() { + let pool = initialize_db().await; + + // Mock author + let author = Author::new(TEST_AUTHOR).unwrap(); + + // Mock schema + let schema = + SchemaId::new_application("venue", &Hash::new(&random_entry_hash()).unwrap().into()); + + // Mock four different document hashes + let document_first = Hash::new(&random_entry_hash()).unwrap(); + let document_second = Hash::new(&random_entry_hash()).unwrap(); + let document_third = Hash::new(&random_entry_hash()).unwrap(); + let document_forth = Hash::new(&random_entry_hash()).unwrap(); + + let storage_provider = SqlStorage { pool }; + + // Register two log ids at the beginning + let log_1 = LogRow::new(&author, &schema, &document_first.into(), &LogId::new(1)); + let log_2 = LogRow::new(&author, &schema, &document_second.into(), &LogId::new(2)); + + storage_provider.insert_log(log_1).await.unwrap(); + storage_provider.insert_log(log_2).await.unwrap(); + + // Find next free log id and register it + let log_id = storage_provider.next_log_id(&author).await.unwrap(); + assert_eq!(log_id, LogId::new(3)); + + let log_3 = LogRow::new(&author, &schema, &document_third.into(), &log_id); + + storage_provider.insert_log(log_3).await.unwrap(); + + // Find next free log id and register it + let log_id = storage_provider.next_log_id(&author).await.unwrap(); + assert_eq!(log_id, LogId::new(4)); + + let log_4 = LogRow::new(&author, &schema, &document_forth.into(), &log_id); + + 
storage_provider.insert_log(log_4).await.unwrap(); + + // Find next free log id + let log_id = storage_provider.next_log_id(&author).await.unwrap(); + assert_eq!(log_id, LogId::new(5)); + } +} diff --git a/aquadoggo/src/db/stores/mod.rs b/aquadoggo/src/db/stores/mod.rs new file mode 100644 index 000000000..67afcfcd2 --- /dev/null +++ b/aquadoggo/src/db/stores/mod.rs @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +mod document; +mod entry; +mod log; +mod operation; +#[cfg(test)] +mod test_utils; diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs new file mode 100644 index 000000000..fe73a69e5 --- /dev/null +++ b/aquadoggo/src/db/stores/operation.rs @@ -0,0 +1,467 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use futures::future::try_join_all; +use p2panda_rs::document::DocumentId; +use p2panda_rs::hash::Hash; +use p2panda_rs::identity::Author; +use p2panda_rs::operation::{ + AsOperation, Operation, OperationAction, OperationFields, OperationId, OperationValue, +}; +use p2panda_rs::schema::SchemaId; +use sqlx::{query, query_as, query_scalar}; + +use crate::db::errors::OperationStorageError; +use crate::db::models::operation::{OperationFieldRow, OperationRow}; +use crate::db::provider::SqlStorage; +use crate::db::traits::{AsStorageOperation, OperationStore, PreviousOperations}; +use crate::db::utils::parse_operation_fields; + +#[derive(Debug, Clone)] +pub struct OperationStorage { + author: Author, + operation: Operation, + id: OperationId, + document_id: DocumentId, +} + +impl OperationStorage { + pub fn new( + author: &Author, + operation: &Operation, + operation_id: &OperationId, + document_id: &DocumentId, + ) -> Self { + Self { + author: author.clone(), + operation: operation.clone(), + id: operation_id.clone(), + document_id: document_id.clone(), + } + } +} + +impl AsStorageOperation for OperationStorage { + type AsStorageOperationError = OperationStorageError; + + fn action(&self) -> OperationAction { + self.operation.action() + } + + fn author(&self) -> Author { + self.author.clone() + } + + fn id(&self) -> OperationId { + self.id.clone() + } + + fn document_id(&self) -> DocumentId { + self.document_id.clone() + } + + fn schema_id(&self) -> SchemaId { + self.operation.schema() + } + + fn fields(&self) -> Option { + self.operation.fields() + } + + fn previous_operations(&self) -> PreviousOperations { + self.operation.previous_operations().unwrap_or_default() + } +} + +#[async_trait] +impl OperationStore for SqlStorage { + /// Retrieve the id of the document an operation is contained within. + /// + /// If no document was found, then this method returns a result wrapping + /// a None variant. + async fn get_document_by_operation_id( + &self, + id: OperationId, + ) -> Result, OperationStorageError> { + let document_id: Option = query_scalar( + " + SELECT + document_id + FROM + operations_v1 + WHERE + operation_id = $1 + ", + ) + .bind(id.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| OperationStorageError::Custom(e.to_string()))?; + + Ok(document_id.map(|id_str| id_str.parse().unwrap())) + } + + /// Insert an operation into the db. + /// + /// This requires a DoggoEntry to be composed elsewhere, it contains an Author, + /// DocumentId, OperationId and the actual Operation we want to store. + /// + /// - TODO: similar to several other places in StorageProvider, here we return + /// a bool wrapped in a result. We need to be more clear on when we expect to + /// recieve eg. 
an Ok(false) result, and if this is even needed. + async fn insert_operation( + &self, + operation: &OperationStorage, + ) -> Result { + let mut prev_op_string = "".to_string(); + for (i, operation_id) in operation.previous_operations().iter().enumerate() { + let separator = if i == 0 { "" } else { "_" }; + prev_op_string += format!("{}{}", separator, operation_id.as_hash().as_str()).as_str(); + } + + let document_id = if operation.action().as_str() == "create" { + DocumentId::new(operation.id()) + } else { + // Unwrap as we know any "UPDATE" or "DELETE" operation should have previous operations + let previous_operation_id = operation.previous_operations().get(0).unwrap().clone(); + + self.get_document_by_operation_id(previous_operation_id) + .await? + .ok_or_else(|| OperationStorageError::Custom("Document missing".to_string()))? + }; + + // Consruct query for inserting operation row, execute it + // and check exactly one row was affected. + let operation_inserted = query( + " + INSERT INTO + operations_v1 ( + author, + document_id, + operation_id, + entry_hash, + action, + schema_id, + previous_operations + ) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + ", + ) + .bind(operation.author().as_str()) + .bind(document_id.as_str()) + .bind(operation.id().as_str()) + .bind(operation.id().as_hash().as_str()) + .bind(operation.action().as_str()) + .bind(operation.schema_id().as_str()) + .bind(prev_op_string.as_str()) + .execute(&self.pool) + .await + .map_err(|e| OperationStorageError::Custom(e.to_string()))? + .rows_affected() + == 1; + + // Loop over all previous operations and insert one row for + // each, construct and execute the queries, return their futures + // and execute all of them with `try_join_all()`. + let previous_operations_inserted = + try_join_all(operation.previous_operations().iter().map(|prev_op_id| { + query( + " + INSERT INTO + previous_operations_v1 ( + parent_operation_id, + child_operation_id + ) + VALUES + ($1, $2) + ", + ) + .bind(prev_op_id.as_str()) + .bind(operation.id().as_str().to_owned()) + .execute(&self.pool) + })) + .await + // If any of the queries error we will catch that here. + .map_err(|e| OperationStorageError::Custom(e.to_string()))? + .iter() + // Here we check that each query inserted exactly one row. + .try_for_each(|result| { + if result.rows_affected() == 1 { + Ok(()) + } else { + Err(OperationStorageError::Custom(format!( + "Incorrect rows affected: {}", + result.rows_affected() + ))) + } + }) + .is_ok(); + + // Same pattern as above, construct and execute the queries, return their futures + // and execute all of them with `try_join_all()`. + let mut fields_inserted = true; + if let Some(fields) = operation.fields() { + fields_inserted = try_join_all(fields.iter().flat_map(|(name, value)| { + // Extract the field type. + let field_type = value.field_type(); + + // If the value is a relation_list or pinned_relation_list we need to insert a new field row for + // every item in the list. Here we collect these items and return them in a vector. If this operation + // value is anything except for the above list types, we will return a vec containing a single item. 
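// For example, a `relation_list` value holding two document ids results in two rows
// being inserted into `operation_fields_v1`, both carrying this operation's id and
// the same field name, with one document id string stored per row.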
+ + let db_values = match value { + OperationValue::Boolean(bool) => vec![Some(bool.to_string())], + OperationValue::Integer(int) => vec![Some(int.to_string())], + OperationValue::Float(float) => vec![Some(float.to_string())], + OperationValue::Text(str) => vec![Some(str.to_string())], + OperationValue::Relation(relation) => { + vec![Some(relation.document_id().as_str().to_string())] + } + OperationValue::RelationList(relation_list) => { + let mut db_values = Vec::new(); + for document_id in relation_list.iter() { + db_values.push(Some(document_id.as_str().to_string())) + } + db_values + } + OperationValue::PinnedRelation(pinned_relation) => { + // Deriving string id here for now until implemented in p2panda-rs + let mut id_str = "".to_string(); + for (i, operation_id) in + pinned_relation.view_id().sorted().iter().enumerate() + { + let separator = if i == 0 { "" } else { "_" }; + id_str += format!("{}{}", separator, operation_id.as_hash().as_str()) + .as_str(); + } + + vec![Some(id_str)] + } + OperationValue::PinnedRelationList(pinned_relation_list) => { + let mut db_values = Vec::new(); + for document_view_id in pinned_relation_list.iter() { + // Deriving string id here for now until implemented in p2panda-rs + let mut id_str = "".to_string(); + for (i, operation_id) in document_view_id.sorted().iter().enumerate() { + let separator = if i == 0 { "" } else { "_" }; + id_str += + format!("{}{}", separator, operation_id.as_hash().as_str()) + .as_str(); + } + + db_values.push(Some(id_str)) + } + db_values + } + }; + + // Collect all query futures. + db_values + .into_iter() + .map(|db_value| { + // Compose the query and return it's future. + query( + " + INSERT INTO + operation_fields_v1 ( + operation_id, + name, + field_type, + value + ) + VALUES + ($1, $2, $3, $4) + ", + ) + .bind(operation.id().as_str().to_owned()) + .bind(name.to_owned()) + .bind(field_type.to_string()) + .bind(db_value) + .execute(&self.pool) + }) + .collect::>() + })) + .await + // If any queries error, we catch that here. + .map_err(|e| OperationStorageError::Custom(e.to_string()))? // Coerce error here + .iter() + // All queries should perform exactly one insertion, we check that here. + .try_for_each(|result| { + if result.rows_affected() == 1 { + Ok(()) + } else { + Err(OperationStorageError::Custom(format!( + "Incorrect rows affected: {}", + result.rows_affected() + ))) + } + }) + .is_ok(); + }; + + Ok(operation_inserted && previous_operations_inserted && fields_inserted) + } + + /// Get an operation identified by it's OperationId. + /// + /// Returns a OperationStorage which includes Author, DocumentId and OperationId metadata. + async fn get_operation_by_id( + &self, + id: OperationId, + ) -> Result, OperationStorageError> { + let operation_row = query_as::<_, OperationRow>( + " + SELECT + author, + document_id, + operation_id, + entry_hash, + action, + schema_id, + previous_operations + FROM + operations_v1 + WHERE + operation_id = $1 + ", + ) + .bind(id.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| OperationStorageError::Custom(e.to_string()))? 
+ .unwrap(); + + let operation_fields = self.get_operation_fields_by_id(id).await?; + let schema: SchemaId = operation_row.schema_id.parse().unwrap(); + + // Need some more tooling around prev_ops in p2panda-rs + let mut previous_operations: Vec = Vec::new(); + if operation_row.action != "create" { + previous_operations = operation_row + .previous_operations + .rsplit('_') + .map(|id_str| Hash::new(id_str).unwrap().into()) + .collect(); + } + + let operation = match operation_row.action.as_str() { + "create" => Operation::new_create(schema, operation_fields), + "update" => Operation::new_update(schema, previous_operations, operation_fields), + "delete" => Operation::new_delete(schema, previous_operations), + _ => panic!(), + } + .unwrap(); + + Ok(Some(OperationStorage::new( + &Author::new(&operation_row.author).unwrap(), + &operation, + &operation_row.operation_id.parse().unwrap(), + &operation_row.document_id.parse().unwrap(), + ))) + } + + /// Get just the fields of an operation, identified by it's OperationId. + async fn get_operation_fields_by_id( + &self, + id: OperationId, + ) -> Result { + let operation_field_rows = query_as::<_, OperationFieldRow>( + " + SELECT + operation_id, + name, + field_type, + value + FROM + operation_fields_v1 + WHERE + operation_id = $1 + ", + ) + .bind(id.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| OperationStorageError::Custom(e.to_string()))?; + + Ok(parse_operation_fields(operation_field_rows)) + } +} + +#[cfg(test)] +mod tests { + use p2panda_rs::document::DocumentId; + use p2panda_rs::identity::Author; + use p2panda_rs::operation::OperationId; + use p2panda_rs::test_utils::constants::DEFAULT_HASH; + + use crate::db::provider::SqlStorage; + use crate::db::stores::test_utils::test_operation; + use crate::db::traits::AsStorageOperation; + use crate::test_helpers::initialize_db; + + use super::{OperationStorage, OperationStore}; + + const TEST_AUTHOR: &str = "1a8a62c5f64eed987326513ea15a6ea2682c256ac57a418c1c92d96787c8b36e"; + + #[tokio::test] + async fn insert_operation() { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + + // Create Author, OperationId and DocumentId in order to compose a OperationStorage. + let author = Author::new(TEST_AUTHOR).unwrap(); + let operation_id = OperationId::new(DEFAULT_HASH.parse().unwrap()); + let document_id = DocumentId::new(operation_id.clone()); + let doggo_operation = + OperationStorage::new(&author, &test_operation(), &operation_id, &document_id); + + // Insert the doggo operation into the db, returns Ok(true) when succesful. + let result = storage_provider + .insert_operation(&doggo_operation) + .await + .unwrap(); + assert!(result); + + // Request the previously inserted operation by it's id. + let returned_doggo_operation = storage_provider + .get_operation_by_id(operation_id.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(returned_doggo_operation.author(), doggo_operation.author()); + assert_eq!(returned_doggo_operation.fields(), doggo_operation.fields()); + assert_eq!(returned_doggo_operation.id(), doggo_operation.id()); + assert_eq!( + returned_doggo_operation.document_id(), + doggo_operation.document_id() + ); + } + + #[tokio::test] + async fn get_operation_fields() { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + + // Create Author, OperationId and DocumentId in order to compose a OperationStorage. 
+ let author = Author::new(TEST_AUTHOR).unwrap(); + let operation_id = OperationId::new(DEFAULT_HASH.parse().unwrap()); + let document_id = DocumentId::new(operation_id.clone()); + let doggo_operation = + OperationStorage::new(&author, &test_operation(), &operation_id, &document_id); + + // Insert the doggo operation into the db, returns Ok(true) when succesful. + let result = storage_provider + .insert_operation(&doggo_operation) + .await + .unwrap(); + assert!(result); + + // Get the operation fields for an operation identified by it's OperationId. + let result = storage_provider + .get_operation_fields_by_id(operation_id.clone()) + .await + .unwrap(); + assert_eq!(result, doggo_operation.fields().unwrap()); + } +} diff --git a/aquadoggo/src/db/stores/test_utils.rs b/aquadoggo/src/db/stores/test_utils.rs new file mode 100644 index 000000000..10b408dbc --- /dev/null +++ b/aquadoggo/src/db/stores/test_utils.rs @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::convert::TryFrom; +use std::str::FromStr; + +use p2panda_rs::document::DocumentId; +use p2panda_rs::entry::{sign_and_encode, Entry, LogId, SeqNum}; +use p2panda_rs::hash::Hash; +use p2panda_rs::identity::{Author, KeyPair}; +use p2panda_rs::operation::{ + Operation, OperationEncoded, OperationFields, OperationValue, PinnedRelation, + PinnedRelationList, Relation, RelationList, +}; +use p2panda_rs::schema::SchemaId; +use p2panda_rs::storage_provider::traits::StorageProvider; +use p2panda_rs::test_utils::constants::{DEFAULT_HASH, DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID}; + +use crate::db::provider::SqlStorage; +use crate::rpc::{EntryArgsRequest, PublishEntryRequest}; +use crate::test_helpers::initialize_db; + +pub fn test_operation() -> Operation { + let mut fields = OperationFields::new(); + fields + .add("username", OperationValue::Text("bubu".to_owned())) + .unwrap(); + + fields.add("height", OperationValue::Float(3.5)).unwrap(); + + fields.add("age", OperationValue::Integer(28)).unwrap(); + + fields + .add("is_admin", OperationValue::Boolean(false)) + .unwrap(); + + fields + .add( + "profile_picture", + OperationValue::Relation(Relation::new(DEFAULT_HASH.parse().unwrap())), + ) + .unwrap(); + fields + .add( + "special_profile_picture", + OperationValue::PinnedRelation(PinnedRelation::new(DEFAULT_HASH.parse().unwrap())), + ) + .unwrap(); + fields + .add( + "many_profile_pictures", + OperationValue::RelationList(RelationList::new(vec![ + Hash::new("0020aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + .unwrap() + .into(), + Hash::new("0020bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + .unwrap() + .into(), + ])), + ) + .unwrap(); + fields + .add( + "many_special_profile_pictures", + OperationValue::PinnedRelationList(PinnedRelationList::new(vec![ + Hash::new("0020bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + .unwrap() + .into(), + Hash::new("0020aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + .unwrap() + .into(), + ])), + ) + .unwrap(); + Operation::new_create(SchemaId::from_str(TEST_SCHEMA_ID).unwrap(), fields).unwrap() +} + +pub async fn test_db(no_of_entries: usize) -> SqlStorage { + let pool = initialize_db().await; + let storage_provider = SqlStorage { pool }; + + // If we don't want any entries in the db return now + if no_of_entries == 0 { + return storage_provider; + } + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); 
+ let schema = SchemaId::from_str(TEST_SCHEMA_ID).unwrap(); + + // Build first CREATE entry for the db + let create_operation = test_operation(); + let create_entry = Entry::new( + &LogId::default(), + Some(&create_operation), + None, + None, + &SeqNum::new(1).unwrap(), + ) + .unwrap(); + + let entry_encoded = sign_and_encode(&create_entry, &key_pair).unwrap(); + let operation_encoded = OperationEncoded::try_from(&create_operation).unwrap(); + + // Derive the document id from the CREATE entries hash + let document: DocumentId = entry_encoded.hash().into(); + + // Publish the CREATE entry + storage_provider + .publish_entry(&PublishEntryRequest { + entry_encoded, + operation_encoded, + }) + .await + .unwrap(); + + let mut fields = OperationFields::new(); + fields + .add("username", OperationValue::Text("yahoooo".to_owned())) + .unwrap(); + + // Publish more update entries + for _ in 1..no_of_entries { + // Get next entry args + let next_entry_args = storage_provider + .get_entry_args(&EntryArgsRequest { + author: author.clone(), + document: Some(document.clone()), + }) + .await + .unwrap(); + + let backlink = next_entry_args.entry_hash_backlink.clone().unwrap(); + + // Construct the next UPDATE operation, we use the backlink hash in the prev_op vector + let update_operation = + Operation::new_update(schema.clone(), vec![backlink.into()], fields.clone()).unwrap(); + + let update_entry = Entry::new( + &next_entry_args.log_id, + Some(&update_operation), + next_entry_args.entry_hash_skiplink.as_ref(), + next_entry_args.entry_hash_backlink.as_ref(), + &next_entry_args.seq_num, + ) + .unwrap(); + + let entry_encoded = sign_and_encode(&update_entry, &key_pair).unwrap(); + let operation_encoded = OperationEncoded::try_from(&update_operation).unwrap(); + + // Publish the new entry + storage_provider + .publish_entry(&PublishEntryRequest { + entry_encoded, + operation_encoded, + }) + .await + .unwrap(); + } + storage_provider +} diff --git a/aquadoggo/src/db/traits.rs b/aquadoggo/src/db/traits.rs new file mode 100644 index 000000000..bbec88c98 --- /dev/null +++ b/aquadoggo/src/db/traits.rs @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::btree_map::Iter; +use std::collections::BTreeMap; + +use async_trait::async_trait; +use p2panda_rs::document::{DocumentId, DocumentViewId}; +use p2panda_rs::identity::Author; +use p2panda_rs::operation::{OperationAction, OperationFields, OperationId, OperationValue}; +use p2panda_rs::schema::SchemaId; + +use crate::db::errors::{DocumentViewStorageError, OperationStorageError}; + +/// The string name of a documents field +pub type FieldName = String; + +/// A map associating fields identified by their name with an operation which +/// conatins this fields value(s). +pub type FieldIds = BTreeMap; + +/// The fields of a document view. +pub type DocumentViewFields = OperationFields; + +/// WIP: Storage trait representing a document view. +pub trait AsStorageDocumentView: Sized + Clone + Send + Sync { + /// The error type returned by this traits' methods. + type AsStorageDocumentViewError: 'static + std::error::Error; + + fn id(&self) -> DocumentViewId; + + fn iter(&self) -> Iter; + + fn get(&self, key: &str) -> Option<&OperationValue>; + + fn schema_id(&self) -> SchemaId; + + fn field_ids(&self) -> FieldIds; +} + +/// Storage traits for documents and document views. 
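// Rough sketch of the `field_ids` argument expected by `insert_document_view` below:
// it maps each field name to the id of the operation which last set that field. In
// the document store test above, after the UPDATE only "username" points at the
// UPDATE operation's id, while every other field still points at the CREATE operation.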
+#[async_trait] +pub trait DocumentStore { + async fn insert_document_view( + &self, + document_view: &DocumentViewId, + field_ids: &FieldIds, + schema_id: &SchemaId, + ) -> Result; + + async fn get_document_view_by_id( + &self, + id: &DocumentViewId, + ) -> Result; +} + +pub type PreviousOperations = Vec; + +pub trait AsStorageOperation: Sized + Clone + Send + Sync { + /// The error type returned by this traits' methods. + type AsStorageOperationError: 'static + std::error::Error; + + fn action(&self) -> OperationAction; + + fn author(&self) -> Author; + + fn document_id(&self) -> DocumentId; + + fn fields(&self) -> Option; + + fn id(&self) -> OperationId; + + fn previous_operations(&self) -> PreviousOperations; + + fn schema_id(&self) -> SchemaId; +} + +#[async_trait] +pub trait OperationStore { + /// Insert an operation into the db. + /// + /// The passed operation must implement the `AsStorageOperation` trait. Errors when + /// a fatal DB error occurs, returns true or false depending if the expected number + /// of insertions occured. + async fn insert_operation( + &self, + operation: &StorageOperation, + ) -> Result; + + /// Get an operation identified by it's OperationId. + /// + /// Returns a type implementing `AsStorageOperation` which includes `Author`, `DocumentId` and + /// `OperationId` metadata. + async fn get_operation_by_id( + &self, + id: OperationId, + ) -> Result, OperationStorageError>; + + /// Retrieve the id of the document an operation is contained within. + /// + /// If no document was found, then this method returns a result wrapping + /// a None variant. + async fn get_document_by_operation_id( + &self, + id: OperationId, + ) -> Result, OperationStorageError>; + + /// Get just the fields of an operation, identified by their OperationId. 
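// Expected usage, mirroring the `get_operation_fields` test in the operation store
// (where `storage_provider` is a `SqlStorage` and `doggo_operation` the inserted operation):
//
//     let fields = storage_provider
//         .get_operation_fields_by_id(operation_id.clone())
//         .await
//         .unwrap();
//     assert_eq!(fields, doggo_operation.fields().unwrap());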
+ async fn get_operation_fields_by_id( + &self, + id: OperationId, + ) -> Result; + + // async fn get_operation_value_by_id( + // &self, + // id: OperationId, + // name: String, + // ) -> Result; + + // async fn get_operations_by_document_id( + // &self, + // id: DocumentId, + // name: String, + // ) -> Result, OperationStorageError>; +} diff --git a/aquadoggo/src/db/utils.rs b/aquadoggo/src/db/utils.rs new file mode 100644 index 000000000..b4db7865e --- /dev/null +++ b/aquadoggo/src/db/utils.rs @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use p2panda_rs::document::{DocumentId, DocumentViewId}; +use p2panda_rs::operation::{ + OperationFields, OperationValue, PinnedRelation, PinnedRelationList, Relation, RelationList, +}; + +use crate::db::models::operation::OperationFieldRow; + +pub fn parse_operation_fields(operation_field_rows: Vec) -> OperationFields { + let mut relation_list: Vec = Vec::new(); + let mut pinned_relation_list: Vec = Vec::new(); + + let mut operation_fields = OperationFields::new(); + + // Iterate over returned field values, for each value: + // - if it is a simple value type, parse it into an OperationValue and add it to the operation_fields + // - if it is a relation list value type parse each item into a DocumentId/DocumentViewId and push to + // the suitable vec (instantiated above) + operation_field_rows.iter().for_each(|row| { + match row.field_type.as_str() { + "bool" => { + operation_fields + .add( + row.name.as_str(), + OperationValue::Boolean(row.value.parse::().unwrap()), + ) + .unwrap(); + } + "int" => { + operation_fields + .add( + row.name.as_str(), + OperationValue::Integer(row.value.parse::().unwrap()), + ) + .unwrap(); + } + "float" => { + operation_fields + .add( + row.name.as_str(), + OperationValue::Float(row.value.parse::().unwrap()), + ) + .unwrap(); + } + "str" => { + operation_fields + .add(row.name.as_str(), OperationValue::Text(row.value.clone())) + .unwrap(); + } + "relation" => { + operation_fields + .add( + row.name.as_str(), + OperationValue::Relation(Relation::new( + row.value.parse::().unwrap(), + )), + ) + .unwrap(); + } + // A special case, this is a list item, so we push it to a vec but _don't_ add it + // to the operation_fields yet. + "relation_list" => relation_list.push(row.value.parse::().unwrap()), + "pinned_relation" => { + operation_fields + .add( + row.name.as_str(), + OperationValue::PinnedRelation(PinnedRelation::new( + row.value.parse::().unwrap(), + )), + ) + .unwrap(); + } + // A special case, this is a list item, so we push it to a vec but _don't_ add it + // to the operation_fields yet. 
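// Once the match below has visited every row, the collected list items are folded
// back into a single `RelationList` / `PinnedRelationList` value per field and only
// then added to `operation_fields` (see the end of this function).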
+ "pinned_relation_list" => { + pinned_relation_list.push(row.value.parse::().unwrap()) + } + _ => (), + }; + }); + + // Find if there is at least one field containing a "relation_list" type + let relation_list_field = &operation_field_rows + .iter() + .find(|row| row.field_type == "relation_list"); + + // If so, then parse the `relation_list` vec into an operation value and add it to the document view fields + if let Some(relation_list_field) = relation_list_field { + operation_fields + .add( + relation_list_field.name.as_str(), + OperationValue::RelationList(RelationList::new(relation_list)), + ) + .unwrap(); + } + + // Find if there is at least one field containing a "pinned_relation_list" type + let pinned_relation_list_field = &operation_field_rows + .iter() + .find(|row| row.field_type == "pinned_relation_list"); + + // If so, then parse the `pinned_relation_list` vec into an operation value and add it to the document view fields + if let Some(pinned_relation_list_field) = pinned_relation_list_field { + operation_fields + .add( + pinned_relation_list_field.name.as_str(), + OperationValue::PinnedRelationList(PinnedRelationList::new(pinned_relation_list)), + ) + .unwrap(); + } + + operation_fields +} diff --git a/aquadoggo/src/rpc/api.rs b/aquadoggo/src/rpc/api.rs index 8a85265b9..093fa8431 100644 --- a/aquadoggo/src/rpc/api.rs +++ b/aquadoggo/src/rpc/api.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use jsonrpc_v2::{Data, MapRouter, Server as Service}; -use crate::db::store::SqlStorage; +use crate::db::provider::SqlStorage; use crate::db::Pool; use crate::rpc::methods::{get_entry_args, publish_entry, query_entries}; diff --git a/aquadoggo/src/rpc/methods/entry_args.rs b/aquadoggo/src/rpc/methods/entry_args.rs index 3962a7daa..24551c266 100644 --- a/aquadoggo/src/rpc/methods/entry_args.rs +++ b/aquadoggo/src/rpc/methods/entry_args.rs @@ -3,7 +3,7 @@ use jsonrpc_v2::{Data, Params}; use p2panda_rs::storage_provider::traits::StorageProvider; -use crate::db::store::SqlStorage; +use crate::db::provider::SqlStorage; use crate::errors::StorageProviderResult; use crate::rpc::request::EntryArgsRequest; use crate::rpc::response::EntryArgsResponse; @@ -76,8 +76,8 @@ mod tests { r#"{ "entryHashBacklink": null, "entryHashSkiplink": null, - "seqNum": "1", - "logId": "1" + "seqNum": 1, + "logId": 1 }"#, ); diff --git a/aquadoggo/src/rpc/methods/publish_entry.rs b/aquadoggo/src/rpc/methods/publish_entry.rs index 43e7901a2..4074e610a 100644 --- a/aquadoggo/src/rpc/methods/publish_entry.rs +++ b/aquadoggo/src/rpc/methods/publish_entry.rs @@ -3,7 +3,7 @@ use jsonrpc_v2::{Data, Params}; use p2panda_rs::storage_provider::traits::StorageProvider; -use crate::db::store::SqlStorage; +use crate::db::provider::SqlStorage; use crate::errors::StorageProviderResult; use crate::rpc::{PublishEntryRequest, PublishEntryResponse}; @@ -111,8 +111,8 @@ mod tests { r#"{{ "entryHashBacklink": "{}", "entryHashSkiplink": {}, - "seqNum": "{}", - "logId": "{}" + "seqNum": {}, + "logId": {} }}"#, entry_encoded.hash().as_str(), skiplink_str, @@ -374,9 +374,10 @@ mod tests { ), ); - let response = rpc_error( - "The backlink hash encoded in the entry does not match the lipmaa entry provided", - ); + let response = rpc_error(&format!( + "The backlink hash encoded in the entry: {} did not match the expected backlink hash", + entry_wrong_hash.hash() + )); assert_eq!(handle_http(&client, request).await, response); // Send invalid sequence number @@ -403,7 +404,7 @@ mod tests { ); let response = rpc_error(&format!( - "Could not find backlink 
entry in database with id: {}", + "Could not find expected backlink in database for entry with id: {}", entry_wrong_seq_num.hash() )); assert_eq!(handle_http(&client, request).await, response); diff --git a/aquadoggo/src/rpc/methods/query_entries.rs b/aquadoggo/src/rpc/methods/query_entries.rs index 56bc61945..d07f6a5b4 100644 --- a/aquadoggo/src/rpc/methods/query_entries.rs +++ b/aquadoggo/src/rpc/methods/query_entries.rs @@ -3,7 +3,7 @@ use jsonrpc_v2::{Data, Params}; use p2panda_rs::storage_provider::traits::EntryStore; -use crate::db::store::SqlStorage; +use crate::db::provider::SqlStorage; use crate::errors::StorageProviderResult; use crate::rpc::request::QueryEntriesRequest; use crate::rpc::response::QueryEntriesResponse; diff --git a/aquadoggo/src/rpc/response.rs b/aquadoggo/src/rpc/response.rs index f43e43505..a5750b4c9 100644 --- a/aquadoggo/src/rpc/response.rs +++ b/aquadoggo/src/rpc/response.rs @@ -2,10 +2,11 @@ use serde::Serialize; +use p2panda_rs::entry::{LogId, SeqNum}; use p2panda_rs::hash::Hash; use p2panda_rs::storage_provider::traits::{AsEntryArgsResponse, AsPublishEntryResponse}; -use crate::db::models::EntryRow; +use crate::db::models::entry::EntryRow; /// Response body of `panda_getEntryArguments`. /// @@ -15,22 +16,22 @@ use crate::db::models::EntryRow; pub struct EntryArgsResponse { pub entry_hash_backlink: Option, pub entry_hash_skiplink: Option, - pub seq_num: String, - pub log_id: String, + pub seq_num: SeqNum, + pub log_id: LogId, } impl AsEntryArgsResponse for EntryArgsResponse { fn new( entry_hash_backlink: Option, entry_hash_skiplink: Option, - seq_num: p2panda_rs::entry::SeqNum, - log_id: p2panda_rs::entry::LogId, + seq_num: SeqNum, + log_id: LogId, ) -> Self { EntryArgsResponse { entry_hash_backlink, entry_hash_skiplink, - seq_num: seq_num.as_u64().to_string(), - log_id: log_id.as_u64().to_string(), + seq_num, + log_id, } } } @@ -43,22 +44,22 @@ impl AsEntryArgsResponse for EntryArgsResponse { pub struct PublishEntryResponse { pub entry_hash_backlink: Option, pub entry_hash_skiplink: Option, - pub seq_num: String, - pub log_id: String, + pub seq_num: SeqNum, + pub log_id: LogId, } impl AsPublishEntryResponse for PublishEntryResponse { fn new( entry_hash_backlink: Option, entry_hash_skiplink: Option, - seq_num: p2panda_rs::entry::SeqNum, - log_id: p2panda_rs::entry::LogId, + seq_num: SeqNum, + log_id: LogId, ) -> Self { PublishEntryResponse { entry_hash_backlink, entry_hash_skiplink, - seq_num: seq_num.as_u64().to_string(), - log_id: log_id.as_u64().to_string(), + seq_num, + log_id, } } }
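// With `seq_num` and `log_id` now held as `SeqNum` and `LogId` instead of strings,
// both response bodies serialise these fields as plain JSON numbers, e.g. (as in the
// updated `entry_args` test fixture above):
//
//     { "entryHashBacklink": null, "entryHashSkiplink": null, "seqNum": 1, "logId": 1 }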