Skip to content

Commit

Permalink
storage: Fully test rollbacks and simulate failures
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Apr 7, 2024
1 parent 4982a53 commit d7100da
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 15 deletions.
1 change: 0 additions & 1 deletion server/src/engine/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ enumerate_err! {
runtime errors
----
*/
RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed",
RawJournalRuntimeDirty = "journal-in-dirty-state",
}
}
1 change: 1 addition & 0 deletions server/src/engine/storage/common/interface/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub trait FileWrite {
)
}
Ok(n) => written += n,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return (written, Err(e)),
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/engine/storage/common/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@
pub mod fs;
#[cfg(test)]
mod vfs;
#[cfg(test)]
pub use vfs::vfs_utils;
59 changes: 53 additions & 6 deletions server/src/engine/storage/common/interface/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/

use {
crate::{engine::sync::cell::Lazy, IoResult},
crate::{engine::sync::cell::Lazy, util::test_utils, IoResult},
parking_lot::RwLock,
std::{
collections::{
Expand All @@ -41,6 +41,33 @@ use {
---
*/

pub mod vfs_utils {
#[derive(Debug, PartialEq, Clone, Copy)]
pub(super) enum WriteCrashKind {
None,
Zero,
Random,
}
local!(
static RANDOM_WRITE_CRASH: WriteCrashKind = WriteCrashKind::None;
);
/// WARNING: A random write crash automatically degrades to a [`WriteCrashKind::Zero`] as soon as it completes
/// to prevent any further data writes (due to retries in
/// [`fs::FileWrite::fwrite_all_count`](super::super::fs::FileWrite::fwrite_all_count))
pub fn debug_enable_random_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Random)
}
pub fn debug_enable_zero_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Zero)
}
pub fn debug_disable_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::None)
}
pub(super) fn debug_write_crash_setting() -> WriteCrashKind {
local_ref!(RANDOM_WRITE_CRASH, |crash| *crash)
}
}

