Skip to content

Commit

Permalink
storage: Skew test with upsert and fix SE startup bug (crash)
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Apr 17, 2024
1 parent 0ae9428 commit 79a31cd
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 61 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ All changes in this project will be noted in this file.

### Fixes

- Fixed an issue where an incorrect handshake with multiple errors cause the client connection
- Fixed an issue where an incorrect handshake with multiple errors caused the client connection
to be terminated without yielding an error
- Fixed SE bug that resulted in unsafe cleanup of journals when multiple failures occur in sequence
- Fixed SE memory management bug in delta diff algorithm: In rare cases, a crash might occur on startup (*only during startup* and *not* at runtime)

## Version 0.8.1

Expand Down
67 changes: 43 additions & 24 deletions server/src/engine/core/index/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,6 @@ pub struct PrimaryIndexKey {
data: SpecialPaddedWord,
}

impl Clone for PrimaryIndexKey {
    /// Clone this key.
    ///
    /// The padded word stores `(qw, nw)` with two distinct layouts:
    /// - `Bin`/`Str`: `qw` = payload length, `nw` = pointer to a heap buffer.
    ///   A clone must deep-copy that buffer, otherwise both keys alias the
    ///   same allocation and dropping them double-frees.
    /// - `SignedInt`/`UnsignedInt`: `qw` = the integer value itself, `nw` =
    ///   pointer into the static `ZERO_BLOCK` (the constructor debug-asserts
    ///   this), so a plain bitwise copy of the word is sound — nothing on the
    ///   heap is owned.
    ///
    /// BUGFIX: the two arms were previously swapped, which made integer keys
    /// read `value`-many bytes out of `ZERO_BLOCK` (OOB read → crash) and made
    /// string/binary keys share one buffer (double free).
    fn clone(&self) -> Self {
        match self.tag {
            TagUnique::Bin | TagUnique::Str => {
                let (len, ptr) = self.data.dwordqn_load_qw_nw();
                unsafe {
                    // SAFETY: for Bin/Str, (len, ptr) describe the live heap
                    // buffer owned by `self`, so this view is in-bounds
                    let slice = slice::from_raw_parts(ptr as *const u8, len as _);
                    // ManuallyDrop: ownership of the fresh copy is handed to
                    // the new key via the raw pointer below; it must not be
                    // freed here
                    let mut data = ManuallyDrop::new(slice.to_owned().into_boxed_slice());
                    Self {
                        tag: self.tag,
                        data: SpecialPaddedWord::new(len, data.as_mut_ptr() as usize),
                    }
                }
            }
            TagUnique::SignedInt | TagUnique::UnsignedInt => Self {
                tag: self.tag,
                // SAFETY: integer keys own no heap data (nw points at the
                // static ZERO_BLOCK), so duplicating the word bit-for-bit is fine
                data: unsafe { core::mem::transmute_copy(&self.data) },
            },
            _ => unreachable!(),
        }
    }
}

