// Patch overview (free-form preamble placed before the first `diff --git` header).
//
// src/message.rs: adds a `StartViewChange { view_number, replica_id }` message variant.
// src/replica.rs: adds `Status::ViewChange`; wraps `view_number` in a `RefCell` and adds a
//   `view_number()` accessor used at every former `self.view_number` read; adds the
//   `backup_idle_ticks` and `start_view_changes` fields; splits `on_idle` into
//   `on_primary_idle` / `on_backup_idle`; adds `on_start_view_change` and `view_change`.
//
// Behaviour visible in the added lines:
//   * `on_backup_idle` increments `backup_idle_ticks` and calls `view_change()` once the
//     count exceeds 2. The counter is reset to 0 only at the top of `on_commit` and of the
//     backup-side handler in hunk -150,9 (its name lies outside the hunk).
//   * `view_change` sets the status to `ViewChange`, increments the view number, records a
//     count of 1 for that view in `start_view_changes`, and broadcasts `StartViewChange`.
//   * `on_start_view_change` sets the status to `ViewChange`, overwrites the view number with
//     the one from the message, bumps the per-view counter, and re-broadcasts
//     `StartViewChange` while the counter is below `config.quorum()`.
//
// NOTE(review): items to confirm before merging.
//   * `backup_idle_ticks` is not reset after `view_change()` fires, so every further idle
//     tick without a Prepare/Commit calls `view_change()` again and bumps the view number.
//   * `view_change` keeps `view_number.borrow_mut()` alive across `send_msg_to_others`; if
//     that helper (not shown here) reads `self.view_number()` this would panic at runtime.
//   * `on_start_view_change` does not compare the incoming view number with the current one,
//     so a message for an older view would presumably move this replica backwards.
//   * `replica_id` is only used in the self-check assert; the counter counts messages, not
//     distinct senders, and each below-quorum message triggers another broadcast.
//   * `on_commit` resets the idle counter before its status/view checks, and still asserts
//     view equality after filtering only lower views, so a higher view would hit the assert.
//   * Generic parameters look stripped by text extraction (`Arc,`, `RefCell>,`, `Sender,`),
//     and line breaks are collapsed; the patch presumably will not apply in this form.
diff --git a/src/message.rs b/src/message.rs index 72cc861..e4bea2c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -49,4 +49,8 @@ where /// The commit ID of the replica that is sending the NewState message. commit_number: CommitID, }, + StartViewChange { + view_number: ViewNumber, + replica_id: ReplicaID, + } } diff --git a/src/replica.rs b/src/replica.rs index 1ef1a7f..f64f70c 100644 --- a/src/replica.rs +++ b/src/replica.rs @@ -15,6 +15,7 @@ use std::sync::Arc; enum Status { Normal, Recovery, + ViewChange, } #[derive(Debug)] @@ -23,11 +24,13 @@ pub struct Replica { self_id: ReplicaID, state_machine: Arc, status: RefCell, - view_number: ViewNumber, + backup_idle_ticks: RefCell, + view_number: RefCell, commit_number: AtomicUsize, op_number: AtomicUsize, log: RefCell>, acks: RefCell>, + start_view_changes: RefCell>, client_tx: Sender, replica_tx: Sender<(ReplicaID, Message)>, } @@ -41,21 +44,25 @@ impl Replica { replica_tx: Sender<(ReplicaID, Message)>, ) -> Replica { let status = RefCell::new(Status::Normal); - let view_number = 0; + let backup_idle_ticks = RefCell::new(0); + let view_number = RefCell::new(0); let commit_number = AtomicUsize::new(0); let op_number = AtomicUsize::new(0); let log = RefCell::new(Vec::default()); let acks = RefCell::new(HashMap::default()); + let start_view_changes = RefCell::new(HashMap::default()); Replica { self_id, config, state_machine, status, + backup_idle_ticks, view_number, commit_number, op_number, log, acks, + start_view_changes, client_tx, replica_tx, } @@ -110,6 +117,9 @@ impl Replica { commit_number, ); } + Message::StartViewChange { view_number, replica_id } => { + self.on_start_view_change(view_number, replica_id); + } } } @@ -128,7 +138,7 @@ impl Replica { acks.insert(op_number, 1); // TODO: Update client_table // Send a prepare message to all the replicas. 
- let view_number = self.view_number; + let view_number = self.view_number(); let commit_number = self.commit_number(); self.send_msg_to_others(Message::Prepare { view_number, @@ -150,9 +160,10 @@ impl Replica { op: SM::Input, commit_number: CommitID, ) { + self.backup_idle_ticks.replace(0); assert!(!self.is_primary()); // TODO: If view number is not the same, initiate recovery. - assert_eq!(self.view_number, view_number); + assert_eq!(self.view_number(), view_number); if op_number <= self.op_number() { return; // duplicate } @@ -182,7 +193,7 @@ impl Replica { /// the operation and replies to the client. fn on_prepare_ok(&self, view_number: ViewNumber, op_number: OpNumber) { assert!(self.is_primary()); - assert_eq!(self.view_number, view_number); + assert_eq!(self.view_number(), view_number); // Register the acknowledgement let mut acks = self.acks.borrow_mut(); let acks = acks.get_mut(&op_number).unwrap(); @@ -202,14 +213,15 @@ impl Replica { /// requests, then the primary sends a `Commit` message to backup /// nodes instead to give backup nodes the chance to commit. fn on_commit(&self, view_number: ViewNumber, commit_number: CommitID) { + self.backup_idle_ticks.replace(0); if *self.status.borrow() != Status::Normal { return; } - if view_number < self.view_number { + if view_number < self.view_number() { return; } assert_eq!(*self.status.borrow(), Status::Normal); - assert_eq!(self.view_number, view_number); + assert_eq!(self.view_number(), view_number); if commit_number > self.op_number() { self.state_transfer(); return; @@ -223,12 +235,12 @@ impl Replica { /// up on its log. 
fn on_get_state(&self, replica_id: ReplicaID, view_number: ViewNumber, op_number: OpNumber) { assert_eq!(*self.status.borrow(), Status::Normal); - assert_eq!(self.view_number, view_number); + assert_eq!(self.view_number(), view_number); let log = self.log.borrow(); self.send_msg( replica_id, Message::NewState { - view_number: self.view_number, + view_number: self.view_number(), log: log[op_number..].to_vec(), op_number_start: op_number, op_number_end: self.op_number(), @@ -251,7 +263,7 @@ impl Replica { return; } assert_eq!(*self.status.borrow(), Status::Recovery); - assert_eq!(self.view_number, view_number); + assert_eq!(self.view_number(), view_number); assert_eq!(op_number_start, self.op_number()); for op in log { self.append_to_log(op); @@ -262,22 +274,44 @@ impl Replica { assert_eq!(self.op_number(), op_number_end); assert_eq!(self.commit_number(), commit_number); self.status.replace(Status::Normal); - let view_number = self.view_number; + let view_number = self.view_number(); self.send_msg_to_primary(Message::PrepareOk { view_number, op_number: op_number_end, }); } + fn on_start_view_change(&self, view_number: ViewNumber, replica_id: ReplicaID) { + assert!(self.self_id != replica_id); + self.status.replace(Status::ViewChange); + self.view_number.replace(view_number); + let mut start_view_changes = self.start_view_changes.borrow_mut(); + let start_view_changes = start_view_changes.entry(view_number).or_insert(0); + *start_view_changes += 1; + if *start_view_changes >= self.config.quorum() { + // TODO: Send DoViewChange message + } else { + self.send_msg_to_others(Message::StartViewChange { + view_number, + replica_id: self.self_id, + }); + } + } + /// When there are no client requests, the primary node sends a /// `Commit` message to backup nodes periodically to let them commit /// if needed. 
pub fn on_idle(&self) { - if !self.is_primary() { - return; + if self.is_primary() { + self.on_primary_idle(); + } else { + self.on_backup_idle(); } + } + + fn on_primary_idle(&self) { assert_eq!(*self.status.borrow(), Status::Normal); - let view_number = self.view_number; + let view_number = self.view_number(); let commit_number = self.commit_number(); self.send_msg_to_others(Message::Commit { view_number, @@ -285,6 +319,14 @@ impl Replica { }); } + fn on_backup_idle(&self) { + let mut backup_idle_ticks = self.backup_idle_ticks.borrow_mut(); + *backup_idle_ticks += 1; + if *backup_idle_ticks > 2 { + self.view_change(); + } + } + fn append_to_log(&self, op: SM::Input) { let mut log = self.log.borrow_mut(); log.push(op); @@ -299,12 +341,24 @@ impl Replica { primary_id, Message::GetState { replica_id: self.self_id, - view_number: self.view_number, + view_number: self.view_number(), op_number: self.op_number(), }, ); } + fn view_change(&self) { + self.status.replace(Status::ViewChange); + let mut view_number = self.view_number.borrow_mut(); + *view_number += 1; + let mut start_view_changes = self.start_view_changes.borrow_mut(); + start_view_changes.insert(*view_number, 1); + self.send_msg_to_others(Message::StartViewChange { + view_number: *view_number, + replica_id: self.self_id, + }); + } + /// Commits an operation at log index `op_idx`. fn commit_op(&self, op_idx: usize) -> SM::Output { let log = self.log.borrow(); @@ -344,13 +398,17 @@ impl Replica { } fn primary_id(&self) -> ReplicaID { - self.config.primary_id(self.view_number) + self.config.primary_id(self.view_number()) } fn commit_number(&self) -> CommitID { self.commit_number.load(Ordering::SeqCst) } + fn view_number(&self) -> ViewNumber { + *self.view_number.borrow() + } + fn op_number(&self) -> OpNumber { self.op_number.load(Ordering::SeqCst) }