Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How do I use a remote reactor? #80

Closed
rushmorem opened this issue Oct 25, 2016 · 9 comments
Closed

How do I use a remote reactor? #80

rushmorem opened this issue Oct 25, 2016 · 9 comments

Comments

@rushmorem
Copy link

rushmorem commented Oct 25, 2016

I'm working on a library in which I need to use tokio_core::reactor::Remote instead of tokio_core::reactor::Core. Unfortunately, all the examples I can find are using tokio_core::reactor::Core. @carllerche was kind enough to point me in the right direction in #75, so I now have a good idea how this works.

I tried to reproduce the tutorial using Remote instead of Core. Here is the code I came up with so far:-

extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;
use std::net::TcpStream as StdTcpStream;

use futures::Future;
use tokio_core::reactor::{Core, Remote};
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let remote = Core::new().unwrap().remote();

    let (tx, rx) = futures::oneshot();

    let stream = StdTcpStream::connect("www.rust-lang.org:443").unwrap();

    remote.spawn(|handle| {
        let addr = stream.peer_addr().unwrap();
        let socket = TcpStream::connect_stream(stream, &addr, handle);
        tx.complete(socket);
        Ok(())
    });

    let tls_handshake = rx.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });

    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
                GET / HTTP/1.0\r\n\
                Host: www.rust-lang.org\r\n\
                \r\n\
            ".as_bytes())
    });

    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, res) = response.wait().unwrap();
    println!("{}", String::from_utf8_lossy(&res));
}

However, this errors out. The first error being:-

error[E0277]: the trait bound `Box<futures::Future<Item=tokio_core::net::TcpStream, Error=std::io::Error> + Send>: tokio_core::io::Io` is not satisfied
  --> src/main.rs:29:12
   |
29 |         cx.handshake("www.rust-lang.org", socket)
   |            ^^^^^^^^^

I have tried calling as_ref() (among other things) to get out of the box on the socket on that line 29 which has the first error. How do I get this to work?

@alexcrichton
Copy link
Contributor

I think you'll want to change the return value of spawn slightly. The TcpStream::connect_stream function returns a future rather than a value, so you'll want to wait for that to finish before calling tx.complete.

If you do that, does it work out?

@rushmorem
Copy link
Author

You mean by replacing

        let socket = TcpStream::connect_stream(stream, &addr, handle);
        tx.complete(socket);

with

        TcpStream::connect_stream(stream, &addr, handle).and_then(|socket| {
            tx.complete(socket);
            Ok(())
        });

Like this?

extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;
use std::net::TcpStream as StdTcpStream;

use futures::Future;
use tokio_core::reactor::{Core, Remote};
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let remote = Core::new().unwrap().remote();

    let (tx, rx) = futures::oneshot();

    let stream = StdTcpStream::connect("www.rust-lang.org:443").unwrap();

    remote.spawn(|handle| {
        let addr = stream.peer_addr().unwrap();
        TcpStream::connect_stream(stream, &addr, handle).and_then(|socket| {
            tx.complete(socket);
            Ok(())
        });
        Ok(())
    });

    let tls_handshake = rx.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });

    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
                GET / HTTP/1.0\r\n\
                Host: www.rust-lang.org\r\n\
                \r\n\
            ".as_bytes())
    });

    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, res) = response.wait().unwrap();
    println!("{}", String::from_utf8_lossy(&res));
}

This gives me:-

error[E0271]: type mismatch resolving `<tokio_tls::ClientHandshake<tokio_core::net::TcpStream> as futures::IntoFuture>::Error == futures::Canceled`
  --> src/main.rs:29:28
   |
29 |     let tls_handshake = rx.and_then(|socket| {
   |                            ^^^^^^^^ expected struct `std::io::Error`, found struct `futures::Canceled`
   |
   = note: expected type `std::io::Error`
   = note:    found type `futures::Canceled`

error: no method named `and_then` found for type `futures::AndThen<futures::Oneshot<tokio_core::net::TcpStream>, tokio_tls::ClientHandshake<tokio_core::net::TcpStream>, [closure@src/main.rs:29:37: 32:6]>` in the current scope
  --> src/main.rs:34:33
   |
34 |     let request = tls_handshake.and_then(|socket| {
   |                                 ^^^^^^^^
   |
   = note: the method `and_then` exists but the following trait bounds were not satisfied: `futures::AndThen<futures::Oneshot<tokio_core::net::TcpStream>, tokio_tls::ClientHandshake<tokio_core::net::TcpStream>, [closure@src/main.rs:29:37: 32:6]> : futures::Future`

error[E0277]: the trait bound `[u8]: std::marker::Sized` is not satisfied
  --> src/main.rs:46:13
   |
46 |     let (_, res) = response.wait().unwrap();
   |             ^^^
   |
   = note: `[u8]` does not have a constant size known at compile-time
   = note: all local variables must have a statically known size

@alexcrichton
Copy link
Contributor

Oh the function passed to Remote::spawn is expected to return a closure, so in this case that should be the return value of the connect + and_then

@rushmorem
Copy link
Author

@alexcrichton I'm not sure I totally understand you. I've tried what I thought you meant to no avail. Anyway I think something interesting is going on here. In the following code, I'm not using Remote::spawn and yet I'm running into the same error:-

extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;

use futures::Future;
use tokio_core::reactor::{Core, Remote};
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let remote = Core::new().unwrap().remote();

    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();
    let socket = TcpStream::connect(&addr, &remote.current_reactor().unwrap());

    let tls_handshake = socket.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });
    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, data) = response.wait().unwrap();
    println!("{}", String::from_utf8_lossy(&data));
}

Note that remote.current_reactor() is coming from #84. Also note that the only difference this code has with that in the tutorial is the Handle instance and use of the wait method on the future instead of Core::run().

@alexcrichton
Copy link
Contributor

@rushmorem in your previous example yes the types need to match up, but you can fix that by changing this:

let tls_handshake = rx.and_then(|socket| {

to

let tls_handshake = rx.map_err(|_| panic!("canceled")).and_then(|socket| {

The second example doesn't compile for me right now due to the current_reactor method, but it won't work regardless because the Core is dropped immediately, which means you're destroying the event loop as soon as it's created.

@rushmorem
Copy link
Author

I see. In that case I think I'm trying to go about this the wrong way. I'm working on a database driver. I'm using https://github.com/sfackler/r2d2 as my connection pool. I'm trying to bring futures into the mix hence these questions.

I could introduce futures by only using async IO in the final method that actually executes a database connection. That way I can just create the event loop and use it there. However, that means I will be creating and dropping it for every query. I think this is inefficient. I also think that having one instance of the event loop might give me an opportunity to do some interesting things.

Any suggestions?

@alexcrichton
Copy link
Contributor

Ideally the database driver would be async I/O powered to reduce the overhead, but in lieu of that I think a reasonable starting point for an architecture might look like:

  • You can pull a database connection out of a pool through a future
  • With that connection you can execute queries which return futures
  • When the database connection is dropped, it's returned to the pool

You'd then probably implement that with a thread pool where one thread is handing out connections and other threads are executing the actual queries with those connections. (perhaps)

That may not be the most ideal, but is perhaps a place to get started?

@rushmorem
Copy link
Author

Yes you are right. This approach is what I was referring to when I said I could introduce futures in the exec method. Granted it's not ideal but it's a good first step so I will go with that for now. Thanks a lot for all your help and patience.

@alexcrichton
Copy link
Contributor

No problem! Feel free to keep asking questions if you've got them :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants