Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async await using libuv #83

Merged
merged 61 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
a096073
started branch
oberblastmeister Feb 23, 2021
b363ee0
added bench
oberblastmeister Feb 23, 2021
621bfb9
added plenary. ahead
oberblastmeister Feb 23, 2021
b226549
changed naming
oberblastmeister Feb 24, 2021
eea0b24
added work future and test
oberblastmeister Feb 24, 2021
3fe213a
fixed await_all, added more benches and tests
oberblastmeister Feb 24, 2021
db2f958
ntoes
oberblastmeister Feb 24, 2021
02e938e
more notes
oberblastmeister Feb 24, 2021
7b9c2ac
added doc
oberblastmeister Feb 24, 2021
2ccda6b
added M
oberblastmeister Feb 24, 2021
4fe5f84
added some more uv functions
oberblastmeister Feb 24, 2021
23553f3
start of counting semaphore
oberblastmeister Feb 26, 2021
7062489
more docs
oberblastmeister Feb 26, 2021
dab2d92
use join in run_all
oberblastmeister Feb 26, 2021
de8ac26
started branch
oberblastmeister Feb 27, 2021
430a2d0
fixed tests
oberblastmeister Feb 27, 2021
1bc875a
removed unneeded
oberblastmeister Feb 27, 2021
68af03b
small changes
oberblastmeister Feb 28, 2021
8915094
async: refactor futures without object
bfredl Feb 28, 2021
e1e44b1
maded naming more consistent
oberblastmeister Feb 28, 2021
9234048
added argc
oberblastmeister Feb 28, 2021
6906da0
added argc for wrap
oberblastmeister Mar 5, 2021
60368ff
added argc for all functions
oberblastmeister Mar 6, 2021
53ab866
put in main loop
oberblastmeister Mar 9, 2021
b682e78
made timeout work
oberblastmeister Mar 9, 2021
7fc1214
added runned
oberblastmeister Mar 9, 2021
02fe675
removed convert
oberblastmeister Mar 13, 2021
1c4559b
added nvim future to be able to call api
oberblastmeister Mar 18, 2021
0fd0f1a
added select
oberblastmeister Mar 18, 2021
dc68752
fixed wrong argc in select function
oberblastmeister Mar 18, 2021
c5efd4d
added block on
oberblastmeister Mar 20, 2021
83378a0
updated waiting time for blockon
oberblastmeister Mar 20, 2021
2e3ef66
Merge remote-tracking branch 'upstream/master' into async
oberblastmeister Mar 20, 2021
bab7509
added protect and block_on
oberblastmeister Mar 20, 2021
578cd63
added api helper
oberblastmeister Mar 20, 2021
0c96921
updated benchs for api
oberblastmeister Mar 20, 2021
8384d3d
fixed protected
oberblastmeister Mar 20, 2021
fbd402c
validate sender
oberblastmeister Mar 20, 2021
1666282
add in_fast_event check
oberblastmeister Mar 20, 2021
e24a703
removed unneeded asset file
oberblastmeister Mar 21, 2021
f74b094
removed comment
oberblastmeister Mar 21, 2021
522a7b8
change name to scheduler
oberblastmeister Mar 21, 2021
ffcf332
removed idle and work related stuff for now
oberblastmeister Mar 21, 2021
2c9f3a9
removed work tests and changed name to util
oberblastmeister Mar 21, 2021
58ac59d
added scope and void
oberblastmeister Mar 21, 2021
73c4c81
added check to condvar
oberblastmeister Mar 21, 2021
e3e3e8f
removed unnecesary concats
oberblastmeister Mar 22, 2021
14c734d
removed long bench file
oberblastmeister Mar 22, 2021
79640e9
added better errors
oberblastmeister Mar 22, 2021
92afbb9
added many docs
oberblastmeister Mar 22, 2021
fc4557f
moved block_on and fixed oneshot channel
oberblastmeister Mar 23, 2021
4767627
added async tests
oberblastmeister Mar 23, 2021
60ea7d9
updated tests and added describe it
oberblastmeister Mar 23, 2021
1752ae5
fixed channel and added more tests
oberblastmeister Mar 23, 2021
df921db
more tests
oberblastmeister Mar 23, 2021
374d7cd
added counter channel
oberblastmeister Mar 24, 2021
981a291
changed counter api and added tests
oberblastmeister Mar 27, 2021
2765ee5
added more deque methods and tests
oberblastmeister Mar 27, 2021
6dfa6fc
added mspc channel
oberblastmeister Mar 27, 2021
5f2d6e5
woops forgot to commit
oberblastmeister Mar 28, 2021
5233604
remove runned
oberblastmeister Mar 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions lua/plenary/async_lib/async.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
local co = coroutine
local uv = vim.loop

