Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

View change protocol #6

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,20 @@ where
/// The commit ID of the replica that is sending the NewState message.
commit_number: CommitID,
},
/// Broadcast by a replica that wants to move to a new view, e.g. because
/// it suspects that the primary is unavailable.
StartViewChange {
/// The view number the sender is moving to.
view_number: ViewNumber,
/// The ID of the replica that is sending the StartViewChange message.
replica_id: ReplicaID,
},
/// Sent to the primary of the new view by a replica that has counted a
/// quorum of `StartViewChange` messages for that view.
DoViewChange {
/// The view number the sender is moving to.
view_number: ViewNumber,
/// The ID of the replica that is sending the DoViewChange message.
replica_id: ReplicaID,
/// The log of the replica that is sending the DoViewChange message.
log: Vec<Op>,
/// The commit ID of the replica that is sending the DoViewChange message.
commit_number: CommitID,
},
/// Broadcast by a replica that has counted a quorum of `DoViewChange`
/// messages, announcing that the new view has started.
StartView {
/// The view number that has been started.
view_number: ViewNumber,
/// The ID of the replica that is sending the StartView message.
replica_id: ReplicaID,
/// The log of the replica that is sending the StartView message.
log: Vec<Op>,
/// The commit ID of the replica that is sending the StartView message.
commit_number: CommitID,
},
}
156 changes: 140 additions & 16 deletions src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::sync::Arc;
/// The operating mode of a replica.
enum Status {
/// Regular operation; the status a replica is constructed with.
Normal,
/// The replica is waiting for missing log entries; it returns to `Normal`
/// once the entries of a `NewState` message have been applied.
/// NOTE(review): the code that enters this status is outside this view.
Recovery,
/// A view change is in progress; entered when the replica starts a view
/// change itself or receives a `StartViewChange` message.
ViewChange,
}

#[derive(Debug)]
Expand All @@ -23,11 +24,14 @@ pub struct Replica<SM: StateMachine> {
self_id: ReplicaID,
state_machine: Arc<SM>,
status: RefCell<Status>,
view_number: ViewNumber,
backup_idle_ticks: RefCell<usize>,
view_number: RefCell<ViewNumber>,
commit_number: AtomicUsize,
op_number: AtomicUsize,
log: RefCell<Vec<SM::Input>>,
acks: RefCell<HashMap<ReplicaID, usize>>,
start_view_change_counter: RefCell<HashMap<ViewNumber, usize>>,
do_view_change_counter: RefCell<HashMap<ViewNumber, usize>>,
client_tx: Sender<SM::Output>,
replica_tx: Sender<(ReplicaID, Message<SM::Input>)>,
}
Expand All @@ -41,21 +45,27 @@ impl<SM: StateMachine> Replica<SM> {
replica_tx: Sender<(ReplicaID, Message<SM::Input>)>,
) -> Replica<SM> {
let status = RefCell::new(Status::Normal);
let view_number = 0;
let backup_idle_ticks = RefCell::new(0);
let view_number = RefCell::new(0);
let commit_number = AtomicUsize::new(0);
let op_number = AtomicUsize::new(0);
let log = RefCell::new(Vec::default());
let acks = RefCell::new(HashMap::default());
let start_view_change_counter = RefCell::new(HashMap::default());
let do_view_change_counter = RefCell::new(HashMap::default());
Replica {
self_id,
config,
state_machine,
status,
backup_idle_ticks,
view_number,
commit_number,
op_number,
log,
acks,
start_view_change_counter,
do_view_change_counter,
client_tx,
replica_tx,
}
Expand Down Expand Up @@ -110,6 +120,28 @@ impl<SM: StateMachine> Replica<SM> {
commit_number,
);
}
Message::StartViewChange {
view_number,
replica_id,
} => {
self.on_start_view_change(view_number, replica_id);
}
Message::DoViewChange {
view_number,
replica_id,
log,
commit_number,
} => {
self.on_do_view_change(view_number, replica_id, log, commit_number);
}
Message::StartView {
view_number,
replica_id,
log,
commit_number,
} => {
self.on_start_view(view_number, replica_id, log, commit_number);
}
}
}