/*
definitions
---
Expand Down Expand Up @@ -167,12 +194,32 @@ impl VFile {
if !self.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
}
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
match vfs_utils::debug_write_crash_setting() {
vfs_utils::WriteCrashKind::None => {
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
vfs_utils::WriteCrashKind::Random => {
// write some random part of the buffer into this file
let mut rng = rand::thread_rng();
let actual_write_length = test_utils::random_number(0, bytes.len(), &mut rng);
if self.pos + actual_write_length > self.data.len() {
self.data.resize(self.pos + actual_write_length, 0);
}
self.data[self.pos..self.pos + actual_write_length]
.copy_from_slice(&bytes[..actual_write_length]);
self.pos += actual_write_length;
// now soon as this is complete, downgrade error type to writezero so that we don't write any further data during
// a retry
vfs_utils::debug_enable_zero_write_crash();
Ok(actual_write_length as _)
}
vfs_utils::WriteCrashKind::Zero => Ok(0),
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
}

Expand Down
6 changes: 4 additions & 2 deletions server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ impl<
match self.f_d.fwrite_all_count(buf) {
(cnt, r) => {
self.t_cursor += cnt;
if r.is_err() && CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
self.t_checksum.update(&buf[..cnt as usize]);
if r.is_err() {
if CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
self.t_checksum.update(&buf[..cnt as usize]);
}
} else {
self.t_checksum.update(buf);
}
Expand Down
32 changes: 29 additions & 3 deletions server/src/engine/storage/v2/raw/journal/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ pub fn debug_set_offset_tracking(track: bool) {
local_mut!(TRACE_OFFSETS, |track_| *track_ = track)
}

#[cfg(test)]
pub fn debug_get_first_meta_triplet() -> Option<(u64, u64, u64)> {
local_mut!(FIRST_TRIPLET, |tr| core::mem::take(tr))
}

#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum JournalTraceEvent {
Expand Down Expand Up @@ -230,6 +235,7 @@ local! {
static TRACE: Vec<JournalTraceEvent> = Vec::new();
static OFFSETS: std::collections::BTreeMap<u64, u64> = Default::default();
static TRACE_OFFSETS: bool = false;
static FIRST_TRIPLET: Option<(u64, u64, u64)> = None;
}

macro_rules! jtrace_event_offset {
Expand Down Expand Up @@ -630,8 +636,14 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
self.log_file.verify_cursor()?;
// reverse
self.log_file.inner_mut(|file| {
file.f_truncate(self.known_txn_offset)?;
Ok(self.known_txn_offset)
let new_offset = if self.txn_id == 0 {
debug_assert_eq!(self.known_txn_offset, 0);
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64
} else {
self.known_txn_offset
};
file.f_truncate(new_offset)?;
Ok(new_offset)
})?;
// reverse successful, now empty write buffer
unsafe {
Expand All @@ -647,6 +659,16 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
&mut self,
f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
#[cfg(test)]
if local_ref!(FIRST_TRIPLET, |tr| { tr.is_none() }) {
local_mut!(FIRST_TRIPLET, |tr| {
*tr = Some((
self.known_txn_id,
self.known_txn_offset,
self.log_file.current_checksum(),
));
})
}
let id = self.txn_id;
let ret = f(self, id as u128);
if ret.is_ok() {
Expand Down Expand Up @@ -864,7 +886,6 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
},
ErrorKind::Storage(e) => match e {
// unreachable errors (no execution path here)
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
| StorageError::RawJournalRuntimeDirty
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier
Expand Down Expand Up @@ -1035,6 +1056,11 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
jtrace_reader!(DriverEventExpectedCloseGotClose);
// a driver closed event; we've checked integrity, but we must check the field values
let valid_meta = okay! {
/*
basically:
- if this is a new journal all these values are 0 (we're essentially reading the first event)
- otherwise, it is the last event offset
*/
self.last_txn_checksum == drv_close_event.last_checksum,
self.last_txn_id == drv_close_event.last_txn_id,
self.last_txn_offset == drv_close_event.last_offset,
Expand Down
49 changes: 47 additions & 2 deletions server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ mod recovery;

