Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advice: async functions and libuv #76

Closed
chipsenkbeil opened this issue Sep 30, 2021 · 9 comments
Closed

Advice: async functions and libuv #76

chipsenkbeil opened this issue Sep 30, 2021 · 9 comments

Comments

@chipsenkbeil
Copy link

chipsenkbeil commented Sep 30, 2021

Hey there! Love your library and just finished writing a Lua module wrapper for one of my crates in order to use it in neovim.

The crate uses tokio for async operations, and so I've exposed some of those async methods to Lua.

I saw in the comments of another issue mention that if neovim had an event loop then it could do async tasks. I'm trying to understand how that would work.

With neovim, it exposes luv, a libuv wrapper in Lua, via a global vim.loop. You can schedule tasks to run when available via vim.schedule. In order to use async Lua methods, do I use the event loop to poll them and check the return value, scheduling another poll if not ready?

Reading the doc for creating async methods, there's mention that each call will yield if pending, but I don't know if that means the async function returns a coroutine in lua.

I'm trying to figure out if I need to create a static tokio runtime that I can reference or if neovim becomes the runtime through its event loop and I need to manage polling using it.

Would love some guidance in how to approach this! Sent a sponsorship your way, btw, as this library is fantastic and I see myself using it for quite a few Lua plugins for neovim going forward.

@khvzak
Copy link
Member

khvzak commented Oct 1, 2021

Thanks for the feedback and sponsorship, really appreciate this!

mlua's async functionality is built on top a simple code (in some approximation):

while true do
    local ready, res = poll()
    if ready then
        return res
   end
   coroutine.yield(PENDING)
end

(https://github.com/khvzak/mlua/blob/master/src/lua.rs#L2077)

Every time when future is not ready, mlua executes coroutine.yield() to return control back to a scheduler in order to be
executed later.
With a rust-integrated event loop, a task (coroutine) will be woke up again after receiving a notification from an associated Waker.
To execute tokio-based task you need to do it inside tokio runtime otherwise you will get an exception from tokio.
By the way async-std, in contrast, does not require explicitly running event loop and can automatically launch it on first poll().

I've checked neovim lua documentation and seems it's built on top of callbacks rather than utilizing Lua coroutines.
I think you should be able to periodically poll future by wrapping a Rust async function to a coroutine and executing it.

I've played with it and the following example works:
lib.rs:

#[mlua::lua_module]
fn my_module(lua: &Lua) -> Result<Table> {
    let get = lua.create_async_function(|_, ()| async move {
        let mut res = surf::get("https://httpbin.org/get").await.to_lua_err()?;
        dbg!(res.body_string().await.to_lua_err()?);
        Ok(())
    })?;

    lua.create_table_from([("get", get)])
}

nvim_module.lua

local http = require("my_module")

f = coroutine.wrap(http.get)
vim.wait(10000, function () return f() == nil end, 100)

* nil is the final value returned be the get function on completion.

It uses surf & async-std runtime rather than tokio... And it's quite dirty example with polling futures manually...

@chipsenkbeil
Copy link
Author

chipsenkbeil commented Oct 1, 2021

@khvzak unfortunately, I am bolted to tokio as this is porting distant-core and distant-ssh2 from distant to a new crate distant-lua. I'm doing this to convert distant.nvim to use a lua module instead of the current process where I spawn a binary that talks over stdin. I'm trying to convert my callback-based functions to still be callback-based, but using the Lua module: https://github.com/chipsenkbeil/distant.nvim/blob/master/lua/distant/fn.lua#L291.

--- Copies a remote file or directory to a new location
---
--- @param src string Path to the input file/directory to copy
--- @param dst string Path to the output file/directory
--- @param cb function Function that is passed true if successful or false if failed
fn.async.copy = function(src, dst, cb)
    assert(type(src) == 'string', 'src must be a string')
    assert(type(dst) == 'string', 'dst must be a string')

    -- Today, this sends a message over stdin via libuv to an external process and 
    -- then triggers a callback once a response is received over stdout 
    -- (also powered by libuv)
    s.client():send({
        type = 'copy';
        data = {
            src = src;
            dst = dst;
        };
    }, function(res)
        cb(make_args(res, 'ok'))
    end)
end

Is there any way to hand off the polling of tokio futures to neovim's event loop (check out vim.loop from lua doc)? I've also been browsing through how neovim handles its communication with an external LSP process, which mixes coroutines with libuv: https://github.com/neovim/neovim/blob/master/runtime/lua/vim/lsp/rpc.lua

Using neovim's schedule

So if I create a future using tokio, I cannot not poll it using neovim's event loop for completion? Using something like

fn my_module(lua: &Lua) -> Result<Table> {
    let sleep = lua.create_async_function(|_, ()| async move {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        Ok(())
    })?;

    lua.create_table_from([("sleep", sleep)])
}
local lib = require("my_module")
local f = coroutine.wrap(lib.sleep)
local cb = function() print("DONE!") end

-- Places a callback on the event loop to be triggered when ready
local step = function()
    if f() == nil then
        cb()
    else
        vim.schedule(step)
    end
end)
step()

