Skip to content

Commit

Permalink
WIP: partially integrated leader election into the main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
tailhook committed Mar 9, 2016
1 parent 4b03fa5 commit 494789d
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 59 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ documentation = "http://verwalter.readthedocs.org"
authors = ["Paul Colomiets <paul@colomiets.name>"]

[dependencies]
rotor = "0.6.1"
rotor = "0.6.2"
rotor-http = { rev="e8f199b", git="git://github.com/tailhook/rotor-http" }
rotor-tools = "0.3.1"
rotor-cantal = { rev="f772b26", git="git://github.com/tailhook/rotor-cantal" }
Expand All @@ -33,6 +33,7 @@ libc = "0.2"
matches = "0.1.2"
hyper = "0.7"
mio = "0.5"
nix = "0.5.0"

[[bin]]
name = "verwalter"
8 changes: 4 additions & 4 deletions src/elect/action.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use time::SteadyTime;
use rotor::Time;

use super::{Id};

Expand All @@ -14,18 +14,18 @@ pub enum Action {

#[derive(PartialEq, Eq, Debug)]
pub struct ActionList {
pub next_wakeup: SteadyTime,
pub next_wakeup: Time,
pub action: Option<Action>,
}

impl Action {
pub fn and_wait(self, time: SteadyTime) -> ActionList {
pub fn and_wait(self, time: Time) -> ActionList {
ActionList {
next_wakeup: time,
action: Some(self),
}
}
pub fn wait(time: SteadyTime) -> ActionList {
pub fn wait(time: Time) -> ActionList {
ActionList {
next_wakeup: time,
action: None,
Expand Down
21 changes: 19 additions & 2 deletions src/elect/info.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
use std::str::FromStr;
use std::collections::HashMap;
use std::sync::Arc;

use rustc_serialize::hex::{FromHex, FromHexError};

use super::{Info, Id};


impl Id {
pub fn new<S:AsRef<[u8]>>(id: S) -> Id {
Id(id.as_ref().to_owned().into_boxed_slice())
}
}

impl FromStr for Id {
type Err = FromHexError;
fn from_str(s: &str) -> Result<Id, Self::Err> {
s.from_hex().map(|x| x.into_boxed_slice()).map(Id)
}
}

impl Info {
pub fn new<S:AsRef<str>>(id: S) -> Info {
pub fn new(id: Id) -> Info {
Info {
id: Id(id.as_ref().to_string()),
id: id,
all_hosts: HashMap::new(),
}
}
Expand Down
35 changes: 17 additions & 18 deletions src/elect/machine.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::HashSet;
use std::cmp::{Ord, Ordering};
use std::cmp::Ordering::{Less as Older, Equal as Current, Greater as Newer};
use std::time::Duration;

use rotor::Time;
use rand::{thread_rng, Rng};
use time::{SteadyTime, Duration};

use super::{Id, Message, Info, Capsule};
use super::settings::{start_timeout, election_ivl, HEARTBEAT_INTERVAL};
Expand All @@ -15,16 +16,16 @@ type Epoch = u64;

#[derive(Clone, Debug)]
pub enum Machine {
Starting { leader_deadline: SteadyTime },
Electing { epoch: Epoch, votes_for_me: HashSet<Id>, deadline: SteadyTime },
Voted { epoch: Epoch, peer: Id, election_deadline: SteadyTime },
Leader { epoch: Epoch, next_ping_time: SteadyTime },
Follower { epoch: Epoch, leader_deadline: SteadyTime },
Starting { leader_deadline: Time },
Electing { epoch: Epoch, votes_for_me: HashSet<Id>, deadline: Time },
Voted { epoch: Epoch, peer: Id, election_deadline: Time },
Leader { epoch: Epoch, next_ping_time: Time },
Follower { epoch: Epoch, leader_deadline: Time },
}


impl Machine {
pub fn new(now: SteadyTime) -> Machine {
pub fn new(now: Time) -> Machine {
Machine::Starting {
leader_deadline: now + start_timeout(),
}
Expand All @@ -42,7 +43,7 @@ impl Machine {
};
epoch.cmp(&my_epoch)
}
pub fn current_deadline(&self) -> SteadyTime {
pub fn current_deadline(&self) -> Time {
use self::Machine::*;
match *self {
Starting { leader_deadline } => leader_deadline,
Expand All @@ -53,7 +54,7 @@ impl Machine {
}
}

pub fn time_passed(self, info: &Info, now: SteadyTime)
pub fn time_passed(self, info: &Info, now: Time)
-> (Machine, ActionList)
{
use self::Machine::*;
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Machine {
}
me @ Leader { .. } => {
let next_ping = now +
Duration::milliseconds(HEARTBEAT_INTERVAL);
Duration::from_millis(HEARTBEAT_INTERVAL);
(me,
Action::PingAll.and_wait(next_ping))
}
Expand All @@ -108,7 +109,7 @@ impl Machine {
};
return (machine, action)
}
pub fn message(self, info: &Info, msg: Capsule, now: SteadyTime)
pub fn message(self, info: &Info, msg: Capsule, now: Time)
-> (Machine, ActionList)
{
use self::Machine::*;
Expand Down Expand Up @@ -190,7 +191,7 @@ impl Machine {
}
}

fn follow(epoch: Epoch, now: SteadyTime) -> (Machine, ActionList) {
fn follow(epoch: Epoch, now: Time) -> (Machine, ActionList) {
let dline = now + election_ivl();
(Machine::Follower { epoch: epoch, leader_deadline: dline },
Action::Pong.and_wait(dline))
Expand All @@ -210,18 +211,16 @@ fn minimum_votes(total_peers: usize) -> usize {
}
}

fn become_leader(epoch: Epoch, now: SteadyTime) -> (Machine, ActionList) {
let next_ping = now +
Duration::milliseconds(HEARTBEAT_INTERVAL);
fn become_leader(epoch: Epoch, now: Time) -> (Machine, ActionList) {
let next_ping = now + Duration::from_millis(HEARTBEAT_INTERVAL);
(Machine::Leader { epoch: epoch, next_ping_time: next_ping },
Action::PingAll.and_wait(next_ping))
}

fn start_election(epoch: Epoch, now: SteadyTime, first_vote: &Id)
fn start_election(epoch: Epoch, now: Time, first_vote: &Id)
-> (Machine, ActionList)
{
let election_end = now +
Duration::milliseconds(HEARTBEAT_INTERVAL);
let election_end = now + Duration::from_millis(HEARTBEAT_INTERVAL);
(Machine::Electing {
epoch: epoch,
votes_for_me: {
Expand Down
19 changes: 14 additions & 5 deletions src/elect/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
use std::net::SocketAddr;
use std::sync::{Arc};
use std::collections::{HashSet, HashMap};
use rustc_serialize::hex::ToHex;

use time::SteadyTime;
use rotor_cantal::Schedule;
use time::Timespec;

mod machine;
mod action;
mod settings;
mod info;
mod network;
#[cfg(test)] mod test_node;
#[cfg(test)] mod test_mesh;
#[cfg(test)] mod test_util;
#[cfg(test)] mod test_split_brain;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Id(String);
pub struct Id(Box<[u8]>);

pub struct Election {
info: Info,
machine: machine::Machine,
schedule: Schedule,
}

type Capsule = (Id, u64, Message);

Expand All @@ -31,8 +40,8 @@ enum Message {

#[derive(Clone, Debug)]
struct PeerInfo {
addr: SocketAddr,
last_report: Timespec,
addr: Option<SocketAddr>,
last_report: Option<Timespec>,
}

#[derive(Debug)]
Expand All @@ -45,6 +54,6 @@ struct Info {

impl ::std::fmt::Display for Id {
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(fmt, "{}", self.0)
write!(fmt, "{}", self.0.to_hex())
}
}
77 changes: 77 additions & 0 deletions src/elect/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::collections::HashMap;

use time::Timespec;
use rotor::{Machine, EventSet, Scope, Response};
use rotor::void::{unreachable, Void};
use rotor_cantal::Schedule;

use net::Context;
use super::machine;
use super::{Election, Info, Id, PeerInfo};


impl Election {
pub fn new(id: Id, schedule: Schedule, scope: &mut Scope<Context>)
-> Response<Election, Void>
{
let mach = machine::Machine::new(scope.now());
let dline = mach.current_deadline();
Response::ok(Election {
info: Info::new(id),
schedule: schedule,
machine: mach,
}).deadline(dline)
}
}

impl Machine for Election {
type Context = Context;
type Seed = Void;
fn create(seed: Self::Seed, _scope: &mut Scope<Context>)
-> Response<Self, Void>
{
unreachable(seed)
}
fn ready(self, _events: EventSet, _scope: &mut Scope<Context>)
-> Response<Self, Self::Seed>
{
unimplemented!();
}
fn spawned(self, _scope: &mut Scope<Context>) -> Response<Self, Self::Seed>
{
unimplemented!();
}
fn timeout(self, scope: &mut Scope<Context>) -> Response<Self, Self::Seed>
{
let (me, alst) = self.machine.time_passed(&self.info, scope.now());
match alst.action {
Some(x) => {
println!("DO -------> {:?}", x)
}
None => {}
}
Response::ok(Election { machine: me, ..self})
.deadline(alst.next_wakeup)
}
fn wakeup(mut self, _scope: &mut Scope<Context>)
-> Response<Self, Self::Seed>
{
self.info.all_hosts = self.schedule.get_peers().map(|peers| {
peers.peers.iter()
.filter_map(|p| {
p.id.parse()
.map_err(|e| error!("Error parsing node id {:?}", p.id)).ok()
.map(|x| (x, p))
}).map(|(id, p)| (id, PeerInfo {
addr: p.primary_addr.as_ref().and_then(|x| x.parse().ok()),
last_report: p.last_report_direct.map(|x| {
Timespec { sec: (x/1000) as i64,
nsec: ((x % 1000)*1_000_000) as i32 }
}),
})).collect()
}).unwrap_or_else(HashMap::new);
// TODO(tailhook) check wakeup time
println!("Selfinfo {:?}", self.info);
Response::ok(self)
}
}
15 changes: 8 additions & 7 deletions src/elect/settings.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use time::Duration;
use std::time::Duration;

use rand::{thread_rng, Rng};

/// The number of seconds just started worker waits it to be reached by
Expand All @@ -11,13 +12,13 @@ use rand::{thread_rng, Rng};
///
/// The random MESSAGE_TIMEOUT (constained by constants below) is added to
/// this timeout.
pub const START_TIMEOUT: i64 = 5000;
pub const START_TIMEOUT: u64 = 5000;

/// On each leader's ping we start election timer of a random value in this
/// range. If there is no heartbeat from leader during this timeout, we start
/// election. Note that mio currently has only 200 ms precision timers.
pub const MIN_MESSAGE_TIMEOUT: i64 = 1200;
pub const MAX_MESSAGE_TIMEOUT: i64 = 3000;
pub const MIN_MESSAGE_TIMEOUT: u64 = 1200;
pub const MAX_MESSAGE_TIMEOUT: u64 = 3000;

/// Leader ping interval
///
Expand All @@ -26,14 +27,14 @@ pub const MAX_MESSAGE_TIMEOUT: i64 = 3000;
/// system. There is no good reason to wait so long for original Raft. I.e.
/// it wants to reestablish consistency as fast as possible. But it may be
/// nicer to keep lower elections for us.
pub const HEARTBEAT_INTERVAL: i64 = 600;
pub const HEARTBEAT_INTERVAL: u64 = 600;


pub fn start_timeout() -> Duration {
Duration::milliseconds(START_TIMEOUT) + election_ivl()
Duration::from_millis(START_TIMEOUT) + election_ivl()
}

pub fn election_ivl() -> Duration {
Duration::milliseconds(
Duration::from_millis(
thread_rng().gen_range(MIN_MESSAGE_TIMEOUT, MAX_MESSAGE_TIMEOUT))
}

0 comments on commit 494789d

Please sign in to comment.