From b766afbd183a0f1cd6dff1fcfe005289adc5bd7e Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 9 Jul 2025 23:13:21 +0100 Subject: [PATCH] Add fn to store timestamp in sqlite in clock table. This adds the `crsql_set_ts` function that can be called first in a transaction to set a timestamp for all changes that are commited. It defaults to zero if the function isn't set. --- core/rs/core/src/alter.rs | 2 +- core/rs/core/src/bootstrap.rs | 3 +- core/rs/core/src/c.rs | 15 ++- core/rs/core/src/changes_vtab.rs | 4 + core/rs/core/src/changes_vtab_read.rs | 5 +- core/rs/core/src/changes_vtab_write.rs | 13 +- core/rs/core/src/commit.rs | 1 + core/rs/core/src/db_version.rs | 1 - core/rs/core/src/lib.rs | 76 ++++++++++++ core/rs/core/src/local_writes/after_delete.rs | 5 +- core/rs/core/src/local_writes/after_insert.rs | 15 ++- core/rs/core/src/local_writes/after_update.rs | 20 ++- core/rs/core/src/local_writes/mod.rs | 20 ++- core/rs/core/src/tableinfo.rs | 38 +++--- .../integration_check/src/t/pk_only_tables.rs | 4 +- core/rs/integration_check/src/t/tableinfo.rs | 6 +- core/src/changes-vtab.c | 2 +- core/src/crsqlite.test.c | 8 +- core/src/ext-data.h | 1 + core/src/rows-impacted.test.c | 38 +++--- .../src/crsql_correctness/__init__.py | 2 +- py/correctness/tests/test_cl_merging.py | 117 +++++++++--------- py/correctness/tests/test_cl_triggers.py | 2 +- py/correctness/tests/test_dbversion.py | 4 +- .../tests/test_lookaside_key_creation.py | 2 +- py/correctness/tests/test_sandbox.py | 2 +- .../tests/test_sentinel_omission.py | 2 +- py/correctness/tests/test_seq.py | 2 +- .../tests/test_site_id_lookaside.py | 14 +-- py/correctness/tests/test_siteid.py | 2 +- py/correctness/tests/test_sync.py | 110 ++++++++-------- py/correctness/tests/test_sync_bit.py | 25 ++-- py/correctness/tests/test_sync_prop.py | 2 +- py/correctness/tests/test_update_rows.py | 77 +++++++++--- tools/src/main.rs | 106 +++++++++++----- 35 files changed, 494 insertions(+), 252 deletions(-) diff --git a/core/rs/core/src/alter.rs b/core/rs/core/src/alter.rs index baf131208..87f15036c 100644 --- a/core/rs/core/src/alter.rs +++ b/core/rs/core/src/alter.rs @@ -51,7 +51,7 @@ unsafe fn compact_post_alter( SELECT name FROM pragma_table_info('{table_name}') WHERE pk > 0 AND name NOT IN (SELECT name FROM pragma_index_info('{table_name}__crsql_pks_pks')) - UNION SELECT name FROM pragma_index_info('{table_name}__crsql_pks_pks') WHERE name NOT IN + UNION SELECT name FROM pragma_index_info('{table_name}__crsql_pks_pks') WHERE name NOT IN (SELECT name FROM pragma_table_info('{table_name}') WHERE pk > 0) AND name != 'col_name' );", table_name = crate::util::escape_ident_as_value(tbl_name_str), diff --git a/core/rs/core/src/bootstrap.rs b/core/rs/core/src/bootstrap.rs index 1039e4e35..c3fc2b1e2 100644 --- a/core/rs/core/src/bootstrap.rs +++ b/core/rs/core/src/bootstrap.rs @@ -1,4 +1,3 @@ -use alloc::string::ToString; use core::ffi::{c_char, c_int}; use crate::{consts, tableinfo::TableInfo}; @@ -208,6 +207,8 @@ pub fn create_clock_table( db_version INTEGER NOT NULL, site_id INTEGER NOT NULL DEFAULT 0, seq INTEGER NOT NULL, + ts TEXT NOT NULL DEFAULT '0', + PRIMARY KEY (key, col_name) ) WITHOUT ROWID, STRICT", table_name = crate::util::escape_ident(table_name), diff --git a/core/rs/core/src/c.rs b/core/rs/core/src/c.rs index 979a8c8d2..c31f83e94 100644 --- a/core/rs/core/src/c.rs +++ b/core/rs/core/src/c.rs @@ -23,6 +23,7 @@ pub enum CrsqlChangesColumn { SiteId = 6, Cl = 7, Seq = 8, + Ts = 9, } #[derive(FromPrimitive, PartialEq, Debug)] @@ -36,6 +37,7 @@ pub enum ClockUnionColumn { RowId = 6, Seq = 7, Cl = 8, + Ts = 9, } #[derive(FromPrimitive, PartialEq, Debug)] @@ -70,6 +72,7 @@ pub struct crsql_ExtData { pub pSelectSiteIdOrdinalStmt: *mut sqlite::stmt, pub pSelectClockTablesStmt: *mut sqlite::stmt, pub mergeEqualValues: ::core::ffi::c_int, + pub timestamp: ::core::ffi::c_ulonglong, } #[repr(C)] @@ -265,7 +268,7 @@ fn bindgen_test_layout_crsql_ExtData() { let ptr = UNINIT.as_ptr(); assert_eq!( ::core::mem::size_of::(), - 152usize, + 160usize, concat!("Size of: ", stringify!(crsql_ExtData)) ); assert_eq!( @@ -485,4 +488,14 @@ fn bindgen_test_layout_crsql_ExtData() { stringify!(mergeEqualValues) ) ); + assert_eq!( + unsafe { ::core::ptr::addr_of!((*ptr).timestamp) as usize - ptr as usize }, + 152usize, + concat!( + "Offset of field: ", + stringify!(crsql_ExtData), + "::", + stringify!(timestamp) + ) + ); } diff --git a/core/rs/core/src/changes_vtab.rs b/core/rs/core/src/changes_vtab.rs index 82d235069..6ef95c58a 100644 --- a/core/rs/core/src/changes_vtab.rs +++ b/core/rs/core/src/changes_vtab.rs @@ -208,6 +208,7 @@ fn get_clock_table_col_name(col: &Option) -> Option Some(CrsqlChangesColumn::SiteId) => Some("site_id".to_string()), Some(CrsqlChangesColumn::Seq) => Some("seq".to_string()), Some(CrsqlChangesColumn::Cl) => Some("cl".to_string()), + Some(CrsqlChangesColumn::Ts) => Some("ts".to_string()), None => None, } } @@ -494,6 +495,9 @@ fn column_impl( Some(CrsqlChangesColumn::Cl) => { ctx.result_value(changes_stmt.column_value(ClockUnionColumn::Cl as i32)) } + Some(CrsqlChangesColumn::Ts) => { + ctx.result_value(changes_stmt.column_value(ClockUnionColumn::Ts as i32)); + } None => return Err(ResultCode::MISUSE), } diff --git a/core/rs/core/src/changes_vtab_read.rs b/core/rs/core/src/changes_vtab_read.rs index 4d857909a..9debf7e19 100644 --- a/core/rs/core/src/changes_vtab_read.rs +++ b/core/rs/core/src/changes_vtab_read.rs @@ -33,7 +33,8 @@ fn crsql_changes_query_for_table(table_info: &TableInfo) -> Result Result { // set the site_id ordinal // get the returned ordinal @@ -224,7 +225,8 @@ fn set_winner_clock( .and_then(|_| match ordinal { Some(ordinal) => set_stmt.bind_int64(6, ordinal), None => set_stmt.bind_null(6), - }); + }) + .and_then(|_| set_stmt.bind_text(7, insert_ts, sqlite::Destructor::STATIC)); if let Err(rc) = bind_result { reset_cached_stmt(set_stmt.stmt)?; @@ -256,6 +258,7 @@ fn merge_sentinel_only_insert( remote_db_vsn: sqlite::int64, remote_site_id: &[u8], remote_seq: sqlite::int64, + remote_ts: &str, ) -> Result { let merge_stmt_ref = tbl_info.get_merge_pk_only_insert_stmt(db)?; let merge_stmt = merge_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; @@ -302,6 +305,7 @@ fn merge_sentinel_only_insert( remote_db_vsn, remote_site_id, remote_seq, + remote_ts, ); } @@ -332,6 +336,7 @@ unsafe fn merge_delete( remote_db_vrsn: sqlite::int64, remote_site_id: &[u8], remote_seq: sqlite::int64, + remote_ts: &str, ) -> Result { let delete_stmt_ref = tbl_info.get_merge_delete_stmt(db)?; let delete_stmt = delete_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; @@ -370,6 +375,7 @@ unsafe fn merge_delete( remote_db_vrsn, remote_site_id, remote_seq, + remote_ts, )?; // Drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!! @@ -477,6 +483,7 @@ unsafe fn merge_insert( let insert_site_id = args[2 + CrsqlChangesColumn::SiteId as usize]; let insert_cl = args[2 + CrsqlChangesColumn::Cl as usize].int64(); let insert_seq = args[2 + CrsqlChangesColumn::Seq as usize].int64(); + let insert_ts = args[2 + CrsqlChangesColumn::Ts as usize].text(); if insert_site_id.bytes() > crate::consts::SITE_ID_LEN { let err = CString::new("crsql - site id exceeded max length")?; @@ -545,6 +552,7 @@ unsafe fn merge_insert( insert_db_vrsn, insert_site_id, insert_seq, + insert_ts, ); match merge_result { Err(rc) => { @@ -582,6 +590,7 @@ unsafe fn merge_insert( insert_db_vrsn, insert_site_id, insert_seq, + insert_ts, ); match merge_result { Err(rc) => { @@ -619,6 +628,7 @@ unsafe fn merge_insert( insert_db_vrsn, insert_site_id, insert_seq, + insert_ts, )?; (*(*tab).pExtData).rowsImpacted += 1; } @@ -686,6 +696,7 @@ unsafe fn merge_insert( insert_db_vrsn, insert_site_id, insert_seq, + insert_ts, ); match merge_result { Err(rc) => { diff --git a/core/rs/core/src/commit.rs b/core/rs/core/src/commit.rs index 72dc4eb12..1c29bba31 100644 --- a/core/rs/core/src/commit.rs +++ b/core/rs/core/src/commit.rs @@ -29,5 +29,6 @@ pub unsafe extern "C" fn crsql_rollback_hook(user_data: *mut c_void) -> *const c pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) { (*ext_data).pendingDbVersion = -1; (*ext_data).seq = 0; + (*ext_data).timestamp = 0; (*ext_data).updatedTableInfosThisTx = 0; } diff --git a/core/rs/core/src/db_version.rs b/core/rs/core/src/db_version.rs index 9250869ae..85f44c540 100644 --- a/core/rs/core/src/db_version.rs +++ b/core/rs/core/src/db_version.rs @@ -1,6 +1,5 @@ use core::ffi::c_void; use core::mem; -use core::ptr; use crate::alloc::string::ToString; use crate::alloc::{boxed::Box, vec::Vec}; diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 6adb594d7..ab5868fd9 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -49,6 +49,7 @@ mod util; use alloc::borrow::Cow; use alloc::format; +use alloc::string::ToString; use core::ffi::c_char; use core::mem; use core::ptr::null_mut; @@ -367,6 +368,40 @@ pub extern "C" fn sqlite3_crsqlcore_init( return null_mut(); } + let rc = db + .create_function_v2( + "crsql_set_ts", + -1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_set_ts), + None, + None, + None, + ) + .unwrap_or(ResultCode::ERROR); + if rc != ResultCode::OK { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + + let rc = db + .create_function_v2( + "crsql_get_ts", + -1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_get_ts), + None, + None, + None, + ) + .unwrap_or(ResultCode::ERROR); + if rc != ResultCode::OK { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + let rc = db .create_function_v2( "crsql_begin_alter", @@ -756,6 +791,47 @@ unsafe extern "C" fn x_crsql_increment_and_get_seq( (*ext_data).seq += 1; } +/** + * Set the timestamp for the current transaction. + */ +unsafe extern "C" fn x_crsql_set_ts( + ctx: *mut sqlite::context, + argc: i32, + argv: *mut *mut sqlite::value, +) { + if argc == 0 { + ctx.result_error( + "Wrong number of args provided to crsql_begin_alter. Provide the + schema name and table name or just the table name.", + ); + return; + } + + let args = sqlite::args!(argc, argv); + let ts = args[0].text(); + // we expect a string that we can parse as a u64 + let ts_u64 = match ts.parse::() { + Ok(ts_u64) => ts_u64, + Err(_) => { + ctx.result_error("Timestamp cannot be parsed as a valid u64"); + return; + } + }; + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + (*ext_data).timestamp = ts_u64; + ctx.result_text_static("OK"); +} + +unsafe extern "C" fn x_crsql_get_ts( + ctx: *mut sqlite::context, + _argc: i32, + _argv: *mut *mut sqlite::value, +) { + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + let ts = (*ext_data).timestamp.to_string(); + ctx.result_text_transient(&ts); +} + /** * Return the current version of the database. * diff --git a/core/rs/core/src/local_writes/after_delete.rs b/core/rs/core/src/local_writes/after_delete.rs index 7253ab53c..1f5b922e5 100644 --- a/core/rs/core/src/local_writes/after_delete.rs +++ b/core/rs/core/src/local_writes/after_delete.rs @@ -1,4 +1,5 @@ use alloc::string::String; +use alloc::string::ToString; use core::ffi::c_int; use sqlite::sqlite3; use sqlite::value; @@ -39,11 +40,12 @@ fn after_delete( tbl_info: &TableInfo, pks_old: &[*mut value], ) -> Result { + let ts = unsafe { (*ext_data).timestamp.to_string() }; let db_version = crate::db_version::next_db_version(db, ext_data)?; let seq = bump_seq(ext_data); let key = tbl_info .get_or_create_key_via_raw_values(db, pks_old) - .map_err(|_| "failed geteting or creating lookaside key")?; + .map_err(|_| "failed getting or creating lookaside key")?; let mark_locally_deleted_stmt_ref = tbl_info .get_mark_locally_deleted_stmt(db) @@ -55,6 +57,7 @@ fn after_delete( .bind_int64(1, key) .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) + .and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to mark locally deleted stmt")?; super::step_trigger_stmt(mark_locally_deleted_stmt)?; diff --git a/core/rs/core/src/local_writes/after_insert.rs b/core/rs/core/src/local_writes/after_insert.rs index 0219abb11..e7a25c7c8 100644 --- a/core/rs/core/src/local_writes/after_insert.rs +++ b/core/rs/core/src/local_writes/after_insert.rs @@ -1,4 +1,5 @@ use alloc::string::String; +use alloc::string::ToString; use core::ffi::c_int; use sqlite::sqlite3; use sqlite::value; @@ -39,6 +40,8 @@ fn after_insert( tbl_info: &TableInfo, pks_new: &[*mut value], ) -> Result { + let ts = unsafe { (*ext_data).timestamp.to_string() }; + let db_version = crate::db_version::next_db_version(db, ext_data)?; let (create_record_existed, key_new) = tbl_info .get_or_create_key_for_insert(db, pks_new) @@ -46,14 +49,14 @@ fn after_insert( if tbl_info.non_pks.is_empty() { let seq = bump_seq(ext_data); // just a sentinel record - return super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq); + return super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq, &ts); } else if create_record_existed { // update the create record since it already exists. let seq = bump_seq(ext_data); - update_create_record(db, tbl_info, key_new, db_version, seq)?; + update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; } - super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version)?; + super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; Ok(ResultCode::OK) } @@ -64,6 +67,7 @@ fn update_create_record( new_key: sqlite::int64, db_version: sqlite::int64, seq: i32, + ts: &str, ) -> Result { let update_create_record_stmt_ref = tbl_info .get_maybe_mark_locally_reinserted_stmt(db) @@ -75,10 +79,11 @@ fn update_create_record( update_create_record_stmt .bind_int64(1, db_version) .and_then(|_| update_create_record_stmt.bind_int(2, seq)) - .and_then(|_| update_create_record_stmt.bind_int64(3, new_key)) + .and_then(|_| update_create_record_stmt.bind_text(3, ts, sqlite::Destructor::STATIC)) + .and_then(|_| update_create_record_stmt.bind_int64(4, new_key)) .and_then(|_| { update_create_record_stmt.bind_text( - 4, + 5, crate::c::INSERT_SENTINEL, sqlite::Destructor::STATIC, ) diff --git a/core/rs/core/src/local_writes/after_update.rs b/core/rs/core/src/local_writes/after_update.rs index c22dbcac1..e825664ca 100644 --- a/core/rs/core/src/local_writes/after_update.rs +++ b/core/rs/core/src/local_writes/after_update.rs @@ -2,6 +2,7 @@ use core::ffi::c_int; use alloc::format; use alloc::string::String; +use alloc::string::ToString; use sqlite::{sqlite3, value, Context, ResultCode}; use sqlite_nostd as sqlite; @@ -71,10 +72,11 @@ fn after_update( non_pks_new: &[*mut value], non_pks_old: &[*mut value], ) -> Result { - let next_db_version: i64 = crate::db_version::peek_next_db_version(db, ext_data)?; + let ts = unsafe { (*ext_data).timestamp.to_string() }; + let next_db_version = crate::db_version::peek_next_db_version(db, ext_data)?; let new_key = tbl_info .get_or_create_key_via_raw_values(db, pks_new) - .map_err(|_| "failed geteting or creating lookaside key")?; + .map_err(|_| "failed getting or creating lookaside key")?; let mut changed = false; // Changing a primary key column to a new value is the same thing as deleting the row @@ -86,10 +88,10 @@ fn after_update( let next_seq = super::bump_seq(ext_data); changed = true; // Record the delete of the row identified by the old primary keys - after_update__mark_old_pk_row_deleted(db, tbl_info, old_key, next_db_version, next_seq)?; + after_update__mark_old_pk_row_deleted(db, tbl_info, old_key, next_db_version, next_seq, &ts)?; let next_seq = super::bump_seq(ext_data); // todo: we don't need to this, if there's no existing row (cl is assumed to be 1). - super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq)?; + super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, &ts)?; for col in tbl_info.non_pks.iter() { let next_seq = super::bump_seq(ext_data); after_update__move_non_pk_col( @@ -99,6 +101,7 @@ fn after_update( old_key, &col.name, next_db_version, + &ts, next_seq, )?; } @@ -123,6 +126,7 @@ fn after_update( col_info, next_db_version, next_seq, + &ts, )?; } } @@ -143,6 +147,7 @@ fn after_update__mark_old_pk_row_deleted( old_key: sqlite::int64, db_version: sqlite::int64, seq: i32, + ts: &str, ) -> Result { let mark_locally_deleted_stmt_ref = tbl_info .get_mark_locally_deleted_stmt(db) @@ -154,6 +159,7 @@ fn after_update__mark_old_pk_row_deleted( .bind_int64(1, old_key) .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) + .and_then(|_| mark_locally_deleted_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) // .and_then(|_| mark_locally_deleted_stmt.bind_int64(4, db_version)) .or_else(|_| Err("failed binding to mark_locally_deleted_stmt"))?; super::step_trigger_stmt(mark_locally_deleted_stmt) @@ -167,6 +173,7 @@ fn after_update__move_non_pk_col( old_key: sqlite::int64, col_name: &str, db_version: sqlite::int64, + ts: &str, seq: i32, ) -> Result { let move_non_pk_col_stmt_ref = tbl_info @@ -179,8 +186,9 @@ fn after_update__move_non_pk_col( .bind_int64(1, new_key) .and_then(|_| move_non_pk_col_stmt.bind_int64(2, db_version)) .and_then(|_| move_non_pk_col_stmt.bind_int(3, seq)) - .and_then(|_| move_non_pk_col_stmt.bind_int64(4, old_key)) - .and_then(|_| move_non_pk_col_stmt.bind_text(5, col_name, sqlite::Destructor::STATIC)) + .and_then(|_| move_non_pk_col_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) + .and_then(|_| move_non_pk_col_stmt.bind_int64(5, old_key)) + .and_then(|_| move_non_pk_col_stmt.bind_text(6, col_name, sqlite::Destructor::STATIC)) .or_else(|_| Err("failed binding to move_non_pk_col_stmt"))?; super::step_trigger_stmt(move_non_pk_col_stmt) } diff --git a/core/rs/core/src/local_writes/mod.rs b/core/rs/core/src/local_writes/mod.rs index 5dc4f9a30..c2261cfc8 100644 --- a/core/rs/core/src/local_writes/mod.rs +++ b/core/rs/core/src/local_writes/mod.rs @@ -1,4 +1,3 @@ -use alloc::collections::BTreeMap; use core::ffi::{c_char, c_int}; use core::mem::ManuallyDrop; @@ -84,6 +83,7 @@ fn mark_new_pk_row_created( key_new: sqlite::int64, db_version: i64, seq: i32, + ts: &str, ) -> Result { let mark_locally_created_stmt_ref = tbl_info .get_mark_locally_created_stmt(db) @@ -96,6 +96,7 @@ fn mark_new_pk_row_created( .bind_int64(1, key_new) .and_then(|_| mark_locally_created_stmt.bind_int64(2, db_version)) .and_then(|_| mark_locally_created_stmt.bind_int(3, seq)) + .and_then(|_| mark_locally_created_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to mark_locally_created_stmt")?; step_trigger_stmt(mark_locally_created_stmt) } @@ -114,6 +115,7 @@ fn mark_locally_inserted( tbl_info: &TableInfo, new_key: sqlite::int64, db_version: sqlite::int64, + ts: &str, ) -> Result { let mut last_seq = None; let mut to_insert = Vec::with_capacity(tbl_info.non_pks.len()); @@ -130,8 +132,9 @@ fn mark_locally_inserted( update_clock_stmt .bind_int64(1, db_version) .and_then(|_| update_clock_stmt.bind_int(2, seq)) - .and_then(|_| update_clock_stmt.bind_int64(3, new_key)) - .and_then(|_| update_clock_stmt.bind_text(4, &col.name, sqlite::Destructor::STATIC)) + .and_then(|_| update_clock_stmt.bind_text(3, ts, sqlite::Destructor::STATIC)) + .and_then(|_| update_clock_stmt.bind_int64(4, new_key)) + .and_then(|_| update_clock_stmt.bind_text(5, &col.name, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to update_clock_stmt")?; step_trigger_stmt(update_clock_stmt)?; @@ -154,7 +157,7 @@ fn mark_locally_inserted( .ok_or("Failed to deref combo_insert_clock_stmt")?; for (i, col) in tbl_info.non_pks.iter().enumerate() { let seq = last_seq.take().unwrap_or_else(|| bump_seq(ext_data)); - let offset = i as i32 * 4; + let offset = i as i32 * 5; combo_insert_clock_stmt .bind_int64(offset + 1, new_key) @@ -167,6 +170,7 @@ fn mark_locally_inserted( }) .and_then(|_| combo_insert_clock_stmt.bind_int64(offset + 3, db_version)) .and_then(|_| combo_insert_clock_stmt.bind_int(offset + 4, seq)) + .and_then(|_| combo_insert_clock_stmt.bind_text(offset + 5, ts, sqlite::Destructor::STATIC)) .map_err(|code| { format!("failed binding to combo_insert_clock_stmt, code: {code}") })?; @@ -196,6 +200,7 @@ fn mark_locally_inserted( }) .and_then(|_| insert_clock_stmt.bind_int64(3, db_version)) .and_then(|_| insert_clock_stmt.bind_int(4, seq)) + .and_then(|_| insert_clock_stmt.bind_text(5, ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to insert_clock_stmt")?; step_trigger_stmt(insert_clock_stmt)?; @@ -214,6 +219,7 @@ fn mark_locally_updated( col_info: &ColumnInfo, db_version: sqlite::int64, seq: i32, + ts: &str, ) -> Result { let update_clock_stmt_ref = tbl_info .get_update_clock_stmt(db) @@ -225,8 +231,9 @@ fn mark_locally_updated( update_clock_stmt .bind_int64(1, db_version) .and_then(|_| update_clock_stmt.bind_int(2, seq)) - .and_then(|_| update_clock_stmt.bind_int64(3, new_key)) - .and_then(|_| update_clock_stmt.bind_text(4, &col_info.name, sqlite::Destructor::STATIC)) + .and_then(|_| update_clock_stmt.bind_text(3, ts, sqlite::Destructor::STATIC)) + .and_then(|_| update_clock_stmt.bind_int64(4, new_key)) + .and_then(|_| update_clock_stmt.bind_text(5, &col_info.name, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to update_clock_stmt")?; step_trigger_stmt(update_clock_stmt)?; @@ -246,6 +253,7 @@ fn mark_locally_updated( }) .and_then(|_| insert_clock_stmt.bind_int64(3, db_version)) .and_then(|_| insert_clock_stmt.bind_int(4, seq)) + .and_then(|_| insert_clock_stmt.bind_text(5, ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to insert_clock_stmt")?; step_trigger_stmt(insert_clock_stmt)?; diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 3d5b04427..217147a1c 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -289,13 +289,14 @@ impl TableInfo { if self.set_winner_clock_stmt.try_borrow()?.is_none() { let sql = format!( "INSERT OR REPLACE INTO \"{table_name}__crsql_clock\" - (key, col_name, col_version, db_version, seq, site_id) + (key, col_name, col_version, db_version, seq, site_id, ts) VALUES ( ?, ?, ?, ?, ?, + ?, ? ) RETURNING key", table_name = crate::util::escape_ident(&self.tbl_name), @@ -433,19 +434,22 @@ impl TableInfo { col_version, db_version, seq, - site_id + site_id, + ts ) SELECT ?, '{sentinel}', 2, ?, ?, - 0 WHERE true + 0, + ? WHERE true ON CONFLICT DO UPDATE SET col_version = 1 + col_version, db_version = excluded.db_version, seq = excluded.seq, - site_id = 0", + site_id = 0, + ts = excluded.ts", table_name = crate::util::escape_ident(&self.tbl_name), sentinel = crate::c::DELETE_SENTINEL, ); @@ -486,7 +490,8 @@ impl TableInfo { db_version = ?, seq = ?, col_version = col_version + 1, - site_id = 0 + site_id = 0, + ts = ? WHERE key = ? AND col_name = ?", table_name = crate::util::escape_ident(&self.tbl_name), @@ -509,19 +514,22 @@ impl TableInfo { col_version, db_version, seq, - site_id + site_id, + ts ) SELECT ?, '{sentinel}', 1, ?, ?, - 0 WHERE true + 0, + ? WHERE true ON CONFLICT DO UPDATE SET col_version = CASE col_version % 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = excluded.db_version, seq = excluded.seq, - site_id = 0", + site_id = 0, + ts = excluded.ts", table_name = crate::util::escape_ident(&self.tbl_name), sentinel = crate::c::INSERT_SENTINEL, ); @@ -538,12 +546,12 @@ impl TableInfo { if self.combo_insert_clock_stmt.try_borrow()?.is_none() { let sql = format!( "INSERT OR IGNORE INTO \"{table_name}__crsql_clock\" ( - key, col_name, col_version, db_version, seq, site_id + key, col_name, col_version, db_version, seq, site_id, ts ) VALUES {values};", values = self .non_pks .iter() - .map(|_col| "(?, ?, 1, ?, ?, 0)") + .map(|_col| "(?, ?, 1, ?, ?, 0, ?)") .collect::>() .join(", "), table_name = crate::util::escape_ident(&self.tbl_name) @@ -577,8 +585,8 @@ impl TableInfo { if self.insert_clock_stmt.try_borrow()?.is_none() { let sql = format!( "INSERT INTO \"{table_name}__crsql_clock\" ( - key, col_name, col_version, db_version, seq, site_id - ) VALUES (?, ?, 1, ?, ?, 0);", + key, col_name, col_version, db_version, seq, site_id, ts + ) VALUES (?, ?, 1, ?, ?, 0, ?);", table_name = crate::util::escape_ident(&self.tbl_name), ); let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?; @@ -598,7 +606,8 @@ impl TableInfo { col_version = col_version + 1, db_version = ?, seq = ?, - site_id = 0 + site_id = 0, + ts = ? WHERE key = ? AND col_name = ?;", table_name = crate::util::escape_ident(&self.tbl_name), ); @@ -622,7 +631,8 @@ impl TableInfo { col_version = CASE col_version % 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, - site_id = 0 + site_id = 0, + ts = ? WHERE key = ? AND col_name = ?", table_name = crate::util::escape_ident(&self.tbl_name), ); diff --git a/core/rs/integration_check/src/t/pk_only_tables.rs b/core/rs/integration_check/src/t/pk_only_tables.rs index b7367e2b5..5f63a57ad 100644 --- a/core/rs/integration_check/src/t/pk_only_tables.rs +++ b/core/rs/integration_check/src/t/pk_only_tables.rs @@ -20,9 +20,9 @@ fn sync_left_to_right(l: &dyn Connection, r: &dyn Connection, since: sqlite::int while stmt_l.step().expect("pulled change set") == ResultCode::ROW { let stmt_r = r - .prepare_v2("INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") + .prepare_v2("INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") .expect("prepared insert changes"); - for x in 0..9 { + for x in 0..10 { stmt_r .bind_value(x + 1, stmt_l.column_value(x).expect("got changeset value")) .expect("bound value"); diff --git a/core/rs/integration_check/src/t/tableinfo.rs b/core/rs/integration_check/src/t/tableinfo.rs index f93c2f491..3e74e88a2 100644 --- a/core/rs/integration_check/src/t/tableinfo.rs +++ b/core/rs/integration_check/src/t/tableinfo.rs @@ -47,7 +47,8 @@ fn test_ensure_table_infos_are_up_to_date() { col_version, db_version, site_id, - seq + seq, + ts )", ) .expect("made foo clock"); @@ -82,7 +83,8 @@ fn test_ensure_table_infos_are_up_to_date() { col_version, db_version, site_id, - seq + seq, + ts )", ) .expect("made boo clock"); diff --git a/core/src/changes-vtab.c b/core/src/changes-vtab.c index 923f68391..cb48a6642 100644 --- a/core/src/changes-vtab.c +++ b/core/src/changes-vtab.c @@ -28,7 +28,7 @@ static int changesConnect(sqlite3 *db, void *pAux, int argc, "CREATE TABLE x([table] TEXT NOT NULL, [pk] BLOB NOT NULL, [cid] TEXT " "NOT NULL, [val] ANY, [col_version] INTEGER NOT NULL, [db_version] " "INTEGER NOT NULL, [site_id] BLOB NOT NULL, [cl] INTEGER NOT NULL, [seq] " - "INTEGER NOT NULL)"); + "INTEGER NOT NULL, [ts] TEXT NOT NULL)"); if (rc != SQLITE_OK) { *pzErr = sqlite3_mprintf("Could not define the table"); return rc; diff --git a/core/src/crsqlite.test.c b/core/src/crsqlite.test.c index f01376817..e193aee1d 100644 --- a/core/src/crsqlite.test.c +++ b/core/src/crsqlite.test.c @@ -57,7 +57,7 @@ int syncLeftToRight(sqlite3 *db1, sqlite3 *db2, sqlite3_int64 since) { rc += sqlite3_bind_value(pStmtRead, 1, sqlite3_column_value(pStmt, 0)); assert(rc == SQLITE_OK); rc += sqlite3_prepare_v2( - db2, "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + db2, "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", -1, &pStmtWrite, 0); assert(rc == SQLITE_OK); // printf("err: %s\n", err); @@ -450,7 +450,7 @@ static const void *getSiteId(sqlite3 *db) { return NULL; } - + sqlite3_step(pStmt); const void *site_id = sqlite3_column_blob(pStmt, 0); sqlite3_finalize(pStmt); @@ -520,7 +520,7 @@ static void testLamportCondition() { // assert(site_id2 != NULL); sqlite3_int64 db2_db1v = getSiteDbVersion(db2, site_id1); - + printf("db1v: %lld\n", db1v); printf("db2_db1v: %lld\n", db2_db1v); assert(db1v == db2_db1v); @@ -682,4 +682,4 @@ void crsqlTestSuite() { // testSyncBit(); // testDbVersion(); // testSiteId(); -} \ No newline at end of file +} diff --git a/core/src/ext-data.h b/core/src/ext-data.h index 7c5ad972a..dd56a0ae0 100644 --- a/core/src/ext-data.h +++ b/core/src/ext-data.h @@ -47,6 +47,7 @@ struct crsql_ExtData { sqlite3_stmt *pSelectClockTablesStmt; int mergeEqualValues; + unsigned long long timestamp; }; crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer); diff --git a/core/src/rows-impacted.test.c b/core/src/rows-impacted.test.c index 4e3c03df3..8ad8dc50b 100644 --- a/core/src/rows-impacted.test.c +++ b/core/src/rows-impacted.test.c @@ -28,7 +28,7 @@ static void testSingleInsertSingleTx() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -59,15 +59,15 @@ static void testManyInsertsInATx() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010902', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010903', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -96,9 +96,9 @@ static void testMultipartInsertInTx() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "2, 1, 1, NULL, 1, 1), " - "('foo', X'010902', 'b', 2, 1, 1, NULL, 1, 1), ('foo', " - "X'010903', 'b', 2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0'), " + "('foo', X'010902', 'b', 2, 1, 1, NULL, 1, 1, '0'), ('foo', " + "X'010903', 'b', 2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -128,7 +128,7 @@ static void testManyTxns() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -140,11 +140,11 @@ static void testManyTxns() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010902', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010903', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -198,7 +198,7 @@ static void testUpdateThatDoesNotChangeAnything() { rc += sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', " - "crsql_pack_columns(1), 'b', 2, 1, 1, NULL, 1, 1)", + "crsql_pack_columns(1), 'b', 2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -211,7 +211,7 @@ static void testUpdateThatDoesNotChangeAnything() { rc += sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', " - "crsql_pack_columns(1), 'b', 0, 1, 1, NULL, 1, 1)", + "crsql_pack_columns(1), 'b', 0, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -224,7 +224,7 @@ static void testUpdateThatDoesNotChangeAnything() { rc += sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', " - "crsql_pack_columns(1), 'b', 2, 0, 0, NULL, 1, 1)", + "crsql_pack_columns(1), 'b', 2, 0, 0, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -251,7 +251,7 @@ static void testDeleteThatDoesNotChangeAnything() { rc += sqlite3_exec( db, "INSERT INTO crsql_changes VALUES ('foo', crsql_pack_columns(1), " - "'-1', NULL, 2, 2, NULL, 1, 1)", //__crsql_del + "'-1', NULL, 2, 2, NULL, 1, 1, '0')", //__crsql_del 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -276,7 +276,7 @@ static void testDelete() { rc += sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', " - "'-1', NULL, 2, 2, NULL, 2, 1)", //__crsql_del + "'-1', NULL, 2, 2, NULL, 2, 1, '0')", //__crsql_del 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -301,7 +301,7 @@ static void testCreateThatDoesNotChangeAnything() { rc += sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "2, 1, 1, NULL, 1, 1)", + "2, 1, 1, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -326,7 +326,7 @@ static void testValueWin() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "3, 1, 1, X'00000000000000000000000000000000', 1, 1)", + "3, 1, 1, X'00000000000000000000000000000000', 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -351,7 +351,7 @@ static void testClockWin() { rc = sqlite3_exec(db, "BEGIN", 0, 0, 0); rc += sqlite3_exec(db, "INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', " - "2, 2, 2, NULL, 1, 1)", + "2, 2, 2, NULL, 1, 1, '0')", 0, 0, &err); sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0); sqlite3_step(pStmt); @@ -377,4 +377,4 @@ void rowsImpactedTestSuite() { testValueWin(); testClockWin(); testDelete(); -} \ No newline at end of file +} diff --git a/py/correctness/src/crsql_correctness/__init__.py b/py/correctness/src/crsql_correctness/__init__.py index 19b3817b1..64a101249 100644 --- a/py/correctness/src/crsql_correctness/__init__.py +++ b/py/correctness/src/crsql_correctness/__init__.py @@ -24,7 +24,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() min_db_v = 0 diff --git a/py/correctness/tests/test_cl_merging.py b/py/correctness/tests/test_cl_merging.py index 43206e687..0aebe8dba 100644 --- a/py/correctness/tests/test_cl_merging.py +++ b/py/correctness/tests/test_cl_merging.py @@ -45,7 +45,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() @@ -54,16 +54,16 @@ def sync_left_to_right_exact_version(l, r, db_version): "SELECT * FROM crsql_changes WHERE db_version = ?", (db_version,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() def sync_left_to_right_include_siteid(l, r, since): changes = l.execute( - "SELECT [table], pk, cid, val, col_version, db_version, site_id, cl, seq FROM crsql_changes WHERE db_version > ?", (since,)) + "SELECT [table], pk, cid, val, col_version, db_version, site_id, cl, seq, ts FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() @@ -73,13 +73,13 @@ def sync_left_to_right_include_siteid(l, r, since): def sync_left_to_right_normal_delta_state(l, r, since): r_siteid = r.execute("SELECT crsql_site_id()").fetchone()[0] changes = l.execute( - "SELECT [table], pk, cid, val, col_version, db_version, site_id, cl, seq FROM crsql_changes WHERE db_version > ? AND site_id IS NOT ?", + "SELECT [table], pk, cid, val, col_version, db_version, site_id, cl, seq, ts FROM crsql_changes WHERE db_version > ? AND site_id IS NOT ?", (since, r_siteid)) largest_version = 0 for change in changes: max(largest_version, change[5]) r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() return largest_version @@ -87,11 +87,11 @@ def sync_left_to_right_normal_delta_state(l, r, since): def sync_left_to_right_single_vrsn(l, r, vrsn): r_siteid = r.execute("SELECT crsql_site_id()").fetchone()[0] changes = l.execute( - "SELECT [table], pk, cid, val, col_version, db_version, site_id, cl, seq FROM crsql_changes WHERE db_version = ? AND site_id IS NOT ?", + "SELECT [table], pk, cid, val, col_version, db_version, site_id, cl, seq, ts FROM crsql_changes WHERE db_version = ? AND site_id IS NOT ?", (vrsn, r_siteid)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() @@ -162,10 +162,10 @@ def test_larger_cl_delete_deletes_all(): c1_site_id = get_site_id(c1) # c1 shouldn't have column metadata but only a delete record of the dropped item whose causal length should be 2. assert (c1_changes == [ - ('foo', b'\x01\t\x01', '-1', None, 2, 1, c1_site_id, 2, 1)]) + ('foo', b'\x01\t\x01', '-1', None, 2, 1, c1_site_id, 2, 1, '0')]) # c2 merged in the delete thus bumping causal length to 2 and bumping db version since there was a change. assert (c2_changes == [ - ('foo', b'\x01\t\x01', '-1', None, 2, 1, c1_site_id, 2, 1)]) + ('foo', b'\x01\t\x01', '-1', None, 2, 1, c1_site_id, 2, 1, '0')]) close(c1) close(c2) @@ -187,7 +187,7 @@ def test_smaller_delete_does_not_delete_larger_cl(): c1_site_id = get_site_id(c1) c1_changes = c1.execute("SELECT * FROM crsql_changes").fetchall() assert (c1_changes == [ - ('foo', b'\x01\t\x01', '-1', None, 2, 1, c1_site_id, 2, 1)]) + ('foo', b'\x01\t\x01', '-1', None, 2, 1, c1_site_id, 2, 1, '0')]) c2_changes_pre_merge = c2.execute("SELECT * FROM crsql_changes").fetchall() @@ -218,7 +218,7 @@ def test_equivalent_delete_cls_is_noop(): # create a manual clock entry that wouldn't normally exist # this clock entry would be removed if the merge does any work rather than bailing early c2.execute( - "INSERT INTO foo__crsql_clock VALUES (1, 'b', 3, 1, 0, 1)") + "INSERT INTO foo__crsql_clock VALUES (1, 'b', 3, 1, 0, 1, '0')") c2.commit() pre_changes = c2.execute("SELECT * FROM crsql_changes").fetchall() sync_left_to_right(c1, c2, 0) @@ -282,8 +282,8 @@ def test_pr_299_scenario(): # c2 should have accepted all the changes given the higher causal length # a = 1, b = 1, cl = 3 c1_site_id = get_site_id(c1) - assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 3, c1_site_id, 3, 0), - ('foo', b'\x01\t\x01', 'b', 1, 1, 3, c1_site_id, 3, 1)]) + assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 3, c1_site_id, 3, 0, '0'), + ('foo', b'\x01\t\x01', 'b', 1, 1, 3, c1_site_id, 3, 1, '0')]) # c2 and c1 should match in terms of data assert (c1.execute("SELECT * FROM foo").fetchall() == c2.execute("SELECT * FROM foo").fetchall()) @@ -318,7 +318,8 @@ def test_sync_with_siteid(): 1, c1_site_id, 1, - 0)]) + 0, + '0')]) c1.execute("UPDATE foo SET b = 2 WHERE a = 1") c1.commit() @@ -332,7 +333,8 @@ def test_sync_with_siteid(): 2, c1_site_id, 1, - 0)]) + 0, + '0')]) c1.execute("DELETE FROM foo WHERE a = 1") c1.commit() @@ -346,7 +348,8 @@ def test_sync_with_siteid(): 3, c1_site_id, 2, - 0)]) + 0, + '0')]) c1.execute("INSERT INTO foo VALUES (1, 5)") c1.commit() @@ -360,7 +363,8 @@ def test_sync_with_siteid(): 4, c1_site_id, 3, - 0), + 0, + '0'), ('foo', b'\x01\t\x01', 'b', @@ -369,7 +373,8 @@ def test_sync_with_siteid(): 4, c1_site_id, 3, - 1)]) + 1, + '0')]) close(c1) close(c2) @@ -392,7 +397,7 @@ def test_resurrection_of_live_thing_via_sentinel(): sentinel_resurrect = c1.execute( "SELECT * FROM crsql_changes WHERE cid = '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c2.commit() changes = c2.execute("SELECT * FROM crsql_changes").fetchall() @@ -401,18 +406,18 @@ def test_resurrection_of_live_thing_via_sentinel(): # The sentinel row will also zero the column on another node when it receives it. c2_site_id = get_site_id(c2) c1_site_id = get_site_id(c1) - assert (changes == [('foo', b'\x01\t\x01', 'b', 1, 0, 1, c2_site_id, 3, 0), - ('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 1, 0, 1, c2_site_id, 3, 0, '0'), + ('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0')]) # now lets finish getting changes from the other node changes = c1.execute( "SELECT * FROM crsql_changes WHERE cid != '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) c2.commit() changes = c2.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2), + assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0'), # col version bump to 1 since the other guy won on col version. # db version bumped as well since the col version changed. # holding the db version stable would prevent nodes that proxy other nodes @@ -423,7 +428,7 @@ def test_resurrection_of_live_thing_via_sentinel(): # Then B receives changes from A which move B's clock forward w/o changing B's value # C then merges to B and loses there # If B db version didn't change then C would never get the changes that B is proxying from A - ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3)]) + ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3, '0')]) close(c1) close(c2) @@ -452,12 +457,12 @@ def test_resurrection_of_live_thing_via_sentinel_multiple(): c3_changes = c3.execute( "SELECT * FROM crsql_changes").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", c3_changes) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", c3_changes) c2.commit() # 'b' should be set to 2 since with c3's db_version c3 has a higher col_version. changes2 = c2.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes2 == [('foo', b'\x01\t\x01', 'b', 2, 2, 1, c3_site_id, 1, 1)]) + assert (changes2 == [('foo', b'\x01\t\x01', 'b', 2, 2, 1, c3_site_id, 1, 1, '0')]) # a resurrection of an already live row @@ -465,19 +470,19 @@ def test_resurrection_of_live_thing_via_sentinel_multiple(): sentinel_resurrect = c1.execute( "SELECT * FROM crsql_changes WHERE cid = '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c2.commit() c3.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c3.commit() changes2 = c2.execute("SELECT * FROM crsql_changes").fetchall() changes3 = c3.execute("SELECT * FROM crsql_changes").fetchall() # 'b' should be zeroed column version but same db version. - assert (changes2 == [('foo', b'\x01\t\x01', 'b', 2, 0, 1, c3_site_id, 3, 1), - ('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2)]) + assert (changes2 == [('foo', b'\x01\t\x01', 'b', 2, 0, 1, c3_site_id, 3, 1, '0'), + ('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0')]) assert (changes2 == changes3) @@ -485,18 +490,18 @@ def test_resurrection_of_live_thing_via_sentinel_multiple(): changes = c1.execute( "SELECT * FROM crsql_changes WHERE cid != '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) c2.commit() c3.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) c3.commit() changes2 = c2.execute("SELECT * FROM crsql_changes").fetchall() changes3 = c3.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes2 == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2), - ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3)]) + assert (changes2 == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0'), + ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3, '0')]) assert (changes2 == changes3) close(c1) @@ -526,27 +531,27 @@ def test_resurrection_of_live_thing_via_sentinel_out_of_order(): sentinel_resurrect = c1.execute( "SELECT * FROM crsql_changes WHERE cid = '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c2.commit() c3.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c3.commit() changes2 = c2.execute("SELECT * FROM crsql_changes").fetchall() changes3 = c3.execute("SELECT * FROM crsql_changes").fetchall() # 'b' should be zeroed column version but same db version. - assert (changes2 == [('foo', b'\x01\t\x01', 'b', 1, 0, 1, c2_site_id, 3, 0), - ('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2)]) + assert (changes2 == [('foo', b'\x01\t\x01', 'b', 1, 0, 1, c2_site_id, 3, 0, '0'), + ('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0')]) # actor c3 will only have the sentinel row - assert (changes3 == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2)]) + assert (changes3 == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0')]) changes_c2 = c2.execute( "SELECT * FROM crsql_changes WHERE cid != '-1'").fetchone() c3.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", changes_c2) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", changes_c2) c3.commit() # syncing with c2 won't change anything since c3 already has the sentinel row @@ -557,17 +562,17 @@ def test_resurrection_of_live_thing_via_sentinel_out_of_order(): changes = c1.execute( "SELECT * FROM crsql_changes WHERE cid != '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) c2.commit() - c3.execute("INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) + c3.execute("INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", changes) c3.commit() changes2 = c2.execute("SELECT * FROM crsql_changes").fetchall() changes3 = c3.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes2 == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2), - ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3)]) + assert (changes2 == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 2, '0'), + ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3, '0')]) assert (changes2 == changes3) close(c1) @@ -591,7 +596,7 @@ def test_resurrection_of_live_thing_via_non_sentinel(): non_sentinel_resurrect = c1.execute( "SELECT * FROM crsql_changes WHERE cid != '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", non_sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", non_sentinel_resurrect) c2.commit() changes = c2.execute("SELECT * FROM crsql_changes").fetchall() @@ -599,8 +604,8 @@ def test_resurrection_of_live_thing_via_non_sentinel(): # db version pushed # col version is at 1 given we rolled the causal length forward for the resurrection c1_site_id = get_site_id(c1) - assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 3), - ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3)]) + assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 3, '0'), + ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3, '0')]) # sync all other entries should be a no-op sync_left_to_right(c1, c2, 0) @@ -626,7 +631,7 @@ def test_resurrection_of_dead_thing_via_sentinel(): sentinel_resurrect = c1.execute( "SELECT * FROM crsql_changes WHERE cid = '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c2.commit() changes = c2.execute("SELECT * FROM crsql_changes").fetchall() @@ -635,7 +640,7 @@ def test_resurrection_of_dead_thing_via_sentinel(): # cl = 3 given resurrected from dead (2) # db_version = 2 given it was a change assert (changes == [('foo', b'\x01\t\x01', - '-1', None, 3, 1, c1_site_id, 3, 2)]) + '-1', None, 3, 1, c1_site_id, 3, 2, '0')]) close(c1) close(c2) @@ -656,7 +661,7 @@ def test_resurrection_of_dead_thing_via_non_sentinel(): sentinel_resurrect = c1.execute( "SELECT * FROM crsql_changes WHERE cid != '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_resurrect) c2.commit() changes = c2.execute("SELECT * FROM crsql_changes").fetchall() @@ -665,8 +670,8 @@ def test_resurrection_of_dead_thing_via_non_sentinel(): # db_version = 2 given it was a change # col version rolled back given cl moved forward c1_site_id = get_site_id(c1) - assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 3), - ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3)]) + assert (changes == [('foo', b'\x01\t\x01', '-1', None, 3, 1, c1_site_id, 3, 3, '0'), + ('foo', b'\x01\t\x01', 'b', 1, 1, 1, c1_site_id, 3, 3, '0')]) close(c1) close(c2) @@ -705,13 +710,13 @@ def test_delete_via_sentinel(): sentinel_delete = c1.execute( "SELECT * FROM crsql_changes WHERE cid = '-1'").fetchone() c2.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_delete) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", sentinel_delete) c2.commit() changes = c2.execute("SELECT * FROM crsql_changes").fetchall() c1_site_id = get_site_id(c1) assert (changes == [('foo', b'\x01\t\x01', - '-1', None, 2, 2, c1_site_id, 2, 0)]) + '-1', None, 2, 2, c1_site_id, 2, 0, '0')]) close(c1) close(c2) @@ -1064,7 +1069,7 @@ def test_pko_resurrect(): changes = c2.execute("SELECT * FROM crsql_changes").fetchall() c1_site_id = get_site_id(c1) assert (changes == [('foo', b'\x01\t\x01', - '-1', None, 3, 3, c1_site_id, 3, 0)]) + '-1', None, 3, 3, c1_site_id, 3, 0, '0')]) close(c1) close(c2) diff --git a/py/correctness/tests/test_cl_triggers.py b/py/correctness/tests/test_cl_triggers.py index 5a07a7a5e..b02950f4e 100644 --- a/py/correctness/tests/test_cl_triggers.py +++ b/py/correctness/tests/test_cl_triggers.py @@ -32,7 +32,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() # The idea here is that we are using an upsert to create a row that has never existing in our db diff --git a/py/correctness/tests/test_dbversion.py b/py/correctness/tests/test_dbversion.py index 8bb6be723..93b79e76c 100644 --- a/py/correctness/tests/test_dbversion.py +++ b/py/correctness/tests/test_dbversion.py @@ -179,7 +179,7 @@ def create_db2(): assert db1.execute("SELECT db_version from crsql_db_versions where site_id = ?", (bytes(db2_site_id),)).fetchone()[0] == min_db_v + 1 changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 3, 1, db2_site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 3, 1, db2_site_id, 1, 0, '0')]) db1.execute("UPDATE foo SET b = 3;") db1.commit() # db_version = 3 @@ -188,7 +188,7 @@ def create_db2(): changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes == [('foo', b'\x01\t\x01', 'b', 3, 4, 3, db1_site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 3, 4, 3, db1_site_id, 1, 0, '0')]) db_versions_1 = db1.execute("SELECT * FROM crsql_db_versions").fetchall() db_versions_2 = db2.execute("SELECT * FROM crsql_db_versions").fetchall() diff --git a/py/correctness/tests/test_lookaside_key_creation.py b/py/correctness/tests/test_lookaside_key_creation.py index 4e080696a..a47326505 100644 --- a/py/correctness/tests/test_lookaside_key_creation.py +++ b/py/correctness/tests/test_lookaside_key_creation.py @@ -16,7 +16,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() diff --git a/py/correctness/tests/test_sandbox.py b/py/correctness/tests/test_sandbox.py index 554cc19a8..46a107419 100644 --- a/py/correctness/tests/test_sandbox.py +++ b/py/correctness/tests/test_sandbox.py @@ -12,7 +12,7 @@ def sync_left_to_right(l, r, since): for change in changes: ret = change[5] r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() return ret diff --git a/py/correctness/tests/test_sentinel_omission.py b/py/correctness/tests/test_sentinel_omission.py index b5091c7c7..6919edf52 100644 --- a/py/correctness/tests/test_sentinel_omission.py +++ b/py/correctness/tests/test_sentinel_omission.py @@ -7,7 +7,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() diff --git a/py/correctness/tests/test_seq.py b/py/correctness/tests/test_seq.py index 904d06923..d33a0d572 100644 --- a/py/correctness/tests/test_seq.py +++ b/py/correctness/tests/test_seq.py @@ -6,7 +6,7 @@ def sync_left_to_right(l, r): changes = l.execute("SELECT * FROM crsql_changes") for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() diff --git a/py/correctness/tests/test_site_id_lookaside.py b/py/correctness/tests/test_site_id_lookaside.py index ac2838a42..734427a86 100644 --- a/py/correctness/tests/test_site_id_lookaside.py +++ b/py/correctness/tests/test_site_id_lookaside.py @@ -18,7 +18,7 @@ def test_insert_site_id(): # is an ordinal in actual table a = make_simple_schema() a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() # Ordinal value, not site id, is in the clock table @@ -39,7 +39,7 @@ def test_insert_site_id(): def test_site_id_filter(): a = make_simple_schema() a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() assert (a.execute( @@ -53,7 +53,7 @@ def test_local_changes_have_local_site(): a.execute("INSERT INTO foo VALUES (4,2)") a.commit() a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() assert (a.execute( @@ -67,7 +67,7 @@ def test_site_id_ordinals_do_not_move_on_merge(): a = make_simple_schema() a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() x = a.execute("SELECT quote(site_id) FROM crsql_changes").fetchall() @@ -75,7 +75,7 @@ def test_site_id_ordinals_do_not_move_on_merge(): # insert again with the same site id a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010902', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010902', 'b', 1, 1, 1, x'1dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() x = a.execute("SELECT quote(site_id) FROM crsql_changes").fetchall() @@ -85,11 +85,11 @@ def test_site_id_ordinals_do_not_move_on_merge(): # insert a new site id a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010903', 'b', 1, 1, 1, x'2dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010903', 'b', 1, 1, 1, x'2dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() # insert again with that new site id a.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010904', 'b', 1, 1, 1, x'2dc8d6bb7f8941088327d9439a7927a4', 1, 0)") + "INSERT INTO crsql_changes VALUES ('foo', x'010904', 'b', 1, 1, 1, x'2dc8d6bb7f8941088327d9439a7927a4', 1, 0, '0')") a.commit() # should only be 2 site ids w/ ordinals 1 and 2. 1DC... -> 1, 2DC... -> 2 x = a.execute( diff --git a/py/correctness/tests/test_siteid.py b/py/correctness/tests/test_siteid.py index 16500d2af..9f74af715 100644 --- a/py/correctness/tests/test_siteid.py +++ b/py/correctness/tests/test_siteid.py @@ -10,7 +10,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ? AND site_id IS NOT ?", (since, r_site_id)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() diff --git a/py/correctness/tests/test_sync.py b/py/correctness/tests/test_sync.py index f01eff310..33db7ed3c 100644 --- a/py/correctness/tests/test_sync.py +++ b/py/correctness/tests/test_sync.py @@ -22,7 +22,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() @@ -84,24 +84,24 @@ def test_changes_since(): # siteid = dbs[0].execute("select crsql_site_id()").fetchone()[0] siteid = None expected = [ - ('user', b'\x01\t\x01', 'name', 'Javi', 1, 1, site_id, 1, 0), - ('deck', b'\x01\t\x01', 'owner_id', 1, 1, 1, site_id, 1, 1), - ('deck', b'\x01\t\x01', 'title', 'Preso', 1, 1, site_id, 1, 2), - ('slide', b'\x01\t\x01', 'deck_id', 1, 1, 1, site_id, 1, 3), - ('slide', b'\x01\t\x01', 'order', 0, 1, 1, site_id, 1, 4), - ('component', b'\x01\t\x01', 'type', 'text', 1, 1, site_id, 1, 5), - ('component', b'\x01\t\x01', 'slide_id', 1, 1, 1, site_id, 1, 6), - ('component', b'\x01\t\x01', 'content', 'wootwoot', 1, 1, site_id, 1, 7), - ('component', b'\x01\t\x02', 'type', 'text', 1, 1, site_id, 1, 8), - ('component', b'\x01\t\x02', 'slide_id', 1, 1, 1, site_id, 1, 9), - ('component', b'\x01\t\x02', 'content', 'toottoot', 1, 1, site_id, 1, 10), - ('component', b'\x01\t\x03', 'type', 'text', 1, 1, site_id, 1, 11), - ('component', b'\x01\t\x03', 'slide_id', 1, 1, 1, site_id, 1, 12), - ('component', b'\x01\t\x03', 'content', 'footfoot', 1, 1, site_id, 1, 13), - ('slide', b'\x01\t\x02', 'deck_id', 1, 1, 1, site_id, 1, 14), - ('slide', b'\x01\t\x02', 'order', 1, 1, 1, site_id, 1, 15), - ('slide', b'\x01\t\x03', 'deck_id', 1, 1, 1, site_id, 1, 16), - ('slide', b'\x01\t\x03', 'order', 2, 1, 1, site_id, 1, 17) + ('user', b'\x01\t\x01', 'name', 'Javi', 1, 1, site_id, 1, 0, '0'), + ('deck', b'\x01\t\x01', 'owner_id', 1, 1, 1, site_id, 1, 1, '0'), + ('deck', b'\x01\t\x01', 'title', 'Preso', 1, 1, site_id, 1, 2, '0'), + ('slide', b'\x01\t\x01', 'deck_id', 1, 1, 1, site_id, 1, 3, '0'), + ('slide', b'\x01\t\x01', 'order', 0, 1, 1, site_id, 1, 4, '0'), + ('component', b'\x01\t\x01', 'type', 'text', 1, 1, site_id, 1, 5, '0'), + ('component', b'\x01\t\x01', 'slide_id', 1, 1, 1, site_id, 1, 6, '0'), + ('component', b'\x01\t\x01', 'content', 'wootwoot', 1, 1, site_id, 1, 7, '0'), + ('component', b'\x01\t\x02', 'type', 'text', 1, 1, site_id, 1, 8, '0'), + ('component', b'\x01\t\x02', 'slide_id', 1, 1, 1, site_id, 1, 9, '0'), + ('component', b'\x01\t\x02', 'content', 'toottoot', 1, 1, site_id, 1, 10, '0'), + ('component', b'\x01\t\x03', 'type', 'text', 1, 1, site_id, 1, 11, '0'), + ('component', b'\x01\t\x03', 'slide_id', 1, 1, 1, site_id, 1, 12, '0'), + ('component', b'\x01\t\x03', 'content', 'footfoot', 1, 1, site_id, 1, 13, '0'), + ('slide', b'\x01\t\x02', 'deck_id', 1, 1, 1, site_id, 1, 14, '0'), + ('slide', b'\x01\t\x02', 'order', 1, 1, 1, site_id, 1, 15, '0'), + ('slide', b'\x01\t\x03', 'deck_id', 1, 1, 1, site_id, 1, 16, '0'), + ('slide', b'\x01\t\x03', 'order', 2, 1, 1, site_id, 1, 17, '0') ] assert (rows == expected) @@ -110,8 +110,8 @@ def test_changes_since(): rows = get_changes_since(dbs[0], 1, 'FF') - assert (rows == [('user', b'\x01\x09\x01', 'name', "Maestro", 2, 2, site_id, 1, 0), - ('deck', b'\x01\x09\x01', 'title', "Presto", 2, 2, site_id, 1, 1)]) + assert (rows == [('user', b'\x01\x09\x01', 'name', "Maestro", 2, 2, site_id, 1, 0, '0'), + ('deck', b'\x01\x09\x01', 'title', "Presto", 2, 2, site_id, 1, 1, '0')]) def test_delete(): @@ -126,7 +126,7 @@ def test_delete(): site_id = get_site_id(db) # Deletes are marked with a sentinel id assert (rows == [('component', b'\x01\x09\x01', - '-1', None, 2, 2, site_id, 2, 0)]) + '-1', None, 2, 2, site_id, 2, 0, '0')]) db.execute("DELETE FROM component") db.execute("DELETE FROM deck") @@ -136,14 +136,14 @@ def test_delete(): rows = get_changes_since(db, 0, 'FF') # TODO: should deletes not get a proper version? Would be better for ordering and chunking replications - assert (rows == [('user', b'\x01\t\x01', 'name', 'Javi', 1, 1, site_id, 1, 0), - ('component', b'\x01\t\x01', '-1', None, 2, 2, site_id, 2, 0), - ('component', b'\x01\t\x02', '-1', None, 2, 3, site_id, 2, 0), - ('component', b'\x01\t\x03', '-1', None, 2, 3, site_id, 2, 1), - ('deck', b'\x01\t\x01', '-1', None, 2, 3, site_id, 2, 2), - ('slide', b'\x01\t\x01', '-1', None, 2, 3, site_id, 2, 3), - ('slide', b'\x01\t\x02', '-1', None, 2, 3, site_id, 2, 4), - ('slide', b'\x01\t\x03', '-1', None, 2, 3, site_id, 2, 5)]) + assert (rows == [('user', b'\x01\t\x01', 'name', 'Javi', 1, 1, site_id, 1, 0, '0'), + ('component', b'\x01\t\x01', '-1', None, 2, 2, site_id, 2, 0, '0'), + ('component', b'\x01\t\x02', '-1', None, 2, 3, site_id, 2, 0, '0'), + ('component', b'\x01\t\x03', '-1', None, 2, 3, site_id, 2, 1, '0'), + ('deck', b'\x01\t\x01', '-1', None, 2, 3, site_id, 2, 2, '0'), + ('slide', b'\x01\t\x01', '-1', None, 2, 3, site_id, 2, 3, '0'), + ('slide', b'\x01\t\x02', '-1', None, 2, 3, site_id, 2, 4, '0'), + ('slide', b'\x01\t\x03', '-1', None, 2, 3, site_id, 2, 5, '0')]) # test insert @@ -182,7 +182,7 @@ def create_db2(): changes = db1.execute("SELECT * FROM crsql_changes").fetchall() # w the same db_version as db2 site_id = get_site_id(db2) - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0')]) close(db1) close(db2) @@ -196,7 +196,7 @@ def create_db2(): # db1 into db2 # db2 should still win w. no db version change since no write happened site_id = get_site_id(db2) - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0')]) # test merging from thing without records (db1) to thing with records (db2) @@ -236,7 +236,7 @@ def create_dbs(): # db version is pushed since 4 wins the col_version tie # col version stays since 1 is the max of winner and loser. site_id = get_site_id(db1) - assert (changes == [('foo', b'\x01\t\x01', 'b', 4, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 4, 1, 1, site_id, 1, 0, '0')]) def test_merging_larger(): @@ -269,13 +269,13 @@ def test_db_version_moves_as_expected_post_alter(): changes = db.execute("SELECT * FROM crsql_changes").fetchall() site_id = get_site_id(db) - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0), - ('foo', b'\x01\t\x02', 'b', 3, 1, 2, site_id, 1, 0), - ('foo', b'\x01\t\x02', 'c', 4, 1, 2, site_id, 1, 1), - ('foo', b'\x01\t\x03', 'b', 4, 1, 3, site_id, 1, 0), - ('foo', b'\x01\t\x03', 'c', 5, 1, 3, site_id, 1, 1), - ('foo', b'\x01\t\x04', 'b', 4, 1, 4, site_id, 1, 0), - ('foo', b'\x01\t\x04', 'c', 5, 1, 4, site_id, 1, 1)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'b', 3, 1, 2, site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'c', 4, 1, 2, site_id, 1, 1, '0'), + ('foo', b'\x01\t\x03', 'b', 4, 1, 3, site_id, 1, 0, '0'), + ('foo', b'\x01\t\x03', 'c', 5, 1, 3, site_id, 1, 1, '0'), + ('foo', b'\x01\t\x04', 'b', 4, 1, 4, site_id, 1, 0, '0'), + ('foo', b'\x01\t\x04', 'c', 5, 1, 4, site_id, 1, 1, '0')]) # DB1 has a row with no clock records (added during schema modification) @@ -324,8 +324,8 @@ def create_db2(): site_id1 = get_site_id(db1) site_id2 = get_site_id(db2) - assert (changes == [('foo', b'\x01\t\x01', 'b', 4, 1, 1, site_id1, 1, 0), - ('foo', b'\x01\t\x01', 'c', 3, 1, 1, site_id2, 1, 1)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 4, 1, 1, site_id1, 1, 0, '0'), + ('foo', b'\x01\t\x01', 'c', 3, 1, 1, site_id2, 1, 1, '0')]) close(db1) close(db2) @@ -338,13 +338,13 @@ def create_db2(): sync_left_to_right(db1, db2, 0) changes = db2.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes == [ + assert (changes == [ # update db_version to db1's own since b lost on db2. # b had the value 2 on db2. - ('foo', b'\x01\t\x01', 'b', 4, 1, 1, site_id1, 1, 0), + ('foo', b'\x01\t\x01', 'b', 4, 1, 1, site_id1, 1, 0, '0'), # db2 c 3 wins given columns with no value after an alter # do no merging - ('foo', b'\x01\t\x01', 'c', 3, 1, 1, site_id2, 1, 1)]) + ('foo', b'\x01\t\x01', 'c', 3, 1, 1, site_id2, 1, 1, '0')]) def create_basic_db(): @@ -372,14 +372,14 @@ def make_dbs(): changes = db2.execute("SELECT * FROM crsql_changes").fetchall() # all at base version site_id = get_site_id(db2) - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0')]) (db1, db2) = make_dbs() sync_left_to_right(db2, db1, 0) changes = db2.execute("SELECT * FROM crsql_changes").fetchall() # all at base version site_id = get_site_id(db2) - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0')]) def test_merge_same_w_tie_breaker(): db1 = create_basic_db() @@ -437,14 +437,14 @@ def make_dbs(): changes = db2.execute("SELECT * FROM crsql_changes").fetchall() site_id = get_site_id(db2) # no change since incoming is lesser - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0')]) (db1, db2) = make_dbs() sync_left_to_right(db2, db1, 0) changes = db1.execute("SELECT * FROM crsql_changes").fetchall() # change since incoming is greater site_id = get_site_id(db2) - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 1, 1, site_id, 1, 0, '0')]) def test_merge_larger_clock_larger_value(): @@ -465,13 +465,13 @@ def make_dbs(): sync_left_to_right(db1, db2, 0) changes = db2.execute("SELECT * FROM crsql_changes").fetchall() site_id = get_site_id(db1) - assert (changes == [('foo', b'\x01\t\x01', 'b', 3, 2, 2, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 3, 2, 2, site_id, 1, 0, '0')]) (db1, db2) = make_dbs() sync_left_to_right(db2, db1, 0) changes = db1.execute("SELECT * FROM crsql_changes").fetchall() site_id = get_site_id(db1) - assert (changes == [('foo', b'\x01\t\x01', 'b', 3, 2, 2, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 3, 2, 2, site_id, 1, 0, '0')]) def test_merge_larger_clock_smaller_value(): @@ -492,13 +492,13 @@ def make_dbs(): sync_left_to_right(db1, db2, 0) changes = db2.execute("SELECT * FROM crsql_changes").fetchall() site_id = get_site_id(db1) - assert (changes == [('foo', b'\x01\t\x01', 'b', 0, 2, 2, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 0, 2, 2, site_id, 1, 0, '0')]) (db1, db2) = make_dbs() sync_left_to_right(db2, db1, 0) changes = db1.execute("SELECT * FROM crsql_changes").fetchall() site_id = get_site_id(db1) - assert (changes == [('foo', b'\x01\t\x01', 'b', 0, 2, 2, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 0, 2, 2, site_id, 1, 0, '0')]) def test_merge_larger_clock_same_value(): @@ -519,13 +519,13 @@ def make_dbs(): sync_left_to_right(db1, db2, 0) site_id = get_site_id(db1) changes = db2.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 2, 2, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 2, 2, site_id, 1, 0, '0')]) (db1, db2) = make_dbs() sync_left_to_right(db2, db1, 0) site_id = get_site_id(db1) changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 2, 2, site_id, 1, 0)]) + assert (changes == [('foo', b'\x01\t\x01', 'b', 2, 2, 2, site_id, 1, 0, '0')]) # Row exists but col added thus no defaults backfilled diff --git a/py/correctness/tests/test_sync_bit.py b/py/correctness/tests/test_sync_bit.py index d2f488a9b..03119735b 100644 --- a/py/correctness/tests/test_sync_bit.py +++ b/py/correctness/tests/test_sync_bit.py @@ -19,7 +19,7 @@ def test_insert_row(): # db version, seq, col version, site id, cl should all be from the insertion c = create_db() c.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 4, 4, x'1dc8d6bb7f8941088327d9439a7927a4', 3, 6)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 4, 4, x'1dc8d6bb7f8941088327d9439a7927a4', 3, 6, '0')") c.commit() changes = c.execute("SELECT * FROM crsql_changes").fetchall() @@ -32,7 +32,8 @@ def test_insert_row(): 4, b"\x1d\xc8\xd6\xbb\x7f\x89A\x08\x83'\xd9C\x9ay'\xa4", 3, - 6), + 6, + '0'), ('foo', b'\x01\t\x01', 'b', @@ -41,7 +42,8 @@ def test_insert_row(): 4, b"\x1d\xc8\xd6\xbb\x7f\x89A\x08\x83'\xd9C\x9ay'\xa4", 3, - 6)]) + 6, + '0')]) def test_update_row(): @@ -49,7 +51,7 @@ def test_update_row(): c.execute("INSERT INTO foo VALUES (1, 2)") c.commit() c.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 3, 6)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 3, 6, '0')") changes = c.execute("SELECT * FROM crsql_changes").fetchall() # what we wrote should be what we get back since we win the merge assert (changes == [('foo', @@ -60,7 +62,8 @@ def test_update_row(): 4, b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff', 3, - 6), + 6, + '0'), ('foo', b'\x01\t\x01', 'b', @@ -69,14 +72,15 @@ def test_update_row(): 4, b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff', 3, - 6)]) + 6, + '0')]) def test_delete_row(): c = create_db() c.execute("INSERT INTO foo VALUES (1, 2)") c.commit() - c.execute("INSERT INTO crsql_changes VALUES ('foo', x'010901', '-1', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 4, 6)") + c.execute("INSERT INTO crsql_changes VALUES ('foo', x'010901', '-1', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 4, 6, '0')") c.commit() changes = c.execute("SELECT * FROM crsql_changes").fetchall() assert (changes == [('foo', @@ -87,7 +91,8 @@ def test_delete_row(): 4, b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff', 4, - 6)]) + 6, + '0')]) def test_custom_trigger(): @@ -98,7 +103,7 @@ def test_custom_trigger(): END;""") c.commit() c.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 3, 6)") + "INSERT INTO crsql_changes VALUES ('foo', x'010901', 'b', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 3, 6, '0')") c.commit() rows = c.execute("SELECT * FROM log").fetchall() assert (rows == []) @@ -117,6 +122,6 @@ def test_custom_trigger(): END;""") c.commit() c.execute( - "INSERT INTO crsql_changes VALUES ('foo', x'010902', 'b', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 3, 6)") + "INSERT INTO crsql_changes VALUES ('foo', x'010902', 'b', 1, 4, 4, x'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 3, 6, '0')") rows = c.execute("SELECT * FROM log").fetchall() assert (rows == [(1, 1), (2, 1)]) diff --git a/py/correctness/tests/test_sync_prop.py b/py/correctness/tests/test_sync_prop.py index 2746750dd..46ca4e35d 100644 --- a/py/correctness/tests/test_sync_prop.py +++ b/py/correctness/tests/test_sync_prop.py @@ -247,7 +247,7 @@ def sync_left_to_right(l, r, since_map, since_is_rowid): else: ret[change[0]] = change[5] r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() return ret diff --git a/py/correctness/tests/test_update_rows.py b/py/correctness/tests/test_update_rows.py index 45816f97e..746406838 100644 --- a/py/correctness/tests/test_update_rows.py +++ b/py/correctness/tests/test_update_rows.py @@ -5,7 +5,7 @@ def sync_left_to_right(l, r, since): "SELECT * FROM crsql_changes WHERE db_version > ?", (since,)) for change in changes: r.execute( - "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change) + "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", change) r.commit() def test_update_pk(): @@ -35,10 +35,10 @@ def get_site_id(db): db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0), - ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1), - ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0), - ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1)]) + assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1, '0'), + ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1, '0')]) db2_changes = db2.execute("SELECT * FROM crsql_changes").fetchall() assert (db2_changes == db1_changes) @@ -51,12 +51,12 @@ def get_site_id(db): assert (db1_foo == [(2, 5, 6), (10, 2, 3)]) db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (db1_changes == [('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0), - ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1), - ('foo', b'\x01\t\x01', '-1', None, 2, 3, db1_site_id, 2, 0), - ('foo', b'\x01\t\n', '-1', None, 1, 3, db1_site_id, 1, 1), - ('foo', b'\x01\t\n', 'a', 2, 2, 3, db1_site_id, 1, 2), - ('foo', b'\x01\t\n', 'b', 3, 2, 3, db1_site_id, 1, 3)]) + assert (db1_changes == [('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1, '0'), + ('foo', b'\x01\t\x01', '-1', None, 2, 3, db1_site_id, 2, 0, '0'), + ('foo', b'\x01\t\n', '-1', None, 1, 3, db1_site_id, 1, 1, '0'), + ('foo', b'\x01\t\n', 'a', 2, 2, 3, db1_site_id, 1, 2, '0'), + ('foo', b'\x01\t\n', 'b', 3, 2, 3, db1_site_id, 1, 3, '0')]) sync_left_to_right(db1, db2, 2) @@ -94,10 +94,10 @@ def get_site_id(db): db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0), - ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1), - ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0), - ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1)]) + assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1, '0'), + ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1, '0')]) db2_changes = db2.execute("SELECT * FROM crsql_changes").fetchall() assert (db2_changes == db1_changes) @@ -117,10 +117,10 @@ def get_site_id(db): assert (db1_db_version == 2) db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall() - assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0), - ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1), - ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0), - ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1)]) + assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1, '0'), + ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1, '0')]) # do an actual update db1.execute("UPDATE foo SET a = 10 WHERE id = 1") @@ -128,3 +128,42 @@ def get_site_id(db): db1_db_version = db1.execute("SELECT crsql_db_version()").fetchone()[0] assert (db1_db_version == 3) + + +def test_ts_is_inserted(): + def create_db(): + db = connect(":memory:") + db.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY NOT NULL, a, b)") + db.execute("SELECT crsql_as_crr('foo');") + db.commit() + return db + + def get_site_id(db): + return db.execute("SELECT crsql_site_id()").fetchone()[0] + + db1 = create_db() + db2 = create_db() + + db1_site_id = get_site_id(db1) + db2_site_id = get_site_id(db2) + + # use max u64 + db1.execute("SELECT crsql_set_ts('18446744073709551615');") + db1.execute("INSERT INTO foo (id, a, b) VALUES (1, 2, 3)") + db1.commit() + + # ts should be zero if it isn't set in a transaction + db1.execute("INSERT INTO foo (id, a, b) VALUES (2, 5, 6)") + db1.commit() + + sync_left_to_right(db1, db2, 0) + + db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall() + + assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0, '18446744073709551615'), + ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1, '18446744073709551615'), + ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0, '0'), + ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1, '0')]) + + db2_changes = db2.execute("SELECT * FROM crsql_changes").fetchall() + assert (db2_changes == db1_changes) diff --git a/tools/src/main.rs b/tools/src/main.rs index eb6d8ba78..9c65de4e3 100644 --- a/tools/src/main.rs +++ b/tools/src/main.rs @@ -1,4 +1,4 @@ -use std::time::Instant; +use std::time::{Duration, Instant}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rusqlite::{types::Value, Connection}; @@ -10,6 +10,9 @@ use tempfile::tempdir; fn main() { let tmp = tempdir().unwrap(); + let mut args = std::env::args(); + let ext_file = args.nth(1).unwrap_or("../core/dist/crsqlite".to_string()); + let mut conn = rusqlite::Connection::open(tmp.path().join("perf.db")).unwrap(); conn.execute_batch( @@ -22,11 +25,16 @@ fn main() { unsafe { conn.load_extension_enable().unwrap(); - conn.load_extension("../core/dist/crsqlite", None).unwrap(); + conn.load_extension(&ext_file, None).unwrap(); } + let version = conn.query_row("SELECT value from crsql_master where key = 'crsqlite_version'", (), |row| row.get::<_, i32>(0)) + .unwrap(); + let use_ts = version == 171000; create_crr(&conn); + println!("Using ext_file: {ext_file}, set_ts: {use_ts}"); + // create vanilla tables conn.execute_batch( " @@ -41,45 +49,75 @@ fn main() { let mut trials = 100; let mut batch_size = 1000; + let mut count = 5; + // conn.trace(Some(|sql| println!("{sql}"))); // inserts - let start = Instant::now(); - for i in 0..trials { + let mut times = Vec::new(); + for j in 0..count { let start = Instant::now(); - insert(&mut conn, "v", batch_size, batch_size * i); - let elapsed = start.elapsed(); - // println!("insert #{i} done in {elapsed:?}"); + for i in 0..trials { + // let start = Instant::now(); + insert(&mut conn, "v", batch_size, batch_size * ( i + (j * trials)), use_ts); + // let elapsed = start.elapsed(); + // println!("insert #{i} done in {elapsed:?}"); + } + times.push(start.elapsed()); } - println!("insert (vanilla) total time: {:?}", start.elapsed()); + let avg_time = times.iter().sum::() / times.len() as u32; + let max_time = times.iter().max().unwrap(); + let min_time = times.iter().min().unwrap(); + println!("insert (vanilla) avg time: {:?}, max: {:?}, min: {:?}", avg_time, max_time, min_time); - let start = Instant::now(); - for i in 0..trials { + let mut times = Vec::new(); + for j in 0..count { let start = Instant::now(); - insert(&mut conn, "", batch_size, batch_size * i); - let elapsed = start.elapsed(); - // println!("insert #{i} done in {elapsed:?}"); + for i in 0..trials { + let start = Instant::now(); + insert(&mut conn, "", batch_size, batch_size * ( i + (j * trials)), use_ts); + let elapsed = start.elapsed(); + // println!("insert #{i} done in {elapsed:?}"); + } + times.push(start.elapsed()); } - println!("insert total time: {:?}", start.elapsed()); + let avg_time = times.iter().sum::() / times.len() as u32; + let max_time = times.iter().max().unwrap(); + let min_time = times.iter().min().unwrap(); + println!("insert avg time: {:?}, max: {:?}, min: {:?}", avg_time, max_time, min_time); // updates - let start = Instant::now(); - for i in 0..trials { + let mut times = Vec::new(); + for j in 0..count { let start = Instant::now(); - update(&mut conn, "v", batch_size, batch_size * i); - let elapsed = start.elapsed(); + for i in 0..trials { + let start = Instant::now(); + update(&mut conn, "v", batch_size, batch_size * ( i + (j * trials)), use_ts); + let elapsed = start.elapsed(); + } + times.push(start.elapsed()); // println!("update #{i} done in {elapsed:?}"); } - println!("update (vanilla) total time: {:?}", start.elapsed()); + let avg_time = times.iter().sum::() / times.len() as u32; + let max_time = times.iter().max().unwrap(); + let min_time = times.iter().min().unwrap(); + println!("update (vanilla) avg time: {:?}, max: {:?}, min: {:?}", avg_time, max_time, min_time); - let start = Instant::now(); - for i in 0..trials { + let mut times = Vec::new(); + for j in 0..count { let start = Instant::now(); - update(&mut conn, "", batch_size, batch_size * i); - let elapsed = start.elapsed(); + for i in 0..trials { + let start = Instant::now(); + update(&mut conn, "", batch_size, batch_size * ( i + (j * trials)), use_ts); + let elapsed = start.elapsed(); + } + times.push(start.elapsed()); // println!("update #{i} done in {elapsed:?}"); } - println!("update total time: {:?}", start.elapsed()); + let avg_time = times.iter().sum::() / times.len() as u32; + let max_time = times.iter().max().unwrap(); + let min_time = times.iter().min().unwrap(); + println!("update avg time: {:?}, max: {:?}, min: {:?}", avg_time, max_time, min_time); // // single insert // let start = Instant::now(); @@ -368,10 +406,15 @@ fn random_str() -> String { .collect() } -fn insert(conn: &mut Connection, pfx: &str, count: usize, offset: usize) { - let tx = conn.transaction().unwrap(); +fn insert(conn: &mut Connection, pfx: &str, count: usize, offset: usize, use_ts: bool) { for i in 0..count { + let tx = conn.transaction().unwrap(); + if use_ts && pfx == "" { + let sql = format!("SELECT crsql_set_ts('{}')", i + offset); + tx.query_row(&sql, (), |row| row.get::<_, String>(0)) + .unwrap(); + } tx.execute( &format!("INSERT INTO {pfx}user VALUES (?, ?)"), (i + offset, random_str()), @@ -392,9 +435,9 @@ fn insert(conn: &mut Connection, pfx: &str, count: usize, offset: usize) { (i + offset, "text", i + offset, random_str()), ) .unwrap(); + tx.commit().unwrap(); } - tx.commit().unwrap(); } // def update(pfx, count, offset): @@ -406,9 +449,14 @@ fn insert(conn: &mut Connection, pfx: &str, count: usize, offset: usize) { // if c.in_transaction: // c.commit() -fn update(conn: &mut Connection, pfx: &str, count: usize, offset: usize) { +fn update(conn: &mut Connection, pfx: &str, count: usize, offset: usize, use_ts: bool) { let tx = conn.transaction().unwrap(); for i in 0..count { + if use_ts && pfx == "" { + let sql = format!("SELECT crsql_set_ts('{}')", i + offset); + tx.query_row(&sql, (), |row| row.get::<_, String>(0)) + .unwrap(); + } tx.execute( &format!("UPDATE {pfx}user SET name = ? WHERE id = ?"), (random_str(), i + offset), @@ -439,6 +487,8 @@ fn single_stmt_insert(conn: &mut Connection, pfx: &str, count: usize, offset: us .map(|i| format!("({}, '{}', {}, '{}')", i + offset, "text", i, random_str())) .collect::>(); let tx = conn.transaction().unwrap(); + // tx.query_row("SELECT crsql_set_ts('18446744073709551615')", (), |row| row.get::<_, String>(0)) + // .unwrap(); tx.execute_batch(&format!( "INSERT INTO {pfx}component VALUES {values}", values = values.join(", ")