What I wrap in a local task? Can I poll it that way?

Spawn local, but handing polling back to neovim

use mlua::prelude::*;

fn my_module(lua: &Lua) -> Result<Table> {
    let sleep = lua.create_function(|_, ()| {
        let f = tokio::LocalSet::new().run_until(async move {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            Ok(())
        });
        
        // Returns future created above
        f
    })?;

    lua.create_table_from([("sleep", sleep)])
}
local lib = require("my_module")

-- Won't get triggered until thread yields
local f = coroutine.wrap(lib.sleep)
local cb = function() print("DONE!") end

-- Places a callback on the event loop to be triggered when ready
local step = function()
    if f() == nil then
        cb()
    else
        vim.schedule(step)
    end
end)
step()

Static tokio runtime

Additionally, I'm not fully familiar with how the Lua module loading works. If I have a static runtime using something like once_cell to maintain the initialization, does that persist across lua module calls?

use mlua::prelude::*;
use once_cell::sync::OnceCell;

/// Retrieves the global runtime, initializing it if not initialized, and returning
/// an error if failed to initialize
pub fn get() -> LuaResult<&'static tokio::runtime::Runtime> {
    static RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
    RUNTIME.get_or_try_init(|| {
        tokio::runtime::Builder::new_multi_thread()
            .build()
            .map_err(|x| x.to_lua_err())
    })
}
use mlua::prelude::*;
use crate::runtime;

fn my_module(lua: &Lua) -> Result<Table> {
    -- Won't compile (I'm assuming) because of the 'lua lifetime associated with the callback function
    let sleep = lua.create_function(|_, cb: Function| {
        runtime::get()?.spawn(async move {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            let _ = cb.call::<_, ()>(());
        })
        
        Ok(())
    })?;

    lua.create_table_from([("sleep", sleep)])
}
local lib = require("my_module")

-- Won't get triggered until thread yields
lib.sleep(function()
    print("DONE!")
end)

-- Continue in current thread until yielding

Was also reading through smol vs async-std vs tokio.

@khvzak
Copy link
Member

khvzak commented Oct 1, 2021

You could also use tokio with a little bit magic provided by the async-compat crate.

lib.rs

use async_compat::CompatExt;
use mlua::{ExternalResult, Lua, Result, Table};

#[mlua::lua_module]
fn my_module(lua: &Lua) -> Result<Table> {
    let get = lua.create_async_function(|_, ()| async move {
        let res = reqwest::get("https://httpbin.org/get")
            .compat()
            .await
            .to_lua_err()?;
        dbg!(res.text().await.to_lua_err()?);
        Ok(())
    })?;

    lua.create_table_from([("get", get)])
}

and neovim Lua module:

local http = require("my_module")

