diff --git a/readyset-client/src/status.rs b/readyset-client/src/status.rs index 62c2c1f4fa..bb71894fa9 100644 --- a/readyset-client/src/status.rs +++ b/readyset-client/src/status.rs @@ -11,8 +11,13 @@ use std::fmt::{self, Display}; use serde::{Deserialize, Serialize}; +use crate::replication::ReplicationOffset; + // Consts for variable names. + const SNAPSHOT_STATUS_VARIABLE: &str = "Snapshot Status"; +const MAX_REPLICATION_OFFSET: &str = "Maximum Replication Offset"; +const MIN_REPLICATION_OFFSET: &str = "Minimum Replication Offset"; /// ReadySetStatus holds information regarding the status of ReadySet, similar to /// [`SHOW STATUS`](https://dev.mysql.com/doc/refman/8.0/en/show-status.html) in MySQL. @@ -22,15 +27,34 @@ const SNAPSHOT_STATUS_VARIABLE: &str = "Snapshot Status"; pub struct ReadySetStatus { /// The snapshot status of the current leader. pub snapshot_status: SnapshotStatus, - //TODO: Include binlog position and other fields helpful for evaluating a ReadySet cluster. + /// The current maximum replication offset known by the leader. + pub max_replication_offset: Option, + /// The current minimum replication offset known by the leader. + pub min_replication_offset: Option, } impl From for Vec<(String, String)> { fn from(status: ReadySetStatus) -> Vec<(String, String)> { - vec![( + let mut res = vec![( SNAPSHOT_STATUS_VARIABLE.to_string(), status.snapshot_status.to_string(), - )] + )]; + + if let Some(replication_offset) = status.max_replication_offset { + res.push(( + MAX_REPLICATION_OFFSET.to_string(), + replication_offset.to_string(), + )) + } + + if let Some(replication_offset) = status.min_replication_offset { + res.push(( + MIN_REPLICATION_OFFSET.to_string(), + replication_offset.to_string(), + )) + } + + res } } diff --git a/readyset-server/src/controller/inner.rs b/readyset-server/src/controller/inner.rs index 9e4b71264f..81d990080a 100644 --- a/readyset-server/src/controller/inner.rs +++ b/readyset-server/src/controller/inner.rs @@ -460,6 +460,14 @@ impl Leader { return_serialized!(leader_ready); } (&Method::POST, "/status") => { + let ds = self.dataflow_state_handle.read().await; + let replication_offsets = + if self.pending_recovery || ds.workers.len() < self.quorum { + None + } else { + Some(ds.replication_offsets().await?) + }; + let status = ReadySetStatus { // Use whether the leader is ready or not as a proxy for if we have // completed snapshotting. @@ -468,6 +476,13 @@ impl Leader { } else { SnapshotStatus::InProgress }, + + max_replication_offset: replication_offsets + .as_ref() + .and_then(|offs| offs.max_offset().ok().flatten().cloned()), + min_replication_offset: replication_offsets + .as_ref() + .and_then(|offs| offs.min_present_offset().ok().flatten().cloned()), }; return_serialized!(status); }