How to use Handle in new tokio crate #134

Closed
Relrin opened this issue Feb 12, 2018 · 4 comments

@Relrin

Relrin commented Feb 12, 2018

The old Handle struct in the deprecated tokio-core crate made it possible to spawn tasks via the spawn and spawn_fn calls. The new implementation, however, provides nothing except clone() and default(). Can I somehow get the same functionality with the new crate?

For example, in the tokio-dns-unofficial crate we have the following piece of code:

/// Invokes functor with event loop handle obtained from a remote.
fn with_handle<F, R, I>(remote: &Remote, f: F) -> IoFuture<I>
where
    F: FnOnce(&Handle) -> R + Send + 'static,
    R: IntoFuture<Item = I, Error = io::Error> + Send + 'static,
    R::Future: Send + 'static,
{
    let (tx, rx) = futures::oneshot();
    remote.spawn(move |handle| {
        let _ = tx.send(f(handle));
        Ok(())
    });
    boxed(rx.then(|r| r.expect("canceled")))
}
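
For readers unfamiliar with the pattern, with_handle ships a closure to the thread that owns the event loop and gets the result back over a oneshot channel. Below is a minimal std-only sketch of that idea, using a plain thread and mpsc channels as stand-ins. LoopHandle, Job and start_loop are hypothetical names for illustration, not tokio or tokio-core APIs, and the result comes back through a blocking receiver rather than a future.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for the event loop's `Handle` (hypothetical, not a tokio type).
struct LoopHandle {
    name: &'static str,
}

/// A job shipped to the loop thread, analogous to the closure passed to `Remote::spawn`.
type Job = Box<dyn FnOnce(&LoopHandle) + Send + 'static>;

/// Start a thread that owns the handle and runs jobs as they arrive.
/// The returned sender plays the role of `Remote`.
fn start_loop() -> mpsc::Sender<Job> {
    let (jobs_tx, jobs_rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        let handle = LoopHandle { name: "loop-thread" };
        // The loop ends once every sender has been dropped.
        for job in jobs_rx {
            job(&handle);
        }
    });
    jobs_tx
}

/// Run `f` on the loop thread with access to its handle and
/// return a receiver that yields the result.
fn with_handle<F, R>(remote: &mpsc::Sender<Job>, f: F) -> mpsc::Receiver<R>
where
    F: FnOnce(&LoopHandle) -> R + Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    remote
        .send(Box::new(move |handle: &LoopHandle| {
            // Ignore the error if the caller stopped waiting for the result.
            let _ = tx.send(f(handle));
        }))
        .expect("loop thread has shut down");
    rx
}
```

Calling with_handle(&remote, |h| h.name) and then recv() on the returned receiver yields the value computed on the loop thread.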

Replacing the Remote struct from the old crate with the new Handle doesn't let us spawn a small subtask here. How can I resolve this, or achieve something similar?

@carllerche
Member

The Tokio reactor is no longer an executor. Instead, you can bring your own executor for running tasks.

Tokio comes with the current_thread executor, which provides functionality similar to Handle::spawn.

See usage of current_thread in this example: https://tokio.rs/docs/getting-started/hello-world/.
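
As a rough mental model of what "bring your own executor" means, an executor is just something that accepts tasks and drives them to completion. The sketch below is a std-only toy that queues closures and runs them on the calling thread. ToyExecutor and demo are hypothetical names, tasks here are plain closures rather than futures, and none of this is tokio's actual API.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical toy executor: queues tasks and runs them on the calling thread.
struct ToyExecutor {
    queue: VecDeque<Box<dyn FnOnce(&mut ToyExecutor)>>,
}

impl ToyExecutor {
    fn new() -> Self {
        ToyExecutor { queue: VecDeque::new() }
    }

    /// Queue a task; nothing runs until `run` is called.
    fn spawn<F: FnOnce(&mut ToyExecutor) + 'static>(&mut self, task: F) {
        self.queue.push_back(Box::new(task));
    }

    /// Run queued tasks in FIFO order, including tasks spawned by other tasks.
    fn run(&mut self) {
        while let Some(task) = self.queue.pop_front() {
            task(self);
        }
    }
}

/// Spawn a task that records a step and then spawns a follow-up task.
fn demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut exec = ToyExecutor::new();
    let outer = Rc::clone(&log);
    exec.spawn(move |exec| {
        outer.borrow_mut().push("connect");
        let inner = Rc::clone(&outer);
        exec.spawn(move |_| {
            inner.borrow_mut().push("print");
        });
    });
    exec.run();
    let steps = log.borrow().to_vec();
    steps
}
```

The point of the split is that whatever waits for I/O readiness (the reactor) does not need to know who runs the tasks; any type with a spawn-like method can fill that role.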

I'm going to close the issue now, but feel free to comment if you have further questions.

@kpp
Contributor

kpp commented Feb 12, 2018

/// Connect to the endpoint using the default resolver.
pub fn tcp_connect<'a, T>(ep: T, handle: Remote) -> IoFuture<TcpStream>
where
    T: ToEndpoint<'a>,
{
    tcp_connect_with(ep, handle, POOL.clone())
}

/// Connect to the endpoint using a custom resolver.
pub fn tcp_connect_with<'a, T, R>(ep: T, remote: Remote, resolver: R) -> IoFuture<TcpStream>
where
    T: ToEndpoint<'a>,
    R: Resolver,
{
    boxed(resolve_endpoint(ep, resolver).and_then(move |addrs| {
        try_until_ok(addrs, move |addr| {
            with_handle(&remote, move |handle| TcpStream::connect(&addr, handle))
        })
    }))
}

What you actually do here is resolve the endpoint and then connect to each address using the given executor.

Let's have a look at the usage:

let mut lp = Core::new().unwrap();

// connect using the built-in resolver.
let conn = tcp_connect("rust-lang.org:80", lp.remote()).and_then(|sock| {
    println!("connected to {}", sock.peer_addr().unwrap());
    Ok(())
});

lp.run(conn).unwrap();

Your crate is named tokio_dns, so it should not provide a tcp_connect function. Instead, I would like to write:

let dns_pool = CpuPool::new(4);
let addrs_future = tokio_dns::resolve("localhost:8080");
let addrs = dns_pool.spawn(addrs_future); // run it on an explicit CpuPool
let connect = addrs /* continue executing futures on the implicit current_thread */ .and_then(|addrs| {
    tokio_dns::try_until_ok(addrs, move |addr| {
        // it actually connects in the `main` thread
        TcpStream::connect(&addr)
    })
});
let print_smt = connect.and_then(|sock| {
    println!("connected to {}", sock.peer_addr().unwrap());
    Ok(())
});
print_smt.wait().unwrap();

I would also like to have the option of connecting inside the CpuPool and printing in main:

let dns_pool = CpuPool::new(4);
let addrs = tokio_dns::resolve("localhost:8080"); // Your magic resolver
let connect_future = addrs.and_then(|addrs| {
    tokio_dns::try_until_ok(addrs, move |addr| {
        // it connects in `current_thread`, which will be a thread of the Executor = CpuPool
        TcpStream::connect(&addr)
    })
});
let connect = dns_pool.spawn(connect_future); // run it on an explicit CpuPool
let print_smt = connect /* continue executing futures on the implicit current_thread */ .and_then(|sock| {
    println!("connected to {}", sock.peer_addr().unwrap());
    Ok(())
});
print_smt.wait().unwrap();

Or even resolve the host in the main thread:

let addrs = tokio_dns::resolve("localhost:8080"); // Your magic resolver
let connect = addrs.and_then(|addrs| {
    tokio_dns::try_until_ok(addrs, move |addr| {
        TcpStream::connect(&addr)
    })
});
let print_smt = connect.and_then(|sock| {
    println!("connected to {}", sock.peer_addr().unwrap());
    Ok(())
});
print_smt.wait().unwrap(); // No additional CpuPoolResolver (5 threads) is initialized
// we have run all futures in the current thread

To achieve this, your crate would have to provide two functions:

fn resolve(addr: SomeStringType) -> IoFuture<Vec<SocketAddr>>;
fn try_until_ok<F, R, I>(addrs: Vec<SocketAddr>, f: F) -> IoFuture<I> where ....;

With neither Remote nor Handle in their signatures.

To achieve this, all you need is to wrap your host[..].to_socket_addrs() call in futures::future::result, and throw away CpuPoolResolver, Remote and Handle, because I really hate it when libraries initialize 5 additional threads without being asked to.
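
To make the shape of those two functions concrete, here is a minimal std-only sketch under stated assumptions. It is synchronous, so both functions return io::Result instead of IoFuture; in the futures-based version the Result from resolve is the value that would be wrapped in an already-completed future. The signatures are illustrative and are not the crate's actual API.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Resolve `host:port` into socket addresses using the standard library.
/// This blocks the calling thread; no thread pool, Remote or Handle is involved.
fn resolve(endpoint: &str) -> io::Result<Vec<SocketAddr>> {
    Ok(endpoint.to_socket_addrs()?.collect())
}

/// Try `f` on each address in order and return the first success,
/// or the last error if every attempt fails.
fn try_until_ok<F, T>(addrs: Vec<SocketAddr>, mut f: F) -> io::Result<T>
where
    F: FnMut(SocketAddr) -> io::Result<T>,
{
    // Returned only when `addrs` is empty.
    let mut last_err = io::Error::new(io::ErrorKind::InvalidInput, "no addresses to try");
    for addr in addrs {
        match f(addr) {
            Ok(value) => return Ok(value),
            Err(err) => last_err = err,
        }
    }
    Err(last_err)
}
```

A caller would chain them as resolve("localhost:8080").and_then(|addrs| try_until_ok(addrs, |addr| std::net::TcpStream::connect(addr))), and is free to run that chain on whichever thread or executor it prefers.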

@carllerche am I right?

@aep
Contributor

aep commented Jun 20, 2018

It's not clear what to do if an existing crate needs a future::Executor, though. The new tokio has no such thing, so are we stuck with the old tokio until old crates update?

@carllerche
Member

@aep does ‘tokio::executor::DefaultExecutor’ not work in that case?
