From 925db9d8b5db46907f0f21275b7568c6244a53d4 Mon Sep 17 00:00:00 2001 From: adz Date: Tue, 20 Jun 2023 18:34:27 +0200 Subject: [PATCH 1/8] Pick the right cursor when assembling documents --- aquadoggo/src/db/stores/query.rs | 285 +++++++++++++++++++++++++++-- aquadoggo/src/graphql/resolvers.rs | 5 +- 2 files changed, 272 insertions(+), 18 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index 925e723cc..27e567550 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -2,6 +2,7 @@ //! This module offers a query API to find many p2panda documents, filtered or sorted by custom //! parameters. The results are batched via cursor-based pagination. +use std::collections::HashMap; use std::fmt::Display; use std::str::FromStr; @@ -936,7 +937,14 @@ impl SqlStore { list: Option<&RelationList>, ) -> Result { // Get all selected application fields from query - let application_fields = args.select.application_fields(); + let mut application_fields = args.select.application_fields(); + + // Add the field we order by to the fields we want to query in case it is not selected + if let Some(Field::Field(field_name)) = &args.order.field { + if !application_fields.contains(field_name) { + application_fields.push(field_name.clone()); + } + } // Generate SQL based on the given schema and query let mut select_vec = vec![ @@ -1068,7 +1076,7 @@ impl SqlStore { }; // Finally convert everything into the right format - let documents = convert_rows(rows, list, &application_fields, schema.id()); + let documents = convert_rows(rows, &args.order, list, &application_fields, schema.id()); // Determine cursors for pagination by looking at beginning and end of results let start_cursor = if args @@ -1158,9 +1166,16 @@ impl SqlStore { /// Merges all operation fields from the database into documents. /// /// Due to the special table layout we receive one row per operation field in the query. Usually we -/// need to merge multiple rows / operation fields fields into one document. +/// need to merge multiple rows / operation fields into one document. +/// +/// This method also selects one cursor per document which then can be used by the client to +/// control pagination. Since cursors are unique per document fields this method selects the right +/// one depending on what ordering was selected. For example if the client chose to order by +/// "timestamp" then the cursor will be selected for each document pointing at the "timestamp" +/// field. Like this we can correctly paginate over an ordered collection of documents. fn convert_rows( rows: Vec, + order: &Order, list: Option<&RelationList>, fields: &ApplicationFields, schema_id: &SchemaId, @@ -1173,8 +1188,10 @@ fn convert_rows( // Helper method to convert database row into final document and cursor type let finalize_document = |row: &QueryRow, - collected_fields: Vec| + collected_fields: Vec, + collected_rows: &HashMap| -> (PaginationCursor, StorageDocument) { + // Convert all gathered data into final `StorageDocument` format let fields = parse_document_view_field_rows(collected_fields); let document = StorageDocument { @@ -1186,15 +1203,31 @@ fn convert_rows( deleted: row.is_deleted, }; - let cursor = row_to_cursor(row, list); + // Determine cursor for this document + let cursor = match &order.field { + Some(Field::Field(field_name)) => { + // Since pagination is affected by ordering, we need to pick the cursor from the + // operation field our query is ordered by + row_to_cursor( + collected_rows + .get(field_name) + .expect("Field selected for ordering needs to be inside of document"), + list, + ) + } + _ => { + // For any other ordering (meta or undefined) we just pick the first row / field + // which came in for that document + row_to_cursor(row, list) + } + }; (cursor, document) }; - let last_row = rows.last().unwrap().clone(); - let mut current = rows[0].clone(); let mut current_fields = Vec::new(); + let mut current_rows = HashMap::new(); let rows_per_document = std::cmp::max(fields.len(), 1); @@ -1202,7 +1235,7 @@ fn convert_rows( // We observed a new document coming up in the next row, time to change if index % rows_per_document == 0 && index > 0 { // Finalize the current document, convert it and push it into the final array - let (cursor, document) = finalize_document(¤t, current_fields); + let (cursor, document) = finalize_document(¤t, current_fields, ¤t_rows); converted.push((cursor, document)); // Change the pointer to the next document @@ -1210,7 +1243,10 @@ fn convert_rows( current_fields = Vec::new(); } - // Collect every field of the document + // Collect original rows from SQL query + current_rows.insert(row.name.clone(), row.clone()); + + // Collect every field of the document to assemble it later into a `StorageDocument` current_fields.push(DocumentViewFieldRow { document_id: row.document_id, document_view_id: row.document_view_id, @@ -1223,7 +1259,7 @@ fn convert_rows( } // Do it one last time at the end for the last document - let (cursor, document) = finalize_document(&last_row, current_fields); + let (cursor, document) = finalize_document(¤t, current_fields, ¤t_rows); converted.push((cursor, document)); converted @@ -1393,6 +1429,50 @@ mod tests { .await } + async fn create_chat_test_data( + node: &mut TestNode, + key_pair: &KeyPair, + ) -> (Schema, Vec) { + add_schema_and_documents( + node, + "chat", + vec![ + vec![ + ("message", "Hello, Panda!".into(), None), + ("username", "penguin".into(), None), + ("timestamp", 1687265969.into(), None), + ], + vec![ + ("message", "Oh, howdy, Pengi!".into(), None), + ("username", "panda".into(), None), + ("timestamp", 1687266014.into(), None), + ], + vec![ + ("message", "How are you?".into(), None), + ("username", "panda".into(), None), + ("timestamp", 1687266032.into(), None), + ], + vec![ + ("message", "I miss Pengolina. How about you?".into(), None), + ("username", "penguin".into(), None), + ("timestamp", 1687266055.into(), None), + ], + vec![ + ("message", "I'm cute and very hungry".into(), None), + ("username", "panda".into(), None), + ("timestamp", 1687266141.into(), None), + ], + vec![ + ("message", "(°◇°) !!".into(), None), + ("username", "penguin".into(), None), + ("timestamp", 1687266160.into(), None), + ], + ], + key_pair, + ) + .await + } + #[rstest] #[case::order_by_date_asc( Query::new( @@ -1484,6 +1564,121 @@ mod tests { }); } + #[rstest] + #[case::order_by_timestamp( + Order::new(&"timestamp".into(), &Direction::Ascending), + "message".into(), + vec![ + "Hello, Panda!".into(), + "Oh, howdy, Pengi!".into(), + "How are you?".into(), + "I miss Pengolina. How about you?".into(), + "I'm cute and very hungry".into(), + "(°◇°) !!".into(), + ], + )] + #[case::order_by_timestamp_descending( + Order::new(&"timestamp".into(), &Direction::Descending), + "message".into(), + vec![ + "(°◇°) !!".into(), + "I'm cute and very hungry".into(), + "I miss Pengolina. How about you?".into(), + "How are you?".into(), + "Oh, howdy, Pengi!".into(), + "Hello, Panda!".into(), + ], + )] + #[case::order_by_message( + Order::new(&"message".into(), &Direction::Ascending), + "message".into(), + vec![ + "(°◇°) !!".into(), + "Hello, Panda!".into(), + "How are you?".into(), + "I miss Pengolina. How about you?".into(), + "I'm cute and very hungry".into(), + "Oh, howdy, Pengi!".into(), + ], + )] + fn pagination_over_ordered_fields( + key_pair: KeyPair, + #[case] order: Order, + #[case] selected_field: String, + #[case] expected_fields: Vec, + ) { + test_runner(|mut node: TestNode| async move { + let (schema, view_ids) = create_chat_test_data(&mut node, &key_pair).await; + + let mut cursor: Option = None; + + let mut args = Query::new( + &Pagination::new( + &NonZeroU64::new(1).unwrap(), + cursor.as_ref(), + &vec![ + PaginationField::TotalCount, + PaginationField::EndCursor, + PaginationField::HasNextPage, + ], + ), + &Select::new(&[ + Field::Meta(MetaField::DocumentViewId), + Field::Field("message".into()), + Field::Field("username".into()), + Field::Field("timestamp".into()), + ]), + &Filter::default(), + &order, + ); + + // Go through all pages, one document at a time + for (index, expected_field) in expected_fields.into_iter().enumerate() { + args.pagination.after = cursor; + + let (pagination_data, documents) = node + .context + .store + .query(&schema, &args, None) + .await + .expect("Query failed"); + + match pagination_data.end_cursor { + Some(end_cursor) => { + cursor = Some(end_cursor); + } + None => panic!("Expected cursor"), + } + + if view_ids.len() - 1 == index { + assert_eq!(pagination_data.has_next_page, false); + } else { + assert_eq!(pagination_data.has_next_page, true); + } + + assert_eq!(pagination_data.total_count, Some(view_ids.len() as u64)); + assert_eq!(documents.len(), 1); + assert_eq!(documents[0].1.get(&selected_field), Some(&expected_field)); + assert_eq!(cursor.as_ref(), Some(&documents[0].0)); + } + + // Query one last time after we paginated through everything + args.pagination.after = cursor; + + let (pagination_data, documents) = node + .context + .store + .query(&schema, &args, None) + .await + .expect("Query failed"); + + assert_eq!(pagination_data.total_count, Some(view_ids.len() as u64)); + assert_eq!(pagination_data.end_cursor, None); + assert_eq!(pagination_data.has_next_page, false); + assert_eq!(documents.len(), 0); + }); + } + #[rstest] fn pagination_over_ordered_view_ids(key_pair: KeyPair) { test_runner(|mut node: TestNode| async move { @@ -2108,17 +2303,26 @@ mod tests { let root_cursor_2 = Hash::new_from_bytes(&[0, 2]).to_string(); let cursor_1 = Hash::new_from_bytes(&[0, 3]).to_string(); let cursor_2 = Hash::new_from_bytes(&[0, 4]).to_string(); + let cursor_3 = Hash::new_from_bytes(&[0, 5]).to_string(); + let cursor_4 = Hash::new_from_bytes(&[0, 6]).to_string(); let query_rows = vec![ // First document + // ============== QueryRow { document_id: first_document_hash.clone(), document_view_id: first_document_hash.clone(), operation_id: first_document_hash.clone(), is_deleted: false, is_edited: false, + // This is the "root" cursor, marking the position of the document inside a + // relation list root_cursor: root_cursor_1.clone(), - cmp_value_cursor: cursor_1.clone(), + // This is the "field" cursor, marking the value we're using to paginate the + // resulting documents with. + // + // Cursors are unique for each operation field. + cmp_value_cursor: cursor_1.clone(), // Cursor #1 owner: OptionalOwner::default(), name: "username".to_string(), value: Some("panda".to_string()), @@ -2132,7 +2336,7 @@ mod tests { is_deleted: false, is_edited: false, root_cursor: root_cursor_1.clone(), - cmp_value_cursor: cursor_1.clone(), + cmp_value_cursor: cursor_2.clone(), // Cursor #2 owner: OptionalOwner::default(), name: "is_admin".to_string(), value: Some("false".to_string()), @@ -2140,6 +2344,7 @@ mod tests { list_index: 0, }, // Second document + // =============== QueryRow { document_id: second_document_hash.clone(), document_view_id: second_document_hash.clone(), @@ -2147,7 +2352,7 @@ mod tests { is_deleted: false, is_edited: false, root_cursor: root_cursor_2.clone(), - cmp_value_cursor: cursor_2.clone(), + cmp_value_cursor: cursor_3.clone(), // Cursor #3 owner: OptionalOwner::default(), name: "username".to_string(), value: Some("penguin".to_string()), @@ -2161,7 +2366,7 @@ mod tests { is_deleted: false, is_edited: false, root_cursor: root_cursor_2.clone(), - cmp_value_cursor: cursor_2.clone(), + cmp_value_cursor: cursor_4.clone(), // Cursor #4 owner: OptionalOwner::default(), name: "is_admin".to_string(), value: Some("true".to_string()), @@ -2170,9 +2375,14 @@ mod tests { }, ]; - // convert as if this is a relation list query + // 1. + // + // Convert query rows into documents as if this is a relation list query. We do this by + // passing in the relation list information (the "root") from where this query was executed + // from let result = convert_rows( query_rows.clone(), + &Order::default(), Some(&RelationList::new_unpinned( &relation_list_hash.parse().unwrap(), "relation_list_field", @@ -2182,6 +2392,9 @@ mod tests { ); assert_eq!(result.len(), 2); + + // We expect the cursor of the first query row to be returned per document when no ordering + // was set, that is cursor #1 and #3 assert_eq!( result[0].0, PaginationCursor::new( @@ -2192,16 +2405,54 @@ mod tests { ); assert_eq!( result[1].0, + PaginationCursor::new( + OperationCursor::from(cursor_3.as_str()), + Some(OperationCursor::from(root_cursor_2.as_str())), + Some(relation_list_hash.parse().unwrap()) + ) + ); + + // 2. + // + // Now we do the same but this time we pass in an ordering + let result = convert_rows( + query_rows.clone(), + &Order::new(&"is_admin".into(), &Direction::Ascending), + Some(&RelationList::new_unpinned( + &relation_list_hash.parse().unwrap(), + "relation_list_field", + )), + &vec!["username".to_string(), "is_admin".to_string()], + &schema_id, + ); + + assert_eq!(result.len(), 2); + + // We expect the cursor of the "is_admin" query row to be returned per document as that + // ordering was set, that is cursor #2 and #4 + assert_eq!( + result[0].0, PaginationCursor::new( OperationCursor::from(cursor_2.as_str()), + Some(OperationCursor::from(root_cursor_1.as_str())), + Some(relation_list_hash.parse().unwrap()) + ) + ); + assert_eq!( + result[1].0, + PaginationCursor::new( + OperationCursor::from(cursor_4.as_str()), Some(OperationCursor::from(root_cursor_2.as_str())), Some(relation_list_hash.parse().unwrap()) ) ); - // convert as if this is _not_ a relation list query + // 3. + // + // We pretend now that this query was executed without a relation list let result = convert_rows( query_rows, + &Order::default(), None, &vec!["username".to_string(), "is_admin".to_string()], &schema_id, @@ -2214,7 +2465,7 @@ mod tests { ); assert_eq!( result[1].0, - PaginationCursor::new(OperationCursor::from(cursor_2.as_str()), None, None) + PaginationCursor::new(OperationCursor::from(cursor_3.as_str()), None, None) ); } diff --git a/aquadoggo/src/graphql/resolvers.rs b/aquadoggo/src/graphql/resolvers.rs index 7ec45414a..7ce9fc1ea 100644 --- a/aquadoggo/src/graphql/resolvers.rs +++ b/aquadoggo/src/graphql/resolvers.rs @@ -131,7 +131,10 @@ pub async fn resolve_document_field( // Determine name of the field to be resolved let name = ctx.field().name(); - match document.get(name).unwrap() { + match document + .get(name) + .expect("Selected field should be in document") + { // Relation fields are expected to resolve to the related document OperationValue::Relation(relation) => { let document = match store.get_document(relation.document_id()).await? { From f50e8bec48889b325041cc82487d8427c97c8bd5 Mon Sep 17 00:00:00 2001 From: adz Date: Tue, 20 Jun 2023 18:47:54 +0200 Subject: [PATCH 2/8] Sorted strings are different in postgres --- aquadoggo/src/db/stores/query.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index 27e567550..63262fe2f 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -1458,7 +1458,7 @@ mod tests { ("timestamp", 1687266055.into(), None), ], vec![ - ("message", "I'm cute and very hungry".into(), None), + ("message", "I am cute and very hungry".into(), None), ("username", "panda".into(), None), ("timestamp", 1687266141.into(), None), ], @@ -1573,7 +1573,7 @@ mod tests { "Oh, howdy, Pengi!".into(), "How are you?".into(), "I miss Pengolina. How about you?".into(), - "I'm cute and very hungry".into(), + "I am cute and very hungry".into(), "(°◇°) !!".into(), ], )] @@ -1582,7 +1582,7 @@ mod tests { "message".into(), vec![ "(°◇°) !!".into(), - "I'm cute and very hungry".into(), + "I am cute and very hungry".into(), "I miss Pengolina. How about you?".into(), "How are you?".into(), "Oh, howdy, Pengi!".into(), @@ -1597,7 +1597,7 @@ mod tests { "Hello, Panda!".into(), "How are you?".into(), "I miss Pengolina. How about you?".into(), - "I'm cute and very hungry".into(), + "I am cute and very hungry".into(), "Oh, howdy, Pengi!".into(), ], )] From bc2cc519c9dadc6049b82abf9a141dacc94e7f49 Mon Sep 17 00:00:00 2001 From: adz Date: Tue, 20 Jun 2023 21:42:16 +0200 Subject: [PATCH 3/8] Always select last cursor but for pagination the ordered one --- aquadoggo/src/db/stores/query.rs | 120 +++++++++++-------------------- 1 file changed, 43 insertions(+), 77 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index 63262fe2f..6ff429e35 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -640,8 +640,24 @@ fn where_pagination_sql( {cmp_field} FROM operation_fields_v1 + LEFT JOIN + document_view_fields + ON document_view_fields.operation_id = operation_fields_v1.operation_id WHERE - operation_fields_v1.cursor = '{operation_cursor}' + document_view_fields.document_view_id = ( + SELECT + document_view_fields.document_view_id + FROM + operation_fields_v1 + LEFT JOIN + document_view_fields + ON document_view_fields.operation_id = operation_fields_v1.operation_id + WHERE + operation_fields_v1.cursor = '{operation_cursor}' + LIMIT 1 + ) + AND operation_fields_v1.name = '{order_field_name}' + LIMIT 1 "# ); @@ -937,14 +953,7 @@ impl SqlStore { list: Option<&RelationList>, ) -> Result { // Get all selected application fields from query - let mut application_fields = args.select.application_fields(); - - // Add the field we order by to the fields we want to query in case it is not selected - if let Some(Field::Field(field_name)) = &args.order.field { - if !application_fields.contains(field_name) { - application_fields.push(field_name.clone()); - } - } + let application_fields = args.select.application_fields(); // Generate SQL based on the given schema and query let mut select_vec = vec![ @@ -1076,7 +1085,7 @@ impl SqlStore { }; // Finally convert everything into the right format - let documents = convert_rows(rows, &args.order, list, &application_fields, schema.id()); + let documents = convert_rows(rows, list, &application_fields, schema.id()); // Determine cursors for pagination by looking at beginning and end of results let start_cursor = if args @@ -1175,7 +1184,6 @@ impl SqlStore { /// field. Like this we can correctly paginate over an ordered collection of documents. fn convert_rows( rows: Vec, - order: &Order, list: Option<&RelationList>, fields: &ApplicationFields, schema_id: &SchemaId, @@ -1191,6 +1199,20 @@ fn convert_rows( collected_fields: Vec, collected_rows: &HashMap| -> (PaginationCursor, StorageDocument) { + // Determine cursor for this document by looking at the last field + let cursor = { + let last_field = collected_fields + .last() + .expect("Needs to be at least one field"); + + row_to_cursor( + collected_rows + .get(&last_field.name) + .expect("Field selected for ordering needs to be inside of document"), + list, + ) + }; + // Convert all gathered data into final `StorageDocument` format let fields = parse_document_view_field_rows(collected_fields); @@ -1203,34 +1225,15 @@ fn convert_rows( deleted: row.is_deleted, }; - // Determine cursor for this document - let cursor = match &order.field { - Some(Field::Field(field_name)) => { - // Since pagination is affected by ordering, we need to pick the cursor from the - // operation field our query is ordered by - row_to_cursor( - collected_rows - .get(field_name) - .expect("Field selected for ordering needs to be inside of document"), - list, - ) - } - _ => { - // For any other ordering (meta or undefined) we just pick the first row / field - // which came in for that document - row_to_cursor(row, list) - } - }; - (cursor, document) }; + let rows_per_document = std::cmp::max(fields.len(), 1); + let mut current = rows[0].clone(); - let mut current_fields = Vec::new(); + let mut current_fields = Vec::with_capacity(rows_per_document); let mut current_rows = HashMap::new(); - let rows_per_document = std::cmp::max(fields.len(), 1); - for (index, row) in rows.into_iter().enumerate() { // We observed a new document coming up in the next row, time to change if index % rows_per_document == 0 && index > 0 { @@ -1240,7 +1243,7 @@ fn convert_rows( // Change the pointer to the next document current = row.clone(); - current_fields = Vec::new(); + current_fields = Vec::with_capacity(rows_per_document); } // Collect original rows from SQL query @@ -1596,8 +1599,8 @@ mod tests { "(°◇°) !!".into(), "Hello, Panda!".into(), "How are you?".into(), - "I miss Pengolina. How about you?".into(), "I am cute and very hungry".into(), + "I miss Pengolina. How about you?".into(), "Oh, howdy, Pengi!".into(), ], )] @@ -2382,42 +2385,6 @@ mod tests { // from let result = convert_rows( query_rows.clone(), - &Order::default(), - Some(&RelationList::new_unpinned( - &relation_list_hash.parse().unwrap(), - "relation_list_field", - )), - &vec!["username".to_string(), "is_admin".to_string()], - &schema_id, - ); - - assert_eq!(result.len(), 2); - - // We expect the cursor of the first query row to be returned per document when no ordering - // was set, that is cursor #1 and #3 - assert_eq!( - result[0].0, - PaginationCursor::new( - OperationCursor::from(cursor_1.as_str()), - Some(OperationCursor::from(root_cursor_1.as_str())), - Some(relation_list_hash.parse().unwrap()) - ) - ); - assert_eq!( - result[1].0, - PaginationCursor::new( - OperationCursor::from(cursor_3.as_str()), - Some(OperationCursor::from(root_cursor_2.as_str())), - Some(relation_list_hash.parse().unwrap()) - ) - ); - - // 2. - // - // Now we do the same but this time we pass in an ordering - let result = convert_rows( - query_rows.clone(), - &Order::new(&"is_admin".into(), &Direction::Ascending), Some(&RelationList::new_unpinned( &relation_list_hash.parse().unwrap(), "relation_list_field", @@ -2428,8 +2395,8 @@ mod tests { assert_eq!(result.len(), 2); - // We expect the cursor of the "is_admin" query row to be returned per document as that - // ordering was set, that is cursor #2 and #4 + // We expect the cursor of the last query row to be returned per document, that is cursor + // #2 and #4 assert_eq!( result[0].0, PaginationCursor::new( @@ -2447,12 +2414,11 @@ mod tests { ) ); - // 3. + // 2. // // We pretend now that this query was executed without a relation list let result = convert_rows( query_rows, - &Order::default(), None, &vec!["username".to_string(), "is_admin".to_string()], &schema_id, @@ -2461,11 +2427,11 @@ mod tests { assert_eq!(result.len(), 2); assert_eq!( result[0].0, - PaginationCursor::new(OperationCursor::from(cursor_1.as_str()), None, None) + PaginationCursor::new(OperationCursor::from(cursor_2.as_str()), None, None) ); assert_eq!( result[1].0, - PaginationCursor::new(OperationCursor::from(cursor_3.as_str()), None, None) + PaginationCursor::new(OperationCursor::from(cursor_4.as_str()), None, None) ); } From 9715df8ed3af9a6910a5623aabcfed9d25b47941 Mon Sep 17 00:00:00 2001 From: adz Date: Tue, 20 Jun 2023 21:55:37 +0200 Subject: [PATCH 4/8] Move comment to the right section --- aquadoggo/src/db/stores/query.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index 6ff429e35..c447a336f 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -634,6 +634,11 @@ fn where_pagination_sql( let cmp_field = typecast_field_sql("operation_fields_v1.value", order_field_name, schema, false); + // Since cursors are unique per document fields this method selects the right one + // depending on what ordering was selected. For example if the client chose to order by + // "timestamp" then the cursor will be selected for each document pointing at the + // "timestamp" field. Like this we can correctly paginate over an ordered collection of + // documents. let cmp_value = format!( r#" SELECT @@ -1176,12 +1181,6 @@ impl SqlStore { /// /// Due to the special table layout we receive one row per operation field in the query. Usually we /// need to merge multiple rows / operation fields into one document. -/// -/// This method also selects one cursor per document which then can be used by the client to -/// control pagination. Since cursors are unique per document fields this method selects the right -/// one depending on what ordering was selected. For example if the client chose to order by -/// "timestamp" then the cursor will be selected for each document pointing at the "timestamp" -/// field. Like this we can correctly paginate over an ordered collection of documents. fn convert_rows( rows: Vec, list: Option<&RelationList>, From ae86354ff6086a400b73ed15a227622a64258e20 Mon Sep 17 00:00:00 2001 From: adz Date: Wed, 21 Jun 2023 11:14:18 +0200 Subject: [PATCH 5/8] Write more about pagination --- aquadoggo/src/db/stores/query.rs | 161 ++++++++++++++++++++++++++----- 1 file changed, 135 insertions(+), 26 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index c447a336f..2108a5976 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -27,7 +27,7 @@ use crate::db::SqlStore; /// Configure query to select documents based on a relation list field. pub struct RelationList { /// View id of document which holds relation list field. - pub root: DocumentViewId, + pub root_view_id: DocumentViewId, /// Field which contains the relation list values. pub field: FieldName, @@ -42,17 +42,17 @@ pub enum RelationListType { } impl RelationList { - pub fn new_unpinned(root: &DocumentViewId, field: &str) -> Self { + pub fn new_unpinned(root_view_id: &DocumentViewId, field: &str) -> Self { Self { - root: root.to_owned(), + root_view_id: root_view_id.to_owned(), field: field.to_string(), list_type: RelationListType::Unpinned, } } - pub fn new_pinned(root: &DocumentViewId, field: &str) -> Self { + pub fn new_pinned(root_view_id: &DocumentViewId, field: &str) -> Self { Self { - root: root.to_owned(), + root_view_id: root_view_id.to_owned(), field: field.to_string(), list_type: RelationListType::Pinned, } @@ -478,6 +478,40 @@ fn where_filter_sql(filter: &Filter, schema: &Schema) -> (String, Vec, fields: &ApplicationFields, @@ -485,20 +519,24 @@ fn where_pagination_sql( schema: &Schema, order: &Order, ) -> String { + // No pagination cursor was given + if pagination.after.is_none() { + return "".to_string(); + } + // Ignore pagination if we're in a relation list query and the cursor does not match the parent // document view id if let (Some(relation_list), Some(cursor)) = (list, pagination.after.as_ref()) { - if Some(&relation_list.root) != cursor.root_view_id.as_ref() { + if Some(&relation_list.root_view_id) != cursor.root_view_id.as_ref() { return "".to_string(); } } - if pagination.after.is_none() { - return "".to_string(); - } - - // Unwrap as we know now that a cursor exists - let cursor = pagination.after.as_ref().unwrap(); + // Unwrap as we know now that a cursor exists at this point + let cursor = pagination + .after + .as_ref() + .expect("Expect cursor to be set at this point"); let operation_cursor = &cursor.operation_cursor; let cursor_sql = match list { @@ -521,10 +559,30 @@ fn where_pagination_sql( }; match &order.field { - // If no ordering has been applied we can simply paginate over the unique cursor. If we're - // in a relation list we need to paginate over the unique list index. + // 1. No ordering has been chosen by the client + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + // + // We order by default which is either: + // + // a) Simply paginate over document view ids, from the view where the cursor points at + // b) If we're in a relation list, we paginate over the list index values, from where the + // root_cursor points at None => match list { None => { + // Collection of all documents following a certain schema id: + // + // ------------------------------------------------- + // document_id | view_id | field_name | ... | cursor + // ------------------------------------------------- + // 0x01 | 0x01 | username | ... | 0xa0 + // 0x01 | 0x01 | message | ... | 0xc2 <--- Select + // 0x02 | 0x02 | username | ... | 0x06 + // 0x02 | 0x02 | message | ... | 0x8b + // ------------------------------------------------- + // + // -> Select document_view_id at cursor 0xc2 + // -> Show results from document_view_id > 0x01 + // @TODO: Factor this out into separate SQL pre-query let cmp_value = format!( r#" SELECT @@ -542,6 +600,22 @@ fn where_pagination_sql( format!("AND documents.document_view_id > ({cmp_value})") } Some(_) => { + // All documents mentioned in a root document's relation list, note that duplicates + // are possible in relation lists: + // + // --------------------------------------------------||------------------------- + // Documents from relation list || Relation list + // --------------------------------------------------||------------------------- + // document_id | view_id | field_name | ... | cursor || list_index | root_cursor + // --------------------------------------------------||------------------------- + // 0x03 | 0x03 | username | ... | 0x99 || 0 | 0x54 <--- Select + // 0x03 | 0x03 | message | ... | 0xc2 || 0 | 0x54 + // 0x01 | 0x01 | username | ... | 0xcd || 1 | 0x8a + // 0x01 | 0x01 | message | ... | 0xea || 1 | 0x8a + // --------------------------------------------------||------------------------- + // + // -> Select list_index of relation list at root_cursor 0x54 + // -> Show results from list_index > 0 let root_cursor = cursor .root_operation_cursor .as_ref() @@ -565,10 +639,15 @@ fn where_pagination_sql( } }, - // Ordering over a meta field + // 2. Ordering over a meta field + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + // + // We select the meta data from the document the cursor points at and use it to paginate + // over. Some(Field::Meta(meta_field)) => { let (cmp_value, cmp_field) = match meta_field { MetaField::DocumentId => { + // Select document_id of operation where the cursor points at let cmp_value = format!( r#" SELECT @@ -586,6 +665,7 @@ fn where_pagination_sql( (cmp_value, "documents.document_id") } MetaField::DocumentViewId => { + // Select document_view_id of operation where the cursor points at let cmp_value = format!( r#" SELECT @@ -609,10 +689,17 @@ fn where_pagination_sql( }; if fields.is_empty() && list.is_none() { - // When a root query was grouped by documents / no fields have been selected we can - // assume that document id and view id are unique + // If: + // + // 1. No application fields have been selected by the client + // 2. We're not paginating over a relation list + // + // .. then we can do a simplification of the query, since the results were grouped + // by documents and we can be sure that for each row the document id and view are + // unique. format!("AND {cmp_field} {cmp_direction} ({cmp_value})") } else { + // Cursor-based pagination format!( r#" AND ( @@ -629,16 +716,37 @@ fn where_pagination_sql( } } - // Ordering over an application field + // 3. Ordering over an application field + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + // + // Cursors are always pointing at the last field of a document. In the following example + // this is the "message" field. Since we've ordered the results by "timestamp" we need to + // manually find out what this value is by tracing back the cursor to the document to that + // ordered field. + // + // Collection of all documents, ordered by "timestamp", following a certain schema id: + // + // ------------------------------------------------- + // document_id | view_id | field_name | ... | cursor + // ------------------------------------------------- + // 0x01 | 0x01 | username | ... | 0xa0 + // 0x01 | 0x01 | timestamp | ... | 0x72 <-- Compare + // 0x01 | 0x01 | message | ... | 0xc2 <-- Select + // 0x02 | 0x02 | username | ... | 0x06 + // 0x02 | 0x02 | timestamp | ... | 0x8f + // 0x02 | 0x02 | message | ... | 0x8b + // ------------------------------------------------- + // + // -> Select document_view_id where cursor points at + // -> Find "timestamp" field of that document and use this value + // -> Show results from "timestamp" value > other "timestamp" values Some(Field::Field(order_field_name)) => { let cmp_field = typecast_field_sql("operation_fields_v1.value", order_field_name, schema, false); - // Since cursors are unique per document fields this method selects the right one - // depending on what ordering was selected. For example if the client chose to order by - // "timestamp" then the cursor will be selected for each document pointing at the - // "timestamp" field. Like this we can correctly paginate over an ordered collection of - // documents. + // Select the value we want to compare with from the document the cursor is pointing + // at. This is the value which we also order the whole results by + // @TODO: Factor this out into separate SQL pre-query let cmp_value = format!( r#" SELECT @@ -666,6 +774,7 @@ fn where_pagination_sql( "# ); + // Cursor-based pagination format!( r#" AND EXISTS ( @@ -887,7 +996,7 @@ fn where_sql(schema: &Schema, fields: &ApplicationFields, list: Option<&Relation Some(relation_list) => { // Filter by the parent document view id of this relation list let field_name = &relation_list.field; - let view_id = &relation_list.root; + let root_view_id = &relation_list.root_view_id; let field_type = match relation_list.list_type { RelationListType::Pinned => "pinned_relation_list", RelationListType::Unpinned => "relation_list", @@ -895,7 +1004,7 @@ fn where_sql(schema: &Schema, fields: &ApplicationFields, list: Option<&Relation format!( r#" - document_view_fields_list.document_view_id = '{view_id}' + document_view_fields_list.document_view_id = '{root_view_id}' AND operation_fields_v1_list.field_type = '{field_type}' AND @@ -1277,7 +1386,7 @@ fn row_to_cursor(row: &QueryRow, list: Option<&RelationList>) -> PaginationCurso PaginationCursor::new( row.cmp_value_cursor.as_str().into(), Some(row.root_cursor.as_str().into()), - Some(relation_list.root.clone()), + Some(relation_list.root_view_id.clone()), ) } None => PaginationCursor::new(row.cmp_value_cursor.as_str().into(), None, None), From e064ababd249e5323399acd1bdfe6d632a315eeb Mon Sep 17 00:00:00 2001 From: adz Date: Wed, 21 Jun 2023 11:19:37 +0200 Subject: [PATCH 6/8] Add more doc strings --- aquadoggo/src/db/stores/query.rs | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index 2108a5976..04569558a 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -495,19 +495,6 @@ fn where_filter_sql(filter: &Filter, schema: &Schema) -> (String, Vec, list: Option<&RelationList>, From d35fc344d1edefd166b1f0f698f7759172b17db1 Mon Sep 17 00:00:00 2001 From: adz Date: Wed, 21 Jun 2023 12:01:24 +0200 Subject: [PATCH 7/8] Move sub-SELECT's into separate SQL queries --- aquadoggo/src/db/stores/query.rs | 119 +++++++++++++++++++++++-------- 1 file changed, 91 insertions(+), 28 deletions(-) diff --git a/aquadoggo/src/db/stores/query.rs b/aquadoggo/src/db/stores/query.rs index 04569558a..bad5ffa9c 100644 --- a/aquadoggo/src/db/stores/query.rs +++ b/aquadoggo/src/db/stores/query.rs @@ -22,7 +22,7 @@ use crate::db::query::{ }; use crate::db::stores::OperationCursor; use crate::db::types::StorageDocument; -use crate::db::SqlStore; +use crate::db::{Pool, SqlStore}; /// Configure query to select documents based on a relation list field. pub struct RelationList { @@ -499,23 +499,32 @@ fn where_filter_sql(filter: &Filter, schema: &Schema) -> (String, Vec, pagination: &Pagination, fields: &ApplicationFields, list: Option<&RelationList>, schema: &Schema, order: &Order, -) -> String { +) -> Result { // No pagination cursor was given if pagination.after.is_none() { - return "".to_string(); + return Ok("".to_string()); } // Ignore pagination if we're in a relation list query and the cursor does not match the parent // document view id if let (Some(relation_list), Some(cursor)) = (list, pagination.after.as_ref()) { if Some(&relation_list.root_view_id) != cursor.root_view_id.as_ref() { - return "".to_string(); + return Ok("".to_string()); } } @@ -569,8 +578,7 @@ fn where_pagination_sql( // // -> Select document_view_id at cursor 0xc2 // -> Show results from document_view_id > 0x01 - // @TODO: Factor this out into separate SQL pre-query - let cmp_value = format!( + let cmp_value_pre = format!( r#" SELECT document_view_fields.document_view_id @@ -584,7 +592,17 @@ fn where_pagination_sql( "# ); - format!("AND documents.document_view_id > ({cmp_value})") + // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same + // result + let document_view_id: (String,) = query_as(&cmp_value_pre) + .fetch_one(pool) + .await + .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?; + + Ok(format!( + "AND documents.document_view_id > '{}'", + document_view_id.0 + )) } Some(_) => { // All documents mentioned in a root document's relation list, note that duplicates @@ -608,7 +626,7 @@ fn where_pagination_sql( .as_ref() .expect("Expect root_operation_cursor to be set when querying relation list"); - let cmp_value = format!( + let cmp_value_pre = format!( r#" -- When ordering is activated we need to compare against the value -- of the ordered field - but from the row where the cursor points at @@ -621,8 +639,18 @@ fn where_pagination_sql( "# ); + // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same + // result + let list_index: (i32,) = query_as(&cmp_value_pre) + .fetch_one(pool) + .await + .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?; + // List indexes are always unique so we can simply just compare them like that - format!("AND operation_fields_v1_list.list_index > ({cmp_value})") + Ok(format!( + "AND operation_fields_v1_list.list_index > {}", + list_index.0 + )) } }, @@ -632,7 +660,7 @@ fn where_pagination_sql( // We select the meta data from the document the cursor points at and use it to paginate // over. Some(Field::Meta(meta_field)) => { - let (cmp_value, cmp_field) = match meta_field { + let (cmp_value_pre, cmp_field) = match meta_field { MetaField::DocumentId => { // Select document_id of operation where the cursor points at let cmp_value = format!( @@ -675,6 +703,14 @@ fn where_pagination_sql( } }; + // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same + // result + let cmp_value: (String,) = query_as(&cmp_value_pre) + .fetch_one(pool) + .await + .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?; + let cmp_value = format!("'{}'", cmp_value.0); + if fields.is_empty() && list.is_none() { // If: // @@ -684,22 +720,22 @@ fn where_pagination_sql( // .. then we can do a simplification of the query, since the results were grouped // by documents and we can be sure that for each row the document id and view are // unique. - format!("AND {cmp_field} {cmp_direction} ({cmp_value})") + Ok(format!("AND {cmp_field} {cmp_direction} {cmp_value}")) } else { // Cursor-based pagination - format!( + Ok(format!( r#" AND ( - {cmp_field} {cmp_direction} ({cmp_value}) + {cmp_field} {cmp_direction} {cmp_value} OR ( - {cmp_field} = ({cmp_value}) + {cmp_field} = {cmp_value} AND {cursor_sql} ) ) "# - ) + )) } } @@ -728,41 +764,65 @@ fn where_pagination_sql( // -> Find "timestamp" field of that document and use this value // -> Show results from "timestamp" value > other "timestamp" values Some(Field::Field(order_field_name)) => { - let cmp_field = - typecast_field_sql("operation_fields_v1.value", order_field_name, schema, false); - // Select the value we want to compare with from the document the cursor is pointing // at. This is the value which we also order the whole results by - // @TODO: Factor this out into separate SQL pre-query - let cmp_value = format!( + let cmp_value_pre = format!( r#" SELECT - {cmp_field} + operation_fields_v1.value + FROM operation_fields_v1 LEFT JOIN document_view_fields ON document_view_fields.operation_id = operation_fields_v1.operation_id + WHERE document_view_fields.document_view_id = ( SELECT document_view_fields.document_view_id + FROM operation_fields_v1 LEFT JOIN document_view_fields ON document_view_fields.operation_id = operation_fields_v1.operation_id + WHERE operation_fields_v1.cursor = '{operation_cursor}' + LIMIT 1 ) AND operation_fields_v1.name = '{order_field_name}' + LIMIT 1 "# ); + // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same + // result. + // + // The returned value is added to the bindable arguments array since this is untrusted + // user content. + let operation_fields_value: (String,) = query_as(&cmp_value_pre) + .fetch_one(pool) + .await + .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?; + bind_args.push(BindArgument::String(operation_fields_value.0)); + + // Necessary casting for operation values of different type + let cmp_field = + typecast_field_sql("operation_fields_v1.value", order_field_name, schema, false); + + let bind_arg_marker = typecast_field_sql( + &format!("${}", bind_args.len()), + order_field_name, + schema, + false, + ); + // Cursor-based pagination - format!( + Ok(format!( r#" AND EXISTS ( SELECT @@ -775,17 +835,17 @@ fn where_pagination_sql( operation_fields_v1.operation_id = document_view_fields.operation_id AND ( - {cmp_field} {cmp_direction} ({cmp_value}) + {cmp_field} {cmp_direction} {bind_arg_marker} OR ( - {cmp_field} = ({cmp_value}) + {cmp_field} = {bind_arg_marker} AND {cursor_sql} ) ) ) "# - ) + )) } } } @@ -1083,14 +1143,17 @@ impl SqlStore { let where_ = where_sql(schema, &application_fields, list); let and_fields = where_fields_sql(&application_fields); - let (and_filters, bind_args) = where_filter_sql(&args.filter, schema); + let (and_filters, mut bind_args) = where_filter_sql(&args.filter, schema); let and_pagination = where_pagination_sql( + &self.pool, + &mut bind_args, &args.pagination, &application_fields, list, schema, &args.order, - ); + ) + .await?; let order = order_sql(&args.order, schema, list); let (page_size, limit) = limit_sql(&args.pagination, &application_fields); From 464b6c24b61fe710d560a237c5a8edbc2832d007 Mon Sep 17 00:00:00 2001 From: adz Date: Wed, 21 Jun 2023 12:03:18 +0200 Subject: [PATCH 8/8] Add entry to CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33b9885e6..3a15f2ac5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix missing field when filtering owner [#359](https://github.com/p2panda/aquadoggo/pull/359) - Bind untrusted user filter arguments in SQL query [#358](https://github.com/p2panda/aquadoggo/pull/358) - Reintroduce property tests for GraphQL query api [#362](https://github.com/p2panda/aquadoggo/pull/362) +- Fix cursor pagination over ordered queries [#412](https://github.com/p2panda/aquadoggo/pull/412) ## [0.4.0]