Expand All @@ -128,7 +160,7 @@ impl<SM: StateMachine> Replica<SM> {
acks.insert(op_number, 1);
// TODO: Update client_table
// Send a prepare message to all the replicas.
let view_number = self.view_number;
let view_number = self.view_number();
let commit_number = self.commit_number();
self.send_msg_to_others(Message::Prepare {
view_number,
Expand All @@ -150,9 +182,10 @@ impl<SM: StateMachine> Replica<SM> {
op: SM::Input,
commit_number: CommitID,
) {
self.backup_idle_ticks.replace(0);
assert!(!self.is_primary());
// TODO: If view number is not the same, initiate recovery.
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
if op_number <= self.op_number() {
return; // duplicate
}
Expand Down Expand Up @@ -182,7 +215,7 @@ impl<SM: StateMachine> Replica<SM> {
/// the operation and replies to the client.
fn on_prepare_ok(&self, view_number: ViewNumber, op_number: OpNumber) {
assert!(self.is_primary());
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
// Register the acknowledgement
let mut acks = self.acks.borrow_mut();
let acks = acks.get_mut(&op_number).unwrap();
Expand All @@ -202,14 +235,15 @@ impl<SM: StateMachine> Replica<SM> {
/// requests, then the primary sends a `Commit` message to backup
/// nodes instead to give backup nodes the chance to commit.
fn on_commit(&self, view_number: ViewNumber, commit_number: CommitID) {
self.backup_idle_ticks.replace(0);
if *self.status.borrow() != Status::Normal {
return;
}
if view_number < self.view_number {
if view_number < self.view_number() {
return;
}
assert_eq!(*self.status.borrow(), Status::Normal);
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
if commit_number > self.op_number() {
self.state_transfer();
return;
Expand All @@ -223,12 +257,12 @@ impl<SM: StateMachine> Replica<SM> {
/// up on its log.
fn on_get_state(&self, replica_id: ReplicaID, view_number: ViewNumber, op_number: OpNumber) {
assert_eq!(*self.status.borrow(), Status::Normal);
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
let log = self.log.borrow();
self.send_msg(
replica_id,
Message::NewState {
view_number: self.view_number,
view_number: self.view_number(),
log: log[op_number..].to_vec(),
op_number_start: op_number,
op_number_end: self.op_number(),
Expand All @@ -251,7 +285,7 @@ impl<SM: StateMachine> Replica<SM> {
return;
}
assert_eq!(*self.status.borrow(), Status::Recovery);
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
assert_eq!(op_number_start, self.op_number());
for op in log {
self.append_to_log(op);
Expand All @@ -262,29 +296,103 @@ impl<SM: StateMachine> Replica<SM> {
assert_eq!(self.op_number(), op_number_end);
assert_eq!(self.commit_number(), commit_number);
self.status.replace(Status::Normal);
let view_number = self.view_number;
let view_number = self.view_number();
self.send_msg_to_primary(Message::PrepareOk {
view_number,
op_number: op_number_end,
});
}

/// Handles a `StartViewChange` vote from `replica_id` for `view_number`.
///
/// A replica broadcasts `StartViewChange` when it suspects that the primary
/// is unavailable. On receipt this replica:
///
/// * ignores votes for a view older than its own, and late votes for its
///   current view when it is not in `Status::ViewChange`;
/// * joins a newer view exactly once: it switches to `Status::ViewChange`,
///   adopts `view_number` and broadcasts its own `StartViewChange`;
/// * sends a single `DoViewChange` to the primary of the new view at the
///   moment the tally for `view_number` reaches `config.quorum()`.
///
/// # Panics
///
/// Panics if `replica_id` is this replica's own ID.
fn on_start_view_change(&self, view_number: ViewNumber, replica_id: ReplicaID) {
    assert!(self.self_id != replica_id);
    let current_view = self.view_number();
    if view_number < current_view {
        // A stale vote must never roll the replica back to an older view.
        return;
    }
    let joining = view_number > current_view;
    if !joining && *self.status.borrow() != Status::ViewChange {
        // Late vote for a view this replica is not (or no longer) changing
        // into; acting on it would knock the replica out of its status.
        return;
    }
    if joining {
        self.status.replace(Status::ViewChange);
        self.view_number.replace(view_number);
    }
    // Keep the `RefMut` confined to this block so that no borrow is live
    // while messages are being sent.
    let votes = {
        let mut tally = self.start_view_change_counter.borrow_mut();
        // Seed the tally with this replica's own vote, exactly as
        // `view_change` does when the replica initiates the change itself.
        let votes = tally.entry(view_number).or_insert(1);
        *votes += 1;
        *votes
    };
    // NOTE: the tally counts messages, not distinct senders, so a
    // retransmitted vote is counted twice.
    if joining {
        // Announce our own vote once, when joining, instead of echoing a
        // broadcast for every vote received.
        self.send_msg_to_others(Message::StartViewChange {
            view_number,
            replica_id: self.self_id,
        });
    }
    // `==` rather than `>=`: the tally grows by one per message, so this
    // fires exactly once, when the quorum is first reached.
    if votes == self.config.quorum() {
        self.send_msg_to_primary(Message::DoViewChange {
            view_number,
            replica_id: self.self_id,
            log: self.log.borrow().clone(),
            commit_number: self.commit_number(),
        });
    }
}

/// Handles a `DoViewChange` message for `view_number`.
///
/// Replicas send `DoViewChange` to the prospective primary of a new view
/// once a quorum of them has agreed to start the view change. When the
/// tally for `view_number` reaches `config.quorum()`, this replica adopts
/// the view, returns to `Status::Normal` and broadcasts `StartView` with
/// its own log and commit number.
///
/// Messages for a view older than the current one are ignored. The sender
/// ID, log and commit number carried by the message are not used yet.
fn on_do_view_change(
    &self,
    view_number: ViewNumber,
    _replica_id: ReplicaID,
    _log: Vec<SM::Input>,
    _commit_number: CommitID,
) {
    if view_number < self.view_number() {
        // Left over from a view change that has already been superseded.
        return;
    }
    // Keep the `RefMut` confined to this block so that no borrow is live
    // while the broadcast below runs.
    let votes = {
        let mut tally = self.do_view_change_counter.borrow_mut();
        let votes = tally.entry(view_number).or_insert(0);
        *votes += 1;
        *votes
    };
    // `==` rather than `>=`: the tally grows by one per message, so the new
    // view is started exactly once instead of once per extra message.
    if votes != self.config.quorum() {
        return;
    }
    // TODO Take log from "best" replica...
    // Adopt the view being started so that the view number used from here
    // on matches the one announced in `StartView`.
    self.view_number.replace(view_number);
    self.status.replace(Status::Normal);
    self.send_msg_to_others(Message::StartView {
        view_number,
        replica_id: self.self_id,
        log: self.log.borrow().clone(),
        commit_number: self.commit_number(),
    });
}

/// Handles a `StartView` message announcing that `view_number` has started.
///
/// The new primary broadcasts `StartView` once it has successfully started
/// a new view. The receiving replica adopts the announced view, clears its
/// idle-tick counter and returns to `Status::Normal`.
///
/// A `StartView` for a view older than the current one is ignored, so a
/// delayed announcement cannot force the replica back to normal operation
/// under a view it has already moved past. The sender ID, log and commit
/// number carried by the message are not used yet.
fn on_start_view(
    &self,
    view_number: ViewNumber,
    _replica_id: ReplicaID,
    _log: Vec<SM::Input>,
    _commit_number: CommitID,
) {
    if view_number < self.view_number() {
        return;
    }
    // TODO update log
    self.view_number.replace(view_number);
    // The message proves the new primary is alive; restart the idle count
    // just like the `Prepare` and `Commit` handlers do.
    self.backup_idle_ticks.replace(0);
    self.status.replace(Status::Normal);
}

/// Called when there are no client requests to process.
///
/// On the primary this sends a `Commit` message to the backup nodes to let
/// them commit if needed; on a backup it counts the idle tick, which may
/// eventually trigger a view change.
pub fn on_idle(&self) {
if !self.is_primary() {
return;
if self.is_primary() {
self.on_primary_idle();
} else {
self.on_backup_idle();
}
}

/// Sends a `Commit` message carrying the current view number and commit
/// number to all other replicas.
///
/// # Panics
///
/// Panics if the replica is not in `Status::Normal`.
/// NOTE(review): `view_change` bumps the view number while the status is
/// `ViewChange`; if that makes this replica the primary of the new view
/// before the change completes, an idle tick would trip this assert —
/// confirm against `is_primary`.
fn on_primary_idle(&self) {
assert_eq!(*self.status.borrow(), Status::Normal);
let view_number = self.view_number;
let view_number = self.view_number();
let commit_number = self.commit_number();
self.send_msg_to_others(Message::Commit {
view_number,
commit_number,
});
}

/// Counts an idle tick on a backup and starts a view change once more than
/// `MAX_IDLE_TICKS` consecutive ticks have passed.
///
/// The counter is cleared when the view change is triggered, so the next
/// view change is only started after another full timeout instead of on
/// every following tick (each of which would bump the view number again).
fn on_backup_idle(&self) {
    /// Idle ticks a backup tolerates before it suspects the primary.
    const MAX_IDLE_TICKS: usize = 2;

    // Keep the `RefMut` confined to this block so that no borrow is live
    // while `view_change` runs.
    let timed_out = {
        let mut idle_ticks = self.backup_idle_ticks.borrow_mut();
        *idle_ticks += 1;
        if *idle_ticks > MAX_IDLE_TICKS {
            *idle_ticks = 0;
            true
        } else {
            false
        }
    };
    if timed_out {
        self.view_change();
    }
}

fn append_to_log(&self, op: SM::Input) {
let mut log = self.log.borrow_mut();
log.push(op);
Expand All @@ -299,12 +407,24 @@ impl<SM: StateMachine> Replica<SM> {
primary_id,
Message::GetState {
replica_id: self.self_id,
view_number: self.view_number,
view_number: self.view_number(),
op_number: self.op_number(),
},
);
}

/// Starts a view change initiated by this replica.
///
/// Switches to `Status::ViewChange`, advances the view number by one,
/// records this replica's own vote for the new view and broadcasts
/// `StartViewChange` to all other replicas.
fn view_change(&self) {
    self.status.replace(Status::ViewChange);
    // Advance the view inside a short scope: the mutable borrow must be
    // released before any code that reads the view number (such as
    // `view_number()`) can run, otherwise the `RefCell` would panic.
    let next_view = {
        let mut view_number = self.view_number.borrow_mut();
        *view_number += 1;
        *view_number
    };
    // Our own vote is the first one counted for the new view.
    self.start_view_change_counter
        .borrow_mut()
        .insert(next_view, 1);
    self.send_msg_to_others(Message::StartViewChange {
        view_number: next_view,
        replica_id: self.self_id,
    });
}

/// Commits an operation at log index `op_idx`.
fn commit_op(&self, op_idx: usize) -> SM::Output {
let log = self.log.borrow();
Expand Down Expand Up @@ -344,13 +464,17 @@ impl<SM: StateMachine> Replica<SM> {
}

fn primary_id(&self) -> ReplicaID {
self.config.primary_id(self.view_number)
self.config.primary_id(self.view_number())
}

fn commit_number(&self) -> CommitID {
self.commit_number.load(Ordering::SeqCst)
}

fn view_number(&self) -> ViewNumber {
*self.view_number.borrow()
}

fn op_number(&self) -> OpNumber {
self.op_number.load(Ordering::SeqCst)
}
Expand Down