Skip to content

Commit

Permalink
storage: Fix recovery algorithms and add test for repair
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Mar 26, 2024
1 parent 26d8335 commit b9c97c5
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 187 deletions.
36 changes: 27 additions & 9 deletions server/src/engine/storage/v2/raw/journal/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where
RawJournalWriter::new(initializer, file)
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
/// The result of a journal repair operation
pub enum RepairResult {
/// No errors were detected
Expand Down Expand Up @@ -215,8 +215,8 @@ pub(super) enum JournalWriterTraceEvent {
DriverClosed,
}

#[cfg(test)]
local! {
#[cfg(test)]
static TRACE: Vec<JournalTraceEvent> = Vec::new();
}

Expand Down Expand Up @@ -746,10 +746,9 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
);
return Ok(initializer);
}
Ok(false) => {}
Ok(false) => self.state = JournalState::AwaitingEvent,
Err(e) => return Err(e),
}
self.state = JournalState::AwaitingEvent;
}
}
fn new(
Expand Down Expand Up @@ -807,7 +806,13 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
e: Error,
repair_mode: JournalRepairMode,
) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile<J::Spec>)> {
let lost = self.tr.cached_size() - self.tr.cursor();
let lost = if self.last_txn_offset == 0 {
// we haven't scanned any events and already hit an error
// so essentially, we lost the entire log
self.tr.cached_size() - <J::Spec as FileSpecV1>::SIZE as u64
} else {
self.tr.cached_size() - self.last_txn_offset
};
let mut repair_result = RepairResult::LostBytes(lost);
match repair_mode {
JournalRepairMode::Simple => {}
Expand Down Expand Up @@ -867,9 +872,13 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
*/
l!(let known_event_id, known_event_offset, known_event_checksum = self.last_txn_id, self.last_txn_offset, self.last_txn_checksum);
let mut last_logged_checksum = self.tr.checksum();
let was_eof = self.tr.is_eof();
let mut base_log = self.tr.into_inner();
base_log.truncate(known_event_offset)?;
if known_event_offset == 0 {
// no event, so just trim upto header
base_log.truncate(<J::Spec as FileSpecV1>::SIZE as _)?;
} else {
base_log.truncate(known_event_offset)?;
}
/*
see what needs to be done next
*/
Expand All @@ -882,13 +891,22 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
the log is in a dirty state that can only be resolved by closing it
*/
let drv_close = DriverEvent::new(
(known_event_id + 1) as u128,
if known_event_offset == 0 {
// no event occurred
0
} else {
// something happened prior to this, so we'll use an incremented ID for this event
known_event_id + 1
} as u128,
DriverEventKind::Closed,
known_event_checksum,
known_event_offset,
known_event_id,
);
if matches!(self.state, JournalState::AwaitingClose) & was_eof {
if {
(self.state == JournalState::AwaitingClose) | // expecting a close but we couldn't parse it
(self.state == JournalState::AwaitingEvent) // we were awaiting an event but we couldn't get enough metadata to do anything
} {
// we reached eof and we were expecting a close. definitely lost an unspecified number of bytes
repair_result = RepairResult::UnspecifiedLoss(lost);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Created on Tue Jan 30 2024
* Created on Tue Mar 26 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
Expand All @@ -26,188 +26,28 @@

use {
super::{
create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind,
JournalInitializer, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter,
},
crate::engine::{
error::StorageError,
fractal::error::ErrorContext,
storage::{
common::sdss::sdss_r1::rw::TrackedReader,
v2::raw::{
journal::raw::{JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent},
spec::SystemDatabaseV1,
},
super::{
create_journal, obtain_trace, open_journal, DriverEventKind, JournalReaderTraceEvent,
JournalSettings, JournalWriterTraceEvent, RawJournalWriter,
},
RuntimeResult,
SimpleDB, SimpleDBJournal,
},
std::cell::RefCell,
crate::engine::fractal::error::ErrorContext,
};

#[test]
fn encode_decode_meta() {
let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0);
let encoded1 = dv1.encode_self();
let decoded1 = DriverEvent::decode(encoded1).unwrap();
assert_eq!(dv1, decoded1);
}

/*
impls for journal tests
*/

#[derive(Debug, Clone, PartialEq)]
pub struct SimpleDB {
data: RefCell<Vec<String>>,
}
impl SimpleDB {
fn new() -> Self {
Self {
data: RefCell::default(),
}
}
fn data(&self) -> std::cell::Ref<'_, Vec<String>> {
self.data.borrow()
}
fn clear(&mut self, log: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()> {
log.commit_event(DbEventClear)?;
self.data.get_mut().clear();
Ok(())
}
fn pop(&mut self, log: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()> {
self.data.get_mut().pop().unwrap();
log.commit_event(DbEventPop)?;
Ok(())
}
fn push(
&mut self,
log: &mut RawJournalWriter<SimpleDBJournal>,
new: impl ToString,
) -> RuntimeResult<()> {
let new = new.to_string();
log.commit_event(DbEventPush(&new))?;
self.data.get_mut().push(new);
Ok(())
}
}

/*
event impls
*/

pub struct SimpleDBJournal;
struct DbEventPush<'a>(&'a str);
struct DbEventPop;
struct DbEventClear;
trait SimpleDBEvent: Sized {
const OPC: u8;
fn write_buffered(self, _: &mut Vec<u8>);
}
macro_rules! impl_db_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),*) => {
$(impl SimpleDBEvent for $ty {
const OPC: u8 = $code;
fn write_buffered(self, buf: &mut Vec<u8>) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(_do_it(self, buf, $expr))? }
})*
}
}

