Skip to content

Commit

Permalink
refactor(sn_dysfunction): rename IssueType variants
Browse files Browse the repository at this point in the history
The `IssueType` is now used to track as well as untrack dysfunctional
nodes. Hence modify the names to be more generic.
  • Loading branch information
RolandSherwin committed Nov 22, 2022
1 parent 98d0d4c commit 489deb5
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 107 deletions.
48 changes: 23 additions & 25 deletions sn_dysfunction/src/detection.rs
Expand Up @@ -33,16 +33,16 @@ static DYSFUNCTION_SCORE_THRESHOLD: usize = 500;
/// system.
/// Issues have a xorname so they can be reliably assigned to the same nodes
pub enum IssueType {
/// Represents an AEProbeMsg has been sent, but we're awaiting response.
AwaitingProbeResponse,
/// Represents an AeProbeMsg to be tracked by Dysfunction Detection.
AeProbeMsg,
/// Represents a Dkg issue to be tracked by Dysfunction Detection.
Dkg,
/// Represents a communication issue to be tracked by Dysfunction Detection.
Communication,
/// Represents a knowledge issue to be tracked by Dysfunction Detection.
Knowledge,
/// Represents a pending request operation issue to be tracked by Dysfunction Detection.
PendingRequestOperation(OperationId),
RequestOperation(OperationId),
}

