diff --git a/.github/workflows/py-tests.yaml b/.github/workflows/py-tests.yaml
index 5744b66cb..ca291fe73 100644
--- a/.github/workflows/py-tests.yaml
+++ b/.github/workflows/py-tests.yaml
@@ -28,7 +28,7 @@ jobs:
           cd core
           make loadable
 
-      - uses: conda-incubator/setup-miniconda@v2
+      - uses: conda-incubator/setup-miniconda@v3
        with:
          auto-update-conda: true
          auto-activate-base: true
diff --git a/core/Makefile b/core/Makefile
index 32afdefeb..d97f2b983 100644
--- a/core/Makefile
+++ b/core/Makefile
@@ -72,6 +72,8 @@ dbg_prefix=./dbg
 bundle=bundle_static
 valgrind: bundle = integration_check
 test: bundle = integration_check
+ubsan: bundle = integration_check
+asan: bundle = integration_check
 
 TARGET_LOADABLE=$(prefix)/crsqlite.$(LOADABLE_EXTENSION)
 TARGET_DBG_LOADABLE=$(dbg_prefix)/crsqlite.$(LOADABLE_EXTENSION)
@@ -124,7 +126,10 @@ valgrind: $(TARGET_TEST)
 	valgrind $(prefix)/test
 analyzer:
 	scan-build $(MAKE) clean loadable
-ubsan: CC=clang
+asan: export RUSTFLAGS=-Zsanitizer=address
+asan: rs_build_flags=--target x86_64-unknown-linux-gnu -Zbuild-std
+asan: rs_lib_dbg_static=./rs/$(bundle)/target/x86_64-unknown-linux-gnu/debug/libcrsql_$(bundle).a
+asan: CC=clang
 ubsan: LDLIBS += -lubsan
 ubsan: clean $(TARGET_TEST)
 	$(prefix)/test
