Skip to content

Commit

Permalink
Merge pull request #340 from skytable/storage/online-recover
Browse files Browse the repository at this point in the history
storage: Online recovery
  • Loading branch information
ohsayan committed Apr 8, 2024
2 parents b961e84 + 852dcf2 commit e4dc0b4
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 118 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ All changes in this project will be noted in this file.

### Additions

- Skyhash/2.1: Restored support for pipelines
- Skyhash/2: Restored support for pipelines
- Enable online (runtime) recovery of transactional failures due to disk errors

## Version 0.8.1

Expand Down
1 change: 0 additions & 1 deletion server/src/engine/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ enumerate_err! {
runtime errors
----
*/
RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed",
RawJournalRuntimeDirty = "journal-in-dirty-state",
}
}
20 changes: 6 additions & 14 deletions server/src/engine/fractal/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub enum CriticalTask {
/// Write a new data batch
WriteBatch(ModelUniqueID, usize),
/// try recovering model ID
TryModelAutorecoverLWT(ModelUniqueID),
TryModelAutorecover(ModelUniqueID),
CheckGNSDriver,
}

Expand Down Expand Up @@ -323,13 +323,7 @@ impl FractalMgr {
match task {
CriticalTask::CheckGNSDriver => {
info!("trying to autorecover GNS driver");
match global
.state()
.gns_driver()
.txn_driver
.lock()
.__lwt_heartbeat()
{
match global.state().gns_driver().txn_driver.lock().__rollback() {
Ok(()) => {
info!("GNS driver has been successfully auto-recovered");
global.state().gns_driver().status().set_okay();
Expand All @@ -343,7 +337,7 @@ impl FractalMgr {
}
}
}
CriticalTask::TryModelAutorecoverLWT(mdl_id) => {
CriticalTask::TryModelAutorecover(mdl_id) => {
info!("trying to autorecover model {mdl_id}");
match global
.state()
Expand All @@ -355,7 +349,7 @@ impl FractalMgr {
Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => {
let mut drv = mdl.driver().batch_driver().lock();
let drv = drv.as_mut().unwrap();
match drv.__lwt_heartbeat() {
match drv.__rollback() {
Ok(()) => {
mdl.driver().status().set_okay();
global.health().report_recovery();
Expand All @@ -364,7 +358,7 @@ impl FractalMgr {
Err(e) => {
error!("failed to autorecover {mdl_id} with {e}. will try again");
self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id)))
.send(Task::new(CriticalTask::TryModelAutorecover(mdl_id)))
.unwrap()
}
}
Expand Down Expand Up @@ -548,9 +542,7 @@ impl FractalMgr {
.map_err(|e| {
mdl_driver_.status().set_iffy();
self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(
mdl_id.into(),
)))
.send(Task::new(CriticalTask::TryModelAutorecover(mdl_id.into())))
.unwrap();
(e, BatchStats::into_inner(batch_stats))
})
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/fractal/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal {
.commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
.unwrap()
}
CriticalTask::TryModelAutorecoverLWT(_) => {}
CriticalTask::TryModelAutorecover(_) => {}
CriticalTask::CheckGNSDriver => {}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/engine/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ macro_rules! local {
}

macro_rules! local_mut {
($ident:ident, $call:expr) => {{
($ident:expr, $call:expr) => {{
#[inline(always)]
fn _f<T, U>(v: &::std::cell::RefCell<T>, f: impl FnOnce(&mut T) -> U) -> U {
f(&mut *v.borrow_mut())
Expand Down
1 change: 1 addition & 0 deletions server/src/engine/storage/common/interface/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub trait FileWrite {
)
}
Ok(n) => written += n,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return (written, Err(e)),
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/engine/storage/common/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@
pub mod fs;
#[cfg(test)]
mod vfs;
#[cfg(test)]
pub use vfs::vfs_utils;
69 changes: 63 additions & 6 deletions server/src/engine/storage/common/interface/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/

use {
crate::{engine::sync::cell::Lazy, IoResult},
crate::{engine::sync::cell::Lazy, util::test_utils, IoResult},
parking_lot::RwLock,
std::{
collections::{
Expand All @@ -41,6 +41,34 @@ use {
---
*/

pub mod vfs_utils {
#[derive(Debug, PartialEq, Clone, Copy)]
pub(super) enum WriteCrashKind {
None,
Zero,
Random,
}
local!(
static RANDOM_WRITE_CRASH: WriteCrashKind = WriteCrashKind::None;
pub(super) static RNG: Option<rand::rngs::ThreadRng> = None;
);
/// WARNING: A random write crash automatically degrades to a [`WriteCrashKind::Zero`] as soon as it completes
/// to prevent any further data writes (due to retries in
/// [`fs::FileWrite::fwrite_all_count`](super::super::fs::FileWrite::fwrite_all_count))
pub fn debug_enable_random_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Random)
}
pub fn debug_enable_zero_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Zero)
}
pub fn debug_disable_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::None)
}
pub(super) fn debug_write_crash_setting() -> WriteCrashKind {
local_ref!(RANDOM_WRITE_CRASH, |crash| *crash)
}
}

/*
definitions
---
Expand Down Expand Up @@ -167,12 +195,41 @@ impl VFile {
if !self.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
}
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
match vfs_utils::debug_write_crash_setting() {
vfs_utils::WriteCrashKind::None => {
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
vfs_utils::WriteCrashKind::Random => {
let actual_write_length = local_mut!(vfs_utils::RNG, |rng| {
match rng {
Some(ref mut rng) => test_utils::random_number(0, bytes.len(), rng),
None => {
let mut rng_ = rand::thread_rng();
let r = test_utils::random_number(0, bytes.len(), &mut rng_);
*rng = Some(rng_);
r
}
}
});
// write some random part of the buffer into this file
if self.pos + actual_write_length > self.data.len() {
self.data.resize(self.pos + actual_write_length, 0);
}
self.data[self.pos..self.pos + actual_write_length]
.copy_from_slice(&bytes[..actual_write_length]);
self.pos += actual_write_length;
// now soon as this is complete, downgrade error type to writezero so that we don't write any further data during
// a retry
vfs_utils::debug_enable_zero_write_crash();
Ok(actual_write_length as _)
}
vfs_utils::WriteCrashKind::Zero => Ok(0),
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
}

Expand Down
22 changes: 20 additions & 2 deletions server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub struct TrackedWriter<
S: FileSpecV1,
const SIZE: usize = 8192,
const PANIC_IF_UNFLUSHED: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = false,
> {
f_d: File,
f_md: S::Metadata,
Expand Down Expand Up @@ -417,6 +417,12 @@ impl<
pub fn current_checksum(&self) -> u64 {
self.t_checksum.clone().finish()
}
pub fn inner_mut(&mut self, f: impl Fn(&mut File) -> IoResult<u64>) -> IoResult<()> {
let file = &mut self.f_d;
let new_cursor = f(file)?;
self.t_cursor = new_cursor;
Ok(())
}
}

impl<
Expand Down Expand Up @@ -491,7 +497,13 @@ impl<
return Ok(());
}
self.flush_buf()?;
// write whatever capacity exceeds the buffer size
/*
write whatever capacity exceeds the buffer size
[a,b,c,d,e,f]
problem: but we can only hold two items
so write to disk: [a,b]
store in memory: [c,d,e,f]
*/
let to_write_cnt = buf.len().saturating_sub(SIZE);
match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) {
(cnt, r) => {
Expand Down Expand Up @@ -538,6 +550,12 @@ impl<
pub fn fsync(&mut self) -> IoResult<()> {
self.f_d.fsync_all()
}
/// Empty the write buffer
///
/// DANGER: This means that whatever data was in the buffer will be immediately discarded
pub unsafe fn drain_buffer(&mut self) {
self.buf.clear()
}
}

impl<
Expand Down
61 changes: 48 additions & 13 deletions server/src/engine/storage/v2/raw/journal/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use {
mem::unsafe_apis::memcpy,
storage::common::{
checksum::SCrc64,
interface::fs::FileWriteExt,
sdss::sdss_r1::{
rw::{SdssFile, TrackedReader, TrackedWriter},
FileSpecV1,
Expand Down Expand Up @@ -162,6 +163,11 @@ pub fn debug_set_offset_tracking(track: bool) {
local_mut!(TRACE_OFFSETS, |track_| *track_ = track)
}

#[cfg(test)]
pub fn debug_get_first_meta_triplet() -> Option<(u64, u64, u64)> {
local_mut!(FIRST_TRIPLET, |tr| core::mem::take(tr))
}

#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum JournalTraceEvent {
Expand Down Expand Up @@ -229,6 +235,7 @@ local! {
static TRACE: Vec<JournalTraceEvent> = Vec::new();
static OFFSETS: std::collections::BTreeMap<u64, u64> = Default::default();
static TRACE_OFFSETS: bool = false;
static FIRST_TRIPLET: Option<(u64, u64, u64)> = None;
}

macro_rules! jtrace_event_offset {
Expand Down Expand Up @@ -519,7 +526,9 @@ pub(super) enum DriverEventKind {
Journal writer implementation
---
Quick notes:
- This is a low level writer and only handles driver events. Higher level impls must account for
- This is a low level writer and only handles driver events
- Checksum verification is only performed for meta events
- Implementors must handle checksums themselves
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*/
Expand Down Expand Up @@ -622,18 +631,30 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
{
self.commit_with_ctx(event, Default::default())
}
/// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER
pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> {
// verify that the on disk cursor is the same as what we know
/// roll back to the last txn
/// WARNING: only call on failure
///
/// NB: Idempotency is guaranteed. Will rollback to, and only to the last event
pub fn __rollback(&mut self) -> RuntimeResult<()> {
// ensure cursors are in sync, even if out of position
self.log_file.verify_cursor()?;
if self.log_file.cursor() == self.known_txn_offset {
// great, so if there was something in the buffer, simply ignore it
self.log_file.__zero_buffer();
Ok(())
} else {
// so, the on-disk file probably has some partial state. this is bad. throw an error
Err(StorageError::RawJournalRuntimeHeartbeatFail.into())
// reverse
self.log_file.inner_mut(|file| {
let new_offset = if self.txn_id == 0 {
debug_assert_eq!(self.known_txn_offset, 0);
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64
} else {
self.known_txn_offset
};
file.f_truncate(new_offset)?;
Ok(new_offset)
})?;
// reverse successful, now empty write buffer
unsafe {
// UNSAFE(@ohsayan): since the log has been reversed, whatever we failed to write should simply be ignored
self.log_file.drain_buffer();
}
Ok(())
}
}

Expand All @@ -642,13 +663,23 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
&mut self,
f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
#[cfg(test)]
if local_ref!(FIRST_TRIPLET, |tr| { tr.is_none() }) {
local_mut!(FIRST_TRIPLET, |tr| {
*tr = Some((
self.known_txn_id,
self.known_txn_offset,
self.log_file.current_checksum(),
));
})
}
let id = self.txn_id;
self.txn_id += 1;
let ret = f(self, id as u128);
if ret.is_ok() {
jtrace_event_offset!(id, self.log_file.cursor());
self.known_txn_id = id;
self.known_txn_offset = self.log_file.cursor();
self.txn_id += 1;
}
ret
}
Expand Down Expand Up @@ -859,7 +890,6 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
},
ErrorKind::Storage(e) => match e {
// unreachable errors (no execution path here)
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
| StorageError::RawJournalRuntimeDirty
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier
Expand Down Expand Up @@ -1030,6 +1060,11 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
jtrace_reader!(DriverEventExpectedCloseGotClose);
// a driver closed event; we've checked integrity, but we must check the field values
let valid_meta = okay! {
/*
basically:
- if this is a new journal all these values are 0 (we're essentially reading the first event)
- otherwise, it is the last event offset
*/
self.last_txn_checksum == drv_close_event.last_checksum,
self.last_txn_id == drv_close_event.last_txn_id,
self.last_txn_offset == drv_close_event.last_offset,
Expand Down
Loading

0 comments on commit e4dc0b4

Please sign in to comment.