Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/rs/core/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@ fn create_site_id_and_site_id_table(db: *mut sqlite3) -> Result<[u8; 16], Result
insert_site_id(db)
}

pub fn create_site_id_triggers(db: *mut sqlite3) -> Result<ResultCode, ResultCode> {
db.exec_safe(&format!(
"CREATE TRIGGER IF NOT EXISTS {tbl}_insert_trig AFTER INSERT ON \"{tbl}\"
WHEN NEW.ordinal != 0
BEGIN
VALUES (crsql_update_site_id(NEW.site_id, NEW.ordinal));
END;
CREATE TRIGGER IF NOT EXISTS {tbl}_update_trig AFTER UPDATE ON \"{tbl}\"
WHEN NEW.ordinal != 0
BEGIN
VALUES (crsql_update_site_id(NEW.site_id, NEW.ordinal));
END;
CREATE TRIGGER IF NOT EXISTS {tbl}_delete_trig AFTER DELETE ON \"{tbl}\"
WHEN OLD.ordinal != 0
BEGIN
VALUES (crsql_update_site_id(OLD.site_id, -1));
END;",
tbl = consts::TBL_SITE_ID
))
}

#[no_mangle]
pub extern "C" fn crsql_init_peer_tracking_table(db: *mut sqlite3) -> c_int {
match db.exec_safe("CREATE TABLE IF NOT EXISTS crsql_tracked_peers (\"site_id\" BLOB NOT NULL, \"version\" INTEGER NOT NULL, \"seq\" INTEGER DEFAULT 0, \"tag\" INTEGER, \"event\" INTEGER, PRIMARY KEY (\"site_id\", \"tag\", \"event\")) STRICT;") {
Expand Down
19 changes: 16 additions & 3 deletions core/rs/core/src/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub struct crsql_ExtData {
pub pSelectClockTablesStmt: *mut sqlite::stmt,
pub mergeEqualValues: ::core::ffi::c_int,
pub timestamp: ::core::ffi::c_ulonglong,
pub ordinalMap: *mut ::core::ffi::c_void,
}

#[repr(C)]
Expand Down Expand Up @@ -108,10 +109,12 @@ extern "C" {
db: *mut sqlite::sqlite3,
pExtData: *mut crsql_ExtData,
) -> c_int;
pub fn crsql_newExtData(
pub fn crsql_newExtData(db: *mut sqlite::sqlite3) -> *mut crsql_ExtData;
pub fn crsql_initSiteIdExt(
db: *mut sqlite::sqlite3,
pExtData: *mut crsql_ExtData,
siteIdBuffer: *mut c_char,
) -> *mut crsql_ExtData;
) -> c_int;
pub fn crsql_freeExtData(pExtData: *mut crsql_ExtData);
pub fn crsql_finalize(pExtData: *mut crsql_ExtData);
}
Expand Down Expand Up @@ -268,7 +271,7 @@ fn bindgen_test_layout_crsql_ExtData() {
let ptr = UNINIT.as_ptr();
assert_eq!(
::core::mem::size_of::<crsql_ExtData>(),
160usize,
168usize,
concat!("Size of: ", stringify!(crsql_ExtData))
);
assert_eq!(
Expand Down Expand Up @@ -498,4 +501,14 @@ fn bindgen_test_layout_crsql_ExtData() {
stringify!(timestamp)
)
);
assert_eq!(
unsafe { ::core::ptr::addr_of!((*ptr).ordinalMap) as usize - ptr as usize },
160usize,
concat!(
"Offset of field: ",
stringify!(crsql_ExtData),
"::",
stringify!(ordinalMap)
)
);
}
26 changes: 25 additions & 1 deletion core/rs/core/src/changes_vtab.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate alloc;
use crate::alloc::string::ToString;
use crate::alloc::{collections::BTreeMap, string::ToString};
use crate::changes_vtab_write::crsql_merge_insert;
use crate::stmt_cache::reset_cached_stmt;
use crate::tableinfo::{crsql_ensure_table_infos_are_up_to_date, TableInfo};
Expand Down Expand Up @@ -565,3 +565,27 @@ pub extern "C" fn crsql_changes_commit(vtab: *mut sqlite::vtab) -> c_int {
}
ResultCode::OK as c_int
}

#[no_mangle]
pub extern "C" fn crsql_changes_savepoint(_vtab: *mut sqlite::vtab, _n: c_int) -> c_int {
ResultCode::OK as c_int
}

#[no_mangle]
pub extern "C" fn crsql_changes_release(_vtab: *mut sqlite::vtab, _n: c_int) -> c_int {
ResultCode::OK as c_int
}

