storage: Online recovery #340

Merged (4 commits) on Apr 8, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -6,7 +6,8 @@ All changes in this project will be noted in this file.

### Additions

- Skyhash/2.1: Restored support for pipelines
- Skyhash/2: Restored support for pipelines
- Enable online (runtime) recovery of transactional failures due to disk errors

## Version 0.8.1

1 change: 0 additions & 1 deletion server/src/engine/error.rs
@@ -251,7 +251,6 @@ enumerate_err! {
runtime errors
----
*/
RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed",
RawJournalRuntimeDirty = "journal-in-dirty-state",
}
}
20 changes: 6 additions & 14 deletions server/src/engine/fractal/mgr.rs
@@ -115,7 +115,7 @@ pub enum CriticalTask {
/// Write a new data batch
WriteBatch(ModelUniqueID, usize),
/// try recovering model ID
TryModelAutorecoverLWT(ModelUniqueID),
TryModelAutorecover(ModelUniqueID),
CheckGNSDriver,
}

@@ -323,13 +323,7 @@ impl FractalMgr {
match task {
CriticalTask::CheckGNSDriver => {
info!("trying to autorecover GNS driver");
match global
.state()
.gns_driver()
.txn_driver
.lock()
.__lwt_heartbeat()
{
match global.state().gns_driver().txn_driver.lock().__rollback() {
Ok(()) => {
info!("GNS driver has been successfully auto-recovered");
global.state().gns_driver().status().set_okay();
@@ -343,7 +337,7 @@
}
}
}
CriticalTask::TryModelAutorecoverLWT(mdl_id) => {
CriticalTask::TryModelAutorecover(mdl_id) => {
info!("trying to autorecover model {mdl_id}");
match global
.state()
@@ -355,7 +349,7 @@
Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => {
let mut drv = mdl.driver().batch_driver().lock();
let drv = drv.as_mut().unwrap();
match drv.__lwt_heartbeat() {
match drv.__rollback() {
Ok(()) => {
mdl.driver().status().set_okay();
global.health().report_recovery();
@@ -364,7 +358,7 @@
Err(e) => {
error!("failed to autorecover {mdl_id} with {e}. will try again");
self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id)))
.send(Task::new(CriticalTask::TryModelAutorecover(mdl_id)))
.unwrap()
}
}
@@ -548,9 +542,7 @@ impl FractalMgr {
.map_err(|e| {
mdl_driver_.status().set_iffy();
self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(
mdl_id.into(),
)))
.send(Task::new(CriticalTask::TryModelAutorecover(mdl_id.into())))
.unwrap();
(e, BatchStats::into_inner(batch_stats))
})
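The recovery flow above reduces to a simple retry loop: a failed commit marks the driver iffy and enqueues `TryModelAutorecover`, and the task re-enqueues itself until the rollback succeeds. A minimal sketch of that shape (simplified types standing in for the real task plumbing):

```rust
use std::sync::mpsc::{channel, Sender};

// simplified stand-in for ModelUniqueID and the real task enum
enum CriticalTask {
    TryModelAutorecover(u64),
}

fn handle_critical(task: CriticalTask, hp_dispatcher: &Sender<CriticalTask>) {
    match task {
        CriticalTask::TryModelAutorecover(mdl_id) => match try_rollback(mdl_id) {
            // rollback succeeded: the real code marks the driver okay and reports recovery
            Ok(()) => println!("model {mdl_id} auto-recovered"),
            // rollback failed (e.g. the disk is still unhappy): requeue and try later
            Err(e) => {
                eprintln!("failed to autorecover {mdl_id} with {e}. will try again");
                hp_dispatcher
                    .send(CriticalTask::TryModelAutorecover(mdl_id))
                    .unwrap();
            }
        },
    }
}

fn try_rollback(_mdl_id: u64) -> Result<(), String> {
    Ok(()) // stand-in for RawJournalWriter::__rollback
}

fn main() {
    let (tx, _rx) = channel();
    handle_critical(CriticalTask::TryModelAutorecover(42), &tx);
}
```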
2 changes: 1 addition & 1 deletion server/src/engine/fractal/test_utils.rs
@@ -133,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal {
.commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
.unwrap()
}
CriticalTask::TryModelAutorecoverLWT(_) => {}
CriticalTask::TryModelAutorecover(_) => {}
CriticalTask::CheckGNSDriver => {}
}
}
2 changes: 1 addition & 1 deletion server/src/engine/macros.rs
@@ -401,7 +401,7 @@ macro_rules! local! {
}

macro_rules! local_mut {
($ident:ident, $call:expr) => {{
($ident:expr, $call:expr) => {{
#[inline(always)]
fn _f<T, U>(v: &::std::cell::RefCell<T>, f: impl FnOnce(&mut T) -> U) -> U {
f(&mut *v.borrow_mut())
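The `local_mut!` matcher change from `$ident:ident` to `$ident:expr` is what allows callers to pass a path such as `vfs_utils::RNG` (used later in this diff) instead of only a bare identifier. A toy illustration of the distinction (not the real macro):

```rust
macro_rules! with_ident {
    ($i:ident) => {
        $i
    };
}
macro_rules! with_expr {
    ($e:expr) => {
        $e
    };
}

mod vfs_utils {
    pub const RNG: u8 = 0;
}

fn main() {
    let local = 1u8;
    let _a = with_ident!(local); // ok: a bare identifier
    let _b = with_expr!(vfs_utils::RNG); // ok: a path is an expression
    // let _c = with_ident!(vfs_utils::RNG); // error: `ident` won't match a path
}
```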
1 change: 1 addition & 0 deletions server/src/engine/storage/common/interface/fs.rs
@@ -192,6 +192,7 @@ pub trait FileWrite {
)
}
Ok(n) => written += n,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return (written, Err(e)),
}
}
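The new match arm makes the write loop transparent to `EINTR`: an interrupted syscall transferred no data, so the loop retries instead of surfacing the error. A self-contained sketch of the same loop shape as `fwrite_all_count` (a standalone version, not the actual trait method):

```rust
use std::io::{Error, ErrorKind, Result as IoResult, Write};

/// Write all of `buf`, tracking how many bytes actually landed:
/// retry on EINTR, fail on a zero-length write or any other error.
fn write_all_count(f: &mut impl Write, buf: &[u8]) -> (usize, IoResult<()>) {
    let mut written = 0usize;
    while written < buf.len() {
        match f.write(&buf[written..]) {
            Ok(0) => return (written, Err(Error::from(ErrorKind::WriteZero))),
            Ok(n) => written += n,
            // an interrupted write transferred no data; just try again
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return (written, Err(e)),
        }
    }
    (written, Ok(()))
}

fn main() {
    let mut sink = Vec::new();
    let (n, r) = write_all_count(&mut sink, b"journal event");
    assert_eq!(n, 13);
    r.unwrap();
}
```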
2 changes: 2 additions & 0 deletions server/src/engine/storage/common/interface/mod.rs
@@ -33,3 +33,5 @@
pub mod fs;
#[cfg(test)]
mod vfs;
#[cfg(test)]
pub use vfs::vfs_utils;
69 changes: 63 additions & 6 deletions server/src/engine/storage/common/interface/vfs.rs
@@ -25,7 +25,7 @@
*/

use {
crate::{engine::sync::cell::Lazy, IoResult},
crate::{engine::sync::cell::Lazy, util::test_utils, IoResult},
parking_lot::RwLock,
std::{
collections::{
@@ -41,6 +41,34 @@ use {
---
*/

pub mod vfs_utils {
#[derive(Debug, PartialEq, Clone, Copy)]
pub(super) enum WriteCrashKind {
None,
Zero,
Random,
}
local!(
static RANDOM_WRITE_CRASH: WriteCrashKind = WriteCrashKind::None;
pub(super) static RNG: Option<rand::rngs::ThreadRng> = None;
);
/// WARNING: A random write crash automatically degrades to a [`WriteCrashKind::Zero`] as soon as it completes
/// to prevent any further data writes (due to retries in
/// [`fs::FileWrite::fwrite_all_count`](super::super::fs::FileWrite::fwrite_all_count))
pub fn debug_enable_random_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Random)
}
pub fn debug_enable_zero_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Zero)
}
pub fn debug_disable_write_crash() {
local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::None)
}
pub(super) fn debug_write_crash_setting() -> WriteCrashKind {
local_ref!(RANDOM_WRITE_CRASH, |crash| *crash)
}
}

/*
definitions
---
@@ -167,12 +195,41 @@ impl VFile {
if !self.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
}
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
match vfs_utils::debug_write_crash_setting() {
vfs_utils::WriteCrashKind::None => {
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
vfs_utils::WriteCrashKind::Random => {
let actual_write_length = local_mut!(vfs_utils::RNG, |rng| {
match rng {
Some(ref mut rng) => test_utils::random_number(0, bytes.len(), rng),
None => {
let mut rng_ = rand::thread_rng();
let r = test_utils::random_number(0, bytes.len(), &mut rng_);
*rng = Some(rng_);
r
}
}
});
// write some random part of the buffer into this file
if self.pos + actual_write_length > self.data.len() {
self.data.resize(self.pos + actual_write_length, 0);
}
self.data[self.pos..self.pos + actual_write_length]
.copy_from_slice(&bytes[..actual_write_length]);
self.pos += actual_write_length;
// as soon as this write completes, degrade to a zero-write crash so that we don't
// write any further data during a retry
vfs_utils::debug_enable_zero_write_crash();
Ok(actual_write_length as _)
}
vfs_utils::WriteCrashKind::Zero => Ok(0),
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
}

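Net effect of the crash hooks: a `Random` crash persists a random prefix of the buffer exactly once and then degrades to `Zero`, so a retrying writer can never slip additional bytes in afterwards. A self-contained model of those semantics (a toy, not the actual `VFile`; uses the `rand` crate as the real test code does):

```rust
use rand::Rng;

#[derive(Clone, Copy, PartialEq, Debug)]
enum WriteCrashKind {
    None,
    Zero,
    Random,
}

struct CrashyFile {
    data: Vec<u8>,
    crash: WriteCrashKind,
}

impl CrashyFile {
    fn write(&mut self, buf: &[u8]) -> usize {
        match self.crash {
            WriteCrashKind::None => {
                self.data.extend_from_slice(buf);
                buf.len()
            }
            // every write now reports zero bytes written
            WriteCrashKind::Zero => 0,
            WriteCrashKind::Random => {
                // persist a random prefix, then degrade so retries write nothing
                let n = if buf.is_empty() {
                    0
                } else {
                    rand::thread_rng().gen_range(0..buf.len())
                };
                self.data.extend_from_slice(&buf[..n]);
                self.crash = WriteCrashKind::Zero;
                n
            }
        }
    }
}

fn main() {
    let mut f = CrashyFile { data: Vec::new(), crash: WriteCrashKind::Random };
    let first = f.write(b"hello world"); // persists a random prefix
    let second = f.write(b"retry"); // degraded: writes nothing
    assert_eq!(f.data.len(), first);
    assert_eq!(second, 0);
    assert_eq!(f.crash, WriteCrashKind::Zero);
}
```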
22 changes: 20 additions & 2 deletions server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs
@@ -297,7 +297,7 @@ pub struct TrackedWriter<
S: FileSpecV1,
const SIZE: usize = 8192,
const PANIC_IF_UNFLUSHED: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = false,
> {
f_d: File,
f_md: S::Metadata,
@@ -417,6 +417,12 @@ impl<
pub fn current_checksum(&self) -> u64 {
self.t_checksum.clone().finish()
}
pub fn inner_mut(&mut self, f: impl Fn(&mut File) -> IoResult<u64>) -> IoResult<()> {
let file = &mut self.f_d;
let new_cursor = f(file)?;
self.t_cursor = new_cursor;
Ok(())
}
}

impl<
@@ -491,7 +497,13 @@ impl<
return Ok(());
}
self.flush_buf()?;
// write whatever capacity exceeds the buffer size
/*
write whatever capacity exceeds the buffer size
[a,b,c,d,e,f]
problem: but we can only hold two items
so write to disk: [a,b]
store in memory: [c,d,e,f]
*/
let to_write_cnt = buf.len().saturating_sub(SIZE);
match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) {
(cnt, r) => {
@@ -538,6 +550,12 @@
pub fn fsync(&mut self) -> IoResult<()> {
self.f_d.fsync_all()
}
/// Empty the write buffer
///
/// DANGER: This means that whatever data was in the buffer will be immediately discarded
pub unsafe fn drain_buffer(&mut self) {
self.buf.clear()
}
}

impl<
61 changes: 48 additions & 13 deletions server/src/engine/storage/v2/raw/journal/raw/mod.rs
@@ -35,6 +35,7 @@ use {
mem::unsafe_apis::memcpy,
storage::common::{
checksum::SCrc64,
interface::fs::FileWriteExt,
sdss::sdss_r1::{
rw::{SdssFile, TrackedReader, TrackedWriter},
FileSpecV1,
@@ -162,6 +163,11 @@ pub fn debug_set_offset_tracking(track: bool) {
local_mut!(TRACE_OFFSETS, |track_| *track_ = track)
}

#[cfg(test)]
pub fn debug_get_first_meta_triplet() -> Option<(u64, u64, u64)> {
local_mut!(FIRST_TRIPLET, |tr| core::mem::take(tr))
}

#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum JournalTraceEvent {
@@ -229,6 +235,7 @@ local! {
static TRACE: Vec<JournalTraceEvent> = Vec::new();
static OFFSETS: std::collections::BTreeMap<u64, u64> = Default::default();
static TRACE_OFFSETS: bool = false;
static FIRST_TRIPLET: Option<(u64, u64, u64)> = None;
}

macro_rules! jtrace_event_offset {
Expand Down Expand Up @@ -519,7 +526,9 @@ pub(super) enum DriverEventKind {
Journal writer implementation
---
Quick notes:
- This is a low level writer and only handles driver events. Higher level impls must account for
- This is a low level writer and only handles driver events
- Checksum verification is only performed for meta events
- Implementors must handle checksums themselves

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*/
@@ -622,18 +631,30 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
{
self.commit_with_ctx(event, Default::default())
}
/// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER
pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> {
// verify that the on disk cursor is the same as what we know
/// Roll back to the last committed transaction.
/// WARNING: only call on failure
///
/// NB: idempotency is guaranteed; this rolls back to, and only to, the last committed event
pub fn __rollback(&mut self) -> RuntimeResult<()> {
// ensure cursors are in sync, even if out of position
self.log_file.verify_cursor()?;
if self.log_file.cursor() == self.known_txn_offset {
// great, so if there was something in the buffer, simply ignore it
self.log_file.__zero_buffer();
Ok(())
} else {
// so, the on-disk file probably has some partial state. this is bad. throw an error
Err(StorageError::RawJournalRuntimeHeartbeatFail.into())
// reverse
self.log_file.inner_mut(|file| {
let new_offset = if self.txn_id == 0 {
debug_assert_eq!(self.known_txn_offset, 0);
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64
} else {
self.known_txn_offset
};
file.f_truncate(new_offset)?;
Ok(new_offset)
})?;
// reverse successful, now empty write buffer
unsafe {
// UNSAFE(@ohsayan): since the log has been reversed, whatever we failed to write should simply be ignored
self.log_file.drain_buffer();
}
Ok(())
}
}

@@ -642,13 +663,23 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
&mut self,
f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
#[cfg(test)]
if local_ref!(FIRST_TRIPLET, |tr| { tr.is_none() }) {
local_mut!(FIRST_TRIPLET, |tr| {
*tr = Some((
self.known_txn_id,
self.known_txn_offset,
self.log_file.current_checksum(),
));
})
}
let id = self.txn_id;
self.txn_id += 1;
let ret = f(self, id as u128);
if ret.is_ok() {
jtrace_event_offset!(id, self.log_file.cursor());
self.known_txn_id = id;
self.known_txn_offset = self.log_file.cursor();
self.txn_id += 1;
}
ret
}
@@ -859,7 +890,6 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
},
ErrorKind::Storage(e) => match e {
// unreachable errors (no execution path here)
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
| StorageError::RawJournalRuntimeDirty
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier
@@ -1030,6 +1060,11 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
jtrace_reader!(DriverEventExpectedCloseGotClose);
// a driver closed event; we've checked integrity, but we must check the field values
let valid_meta = okay! {
/*
basically:
- if this is a new journal all these values are 0 (we're essentially reading the first event)
- otherwise, it is the last event offset
*/
self.last_txn_checksum == drv_close_event.last_checksum,
self.last_txn_id == drv_close_event.last_txn_id,
self.last_txn_offset == drv_close_event.last_offset,
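The truncation target inside `__rollback` deserves a closing note: with no committed transactions the log is cut back to just past the SDSS header, otherwise to the end of the last committed event, which is what makes repeated rollbacks idempotent. A small model of that computation (header size is illustrative):

```rust
/// Simplified model of the offset that `__rollback` truncates to.
fn rollback_target(txn_id: u64, known_txn_offset: u64, header_size: u64) -> u64 {
    if txn_id == 0 {
        // nothing was ever committed: keep only the file header
        debug_assert_eq!(known_txn_offset, 0);
        header_size
    } else {
        // keep everything up to and including the last committed event
        known_txn_offset
    }
}

fn main() {
    assert_eq!(rollback_target(0, 0, 64), 64); // fresh journal
    assert_eq!(rollback_target(7, 4096, 64), 4096); // established journal
    // truncating twice to the same offset is a no-op, hence idempotent
}
```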