Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BQL,QE,SE: Add UPSERT query, BQL query shorthands and fix SE bootup crash #342

Merged
merged 6 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,23 @@ All changes in this project will be noted in this file.

- Skyhash/2: Restored support for pipelines
- Enable online (runtime) recovery of transactional failures due to disk errors
- Added BlueQL shorthands:
- `INSERT`: `INS`
- `SELECT`: `SEL`
- `UPDATE`: `UPD`
- `DELETE`: `DEL`
- Added new `UPSERT` (shorthand: `UPS`) query

### Fixes

- Fixed an issue where an incorrect handshake with multiple errors cause the client connection
- Fixed an issue where an incorrect handshake with multiple errors caused the client connection
to be terminated without yielding an error
- Fixed SE bug that resulted in unsafe cleanup of journals when multiple failures occur in sequence
- Fixed SE memory management bug in delta diff algorithm: In rare cases, a crash might occur on startup (*only during startup* and *not* at runtime)

### Platform notes

- 32-bit Windows (MSVC) has been downgraded to a Tier-2 platform and will likely be deprecated in the future

## Version 0.8.1

Expand Down
3 changes: 2 additions & 1 deletion harness/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ fn run_test_inner() -> HarnessResult<()> {
append_target(&mut standard_test_suite_args);
// get cmd
let build_cmd = util::assemble_command_from_slice(build_cmd_args);
let standard_test_suite = util::assemble_command_from_slice(standard_test_suite_args);
let mut standard_test_suite = util::assemble_command_from_slice(standard_test_suite_args);
standard_test_suite.env("RUST_MIN_STACK", "16777216");

// build skyd
info!("Building server binary ...");
Expand Down
3 changes: 2 additions & 1 deletion server/src/engine/core/dml/del.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) ->
core::with_model_for_data_update(global, delete.entity(), |model| {
let g = sync::atm::cpin();
let delta_state = model.delta_state();
let _idx_latch = model.primary_index().acquire_cd();
let _idx_latch = model.primary_index().acquire_shared();
// create new version
let new_version = delta_state.create_new_data_delta_version();
match model
Expand All @@ -54,6 +54,7 @@ pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) ->
.mt_delete_return_entry(&model.resolve_where(delete.clauses_mut())?, &g)
{
Some(row) => {
row.d_data().write().set_txn_revised(new_version);
let dp = delta_state.append_new_data_delta_with(
DataDeltaKind::Delete,
row.clone(),
Expand Down
27 changes: 26 additions & 1 deletion server/src/engine/core/dml/ins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn insert_resp(
pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> QueryResult<()> {
core::with_model_for_data_update(global, insert.entity(), |mdl| {
let (pk, data) = prepare_insert(mdl, insert.data())?;
let _idx_latch = mdl.primary_index().acquire_cd();
let _idx_latch = mdl.primary_index().acquire_shared();
let g = cpin();
let ds = mdl.delta_state();
// create new version
Expand All @@ -65,6 +65,31 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer
})
}

pub fn upsert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> QueryResult<bool> {
let mut ret = false;
core::with_model_for_data_update(global, insert.entity(), |mdl| {
let (pk, data) = prepare_insert(mdl, insert.data())?;
let _idx_latch = mdl.primary_index().acquire_shared();
let g = cpin();
let ds = mdl.delta_state();
// create new version
let new_version = ds.create_new_data_delta_version();
let row = Row::new(pk, data, ds.schema_current_version(), new_version);
ret = mdl.primary_index().__raw_index().mt_upsert(row.clone(), &g);
// append delta for new version
let dp = ds.append_new_data_delta_with(DataDeltaKind::Upsert, row, new_version, &g);
Ok(QueryExecMeta::new(dp))
})
.map(|_| ret)
}

pub fn upsert_resp(
global: &impl GlobalInstanceLike,
insert: InsertStatement,
) -> QueryResult<Response> {
self::upsert(global, insert).map(Response::Bool)
}

// TODO(@ohsayan): optimize null case
fn prepare_insert(
model: &ModelData,
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/core/dml/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub use {
};
pub use {
del::delete_resp,
ins::insert_resp,
ins::{insert_resp, upsert_resp},
sel::{select_all_resp, select_resp},
upd::update_resp,
};
Expand Down
9 changes: 7 additions & 2 deletions server/src/engine/core/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,26 @@ fn run_nb(
&Global,
&mut ClientLocalState,
&mut State<'static, InplaceData>,
) -> QueryResult<Response>; 9] = [
) -> QueryResult<Response>; {
// +SELECT ALL
KeywordStmt::NONBLOCKING_COUNT + 1
}] = [
cstate_use, // use
|g, c, s| _callgcs(g, c, s, ddl_misc::inspect),
|_, _, _| Err(QueryError::QLUnknownStatement), // describe
|g, _, s| _callgs(g, s, dml::insert_resp),
|g, _, s| _callgs(g, s, dml::select_resp),
|g, _, s| _callgs(g, s, dml::update_resp),
|g, _, s| _callgs(g, s, dml::delete_resp),
|g, _, s| _callgs(g, s, dml::upsert_resp),
|_, _, _| Err(QueryError::QLUnknownStatement), // exists
|g, _, s| _callgs(g, s, dml::select_all_resp),
];
{
let n_offset_adjust = (stmt == KeywordStmt::Select) & state.cursor_rounded_eq(Token![all]);
state.cursor_ahead_if(n_offset_adjust);
let corrected_offset = (n_offset_adjust as u8 * 8) | (stmt_c * (!n_offset_adjust as u8));
let corrected_offset =
(n_offset_adjust as u8 * (F.len() - 1) as u8) | (stmt_c * (!n_offset_adjust as u8));
let mut state = unsafe {
// UNSAFE(@ohsayan): this is a lifetime issue with the token handle
core::mem::transmute(state)
Expand Down
67 changes: 43 additions & 24 deletions server/src/engine/core/index/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,6 @@ pub struct PrimaryIndexKey {
data: SpecialPaddedWord,
}

impl Clone for PrimaryIndexKey {
fn clone(&self) -> Self {
match self.tag {
TagUnique::SignedInt | TagUnique::UnsignedInt => {
let (qw, nw) = self.data.dwordqn_load_qw_nw();
unsafe {
let slice = slice::from_raw_parts(nw as *const u8, qw as _);
let mut data = ManuallyDrop::new(slice.to_owned().into_boxed_slice());
Self {
tag: self.tag,
data: SpecialPaddedWord::new(qw, data.as_mut_ptr() as usize),
}
}
}
TagUnique::Bin | TagUnique::Str => Self {
tag: self.tag,
data: unsafe { core::mem::transmute_copy(&self.data) },
},
_ => unreachable!(),
}
}
}

impl PrimaryIndexKey {
pub fn tag(&self) -> TagUnique {
self.tag
Expand Down Expand Up @@ -145,13 +122,18 @@ impl PrimaryIndexKey {
if cfg!(debug_assertions) && tag < TagUnique::Bin {
assert_eq!(b, mem::ZERO_BLOCK.as_ptr() as usize);
}
Self {
let me = Self {
tag,
data: unsafe {
// UNSAFE(@ohsayan): loaded above, writing here
SpecialPaddedWord::new(a, b)
},
};
if cfg!(debug_assertions) {
let vblk = me.virtual_block();
assert_eq!(vblk.as_ptr() as usize, b);
}
me
}
/// Create a new quadword based primary key
pub unsafe fn new_from_qw(tag: TagUnique, qw: u64) -> Self {
Expand Down Expand Up @@ -395,3 +377,40 @@ fn empty_slice() {
assert_eq!(pk2, pk2_);
drop((pk2, pk2_));
}

#[test]
fn ensure_ptr_offsets() {
let data = String::from("hello").into_boxed_str();
let __orig = (data.as_ptr(), data.len());
// dc
let dc = Datacell::new_str(data);
assert_eq!(__orig, (dc.str().as_ptr(), dc.str().len()));
// pk
let pk = PrimaryIndexKey::try_from_dc(dc).unwrap();
assert_eq!(
__orig,
(pk.str().unwrap().as_ptr(), pk.str().unwrap().len())
);
}

#[test]
fn queue_ensure_offsets() {
use crate::engine::sync::queue::Queue;
let data: Vec<_> = (0..100)
.map(|_| "hello".to_owned().into_boxed_str())
.collect();
let __orig: Vec<_> = data.iter().map(|s| (s.as_ptr(), s.len())).collect();
let q = Queue::new();
for datum in data {
q.blocking_enqueue(
PrimaryIndexKey::try_from_dc(Datacell::new_str(datum)).unwrap(),
&crossbeam_epoch::pin(),
);
}
let mut __reloaded = vec![];
while let Some(key) = q.blocking_try_dequeue(&crossbeam_epoch::pin()) {
let pk_str = key.str().unwrap();
__reloaded.push((pk_str.as_ptr(), pk_str.len()));
}
assert_eq!(__orig, __reloaded);
}
2 changes: 1 addition & 1 deletion server/src/engine/core/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl PrimaryIndex {
latch: IndexLatch::new(),
}
}
pub fn acquire_cd(&self) -> IndexLatchHandleShared {
pub fn acquire_shared(&self) -> IndexLatchHandleShared {
self.latch.gl_handle_shared()
}
pub fn acquire_exclusive(&self) -> IndexLatchHandleExclusive {
Expand Down
4 changes: 4 additions & 0 deletions server/src/engine/core/index/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct RowData {
}

impl RowData {
#[cfg(test)]
pub fn get_schema_version(&self) -> DeltaVersion {
self.txn_revised_schema_version
}
pub fn fields(&self) -> &DcFieldIndex {
&self.fields
}
Expand Down
5 changes: 5 additions & 0 deletions server/src/engine/core/model/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,9 @@ pub enum DataDeltaKind {
Delete = 0,
Insert = 1,
Update = 2,
/*
HACK(@ohsayan): I was daft enough to map (hardcode) an `EarlyExit` event (skewed read) to OPC 3 in the SE.
Hence, we have no choice but to map this to 4
*/
Upsert = 4,
}
4 changes: 4 additions & 0 deletions server/src/engine/core/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ pub struct ModelMutator<'a> {
}

impl<'a> ModelMutator<'a> {
#[cfg(test)]
pub unsafe fn allocate(&mut self, k: &str) -> RawStr {
self.model.private.allocate_or_recycle(k)
}
pub unsafe fn vacuum_stashed(&mut self) {
self.model.private.vacuum_marked()
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/idx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub trait MTIndex<E, K, V>: IndexBaseSpec {
where
V: AsValue;
/// Updates or inserts the given value
fn mt_upsert(&self, e: E, g: &Guard)
fn mt_upsert(&self, e: E, g: &Guard) -> bool
where
V: AsValue;
// read
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/idx/mtchm/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<E: TreeElement, C: Config> MTIndex<E, E::Key, E::Value> for Raw<E, C> {
self.patch(VanillaInsert(e), g)
}

fn mt_upsert(&self, e: E, g: &Guard)
fn mt_upsert(&self, e: E, g: &Guard) -> bool
where
E::Value: AsValue,
{
Expand Down
10 changes: 7 additions & 3 deletions server/src/engine/idx/mtchm/patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<T: TreeElement> PatchWrite<T> for VanillaInsert<T> {
pub struct VanillaUpsert<T: TreeElement>(pub T);
impl<T: TreeElement> PatchWrite<T> for VanillaUpsert<T> {
const WMODE: WriteFlag = WRITEMODE_ANY;
type Ret<'a> = ();
type Ret<'a> = bool;
type Target = T::Key;
fn target<'a>(&'a self) -> &Self::Target {
self.0.key()
Expand All @@ -94,12 +94,16 @@ impl<T: TreeElement> PatchWrite<T> for VanillaUpsert<T> {
fn nx_new(&mut self) -> T {
self.0.clone()
}
fn nx_ret<'a>() -> Self::Ret<'a> {}
fn nx_ret<'a>() -> Self::Ret<'a> {
false
}
// ex
fn ex_apply(&mut self, _: &T) -> T {
self.0.clone()
}
fn ex_ret<'a>(_: &'a T) -> Self::Ret<'a> {}
fn ex_ret<'a>(_: &'a T) -> Self::Ret<'a> {
true
}
}

pub struct VanillaUpsertRet<T: TreeElement>(pub T);
Expand Down
Loading
Loading