// clear ordinal cache on rollback so we don't have wrong data in the cache.
#[no_mangle]
pub extern "C" fn crsql_changes_rollback_to(vtab: *mut sqlite::vtab, _: c_int) -> c_int {
let tab = vtab.cast::<crsql_Changes_vtab>();

let mut ordinals = unsafe {
mem::ManuallyDrop::new(Box::from_raw(
(*(*tab).pExtData).ordinalMap as *mut BTreeMap<Vec<u8>, i64>,
))
};
ordinals.clear();
ResultCode::OK as c_int
}
7 changes: 7 additions & 0 deletions core/rs/core/src/commit.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
use core::{
ffi::{c_int, c_void},
mem,
ptr::null,
};

Expand Down Expand Up @@ -31,4 +33,9 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) {
(*ext_data).seq = 0;
(*ext_data).timestamp = 0;
(*ext_data).updatedTableInfosThisTx = 0;

let mut ordinals: mem::ManuallyDrop<Box<BTreeMap<Vec<u8>, i64>>> = mem::ManuallyDrop::new(
Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>),
);
ordinals.clear();
}
34 changes: 30 additions & 4 deletions core/rs/core/src/db_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ pub extern "C" fn crsql_init_last_db_versions_map(ext_data: *mut crsql_ExtData)
unsafe { (*ext_data).lastDbVersions = Box::into_raw(Box::new(map)) as *mut c_void }
}

#[no_mangle]
pub extern "C" fn crsql_init_ordinal_map(ext_data: *mut crsql_ExtData) {
let map: BTreeMap<Vec<u8>, i64> = BTreeMap::new();
unsafe { (*ext_data).ordinalMap = Box::into_raw(Box::new(map)) as *mut c_void }
}

#[no_mangle]
pub extern "C" fn crsql_drop_last_db_versions_map(ext_data: *mut crsql_ExtData) {
unsafe {
Expand All @@ -196,6 +202,15 @@ pub extern "C" fn crsql_drop_last_db_versions_map(ext_data: *mut crsql_ExtData)
}
}

