Skip to content

Commit

Permalink
View change protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
penberg committed May 6, 2023
1 parent bd95cee commit 8412963
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
4 changes: 4 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,8 @@ where
/// The commit ID of the replica that is sending the NewState message.
commit_number: CommitID,
},
StartViewChange {
view_number: ViewNumber,
replica_id: ReplicaID,
}
}
90 changes: 74 additions & 16 deletions src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::sync::Arc;
/// Protocol status of a replica.
enum Status {
/// Regular operation. `on_commit` ignores messages unless the replica is in
/// this status, and `on_get_state` asserts it.
Normal,
/// Status that `on_new_state` asserts before applying a transferred log;
/// it switches the replica back to `Normal` afterwards.
Recovery,
/// Entered by `view_change` and `on_start_view_change` while replicas
/// exchange `StartViewChange` messages for a new view.
ViewChange,
}

#[derive(Debug)]
Expand All @@ -23,11 +24,13 @@ pub struct Replica<SM: StateMachine> {
self_id: ReplicaID,
state_machine: Arc<SM>,
status: RefCell<Status>,
view_number: ViewNumber,
backup_idle_ticks: RefCell<usize>,
view_number: RefCell<ViewNumber>,
commit_number: AtomicUsize,
op_number: AtomicUsize,
log: RefCell<Vec<SM::Input>>,
acks: RefCell<HashMap<ReplicaID, usize>>,
start_view_changes: RefCell<HashMap<ViewNumber, usize>>,
client_tx: Sender<SM::Output>,
replica_tx: Sender<(ReplicaID, Message<SM::Input>)>,
}
Expand All @@ -41,21 +44,25 @@ impl<SM: StateMachine> Replica<SM> {
replica_tx: Sender<(ReplicaID, Message<SM::Input>)>,
) -> Replica<SM> {
let status = RefCell::new(Status::Normal);
let view_number = 0;
let backup_idle_ticks = RefCell::new(0);
let view_number = RefCell::new(0);
let commit_number = AtomicUsize::new(0);
let op_number = AtomicUsize::new(0);
let log = RefCell::new(Vec::default());
let acks = RefCell::new(HashMap::default());
let start_view_changes = RefCell::new(HashMap::default());
Replica {
self_id,
config,
state_machine,
status,
backup_idle_ticks,
view_number,
commit_number,
op_number,
log,
acks,
start_view_changes,
client_tx,
replica_tx,
}
Expand Down Expand Up @@ -110,6 +117,9 @@ impl<SM: StateMachine> Replica<SM> {
commit_number,
);
}
Message::StartViewChange { view_number, replica_id } => {
self.on_start_view_change(view_number, replica_id);
}
}
}

