Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions crates/ember-core/src/shard/aof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,64 @@ pub(super) fn write_aof_record(
aof_writer: &mut Option<AofWriter>,
fsync_policy: FsyncPolicy,
shard_id: u16,
aof_errors: &mut u32,
) {
if let Some(ref mut writer) = *aof_writer {
let mut ok = true;
if let Err(e) = writer.write_record(record) {
warn!(shard_id, "aof write failed: {e}");
log_aof_error(shard_id, aof_errors, "write", &e);
ok = false;
}
if fsync_policy == FsyncPolicy::Always {
if let Err(e) = writer.sync() {
warn!(shard_id, "aof sync failed: {e}");
log_aof_error(shard_id, aof_errors, "sync", &e);
ok = false;
}
}
if ok && *aof_errors > 0 {
let missed = *aof_errors;
*aof_errors = 0;
info!(shard_id, missed_errors = missed, "aof writes recovered");
}
}
}

/// Logs an AOF error with rate-limiting and severity awareness.
///
/// Disk-full (ENOSPC) errors are logged at `error!` level since they indicate
/// a condition that needs operator attention. Other I/O errors use `warn!`.
/// After the first failure, subsequent consecutive errors are suppressed —
/// only every 1000th error is logged to avoid flooding under sustained
/// disk-full conditions.
pub(super) fn log_aof_error(
    shard_id: u16,
    consecutive: &mut u32,
    op: &str,
    err: &ember_persistence::format::FormatError,
) {
    // Saturate rather than wrap: under a pathologically long outage the
    // counter pins at u32::MAX instead of resetting the rate-limit window.
    *consecutive = consecutive.saturating_add(1);
    let count = *consecutive;

    // Rate-limit: the very first failure always logs; after that only every
    // 1000th consecutive failure does.
    let should_log = count == 1 || count.is_multiple_of(1000);
    if !should_log {
        return;
    }

    // Classify disk-full conditions by raw OS errno:
    // ENOSPC = 28 (Linux + macOS), EDQUOT = 122 (Linux) / 69 (macOS)
    let disk_full = match err {
        ember_persistence::format::FormatError::Io(io_err) => {
            matches!(io_err.raw_os_error(), Some(28 | 69 | 122))
        }
        _ => false,
    };

    if disk_full {
        error!(
            shard_id,
            consecutive_errors = count,
            "aof {op} failed: disk full — writes continue without durability"
        );
    } else {
        warn!(
            shard_id,
            consecutive_errors = count,
            "aof {op} failed: {err}"
        );
    }
}

Expand Down
16 changes: 14 additions & 2 deletions crates/ember-core/src/shard/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ pub(super) fn handle_blocking_pop(
key: key.to_owned(),
}
};
aof::write_aof_record(&record, ctx.aof_writer, fsync_policy, shard_id);
aof::write_aof_record(
&record,
ctx.aof_writer,
fsync_policy,
shard_id,
ctx.aof_errors,
);
aof::broadcast_replication(
record,
ctx.replication_tx,
Expand Down Expand Up @@ -108,7 +114,13 @@ pub(super) fn wake_blocked_waiters(key: &str, ctx: &mut ProcessCtx<'_>) {
key: key.to_owned(),
}
};
aof::write_aof_record(&record, ctx.aof_writer, fsync_policy, shard_id);
aof::write_aof_record(
&record,
ctx.aof_writer,
fsync_policy,
shard_id,
ctx.aof_errors,
);
aof::broadcast_replication(
record,
ctx.replication_tx,
Expand Down
27 changes: 23 additions & 4 deletions crates/ember-core/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use ember_persistence::recovery::{self, RecoveredValue};
use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
use smallvec::{smallvec, SmallVec};
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

use crate::dropper::DropHandle;
use crate::error::ShardError;
Expand Down Expand Up @@ -915,6 +915,9 @@ async fn run_shard(
let mut lpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();
let mut rpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();

// consecutive AOF write/sync failure counter for rate-limited logging
let mut aof_errors: u32 = 0;

// -- tickers --
let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -938,6 +941,7 @@ async fn run_shard(
replication_offset: &mut replication_offset,
lpop_waiters: &mut lpop_waiters,
rpop_waiters: &mut rpop_waiters,
aof_errors: &mut aof_errors,
#[cfg(feature = "protobuf")]
schema_registry: &schema_registry,
};
Expand All @@ -960,7 +964,11 @@ async fn run_shard(
_ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
if let Some(ref mut writer) = aof_writer {
if let Err(e) = writer.sync() {
warn!(shard_id, "periodic aof sync failed: {e}");
aof::log_aof_error(shard_id, &mut aof_errors, "sync", &e);
} else if aof_errors > 0 {
let missed = aof_errors;
aof_errors = 0;
info!(shard_id, missed_errors = missed, "aof sync recovered");
}
}
}
Expand Down Expand Up @@ -990,6 +998,9 @@ struct ProcessCtx<'a> {
lpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
/// Waiters for BRPOP — keyed by list name, FIFO order.
rpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
/// Consecutive AOF write/sync failures. Used to rate-limit error logging
/// so a sustained disk-full condition doesn't flood logs.
aof_errors: &'a mut u32,
#[cfg(feature = "protobuf")]
schema_registry: &'a Option<crate::schema::SharedSchemaRegistry>,
}
Expand Down Expand Up @@ -1086,16 +1097,24 @@ fn process_single(mut request: ShardRequest, reply: ReplySender, ctx: &mut Proce

// write AOF records for successful mutations
if let Some(ref mut writer) = *ctx.aof_writer {
let mut batch_ok = true;
for record in &records {
if let Err(e) = writer.write_record(record) {
warn!(shard_id, "aof write failed: {e}");
aof::log_aof_error(shard_id, ctx.aof_errors, "write", &e);
batch_ok = false;
}
}
if !records.is_empty() && fsync_policy == FsyncPolicy::Always {
if let Err(e) = writer.sync() {
warn!(shard_id, "aof sync failed: {e}");
aof::log_aof_error(shard_id, ctx.aof_errors, "sync", &e);
batch_ok = false;
}
}
if batch_ok && *ctx.aof_errors > 0 {
let missed = *ctx.aof_errors;
*ctx.aof_errors = 0;
info!(shard_id, missed_errors = missed, "aof writes recovered");
}
}

// broadcast mutation events to replication subscribers
Expand Down