From f37cef480062a998be0f230318edd0c8e237cac5 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 18 Jun 2025 15:00:28 +0100 Subject: [PATCH 1/4] add some debug logs to find location of parameter misuse --- core/rs/bundle_static/Cargo.lock | 10 ++++ core/rs/core/Cargo.lock | 10 ++++ core/rs/core/Cargo.toml | 1 + core/rs/core/src/changes_vtab_write.rs | 77 +++++++++++++++++++------- core/rs/core/src/debug.rs | 39 +++++++++++++ core/rs/core/src/lib.rs | 19 +++++++ 6 files changed, 137 insertions(+), 19 deletions(-) create mode 100644 core/rs/core/src/debug.rs diff --git a/core/rs/bundle_static/Cargo.lock b/core/rs/bundle_static/Cargo.lock index a0858632d..909df5a63 100644 --- a/core/rs/bundle_static/Cargo.lock +++ b/core/rs/bundle_static/Cargo.lock @@ -108,6 +108,7 @@ name = "crsql_core" version = "0.1.0" dependencies = [ "bytes", + "libc-print", "num-derive", "num-traits", "sqlite_nostd", @@ -180,6 +181,15 @@ version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +[[package]] +name = "libc-print" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a660208db49e35faf57b37484350f1a61072f2a5becf0592af6015d9ddd4b0" +dependencies = [ + "libc", +] + [[package]] name = "libloading" version = "0.7.4" diff --git a/core/rs/core/Cargo.lock b/core/rs/core/Cargo.lock index d2006ae23..dc45e3470 100644 --- a/core/rs/core/Cargo.lock +++ b/core/rs/core/Cargo.lock @@ -74,6 +74,7 @@ name = "crsql_core" version = "0.1.0" dependencies = [ "bytes", + "libc-print", "num-derive", "num-traits", "sqlite_nostd", @@ -109,6 +110,15 @@ version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +[[package]] +name = "libc-print" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a660208db49e35faf57b37484350f1a61072f2a5becf0592af6015d9ddd4b0" +dependencies = [ + "libc", +] + [[package]] name = "libloading" version = "0.7.4" diff --git a/core/rs/core/Cargo.toml b/core/rs/core/Cargo.toml index c6464f5aa..3556c1342 100644 --- a/core/rs/core/Cargo.toml +++ b/core/rs/core/Cargo.toml @@ -15,6 +15,7 @@ sqlite_nostd = { path="../sqlite-rs-embedded/sqlite_nostd" } bytes = { version = "1.5", default-features = false } num-traits = { version = "0.2.17", default-features = false } num-derive = "0.4.1" +libc-print = "0.1.22" [dev-dependencies] diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index dee60f70c..5337e4dde 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -11,6 +11,7 @@ use sqlite_nostd::{sqlite3, ResultCode, Value}; use crate::c::crsql_ExtData; use crate::c::{crsql_Changes_vtab, CrsqlChangesColumn}; use crate::compare_values::crsql_compare_sqlite_values; +use crate::debug::debug_log; use crate::pack_columns::bind_package_to_stmt; use crate::pack_columns::{unpack_columns, ColumnValue}; use crate::stmt_cache::reset_cached_stmt; @@ -263,21 +264,23 @@ fn merge_sentinel_only_insert( return Err(rc); } let rc = unsafe { - (*ext_data) + let rc = (*ext_data) .pSetSyncBitStmt .step() - .and_then(|_| (*ext_data).pSetSyncBitStmt.reset()) - .and_then(|_| merge_stmt.step()) + .and_then(|_| merge_stmt.step()); + + (*ext_data).pSetSyncBitStmt.reset()?; + rc }; // TODO: report err? let _ = reset_cached_stmt(merge_stmt.stmt); let sync_rc = unsafe { - (*ext_data) - .pClearSyncBitStmt - .step() - .and_then(|_| (*ext_data).pClearSyncBitStmt.reset()) + let rc = (*ext_data).pClearSyncBitStmt.step(); + + (*ext_data).pClearSyncBitStmt.reset()?; + rc }; if let Err(sync_rc) = sync_rc { @@ -507,9 +510,15 @@ unsafe fn merge_insert( // Get or create key as the first thing we do. // We'll need the key for all later operations. - let key = tbl_info.get_or_create_key(db, &unpacked_pks)?; + let key = tbl_info.get_or_create_key(db, &unpacked_pks).map_err(|e| { + debug_log(&format!("[merge_insert] get_or_create_key error: {:?}", e)); + e + })?; - let local_cl = get_local_cl(db, &tbl_info, key)?; + let local_cl = get_local_cl(db, &tbl_info, key).map_err(|e| { + debug_log(&format!("[merge_insert] get_local_cl error: {:?}", e)); + e + })?; // We can ignore all updates from older causal lengths. // They won't win at anything. @@ -546,6 +555,7 @@ unsafe fn merge_insert( ); match merge_result { Err(rc) => { + debug_log(&format!("[merge_insert] merge_delete error:")); return Err(rc); } Ok(inner_rowid) => { @@ -583,6 +593,7 @@ unsafe fn merge_insert( ); match merge_result { Err(rc) => { + debug_log(&format!("[merge_insert] merge_sentinel_only_insert error:")); return Err(rc); } Ok(inner_rowid) => { @@ -598,6 +609,8 @@ unsafe fn merge_insert( } } + debug_log("[merge_insert] check if row needs resurrect"); + // we got a causal length which would resurrect the row. // In an in-order delivery situation then `sentinel_only` would have already resurrected the row // In out-of-order delivery, we need to resurrect the row as soon as we get a value @@ -617,10 +630,19 @@ unsafe fn merge_insert( insert_db_vrsn, insert_site_id, insert_seq, - )?; + ) + .map_err(|e| { + debug_log(&format!( + "[merge_insert] merge_sentinel_only_insert error: {:?}", + e + )); + e + })?; (*(*tab).pExtData).rowsImpacted += 1; } + debug_log("[merge_insert] checking which cid wins"); + // we can short-circuit via needs_resurrect // given the greater cl automatically means a win. // or if we realize that the row does not exist locally at all. @@ -638,7 +660,10 @@ unsafe fn merge_insert( insert_col, insert_col_vrsn, errmsg, - )?; + ).map_err(|e| { + debug_log(&format!("[merge_insert] did_cid_win: {:?}", e)); + e + })?; if !does_cid_win { // doesCidWin == 0? compared against our clocks, nothing wins. OK and @@ -646,30 +671,44 @@ unsafe fn merge_insert( return Ok(ResultCode::OK); } + debug_log("[merge_insert] merging row"); // TODO: this is all almost identical between all three merge cases! - let merge_stmt_ref = tbl_info.get_merge_insert_stmt(db, insert_col)?; + let merge_stmt_ref = tbl_info + .get_merge_insert_stmt(db, insert_col) + .map_err(|e| { + debug_log(&format!( + "[merge_insert] get_merge_insert_stmt error: {:?}", + e + )); + e + })?; let merge_stmt = merge_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; let bind_result = bind_package_to_stmt(merge_stmt.stmt, &unpacked_pks, 0) .and_then(|_| merge_stmt.bind_value(unpacked_pks.len() as i32 + 1, insert_val)) .and_then(|_| merge_stmt.bind_value(unpacked_pks.len() as i32 + 2, insert_val)); if let Err(rc) = bind_result { - reset_cached_stmt(merge_stmt.stmt)?; + reset_cached_stmt(merge_stmt.stmt).map_err(|e| { + debug_log(&format!("[merge_insert] reset_cached_stmt error: {:?}", e)); + e + })?; return Err(rc); } let rc = (*(*tab).pExtData) .pSetSyncBitStmt .step() - .and_then(|_| (*(*tab).pExtData).pSetSyncBitStmt.reset()) .and_then(|_| merge_stmt.step()); - reset_cached_stmt(merge_stmt.stmt)?; + reset_cached_stmt(merge_stmt.stmt).map_err(|e| { + debug_log(&format!("[merge_insert] reset_cached_stmt error: {:?}", e)); + e + })?; - let sync_rc = (*(*tab).pExtData) - .pClearSyncBitStmt - .step() - .and_then(|_| (*(*tab).pExtData).pClearSyncBitStmt.reset()); + let sync_rc = (*(*tab).pExtData).pClearSyncBitStmt.step(); + + (*(*tab).pExtData).pSetSyncBitStmt.reset(); + (*(*tab).pExtData).pClearSyncBitStmt.reset(); if let Err(rc) = rc { return Err(rc); diff --git a/core/rs/core/src/debug.rs b/core/rs/core/src/debug.rs new file mode 100644 index 000000000..08df1bc86 --- /dev/null +++ b/core/rs/core/src/debug.rs @@ -0,0 +1,39 @@ +use alloc::format; +use alloc::string::String; +use core::ffi::c_void; +use sqlite::{context, value}; +use sqlite_nostd as sqlite; + +// Global context for logging - will be set during initialization +static mut DEBUG_ENABLED: bool = false; + +pub fn debug_log(msg: &str) { + unsafe { + if DEBUG_ENABLED { + libc_print::libc_println!("[DEBUG] {}", msg); + } + } +} + +pub unsafe extern "C" fn x_crsql_set_debug(ctx: *mut context, argc: i32, argv: *mut *mut value) { + if argc == 0 { + // If no arguments, return current state + sqlite::result_int(ctx, if DEBUG_ENABLED { 1 } else { 0 }); + return; + } + + if argc > 1 { + // Too many arguments + return; + } + + let enabled = { + let arg = *argv; + sqlite::value_int(arg) != 0 + }; + + DEBUG_ENABLED = enabled; + + // Return success (the new state) + sqlite::result_int(ctx, if enabled { 1 } else { 0 }); +} diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 3e83a0103..b66e43a0e 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -26,6 +26,7 @@ mod create_crr; pub mod db_version; #[cfg(not(feature = "test"))] mod db_version; +mod debug; mod ext_data; mod is_crr; mod local_writes; @@ -69,6 +70,8 @@ use tableinfo::{crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pu use teardown::*; use triggers::create_triggers; +pub use debug::debug_log; + pub extern "C" fn crsql_as_table( ctx: *mut sqlite::context, argc: i32, @@ -109,6 +112,22 @@ pub extern "C" fn sqlite3_crsqlcore_init( ) -> *mut c_void { sqlite::EXTENSION_INIT2(api); + let rc = db + .create_function_v2( + "crsql_set_debug", + 1, + sqlite::UTF8 | sqlite::DIRECTONLY, + None, + Some(debug::x_crsql_set_debug), + None, + None, + None, + ) + .unwrap_or(sqlite::ResultCode::ERROR); + if rc != ResultCode::OK { + return null_mut(); + } + let rc = db .create_function_v2( "crsql_automigrate", From 493b8aa15063ce9b371ffef8a575c3f34ce2f5a9 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 19 Jun 2025 17:00:10 +0100 Subject: [PATCH 2/4] Reset statement in set_winner_clock --- core/rs/core/src/changes_vtab_write.rs | 149 +++++++++++-------------- core/rs/integration_check/Cargo.lock | 1 + core/src/crsqlite.test.c | 3 +- 3 files changed, 67 insertions(+), 86 deletions(-) diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index 5337e4dde..c2dc67434 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -11,7 +11,6 @@ use sqlite_nostd::{sqlite3, ResultCode, Value}; use crate::c::crsql_ExtData; use crate::c::{crsql_Changes_vtab, CrsqlChangesColumn}; use crate::compare_values::crsql_compare_sqlite_values; -use crate::debug::debug_log; use crate::pack_columns::bind_package_to_stmt; use crate::pack_columns::{unpack_columns, ColumnValue}; use crate::stmt_cache::reset_cached_stmt; @@ -172,38 +171,61 @@ fn set_winner_clock( if insert_site_id.is_empty() { None } else { - (*ext_data).pSelectSiteIdOrdinalStmt.bind_blob( + let bind_result = (*ext_data).pSelectSiteIdOrdinalStmt.bind_blob( 1, insert_site_id, sqlite::Destructor::STATIC, - )?; - let rc = (*ext_data).pSelectSiteIdOrdinalStmt.step()?; - if rc == ResultCode::ROW { - let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0); - (*ext_data).pSelectSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSelectSiteIdOrdinalStmt.reset()?; - - Some(ordinal) - } else { - (*ext_data).pSelectSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSelectSiteIdOrdinalStmt.reset()?; - // site id had no ordinal yet. - // set one and return the ordinal. - (*ext_data).pSetSiteIdOrdinalStmt.bind_blob( - 1, - insert_site_id, - sqlite::Destructor::STATIC, - )?; - let rc = (*ext_data).pSetSiteIdOrdinalStmt.step()?; - if rc == ResultCode::DONE { - (*ext_data).pSetSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSetSiteIdOrdinalStmt.reset()?; - return Err(ResultCode::ABORT); + ); + + if let Err(rc) = bind_result { + reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; + return Err(rc); + } + + let res = (*ext_data).pSelectSiteIdOrdinalStmt.step(); + + match res { + Ok(ResultCode::ROW) => { + let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0); + reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; + Some(ordinal) + } + Ok(_) => { + reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; + // site id had no ordinal yet. + // set one and return the ordinal. + let bind_result = (*ext_data).pSetSiteIdOrdinalStmt.bind_blob( + 1, + insert_site_id, + sqlite::Destructor::STATIC, + ); + + if let Err(rc) = bind_result { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt); + return Err(rc); + } + + let res = (*ext_data).pSetSiteIdOrdinalStmt.step(); + match res { + Ok(ResultCode::DONE) => { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + return Err(ResultCode::ABORT); + } + Ok(_) => { + let ordinal = (*ext_data).pSetSiteIdOrdinalStmt.column_int64(0); + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + Some(ordinal) + } + Err(rc) => { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + return Err(rc); + } + } + } + Err(rc) => { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + return Err(rc); } - let ordinal = (*ext_data).pSetSiteIdOrdinalStmt.column_int64(0); - (*ext_data).pSetSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSetSiteIdOrdinalStmt.reset()?; - Some(ordinal) } } }; @@ -264,21 +286,17 @@ fn merge_sentinel_only_insert( return Err(rc); } let rc = unsafe { - let rc = (*ext_data) + (*ext_data) .pSetSyncBitStmt .step() - .and_then(|_| merge_stmt.step()); - - (*ext_data).pSetSyncBitStmt.reset()?; - rc + .and_then(|_| merge_stmt.step()) }; - // TODO: report err? - let _ = reset_cached_stmt(merge_stmt.stmt); + unsafe { (*ext_data).pSetSyncBitStmt.reset()? }; + reset_cached_stmt(merge_stmt.stmt)?; let sync_rc = unsafe { let rc = (*ext_data).pClearSyncBitStmt.step(); - (*ext_data).pClearSyncBitStmt.reset()?; rc }; @@ -346,16 +364,16 @@ unsafe fn merge_delete( let rc = (*ext_data) .pSetSyncBitStmt .step() - .and_then(|_| (*ext_data).pSetSyncBitStmt.reset()) .and_then(|_| delete_stmt.step()); + (*ext_data).pSetSyncBitStmt.reset()?; reset_cached_stmt(delete_stmt.stmt)?; let sync_rc = (*ext_data) .pClearSyncBitStmt - .step() - .and_then(|_| (*ext_data).pClearSyncBitStmt.reset()); + .step(); + (*ext_data).pClearSyncBitStmt.reset()?; if let Err(sync_rc) = sync_rc { return Err(sync_rc); } @@ -510,15 +528,9 @@ unsafe fn merge_insert( // Get or create key as the first thing we do. // We'll need the key for all later operations. - let key = tbl_info.get_or_create_key(db, &unpacked_pks).map_err(|e| { - debug_log(&format!("[merge_insert] get_or_create_key error: {:?}", e)); - e - })?; + let key = tbl_info.get_or_create_key(db, &unpacked_pks)?; - let local_cl = get_local_cl(db, &tbl_info, key).map_err(|e| { - debug_log(&format!("[merge_insert] get_local_cl error: {:?}", e)); - e - })?; + let local_cl = get_local_cl(db, &tbl_info, key)?; // We can ignore all updates from older causal lengths. // They won't win at anything. @@ -555,7 +567,6 @@ unsafe fn merge_insert( ); match merge_result { Err(rc) => { - debug_log(&format!("[merge_insert] merge_delete error:")); return Err(rc); } Ok(inner_rowid) => { @@ -593,7 +604,6 @@ unsafe fn merge_insert( ); match merge_result { Err(rc) => { - debug_log(&format!("[merge_insert] merge_sentinel_only_insert error:")); return Err(rc); } Ok(inner_rowid) => { @@ -609,8 +619,6 @@ unsafe fn merge_insert( } } - debug_log("[merge_insert] check if row needs resurrect"); - // we got a causal length which would resurrect the row. // In an in-order delivery situation then `sentinel_only` would have already resurrected the row // In out-of-order delivery, we need to resurrect the row as soon as we get a value @@ -630,19 +638,10 @@ unsafe fn merge_insert( insert_db_vrsn, insert_site_id, insert_seq, - ) - .map_err(|e| { - debug_log(&format!( - "[merge_insert] merge_sentinel_only_insert error: {:?}", - e - )); - e - })?; + )?; (*(*tab).pExtData).rowsImpacted += 1; } - debug_log("[merge_insert] checking which cid wins"); - // we can short-circuit via needs_resurrect // given the greater cl automatically means a win. // or if we realize that the row does not exist locally at all. @@ -660,10 +659,7 @@ unsafe fn merge_insert( insert_col, insert_col_vrsn, errmsg, - ).map_err(|e| { - debug_log(&format!("[merge_insert] did_cid_win: {:?}", e)); - e - })?; + )?; if !does_cid_win { // doesCidWin == 0? compared against our clocks, nothing wins. OK and @@ -671,27 +667,15 @@ unsafe fn merge_insert( return Ok(ResultCode::OK); } - debug_log("[merge_insert] merging row"); // TODO: this is all almost identical between all three merge cases! - let merge_stmt_ref = tbl_info - .get_merge_insert_stmt(db, insert_col) - .map_err(|e| { - debug_log(&format!( - "[merge_insert] get_merge_insert_stmt error: {:?}", - e - )); - e - })?; + let merge_stmt_ref = tbl_info.get_merge_insert_stmt(db, insert_col)?; let merge_stmt = merge_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; let bind_result = bind_package_to_stmt(merge_stmt.stmt, &unpacked_pks, 0) .and_then(|_| merge_stmt.bind_value(unpacked_pks.len() as i32 + 1, insert_val)) .and_then(|_| merge_stmt.bind_value(unpacked_pks.len() as i32 + 2, insert_val)); if let Err(rc) = bind_result { - reset_cached_stmt(merge_stmt.stmt).map_err(|e| { - debug_log(&format!("[merge_insert] reset_cached_stmt error: {:?}", e)); - e - })?; + reset_cached_stmt(merge_stmt.stmt)?; return Err(rc); } @@ -700,10 +684,7 @@ unsafe fn merge_insert( .step() .and_then(|_| merge_stmt.step()); - reset_cached_stmt(merge_stmt.stmt).map_err(|e| { - debug_log(&format!("[merge_insert] reset_cached_stmt error: {:?}", e)); - e - })?; + reset_cached_stmt(merge_stmt.stmt)?; let sync_rc = (*(*tab).pExtData).pClearSyncBitStmt.step(); diff --git a/core/rs/integration_check/Cargo.lock b/core/rs/integration_check/Cargo.lock index f5193cbd1..9ce49607f 100644 --- a/core/rs/integration_check/Cargo.lock +++ b/core/rs/integration_check/Cargo.lock @@ -131,6 +131,7 @@ name = "crsql_core" version = "0.1.0" dependencies = [ "bytes", + "libc-print", "num-derive", "num-traits", "sqlite_nostd", diff --git a/core/src/crsqlite.test.c b/core/src/crsqlite.test.c index e27f01d7a..e2385fe66 100644 --- a/core/src/crsqlite.test.c +++ b/core/src/crsqlite.test.c @@ -503,7 +503,6 @@ static void noopsDoNotMoveClocks() { printf("NoopsDoNotMoveClocks\n"); // syncing from A -> B, while no changes happen on B, moves up // B's clock still. - sqlite3 *db1; sqlite3 *db2; int rc = SQLITE_OK; @@ -634,4 +633,4 @@ void crsqlTestSuite() { // testSyncBit(); // testDbVersion(); // testSiteId(); -} \ No newline at end of file +} From c76e323123031d1cf0d381eef6793c597b8c51af Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 30 Jun 2025 20:13:25 +0100 Subject: [PATCH 3/4] reset statement where needed --- core/rs/core/src/changes_vtab_write.rs | 45 ++++++++------------------ core/rs/core/src/tableinfo.rs | 33 ++++++++++++------- 2 files changed, 35 insertions(+), 43 deletions(-) diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index c2dc67434..818dd124e 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -41,15 +41,13 @@ fn did_cid_win( let col_vrsn_stmt_ref = tbl_info.get_col_version_stmt(db)?; let col_vrsn_stmt = col_vrsn_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let bind_result = col_vrsn_stmt.bind_int64(1, key); + let bind_result = col_vrsn_stmt + .bind_int64(1, key) + .and_then(|_| col_vrsn_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC)); if let Err(rc) = bind_result { reset_cached_stmt(col_vrsn_stmt.stmt)?; return Err(rc); } - if let Err(rc) = col_vrsn_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) { - reset_cached_stmt(col_vrsn_stmt.stmt)?; - return Err(rc); - } match col_vrsn_stmt.step() { Ok(ResultCode::ROW) => { @@ -98,16 +96,13 @@ fn did_cid_win( let col_site_id_stmt_ref = tbl_info.get_col_site_id_stmt(db)?; let col_site_id_stmt = col_site_id_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let bind_result = col_site_id_stmt.bind_int64(1, key); + let bind_result = col_site_id_stmt.bind_int64(1, key).and_then(|_| { + col_site_id_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) + }); if let Err(rc) = bind_result { reset_cached_stmt(col_site_id_stmt.stmt)?; return Err(rc); } - if let Err(rc) = col_site_id_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) - { - reset_cached_stmt(col_site_id_stmt.stmt)?; - return Err(rc); - } match col_site_id_stmt.step() { Ok(ResultCode::ROW) => { @@ -182,9 +177,7 @@ fn set_winner_clock( return Err(rc); } - let res = (*ext_data).pSelectSiteIdOrdinalStmt.step(); - - match res { + match (*ext_data).pSelectSiteIdOrdinalStmt.step() { Ok(ResultCode::ROW) => { let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0); reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; @@ -205,8 +198,7 @@ fn set_winner_clock( return Err(rc); } - let res = (*ext_data).pSetSiteIdOrdinalStmt.step(); - match res { + match (*ext_data).pSetSiteIdOrdinalStmt.step() { Ok(ResultCode::DONE) => { reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; return Err(ResultCode::ABORT); @@ -233,13 +225,9 @@ fn set_winner_clock( let set_stmt_ref = tbl_info.get_set_winner_clock_stmt(db)?; let set_stmt = set_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let bind_result = set_stmt.bind_int64(1, key); - if let Err(rc) = bind_result { - reset_cached_stmt(set_stmt.stmt)?; - return Err(rc); - } let bind_result = set_stmt - .bind_text(2, insert_col_name, sqlite::Destructor::STATIC) + .bind_int64(1, key) + .and_then(|_| set_stmt.bind_text(2, insert_col_name, sqlite::Destructor::STATIC)) .and_then(|_| set_stmt.bind_int64(3, insert_col_vrsn)) .and_then(|_| set_stmt.bind_int64(4, insert_db_vrsn)) .and_then(|_| set_stmt.bind_int64(5, insert_seq)) @@ -369,9 +357,7 @@ unsafe fn merge_delete( (*ext_data).pSetSyncBitStmt.reset()?; reset_cached_stmt(delete_stmt.stmt)?; - let sync_rc = (*ext_data) - .pClearSyncBitStmt - .step(); + let sync_rc = (*ext_data).pClearSyncBitStmt.step(); (*ext_data).pClearSyncBitStmt.reset()?; if let Err(sync_rc) = sync_rc { @@ -428,12 +414,9 @@ fn get_local_cl( let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?; let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let rc = local_cl_stmt.bind_int64(1, key); - if let Err(rc) = rc { - reset_cached_stmt(local_cl_stmt.stmt)?; - return Err(rc); - } - let rc = local_cl_stmt.bind_int64(2, key); + let rc = local_cl_stmt + .bind_int64(1, key) + .and_then(|_| local_cl_stmt.bind_int64(2, key)); if let Err(rc) = rc { reset_cached_stmt(local_cl_stmt.stmt)?; return Err(rc); diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 256900bed..98510d158 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -111,7 +111,10 @@ impl TableInfo { let stmt_ref = self.get_select_key_stmt(db)?; let stmt = stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; for (i, pk) in pks.iter().enumerate() { - stmt.bind_value(i as i32 + 1, *pk)?; + if let Err(rc) = stmt.bind_value(i as i32 + 1, *pk) { + stmt.clear_bindings()?; + return Err(rc); + } } match stmt.step() { Ok(ResultCode::DONE) => { @@ -141,7 +144,10 @@ impl TableInfo { let stmt_ref = self.get_insert_or_ignore_returning_key_stmt(db)?; let stmt = stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; for (i, pk) in pks.iter().enumerate() { - stmt.bind_value(i as i32 + 1, *pk)?; + if let Err(rc) = stmt.bind_value(i as i32 + 1, *pk) { + stmt.clear_bindings()?; + return Err(rc); + } } match stmt.step() { Ok(ResultCode::DONE) => { @@ -208,7 +214,10 @@ impl TableInfo { let stmt_ref = self.get_insert_key_stmt(db)?; let stmt = stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; for (i, pk) in pks.iter().enumerate() { - stmt.bind_value(i as i32 + 1, *pk)?; + if let Err(rc) = stmt.bind_value(i as i32 + 1, *pk) { + stmt.clear_bindings()?; + return Err(rc); + } } match stmt.step() { Ok(ResultCode::ROW) => { @@ -478,15 +487,15 @@ impl TableInfo { // from the old pk can override the ones from the new at a node // following our changes. let sql = format!( - "UPDATE OR REPLACE \"{table_name}__crsql_clock\" SET - key = ?, - db_version = ?, - seq = ?, - col_version = col_version + 1, - site_id = 0 - WHERE - key = ? AND col_name = ?", - table_name = crate::util::escape_ident(&self.tbl_name), + "UPDATE OR REPLACE \"{table_name}__crsql_clock\" SET + key = ?, + db_version = ?, + seq = ?, + col_version = col_version + 1, + site_id = 0 + WHERE + key = ? AND col_name = ?", + table_name = crate::util::escape_ident(&self.tbl_name), ); let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?; *self.move_non_sentinels_stmt.try_borrow_mut()? = Some(ret); From 23e0001e1108afc2edc7a9daefc937e4f8d693a9 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 30 Jun 2025 19:41:46 +0100 Subject: [PATCH 4/4] fix tie_breaker test --- py/correctness/tests/test_sync.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/py/correctness/tests/test_sync.py b/py/correctness/tests/test_sync.py index fd30f2b18..fe1602d11 100644 --- a/py/correctness/tests/test_sync.py +++ b/py/correctness/tests/test_sync.py @@ -397,17 +397,23 @@ def test_merge_same_w_tie_breaker(): db3.execute("SELECT crsql_config_set('merge-equal-values', 1);") db3.commit() + # Sync changes so all nodes have seen changes from other nodes sync_left_to_right(db1, db2, 0) - changes2 = db2.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() - sync_left_to_right(db2, db1, 0) - changes1 = db1.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() - sync_left_to_right(db2, db3, 0) + sync_left_to_right(db3, db2, 0) + sync_left_to_right(db3, db1, 0) + + changes1 = db1.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() + changes2 = db2.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() changes3 = db3.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() - # check that everything by db_version is the same - assert (changes2[:-6] == changes1[:-6] == changes3[:-6]) + # check that everything but db_version is the same + # print("changes2", changes2) + changes1_no_dbv = [x[:-1] for x in changes1] + changes2_no_dbv = [x[:-1] for x in changes2] + changes3_no_dbv = [x[:-1] for x in changes3] + assert (changes2_no_dbv == changes1_no_dbv == changes3_no_dbv) # Test that we're stable / do not loop when we tie break equal values