Expand All @@ -128,7 +138,7 @@ impl<SM: StateMachine> Replica<SM> {
acks.insert(op_number, 1);
// TODO: Update client_table
// Send a prepare message to all the replicas.
let view_number = self.view_number;
let view_number = self.view_number();
let commit_number = self.commit_number();
self.send_msg_to_others(Message::Prepare {
view_number,
Expand All @@ -150,9 +160,10 @@ impl<SM: StateMachine> Replica<SM> {
op: SM::Input,
commit_number: CommitID,
) {
self.backup_idle_ticks.replace(0);
assert!(!self.is_primary());
// TODO: If view number is not the same, initiate recovery.
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
if op_number <= self.op_number() {
return; // duplicate
}
Expand Down Expand Up @@ -182,7 +193,7 @@ impl<SM: StateMachine> Replica<SM> {
/// the operation and replies to the client.
fn on_prepare_ok(&self, view_number: ViewNumber, op_number: OpNumber) {
assert!(self.is_primary());
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
// Register the acknowledgement
let mut acks = self.acks.borrow_mut();
let acks = acks.get_mut(&op_number).unwrap();
Expand All @@ -202,14 +213,15 @@ impl<SM: StateMachine> Replica<SM> {
/// requests, then the primary sends a `Commit` message to backup
/// nodes instead to give backup nodes the chance to commit.
fn on_commit(&self, view_number: ViewNumber, commit_number: CommitID) {
self.backup_idle_ticks.replace(0);
if *self.status.borrow() != Status::Normal {
return;
}
if view_number < self.view_number {
if view_number < self.view_number() {
return;
}
assert_eq!(*self.status.borrow(), Status::Normal);
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
if commit_number > self.op_number() {
self.state_transfer();
return;
Expand All @@ -223,12 +235,12 @@ impl<SM: StateMachine> Replica<SM> {
/// up on its log.
fn on_get_state(&self, replica_id: ReplicaID, view_number: ViewNumber, op_number: OpNumber) {
assert_eq!(*self.status.borrow(), Status::Normal);
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
let log = self.log.borrow();
self.send_msg(
replica_id,
Message::NewState {
view_number: self.view_number,
view_number: self.view_number(),
log: log[op_number..].to_vec(),
op_number_start: op_number,
op_number_end: self.op_number(),
Expand All @@ -251,7 +263,7 @@ impl<SM: StateMachine> Replica<SM> {
return;
}
assert_eq!(*self.status.borrow(), Status::Recovery);
assert_eq!(self.view_number, view_number);
assert_eq!(self.view_number(), view_number);
assert_eq!(op_number_start, self.op_number());
for op in log {
self.append_to_log(op);
Expand All @@ -262,29 +274,59 @@ impl<SM: StateMachine> Replica<SM> {
assert_eq!(self.op_number(), op_number_end);
assert_eq!(self.commit_number(), commit_number);
self.status.replace(Status::Normal);
let view_number = self.view_number;
let view_number = self.view_number();
self.send_msg_to_primary(Message::PrepareOk {
view_number,
op_number: op_number_end,
});
}

/// Handles a `StartViewChange` message sent by `replica_id` for `view_number`.
///
/// Messages for a view older than the current one are ignored, so a delayed
/// message cannot move this replica back to an earlier view. Otherwise the
/// replica enters `Status::ViewChange`, adopts `view_number`, and counts the
/// message. Until the count reaches the configured quorum, the replica
/// broadcasts its own `StartViewChange` for that view.
///
/// # Panics
///
/// Panics if `replica_id` is this replica's own ID.
fn on_start_view_change(&self, view_number: ViewNumber, replica_id: ReplicaID) {
    assert!(self.self_id != replica_id);
    if view_number < self.view_number() {
        return; // stale message for a view we have already left
    }
    self.status.replace(Status::ViewChange);
    self.view_number.replace(view_number);
    // Update the counter in its own scope so the `RefCell` borrow is released
    // before any outbound call is made.
    let votes = {
        let mut start_view_changes = self.start_view_changes.borrow_mut();
        let votes = start_view_changes.entry(view_number).or_insert(0);
        *votes += 1;
        *votes
    };
    if votes >= self.config.quorum() {
        // TODO: Send DoViewChange message
    } else {
        self.send_msg_to_others(Message::StartViewChange {
            view_number,
            replica_id: self.self_id,
        });
    }
}

/// When there are no client requests, the primary node sends a
/// `Commit` message to backup nodes periodically to let them commit
/// if needed.
pub fn on_idle(&self) {
if !self.is_primary() {
return;
if self.is_primary() {
self.on_primary_idle();
} else {
self.on_backup_idle();
}
}

fn on_primary_idle(&self) {
assert_eq!(*self.status.borrow(), Status::Normal);
let view_number = self.view_number;
let view_number = self.view_number();
let commit_number = self.commit_number();
self.send_msg_to_others(Message::Commit {
view_number,
commit_number,
});
}

/// Called on every idle tick while this replica is a backup.
///
/// Counts consecutive idle ticks (the counter is cleared by `on_prepare` and
/// `on_commit`). Once more than `MAX_IDLE_TICKS` ticks pass without hearing
/// from the primary, the counter is reset and a view change is started.
/// Resetting gives the new view a full timeout period before another view
/// change is attempted, instead of bumping the view number on every
/// subsequent tick.
fn on_backup_idle(&self) {
    /// Number of idle ticks a backup tolerates before suspecting the primary.
    const MAX_IDLE_TICKS: usize = 2;

    // Scope the mutable borrow so it is released before `view_change` runs.
    let timed_out = {
        let mut ticks = self.backup_idle_ticks.borrow_mut();
        *ticks += 1;
        *ticks > MAX_IDLE_TICKS
    };
    if timed_out {
        self.backup_idle_ticks.replace(0);
        self.view_change();
    }
}

fn append_to_log(&self, op: SM::Input) {
let mut log = self.log.borrow_mut();
log.push(op);
Expand All @@ -299,12 +341,24 @@ impl<SM: StateMachine> Replica<SM> {
primary_id,
Message::GetState {
replica_id: self.self_id,
view_number: self.view_number,
view_number: self.view_number(),
op_number: self.op_number(),
},
);
}

/// Starts a view change initiated by this replica.
///
/// Switches the status to `Status::ViewChange`, advances the view number by
/// one, records this replica's own vote for the new view, and broadcasts a
/// `StartViewChange` message to the other replicas.
fn view_change(&self) {
    self.status.replace(Status::ViewChange);
    // Bump the view number in its own scope: the mutable borrow must end
    // before `send_msg_to_others` runs, otherwise any read of the view number
    // during sending would panic on the live `RefCell` borrow.
    let view_number = {
        let mut current = self.view_number.borrow_mut();
        *current += 1;
        *current
    };
    // The initiating replica counts itself as the first vote for the new view.
    self.start_view_changes.borrow_mut().insert(view_number, 1);
    self.send_msg_to_others(Message::StartViewChange {
        view_number,
        replica_id: self.self_id,
    });
}

/// Commits an operation at log index `op_idx`.
fn commit_op(&self, op_idx: usize) -> SM::Output {
let log = self.log.borrow();
Expand Down Expand Up @@ -344,13 +398,17 @@ impl<SM: StateMachine> Replica<SM> {
}

fn primary_id(&self) -> ReplicaID {
self.config.primary_id(self.view_number)
self.config.primary_id(self.view_number())
}

/// Returns the current commit number, loaded with `Ordering::SeqCst`.
fn commit_number(&self) -> CommitID {
self.commit_number.load(Ordering::SeqCst)
}

/// Returns a copy of the current view number.
///
/// Takes a shared `RefCell` borrow for the duration of the read, so it
/// panics if the view number is mutably borrowed at the time of the call.
fn view_number(&self) -> ViewNumber {
*self.view_number.borrow()
}

/// Returns the current op number, loaded with `Ordering::SeqCst`.
fn op_number(&self) -> OpNumber {
self.op_number.load(Ordering::SeqCst)
}
Expand Down

0 comments on commit 8412963

Please sign in to comment.