Skip to content

Commit

Permalink
replicator: Persist last replicator error for user
Browse files Browse the repository at this point in the history
Adds a `PersistentStats::last_replicator_error` which is updated when
the replicator hits a potentially recoverable error that causes it to
exit the main loop. These errors are propagated by splitting up the
`ReplicatorMessage::Error` variant into an `UnrecoverableError` and
`RecoverableError` variant.

If an error is logged, it will be shown in `SHOW READYSET STATUS`. The
error is cleared after we enter the main replication loop as it
primarily exists to help users debug issues starting replication.

Release-Note-Core: Displays the last error encountered by the
  replicator in `SHOW READYSET STATUS` if the error prevents the
  replicator from entering the main replication loop. The error is
  cleared once replication is able to begin.
Change-Id: Iff0c77b37ba3ddba24811b495e6fa7d27a033d96
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5811
Tested-by: Buildkite CI
Reviewed-by: Ethan Donowitz <ethan@readyset.io>
  • Loading branch information
Dan Wilbanks committed Aug 29, 2023
1 parent f993f46 commit 4bb7a17
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 23 deletions.
9 changes: 6 additions & 3 deletions readyset-adapter/src/backend/noria_connector.rs
Expand Up @@ -967,17 +967,20 @@ impl NoriaConnector {

if let Ok(Some(stats)) = authority.persistent_stats().await {
status.push((
"Last Started Controller".to_string(),
"Last started Controller".to_string(),
val_or_null(stats.last_controller_startup),
));
status.push((
"Last Completed Snapshot".to_string(),
"Last completed snapshot".to_string(),
val_or_null(stats.last_completed_snapshot),
));
status.push((
"Last Started Replication".to_string(),
"Last started replication".to_string(),
val_or_null(stats.last_started_replication),
));
if let Some(err) = stats.last_replicator_error {
status.push(("Last replicator error".to_string(), err))
}
}

status.append(&mut additional_meta);
Expand Down
4 changes: 4 additions & 0 deletions readyset-client/src/debug/stats.rs
Expand Up @@ -53,6 +53,10 @@ pub struct PersistentStats {
pub last_completed_snapshot: Option<u64>,
/// Time in millis when we last started the main replication loop.
pub last_started_replication: Option<u64>,
/// Last error reported by the replicator that caused it to restart. This message is cleared
/// when we enter the main replication loop, because it is primarily intended to help debug
/// issues with starting replication and holding onto errors forever can be confusing.
pub last_replicator_error: Option<String>,
}

/// Statistics about the Soup data-flow.
Expand Down
7 changes: 6 additions & 1 deletion readyset-server/src/controller/inner.rs
Expand Up @@ -178,7 +178,8 @@ impl Leader {
// Unrecoverable errors, propagate the error to the controller and kill the
// loop.
Err(err @ ReadySetError::RecipeInvariantViolated(_)) => {
if let Err(e) = notification_channel.send(ReplicatorMessage::Error(err))
if let Err(e) = notification_channel
.send(ReplicatorMessage::UnrecoverableError(err))
{
error!(error = %e, "Could not notify controller of critical error. The system may be in an invalid state");
}
Expand All @@ -193,6 +194,10 @@ impl Leader {
timeout_sec=replicator_restart_timeout.as_secs(),
"Error in replication, will retry after timeout"
);
// Send the error to the controller so that we can update the replicator
// status
let _ = notification_channel
.send(ReplicatorMessage::RecoverableError(error));
tokio::time::sleep(replicator_restart_timeout).await;
}
}
Expand Down
45 changes: 29 additions & 16 deletions readyset-server/src/controller/mod.rs
Expand Up @@ -17,7 +17,6 @@ use readyset_client::consensus::{
Authority, AuthorityControl, AuthorityWorkerHeartbeatResponse, GetLeaderResult,
WorkerDescriptor, WorkerId, WorkerSchedulingConfig,
};
use readyset_client::debug::stats::PersistentStats;
#[cfg(feature = "failure_injection")]
use readyset_client::failpoints;
use readyset_client::metrics::recorded;
Expand Down Expand Up @@ -658,38 +657,52 @@ impl Controller {
}
}
req = self.replicator_channel.receiver.recv() => {
/// Returns the current wall-clock time as milliseconds since the UNIX epoch.
///
/// This never panics: if the system clock reports a time before the epoch
/// (`duration_since` returns `Err`), the elapsed duration is treated as zero,
/// and a millisecond count that does not fit in a `u64` saturates at
/// `u64::MAX` instead of being silently truncated.
fn now() -> u64 {
    // A clock set before 1970 makes `duration_since` fail; fall back to a zero
    // duration rather than unwrapping and taking down the caller.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` yields a `u128`; convert with an explicit saturating bound.
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}

match req {
Some(msg) => match msg {
ReplicatorMessage::Error(e)=> return Err(e),
ReplicatorMessage::UnrecoverableError(e) => return Err(e),
ReplicatorMessage::SnapshotDone => {
self.leader_ready.store(true, Ordering::Release);

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;

if let Err(error) = self.authority.update_persistent_stats(|stats: Option<PersistentStats>| {
let now = now();
if let Err(error) = self.authority.update_persistent_stats(|stats| {
let mut stats = stats.unwrap_or_default();
stats.last_completed_snapshot = Some(now);
Ok(stats)
}).await {
error!(%error, "Failed to persist status in the Authority");
error!(%error, "Failed to persist stats in the Authority");
}
},
ReplicatorMessage::ReplicationStarted => {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if let Err(error) = self.authority.update_persistent_stats(|stats: Option<PersistentStats>| {
let now = now();
if let Err(error) = self.authority.update_persistent_stats(|stats| {
let mut stats = stats.unwrap_or_default();
stats.last_started_replication = Some(now);
// Clear the last replicator error if we have started the main loop.
stats.last_replicator_error = None;
Ok(stats)
}).await {
error!(%error, "Failed to persist status in the Authority");
error!(%error, "Failed to persist stats in the Authority");
}
},
ReplicatorMessage::RecoverableError(e) => {
if let Err(error) = self.authority.update_persistent_stats(|stats| {
let mut stats = stats.unwrap_or_default();
let error = if let ReadySetError::ReplicationFailed(msg) = &e {
// If we have `ReplicationFailed` we can just show the error message
msg.to_string()
} else {
e.to_string()
};
stats.last_replicator_error = Some(error);
Ok(stats)
}).await {
error!(%error, "Failed to persist stats in the Authority");
}

},
},
_ => {
Expand Down
5 changes: 4 additions & 1 deletion replicators/src/lib.rs
Expand Up @@ -26,7 +26,10 @@ pub enum ReplicatorMessage {
/// The replicator finished startup and entered the main replication loop
ReplicationStarted,
/// The replicator encountered an unrecoverable error
Error(ReadySetError),
UnrecoverableError(ReadySetError),
/// The replicator encountered an error that caused it to restart, but the error could be
/// recoverable. The controller is notified so that it can update status for the user.
RecoverableError(ReadySetError),
}

/// Provide a simplistic human-readable estimate for how much time remains to complete an operation
Expand Down
4 changes: 2 additions & 2 deletions replicators/tests/tests.rs
Expand Up @@ -137,7 +137,7 @@ impl TestChannel {
async fn snapshot_completed(&mut self) -> ReadySetResult<()> {
match self.0.recv().await {
Some(ReplicatorMessage::SnapshotDone) => Ok(()),
Some(ReplicatorMessage::Error(e)) => Err(e),
Some(ReplicatorMessage::UnrecoverableError(e)) => Err(e),
_ => internal!(),
}
}
Expand Down Expand Up @@ -368,7 +368,7 @@ impl TestHandle {
.await
{
error!(%error, "Error in replicator");
let _ = sender.send(ReplicatorMessage::Error(error));
let _ = sender.send(ReplicatorMessage::UnrecoverableError(error));
}
});

Expand Down

0 comments on commit 4bb7a17

Please sign in to comment.