Skip to content

Conversation

benesch
Copy link
Contributor

@benesch benesch commented Feb 2, 2020

@sfackler not sure if there are additional complexities here, but this callback is sufficient for my notification-receiving needs with the sync client. Consider this more of a proposal than a ready-to-merge PR!


Expose a Config::connect_with method in the sync API that allows the
user to provide a callback function that is invoked for every
asynchronous message or error received by the underlying connection.

Fix #404.

@benesch benesch requested a review from sfackler February 2, 2020 06:33
@benesch benesch force-pushed the sync-notifications branch from 332e1af to 8f666a1 Compare February 2, 2020 06:38
///
/// `notify_fn` is invoked from an asynchronous runtime and therefore must
/// not block.
pub fn connect_with<T, F>(&self, tls: T, mut notify_fn: F) -> Result<Client, Error>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a separate method, I think it would make more sense to store the callback in the config with a setter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you propose to store the closure in the config? I see two options:

  • An fn (AsyncFunction) function pointer. This is almost certainly too limiting since you can't pass a closure that captures any state. (It's definitely too limiting for my uses.)
  • A Box<dyn FnMut(AsyncMessage) + Clone + Send + 'static>. This doesn't work as written, of course, because Clone isn't an auto trait, but I think you could make it work by creating a child trait that inherits from Clone and FnMut(AsyncMessage)? (Clone is necessary to preserve the Cloneness of Config). This is also potentially limiting, if the closure wants to capture a non-Cloneable type.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with an Arc<dyn Fn(AsyncMessage) + Sync + Send>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forcing a Sync bound on the closure is tough, though. For example it precludes the test that I wrote which sends the messages over an MPSC (which I imagine is what 90% of these implementations will do) because an MPSC transmitter is not Sync. And putting an MPSC transmitter in a Mutex feels really weird, especially because we know that each closure is only going to be invoked by one thread.

I've pushed up an implementation that instead uses a Box<NotificationCallback>, where NotificationCallback is a custom private trait that's implemented for any FnMut(AsyncMessage) + Send + Clone. From the user's perspective the API is

    pub fn notification_callback<F>(&mut self, f: F) -> &mut Config
    where
        F: Fn(AsyncMessage) + Clone + Send + 'static,
    {

which feels quite nice. A bit of ugly boilerplate internally though.

@benesch benesch force-pushed the sync-notifications branch 2 times, most recently from 8dbcb52 to 603c408 Compare February 20, 2020 17:29
Expose a Config::connect_with method in the sync API that allows the
user to provide a callback function that is invoked for every
asynchronous message or error received by the underlying connection.

Fix sfackler#404.
@benesch
Copy link
Contributor Author

benesch commented Mar 17, 2020

@sfackler just wanted to make sure you hadn't missed the latest updates here! No worries if you're busy.

pub struct Config {
config: tokio_postgres::Config,
notification_callback: Box<dyn NotificationCallback>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just be able to store an Arc<dyn FnMut(AsyncMessage) + Send>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that. Sadly an Arc<dyn FnMut(AsyncMessage) + Send> does not itself implement Send, as Arc propagates both Send + Sync iff its contents are Send + Sync. And without the Send bound you can't call the callback from a futures context.

I don't see a good way to do this besides this Box<dyn NotificationCallback> or the separate connect_with method I proposed originally.

My only requirement is to be able to easily use an MPSC transmitter from the notification callback. I don't have particular opinions on how the API should look, so if it'd be easier for you to just hammer on this PR to get into a mergeable shape, please feel free to do so!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right yeah.

@sfackler
Copy link
Owner

Thinking about this more, I wonder if this is actually the right approach for the blocking interface. One problem here is that notifications are only processed while the connection is being actively used (that's not the case in tokio-postgres since the spawned connection will generally always be running).

I think it might actually make some more sense to go back to the API that the old version of the crate had, with an explicit iterator over notifications: https://docs.rs/postgres/0.15.2/postgres/notification/struct.Notifications.html

@benesch
Copy link
Contributor Author

benesch commented Mar 18, 2020

One problem here is that notifications are only processed while the connection is being actively used (that's not the case in tokio-postgres since the spawned connection will generally always be running).

Can you say more about what you mean? By my read of this code

runtime.spawn(connection.for_each(move |r| {
match r {
Ok(m) => notification_callback(m),
Err(e) => error!("postgres connection error: {}", e),
}
future::ready(())
}));

the connection future is going to be polled automatically and indefinitely by the Tokio runtime, and so the callback will get invoked even if the connection is not seeing active use?

@sfackler
Copy link
Owner

The runtime is single threaded - nothing is happening outside of the calls to block_on.

@benesch
Copy link
Contributor Author

benesch commented Mar 21, 2020

Ah, thanks, that's the piece I was missing. Hmm.

@sfackler
Copy link
Owner

I opened up a PR with the alternate implementation: #588.

@benesch benesch closed this Mar 23, 2020
@benesch benesch deleted the sync-notifications branch March 23, 2020 01:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

How should notifications be handled in the sync API?
2 participants