Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/rs/core/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ unsafe fn compact_post_alter(
SELECT name FROM pragma_table_info('{table_name}')
WHERE pk > 0 AND name NOT IN
(SELECT name FROM pragma_index_info('{table_name}__crsql_pks_pks'))
UNION SELECT name FROM pragma_index_info('{table_name}__crsql_pks_pks') WHERE name NOT IN
UNION SELECT name FROM pragma_index_info('{table_name}__crsql_pks_pks') WHERE name NOT IN
(SELECT name FROM pragma_table_info('{table_name}') WHERE pk > 0) AND name != 'col_name'
);",
table_name = crate::util::escape_ident_as_value(tbl_name_str),
Expand Down
3 changes: 2 additions & 1 deletion core/rs/core/src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use alloc::string::ToString;
use core::ffi::{c_char, c_int};

use crate::{consts, tableinfo::TableInfo};
Expand Down Expand Up @@ -208,6 +207,8 @@ pub fn create_clock_table(
db_version INTEGER NOT NULL,
site_id INTEGER NOT NULL DEFAULT 0,
seq INTEGER NOT NULL,
ts TEXT NOT NULL DEFAULT '0',

PRIMARY KEY (key, col_name)
) WITHOUT ROWID, STRICT",
table_name = crate::util::escape_ident(table_name),
Expand Down
15 changes: 14 additions & 1 deletion core/rs/core/src/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum CrsqlChangesColumn {
SiteId = 6,
Cl = 7,
Seq = 8,
Ts = 9,
}

#[derive(FromPrimitive, PartialEq, Debug)]
Expand All @@ -36,6 +37,7 @@ pub enum ClockUnionColumn {
RowId = 6,
Seq = 7,
Cl = 8,
Ts = 9,
}

#[derive(FromPrimitive, PartialEq, Debug)]
Expand Down Expand Up @@ -70,6 +72,7 @@ pub struct crsql_ExtData {
pub pSelectSiteIdOrdinalStmt: *mut sqlite::stmt,
pub pSelectClockTablesStmt: *mut sqlite::stmt,
pub mergeEqualValues: ::core::ffi::c_int,
pub timestamp: ::core::ffi::c_ulonglong,
}

#[repr(C)]
Expand Down Expand Up @@ -265,7 +268,7 @@ fn bindgen_test_layout_crsql_ExtData() {
let ptr = UNINIT.as_ptr();
assert_eq!(
::core::mem::size_of::<crsql_ExtData>(),
152usize,
160usize,
concat!("Size of: ", stringify!(crsql_ExtData))
);
assert_eq!(
Expand Down Expand Up @@ -485,4 +488,14 @@ fn bindgen_test_layout_crsql_ExtData() {
stringify!(mergeEqualValues)
)
);
assert_eq!(
unsafe { ::core::ptr::addr_of!((*ptr).timestamp) as usize - ptr as usize },
152usize,
concat!(
"Offset of field: ",
stringify!(crsql_ExtData),
"::",
stringify!(timestamp)
)
);
}
4 changes: 4 additions & 0 deletions core/rs/core/src/changes_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ fn get_clock_table_col_name(col: &Option<CrsqlChangesColumn>) -> Option<String>
Some(CrsqlChangesColumn::SiteId) => Some("site_id".to_string()),
Some(CrsqlChangesColumn::Seq) => Some("seq".to_string()),
Some(CrsqlChangesColumn::Cl) => Some("cl".to_string()),
Some(CrsqlChangesColumn::Ts) => Some("ts".to_string()),
None => None,
}
}
Expand Down Expand Up @@ -494,6 +495,9 @@ fn column_impl(
Some(CrsqlChangesColumn::Cl) => {
ctx.result_value(changes_stmt.column_value(ClockUnionColumn::Cl as i32))
}
Some(CrsqlChangesColumn::Ts) => {
ctx.result_value(changes_stmt.column_value(ClockUnionColumn::Ts as i32));
}
None => return Err(ResultCode::MISUSE),
}