use {
super::{
CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, RawJournalAdapter,
RawJournalAdapterEvent, RawJournalWriter,
create_journal, CommitPreference, DriverEvent, DriverEventKind, JournalInitializer,
RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter,
},
crate::engine::{
error::StorageError,
Expand Down Expand Up @@ -216,3 +216,48 @@ fn encode_decode_meta() {
let decoded1 = DriverEvent::decode(encoded1).unwrap();
assert_eq!(dv1, decoded1);
}

#[test]
fn first_triplet_sanity() {
// first driver event
{
assert_eq!(
super::debug_get_first_meta_triplet(),
None,
"failed for first driver event"
);
let mut jrnl = create_journal::<SimpleDBJournal>("first_triplet_sanity_drv_event").unwrap();
assert_eq!(
super::debug_get_first_meta_triplet(),
None,
"failed for first driver event"
);
RawJournalWriter::close_driver(&mut jrnl).unwrap();
assert_eq!(
super::debug_get_first_meta_triplet(),
Some((0, 0, 0)),
"failed for first driver event"
);
}
// first server event
{
assert_eq!(
super::debug_get_first_meta_triplet(),
None,
"failed for first server event"
);
let mut jrnl =
create_journal::<SimpleDBJournal>("first_triplet_sanity_server_event").unwrap();
assert_eq!(
super::debug_get_first_meta_triplet(),
None,
"failed for first server event"
);
SimpleDB::new().push(&mut jrnl, "hello").unwrap();
assert_eq!(
super::debug_get_first_meta_triplet(),
Some((0, 0, 0)),
"failed for first driver event"
);
}
}
121 changes: 120 additions & 1 deletion server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ use {
crate::{
engine::{
error::ErrorKind,
fractal,
storage::{
common::{
interface::fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt},
interface::{
fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt},
vfs_utils,
},
sdss::sdss_r1::FileSpecV1,
},
v2::raw::journal::{
Expand Down Expand Up @@ -1495,3 +1499,118 @@ fn midway_corruption_at_runtime() {
},
)
}

/*
rollback tests
*/

/// Steps:
/// 1. A new log is created
/// 2. Events and corruptions are introduced
/// 3. Rolled back
/// 4. Closed
/// 5. Re-opened
fn emulate_failure_for_rollback(
journal_id: &str,
action: impl Fn(&mut SimpleDB, &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()>,
verify_error: impl Fn(fractal::error::Error),
post_rollback: impl Fn(&SimpleDB),
) {
{
let mut db = SimpleDB::new();
let mut jrnl = create_journal::<SimpleDBJournal>(journal_id).unwrap();
let err = action(&mut db, &mut jrnl).unwrap_err();
verify_error(err);
jrnl.__rollback().unwrap();
RawJournalWriter::close_driver(&mut jrnl).unwrap();
}
{
let db = SimpleDB::new();
let mut jrnl = open_journal::<SimpleDBJournal>(journal_id, &db, JournalSettings::default())
.expect(&format!("{:#?}", debug_get_trace()));
post_rollback(&db);
RawJournalWriter::close_driver(&mut jrnl).unwrap();
}
}

#[test]
fn rollback_write_zero_empty_log() {
emulate_failure_for_rollback(
"rollback_empty_log_write_zero",
|db, jrnl| {
vfs_utils::debug_enable_zero_write_crash();
let r = db.push(jrnl, "hello, world");
vfs_utils::debug_disable_write_crash();
r
},
|e| match e.kind() {
ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
unexpected => panic!("expected write zero, got {unexpected:?}"),
},
|db| assert_eq!(db.data().len(), 0),
);
}

#[test]
fn rollback_write_zero_nonempty_log() {
emulate_failure_for_rollback(
"rollback_write_zero_nonempty_log",
|db, jrnl| {
// commit a single "good" event
db.push(jrnl, "my good key")?;
vfs_utils::debug_enable_zero_write_crash();
let r = db.push(jrnl, "this won't go in");
vfs_utils::debug_disable_write_crash();
r
},
|e| match e.kind() {
ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
unexpected => panic!("expected write zero, got {unexpected:?}"),
},
|db| {
assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "my good key")
},
)
}

#[test]
fn rollback_random_write_failure_empty_log() {
emulate_failure_for_rollback(
"rollback_random_write_failure_empty_log",
|db, jrnl| {
vfs_utils::debug_enable_random_write_crash();
let r = db.push(jrnl, "hello, world");
vfs_utils::debug_disable_write_crash();
r
},
|e| match e.kind() {
ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
unexpected => panic!("expected write zero, got {unexpected:?}"),
},
|db| assert_eq!(db.data().len(), 0),
);
}

#[test]
fn rollback_random_write_failure_log() {
emulate_failure_for_rollback(
"rollback_random_write_failure_log",
|db, jrnl| {
// commit a single "good" event
db.push(jrnl, "my good key")?;
vfs_utils::debug_enable_random_write_crash();
let r = db.push(jrnl, "this won't go in");
vfs_utils::debug_disable_write_crash();
r
},
|e| match e.kind() {
ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
unexpected => panic!("expected write zero, got {unexpected:?}"),
},
|db| {
assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "my good key")
},
)
}
3 changes: 3 additions & 0 deletions server/src/util/os.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl SysIOError {
pub fn kind(&self) -> std::io::ErrorKind {
self.0.kind()
}
pub fn inner(&self) -> &std::io::Error {
&self.0
}
}

impl From<std::io::Error> for SysIOError {
Expand Down

0 comments on commit d7100da

Please sign in to comment.