Conversation
The previous implementation did not ensured that all messages arrived to subscribers. Now message send to the output port are guarenteed to be delivered.
|
Given So I'd request that we add a feature that toggles on these changes, and that can control which functionality would run. |
| } else { | ||
| subscribers.push((subscriber.id(), subscriber)); | ||
| } | ||
| } //OutportMessage::RemoveSubscriber(id) => { |
There was a problem hiding this comment.
With this implementation it is possible to unsubscribe and to get a list of subscriber to a port. We plan to use it soon. I remove it for now, and I will propose
| } else { | ||
| subscribers.push((subscriber.id(), subscriber)); | ||
| } | ||
| } //OutportMessage::RemoveSubscriber(id) => { |
There was a problem hiding this comment.
why is all this commented logic here?
There was a problem hiding this comment.
This implementation open the door to knew features I have implemented and that I plan to use. I remove it for now and I will propose a new pull request for their addition latter.
|
I revise everything next week!
Le mer. 30 juil. 2025 à 21:39, Sean Lawlor ***@***.***> a
écrit :
… ***@***.**** requested changes on this pull request.
------------------------------
In ractor/src/port/output.rs
<#369 (comment)>:
> + let subscriber = opt_subscriber.take().unwrap();
+
+ // We ensure there is no duplicate subscription
+ if !allow_duplicate_subscription {
+ if let Some((_, prev_subscriber)) =
+ subscribers.iter_mut().find(|(id, _)| id == &sid)
+ {
+ // In case of duplication, previous subscription is overrided
+ *prev_subscriber = subscriber;
+ } else {
+ subscribers.push((subscriber.id(), subscriber));
+ }
+ } else {
+ subscribers.push((subscriber.id(), subscriber));
+ }
+ } //OutportMessage::RemoveSubscriber(id) => {
is this needed?
------------------------------
In ractor/src/port/output.rs
<#369 (comment)>:
> + let sid = subscriber.id();
+
+ // We ensure there is no duplicate subscription
+ if !allow_duplicate_subscription {
+ if let Some((_, prev_subscriber)) =
+ subscribers.iter_mut().find(|(id, _)| id == &sid)
+ {
+ // In case of duplication, previous subscription is overrided
+ *prev_subscriber = subscriber;
+ } else {
+ subscribers.push((subscriber.id(), subscriber));
+ }
+ } else {
+ subscribers.push((subscriber.id(), subscriber));
+ }
+ } //OutportMessage::RemoveSubscriber(id) => {
why is all this commented logic here?
—
Reply to this email directly, view it on GitHub
<#369 (review)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ACZETT3TI2HT4EXD2A3M2WT3LENOHAVCNFSM6AAAAACB2X3726VHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMZTANZTGA3TSMJTGE>
.
You are receiving this because you authored the thread.Message ID:
***@***.***>
|
…ntation by default, the new is accessible with a feature
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #369 +/- ##
=======================================
Coverage 82.48% 82.48%
=======================================
Files 71 71
Lines 12919 12919
=======================================
Hits 10656 10656
Misses 2263 2263 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
The failing test is unrelated to output ports, it is due to a race condition in the implementation of pg. |
slawlor
left a comment
There was a problem hiding this comment.
Thanks for adding the gate, the approach seems reasonable, and when there's more opt-in validation in prod, we can probably swap out the underlying implementations
| continue 'subs; | ||
| } else { | ||
| coop_count = coop_count.wrapping_add(1); | ||
| #[cfg(feature = "tokio_runtime")] |
There was a problem hiding this comment.
what happens here in non-tokio environments?
* OuputPort use a single task to dispatch messages The previous implementation did not ensured that all messages arrived to subscribers. Now message send to the output port are guarenteed to be delivered. * bench OutputPort * change outport bench msg and receiver numbers * fix unused warning * fix new output port implementation to suport async-std * reduce outport message dispatch time by 60%! * removed extra ouput port feature, provides previous ouputport implementation by default, the new is accessible with a feature * update ci to test feature output-port-v2 * change clippy lint name * added feature documentation
With the current implementation of OutputPort, if an actor send more than 10 messages on an output port before the runtime has the opportunity to preempt it, only the 10 last messages may be received by subscribers.
Moreover, it is not guaranteed that those last 10 messages will be received either.
This new implementation ensures that all messages are sent and will be received by subscribers.
It also open the opportunity to implement more advanced subscriber strategy, to remove subscriber, to ensure that subscription is not duplicated and to clone the OutputPort. For now this pull request only provide the insurance that all messages will be delivered, and does not add anymore feature.