local thread_loop = function(thread, callback)
local idle = uv.new_idle()
idle:start(function()
local success = co.resume(thread)
assert(success, "Coroutine failed")

if co.status(thread) == "dead" then
idle:stop()
callback()
end
end)
end

-- use with wrap
local pong = function(func, callback)
assert(type(func) == "function", "type error :: expected func")
local thread = co.create(func)
local step
step = function(...)
local res = {co.resume(thread, ...)}
local stat = res[1]
local ret = {select(2, unpack(res))}
oberblastmeister marked this conversation as resolved.
Show resolved Hide resolved
assert(stat, "Status should be true")
if co.status(thread) == "dead" then
(callback or function() end)(unpack(ret))
else
assert(#ret == 1, "expected a single return value")
assert(type(ret[1]) == "function", "type error :: expected func")
ret[1](step)
end
end
step()
end

-- use with pong, creates thunk factory
local wrap = function(func)
assert(type(func) == "function", "type error :: expected func, got " .. type(func))

return function(...)
local params = {...}
return function(step)
table.insert(params, step)
return func(unpack(params))
end
end
end

local thread_loop_async = wrap(thread_loop)

-- many thunks -> single thunk
local join = function(thunks)
local len = #thunks
local done = 0
local acc = {}

local thunk = function(step)
if len == 0 then
return step()
end
for i, tk in ipairs(thunks) do
assert(type(tk) == "function", "thunk must be function")
local callback = function(...)
acc[i] = {...}
done = done + 1
if done == len then
step(unpack(acc))
end
end
tk(callback)
end
end
return thunk
end

local function run(future)
future()
end

local function run_all(futures)
for _, future in ipairs(futures) do
future()
end
end

-- sugar over coroutine
local await = function(defer)
assert(type(defer) == "function", "type error :: expected func")
return co.yield(defer)
end


local await_all = function(defer)
assert(type(defer) == "table", "type error :: expected table")
return co.yield(join(defer))
end

local async = function(func)
return function(...)
local args = {...}
return wrap(pong)(function()
return func(unpack(args))
end)
end
end

local pong_loop = async(function(func, callback)
assert(type(func) == "function", "type error :: expected func")
local thread = co.create(func)

local _step
_step = function(...)
local res = {co.resume(thread, ...)}
local stat = res[1]
local ret = {select(2, unpack(res))}
assert(stat, "Status should be true")
if co.status(thread) == "dead" then
(callback or function() end)(unpack(ret))
else
assert(#ret == 1, "expected a single return value")
assert(type(ret[1]) == "function", "type error :: expected func")
-- yield before calling the next one
co.yield()
ret[1](_step)
end
end

local step = function()
thread_loop(co.create(_step))
end

step()
end)

--- because idle is a bad name
local spawn = wrap(pong_loop)

return {
async = async,
join = join,
await = await,
await_all = await_all,
run = run,
run_all = run_all,
spawn = spawn,
wrap = wrap,
wait_for_textlock = wrap(vim.schedule)
}
50 changes: 50 additions & 0 deletions lua/plenary/async_lib/helpers.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
local M = {}

VecDeque = {}
VecDeque.__index = VecDeque

function VecDeque.new()
return setmetatable({first = 0, last = -1}, VecDeque)
end

function VecDeque:pushleft(value)
local first = self.first - 1
self.first = first
self[first] = value
end

function VecDeque:pushright(value)
local last = self.last + 1
self.last = last
self[last] = value
end

function VecDeque:popleft()
local first = self.first
if first > self.last then return nil end
local value = self[first]
self[first] = nil -- to allow garbage collection
self.first = first + 1
return value
end

function VecDeque:is_empty()
return self.first > self.last
end

function VecDeque:popright()
local last = self.last
if self.first > last then return nil end
local value = self[last]
self[last] = nil -- to allow garbage collection
self.last = last - 1
return value
end

function VecDeque:len()
return self.last - self.first
end

M.VecDeque = VecDeque

return M
7 changes: 7 additions & 0 deletions lua/plenary/async_lib/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
local exports = require('plenary.async_lib.async')
exports.uv = require('plenary.async_lib.uv_async')
exports.utils = require('plenary.async_lib.utils')
exports.lsp = require('plenary.async_lib.lsp')
exports.work = require('plenary.async_lib.work')

return exports
7 changes: 7 additions & 0 deletions lua/plenary/async_lib/lsp.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
local a = require('plenary.async_lib.async')

local M = {}

M.buf_request = a.wrap(vim.lsp.buf_request)

return M
116 changes: 116 additions & 0 deletions lua/plenary/async_lib/utils.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
local a = require('plenary.async_lib.async')
local async = a.async
local co = coroutine
local VecDeque = require('plenary.async_lib.helpers').VecDeque
local uv = vim.loop

local M = {}

M.sleep = a.wrap(function(ms, callback)
local timer = uv.new_timer()
uv.timer_start(timer, ms, 0, function()
uv.timer_stop(timer)
uv.close(timer)
callback()
end)
end)

M.timer = function(ms)
return async(function()
a.wait(M.sleep(ms))
end)
end

M.id = async(function(...)
return ...
end)

M.thread_loop = function(thread, callback)
local idle = uv.new_idle()
idle:start(function()
local success = co.resume(thread)
assert(success, "Coroutine failed")

if co.status(thread) == "dead" then
idle:stop()
callback()
end
end)
end

M.thread_loop_async = a.wrap(M.thread_loop)

M.yield_now = async(function()
a.wait(M.id())
end)

local Condvar = {}
Condvar.__index = Condvar

function Condvar.new()
return setmetatable({handles = {}}, Condvar)
end

--- async function
--- blocks the thread until a notification is received
Condvar.wait = a.wrap(function(self, callback)
-- not calling the callback will block the coroutine
table.insert(self.handles, callback)
end)

--- not an async function
function Condvar:notify_all()
oberblastmeister marked this conversation as resolved.
Show resolved Hide resolved
for _, callback in ipairs(self.handles) do
callback()
end
self.handles = {} -- reset all handles as they have been used up
end

--- not an async function
function Condvar:notify_one()
if #self.handles == 0 then return end

local idx = math.random(#self.handles)
self.handles[idx]()
table.remove(self.handles, idx)
end

M.Condvar = Condvar

M.channel = {}

---comment
---@return function
---@return any
M.channel.oneshot = function()
local val = nil
local saved_callback = nil

--- sender is not async
--- sends a value
local sender = function(t)
if val ~= nil then
error('Oneshot channel can only send one value!')
end

val = t
saved_callback(val)
end

--- receiver is async
--- blocks until a value is received
local receiver = a.wrap(function(callback)
if callback ~= nil then
error('Oneshot channel can only receive one value!')
end

saved_callback = callback
end)

return sender, receiver
end

M.channel.mpsc = function()
end

return M
Loading