Skip to content

Commit

Permalink
Fix reset_node_state
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 26, 2024
1 parent 01178f7 commit b3f109b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 31 deletions.
65 changes: 55 additions & 10 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,26 @@ impl Chitchat {
) {
let node_state = self.cluster_state.node_state_mut(chitchat_id);

// If the node is new, we must ensure that the failure detector is aware of it.
if node_state.max_version() >= max_version {
return;
}
if node_state.max_version() == 0 {
self.failure_detector.report_heartbeat(chitchat_id);
}
node_state.retain_key_values(|_key, value| value.version > max_version);
node_state.set_last_gc_version(last_gc_version);

// We don't want to call listeners for keys that are already up to date so we must do this
// dance instead of clearing the node state and then setting the new values.
let mut previous_keys: HashSet<String> = node_state
.key_values_including_deleted()
.map(|(key, _)| key.to_string())
.collect();
for (key, value) in key_values {
previous_keys.remove(&key);
node_state.set_versioned_value(key, value)
}
for key in previous_keys {
node_state.remove_key_value_internal(&key);
}
node_state.set_last_gc_version(last_gc_version);
}

pub(crate) fn update_self_heartbeat(&mut self) {
Expand Down Expand Up @@ -1075,19 +1085,54 @@ mod tests {

node.reset_node_state(
&chitchat_id,
[(
"qux".to_string(),
VersionedValue::new("baz".to_string(), 2, false),
)]
[
(
"qux".to_string(),
VersionedValue::new("baz".to_string(), 2, false),
),
(
"toto".to_string(),
VersionedValue::new("tutu".to_string(), 4, false),
),
]
.into_iter(),
2,
4,
1337,
);
let node_state = node.cluster_state.node_state(&chitchat_id).unwrap();
assert_eq!(node_state.num_key_values(), 2);
assert_eq!(node_state.get("qux"), Some("baz"));
assert_eq!(node_state.get("toto"), Some("tutu"));
assert_eq!(node_state.max_version(), 4);
assert_eq!(node_state.last_gc_version(), 1337);

let chitchat_id = ChitchatId::for_local_test(10_004);
let node_state = node.cluster_state.node_state_mut(&chitchat_id);
node_state.set("foo", "bar");
node_state.set("qux", "baz");
node_state.set("toto", "titi");

node.reset_node_state(
&chitchat_id,
[
(
"foo".to_string(),
VersionedValue::new("bar".to_string(), 1, false),
),
(
"qux".to_string(),
VersionedValue::new("baz".to_string(), 2, false),
),
]
.into_iter(),
2,
1337,
);
let node_state = node.cluster_state.node_state(&chitchat_id).unwrap();
assert_eq!(node_state.num_key_values(), 3);
assert_eq!(node_state.get("foo"), Some("bar"));
assert_eq!(node_state.get("qux"), Some("baz"));
assert_eq!(node_state.get("toto"), Some("titi"));
assert_eq!(node_state.max_version(), 3);
assert_eq!(node_state.last_gc_version(), 1337);
}
}
40 changes: 19 additions & 21 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,31 +196,29 @@ impl NodeState {
fn gc_keys_marked_for_deletion(&mut self, grace_period: Duration) {
let now = Instant::now();
let mut max_deleted_version = self.last_gc_version;
self.retain_key_values(|_, versioned_value: &VersionedValue| {
let Some(deleted_instant) = versioned_value.tombstone else {
// The KV is not deleted. We keep it!
return true;
};
if now < deleted_instant + grace_period {
// We haved not passed the grace period yet. We keep it!
return true;
}
// We have exceeded the tombstone grace period. Time to remove it.
max_deleted_version = versioned_value.version.max(max_deleted_version);
false
});
self.key_values
.retain(|_, versioned_value: &mut VersionedValue| {
let Some(deleted_instant) = versioned_value.tombstone else {
// The KV is not deleted. We keep it!
return true;
};
if now < deleted_instant + grace_period {
// We haved not passed the grace period yet. We keep it!
return true;
}
// We have exceeded the tombstone grace period. Time to remove it.
max_deleted_version = versioned_value.version.max(max_deleted_version);
false
});
self.last_gc_version = max_deleted_version;
}

/// Retains the key-value pairs for which the predicate returns `true` and removes the other
/// ones definitively. In other words, this method does not add tombstones.
/// Removes a key-value pair without marking it for deletion.
///
/// Most often, you don't want to call this method but rather `mark_for_deletion`.
pub(crate) fn retain_key_values(
&mut self,
mut predicate: impl FnMut(&str, &VersionedValue) -> bool,
) {
self.key_values.retain(|key, value| predicate(key, value));
/// Most of the time, you do not want to call this method but,
/// `mark_for_deletion` instead.
pub(crate) fn remove_key_value_internal(&mut self, key: &str) {
self.key_values.remove(key);
}

/// Returns an iterator over the versioned values that are strictly greater than
Expand Down

0 comments on commit b3f109b

Please sign in to comment.