Skip to content

Conversation

@rklaehn
Copy link
Collaborator

@rklaehn rklaehn commented Oct 6, 2025

I find these useful quite a few times. They already exist on Sink and Stream of course, but often you don't want to convert your senders and receivers into Sink / Stream and lose the extra fns. Also Sink API is just weird!

In the fns that filter (with_filter, with_filter_map, filter, filter_map) the item is just dropped if it does not match the filter. I think for these channels this is uncontroversial, besides SinkExt does the same. It does not produce an error if you use e.g. with_flat_map to map to an empty stream.

An additional change is that the bounds for crate::channel::mpsc::Sender/Receiver are reduced so you can now use a crate::channel::mpsc::Sender/Receiver as the channel to an actor even for messages that are not serializable.

…and oneshot::Sender

Implement mao, filter_map and filter for mpsc::Receiver

oneshot::Receiver is a future, so you can just do this yourself using FutureExt.
@rklaehn rklaehn requested a review from Frando October 6, 2025 10:19
rklaehn added 10 commits October 6, 2025 15:12
this makes irpc::channel::mpsc::* a generic channel that can also be used for non-serializable msgs.

Especially when mapping you might not want intermediate map results ot be serializable!
it was not exported anyway, so the resulting future was anonymous all th etime.
@rklaehn rklaehn marked this pull request as ready for review October 8, 2025 07:12
@rklaehn
Copy link
Collaborator Author

rklaehn commented Oct 8, 2025

Note: an in memory stream can return the following errors:

    pub enum SendError {
        ReceiverClosed,
        ...
    }

An io based stream can in addition produce the following errors:

        ...
        MaxMessageSizeExceeded,
        Io(io::Error),
    }

We could model this by having a type parameter for streams that tracks if the stream uses io or not, but I am not sure if it is worth it. In any case I think it can wait for another PR.

@rklaehn
Copy link
Collaborator Author

rklaehn commented Oct 8, 2025

Actually I think having recv return a Result<Option, RecvError> is wrong. We got the case RecvError::SenderClosed in 2 cases now - as error and as returning None.

We should modify this so that RecvError for mpsc has just the true error cases, since stream closed is already modeled as returning None.

src/lib.rs Outdated
/// Applies a filter before sending.
///
/// Messages that don't pass the filter are dropped.
pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender<T>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not super convinced that these are useful on the oneshot sender? but maybe they are :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷 just did it for completeness.

src/lib.rs Outdated
where
F: Fn(U) -> Option<T> + Send + Sync + 'static,
U: Send + Sync + 'static,
T: Send + Sync + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move those bounds to the top level (i.e. have T: Send + Sync + 'static on the top level impl block). But don't mind much either way.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do another "trait alias" trick where you have

trait SendSyncStatic: Send + Sync + 'static {}

impl<T: Send + Sync + 'static> SendSyncStatic for T;

But these bounds are fine even for wasm, so probably best to live with the repetition I think.

src/lib.rs Outdated
pub fn map<U, F>(self, f: F) -> Receiver<U>
where
F: Fn(T) -> U + Send + Sync + 'static,
T: Send + Sync + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can drop Send + 'static bounds, are already on top level impl block

fn recv(
&mut self,
) -> Pin<
Box<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: that's a long type, maybe we add a BoxedSyncFuture type alias and use in all DynReceiver uses? But this is unrelated to this PR so can also come sometime later.

Copy link
Collaborator Author

@rklaehn rklaehn Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use n0_future::boxed::BoxFuture if it weren't for the + '_. So maybe n0_future could contain such a type alias.

Copy link
Member

@Frando Frando left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Some nits in comments.

@rklaehn rklaehn merged commit 6c74d27 into main Oct 8, 2025
16 checks passed
@rklaehn rklaehn deleted the filter_map branch October 8, 2025 10:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants