Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow running SharedConnection with any other runtime #229

Merged
merged 3 commits into from
Aug 15, 2019

Conversation

hadashiA
Copy link
Contributor

@hadashiA hadashiA commented Aug 15, 2019

Currently, SharedConnection only works on tokio 0.1 runtime.
Because it depends on tokio_executor::spawn (with tokio 0.1.x).
( https://github.com/mitsuhiko/redis-rs/blob/master/src/aio.rs#L437

I want to use SharedConnection with async/await in tokio 0.3.

In this PR, for that purpose, make it can be specified Executor from outside without depending on a specific Executor.
( This way is also adopted for hyper , rusoto, and more.

For example, 0.3 Executor will work with 0.1 adapter as below:

let client = redis::Client::open("redis://127.0.0.1:6379").unwrap();
let executor = DefaultExecutor::current(); // This is tokio 0.3 Executor
let executor = Executor03As01::new(DefaultExecutor::current()); // This is tokio 0.1 compatible  Executor
let conn = client.get_shared_async_connection_with_executor(executor).compat().await?; // Works with await 

Compatibility layers is:

use std::sync::Mutex;
use futures01::future::{Executor as Executor01, ExecuteError as ExecuteError01};
use futures01::Future as Future01;
use futures::compat::Future01CompatExt;
use futures::FutureExt;
use tokio_executor::Executor;

pub struct Executor03As01<Ex> {
    inner: Mutex<Ex>,
}

impl<Ex> Executor03As01<Ex> {
    pub fn new(inner: Ex) -> Self {
        Executor03As01 { inner: Mutex::new(inner) }
    }
}

impl<Ex, Fut> Executor01<Fut> for Executor03As01<Ex>
    where
        Ex: Executor,
        Fut: Future01<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
        let mut future = future.compat().map(|_| ());
        let mut pin = Box::pin(future);

        let mut guard = self.inner.lock().unwrap();
        (&mut guard).spawn(pin).expect("unable to spawn future from Compat executor");
        Ok(())
    }
}

Copy link
Collaborator

@Marwes Marwes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM Might be a good idea to make tokio_executor an optional (but default dependency) as well?

@hadashiA
Copy link
Contributor Author

Oh, that makes sense. I agree.
I changed tokio-executor to optional.

@Marwes
Copy link
Collaborator

Marwes commented Aug 15, 2019

Hmm, we also depend on tokio-tcp/tokio-uds 0.1. Isn't that also an issue for you? The connection should really be parameterized by R: AsyncRead + AsyncWrite, though those would have versioning issues as well atm.

@hadashiA
Copy link
Contributor Author

hadashiA commented Aug 15, 2019

Hmm, we also depend on tokio-tcp/tokio-uds 0.1. Isn't that also an issue for you?

No. In my project, the problem now is that the redis-rs require different runtime.

For me, mixing tokio 0.1 and tokio 0.3 is not a problem.

Because futures 0.1 and futures 0.3 can be used together.
Compatibility Layer | Futures-rs

Therefore, even if redis-rs depends on tokio-tcp / uds 0.1.x, it can be run on the tokio 0.3 runtime.

The only incompatibility is the runtime that is running in the application.
I think that the new version of tokio-tcp support is enough even after waiting to become stable. 🤔

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants