Making the library executor agnostic (making Tokio optional) #280

Closed
DavidBM opened this issue Feb 1, 2020 · 23 comments

Comments

@DavidBM
Contributor

DavidBM commented Feb 1, 2020

Hi!

I was developing a library using this one and I found out that the futures returned by redis-rs required tokio to be executed.

As an example, take this test in my own library, https://github.com/Couragium/rsmq_async :

use rsmq_async::{Rsmq, RsmqError};
use async_std::task;

#[test]
fn it_works() -> Result<(), RsmqError> {
    task::block_on(async {
        let mut rsmq = Rsmq::new(Default::default()).await?;
        
        let message = rsmq.receive_message_async("myqueue", None).await?;
        
        rsmq.delete_message_async("myqueue", &message.id).await?;

        Ok(())
    })
}

I get this on cargo test:

---- it_works stdout ----
thread 'it_works' panicked at 'there is no reactor running, must be called from the context of Tokio runtime', src/libcore/option.rs:1188:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.

I'm not very familiar with how futures work internally, but is there any way for this library to return a future that doesn't require a specific executor?

Edit: Btw, great work with this library!

@mezzato

mezzato commented Feb 2, 2020

Hi, I have run into the same issue. Apparently release 0.13.0 was executor-agnostic; is this a breaking change? This requirement is not stated in the latest release notes.
Thanks

@Marwes
Collaborator

Marwes commented Feb 2, 2020

redis is currently executor agnostic, but not reactor agnostic. You can supply your own spawn function, but the IO handling only supports tokio at the moment. Some of the refactoring in #244 could help make both async-std and tokio work.
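
To make the executor/reactor distinction concrete, here is a minimal std-only sketch (not code from redis-rs, tokio or async-std). `block_on` is a toy executor that can drive any future, while `register_with_toy_reactor` is a hypothetical stand-in for a reactor: the separate component that notices readiness and wakes the task. A future tied to one reactor can be polled by any executor, but it only makes progress if that reactor is running, which is what the panic message above is complaining about.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Wakes the executor thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: poll, park until woken, poll again. It knows nothing about IO.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// State shared between a pending future and the toy reactor thread.
#[derive(Default)]
struct Shared {
    ready: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

/// A future that completes only once the toy reactor marks it ready.
pub struct Readiness(Arc<Shared>);

impl Future for Readiness {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Store the waker before checking the flag so no wake-up is missed.
        *self.0.waker.lock().unwrap() = Some(cx.waker().clone());
        if self.0.ready.load(Ordering::SeqCst) {
            Poll::Ready("io ready")
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical reactor stand-in: a thread that signals readiness after a delay.
pub fn register_with_toy_reactor(delay: Duration) -> Readiness {
    let shared = Arc::new(Shared::default());
    let for_reactor = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(delay);
        for_reactor.ready.store(true, Ordering::SeqCst);
        if let Some(waker) = for_reactor.waker.lock().unwrap().take() {
            waker.wake();
        }
    });
    Readiness(shared)
}
```

Calling `block_on(register_with_toy_reactor(Duration::from_millis(10)))` returns once the reactor thread fires. If nothing ever played the reactor's role, the same executor would park forever.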

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

I would like to help with that if you like, but I would need some initial description of what to change. If you think I can help, let me know.

@Marwes
Collaborator

Marwes commented Feb 2, 2020

Since there isn't a good AsyncRead/AsyncWrite abstraction that works for both tokio and async-std, you will want to either provide wrappers that let the async-std TcpStream/UnixStream be used as a tokio::io::AsyncRead/AsyncWrite (easier), or provide our own internal trait that abstracts over either async-std or tokio (harder; I'm not sure what it would look like).

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

Ok, thanks! Then, from what I understand:

  1. Implement the trait tokio_io::AsyncRead for async_std::net::TcpStream. async_std::net::TcpStream implements async_std::io::Read, which is a re-export of futures::io::AsyncRead, so both sides have a poll_read method that behaves the same way.
  2. Do the same for write, but with three methods: poll_write, poll_flush and poll_shutdown (which is poll_close in async_std).

With these two, an async-std TcpStream/UnixStream could be passed wherever the AsyncRead/AsyncWrite traits are required.

Is my understanding correct?

@Marwes
Collaborator

Marwes commented Feb 2, 2020

Yes, you will need newtype wrappers and implement it on that but otherwise that is the gist of it.
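
A minimal sketch of that newtype approach, using only std. The four traits below are local stand-ins that merely mirror the method names discussed in this thread; they are not the real tokio or futures traits, and `Compat`, `MemStream` and `noop_waker` are hypothetical names. With the real crates both the traits and the stream types are foreign, which is why the wrapper type is needed at all.

```rust
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub type IoPoll<T> = Poll<io::Result<T>>;

// Stand-ins for the futures-style traits that async-std re-exports.
pub trait FuturesRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> IoPoll<usize>;
}
pub trait FuturesWrite {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> IoPoll<usize>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> IoPoll<()>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> IoPoll<()>;
}

// Stand-ins for the tokio-style traits that the rest of the crate expects.
pub trait TokioRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> IoPoll<usize>;
}
pub trait TokioWrite {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> IoPoll<usize>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> IoPoll<()>;
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> IoPoll<()>;
}

/// Newtype wrapper: a local type, so tokio-style traits can be implemented
/// for it regardless of where `T` is defined.
pub struct Compat<T>(pub T);

impl<T: FuturesRead + Unpin> TokioRead for Compat<T> {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> IoPoll<usize> {
        FuturesRead::poll_read(Pin::new(&mut self.0), cx, buf)
    }
}

impl<T: FuturesWrite + Unpin> TokioWrite for Compat<T> {
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> IoPoll<usize> {
        FuturesWrite::poll_write(Pin::new(&mut self.0), cx, buf)
    }
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> IoPoll<()> {
        FuturesWrite::poll_flush(Pin::new(&mut self.0), cx)
    }
    // The only rename: tokio-style shutdown maps onto futures-style close.
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> IoPoll<()> {
        FuturesWrite::poll_close(Pin::new(&mut self.0), cx)
    }
}

/// In-memory stream that is always ready; used only to exercise the wrapper.
#[derive(Default)]
pub struct MemStream {
    pub input: Vec<u8>,
    pub written: Vec<u8>,
    pub closed: bool,
}

impl FuturesRead for MemStream {
    fn poll_read(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8]) -> IoPoll<usize> {
        let n = buf.len().min(self.input.len());
        buf[..n].copy_from_slice(&self.input[..n]);
        let rest = self.input.split_off(n);
        self.input = rest;
        Poll::Ready(Ok(n))
    }
}

impl FuturesWrite for MemStream {
    fn poll_write(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> IoPoll<usize> {
        self.written.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> IoPoll<()> {
        Poll::Ready(Ok(()))
    }
    fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> IoPoll<()> {
        self.closed = true;
        Poll::Ready(Ok(()))
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Waker that does nothing; enough for polling an always-ready stream.
pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(Noop))
}
```

The `Unpin` bound keeps the sketch free of unsafe pin projection. A wrapper for a stream type that is not `Unpin` would need a projection helper instead.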

@Marwes
Collaborator

Marwes commented Feb 2, 2020

To be clear, the reason we need to go through tokio is that we use tokio's Codec type, and there are some optimizations that would be lost if we tried to go from tokio to futures::io. Both of these are surmountable problems, but they are harder than taking the shortcut through tokio.

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

Would it be possible to make these optimizations optional? Would it be very complex?

@Marwes
Collaborator

Marwes commented Feb 2, 2020

Yes and maybe. I don't have a need for async-std support myself, so I haven't given it any deeper thought than figuring out a way to support it.

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

I'm looking at the code in https://github.com/mitsuhiko/redis-rs/blob/master/src/aio.rs and I can see that there is a type called ActualConnection that abstracts over the connection type. This is the type that implements AsyncWrite/AsyncRead/AsyncBufRead. I was thinking of extending this type to something like this:

enum ActualConnection {
    TcpTokio(Buffered<TcpStream>),
    #[cfg(unix)]
    UnixTokio(Buffered<UnixStream>),
    TcpAsyncStd(Buffered<async_std::net::TcpStream>),
    #[cfg(unix)]
    UnixAsyncStd(Buffered<async_std::net::UnixStream>),
}

And then make the correct trait implementation over each one.
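
A small sketch of how such an enum can forward a trait to whichever variant it holds. It uses the synchronous `std::io::Read` as a stand-in for the async read trait, and `TokioTcp` / `AsyncStdTcp` are hypothetical placeholder types rather than the real stream types. The Unix variants are left out for brevity.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical placeholders for the two runtimes' TCP stream types.
pub struct TokioTcp(pub Cursor<Vec<u8>>);
pub struct AsyncStdTcp(pub Cursor<Vec<u8>>);

impl Read for TokioTcp {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl Read for AsyncStdTcp {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

/// One variant per backend; the variant is fixed when the connection is made.
pub enum ActualConnection {
    TcpTokio(TokioTcp),
    TcpAsyncStd(AsyncStdTcp),
}

impl ActualConnection {
    /// Reports which backend this connection was created with.
    pub fn runtime(&self) -> &'static str {
        match self {
            ActualConnection::TcpTokio(_) => "tokio",
            ActualConnection::TcpAsyncStd(_) => "async-std",
        }
    }
}

// The enum implements the trait once by delegating to the active variant,
// so code that only needs `Read` never has to know which backend is in use.
impl Read for ActualConnection {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            ActualConnection::TcpTokio(stream) => stream.read(buf),
            ActualConnection::TcpAsyncStd(stream) => stream.read(buf),
        }
    }
}
```

In this shape the enum does not pick a backend by itself; whoever constructs the value decides which variant is used.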

What isn't clear to me is how that would let the futures use the correct type depending on the reactor. I'm having some trouble wrapping my head around this.

Even if I make a wrapper type that implements the Tokio traits for async_std's TcpStream, how is the code going to choose the correct implementation? Maybe a feature flag could help?

@Marwes
Collaborator

Marwes commented Feb 2, 2020

> What isn't clear to me is how that would let the futures use the correct type depending on the reactor. I'm having some trouble wrapping my head around this.

I would change https://docs.rs/redis/0.15.1/redis/struct.Client.html#method.get_async_connection into get_tokio_connection and add a get_async_std_connection.

> Even if I make a wrapper type that implements the Tokio traits for async_std's TcpStream, how is the code going to choose the correct implementation? Maybe a feature flag could help?

Feature flags would be useful to enable or disable one async implementation or the other, but they shouldn't be needed otherwise (see above). For instance, tokio support can be disabled with #272.
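
As a rough sketch of that API shape: the caller picks the backend by calling one of two explicitly named methods. The methods here are synchronous and only record which backend was chosen; `Runtime`, the `Connection` fields and the feature names in the comments are hypothetical, and only the two method names come from the suggestion above.

```rust
/// Which reactor a connection is tied to (hypothetical marker type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Runtime {
    Tokio,
    AsyncStd,
}

/// Placeholder connection that only remembers how it was created.
#[derive(Debug)]
pub struct Connection {
    pub runtime: Runtime,
    pub addr: String,
}

pub struct Client {
    addr: String,
}

impl Client {
    pub fn open(addr: &str) -> Client {
        Client { addr: addr.to_string() }
    }

    // A real crate could gate this with an attribute such as
    // #[cfg(feature = "tokio-rt")] (hypothetical feature name).
    pub fn get_tokio_connection(&self) -> Connection {
        Connection { runtime: Runtime::Tokio, addr: self.addr.clone() }
    }

    // Likewise, e.g. #[cfg(feature = "async-std-rt")] (hypothetical).
    pub fn get_async_std_connection(&self) -> Connection {
        Connection { runtime: Runtime::AsyncStd, addr: self.addr.clone() }
    }
}
```

The choice is made by which method is called, not by a flag, so feature flags only control which of the two methods exist in a given build.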

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

Ok, that is clearer now, thank you. Then, if I do what I described in #280 (comment) and implement the two methods, the developer will be responsible for knowing which method to use (tokio/async_std).

How would this work for libraries that use redis-rs under the hood? I guess they will need to push the decision to the developer as well, since they cannot make assumptions.

Moreover, splitting that method would be a breaking change. If you are ok with that, I will start.

@Marwes
Collaborator

Marwes commented Feb 2, 2020

> How would this work for libraries that use redis-rs under the hood?

Yeah, there might be a better API if instead the caller supplied the TcpStream/UnixStream instance but I believe that is something for another time.
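
For illustration only, a caller-supplied-stream API could look roughly like the sketch below. It uses the synchronous `std::io::Read`/`Write` traits as stand-ins for the async ones; `Connection::from_stream`, `ping` and `Scripted` are hypothetical names, not part of redis-rs.

```rust
use std::io::{self, Cursor, Read, Write};

/// Connection that is generic over whatever stream the caller hands in.
pub struct Connection<S> {
    stream: S,
}

impl<S: Read + Write> Connection<S> {
    /// The caller creates the stream (and thereby picks the runtime).
    pub fn from_stream(stream: S) -> Self {
        Connection { stream }
    }

    /// Sends an inline PING and returns the reply line without the CRLF.
    pub fn ping(&mut self) -> io::Result<String> {
        self.stream.write_all(b"PING\r\n")?;
        self.stream.flush()?;
        let mut buf = [0u8; 64];
        let n = self.stream.read(&mut buf)?;
        Ok(String::from_utf8_lossy(&buf[..n]).trim_end().to_string())
    }

    pub fn into_inner(self) -> S {
        self.stream
    }
}

/// In-memory stream with a canned reply; used only to exercise the sketch.
pub struct Scripted {
    pub reply: Cursor<Vec<u8>>,
    pub sent: Vec<u8>,
}

impl Read for Scripted {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reply.read(buf)
    }
}

impl Write for Scripted {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.sent.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

With this shape, a library built on top would no longer need one constructor per runtime; it could forward whatever stream its own caller provides.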

#272 forces a breaking change anyway so 🤷‍♂

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

Do you want my branch to be forked from Marwes:combine-4 or from master?

@Marwes
Collaborator

Marwes commented Feb 2, 2020

It might be better to use combine-4, as it touches part of what you need to change. It shouldn't really change; it is just waiting on review.

@DavidBM
Contributor Author

DavidBM commented Feb 2, 2020

Ok. Then I will try to work on this this week. :)

@DavidBM
Contributor Author

DavidBM commented Feb 4, 2020

I was playing with the code a bit, and it seems that async_std doesn't allow composing BufReader and BufWriter together.

I see the type alias type Buffered<T> = BufReader<BufWriter<T>>; which uses Tokio's BufReader/BufWriter, and I was trying to create the same type for async_std. I get this error:

the trait `futures_io::if_std::AsyncRead` is not implemented for `async_std::io::buf_writer::BufWriter<aio::AsyncStdTcpStream>`

Any idea how I can proceed here? I can remove the buffers for now until we find a solution.
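
One possible way around this, sketched with the synchronous `std::io` traits as stand-ins (std's own `BufWriter` does not implement `Read` either): a small write-buffering wrapper that forwards reads to the inner stream, so a `BufReader` can be layered on top of it. `WriteBuffered` and `Duplex` are hypothetical names, and this is not what redis-rs ended up doing.

```rust
use std::io::{self, Cursor, Read, Write};

/// Buffers writes in memory and forwards reads straight to the inner stream.
pub struct WriteBuffered<S> {
    inner: S,
    pending: Vec<u8>,
}

impl<S> WriteBuffered<S> {
    pub fn new(inner: S) -> Self {
        WriteBuffered { inner, pending: Vec::new() }
    }

    pub fn get_ref(&self) -> &S {
        &self.inner
    }
}

impl<S: Write> Write for WriteBuffered<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Nothing reaches the inner stream until `flush` is called.
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.write_all(&self.pending)?;
        self.pending.clear();
        self.inner.flush()
    }
}

// The read passthrough is the piece the error message says is missing.
impl<S: Read> Read for WriteBuffered<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf)
    }
}

/// In-memory duplex stream with a canned reply; used only for the example.
pub struct Duplex {
    pub reply: Cursor<Vec<u8>>,
    pub sent: Vec<u8>,
}

impl Read for Duplex {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reply.read(buf)
    }
}

impl Write for Duplex {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.sent.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Because `WriteBuffered<S>` implements `Read`, the type `std::io::BufReader<WriteBuffered<S>>` is usable for buffered reads while writes still go through the buffer. Unlike std's `BufWriter`, this sketch does not flush on drop.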

@Marwes
Collaborator

Marwes commented Feb 4, 2020

Coincidentally, I pushed some changes to the combine-4 branch (#272) which remove the BufReader + BufWriter :) (the reader is redundant, and if we do our own buffering the writer is a bit faster).

@DavidBM
Contributor Author

DavidBM commented Feb 5, 2020

Ok! I will rebase and continue from there :)

@DavidBM
Contributor Author

DavidBM commented Feb 5, 2020

I think I have something. I'm trying to test it, but I get this error on the latest commit of the combine-4 branch:

~/repos/redis-rs_fork (combine-4) $ make test
====================================================================
Testing Connection Type TCP without features
====================================================================
   Compiling futures-core v0.3.2
   Compiling futures-io v0.3.2
   Compiling redis v0.15.2-alpha.0 (/home/david/repos/redis-rs_fork)
   Compiling futures-channel v0.3.2
error[E0425]: cannot find function `encode_pipeline` in this scope
   --> src/cmd.rs:543:9
    |
170 | / fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
171 | |     let cmds_len = cmds.iter().map(cmd_len).sum();
172 | |
173 | |     if atomic {
...   |
189 | |     }
190 | | }
    | |_- similarly named function `write_pipeline` defined here
...
543 |           encode_pipeline(&self.commands, self.transaction_mode)
    |           ^^^^^^^^^^^^^^^ help: a function with a similar name exists: `write_pipeline`

error[E0425]: cannot find function `encode_pipeline` in this scope
   --> src/cmd.rs:552:14
    |
170 | / fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
171 | |     let cmds_len = cmds.iter().map(cmd_len).sum();
172 | |
173 | |     if atomic {
...   |
189 | |     }
190 | | }
    | |_- similarly named function `write_pipeline` defined here
...
552 |               &encode_pipeline(&self.commands, false),
    |                ^^^^^^^^^^^^^^^ help: a function with a similar name exists: `write_pipeline`

error[E0425]: cannot find function `encode_pipeline` in this scope
   --> src/cmd.rs:560:14
    |
170 | / fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
171 | |     let cmds_len = cmds.iter().map(cmd_len).sum();
172 | |
173 | |     if atomic {
...   |
189 | |     }
190 | | }
    | |_- similarly named function `write_pipeline` defined here
...
560 |               &encode_pipeline(&self.commands, true),
    |                ^^^^^^^^^^^^^^^ help: a function with a similar name exists: `write_pipeline`

   Compiling futures-util v0.3.2
error: aborting due to 3 previous errors

For more information about this error, try `rustc --explain E0425`.
error: could not compile `redis`.
warning: build failed, waiting for other jobs to finish...
error: build failed
make: *** [Makefile:8: test] Error 101

@Marwes
Collaborator

Marwes commented Feb 5, 2020

Pushed a fix (removed a rogue #[cfg])

@DavidBM
Contributor Author

DavidBM commented Feb 5, 2020

There you go #281 :)

@DavidBM
Contributor Author

DavidBM commented Feb 14, 2020

I was experimenting on my branch to merge both async test files into one. I have everything working except one test that hangs forever, "dont_panic_on_closed_multiplexed_connection".
You can see the changes here: DavidBM#2 . Do you know why the test is not finishing? @Marwes

In any case, this is independent of the Pull Request, since everything works there.

@badboy badboy closed this as completed Mar 19, 2020