#[no_mangle]
pub extern "C" fn crsql_drop_ordinal_map(ext_data: *mut crsql_ExtData) {
unsafe {
drop(Box::from_raw(
(*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>,
));
}
}

pub fn insert_db_version(
ext_data: *mut crsql_ExtData,
insert_site_id: &[u8],
Expand Down Expand Up @@ -258,6 +273,15 @@ pub unsafe fn get_or_set_site_ordinal(
ext_data: *mut crsql_ExtData,
site_id: &[u8],
) -> Result<i64, ResultCode> {
// check the cache first
let mut ordinals: mem::ManuallyDrop<Box<BTreeMap<Vec<u8>, i64>>> = mem::ManuallyDrop::new(
Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>),
);

if let Some(ordinal) = ordinals.get(site_id) {
return Ok(*ordinal);
}

let bind_result =
(*ext_data)
.pSelectSiteIdOrdinalStmt
Expand All @@ -268,11 +292,11 @@ pub unsafe fn get_or_set_site_ordinal(
return Err(rc);
}

match (*ext_data).pSelectSiteIdOrdinalStmt.step() {
let ordinal = match (*ext_data).pSelectSiteIdOrdinalStmt.step() {
Ok(ResultCode::ROW) => {
let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0);
reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?;
Ok(ordinal)
ordinal
}
Ok(_) => {
reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?;
Expand All @@ -296,7 +320,7 @@ pub unsafe fn get_or_set_site_ordinal(
Ok(_) => {
let ordinal = (*ext_data).pSetSiteIdOrdinalStmt.column_int64(0);
reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?;
Ok(ordinal)
ordinal
}
Err(rc) => {
reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?;
Expand All @@ -308,5 +332,7 @@ pub unsafe fn get_or_set_site_ordinal(
reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?;
return Err(rc);
}
}
};
ordinals.insert(site_id.to_vec(), ordinal);
Ok(ordinal)
}
122 changes: 112 additions & 10 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ mod triggers;
mod unpack_columns_vtab;
mod util;

use alloc::borrow::Cow;
use alloc::format;
use alloc::string::ToString;
use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, vec::Vec};
use core::ffi::c_char;
use core::mem;
use core::ptr::null_mut;
extern crate alloc;
use alter::crsql_compact_post_alter;
use automigrate::*;
use backfill::*;
use c::{crsql_freeExtData, crsql_newExtData};
use c::{crsql_freeExtData, crsql_initSiteIdExt, crsql_newExtData};
use config::{crsql_config_get, crsql_config_set};
use core::ffi::{c_int, c_void, CStr};
use create_crr::create_crr;
Expand Down Expand Up @@ -232,6 +232,30 @@ pub extern "C" fn sqlite3_crsqlcore_init(
return null_mut();
}

// allocate ext data earlier in the init process because we need its
// pointer to be available for the crsql_update_site_id function.
let ext_data = unsafe { crsql_newExtData(db) };
if ext_data.is_null() {
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_update_site_id",
2,
sqlite::UTF8 | sqlite::INNOCUOUS | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_update_site_id),
None,
None,
None,
)
.unwrap_or(ResultCode::ERROR);
if rc != ResultCode::OK {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

// TODO: convert this function to a proper rust function
// and have rust free:
// 1. site_id_buffer
Expand All @@ -243,12 +267,18 @@ pub extern "C" fn sqlite3_crsqlcore_init(
let rc = crate::bootstrap::crsql_init_site_id(db, site_id_buffer);
if rc != ResultCode::OK as c_int {
sqlite::free(site_id_buffer as *mut c_void);
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

let ext_data = unsafe { crsql_newExtData(db, site_id_buffer as *mut c_char) };
if ext_data.is_null() {
// no need to free the site id buffer here, this is cleaned up already.
let rc = unsafe { crsql_initSiteIdExt(db, ext_data, site_id_buffer as *mut c_char) };
if rc != ResultCode::OK as c_int {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

if let Err(_) = crate::bootstrap::create_site_id_triggers(db) {
sqlite::free(site_id_buffer as *mut c_void);
return null_mut();
}

Expand Down Expand Up @@ -408,7 +438,7 @@ pub extern "C" fn sqlite3_crsqlcore_init(
let rc = db
.create_function_v2(
"crsql_set_ts",
-1,
1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_set_ts),
Expand All @@ -422,6 +452,24 @@ pub extern "C" fn sqlite3_crsqlcore_init(
return null_mut();
}

#[cfg(feature = "test")]
let rc = db
.create_function_v2(
"crsql_cache_site_ordinal",
1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_cache_site_ordinal),
None,
None,
None,
)
.unwrap_or(ResultCode::ERROR);
if rc != ResultCode::OK {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_set_db_version",
Expand Down Expand Up @@ -627,6 +675,32 @@ unsafe extern "C" fn x_crsql_site_id(
sqlite::result_blob(ctx, site_id, consts::SITE_ID_LEN, Destructor::STATIC);
}

/**
* update in-memory map of site ids to ordinals. Only valid within a transaction.
*
* `select crsql_update_site_id(site_id, ordinal)`
*/
unsafe extern "C" fn x_crsql_update_site_id(
ctx: *mut sqlite::context,
argc: i32,
argv: *mut *mut sqlite::value,
) {
let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
let args = sqlite::args!(argc, argv);
let site_id = args[0].blob();
let ordinal = args[1].int64();
let mut ordinals: mem::ManuallyDrop<Box<BTreeMap<Vec<u8>, i64>>> = mem::ManuallyDrop::new(
Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>),
);

if ordinal == -1 {
ordinals.remove(&site_id.to_vec());
} else {
ordinals.insert(site_id.to_vec(), ordinal);
}
ctx.result_text_static("OK");
}

unsafe extern "C" fn x_crsql_finalize(
ctx: *mut sqlite::context,
_argc: i32,
Expand Down Expand Up @@ -854,10 +928,7 @@ unsafe extern "C" fn x_crsql_set_ts(
argv: *mut *mut sqlite::value,
) {
if argc == 0 {
ctx.result_error(
"Wrong number of args provided to crsql_begin_alter. Provide the
schema name and table name or just the table name.",
);
ctx.result_error("Wrong number of args provided to x_crsql_set_ts. Provide the timestamp.");
return;
}

Expand All @@ -876,6 +947,37 @@ unsafe extern "C" fn x_crsql_set_ts(
ctx.result_text_static("OK");
}

/**
* Get the site ordinal cached in the ext data for the current transaction.
* only used for test to inspect the ordinal map.
*/
#[cfg(feature = "test")]
unsafe extern "C" fn x_crsql_cache_site_ordinal(
ctx: *mut sqlite::context,
argc: i32,
argv: *mut *mut sqlite::value,
) {
if argc == 0 {
ctx.result_error(
"Wrong number of args provided to crsql_cache_site_ordinal. Provide the site id.",
);
return;
}

let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
let args = sqlite::args!(argc, argv);
let site_id = args[0].blob();

let ord_map = mem::ManuallyDrop::new(Box::from_raw(
(*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>,
));
let res = ord_map.get(site_id).cloned().unwrap_or(-1);
sqlite::result_int64(ctx, res);
}

/**
* Return the timestamp for the current transaction.
*/
unsafe extern "C" fn x_crsql_get_ts(
ctx: *mut sqlite::context,
_argc: i32,
Expand Down
Loading
Loading