Expand Down
5 changes: 3 additions & 2 deletions core/rs/core/src/changes_vtab_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ fn crsql_changes_query_for_table(table_info: &TableInfo) -> Result<String, Resul
site_tbl.site_id as site_id,
t1.key,
t1.seq as seq,
COALESCE(t2.col_version, 1) as cl
COALESCE(t2.col_version, 1) as cl,
t1.ts as ts
FROM \"{table_name_ident}__crsql_clock\" AS t1
JOIN \"{table_name_ident}__crsql_pks\" AS pk_tbl ON t1.key = pk_tbl.__crsql_key
LEFT JOIN crsql_site_id AS site_tbl ON t1.site_id = site_tbl.ordinal
Expand All @@ -60,7 +61,7 @@ pub fn changes_union_query(
// Manually null-terminate the string so we don't have to copy it to create a CString.
// We can just extract the raw bytes of the Rust string.
Ok(format!(
"SELECT tbl, pks, cid, col_vrsn, db_vrsn, site_id, key, seq, cl FROM ({unions}) {idx_str}\0",
"SELECT tbl, pks, cid, col_vrsn, db_vrsn, site_id, key, seq, cl, ts FROM ({unions}) {idx_str}\0",
unions = sub_queries.join(" UNION ALL "),
idx_str = idx_str,
))
Expand Down
13 changes: 12 additions & 1 deletion core/rs/core/src/changes_vtab_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ fn set_winner_clock(
insert_db_vrsn: sqlite::int64,
insert_site_id: &[u8],
insert_seq: sqlite::int64,
insert_ts: &str,
) -> Result<sqlite::int64, ResultCode> {
// set the site_id ordinal
// get the returned ordinal
Expand Down Expand Up @@ -224,7 +225,8 @@ fn set_winner_clock(
.and_then(|_| match ordinal {
Some(ordinal) => set_stmt.bind_int64(6, ordinal),
None => set_stmt.bind_null(6),
});
})
.and_then(|_| set_stmt.bind_text(7, insert_ts, sqlite::Destructor::STATIC));

if let Err(rc) = bind_result {
reset_cached_stmt(set_stmt.stmt)?;
Expand Down Expand Up @@ -256,6 +258,7 @@ fn merge_sentinel_only_insert(
remote_db_vsn: sqlite::int64,
remote_site_id: &[u8],
remote_seq: sqlite::int64,
remote_ts: &str,
) -> Result<sqlite::int64, ResultCode> {
let merge_stmt_ref = tbl_info.get_merge_pk_only_insert_stmt(db)?;
let merge_stmt = merge_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;
Expand Down Expand Up @@ -302,6 +305,7 @@ fn merge_sentinel_only_insert(
remote_db_vsn,
remote_site_id,
remote_seq,
remote_ts,
);
}

Expand Down Expand Up @@ -332,6 +336,7 @@ unsafe fn merge_delete(
remote_db_vrsn: sqlite::int64,
remote_site_id: &[u8],
remote_seq: sqlite::int64,
remote_ts: &str,
) -> Result<sqlite::int64, ResultCode> {
let delete_stmt_ref = tbl_info.get_merge_delete_stmt(db)?;
let delete_stmt = delete_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;
Expand Down Expand Up @@ -370,6 +375,7 @@ unsafe fn merge_delete(
remote_db_vrsn,
remote_site_id,
remote_seq,
remote_ts,
)?;

// Drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
Expand Down Expand Up @@ -477,6 +483,7 @@ unsafe fn merge_insert(
let insert_site_id = args[2 + CrsqlChangesColumn::SiteId as usize];
let insert_cl = args[2 + CrsqlChangesColumn::Cl as usize].int64();
let insert_seq = args[2 + CrsqlChangesColumn::Seq as usize].int64();
let insert_ts = args[2 + CrsqlChangesColumn::Ts as usize].text();

if insert_site_id.bytes() > crate::consts::SITE_ID_LEN {
let err = CString::new("crsql - site id exceeded max length")?;
Expand Down Expand Up @@ -545,6 +552,7 @@ unsafe fn merge_insert(
insert_db_vrsn,
insert_site_id,
insert_seq,
insert_ts,
);
match merge_result {
Err(rc) => {
Expand Down Expand Up @@ -582,6 +590,7 @@ unsafe fn merge_insert(
insert_db_vrsn,
insert_site_id,
insert_seq,
insert_ts,
);
match merge_result {
Err(rc) => {
Expand Down Expand Up @@ -619,6 +628,7 @@ unsafe fn merge_insert(
insert_db_vrsn,
insert_site_id,
insert_seq,
insert_ts,
)?;
(*(*tab).pExtData).rowsImpacted += 1;
}
Expand Down Expand Up @@ -686,6 +696,7 @@ unsafe fn merge_insert(
insert_db_vrsn,
insert_site_id,
insert_seq,
insert_ts,
);
match merge_result {
Err(rc) => {
Expand Down
1 change: 1 addition & 0 deletions core/rs/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ pub unsafe extern "C" fn crsql_rollback_hook(user_data: *mut c_void) -> *const c
pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) {
(*ext_data).pendingDbVersion = -1;
(*ext_data).seq = 0;
(*ext_data).timestamp = 0;
(*ext_data).updatedTableInfosThisTx = 0;
}
1 change: 0 additions & 1 deletion core/rs/core/src/db_version.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use core::ffi::c_void;
use core::mem;
use core::ptr;

use crate::alloc::string::ToString;
use crate::alloc::{boxed::Box, vec::Vec};
Expand Down
76 changes: 76 additions & 0 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod util;

use alloc::borrow::Cow;
use alloc::format;
use alloc::string::ToString;
use core::ffi::c_char;
use core::mem;
use core::ptr::null_mut;
Expand Down Expand Up @@ -367,6 +368,40 @@ pub extern "C" fn sqlite3_crsqlcore_init(
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_set_ts",
-1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_set_ts),
None,
None,
None,
)
.unwrap_or(ResultCode::ERROR);
if rc != ResultCode::OK {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_get_ts",
-1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_get_ts),
None,
None,
None,
)
.unwrap_or(ResultCode::ERROR);
if rc != ResultCode::OK {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_begin_alter",
Expand Down Expand Up @@ -756,6 +791,47 @@ unsafe extern "C" fn x_crsql_increment_and_get_seq(
(*ext_data).seq += 1;
}

/**
* Set the timestamp for the current transaction.
*/
unsafe extern "C" fn x_crsql_set_ts(
ctx: *mut sqlite::context,
argc: i32,
argv: *mut *mut sqlite::value,
) {
if argc == 0 {
ctx.result_error(
"Wrong number of args provided to crsql_begin_alter. Provide the
schema name and table name or just the table name.",
);
return;
}

let args = sqlite::args!(argc, argv);
let ts = args[0].text();
// we expect a string that we can parse as a u64
let ts_u64 = match ts.parse::<u64>() {
Ok(ts_u64) => ts_u64,
Err(_) => {
ctx.result_error("Timestamp cannot be parsed as a valid u64");
return;
}
};
let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
(*ext_data).timestamp = ts_u64;
ctx.result_text_static("OK");
}

unsafe extern "C" fn x_crsql_get_ts(
ctx: *mut sqlite::context,
_argc: i32,
_argv: *mut *mut sqlite::value,
) {
let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
let ts = (*ext_data).timestamp.to_string();
ctx.result_text_transient(&ts);
}

/**
* Return the current version of the database.
*
Expand Down
5 changes: 4 additions & 1 deletion core/rs/core/src/local_writes/after_delete.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloc::string::String;
use alloc::string::ToString;
use core::ffi::c_int;
use sqlite::sqlite3;
use sqlite::value;
Expand Down Expand Up @@ -39,11 +40,12 @@ fn after_delete(
tbl_info: &TableInfo,
pks_old: &[*mut value],
) -> Result<ResultCode, String> {
let ts = unsafe { (*ext_data).timestamp.to_string() };
let db_version = crate::db_version::next_db_version(db, ext_data)?;
let seq = bump_seq(ext_data);
let key = tbl_info
.get_or_create_key_via_raw_values(db, pks_old)
.map_err(|_| "failed geteting or creating lookaside key")?;
.map_err(|_| "failed getting or creating lookaside key")?;

let mark_locally_deleted_stmt_ref = tbl_info
.get_mark_locally_deleted_stmt(db)
Expand All @@ -55,6 +57,7 @@ fn after_delete(
.bind_int64(1, key)
.and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version))
.and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq))
.and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC))
.map_err(|_| "failed binding to mark locally deleted stmt")?;
super::step_trigger_stmt(mark_locally_deleted_stmt)?;

Expand Down
15 changes: 10 additions & 5 deletions core/rs/core/src/local_writes/after_insert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloc::string::String;
use alloc::string::ToString;
use core::ffi::c_int;
use sqlite::sqlite3;
use sqlite::value;
Expand Down Expand Up @@ -39,21 +40,23 @@ fn after_insert(
tbl_info: &TableInfo,
pks_new: &[*mut value],
) -> Result<ResultCode, String> {
let ts = unsafe { (*ext_data).timestamp.to_string() };

let db_version = crate::db_version::next_db_version(db, ext_data)?;
let (create_record_existed, key_new) = tbl_info
.get_or_create_key_for_insert(db, pks_new)
.map_err(|_| "failed getting or creating lookaside key")?;
if tbl_info.non_pks.is_empty() {
let seq = bump_seq(ext_data);
// just a sentinel record
return super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq);
return super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq, &ts);
} else if create_record_existed {
// update the create record since it already exists.
let seq = bump_seq(ext_data);
update_create_record(db, tbl_info, key_new, db_version, seq)?;
update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?;
}

super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version)?;
super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?;

Ok(ResultCode::OK)
}
Expand All @@ -64,6 +67,7 @@ fn update_create_record(
new_key: sqlite::int64,
db_version: sqlite::int64,
seq: i32,
ts: &str,
) -> Result<ResultCode, String> {
let update_create_record_stmt_ref = tbl_info
.get_maybe_mark_locally_reinserted_stmt(db)
Expand All @@ -75,10 +79,11 @@ fn update_create_record(
update_create_record_stmt
.bind_int64(1, db_version)
.and_then(|_| update_create_record_stmt.bind_int(2, seq))
.and_then(|_| update_create_record_stmt.bind_int64(3, new_key))
.and_then(|_| update_create_record_stmt.bind_text(3, ts, sqlite::Destructor::STATIC))
.and_then(|_| update_create_record_stmt.bind_int64(4, new_key))
.and_then(|_| {
update_create_record_stmt.bind_text(
4,
5,
crate::c::INSERT_SENTINEL,
sqlite::Destructor::STATIC,
)
Expand Down
Loading
Loading