impl PrimaryIndexKey {
pub fn tag(&self) -> TagUnique {
self.tag
Expand Down Expand Up @@ -145,13 +122,18 @@ impl PrimaryIndexKey {
if cfg!(debug_assertions) && tag < TagUnique::Bin {
assert_eq!(b, mem::ZERO_BLOCK.as_ptr() as usize);
}
Self {
let me = Self {
tag,
data: unsafe {
// UNSAFE(@ohsayan): loaded above, writing here
SpecialPaddedWord::new(a, b)
},
};
if cfg!(debug_assertions) {
let vblk = me.virtual_block();
assert_eq!(vblk.as_ptr() as usize, b);
}
me
}
/// Create a new quadword based primary key
pub unsafe fn new_from_qw(tag: TagUnique, qw: u64) -> Self {
Expand Down Expand Up @@ -395,3 +377,40 @@ fn empty_slice() {
assert_eq!(pk2, pk2_);
drop((pk2, pk2_));
}

#[test]
fn ensure_ptr_offsets() {
    // moving a boxed str into a Datacell and then into a PrimaryIndexKey must
    // preserve the original allocation: same pointer, same length, no copies
    let boxed = String::from("hello").into_boxed_str();
    let expected = (boxed.as_ptr(), boxed.len());
    // step 1: the datacell keeps the allocation
    let cell = Datacell::new_str(boxed);
    assert_eq!(expected, (cell.str().as_ptr(), cell.str().len()));
    // step 2: the primary key keeps it too
    let key = PrimaryIndexKey::try_from_dc(cell).unwrap();
    let key_str = key.str().unwrap();
    assert_eq!(expected, (key_str.as_ptr(), key_str.len()));
}

#[test]
fn queue_ensure_offsets() {
    use crate::engine::sync::queue::Queue;
    const COUNT: usize = 100;
    // record (ptr, len) of every boxed string before it enters the pipeline
    let strings: Vec<_> = (0..COUNT)
        .map(|_| "hello".to_owned().into_boxed_str())
        .collect();
    let expected: Vec<_> = strings.iter().map(|s| (s.as_ptr(), s.len())).collect();
    // round-trip every string: Datacell -> PrimaryIndexKey -> queue
    let queue = Queue::new();
    for s in strings {
        let key = PrimaryIndexKey::try_from_dc(Datacell::new_str(s)).unwrap();
        queue.blocking_enqueue(key, &crossbeam_epoch::pin());
    }
    // drain and check that no allocation was moved or copied along the way
    let mut observed = Vec::with_capacity(COUNT);
    while let Some(key) = queue.blocking_try_dequeue(&crossbeam_epoch::pin()) {
        let s = key.str().unwrap();
        observed.push((s.as_ptr(), s.len()));
    }
    assert_eq!(expected, observed);
}
4 changes: 4 additions & 0 deletions server/src/engine/core/index/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct RowData {
}

impl RowData {
#[cfg(test)]
// Test-only accessor: exposes the schema version this row was last revised
// against (`txn_revised_schema_version`) so tests can assert on it.
pub fn get_schema_version(&self) -> DeltaVersion {
self.txn_revised_schema_version
}
pub fn fields(&self) -> &DcFieldIndex {
&self.fields
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/ql/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
engine::{data::cell::Datacell, error::QueryResult},
util::test_utils,
},
rand::{self, Rng},
rand::Rng,
};

mod dcl;
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/ql/tests/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use super::*;
use crate::engine::ql::{
ast::{traits::ASTNode, State},
ast::State,
ddl::{Inspect, Use},
};

Expand Down
10 changes: 2 additions & 8 deletions server/src/engine/ql/tests/schema_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use {
};

mod alter_space {
use {
super::*,
crate::engine::{data::lit::Lit, ql::ddl::alt::AlterSpace},
};
use {super::*, crate::engine::ql::ddl::alt::AlterSpace};
#[test]
fn alter_space_mini() {
fullparse_verify_substmt("alter model mymodel with {}", |r: AlterSpace| {
Expand Down Expand Up @@ -66,7 +63,7 @@ mod alter_space {
mod tymeta {
use super::*;
use crate::engine::ql::{
ast::{parse_ast_node_full, traits::ASTNode, State},
ast::{parse_ast_node_full, State},
ddl::syn::{DictTypeMeta, DictTypeMetaSplit},
};
#[test]
Expand Down Expand Up @@ -271,7 +268,6 @@ mod fields {
crate::engine::ql::{
ast::parse_ast_node_full,
ddl::syn::{FieldSpec, LayerSpec},
lex::Ident,
},
};
#[test]
Expand Down Expand Up @@ -716,7 +712,6 @@ mod alter_model_remove {
use crate::engine::ql::{
ast::parse_ast_node_full_with_space,
ddl::alt::{AlterKind, AlterModel},
lex::Ident,
};
#[test]
fn alter_mini() {
Expand Down Expand Up @@ -1095,7 +1090,6 @@ mod ddl_other_query_tests {
crate::engine::ql::{
ast::{parse_ast_node_full, parse_ast_node_full_with_space},
ddl::drop::{DropModel, DropSpace},
lex::Ident,
},
};
#[test]
Expand Down
41 changes: 28 additions & 13 deletions server/src/engine/storage/v2/impls/mdl_journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ impl BatchAdapterSpec for ModelDataAdapter {
/*
go over each change in this batch, resolve conflicts and then apply to global state
*/
let g = unsafe { crossbeam_epoch::unprotected() };
let g = crossbeam_epoch::pin();
let mut pending_delete = HashMap::new();
let p_index = gs.primary_index().__raw_index();
let m = gs;
Expand All @@ -534,22 +534,37 @@ impl BatchAdapterSpec for ModelDataAdapter {
DecodedBatchEventKind::Insert(new_row)
| DecodedBatchEventKind::Update(new_row)
| DecodedBatchEventKind::Upsert(new_row) => {
let popped_row = p_index.mt_delete_return(&pk, &g);
if let Some(row) = popped_row {
let popped_row = p_index.mt_delete_return_entry(&pk, &g);
if let Some(popped_row) = popped_row {
/*
if a newer version of the row is received first and the older version is pending to be synced, the older
version is never synced. this is how the diffing algorithm works to ensure consistency.
the delta diff algorithm statically guarantees this. insert(s),update(s) and upsert(s) are all essentially
"new versions" of rows and as such, the only thing we need to do is remove the older row (which is guaranteed
to be "old") and replace it with the newer row.
the delta diff algorithm statically guarantees this.
However, upsert is a special case because it will not touch the existing row (if present, with the same key).
This means that we potentially have this "ghost row" that will be written to disk. So assume that the upsert
"happens before" (once again, we're talking logical clocks and not time). In that case we have a completely
unrelated row occupying the same key. So when we receive an update or insert afterwards, the below assertion
would fail. We want to be able to guard against this.
FIXME(@ohsayan): try and trace this somehow, overall in an effort to ensure consistency (and be able to clearly test
it)
*/
let row_txn_revised = row.read().get_txn_revised();
assert!(
row_txn_revised.value_u64() == 0 || row_txn_revised < txn_id,
"revised ID is {} but our row has version {}",
row.read().get_txn_revised().value_u64(),
txn_id.value_u64()
);
let popped_row_txn_revised = popped_row.d_data().read().get_txn_revised();
if popped_row_txn_revised > txn_id {
// the row present is actually newer. in this case we resolve deltas and go to the next txn ID
let _ = popped_row.resolve_schema_deltas_and_freeze(m.delta_state());
let _ = p_index.mt_insert(popped_row.clone(), &g);
continue;
} else {
assert!(
popped_row_txn_revised.value_u64() == 0
|| popped_row_txn_revised < txn_id,
"revised ID is {} but our row has version {}",
popped_row.d_data().read().get_txn_revised().value_u64(),
txn_id.value_u64()
);
}
}
if txn_id > real_last_txn_id {
real_last_txn_id = txn_id;
Expand Down
Loading

0 comments on commit 79a31cd

Please sign in to comment.