impl_db_event!(
DbEventPush<'_> as 0 => |me, buf| {
buf.extend(&(me.0.len() as u64).to_le_bytes());
buf.extend(me.0.as_bytes());
},
DbEventPop as 1,
DbEventClear as 2
);

impl<T: SimpleDBEvent> RawJournalAdapterEvent<SimpleDBJournal> for T {
fn md(&self) -> u64 {
T::OPC as _
}
fn write_buffered(self, buf: &mut Vec<u8>, _: ()) {
T::write_buffered(self, buf)
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum EventMeta {
NewKey,
Pop,
Clear,
}
impl RawJournalAdapter for SimpleDBJournal {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered;
type Spec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type EventMeta = EventMeta;
type CommitContext = ();
type Context<'a> = () where Self: 'a;
fn initialize(_: &JournalInitializer) -> Self {
Self
}
fn enter_context<'a>(_: &'a mut RawJournalWriter<Self>) -> Self::Context<'a> {
()
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
Some(match meta {
0 => EventMeta::NewKey,
1 => EventMeta::Pop,
2 => EventMeta::Clear,
_ => return None,
})
}
fn commit_buffered<'a, E: RawJournalAdapterEvent<Self>>(
&mut self,
buf: &mut Vec<u8>,
event: E,
ctx: (),
) {
event.write_buffered(buf, ctx)
}
fn decode_apply<'a>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<Self::Spec>,
) -> RuntimeResult<()> {
match meta {
EventMeta::NewKey => {
let key_size = u64::from_le_bytes(file.read_block()?);
let mut keybuf = vec![0u8; key_size as usize];
file.tracked_read(&mut keybuf)?;
match String::from_utf8(keybuf) {
Ok(k) => gs.data.borrow_mut().push(k),
Err(_) => {
return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into())
}
}
}
EventMeta::Clear => gs.data.borrow_mut().clear(),
EventMeta::Pop => {
let _ = gs.data.borrow_mut().pop().unwrap();
}
}
Ok(())
}
}

/*
journal tests
*/

#[test]
fn journal_open_close() {
const JOURNAL_NAME: &str = "journal_open_close";
{
// new boot
let mut j = create_journal::<SimpleDBJournal>(JOURNAL_NAME).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![JournalWriterTraceEvent::Initialized]
);
RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed,
Expand All @@ -228,7 +68,7 @@ fn journal_open_close() {
)
.unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
// init reader and read close event
JournalReaderTraceEvent::Initialized,
Expand All @@ -251,7 +91,7 @@ fn journal_open_close() {
);
RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed,
Expand All @@ -272,7 +112,7 @@ fn journal_open_close() {
)
.unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
// init reader and read reopen event
JournalReaderTraceEvent::Initialized,
Expand Down Expand Up @@ -304,7 +144,7 @@ fn journal_open_close() {
);
RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed,
Expand All @@ -328,7 +168,7 @@ fn journal_with_server_single_event() {
db.push(&mut j, "hello world").unwrap();
RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
JournalWriterTraceEvent::Initialized,
JournalWriterTraceEvent::CommitAttemptForEvent(0),
Expand All @@ -349,12 +189,12 @@ fn journal_with_server_single_event() {
let db = SimpleDB::new();
// second boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db, JournalSettings::default())
.set_dmsg_fn(|| format!("{:?}", super::obtain_trace()))
.set_dmsg_fn(|| format!("{:?}", obtain_trace()))
.unwrap();
assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "hello world");
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
// init reader and read server event
JournalReaderTraceEvent::Initialized,
Expand Down Expand Up @@ -382,7 +222,7 @@ fn journal_with_server_single_event() {
);
RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed,
Expand All @@ -402,7 +242,7 @@ fn journal_with_server_single_event() {
assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "hello world");
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
// init reader and read server event
JournalReaderTraceEvent::Initialized,
Expand Down Expand Up @@ -440,7 +280,7 @@ fn journal_with_server_single_event() {
);
RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!(
super::obtain_trace(),
obtain_trace(),
intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed,
Expand Down
Loading

0 comments on commit b9c97c5

Please sign in to comment.