diff --git a/Cargo.toml b/Cargo.toml index 0cf846e1..4e409f7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ axum = "0.7.7" gethostname = "0.5.0" log = "0.4.22" prost = "0.13.3" +prost-types = "0.13.3" pyo3 = {version="0.22.3", features = ["extension-module"]} slog = "2.7.0" slog-stdlog = "4.1.1" diff --git a/proto/torchft.proto b/proto/torchft.proto index d8390483..1b0f51b2 100644 --- a/proto/torchft.proto +++ b/proto/torchft.proto @@ -7,6 +7,8 @@ syntax = "proto3"; package torchft; +import "google/protobuf/timestamp.proto"; + message RaftMessageRequest { // Request message contains the serialized Raft proto message. bytes message = 1; @@ -43,6 +45,7 @@ message QuorumMember { message Quorum { int64 quorum_id = 1; repeated QuorumMember participants = 2; + google.protobuf.Timestamp created = 3; } message LighthouseQuorumRequest { diff --git a/src/lighthouse.rs b/src/lighthouse.rs index 9701a179..d6296b97 100644 --- a/src/lighthouse.rs +++ b/src/lighthouse.rs @@ -8,7 +8,7 @@ use core::net::SocketAddr; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; +use std::time::{Instant, SystemTime}; use anyhow::{anyhow, Result}; use askama::Template; @@ -97,7 +97,8 @@ impl Lighthouse { }) } - async fn quorum_valid(&self) -> bool { + // Checks whether the quorum is valid and an explanation for the state. + async fn quorum_valid(&self) -> (bool, String) { let state = self.state.lock().await; let mut first_joined = Instant::now(); @@ -119,18 +120,19 @@ impl Lighthouse { } if is_fast_quorum { - info!("Fast quorum found!"); - return is_fast_quorum; + return (is_fast_quorum, format!("Fast quorum found!")); } } if state.participants.len() < self.opt.min_replicas as usize { - info!( - "No quorum, only have {} participants, need {}", - state.participants.len(), - self.opt.min_replicas + return ( + false, + format!( + "No quorum, only have {} participants, need {}", + state.participants.len(), + self.opt.min_replicas + ), ); - return false; } // Quorum is valid at this point but lets wait for stragglers. @@ -138,19 +140,23 @@ impl Lighthouse { if Instant::now().duration_since(first_joined) < Duration::from_millis(self.opt.join_timeout_ms) { - info!( - "Valid quorum with {} participants, waiting for stragglers due to join timeout", - state.participants.len() + return ( + false, + format!( + "Valid quorum with {} participants, waiting for stragglers due to join timeout", + state.participants.len() + ), ); - return false; } - true + (true, format!("Valid quorum found")) } async fn _quorum_tick(self: Arc) -> Result<()> { // TODO: these should probably run under the same lock - let quorum_met = self.quorum_valid().await; + let (quorum_met, reason) = self.quorum_valid().await; + info!("{}", reason); + if quorum_met { let mut state = self.state.lock().await; let mut participants: Vec = state @@ -180,6 +186,7 @@ impl Lighthouse { let quorum = Quorum { quorum_id: state.quorum_id, participants: participants, + created: Some(SystemTime::now().into()), }; info!("Quorum! {:?}", quorum); @@ -257,16 +264,33 @@ impl Lighthouse { } async fn get_status(self: Arc) -> Html { + let (_, quorum_status) = self.quorum_valid().await; + let template = { let state = self.state.lock().await; + let max_step = { + if let Some(quorum) = state.prev_quorum.clone() { + quorum + .participants + .iter() + .map(|p| p.step) + .max() + .unwrap_or(-1) + } else { + -1 + } + }; + StatusTemplate { quorum_id: state.quorum_id, prev_quorum: state.prev_quorum.clone(), heartbeats: state.heartbeats.clone(), + quorum_status: quorum_status, old_age_threshold: Instant::now() .checked_sub(Duration::from_secs(1)) .unwrap_or(Instant::now()), + max_step: max_step, } }; Html(template.render().unwrap()) @@ -361,10 +385,14 @@ struct IndexTemplate {} #[derive(Template)] #[template(path = "status.html")] struct StatusTemplate { - old_age_threshold: Instant, prev_quorum: Option, quorum_id: i64, heartbeats: HashMap, + quorum_status: String, + + // visualization thresholds + old_age_threshold: Instant, + max_step: i64, } // Make our own error that wraps `anyhow::Error`. @@ -422,7 +450,7 @@ mod tests { #[tokio::test] async fn test_quorum_join_timeout() { let lighthouse = lighthouse_test_new(); - assert!(!lighthouse.quorum_valid().await); + assert!(!lighthouse.quorum_valid().await.0); { let mut state = lighthouse.state.lock().await; @@ -440,7 +468,7 @@ mod tests { ); } - assert!(!lighthouse.quorum_valid().await); + assert!(!lighthouse.quorum_valid().await.0); { let mut state = lighthouse.state.lock().await; @@ -448,13 +476,13 @@ mod tests { Instant::now().sub(Duration::from_secs(10 * 60 * 60)); } - assert!(lighthouse.quorum_valid().await); + assert!(lighthouse.quorum_valid().await.0); } #[tokio::test] async fn test_quorum_fast_prev_quorum() { let lighthouse = lighthouse_test_new(); - assert!(!lighthouse.quorum_valid().await); + assert!(!lighthouse.quorum_valid().await.0); { let mut state = lighthouse.state.lock().await; @@ -472,7 +500,7 @@ mod tests { ); } - assert!(!lighthouse.quorum_valid().await); + assert!(!lighthouse.quorum_valid().await.0); { let mut state = lighthouse.state.lock().await; @@ -484,10 +512,11 @@ mod tests { store_address: "".to_string(), step: 1, }], + created: Some(SystemTime::now().into()), }); } - assert!(lighthouse.quorum_valid().await); + assert!(lighthouse.quorum_valid().await.0); } #[tokio::test] diff --git a/templates/index.html b/templates/index.html index 8ced716a..000e9095 100644 --- a/templates/index.html +++ b/templates/index.html @@ -40,6 +40,9 @@ padding: 10px; border: 1px solid #333; } + .member.recovering { + background-color: orange; + } .heartbeat.old { color: red; } diff --git a/templates/status.html b/templates/status.html index 336fee53..11f28773 100644 --- a/templates/status.html +++ b/templates/status.html @@ -1,16 +1,21 @@

Quorum Status

-Current quorum_id: {{quorum_id}} +Current quorum_id: {{quorum_id}}
+Next quorum status: {{quorum_status}}

Previous Quorum

{% if let Some(prev_quorum) = prev_quorum %} -Previous quorum id: {{prev_quorum.quorum_id}} +Previous quorum id: {{prev_quorum.quorum_id}}
+Quorum age: +{{SystemTime::try_from(prev_quorum.created.unwrap()).unwrap().elapsed().unwrap().as_secs_f64()}}s
{% for member in prev_quorum.participants %} -
+
{{ member.replica_id }}
Step: {{ member.step }}
Manager: {{ member.address }}
@@ -33,7 +38,9 @@

Heartbeats

{% for replica_id in heartbeats.keys() %} {% let age = heartbeats[replica_id].elapsed().as_secs_f64() %} -
  • +
  • {{ replica_id }}: seen {{ age }}s ago