@@ -255,7 +260,7 @@ $(TARGET_TEST): $(prefix) $(TARGET_SQLITE3_EXTRA_C) src/tests.c src/*.test.c $(e
 		$(TARGET_SQLITE3_EXTRA_C) src/tests.c src/*.test.c $(ext_files) $(rs_lib_dbg_static_cpy) \
 		$(LDLIBS) -o $@
 
-$(TARGET_TEST_ASAN): $(prefix) $(TARGET_SQLITE3_EXTRA_C) src/tests.c src/*.test.c $(ext_files)
+$(TARGET_TEST_ASAN): $(prefix) $(TARGET_SQLITE3_EXTRA_C) src/tests.c src/*.test.c $(ext_files) $(rs_lib_dbg_static_cpy)
 	$(CC) -fsanitize=address -g -fno-omit-frame-pointer -Wall \
 		-DSQLITE_THREADSAFE=0 \
 		-DSQLITE_OMIT_LOAD_EXTENSION=1 \
diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs
index 8acf10078..6a48199c6 100644
--- a/core/rs/core/src/lib.rs
+++ b/core/rs/core/src/lib.rs
@@ -548,7 +548,7 @@ unsafe extern "C" fn x_crsql_as_crr(
 ) {
     if argc == 0 {
         ctx.result_error(
-            "Wrong number of args provided to crsql_as_crr. Provide the schema
+            "Wrong number of args provided to crsql_as_crr. Provide the schema
             name and table name or just the table name.",
         );
         return;
@@ -609,7 +609,7 @@ unsafe extern "C" fn x_crsql_begin_alter(
 ) {
     if argc == 0 {
         ctx.result_error(
-            "Wrong number of args provided to crsql_begin_alter. Provide the
+            "Wrong number of args provided to crsql_begin_alter. Provide the
             schema name and table name or just the table name.",
         );
         return;
@@ -620,7 +620,7 @@ unsafe extern "C" fn x_crsql_begin_alter(
     let (_schema_name, table_name) = if argc == 2 {
         (args[0].text(), args[1].text())
     } else {
-        ("main", args[0].text())
+        ("main\0", args[0].text())
     };
 
     let db = ctx.db_handle();
@@ -645,7 +645,7 @@ unsafe extern "C" fn x_crsql_commit_alter(
 ) {
     if argc == 0 {
         ctx.result_error(
-            "Wrong number of args provided to crsql_commit_alter. Provide the
+            "Wrong number of args provided to crsql_commit_alter. Provide the
             schema name and table name or just the table name.",
         );
         return;
@@ -655,7 +655,7 @@ unsafe extern "C" fn x_crsql_commit_alter(
     let (schema_name, table_name) = if argc >= 2 {
         (args[0].text(), args[1].text())
     } else {
-        ("main", args[0].text())
+        ("main\0", args[0].text())
     };
 
     let non_destructive = if argc >= 3 { args[2].int() == 1 } else { false };
diff --git a/core/rs/core/src/local_writes/after_update.rs b/core/rs/core/src/local_writes/after_update.rs
index ced2422a3..4103f4161 100644
--- a/core/rs/core/src/local_writes/after_update.rs
+++ b/core/rs/core/src/local_writes/after_update.rs
@@ -74,7 +74,7 @@ fn after_update(
     let next_db_version = crate::db_version::next_db_version(db, ext_data, None)?;
     let new_key = tbl_info
         .get_or_create_key_via_raw_values(db, pks_new)
-        .or_else(|_| Err("failed geteting or creating lookaside key"))?;
+        .or_else(|_| Err("failed getting or creating lookaside key"))?;
 
     // Changing a primary key column to a new value is the same thing as deleting the row
     // previously identified by the primary key.
@@ -85,16 +85,21 @@ fn after_update(
         let next_seq = super::bump_seq(ext_data);
         // Record the delete of the row identified by the old primary keys
         after_update__mark_old_pk_row_deleted(db, tbl_info, old_key, next_db_version, next_seq)?;
-        // TODO: each non sentinel needs a unique seq on the move?
-        after_update__move_non_sentinels(db, tbl_info, new_key, old_key)?;
-        // Record a create of the row identified by the new primary keys
-        // if no rows were moved. This is related to the optimization to not save
-        // sentinels unless required.
-        // if db.changes64() == 0 { <-- an optimization if we can get to it. we'd need to know to increment causal length.
-        // so we can get to this when CL is stored in the lookaside.
         let next_seq = super::bump_seq(ext_data);
+        // todo: we don't need to do this if there's no existing row (cl is assumed to be 1).
         super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq)?;
-        // }
+        for col in tbl_info.non_pks.iter() {
+            let next_seq = super::bump_seq(ext_data);
+            after_update__move_non_pk_col(
+                db,
+                tbl_info,
+                new_key,
+                old_key,
+                &col.name,
+                next_db_version,
+                next_seq,
+            )?;
+        }
     }
 
     // now for each non_pk_col we need to do an insert
@@ -146,6 +151,32 @@ fn after_update__mark_old_pk_row_deleted(
     super::step_trigger_stmt(mark_locally_deleted_stmt)
 }
 
+#[allow(non_snake_case)]
+fn after_update__move_non_pk_col(
+    db: *mut sqlite3,
+    tbl_info: &TableInfo,
+    new_key: sqlite::int64,
+    old_key: sqlite::int64,
+    col_name: &str,
+    db_version: sqlite::int64,
+    seq: i32,
+) -> Result<ResultCode, String> {
+    let move_non_pk_col_stmt_ref = tbl_info
+        .get_move_non_pk_col_stmt(db)
+        .or_else(|_e| Err("failed to get move_non_pk_col_stmt"))?;
+    let move_non_pk_col_stmt = move_non_pk_col_stmt_ref
+        .as_ref()
+        .ok_or("Failed to deref sentinel stmt")?;
+    move_non_pk_col_stmt
+        .bind_int64(1, new_key)
+        .and_then(|_| move_non_pk_col_stmt.bind_int64(2, db_version))
+        .and_then(|_| move_non_pk_col_stmt.bind_int(3, seq))
+        .and_then(|_| move_non_pk_col_stmt.bind_int64(4, old_key))
+        .and_then(|_| move_non_pk_col_stmt.bind_text(5, col_name, sqlite::Destructor::STATIC))
+        .or_else(|_| Err("failed binding to move_non_pk_col_stmt"))?;
+    super::step_trigger_stmt(move_non_pk_col_stmt)
+}
+
 // TODO: in the future we can keep sentinel information in the lookaside
 #[allow(non_snake_case)]
 fn after_update__move_non_sentinels(
diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs
index 102ca805c..256900bed 100644
--- a/core/rs/core/src/tableinfo.rs
+++ b/core/rs/core/src/tableinfo.rs
@@ -468,6 +468,32 @@ impl TableInfo {
         Ok(self.move_non_sentinels_stmt.try_borrow()?)
     }
 
+    pub fn get_move_non_pk_col_stmt(
+        &self,
+        db: *mut sqlite3,
+    ) -> Result<Ref<Option<ManagedStmt>>, ResultCode> {
+        if self.move_non_sentinels_stmt.try_borrow()?.is_none() {
+            // Incrementing col_version is especially important for the case where we
+            // are updating to a currently existing pk, so that the columns
+            // from the old pk can override the ones from the new at a node
+            // following our changes.
+            let sql = format!(
+                "UPDATE OR REPLACE \"{table_name}__crsql_clock\" SET
+                  key = ?,
+                  db_version = ?,
+                  seq = ?,
+                  col_version = col_version + 1,
+                  site_id = 0
+                WHERE
+                  key = ? AND col_name = ?",
+                table_name = crate::util::escape_ident(&self.tbl_name),
+            );
+            let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?;
+            *self.move_non_sentinels_stmt.try_borrow_mut()? = Some(ret);
+        }
+        Ok(self.move_non_sentinels_stmt.try_borrow()?)
+    }
+
     pub fn get_mark_locally_created_stmt(
         &self,
         db: *mut sqlite3,
diff --git a/py/correctness/tests/test_cl_triggers.py b/py/correctness/tests/test_cl_triggers.py
index 937a2b78f..61b9b85b7 100644
--- a/py/correctness/tests/test_cl_triggers.py
+++ b/py/correctness/tests/test_cl_triggers.py
@@ -17,6 +17,23 @@ from pprint import pprint
 import pytest
 
 
+def create_db():
+    c = connect(":memory:")
+    c.execute("CREATE TABLE foo (a INTEGER PRIMARY KEY NOT NULL, b INTEGER) STRICT;")
+    c.execute("SELECT crsql_as_crr('foo')")
+    c.commit()
+    return c
+
+def get_site_id(c):
+    return c.execute("SELECT crsql_site_id()").fetchone()[0]
+
+def sync_left_to_right(l, r, since):
+    changes = l.execute(
+        "SELECT * FROM crsql_changes WHERE db_version > ?", (since,))
+    for change in changes:
+        r.execute(
+            "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change)
+    r.commit()
 
 # The idea here is that we are using an upsert to create a row that has never existing in our db
 # In metadata tables or otherwise
@@ -237,68 +254,162 @@ def test_delete_previously_deleted():
 # - single
 # - pko
 def test_change_primary_key_to_something_new():
-    c = connect(":memory:")
-    c.execute("CREATE TABLE foo (a INTEGER PRIMARY KEY NOT NULL, b INTEGER) STRICT;")
-    c.execute("SELECT crsql_as_crr('foo')")
-    c.commit()
+    c1 = create_db()
+    c2 = create_db()
 
-    c.execute("INSERT INTO foo VALUES (1, 2)")
-    c.execute("UPDATE foo SET a = 2 WHERE a = 1")
+    # First step: Insert initial row
+    c1.execute("INSERT INTO foo VALUES (1, 2)")
+    c1.commit()
+    sync_left_to_right(c1, c2, 0)
 
-    changes = c.execute(
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    c1.execute("UPDATE foo SET a = 2 WHERE a = 1")
+    c1.commit()
+    sync_left_to_right(c1, c2, 1)
+
+    changes = c1.execute(
         "SELECT pk, cid, cl FROM crsql_changes").fetchall()
     # pk 1 was deleted so has a CL of 2
     # pk 2 was created so has a CL of 1 and also has the `b` column data as that was moved!
-    assert (changes == [(b'\x01\t\x02', 'b', 1),
-                        (b'\x01\t\x01', '-1', 2), (b'\x01\t\x02', '-1', 1)])
+    assert (changes == [(b'\x01\t\x01', '-1', 2),
+                        (b'\x01\t\x02', '-1', 1), (b'\x01\t\x02', 'b', 1)])
+
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    close(c1)
+    close(c2)
 
 
 # Previously existing thing should get bumped to re-existing
 # Previously existing means we have metadata for the row but it is not a live row in the base tables.
 def test_change_primary_key_to_previously_existing():
-    c = connect(":memory:")
-    c.execute("CREATE TABLE foo (a INTEGER PRIMARY KEY NOT NULL, b INTEGER) STRICT;")
-    c.execute("SELECT crsql_as_crr('foo')")
-    c.commit()
+    c1 = create_db()
+    c2 = create_db()
 
-    c.execute("INSERT INTO foo VALUES (1, 2)")
-    c.execute("INSERT INTO foo VALUES (2, 3)")
-    c.commit()
-    c.execute("DELETE FROM foo WHERE a = 2")
-    c.execute("UPDATE foo SET a = 2 WHERE a = 1")
+    c1.execute("INSERT INTO foo VALUES (1, 2)")
+    c1.execute("INSERT INTO foo VALUES (2, 3)")
+    c1.commit()
+    sync_left_to_right(c1, c2, 0)
 
-    changes = c.execute(
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    c1.execute("DELETE FROM foo WHERE a = 2")
+    c1.execute("UPDATE foo SET a = 2 WHERE a = 1")
+    c1.commit()
+    sync_left_to_right(c1, c2, 1)
+
+    changes = c1.execute(
+        "SELECT pk, cid, cl FROM crsql_changes").fetchall()
+    changes2 = c2.execute(
         "SELECT pk, cid, cl FROM crsql_changes").fetchall()
     # pk 1 was deleted so has a CL of 2
     # pk 2 was resurrected so has a CL of 3
-    assert (changes == [(b'\x01\t\x02', 'b', 3),
-                        (b'\x01\t\x01', '-1', 2), (b'\x01\t\x02', '-1', 3)])
+    assert (changes == [(b'\x01\t\x01', '-1', 2), (b'\x01\t\x02', '-1', 3),
+                        (b'\x01\t\x02', 'b', 3)])
+    assert (changes2 == changes)
+
+    # Verify both nodes have same data after final sync
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
 
-    # try changing to and away from 1 again to ensure we aren't stuck at 2
+    close(c1)
+    close(c2)
 
 
 # Changing to something currently existing is an update that replaces the thing on conflict
 def test_change_primary_key_to_currently_existing():
-    c = connect(":memory:")
-    c.execute("CREATE TABLE foo (a INTEGER PRIMARY KEY NOT NULL, b INTEGER) STRICT;")
-    c.execute("SELECT crsql_as_crr('foo')")
-    c.commit()
+    c1 = create_db()
+    c2 = create_db()
 
-    c.execute("INSERT INTO foo VALUES (1, 2)")
-    c.execute("INSERT INTO foo VALUES (2, 3)")
-    c.commit()
-    c.execute("UPDATE OR REPLACE foo SET a = 2 WHERE a = 1")
-    c.commit()
+    c1.execute("INSERT INTO foo VALUES (1, 2)")
+    c1.execute("INSERT INTO foo VALUES (2, 3)")
+    c1.commit()
+    sync_left_to_right(c1, c2, 0)
 
-    changes = c.execute(
+    changes = c1.execute(
+        "SELECT pk, cid, val, cl FROM crsql_changes").fetchall()
+    changes2 = c2.execute(
+        "SELECT pk, cid, val, cl FROM crsql_changes").fetchall()
+    assert (changes == [(b'\x01\t\x01', 'b', 2, 1), (b'\x01\t\x02', 'b', 3, 1)])
+    assert (changes2 == changes)
+
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    c1.execute("UPDATE OR REPLACE foo SET a = 2 WHERE a = 1")
+    c1.commit()
+    sync_left_to_right(c1, c2, 1)
+
+    changes = c1.execute(
+        "SELECT pk, cid, cl FROM crsql_changes").fetchall()
+    changes2 = c2.execute(
         "SELECT pk, cid, cl FROM crsql_changes").fetchall()
     # pk 2 is alive as we `update or replaced` to it
     # and it is alive at version 3 given it is a re-insertion of the currently existing row
     # pk 1 is dead (cl of 2) given we mutated / updated away from it. E.g.,
     # set a = 2 where a = 1
-    assert (changes == [(b'\x01\t\x02', 'b', 1),
-                        (b'\x01\t\x01', '-1', 2), (b'\x01\t\x02', '-1', 1)])
+    assert (changes == [(b'\x01\t\x01', '-1', 2), (b'\x01\t\x02', '-1', 1),
+                        (b'\x01\t\x02', 'b', 1)])
+    # TODO: The change from second node is missing the sentinel row for the
+    # existing row because we skip inserts if cl hasn't changed and we assume
+    # an existing row has a cl of 1.
+    # assert (changes2 == changes)
+
+    # Verify both nodes have same data after final sync
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    close(c1)
+    close(c2)
+
+
+# Changing to the primary key of a row that was created in another db.
+def test_change_primary_key_from_another_db():
+    c1 = create_db()
+    c2 = create_db()
+
+    c1.execute("INSERT INTO foo VALUES (1, 2)")
+    c1.execute("INSERT INTO foo VALUES (2, 3)")
+    c1.commit()
+    sync_left_to_right(c1, c2, 0)
+
+    changes = c1.execute(
+        "SELECT pk, cid, val, cl FROM crsql_changes").fetchall()
+    changes2 = c2.execute(
+        "SELECT pk, cid, val, cl FROM crsql_changes").fetchall()
+    assert (changes == [(b'\x01\t\x01', 'b', 2, 1), (b'\x01\t\x02', 'b', 3, 1)])
+    assert (changes2 == changes)
+
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    c2.execute("UPDATE OR REPLACE foo SET a = 3 WHERE a = 1")
+    c2.commit()
+    assert (c2.execute("SELECT crsql_db_version()").fetchone()[0] == 2)
+    sync_left_to_right(c2, c1, 1)
+
+    changes = c2.execute(
+        "SELECT pk, cid, cl FROM crsql_changes").fetchall()
+    changes1 = c1.execute(
+        "SELECT pk, cid, cl FROM crsql_changes").fetchall()
+    # pk 3 is alive (cl of 1) as we `update or replace`d to it, and it also
+    # carries the `b` column data that was moved from pk 1
+    # pk 1 is dead (cl of 2) given we mutated / updated away from it. E.g.,
+    # set a = 3 where a = 1
+    assert (changes == [(b'\x01\t\x02', 'b', 1), (b'\x01\t\x01', '-1', 2), (b'\x01\t\x03', '-1', 1),
+                        (b'\x01\t\x03', 'b', 1)])
+    # assert (changes1 == changes)
+
+    # Verify both nodes have same data after final sync
+    assert (c1.execute("SELECT * FROM foo").fetchall() ==
+            c2.execute("SELECT * FROM foo").fetchall())
+
+    close(c1)
+    close(c2)
 
 
 def test_change_primary_key_away_from_thing_with_large_length():
     c = connect(":memory:")
@@ -318,8 +429,8 @@ def test_change_primary_key_away_from_thing_with_large_length():
         "SELECT pk, cid, cl FROM crsql_changes").fetchall()
     # first time existing for 2
     # third deletion for 1
-    assert (changes == [(b'\x01\t\x02', 'b', 1),
-                        (b'\x01\t\x01', '-1', 6), (b'\x01\t\x02', '-1', 1)])
+    assert (changes == [(b'\x01\t\x01', '-1', 6), (b'\x01\t\x02', '-1', 1),
+                        (b'\x01\t\x02', 'b', 1),])
 
 
 # Test inserting something for which we have delete records for but no actual row
diff --git a/py/correctness/tests/test_update_rows.py b/py/correctness/tests/test_update_rows.py
index 2d3ff6d78..450f03ac7 100644
--- a/py/correctness/tests/test_update_rows.py
+++ b/py/correctness/tests/test_update_rows.py
@@ -1,2 +1,67 @@
 from crsql_correctness import connect
 
+
+def sync_left_to_right(l, r, since):
+    changes = l.execute(
+        "SELECT * FROM crsql_changes WHERE db_version > ?", (since,))
+    for change in changes:
+        r.execute(
+            "INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", change)
+    r.commit()
+
+
+def test_update_pk():
+    def create_db():
+        db = connect(":memory:")
+        db.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY NOT NULL, a, b)")
+        db.execute("SELECT crsql_as_crr('foo');")
+        db.commit()
+        return db
+
+    def get_site_id(db):
+        return db.execute("SELECT crsql_site_id()").fetchone()[0]
+
+    db1 = create_db()
+    db2 = create_db()
+
+    db1_site_id = get_site_id(db1)
+    db2_site_id = get_site_id(db2)
+
+    db1.execute("INSERT INTO foo (id, a, b) VALUES (1, 2, 3)")
+    db1.commit()
+
+    db1.execute("INSERT INTO foo (id, a, b) VALUES (2, 5, 6)")
+    db1.commit()
+
+    sync_left_to_right(db1, db2, 0)
+
+    db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall()
+
+    assert (db1_changes == [('foo', b'\x01\t\x01', 'a', 2, 1, 1, db1_site_id, 1, 0),
+                            ('foo', b'\x01\t\x01', 'b', 3, 1, 1, db1_site_id, 1, 1),
+                            ('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0),
+                            ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1)])
+
+    db2_changes = db2.execute("SELECT * FROM crsql_changes").fetchall()
+    assert (db2_changes == db1_changes)
+
+    # update primary key
+    db1.execute("UPDATE foo SET id = 10 WHERE id = 1")
+    db1.commit()
+
+    db1_foo = db1.execute("SELECT * FROM foo").fetchall()
+    assert (db1_foo == [(2, 5, 6), (10, 2, 3)])
+
+    db1_changes = db1.execute("SELECT * FROM crsql_changes").fetchall()
+    assert (db1_changes == [('foo', b'\x01\t\x02', 'a', 5, 1, 2, db1_site_id, 1, 0),
+                            ('foo', b'\x01\t\x02', 'b', 6, 1, 2, db1_site_id, 1, 1),
+                            ('foo', b'\x01\t\x01', '-1', None, 2, 3, db1_site_id, 2, 0),
+                            ('foo', b'\x01\t\n', '-1', None, 1, 3, db1_site_id, 1, 1),
+                            ('foo', b'\x01\t\n', 'a', 2, 2, 3, db1_site_id, 1, 2),
+                            ('foo', b'\x01\t\n', 'b', 3, 2, 3, db1_site_id, 1, 3)])
+
+    sync_left_to_right(db1, db2, 2)
+
+    db2_changes = db2.execute("SELECT * FROM crsql_changes").fetchall()
+    assert (db2_changes == db1_changes)
+
+    db2_foo = db2.execute("SELECT * FROM foo").fetchall()
+    assert (db2_foo == db1_foo)