Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable concurrent processing of incoming and outgoing MQTT messages #2327

Merged
merged 2 commits into from
Oct 11, 2023

Conversation

jarhodes314
Copy link
Contributor

Proposed changes

If tedge-mapper is overwhelmed with messages, this causes a deadlock, preventing any further processing of MQTT messages until the mapper is restarted. The root cause of the deadlock is detailed in #2326 (comment), but the pertinent information here is that the MQTT actor blocks itself, waiting on outgoing messages to be processed before it can process incoming messages, but stuck in the middle of processing an incoming message.

This PR spawns a task to process outgoing messages, which allows the mapper to process incoming and outgoing messages concurrently. This avoids the deadlock scenario that is the root cause of the bug.

Running this change locally, I can reliably send 20,000 messages to the mapper running on my laptop using the simulate.py script (see the issue description) without dropping any messages (previously sending just 500 messages would reliably fail). Sending significantly more (e.g. 100,000 messages) will cause messages to be dropped, but the mapper does continue to run as normal once the pressure is relieved.

I haven't added any tests for this change. Verifying we don't deadlock depends on knowing the capacity of the machine we're running the tests on (and also involves hammering Cumulocity to some extent). But the existing tests should verify that the changes don't break anything (given all that's changed is where existing code is run, I would expect any bugs with the change to be really obvious).

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#2326

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s)
  • I ran cargo fmt as mentioned in CODING_GUIDELINES
  • I used cargo clippy as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

@codecov
Copy link

codecov bot commented Oct 9, 2023

Codecov Report

Merging #2327 (7509cbf) into main (db3f16d) will decrease coverage by 0.1%.
The diff coverage is 74.3%.

Additional details and impacted files
Files Coverage Δ
crates/common/tedge_utils/src/futures.rs 60.0% <60.0%> (ø)
crates/extensions/c8y_mapper_ext/src/actor.rs 75.0% <0.0%> (-0.7%) ⬇️
crates/extensions/tedge_mqtt_ext/src/lib.rs 71.6% <81.2%> (+2.0%) ⬆️

... and 5 files with indirect coverage changes

@github-actions
Copy link
Contributor

github-actions bot commented Oct 9, 2023

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
318 0 3 318 100 53m7.875s

@reubenmiller
Copy link
Contributor

@jarhodes314 Running the same benchmarks with this PR fixes the bug. I'll spend a bit of time to see if I can add reasonable system test to the PR, though as you already mentioned, it might be tricky to "stable" benchmark that will run on every machine.

@reubenmiller reubenmiller temporarily deployed to Test Pull Request October 10, 2023 03:11 — with GitHub Actions Inactive
Copy link
Contributor

@albinsuresh albinsuresh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proposal to asynchronously process the MQTT messages make sense. My only concern is about the relevance of the MqttMessageBox struct having both the send and recv aspects together, as the new design is using those aspects separately from two different contexts anway. So, instead an MqttMessageBox, the actor could just keep one SimpleMessageBox to receive the peer messages and another wrapper over the peer_senders, with that specialized send() method in the existing MqttMessageBox impl, as part of its state. But, that refactoring feels a bit out of scope for this PR, hence approving. I've left some minor suggestions/queries though.

crates/extensions/c8y_mapper_ext/src/actor.rs Show resolved Hide resolved
crates/extensions/c8y_mapper_ext/src/actor.rs Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Show resolved Hide resolved
@jarhodes314
Copy link
Contributor Author

The proposal to asynchronously process the MQTT messages make sense. My only concern is about the relevance of the MqttMessageBox struct having both the send and recv aspects together, as the new design is using those aspects separately from two different contexts anway. So, instead an MqttMessageBox, the actor could just keep one SimpleMessageBox to receive the peer messages and another wrapper over the peer_senders, with that specialized send() method in the existing MqttMessageBox impl, as part of its state. But, that refactoring feels a bit out of scope for this PR, hence approving. I've left some minor suggestions/queries though.

I agree that that refactoring away MqttMessageBox entirely is too complicated to be worth blocking the fix for this. I do wonder whether instead the internal use of MqttMessageBox could be refactored so we can destructure it into incoming and outgoing parts before we start sending and receiving messages as neither part needs to care about the other.

@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request October 10, 2023 07:57 — with GitHub Actions Inactive
@jarhodes314
Copy link
Contributor Author

I've now done an internal refactoring that includes splitting the MqttMessageBox into an InternalInbox and an InternalOutbox allowing the two to be processed completely separately, avoiding the need to spawn a task.

@reubenmiller reubenmiller temporarily deployed to Test Pull Request October 11, 2023 01:05 — with GitHub Actions Inactive
@reubenmiller reubenmiller temporarily deployed to Test Pull Request October 11, 2023 02:58 — with GitHub Actions Inactive
@reubenmiller reubenmiller temporarily deployed to Test Pull Request October 11, 2023 03:19 — with GitHub Actions Inactive
@reubenmiller reubenmiller temporarily deployed to Test Pull Request October 11, 2023 04:36 — with GitHub Actions Inactive
Copy link
Contributor

@reubenmiller reubenmiller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved. I ran the new benchmark system tests 5 times and they seem to be ok.

crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Outdated Show resolved Hide resolved
@jarhodes314 jarhodes314 changed the title Spawn a task to enable concurrent processing of incoming and outgoing MQTT messages Enable concurrent processing of incoming and outgoing MQTT messages Oct 11, 2023
jarhodes314 and others added 2 commits October 11, 2023 09:53
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Reuben Miller <reuben.d.miller@gmail.com>
@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request October 11, 2023 09:10 — with GitHub Actions Inactive
@@ -126,6 +126,14 @@ pub struct MqttMessageBox {
peer_senders: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
}

pub struct FromPeers<'a> {
input_receiver: &'a mut LoggingReceiver<MqttMessage>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope we can keep owned copies of these once run starts taking self instead &mut self.

}

pub struct ToPeers<'a> {
peer_senders: &'a mut Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have just cloned that Vec to avoid these lifetime parameters, but its' okay, as it is consistent with its counterpart.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the lifetimes are that complicated, and I'll modify the run(self) PR to remove these once this is merged as they won't be necessary after that.

crates/extensions/tedge_mqtt_ext/src/lib.rs Show resolved Hide resolved
crates/extensions/tedge_mqtt_ext/src/lib.rs Show resolved Hide resolved
@jarhodes314 jarhodes314 merged commit d1430e9 into thin-edge:main Oct 11, 2023
18 checks passed
@@ -137,6 +145,43 @@ impl MqttMessageBox {
}
}

fn split(&mut self) -> (FromPeers<'_>, ToPeers<'_>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This split method as well as the whole struct MqttMessageBox can be removed, as they are used only by the MqttActor.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants