Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ axum = "0.7.7"
gethostname = "0.5.0"
log = "0.4.22"
prost = "0.13.3"
prost-types = "0.13.3"
pyo3 = {version="0.22.3", features = ["extension-module"]}
slog = "2.7.0"
slog-stdlog = "4.1.1"
Expand Down
3 changes: 3 additions & 0 deletions proto/torchft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
syntax = "proto3";
package torchft;

import "google/protobuf/timestamp.proto";

message RaftMessageRequest {
// Request message contains the serialized Raft proto message.
bytes message = 1;
Expand Down Expand Up @@ -43,6 +45,7 @@ message QuorumMember {
// A numbered quorum together with the members that belong to it.
message Quorum {
// Identifier of this quorum.
int64 quorum_id = 1;
// Members participating in this quorum.
repeated QuorumMember participants = 2;
// Time at which this quorum was created; the lighthouse fills this in with
// the current system time when it builds the quorum.
google.protobuf.Timestamp created = 3;
}

message LighthouseQuorumRequest {
Expand Down
73 changes: 51 additions & 22 deletions src/lighthouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use core::net::SocketAddr;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::{Instant, SystemTime};

use anyhow::{anyhow, Result};
use askama::Template;
Expand Down Expand Up @@ -97,7 +97,8 @@ impl Lighthouse {
})
}

async fn quorum_valid(&self) -> bool {
// Checks whether the quorum is valid and returns an explanation for the state.
async fn quorum_valid(&self) -> (bool, String) {
let state = self.state.lock().await;

let mut first_joined = Instant::now();
Expand All @@ -119,38 +120,43 @@ impl Lighthouse {
}

if is_fast_quorum {
info!("Fast quorum found!");
return is_fast_quorum;
return (is_fast_quorum, format!("Fast quorum found!"));
}
}

if state.participants.len() < self.opt.min_replicas as usize {
info!(
"No quorum, only have {} participants, need {}",
state.participants.len(),
self.opt.min_replicas
return (
false,
format!(
"No quorum, only have {} participants, need {}",
state.participants.len(),
self.opt.min_replicas
),
);
return false;
}

// Quorum is valid at this point, but let's wait for stragglers.

if Instant::now().duration_since(first_joined)
< Duration::from_millis(self.opt.join_timeout_ms)
{
info!(
"Valid quorum with {} participants, waiting for stragglers due to join timeout",
state.participants.len()
return (
false,
format!(
"Valid quorum with {} participants, waiting for stragglers due to join timeout",
state.participants.len()
),
);
return false;
}

true
(true, format!("Valid quorum found"))
}

async fn _quorum_tick(self: Arc<Self>) -> Result<()> {
// TODO: these should probably run under the same lock
let quorum_met = self.quorum_valid().await;
let (quorum_met, reason) = self.quorum_valid().await;
info!("{}", reason);

if quorum_met {
let mut state = self.state.lock().await;
let mut participants: Vec<QuorumMember> = state
Expand Down Expand Up @@ -180,6 +186,7 @@ impl Lighthouse {
let quorum = Quorum {
quorum_id: state.quorum_id,
participants: participants,
created: Some(SystemTime::now().into()),
};

info!("Quorum! {:?}", quorum);
Expand Down Expand Up @@ -257,16 +264,33 @@ impl Lighthouse {
}

async fn get_status(self: Arc<Self>) -> Html<String> {
let (_, quorum_status) = self.quorum_valid().await;

let template = {
let state = self.state.lock().await;

let max_step = {
if let Some(quorum) = state.prev_quorum.clone() {
quorum
.participants
.iter()
.map(|p| p.step)
.max()
.unwrap_or(-1)
} else {
-1
}
};

StatusTemplate {
quorum_id: state.quorum_id,
prev_quorum: state.prev_quorum.clone(),
heartbeats: state.heartbeats.clone(),
quorum_status: quorum_status,
old_age_threshold: Instant::now()
.checked_sub(Duration::from_secs(1))
.unwrap_or(Instant::now()),
max_step: max_step,
}
};
Html(template.render().unwrap())
Expand Down Expand Up @@ -361,10 +385,14 @@ struct IndexTemplate {}
#[derive(Template)]
#[template(path = "status.html")]
struct StatusTemplate {
old_age_threshold: Instant,
prev_quorum: Option<Quorum>,
quorum_id: i64,
heartbeats: HashMap<String, Instant>,
quorum_status: String,

// visualization thresholds
old_age_threshold: Instant,
max_step: i64,
}

// Make our own error that wraps `anyhow::Error`.
Expand Down Expand Up @@ -422,7 +450,7 @@ mod tests {
#[tokio::test]
async fn test_quorum_join_timeout() {
let lighthouse = lighthouse_test_new();
assert!(!lighthouse.quorum_valid().await);
assert!(!lighthouse.quorum_valid().await.0);

{
let mut state = lighthouse.state.lock().await;
Expand All @@ -440,21 +468,21 @@ mod tests {
);
}

assert!(!lighthouse.quorum_valid().await);
assert!(!lighthouse.quorum_valid().await.0);

{
let mut state = lighthouse.state.lock().await;
state.participants.get_mut("a").unwrap().joined =
Instant::now().sub(Duration::from_secs(10 * 60 * 60));
}

assert!(lighthouse.quorum_valid().await);
assert!(lighthouse.quorum_valid().await.0);
}

#[tokio::test]
async fn test_quorum_fast_prev_quorum() {
let lighthouse = lighthouse_test_new();
assert!(!lighthouse.quorum_valid().await);
assert!(!lighthouse.quorum_valid().await.0);

{
let mut state = lighthouse.state.lock().await;
Expand All @@ -472,7 +500,7 @@ mod tests {
);
}

assert!(!lighthouse.quorum_valid().await);
assert!(!lighthouse.quorum_valid().await.0);

{
let mut state = lighthouse.state.lock().await;
Expand All @@ -484,10 +512,11 @@ mod tests {
store_address: "".to_string(),
step: 1,
}],
created: Some(SystemTime::now().into()),
});
}

assert!(lighthouse.quorum_valid().await);
assert!(lighthouse.quorum_valid().await.0);
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
padding: 10px;
border: 1px solid #333;
}
/* Highlights quorum members whose step differs from the quorum's max step. */
.member.recovering {
background-color: orange;
}
/* Marks heartbeats that are older than the status page's old-age threshold. */
.heartbeat.old {
color: red;
}
Expand Down
15 changes: 11 additions & 4 deletions templates/status.html
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
<h2>Quorum Status</h2>

Current quorum_id: {{quorum_id}}
Current quorum_id: {{quorum_id}} <br>
Next quorum status: {{quorum_status}}

<h3>Previous Quorum</h3>
{% if let Some(prev_quorum) = prev_quorum %}

Previous quorum id: {{prev_quorum.quorum_id}}
Previous quorum id: {{prev_quorum.quorum_id}} <br>
Quorum age:
{{SystemTime::try_from(prev_quorum.created.unwrap()).unwrap().elapsed().unwrap().as_secs_f64()}}s

<div>
{% for member in prev_quorum.participants %}

<div class="member">
<div class="member
{% if member.step != max_step %}recovering{% endif %}
">
<b>{{ member.replica_id }}</b> <br/>
Step: {{ member.step }} <br/>
Manager: {{ member.address }} <br/>
Expand All @@ -33,7 +38,9 @@ <h3>Heartbeats</h3>
{% for replica_id in heartbeats.keys() %}

{% let age = heartbeats[replica_id].elapsed().as_secs_f64() %}
<li class="heartbeat {% if heartbeats[replica_id].lt(old_age_threshold) %}old{%endif%}">
<li class="heartbeat
{% if heartbeats[replica_id].lt(old_age_threshold) %}old{%endif%}
">
{{ replica_id }}: seen {{ age }}s ago
</li>

Expand Down
Loading