#[derive(Debug)]
Expand Down Expand Up @@ -77,7 +77,7 @@ impl DysfunctionDetection {
);
let _ = probe_scores.insert(
*node,
self.calculate_node_score_for_type(node, &IssueType::AwaitingProbeResponse),
self.calculate_node_score_for_type(node, &IssueType::AeProbeMsg),
);
let _ = communication_scores.insert(
*node,
Expand All @@ -91,7 +91,7 @@ impl DysfunctionDetection {
*node,
self.calculate_node_score_for_type(
node,
&IssueType::PendingRequestOperation(OperationId::random()),
&IssueType::RequestOperation(OperationId::random()),
),
);
}
Expand Down Expand Up @@ -143,7 +143,7 @@ impl DysfunctionDetection {
0
}
}
IssueType::AwaitingProbeResponse => {
IssueType::AeProbeMsg => {
if let Some(issues) = self.probe_issues.get(node) {
issues.len()
} else {
Expand All @@ -157,7 +157,7 @@ impl DysfunctionDetection {
0
}
}
IssueType::PendingRequestOperation(_) => {
IssueType::RequestOperation(_) => {
if let Some(issues) = self.unfulfilled_ops.get(node) {
// To avoid the case where the check gets carried out just after
// a burst of messages gets inserted, only those issues that have sat a
Expand Down Expand Up @@ -323,7 +323,7 @@ mod tests {
prop_oneof![
230 => Just(IssueType::Communication),
500 => Just(IssueType::Dkg),
30 => Just(IssueType::AwaitingProbeResponse),
30 => Just(IssueType::AeProbeMsg),
450 => Just(IssueType::Knowledge),
]
}
Expand All @@ -342,10 +342,10 @@ mod tests {
prop_oneof![
1200 => Just(IssueType::Communication),
0 => Just(IssueType::Dkg),
50 => Just(IssueType::AwaitingProbeResponse),
50 => Just(IssueType::AeProbeMsg),
0 => Just(IssueType::Knowledge),
3400 => (any::<[u8; 32]>())
.prop_map(|x| IssueType::PendingRequestOperation(OperationId(x)))
.prop_map(|x| IssueType::RequestOperation(OperationId(x)))
]
}

Expand Down Expand Up @@ -447,7 +447,7 @@ mod tests {
nodes: &[(XorName, NodeQualityScored)],
elders_count: usize,
) -> Vec<(XorName, NodeQualityScored)> {
if matches!(issue, IssueType::Dkg) || matches!(issue, IssueType::AwaitingProbeResponse) {
if matches!(issue, IssueType::Dkg) || matches!(issue, IssueType::AeProbeMsg) {
nodes
.iter()
.sorted_by(|lhs, rhs| root.clone().cmp_distance(&lhs.0, &rhs.0))
Expand Down Expand Up @@ -487,7 +487,7 @@ mod tests {
IssueType::Dkg => {
assert_eq!(score_results.dkg_scores.len(), node_count);
},
IssueType::AwaitingProbeResponse => {
IssueType::AeProbeMsg => {
assert_eq!(score_results.probe_scores.len(), node_count);
},
IssueType::Communication => {
Expand All @@ -496,7 +496,7 @@ mod tests {
IssueType::Knowledge => {
assert_eq!(score_results.knowledge_scores.len(), node_count);
},
IssueType::PendingRequestOperation(_) => {
IssueType::RequestOperation(_) => {
assert_eq!(score_results.op_scores.len(), node_count);
},
}
Expand Down Expand Up @@ -530,7 +530,7 @@ mod tests {
IssueType::Dkg => {
score_results.dkg_scores
},
IssueType::AwaitingProbeResponse => {
IssueType::AeProbeMsg => {
score_results.probe_scores
},
IssueType::Communication => {
Expand All @@ -539,7 +539,7 @@ mod tests {
IssueType::Knowledge => {
score_results.knowledge_scores
},
IssueType::PendingRequestOperation(_) => {
IssueType::RequestOperation(_) => {
score_results.op_scores
},
};
Expand Down Expand Up @@ -838,7 +838,7 @@ mod tests {
IssueType::Communication => {
score_results.communication_scores
},
IssueType::AwaitingProbeResponse => {
IssueType::AeProbeMsg => {
score_results.probe_scores
},
IssueType::Dkg => {
Expand All @@ -847,7 +847,7 @@ mod tests {
IssueType::Knowledge => {
score_results.knowledge_scores
},
IssueType::PendingRequestOperation(_) => {
IssueType::RequestOperation(_) => {
score_results.op_scores
},
};
Expand Down Expand Up @@ -880,8 +880,7 @@ mod ops_tests {
for _ in 0..NORMAL_OPERATIONS_ISSUES {
let op_id = OperationId::random();
pending_operations.push((node, op_id));
dysfunctional_detection
.track_issue(*node, IssueType::PendingRequestOperation(op_id));
dysfunctional_detection.track_issue(*node, IssueType::RequestOperation(op_id));
}
}

Expand All @@ -898,8 +897,7 @@ mod ops_tests {
// adding more issues though, and we should see some dysfunction
for _ in 0..300 {
let op_id = OperationId::random();
dysfunctional_detection
.track_issue(nodes[0], IssueType::PendingRequestOperation(op_id));
dysfunctional_detection.track_issue(nodes[0], IssueType::RequestOperation(op_id));
}

// Now we should start detecting...
Expand Down Expand Up @@ -1031,11 +1029,11 @@ mod knowledge_tests {

// Add just one issue to all, this gets us a baseline avg to not overly skew results
for node in nodes {
dysfunctional_detection.track_issue(node, IssueType::AwaitingProbeResponse);
dysfunctional_detection.track_issue(node, IssueType::AeProbeMsg);
}

// and add one for our "bad" node, too
dysfunctional_detection.track_issue(new_node, IssueType::AwaitingProbeResponse);
dysfunctional_detection.track_issue(new_node, IssueType::AeProbeMsg);

let dysfunctional_nodes = dysfunctional_detection.get_dysfunctional_nodes();

Expand All @@ -1052,7 +1050,7 @@ mod knowledge_tests {

// and add another for our "bad" node, two AeProbes should not be sufficient reason
// to label this as dysfunctional
dysfunctional_detection.track_issue(new_node, IssueType::AwaitingProbeResponse);
dysfunctional_detection.track_issue(new_node, IssueType::AeProbeMsg);

let dysfunctional_nodes = dysfunctional_detection.get_dysfunctional_nodes();

Expand All @@ -1069,7 +1067,7 @@ mod knowledge_tests {

// and some more issues for our "bad" node
for _ in 0..4 {
dysfunctional_detection.track_issue(new_node, IssueType::AwaitingProbeResponse);
dysfunctional_detection.track_issue(new_node, IssueType::AeProbeMsg);
}

let dysfunctional_nodes = dysfunctional_detection.get_dysfunctional_nodes();
Expand Down
112 changes: 38 additions & 74 deletions sn_dysfunction/src/lib.rs
Expand Up @@ -101,7 +101,7 @@ impl DysfunctionDetection {
let queue = self.dkg_issues.entry(node_id).or_default();
queue.push_back(Instant::now());
}
IssueType::AwaitingProbeResponse => {
IssueType::AeProbeMsg => {
let queue = self.probe_issues.entry(node_id).or_default();
queue.push_back(Instant::now());
}
Expand All @@ -113,7 +113,7 @@ impl DysfunctionDetection {
let queue = self.knowledge_issues.entry(node_id).or_default();
queue.push_back(Instant::now());
}
IssueType::PendingRequestOperation(op_id) => {
IssueType::RequestOperation(op_id) => {
let queue = self.unfulfilled_ops.entry(node_id).or_default();
trace!("New issue has associated operation ID: {op_id:#?}");
queue.push((op_id, Instant::now()));
Expand Down Expand Up @@ -328,19 +328,15 @@ mod tests {
for node in nodes.iter().take(3) {
dysfunctional_detection.track_issue(*node, IssueType::Communication);
dysfunctional_detection.track_issue(*node, IssueType::Knowledge);
dysfunctional_detection.track_issue(
*node,
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection
.track_issue(*node, IssueType::RequestOperation(OperationId([1; 32])));
}

// Track some issues for nodes that will be retained.
dysfunctional_detection.track_issue(nodes[5], IssueType::Communication);
dysfunctional_detection.track_issue(nodes[6], IssueType::Knowledge);
dysfunctional_detection.track_issue(
nodes[7],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection
.track_issue(nodes[7], IssueType::RequestOperation(OperationId([1; 32])));

let nodes_to_retain = nodes[5..10].iter().cloned().collect::<BTreeSet<XorName>>();

Expand Down Expand Up @@ -384,10 +380,8 @@ mod tests {
let nodes = (0..10).map(|_| random_xorname()).collect::<Vec<XorName>>();
let mut dysfunctional_detection = DysfunctionDetection::new(nodes.clone());

dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([1; 32])));

assert_eq!(dysfunctional_detection.unfulfilled_ops.len(), 1);
assert_eq!(dysfunctional_detection.knowledge_issues.len(), 0);
Expand All @@ -414,18 +408,12 @@ mod tests {
let nodes = (0..10).map(|_| random_xorname()).collect::<Vec<XorName>>();
let mut dysfunctional_detection = DysfunctionDetection::new(nodes.clone());

dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([2; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([3; 32])),
);
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([1; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([2; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([3; 32])));

let op_ids = dysfunctional_detection.get_unfulfilled_ops(nodes[0]);

Expand All @@ -438,18 +426,12 @@ mod tests {
let nodes = (0..10).map(|_| random_xorname()).collect::<Vec<XorName>>();
let mut dysfunctional_detection = DysfunctionDetection::new(nodes.clone());

dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([2; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([3; 32])),
);
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([1; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([2; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([3; 32])));

let op_ids = dysfunctional_detection.get_unfulfilled_ops(nodes[1]);

Expand All @@ -463,18 +445,12 @@ mod tests {
let mut dysfunctional_detection = DysfunctionDetection::new(nodes.clone());
let op_id = OperationId([2; 32]);

dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([2; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([3; 32])),
);
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([1; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([2; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([3; 32])));

let has_removed = dysfunctional_detection.request_operation_fulfilled(&nodes[0], op_id);

Expand All @@ -492,18 +468,12 @@ mod tests {
let mut dysfunctional_detection = DysfunctionDetection::new(nodes.clone());
let op_id = OperationId([2; 32]);

dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([2; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([3; 32])),
);
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([1; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([2; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([3; 32])));

let has_removed = dysfunctional_detection.request_operation_fulfilled(&nodes[1], op_id);

Expand All @@ -518,18 +488,12 @@ mod tests {
let mut dysfunctional_detection = DysfunctionDetection::new(nodes.clone());
let op_id = OperationId([4; 32]);

dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([1; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([2; 32])),
);
dysfunctional_detection.track_issue(
nodes[0],
IssueType::PendingRequestOperation(OperationId([3; 32])),
);
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([1; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([2; 32])));
dysfunctional_detection
.track_issue(nodes[0], IssueType::RequestOperation(OperationId([3; 32])));

let has_removed = dysfunctional_detection.request_operation_fulfilled(&nodes[1], op_id);

Expand Down
2 changes: 1 addition & 1 deletion sn_node/src/node/data/records.rs
Expand Up @@ -272,7 +272,7 @@ impl MyNode {
return Ok(vec![Cmd::TrackNodeIssueInDysfunction {
name: target.name(),
// TODO: no need for op id tracking here, this can be a simple counter
issue: IssueType::PendingRequestOperation(operation_id),
issue: IssueType::RequestOperation(operation_id),
}]);
}
}?;
Expand Down
6 changes: 2 additions & 4 deletions sn_node/src/node/flow_ctrl/dysfunction.rs
Expand Up @@ -44,11 +44,9 @@ impl FlowCtrl {
DysCmds::UntrackIssue(node, issue) => {
debug!("Attempting to remove {issue:?} from {node:?}");
match issue {
IssueType::AwaitingProbeResponse => {
dysfunction.ae_update_msg_received(&node)
}
IssueType::AeProbeMsg => dysfunction.ae_update_msg_received(&node),
IssueType::Dkg => dysfunction.dkg_ack_fulfilled(&node),
IssueType::PendingRequestOperation(op_id) => {
IssueType::RequestOperation(op_id) => {
let _ = dysfunction.request_operation_fulfilled(&node, op_id);
}
_ => {}
Expand Down

0 comments on commit 489deb5

Please sign in to comment.