local get = function(cb)
    local http_get = coroutine.wrap(http.get)
    local inner_fn
    inner_fn = function()
        if http_get() == nil then
            cb()
        else
            vim.schedule(inner_fn)
        end
    end
    vim.schedule(inner_fn)
end

get(function ()
    print("done!")
end)

In this example we use reqwest built on top of the tokio and drive execution by the neovim's polling.
Internaly it spawns a static single-threaded tokio runtime to execute futures.
Probably this is the most convenient way to poll tokio futures.

Additionally, I'm not fully familiar with how the Lua module loading works. If I have a static runtime using something like once_cell to maintain the initialization, does that persist across lua module calls?

Yeah, static runtime created by async-compat is bound to your module address space and shared among all async calls made by the module.

@khvzak
Copy link
Member

khvzak commented Oct 2, 2021

Alternatively, you could use the following approach:

static RT: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
    std::thread::spawn::<_, ()>(|| RT.block_on(futures::future::pending()));
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
});

static GUARD: Lazy<tokio::runtime::EnterGuard> = Lazy::new(|| RT.enter());

#[mlua::lua_module]
fn my_module(lua: &Lua) -> Result<Table> {
    let _guard = &*GUARD;
   ... code ...
}

Also you could get mlua's internal Poll::Pending value (it's static) to track future's execution more precisely:

    let pending = lua.create_async_function(|_, ()| async move {
        tokio::task::yield_now().await;
        Ok(())
    })?;
local poll_pending = (coroutine.wrap(http.pending))();
local get = function(cb)
    http_get = coroutine.wrap(http.get)
    local inner_fn
    inner_fn = function()
        local res = http_get()
        if res == poll_pending then
            vim.schedule(inner_fn)
        else
            cb(res)
        end
    end
    vim.schedule(inner_fn)
end

@chipsenkbeil
Copy link
Author

@khvzak great, thanks for the guidance! I'm going to give the async-compat approach a try as it seems the most straightforward to me. 😄 I'll report back here once I have a test up and running to validate, and then we can close this one out.

Thanks again for all of the help!

@chipsenkbeil
Copy link
Author

chipsenkbeil commented Oct 2, 2021

I also like the idea of using mlua's Poll::Pending value. If I want my future to return a value, I imagine I'd need to be able to check for poll pending on a coroutine's yield as all of the yields up to the final yield would be poll pending, right? And then if I am returning the async function's value it would be the final yield.

It does feel a little odd to have the pending value get exposed in the manner you provided. I'm assuming this is considered stable, but I'd love to have this formally made available some day through something like lua.create_table_from([("sleep", sleep), ("pending", mlua::PENDING)]).

// Returns some value
let get_number = lua.create_async_function(|_, ()| async move {
    tokio::task::yield_now().await;
    Ok(123)
})?;
local poll_pending = (coroutine.wrap(http.pending))();
local get_number = function(cb)
    http_get_number = coroutine.wrap(http.get_number)
    local inner_fn
    inner_fn = function()
        local res = http_get_number()
        if res == poll_pending then
            vim.schedule(inner_fn)
        else
            cb(res)
        end
    end
    vim.schedule(inner_fn)
end

Eventual API goal

Right now, I think I can get close to this. Maybe I can expose the pending value you described on my library directly by wrapping in a thread and calling it to get the value before assigning it to a table variable.

local distant = require("distant_lua")

-- Distant functions are async by design and need to be wrapped in a coroutine
-- in order to be used
local thread = coroutine.wrap(distant.launch)

-- Initialize the thread
thread({ host = "127.0.0.1" })

-- Continually check if launch has completed
local res
while true do
    res = thread()

    -- Assuming that I've somehow exposed (coroutine.wrap(http.pending))() as PENDING
    if res ~= distant.PENDING then
        break
    end
end

And then I'd provide my neovim wrapper outside of the distant lua module as we've already discussed, although I'm wondering if there would be a way for me to build out the callback function within my Lua module and have it just take the async function and the schedule function as arguments:

