-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Combine message box receivers #1738
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going in the direction I was expecting.
// TODO: rename | ||
#[async_trait] | ||
trait Foo<Input> { | ||
async fn a(&mut self) -> Result<Option<Input>, RuntimeRequest>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn a(&mut self) -> Result<Option<Input>, RuntimeRequest>; | |
async fn try_recv(&mut self) -> Result<Option<WrappedInput>, ChannelError>; |
Else how will we propagate any channel specific errors? For those actors not wanting to handle the RuntimeRequest
, the c
variant is already there. So, I don't see any issues in this being wrapped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point of this fn is the ergonomics of handling the RuntimeRequests as err. So I think Result<_, Either<ChannelError, RuntimeRequest> might be better.
CombineReceiver never produces Channel errors so I guess there's some message box impl where this would be useful, could you point me to an example where this would be useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay I just noticed that the recv
method does not return any ChannelError
but only an Option
. It was only send
that returns ChannelError
. I guess any errors while recv
ing from a channel is implicitly mapped into Option:None
instead of returning that error. So, this signature is fine.
7acabfc
to
25cfaf1
Compare
Robot Results
Passed Tests
|
// TODO: Should be removed | ||
pub async fn recv(&mut self) -> Option<Input> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not remove it now as we have impl<I,O> ReceiveMessages<I> for SimpleMessageBox<I,O>
|
||
pub struct CombinedReceiver<Input> { | ||
name: String, | ||
logging_is_on: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For logging, we need to go away from this logging_is_on
stuff. This has been done in a hurry - my bad.
With this new trait ReceiveMessage
it will nicer to have LoggingReceiver
that owns an inner receiver and logs the messages.
self.logging_is_on = on; | ||
self.receiver.logging_is_on = on; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see no point to have this twice. This has to be refactor anyway.
#[async_trait] | ||
impl ReceiveMessages<ConfigInput> for ConfigManagerMessageBox { | ||
async fn try_recv(&mut self) -> Result<Option<ConfigInput>, RuntimeRequest> { | ||
self.input_receiver.try_recv().await | ||
} | ||
|
||
async fn recv_message(&mut self) -> Option<WrappedInput<ConfigInput>> { | ||
self.input_receiver.recv_message().await | ||
} | ||
|
||
async fn recv(&mut self) -> Option<ConfigInput> { | ||
self.input_receiver.recv().await | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having to repeat ourselves, here, is a sign that this actor should use a SimpleMessageBox
.
Signed-off-by: Sharkey <james.sharkey@softwareag.com>
c85302a
to
d55f262
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved - with some minor remarks and follow up tasks.
// FIXME: If I add the RuntimeRequests here and if the channel we use to send messages is dropped then we will get an ChannelError::SendError | ||
// FIXME: but I don't think we shouldn't return this error if the message box has a shutdown message for us |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not bother too much for that. Definitely not for this example, but even for most actors. That said, handling correct termination during shutdown can be tricky when some actor expect other peers to say up a bit longer.
|
||
/// Returns [Some] message the next time a message is received. Returns [None] if | ||
/// both of the underlying channels are closed. | ||
/// Returning [RuntimeRequest] takes priority over messages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment has to be updated as currently returning None on RuntimeRequest.
|
||
async fn recv(&mut self) -> Option<Input> { | ||
self.input_receiver.recv().await.map(|message| { | ||
self.log_input(&message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay for now - as this keeps the current behavior.
However, as a follow up PR, we will need to log whatever the recv method. The simpler will be to use a LoggingReceiver
which wraps a CombinedReceiver
, implements ReceiveMessages
and logs received messages.
let (publish_sender, publish_receiver) = channel(10); | ||
let (signal_sender, signal_receiver) = channel(10); | ||
let input_receiver = CombinedReceiver::new(publish_receiver, signal_receiver); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An improvement could be to add a CombinedSender
type along a chanel()
that create a sender, receiver pair.
This combined sender will have to impl RuntimeRequestSink
to ease the implementation of specific actor builders.
d55f262
to
9e75197
Compare
9e75197
to
e042ad1
Compare
Proposed changes
Types of changes
Paste Link to the issue
Checklist
cargo fmt
as mentioned in CODING_GUIDELINEScargo clippy
as mentioned in CODING_GUIDELINESFurther comments