From f95ade01c2c869e73a496e341263b17032bffec6 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 16:11:59 +0200 Subject: [PATCH 01/21] Introduce sorted_index column to operations_v1 table --- .../migrations/20230523135826_alter-operations.sql | 3 +++ aquadoggo/src/db/models/operation.rs | 4 ++++ aquadoggo/src/db/models/utils.rs | 13 +++++++++++++ aquadoggo/src/db/stores/operation.rs | 8 ++++++-- 4 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 aquadoggo/migrations/20230523135826_alter-operations.sql diff --git a/aquadoggo/migrations/20230523135826_alter-operations.sql b/aquadoggo/migrations/20230523135826_alter-operations.sql new file mode 100644 index 000000000..38a89a4d6 --- /dev/null +++ b/aquadoggo/migrations/20230523135826_alter-operations.sql @@ -0,0 +1,3 @@ +-- SPDX-License-Identifier: AGPL-3.0-or-later + +ALTER TABLE operations_v1 ADD COLUMN sorted_index NUMERIC; \ No newline at end of file diff --git a/aquadoggo/src/db/models/operation.rs b/aquadoggo/src/db/models/operation.rs index 4f8c8656b..da194902c 100644 --- a/aquadoggo/src/db/models/operation.rs +++ b/aquadoggo/src/db/models/operation.rs @@ -25,6 +25,8 @@ pub struct OperationRow { /// The previous operations of this operation concatenated into string format with `_` /// separator. pub previous: Option, + + pub sorted_index: Option, } /// A struct representing a single operation field row as it is inserted in the database. @@ -99,4 +101,6 @@ pub struct OperationFieldsJoinedRow { /// This numeric value is a simple list index to represent multiple values within one operation /// field. pub list_index: Option, + + pub sorted_index: Option, } diff --git a/aquadoggo/src/db/models/utils.rs b/aquadoggo/src/db/models/utils.rs index eb89aedea..ef2ef66bc 100644 --- a/aquadoggo/src/db/models/utils.rs +++ b/aquadoggo/src/db/models/utils.rs @@ -431,6 +431,7 @@ mod tests { field_type: Some("int".to_string()), value: Some("28".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -449,6 +450,7 @@ mod tests { field_type: Some("float".to_string()), value: Some("3.5".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -467,6 +469,7 @@ mod tests { field_type: Some("bool".to_string()), value: Some("false".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -488,6 +491,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -509,6 +513,7 @@ mod tests { .to_string(), ), list_index: Some(1), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -530,6 +535,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -551,6 +557,7 @@ mod tests { .to_string(), ), list_index: Some(1), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -572,6 +579,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -593,6 +601,7 @@ mod tests { .to_string(), ), list_index: Some(1), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -614,6 +623,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -635,6 +645,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -653,6 +664,7 @@ mod tests { field_type: Some("str".to_string()), value: Some("bubu".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -671,6 +683,7 @@ mod tests { field_type: Some("pinned_relation_list".to_string()), value: None, list_index: Some(0), + sorted_index: None, }, ]; diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index d93a93e35..193691b90 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -96,10 +96,11 @@ impl OperationStore for SqlStore { operation_id, action, schema_id, - previous + previous, + sorted_index ) VALUES - ($1, $2, $3, $4, $5, $6) + ($1, $2, $3, $4, $5, $6, null) ", ) .bind(public_key.to_string()) @@ -183,6 +184,7 @@ impl OperationStore for SqlStore { operations_v1.action, operations_v1.schema_id, operations_v1.previous, + operations_v1.sorted_index, operation_fields_v1.name, operation_fields_v1.field_type, operation_fields_v1.value, @@ -221,6 +223,7 @@ impl OperationStore for SqlStore { operations_v1.action, operations_v1.schema_id, operations_v1.previous, + operations_v1.sorted_index, operation_fields_v1.name, operation_fields_v1.field_type, operation_fields_v1.value, @@ -277,6 +280,7 @@ impl OperationStore for SqlStore { operations_v1.action, operations_v1.schema_id, operations_v1.previous, + operations_v1.sorted_index, operation_fields_v1.name, operation_fields_v1.field_type, operation_fields_v1.value, From 726e4df6e1b5f401c9ffdffa15da75dd0f9ae6ef Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 16:23:17 +0200 Subject: [PATCH 02/21] Iterate over sorted operations in reduce task --- aquadoggo/src/materializer/tasks/reduce.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index b1be1ee73..2aaa2b26d 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -3,6 +3,7 @@ use std::convert::TryFrom; use log::{debug, info, trace}; +use p2panda_rs::document::materialization::build_graph; use p2panda_rs::document::traits::AsDocument; use p2panda_rs::document::{Document, DocumentBuilder, DocumentId, DocumentViewId}; use p2panda_rs::operation::traits::{AsOperation, WithPublicKey}; @@ -206,6 +207,13 @@ async fn reduce_document + WithPublicKey>( if document_view_exists { return Ok(None); + }; + + let operations = DocumentBuilder::from(operations).operations(); + let sorted_operations = build_graph(&operations).unwrap().sort().unwrap().sorted(); + + for (index, (id, _, _)) in sorted_operations.iter().enumerate() { + // @TODO: Update operations in document with correct sorted index } // Insert this document into storage. If it already existed, this will update it's From acf4b6248da7cdaecd2568e658d0fec2a303bce3 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 16:55:26 +0200 Subject: [PATCH 03/21] Update operation sorted index when reducing documents --- aquadoggo/src/db/stores/operation.rs | 28 +++++++++++++++++++++- aquadoggo/src/materializer/tasks/reduce.rs | 12 ++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index 193691b90..c0fca19c7 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -12,7 +12,7 @@ use p2panda_rs::operation::{Operation, OperationId}; use p2panda_rs::schema::SchemaId; use p2panda_rs::storage_provider::error::OperationStorageError; use p2panda_rs::storage_provider::traits::OperationStore; -use sqlx::{query, query_as, query_scalar}; +use sqlx::{query, query_as, query_scalar, Any}; use crate::db::models::utils::{parse_operation_rows, parse_value_to_string_vec}; use crate::db::models::{DocumentViewFieldRow, OperationFieldsJoinedRow}; @@ -324,6 +324,32 @@ impl OperationStore for SqlStore { } } +impl SqlStore { + pub async fn update_operation_index( + &self, + operation_id: &OperationId, + sorted_index: i32, + ) -> Result<(), OperationStorageError> { + query::( + " + UPDATE + operations_v1 + SET + sorted_index = $2 + WHERE + operation_id = $1 + ", + ) + .bind(operation_id.as_str()) + .bind(sorted_index) + .fetch_all(&self.pool) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + Ok(()) + } +} + #[derive(Debug, Clone, Eq, PartialEq)] pub struct OperationCursor(String); diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 2aaa2b26d..5b59aa20f 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -209,11 +209,19 @@ async fn reduce_document + WithPublicKey>( return Ok(None); }; + // @TODO: Make sorted operations available after building the document above so we can skip this step. let operations = DocumentBuilder::from(operations).operations(); let sorted_operations = build_graph(&operations).unwrap().sort().unwrap().sorted(); - for (index, (id, _, _)) in sorted_operations.iter().enumerate() { - // @TODO: Update operations in document with correct sorted index + // Iterate over the sorted document operations and update their sorted index on the + // operations_v1 table. + for (index, (operation_id, _, _)) in sorted_operations.iter().enumerate() { + let sorted_index = { index + 1 } as i32; + context + .store + .update_operation_index(operation_id, sorted_index) + .await + .map_err(|err| TaskError::Critical(err.to_string()))?; } // Insert this document into storage. If it already existed, this will update it's From 5aacdce48d1d1c5c0863a0cf29b569f76b82fbaa Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 22:50:26 +0100 Subject: [PATCH 04/21] Order operations by their index when topological sorted in SQL queries --- aquadoggo/src/db/stores/operation.rs | 110 ++++++++++++++++----------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index c0fca19c7..b305aa059 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -236,7 +236,9 @@ impl OperationStore for SqlStore { WHERE operations_v1.document_id = $1 ORDER BY - operation_fields_v1.list_index ASC + -- order the operations by their index when topologically sorted, in the case where this may not be set yet + -- we fall back to ordering by operation id. In both cases we additionally order by list index. + operations_v1.sorted_index ASC, operations_v1.operation_id ASC, operation_fields_v1.list_index ASC ", ) .bind(id.as_str()) @@ -244,26 +246,11 @@ impl OperationStore for SqlStore { .await .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - let mut grouped_operation_rows: BTreeMap> = - BTreeMap::new(); - - for operation_row in operation_rows { - if let Some(current_operations) = - grouped_operation_rows.get_mut(&operation_row.operation_id) - { - current_operations.push(operation_row) - } else { - grouped_operation_rows - .insert(operation_row.clone().operation_id, vec![operation_row]); - }; + if operation_rows.is_empty() { + return Ok(vec![]); } - let operations: Vec = grouped_operation_rows - .iter() - .filter_map(|(_id, operation_rows)| parse_operation_rows(operation_rows.to_owned())) - .collect(); - - Ok(operations) + Ok(group_and_parse_operation_rows(operation_rows)) } /// Get all operations that are part of a given document. @@ -293,7 +280,7 @@ impl OperationStore for SqlStore { WHERE operations_v1.schema_id = $1 ORDER BY - operation_fields_v1.list_index ASC + operations_v1.operation_id ASC, operation_fields_v1.list_index ASC ", ) .bind(id.to_string()) @@ -301,27 +288,53 @@ impl OperationStore for SqlStore { .await .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - let mut grouped_operation_rows: BTreeMap> = - BTreeMap::new(); - - for operation_row in operation_rows { - if let Some(current_operations) = - grouped_operation_rows.get_mut(&operation_row.operation_id) - { - current_operations.push(operation_row) - } else { - grouped_operation_rows - .insert(operation_row.clone().operation_id, vec![operation_row]); - }; + if operation_rows.is_empty() { + return Ok(vec![]); } - let operations: Vec = grouped_operation_rows - .iter() - .filter_map(|(_id, operation_rows)| parse_operation_rows(operation_rows.to_owned())) - .collect(); + Ok(group_and_parse_operation_rows(operation_rows)) + } +} - Ok(operations) +/// Parse a collection of operation rows from multiple operations, into a list of `StorageOperation`. +/// +/// Expects the rows to be grouped by operation id. +fn group_and_parse_operation_rows( + operation_rows: Vec, +) -> Vec { + // We need to group all the operation rows so they can be parsed into operations. + // They come from the database ordered by their index once topologically sorted when + // present, otherwise by operation id. List items are additionally ordered by their + // list index. + + let mut grouped_operation_rows = vec![]; + + let mut current_operation_id = operation_rows.first().unwrap().operation_id.clone(); + let mut current_operation_rows = vec![]; + + let mut operation_rows_iter = operation_rows.into_iter(); + while let Some(row) = operation_rows_iter.next() { + if row.operation_id == current_operation_id { + // If this row is part of the current operation push it to the current rows vec. + current_operation_rows.push(row); + } else { + // If we've moved on to the next operation, then push the complete vec of + // operation rows to the grouped rows collection and then setup for the next + // iteration. + grouped_operation_rows.push(current_operation_rows.clone()); + current_operation_id = row.operation_id.clone(); + current_operation_rows = vec![row]; + } } + + // Push the final operation to the grouped rows. + grouped_operation_rows.push(current_operation_rows); + + // Parse all the operation rows into operations. + grouped_operation_rows + .into_iter() + .filter_map(|operation_rows| parse_operation_rows(operation_rows.to_owned())) + .collect() } impl SqlStore { @@ -389,7 +402,8 @@ impl From<&DocumentViewFieldRow> for OperationCursor { #[cfg(test)] mod tests { - use p2panda_rs::document::DocumentId; + use p2panda_rs::document::materialization::build_graph; + use p2panda_rs::document::{DocumentBuilder, DocumentId}; use p2panda_rs::identity::PublicKey; use p2panda_rs::operation::traits::{AsOperation, WithPublicKey}; use p2panda_rs::operation::{Operation, OperationAction, OperationBuilder, OperationId}; @@ -400,11 +414,13 @@ mod tests { document_id, operation, operation_id, operation_with_schema, public_key, random_document_view_id, random_operation_id, random_previous_operations, schema_id, }; - use p2panda_rs::test_utils::memory_store::helpers::{populate_store, PopulateStoreConfig}; + use p2panda_rs::test_utils::memory_store::helpers::PopulateStoreConfig; use p2panda_rs::WithId; use rstest::rstest; - use crate::test_utils::{doggo_fields, populate_store_config, test_runner, TestNode}; + use crate::test_utils::{ + doggo_fields, populate_and_materialize, populate_store_config, test_runner, TestNode, + }; use super::OperationCursor; @@ -561,9 +577,9 @@ mod tests { #[with(10, 1, 1)] config: PopulateStoreConfig, ) { - test_runner(|node: TestNode| async move { - // Populate the store with some entries and operations but DON'T materialise any resulting documents. - let (_, document_ids) = populate_store(&node.context.store, &config).await; + test_runner(|mut node: TestNode| async move { + // Populate the store with some entries and operations and materialize documents. + let (_, document_ids) = populate_and_materialize(&mut node, &config).await; let document_id = document_ids.get(0).expect("At least one document id"); let operations_by_document_id = node @@ -574,7 +590,13 @@ mod tests { .expect("Get operations by their document id"); // We expect the number of operations returned to match the expected number. - assert_eq!(operations_by_document_id.len(), 10) + assert_eq!(operations_by_document_id.len(), 10); + + // The operations should be in their topologically sorted order. + let operations = DocumentBuilder::from(&operations_by_document_id).operations(); + let expected_operation_order = build_graph(&operations).unwrap().sort().unwrap().sorted(); + + assert_eq!(operations, expected_operation_order); }); } From 197cb7771763554bf500cc89021e5d9eef203719 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 22:58:49 +0100 Subject: [PATCH 05/21] Test for reduce task updating operation sorted index --- aquadoggo/src/db/types/operation.rs | 2 +- aquadoggo/src/materializer/tasks/reduce.rs | 111 ++++++++++++++++++++- 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/db/types/operation.rs b/aquadoggo/src/db/types/operation.rs index 231a89239..f86c506f8 100644 --- a/aquadoggo/src/db/types/operation.rs +++ b/aquadoggo/src/db/types/operation.rs @@ -7,7 +7,7 @@ use p2panda_rs::operation::{OperationAction, OperationFields, OperationId, Opera use p2panda_rs::schema::SchemaId; use p2panda_rs::WithId; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct StorageOperation { /// The id of the document this operation is part of. pub(crate) document_id: DocumentId, diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 5b59aa20f..a458eadd5 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -276,7 +276,11 @@ mod tests { use p2panda_rs::document::materialization::build_graph; use p2panda_rs::document::traits::AsDocument; - use p2panda_rs::document::{Document, DocumentBuilder, DocumentId, DocumentViewId}; + use p2panda_rs::document::{ + Document, DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId, + DocumentViewValue, + }; + use p2panda_rs::operation::traits::AsOperation; use p2panda_rs::operation::OperationValue; use p2panda_rs::schema::Schema; use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore}; @@ -287,6 +291,7 @@ mod tests { use p2panda_rs::test_utils::memory_store::helpers::{ populate_store, send_to_store, PopulateStoreConfig, }; + use p2panda_rs::WithId; use rstest::rstest; use crate::materializer::tasks::reduce_task; @@ -600,4 +605,108 @@ mod tests { assert!(reduce_task(node.context.clone(), input).await.is_ok()); }) } + + #[rstest] + fn updates_operations_sorted_index( + schema: Schema, + #[from(populate_store_config)] + #[with( + 3, + 1, + 1, + false, + constants::schema(), + constants::test_fields(), + constants::test_fields() + )] + config: PopulateStoreConfig, + ) { + test_runner(move |node: TestNode| async move { + // Populate the store with some entries and operations but DON'T materialise any resulting documents. + let (key_pairs, document_ids) = populate_store(&node.context.store, &config).await; + let document_id = document_ids + .get(0) + .expect("There should be at least one document id"); + + let key_pair = key_pairs + .get(0) + .expect("There should be at least one key_pair"); + + // Now we create and insert an UPDATE operation for this document which is pointing at + // the root CREATE operation. + let (_, _) = send_to_store( + &node.context.store, + &operation( + Some(operation_fields(vec![( + "username", + OperationValue::String("hello".to_string()), + )])), + Some(document_id.as_str().parse().unwrap()), + schema.id().to_owned(), + ), + &schema, + key_pair, + ) + .await + .unwrap(); + + // Before running the reduce task retrieve the operations. These should not be in + // their topologically sorted order. + let pre_materialization_operations = node + .context + .store + .get_operations_by_document_id(&document_id) + .await + .unwrap(); + + // Run a reduce task with the document id as input. + let input = TaskInput::new(Some(document_id.clone()), None); + assert!(reduce_task(node.context.clone(), input.clone()) + .await + .is_ok()); + + // Retrieve the operations again, they should now be in their topologically sorted order. + let post_materialization_operations = node + .context + .store + .get_operations_by_document_id(&document_id) + .await + .unwrap(); + + // Check we got 4 operations both times. + assert_eq!(pre_materialization_operations.len(), 4); + assert_eq!(post_materialization_operations.len(), 4); + // Check the ordering is different. + assert_ne!( + pre_materialization_operations, + post_materialization_operations + ); + + // The first operation should be a CREATE. + let create_operation = post_materialization_operations.first().unwrap(); + assert!(create_operation.is_create()); + + // Reduce the operations to a document view. + let mut document_view_fields = DocumentViewFields::new(); + for operation in post_materialization_operations { + let fields = operation.fields().unwrap(); + for (key, value) in fields.iter() { + let document_view_value = DocumentViewValue::new(operation.id(), value); + document_view_fields.insert(key, document_view_value); + } + } + + // Retrieve the expected document from the store. + let expected_document = node + .context + .store + .get_document(document_id) + .await + .unwrap() + .unwrap(); + + // The fields should be the same. + assert_eq!(document_view_fields, *expected_document.fields().unwrap()); + }) + } } From 926f3f6f2d1833eb95f60e4b2b056d402ace7c80 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 22:59:05 +0100 Subject: [PATCH 06/21] Remove unused import --- aquadoggo/src/db/stores/operation.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index b305aa059..dced0f94c 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -1,6 +1,5 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::collections::BTreeMap; use std::fmt::Display; use async_trait::async_trait; From 8a7fa0d3ac25155cb2bb60f2a9e8d8c0fd4c04cf Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 23 May 2023 23:04:32 +0100 Subject: [PATCH 07/21] Make clippy happy --- aquadoggo/src/db/stores/operation.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index dced0f94c..79f568166 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -311,8 +311,7 @@ fn group_and_parse_operation_rows( let mut current_operation_id = operation_rows.first().unwrap().operation_id.clone(); let mut current_operation_rows = vec![]; - let mut operation_rows_iter = operation_rows.into_iter(); - while let Some(row) = operation_rows_iter.next() { + for row in operation_rows { if row.operation_id == current_operation_id { // If this row is part of the current operation push it to the current rows vec. current_operation_rows.push(row); @@ -332,7 +331,7 @@ fn group_and_parse_operation_rows( // Parse all the operation rows into operations. grouped_operation_rows .into_iter() - .filter_map(|operation_rows| parse_operation_rows(operation_rows.to_owned())) + .filter_map(parse_operation_rows) .collect() } @@ -593,7 +592,8 @@ mod tests { // The operations should be in their topologically sorted order. let operations = DocumentBuilder::from(&operations_by_document_id).operations(); - let expected_operation_order = build_graph(&operations).unwrap().sort().unwrap().sorted(); + let expected_operation_order = + build_graph(&operations).unwrap().sort().unwrap().sorted(); assert_eq!(operations, expected_operation_order); }); From ac46607ecc113f25d87854224b6c8590ce50de72 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 14 Jun 2023 05:27:49 +0100 Subject: [PATCH 08/21] Doc strings --- aquadoggo/src/db/models/operation.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/aquadoggo/src/db/models/operation.rs b/aquadoggo/src/db/models/operation.rs index da194902c..a5a12b290 100644 --- a/aquadoggo/src/db/models/operation.rs +++ b/aquadoggo/src/db/models/operation.rs @@ -26,6 +26,15 @@ pub struct OperationRow { /// separator. pub previous: Option, + /// Index for the position of this operation once topological sorting of the operation graph + /// has been performed. + /// + /// This is an option as when an operation which is not appended to the tip of a + /// document graph is handled then a `reduce` materialization task will be issued and all + /// indexes for operations in the effected document re-calculated. + /// + /// If this value is None we can assume the operation has not been processed yet and we are + /// waiting for the reduce task to complete. pub sorted_index: Option, } @@ -102,5 +111,14 @@ pub struct OperationFieldsJoinedRow { /// field. pub list_index: Option, + /// Index for the position of this operation once topological sorting of the operation graph + /// has been performed. + /// + /// This is an option as when an operation which is not appended to the tip of a + /// document graph is handled then a `reduce` materialization task will be issued and all + /// indexes for operations in the effected document re-calculated. + /// + /// If this value is None we can assume the operation has not been processed yet and we are + /// waiting for the reduce task to complete. pub sorted_index: Option, } From 0c0ead93fbfc78e1bd24be48b30e40cb1f8049a3 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 14 Jun 2023 05:28:27 +0100 Subject: [PATCH 09/21] Add insert_operation_with_index method to store --- aquadoggo/src/db/stores/operation.rs | 187 ++++++++++++++------------- 1 file changed, 100 insertions(+), 87 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index 79f568166..0b2c713a9 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -76,94 +76,8 @@ impl OperationStore for SqlStore { operation: &Operation, document_id: &DocumentId, ) -> Result<(), OperationStorageError> { - // Start a transaction, any db insertions after this point, and before the `commit()` will - // be rolled back in the event of an error. - let mut tx = self - .pool - .begin() + self.insert_operation_with_index(id, public_key, operation, document_id, None) .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - // Construct query for inserting operation an row, execute it and check exactly one row was - // affected. - query( - " - INSERT INTO - operations_v1 ( - public_key, - document_id, - operation_id, - action, - schema_id, - previous, - sorted_index - ) - VALUES - ($1, $2, $3, $4, $5, $6, null) - ", - ) - .bind(public_key.to_string()) - .bind(document_id.as_str()) - .bind(id.as_str()) - .bind(operation.action().as_str()) - .bind(operation.schema_id().to_string()) - .bind( - operation - .previous() - .map(|document_view_id| document_view_id.to_string()), - ) - .execute(&mut tx) - .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - let mut results = Vec::new(); - if let Some(fields) = operation.fields() { - for (name, value) in fields.iter() { - // If the value is a relation_list or pinned_relation_list we need to insert a new - // field row for every item in the list. Here we collect these items and return - // them in a vector. If this operation value is anything except for the above list - // types, we will return a vec containing a single item. - let db_values = parse_value_to_string_vec(value); - - for (index, db_value) in db_values.into_iter().enumerate() { - let cursor = OperationCursor::new(index, name, id); - - let result = query( - " - INSERT INTO - operation_fields_v1 ( - operation_id, - name, - field_type, - value, - list_index, - cursor - ) - VALUES - ($1, $2, $3, $4, $5, $6) - ", - ) - .bind(id.as_str().to_owned()) - .bind(name.to_owned()) - .bind(value.field_type().to_string()) - .bind(db_value) - .bind(index as i32) - .bind(cursor.to_string()) - .execute(&mut tx) - .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - results.push(result); - } - } - }; - - // Commit the transaction. - tx.commit() - .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - Ok(()) } /// Get an operation identified by it's `OperationId`. @@ -359,6 +273,105 @@ impl SqlStore { Ok(()) } + + pub async fn insert_operation_with_index( + &self, + id: &OperationId, + public_key: &PublicKey, + operation: &Operation, + document_id: &DocumentId, + sorted_index: Option, + ) -> Result<(), OperationStorageError> { + // Start a transaction, any db insertions after this point, and before the `commit()` will + // be rolled back in the event of an error. + let mut tx = self + .pool + .begin() + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + // Construct query for inserting operation an row, execute it and check exactly one row was + // affected. + query( + " + INSERT INTO + operations_v1 ( + public_key, + document_id, + operation_id, + action, + schema_id, + previous, + sorted_index + ) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + ", + ) + .bind(public_key.to_string()) + .bind(document_id.as_str()) + .bind(id.as_str()) + .bind(operation.action().as_str()) + .bind(operation.schema_id().to_string()) + .bind( + operation + .previous() + .map(|document_view_id| document_view_id.to_string()), + ) + .bind(sorted_index) + .execute(&mut tx) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + let mut results = Vec::new(); + if let Some(fields) = operation.fields() { + for (name, value) in fields.iter() { + // If the value is a relation_list or pinned_relation_list we need to insert a new + // field row for every item in the list. Here we collect these items and return + // them in a vector. If this operation value is anything except for the above list + // types, we will return a vec containing a single item. + let db_values = parse_value_to_string_vec(value); + + for (index, db_value) in db_values.into_iter().enumerate() { + let cursor = OperationCursor::new(index, name, id); + + let result = query( + " + INSERT INTO + operation_fields_v1 ( + operation_id, + name, + field_type, + value, + list_index, + cursor + ) + VALUES + ($1, $2, $3, $4, $5, $6) + ", + ) + .bind(id.as_str().to_owned()) + .bind(name.to_owned()) + .bind(value.field_type().to_string()) + .bind(db_value) + .bind(index as i32) + .bind(cursor.to_string()) + .execute(&mut tx) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + results.push(result); + } + } + }; + + // Commit the transaction. + tx.commit() + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + Ok(()) + } } #[derive(Debug, Clone, Eq, PartialEq)] From ffad6eb24cdc66efa929efaa5d107feac14875f0 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 28 Jun 2023 16:36:14 +0900 Subject: [PATCH 10/21] Operation sorted index starts from 0 --- aquadoggo/src/materializer/tasks/reduce.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index a458eadd5..1d60971a0 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -216,10 +216,9 @@ async fn reduce_document + WithPublicKey>( // Iterate over the sorted document operations and update their sorted index on the // operations_v1 table. for (index, (operation_id, _, _)) in sorted_operations.iter().enumerate() { - let sorted_index = { index + 1 } as i32; context .store - .update_operation_index(operation_id, sorted_index) + .update_operation_index(operation_id, index as i32) .await .map_err(|err| TaskError::Critical(err.to_string()))?; } From b7e31b6330916e7678e14cb2e0cb35b1f50d031b Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 4 Jul 2023 11:59:06 +0900 Subject: [PATCH 11/21] Make sorted_index an INT --- aquadoggo/migrations/20230523135826_alter-operations.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/migrations/20230523135826_alter-operations.sql b/aquadoggo/migrations/20230523135826_alter-operations.sql index 38a89a4d6..a894d8117 100644 --- a/aquadoggo/migrations/20230523135826_alter-operations.sql +++ b/aquadoggo/migrations/20230523135826_alter-operations.sql @@ -1,3 +1,3 @@ -- SPDX-License-Identifier: AGPL-3.0-or-later -ALTER TABLE operations_v1 ADD COLUMN sorted_index NUMERIC; \ No newline at end of file +ALTER TABLE operations_v1 ADD COLUMN sorted_index INT; \ No newline at end of file From a030fc8aa951a557c79df76c3fc6aa1a33f40ede Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 28 Jun 2023 15:32:29 +0900 Subject: [PATCH 12/21] Include missed change in utils --- aquadoggo/src/db/models/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/aquadoggo/src/db/models/utils.rs b/aquadoggo/src/db/models/utils.rs index ef2ef66bc..ba742b601 100644 --- a/aquadoggo/src/db/models/utils.rs +++ b/aquadoggo/src/db/models/utils.rs @@ -188,6 +188,7 @@ pub fn parse_operation_rows( previous: operation.previous(), fields: operation.fields(), public_key, + sorted_index, }; Some(operation) From c17eada893570d5dcfc3ca6846dbccbb97e3af4d Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 6 Jul 2023 16:04:43 +0900 Subject: [PATCH 13/21] Add sorted_index to StorageOperation --- aquadoggo/src/db/models/utils.rs | 1 + aquadoggo/src/db/types/operation.rs | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/aquadoggo/src/db/models/utils.rs b/aquadoggo/src/db/models/utils.rs index ba742b601..f35dc9c54 100644 --- a/aquadoggo/src/db/models/utils.rs +++ b/aquadoggo/src/db/models/utils.rs @@ -35,6 +35,7 @@ pub fn parse_operation_rows( let public_key = PublicKey::new(&first_row.public_key).unwrap(); let operation_id = first_row.operation_id.parse().unwrap(); let document_id = first_row.document_id.parse().unwrap(); + let sorted_index = first_row.sorted_index; let mut relation_lists: BTreeMap> = BTreeMap::new(); let mut pinned_relation_lists: BTreeMap> = BTreeMap::new(); diff --git a/aquadoggo/src/db/types/operation.rs b/aquadoggo/src/db/types/operation.rs index f86c506f8..813dc39bb 100644 --- a/aquadoggo/src/db/types/operation.rs +++ b/aquadoggo/src/db/types/operation.rs @@ -32,6 +32,12 @@ pub struct StorageOperation { /// The public key of the key pair used to publish this operation. pub(crate) public_key: PublicKey, + + /// Index for the position of this operation once topological sorting of the operation graph + /// has been performed. + /// + /// Is None when the operation has not been materialized into it's document yet. + pub(crate) sorted_index: Option, } impl WithPublicKey for StorageOperation { From 497dd07d8e90c756822fce309cb73943768d50fe Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 7 Jul 2023 15:05:16 +0900 Subject: [PATCH 14/21] TaskInput is now an enum --- aquadoggo/src/materializer/tasks/reduce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 1d60971a0..aa69d0759 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -659,7 +659,7 @@ mod tests { .unwrap(); // Run a reduce task with the document id as input. - let input = TaskInput::new(Some(document_id.clone()), None); + let input = TaskInput::DocumentId(document_id.clone()); assert!(reduce_task(node.context.clone(), input.clone()) .await .is_ok()); From d955e4ef3d08f06dd83414a7f1f3a246655bf191 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 7 Jul 2023 15:23:37 +0900 Subject: [PATCH 15/21] fmt --- aquadoggo/src/db/models/operation.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/db/models/operation.rs b/aquadoggo/src/db/models/operation.rs index a5a12b290..129616c05 100644 --- a/aquadoggo/src/db/models/operation.rs +++ b/aquadoggo/src/db/models/operation.rs @@ -28,11 +28,11 @@ pub struct OperationRow { /// Index for the position of this operation once topological sorting of the operation graph /// has been performed. - /// + /// /// This is an option as when an operation which is not appended to the tip of a /// document graph is handled then a `reduce` materialization task will be issued and all /// indexes for operations in the effected document re-calculated. - /// + /// /// If this value is None we can assume the operation has not been processed yet and we are /// waiting for the reduce task to complete. pub sorted_index: Option, @@ -113,11 +113,11 @@ pub struct OperationFieldsJoinedRow { /// Index for the position of this operation once topological sorting of the operation graph /// has been performed. - /// + /// /// This is an option as when an operation which is not appended to the tip of a /// document graph is handled then a `reduce` materialization task will be issued and all /// indexes for operations in the effected document re-calculated. - /// + /// /// If this value is None we can assume the operation has not been processed yet and we are /// waiting for the reduce task to complete. pub sorted_index: Option, From bebc40ae265791f0022a897f9261c13e076283c6 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 7 Jul 2023 16:35:33 +0900 Subject: [PATCH 16/21] Make insert_operation_with_index private --- aquadoggo/src/db/stores/operation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index 0b2c713a9..c776a8fa0 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -274,7 +274,7 @@ impl SqlStore { Ok(()) } - pub async fn insert_operation_with_index( + async fn insert_operation_with_index( &self, id: &OperationId, public_key: &PublicKey, From e7e9a015a1954bee13968cef5b79f16385eb88ec Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 7 Jul 2023 16:35:44 +0900 Subject: [PATCH 17/21] Update doc strings --- aquadoggo/src/db/stores/operation.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index c776a8fa0..a1352d0cf 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -58,17 +58,13 @@ impl OperationStore for SqlStore { Ok(document_id.map(|id_str| id_str.parse().unwrap())) } - /// Insert an operation into storage. - /// - /// This requires a DoggoOperation to be composed elsewhere, it contains an `PublicKey`, - /// `DocumentId`, `OperationId` and the actual `Operation` we want to store. - /// - /// Returns a result containing `true` when one insertion occured, and false when no insertions - /// occured. Errors when a fatal storage error occurs. - /// - /// In aquadoggo we store an operation in the database in three different tables: `operations`, - /// `previous` and `operation_fields`. This means that this method actually makes 3 - /// different sets of insertions. + /// Insert an operation into storage along with the `PublicKey` of the author who created it the + /// `DocumentId` of the document it's part of and it's `OperationId` derived from the hash of + /// the entry it was published with. + /// + /// The `sorted_index` of the inserted operation will be set to `null` as this value is only + /// available once materialization has occurred. Use `update_operation_index` to set this + /// value. async fn insert_operation( &self, id: &OperationId, @@ -250,6 +246,8 @@ fn group_and_parse_operation_rows( } impl SqlStore { + /// Update the sorted index of an operation. This method is used in `reduce` tasks as each + /// operation is processed. pub async fn update_operation_index( &self, operation_id: &OperationId, @@ -274,6 +272,8 @@ impl SqlStore { Ok(()) } + /// Insert an operation as well as the index for it's position in the document after + /// materialization has occurred. async fn insert_operation_with_index( &self, id: &OperationId, From 57813bd66a94ee437009b4afaa54eb08c61d9bc6 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 7 Jul 2023 16:38:03 +0900 Subject: [PATCH 18/21] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08a8dd46f..b4b66da76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Naive protocol replication [#380](https://github.com/p2panda/aquadoggo/pull/380) - Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞 - Reverse lookup for pinned relations in dependency task [#434](https://github.com/p2panda/aquadoggo/pull/434) +- Persist and maintain index of operation's position in document [#438](https://github.com/p2panda/aquadoggo/pull/438) ### Changed From 46101da106918a3757ebdb3bbf7750be6e3c663b Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 7 Jul 2023 16:39:26 +0900 Subject: [PATCH 19/21] fmt --- aquadoggo/src/db/stores/operation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index a1352d0cf..678cd4e8d 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -58,10 +58,10 @@ impl OperationStore for SqlStore { Ok(document_id.map(|id_str| id_str.parse().unwrap())) } - /// Insert an operation into storage along with the `PublicKey` of the author who created it the + /// Insert an operation into storage along with the `PublicKey` of the author who created it the /// `DocumentId` of the document it's part of and it's `OperationId` derived from the hash of /// the entry it was published with. - /// + /// /// The `sorted_index` of the inserted operation will be set to `null` as this value is only /// available once materialization has occurred. Use `update_operation_index` to set this /// value. @@ -273,7 +273,7 @@ impl SqlStore { } /// Insert an operation as well as the index for it's position in the document after - /// materialization has occurred. + /// materialization has occurred. async fn insert_operation_with_index( &self, id: &OperationId, From 2d93eb5d702bef562078d253441ddf8e8b1aa0c1 Mon Sep 17 00:00:00 2001 From: adz Date: Fri, 7 Jul 2023 13:35:39 +0200 Subject: [PATCH 20/21] Simplify doc-string --- aquadoggo/src/db/models/operation.rs | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/aquadoggo/src/db/models/operation.rs b/aquadoggo/src/db/models/operation.rs index 129616c05..67d0f1b16 100644 --- a/aquadoggo/src/db/models/operation.rs +++ b/aquadoggo/src/db/models/operation.rs @@ -26,15 +26,10 @@ pub struct OperationRow { /// separator. pub previous: Option, - /// Index for the position of this operation once topological sorting of the operation graph - /// has been performed. + /// Index of this operation once topological sorting of the operation graph has been performed. /// - /// This is an option as when an operation which is not appended to the tip of a - /// document graph is handled then a `reduce` materialization task will be issued and all - /// indexes for operations in the effected document re-calculated. - /// - /// If this value is None we can assume the operation has not been processed yet and we are - /// waiting for the reduce task to complete. + /// If this value is `None` we can assume the operation has not been processed yet and we are + /// waiting for the `reduce` task to complete materialization. pub sorted_index: Option, } @@ -111,14 +106,9 @@ pub struct OperationFieldsJoinedRow { /// field. pub list_index: Option, - /// Index for the position of this operation once topological sorting of the operation graph - /// has been performed. - /// - /// This is an option as when an operation which is not appended to the tip of a - /// document graph is handled then a `reduce` materialization task will be issued and all - /// indexes for operations in the effected document re-calculated. + /// Index of this operation once topological sorting of the operation graph has been performed. /// - /// If this value is None we can assume the operation has not been processed yet and we are - /// waiting for the reduce task to complete. + /// If this value is `None` we can assume the operation has not been processed yet and we are + /// waiting for the `reduce` task to complete materialization. pub sorted_index: Option, } From 5864c35e0f086e54679eac4aab3051774d0ce13f Mon Sep 17 00:00:00 2001 From: adz Date: Fri, 7 Jul 2023 13:47:20 +0200 Subject: [PATCH 21/21] Update doc-strings --- aquadoggo/src/db/stores/operation.rs | 43 +++++++++++++--------------- aquadoggo/src/db/types/operation.rs | 2 +- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index 678cd4e8d..d70e438b1 100644 --- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -58,13 +58,14 @@ impl OperationStore for SqlStore { Ok(document_id.map(|id_str| id_str.parse().unwrap())) } - /// Insert an operation into storage along with the `PublicKey` of the author who created it the - /// `DocumentId` of the document it's part of and it's `OperationId` derived from the hash of - /// the entry it was published with. + /// Insert an operation into storage. /// - /// The `sorted_index` of the inserted operation will be set to `null` as this value is only - /// available once materialization has occurred. Use `update_operation_index` to set this - /// value. + /// The `PublicKey` is determined by the author who created the operation, the `DocumentId` is + /// of the document this operation is part of and the `OperationId` is derived from the hash of + /// the `Entry` it was published with. + /// + /// The `sorted_index` of the inserted operation will be set to `None` as this value is only + /// available once materialization completed. Use `update_operation_index` to set this value. async fn insert_operation( &self, id: &OperationId, @@ -139,14 +140,14 @@ impl OperationStore for SqlStore { operation_fields_v1.list_index FROM operations_v1 - LEFT JOIN operation_fields_v1 - ON - operation_fields_v1.operation_id = operations_v1.operation_id + LEFT JOIN operation_fields_v1 + ON operation_fields_v1.operation_id = operations_v1.operation_id WHERE operations_v1.document_id = $1 ORDER BY - -- order the operations by their index when topologically sorted, in the case where this may not be set yet - -- we fall back to ordering by operation id. In both cases we additionally order by list index. + -- order the operations by their index when topologically sorted, in the case where + -- this may not be set yet we fall back to ordering by operation id. In both cases + -- we additionally order by list index. operations_v1.sorted_index ASC, operations_v1.operation_id ASC, operation_fields_v1.list_index ASC ", ) @@ -183,9 +184,8 @@ impl OperationStore for SqlStore { operation_fields_v1.list_index FROM operations_v1 - LEFT JOIN operation_fields_v1 - ON - operation_fields_v1.operation_id = operations_v1.operation_id + LEFT JOIN operation_fields_v1 + ON operation_fields_v1.operation_id = operations_v1.operation_id WHERE operations_v1.schema_id = $1 ORDER BY @@ -211,11 +211,9 @@ impl OperationStore for SqlStore { fn group_and_parse_operation_rows( operation_rows: Vec, ) -> Vec { - // We need to group all the operation rows so they can be parsed into operations. - // They come from the database ordered by their index once topologically sorted when - // present, otherwise by operation id. List items are additionally ordered by their - // list index. - + // We need to group all the operation rows so they can be parsed into operations. They come + // from the database ordered by their index once topologically sorted when present, otherwise + // by operation id. List items are additionally ordered by their list index. let mut grouped_operation_rows = vec![]; let mut current_operation_id = operation_rows.first().unwrap().operation_id.clone(); @@ -226,9 +224,8 @@ fn group_and_parse_operation_rows( // If this row is part of the current operation push it to the current rows vec. current_operation_rows.push(row); } else { - // If we've moved on to the next operation, then push the complete vec of - // operation rows to the grouped rows collection and then setup for the next - // iteration. + // If we've moved on to the next operation, then push the complete vec of operation + // rows to the grouped rows collection and then setup for the next iteration. grouped_operation_rows.push(current_operation_rows.clone()); current_operation_id = row.operation_id.clone(); current_operation_rows = vec![row]; @@ -255,7 +252,7 @@ impl SqlStore { ) -> Result<(), OperationStorageError> { query::( " - UPDATE + UPDATE operations_v1 SET sorted_index = $2 diff --git a/aquadoggo/src/db/types/operation.rs b/aquadoggo/src/db/types/operation.rs index 813dc39bb..2af007136 100644 --- a/aquadoggo/src/db/types/operation.rs +++ b/aquadoggo/src/db/types/operation.rs @@ -36,7 +36,7 @@ pub struct StorageOperation { /// Index for the position of this operation once topological sorting of the operation graph /// has been performed. /// - /// Is None when the operation has not been materialized into it's document yet. + /// Is `None` when the operation has not been materialized into it's document yet. pub(crate) sorted_index: Option, }