let make_callback = lua.create_function(|lua, (async_fn, schedule): (LuaFunction, LuaFunction)| move {
        let pending = lua.create_async_function(|_, ()| async move {
            tokio::task::yield_now().await;
            Ok(())
        })?;
        lua.create_function(|lua, cb: LuaFunction| move {
            lua.load(chunk! {
                local poll_pending = (coroutine.wrap($pending))()
                local thread = coroutine.wrap($async_fn)
                local inner_fn
                inner_fn = function()
                    if res == poll_pending then
                        $schedule(inner_fn)
                    else
                        $cb(res)
                    end
                end
                $schedule(inner_fn)
            }).exec()
        })
})?;
local distant = require("distant_lua")
local get = distant.make_callback(distant.get, vim.schedule)
get(function(res)
    print(res)
end)

@khvzak
Copy link
Member

khvzak commented Oct 3, 2021

It does feel a little odd to have the pending value get exposed in the manner you provided. I'm assuming this is considered stable, but I'd love to have this formally made available some day through something like lua.create_table_from([("sleep", sleep), ("pending", mlua::PENDING)]).

Yeah, it's a sort of side effect of Rust async + Lua integration, where you can get an internal value. I didn't thought about polling futures manually without executor in Lua :)
Nevertheless this approach is stable and should work. I'll think about exposing the PENDING value explicitly.

A comment about this piece of code:

-- Continually check if launch has completed
local res
while true do
    res = thread()

    -- Assuming that I've somehow exposed (coroutine.wrap(http.pending))() as PENDING
    if res ~= distant.PENDING then
        break
    end
end

This would cause nearly 100% cpu usage as you poll futures on a blocking manner. It could makes sence if you delegate task to poll futures to an external event loop (like in neovim) plus ideally making a small pause before polling future again.

I'm wondering if there would be a way for me to build out the callback function within my Lua module and have it just take the async function and the schedule function as arguments:

Sure, the following example would do this:

let pending_th = lua.create_thread(lua.create_async_function(|_, ()| async move {
    tokio::task::yield_now().await;
    Ok(())
})?)?;
let poll_pending = pending_th.resume::<_, Value>(())?;
let make_callback = lua.load(chunk! {
    local async_fn, schedule = ...
    local thread = coroutine.wrap(async_fn)
    return function(cb)
        local inner_fn
        inner_fn = function()
            local res = thread()
            if res == $poll_pending then
                schedule(inner_fn)
            else
                cb(res)
            end
        end
        schedule(inner_fn)
    end
})
.into_function()?;

@chipsenkbeil
Copy link
Author

Closing this out as I've been able to get asynchronous functions working on top of tokio while using neovim to poll for completion through your advice. Thanks so much for all of the help, @khvzak!

@ModProg
Copy link

ModProg commented May 29, 2022

Was there a regression conserning this? Or am I just missusing it.

I have the following in a lua module I run from nvim, but when running I get E5108: Error executing lua [string "?"]:9: attempt to yield across C-call boundary:

    exports.set(
        "async_test",
        lua.create_async_function(|lua, ()| async move {
            async_std::task::sleep(std::time::Duration::from_secs(5))
                .compat()
                .await;
            Ok(())
        })?,
    )?;

Well I somehow found a workaround...:

pub fn from_fn_async<F: Future<Output = ()> + 'lua>(
    lua: &'lua Lua,
    callback: impl (Fn(&'lua Lua, CommandCallbackData) -> F) + 'static + Send + Clone
) -> Self {
    let f = lua.create_function(move |lua, a: CommandCallbackData| {
        let callback = callback.clone();
        let f = lua.create_async_function(move |lua, a| {
                    callback(lua, a).map(Ok)
                }).unwrap().bind(a).unwrap();
        lua.load(chunk! {
            local coroutine = coroutine.wrap($f)
            local step = function() end
            step = function()
                if coroutine() ~= nil then
                    vim.schedule(step)
                end
            end
            step()
        }).exec().unwrap();
        Ok(())
    }).unwrap();
    Self::Lua(f)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants