Skip to content

Commit

Permalink
feat(queue) adds an async queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tieske committed Jul 26, 2022
1 parent 4871f54 commit 3a1e01f
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -26,6 +26,7 @@ install:
cp src/copas/timer.lua $(DESTDIR)$(LUA_DIR)/copas/timer.lua
cp src/copas/lock.lua $(DESTDIR)$(LUA_DIR)/copas/lock.lua
cp src/copas/semaphore.lua $(DESTDIR)$(LUA_DIR)/copas/semaphore.lua
cp src/copas/queue.lua $(DESTDIR)$(LUA_DIR)/copas/queue.lua

tests/certs/clientA.pem:
cd ./tests/certs && \
Expand All @@ -50,6 +51,7 @@ test: certs
$(LUA) $(DELIM) $(PKGPATH) tests/limit.lua
$(LUA) $(DELIM) $(PKGPATH) tests/lock.lua
$(LUA) $(DELIM) $(PKGPATH) tests/loop_starter.lua
$(LUA) $(DELIM) $(PKGPATH) tests/queue.lua
$(LUA) $(DELIM) $(PKGPATH) tests/removeserver.lua
$(LUA) $(DELIM) $(PKGPATH) tests/removethread.lua
$(LUA) $(DELIM) $(PKGPATH) tests/request.lua 'http://www.google.com'
Expand Down
1 change: 1 addition & 0 deletions copas-cvs-6.rockspec
Expand Up @@ -44,6 +44,7 @@ build = {
["copas.timer"] = "src/copas/timer.lua",
["copas.lock"] = "src/copas/lock.lua",
["copas.semaphore"] = "src/copas/semaphore.lua",
["copas.queue"] = "src/copas/queue.lua",
},
copy_directories = {
"docs",
Expand Down
1 change: 1 addition & 0 deletions docs/index.html
Expand Up @@ -104,6 +104,7 @@ <h2><a name="history"></a>History</h2>
<dd><ul>
<li>Added: added sempahore:destroy()</li>
<li>Added: copas.settimeouts, to set separate timeouts for connect, send, receive</li>
<li>Added: queue class, see module "copas.queue"</li>
</ul></dd>

<dt><strong>Copas 3.0.0</strong> [12/Nov/2021]</dt>
Expand Down
59 changes: 59 additions & 0 deletions docs/reference.html
Expand Up @@ -331,6 +331,65 @@ <h3>Non-blocking data exchange and timer/sleep functions</h3>
<dd>Releases the currently held lock. Returns <code>true</code> or <code>nil + error</code>.
</dd>

<dt><strong><code>queue:add_worker(func)</code></strong></dt>
<dd>Adds a worker that will handle whatever is passed into the queue. Can be called
multiple times to add more workers. The function <code>func</code> is wrapped and added
as a copas thread. The threads automatically exit when the queue is destroyed.
Worker function signature: <code>function(item)</code> (Note: worker functions run
unprotected, so wrap code in an (x)pcall if errors are expected, otherwise the
worker will exit on an error, and queue handling will stop).
Returns the coroutine added, or <code>nil+"destroyed"</code>
</dd>

<dt><strong><code>queue:destroy()</code></strong></dt>
<dd>Destroys a queue immediately. Abandons what is left in the queue.
Releases all waiting calls to <code>queue:pop()</code> with <code>nil+"destroyed"</code>.
Returns <code>true</code>, or <code>nil+"destroyed"</code>
</dd>

<dt><strong><code>queue:finish([timeout], [no_destroy_on_timeout])</code></strong></dt>
<dd>Finishes a queue. Calls <code>queue:stop()</code> and then waits for the queue to run
empty (and be destroyed) before returning.
Parameter <code>no_destroy_on_timeout</code> indicates if the queue is not to be forcefully
destroyed on a timeout (abandonning what ever is left in the queue).
Returns <code>true</code>, or <code>nil+"timeout"</code>, or <code>nil+"destroyed"</code>.
</dd>

<dt><strong><code>queue:get_size()</code></strong></dt>
<dd>Gets the number of items in the queue currently.
Returns <code>number</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>queue:get_workers()</code></strong></dt>
<dd>Returns a list/array of current workers (coroutines) handling the queue.
(only the workers added by <code>queue:add_worker()</code>, and still active,
will be in the list).
Returns <code>list</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>queue.new()</code></strong></dt>
<dd>Creates and returns a new queue.
</dd>

<dt><strong><code>queue:pop([timeout])</code></strong></dt>
<dd>Will pop an item from the queue. If there are no items in the queue it will yield
until there are or a timeout happens (exception is when <code>timeout == 0</code>, then it will
not yield but return immediately, be careful not to create a hanging loop!). Timeout defaults
to the default time-out of a semaphore.
Returns an item, or <code>nil+"timeout"</code>, or <code>nil+"destroyed"</code>.
</dd>

<dt><strong><code>queue:push(item)</code></strong></dt>
<dd>Will push a new item in the queue. Item can be any type, including 'nil'.
Returns <code>true</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>queue:stop()</code></strong></dt>
<dd>Instructs the queue to stop. It will no longer accept calls to <code>queue:push()</code>,
and will call <code>queue:destroy()</code> once the queue is empty.
Returns <code>true</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>semaphore:destroy()</code></strong></dt>
<dd>Will destroy the sempahore and release all waiting threads. The result for those
threads will be <code>nil + "destroyed"</code>, any new call on any
Expand Down
8 changes: 7 additions & 1 deletion src/copas.lua
Expand Up @@ -1044,7 +1044,13 @@ end
function copas.addthread(handler, ...)
-- create a coroutine that skips the first argument, which is always the socket
-- passed by the scheduler, but `nil` in case of a task/thread
local thread = coroutine.create(function(_, ...) return handler(...) end)
local thread = coroutine.create(function(_, ...)
-- TODO: this should be added to not immediately execute the thread
-- it should only schedule and then return to the calling code
-- Enabling this breaks the "limitset".
-- copas.sleep(0)
return handler(...)
end)
_threads[thread] = true -- register this thread so it can be removed
_doTick (thread, nil, ...)
return thread
Expand Down
159 changes: 159 additions & 0 deletions src/copas/queue.lua
@@ -0,0 +1,159 @@
local copas = require "copas"
local Sema = require "copas.semaphore"
local Lock = require "copas.lock"


local Queue = {}
Queue.__index = Queue


-- Creates a new Queue instance
function Queue.new()
local self = {}
setmetatable(self, Queue)
self.sema = Sema.new(10^9)
self.head = 1
self.tail = 1
self.list = {}
self.workers = setmetatable({}, { __mode = "k" })
self.stopping = false
return self
end


-- Pushes an item in the queue (can be 'nil')
-- returns true, or nil+err ("stopping", or "destroyed")
function Queue:push(item)
if self.stopping then
return nil, "stopping"
end
self.list[self.head] = item
self.head = self.head + 1
self.sema:give()
return true
end


-- Pops and item from the queue. If there are no items in the queue it will yield
-- until there are or a timeout happens (exception is when `timeout == 0`, then it will
-- not yield but return immediately)
-- Returns item, or nil+err ("timeout", or "destroyed")
function Queue:pop(timeout)
local ok, err = self.sema:take(1, timeout)
if not ok then
return ok, err
end
local item = self.list[self.tail]
self.tail = self.tail + 1
if self.tail == self.head then
-- reset pointers
self.tail = 1
self.head = 1
if self.stopping then
-- we're stopping and last item being returned, so we're done
self:destroy()
end
end
return item
end


-- return the number of items left in the queue
function Queue:get_size()
return self.head - self.tail
end


-- instructs the queue to stop. Will not accept any more 'push' calls.
-- will autocall 'destroy' when the queue is empty.
-- returns immediately. See `finish`
function Queue:stop()
if not self.stopping then
self.stopping = true
self.lock = Lock.new()
self.lock:get() -- close the lock
end
return true
end


-- Finishes a queue. Calls stop and then waits for the queue to run empty (and be
-- destroyed) before returning. returns true or nil+err ("timeout", or "destroyed")
-- Parameter no_destroy_on_timeout indicates if the queue is not to be forcefully
-- destroyed on a timeout.
function Queue:finish(timeout, no_destroy_on_timeout)
self:stop()
local _, err = self.lock:get(timeout)
-- the lock never gets released, only destroyed, so we have to check the error string
if err == "timeout" then
if not no_destroy_on_timeout then
self:destroy()
end
return nil, err
end
return true
end


do
local destroyed_func = function()
return nil, "destroyed"
end

local destroyed_queue_mt = {
__index = function()
return destroyed_func
end
}

-- destroys a queue immediately. Abandons what is left in the queue.
-- Releases all waiting threads with `nil+"destroyed"`
function Queue:destroy()
if self.lock then
self.lock:destroy()
end
setmetatable(self, destroyed_queue_mt)
return true
end
end


-- adds a worker that will handle whatever is passed into the queue. Can be called
-- multiple times to add more workers.
-- The threads automatically exit when the queue is destroyed.
-- worker function signature: `function(item)` (Note: worker functions run
-- unprotected, so wrap code in an (x)pcall if errors are expected, otherwise the
-- worker will exit on an error, and queue handling will stop)
-- Returns the coroutine added.
function Queue:add_worker(worker)
assert(type(worker) == "function", "expected worker to be a function")
local coro
coro = copas.addthread(function()
copas.sleep(0) -- TODO: remove after adding into copas.addthread
while true do
local item = self:pop(10*365*24*60*60) -- wait forever (10yr)
if not item then
break -- queue destroyed, exit
end
worker(item)
end
self.workers[coro] = nil
end)

self.workers[coro] = true
return coro
end

-- returns a list/array of current workers (coroutines) handling the queue.
-- (only the workers added by `add_worker`, and still active, will be in this list)
function Queue:get_workers()
local lst = {}
for coro in pairs(self.workers) do
if coroutine.status(coro) ~= "dead" then
lst[#lst+1] = coro
end
end
return lst
end

return Queue
79 changes: 79 additions & 0 deletions tests/queue.lua
@@ -0,0 +1,79 @@
-- make sure we are pointing to the local copas first
package.path = string.format("../src/?.lua;%s", package.path)
local now = require("socket").gettime


local copas = require "copas"
local Queue = require "copas.queue"



local test_complete = false
copas.loop(function()

-- basic push/pop
local q = Queue:new()
q:push "hello"
assert(q:pop() == "hello", "expected the input to be returned")

-- yielding on pop when queue is empty
local s = now()
copas.addthread(function()
copas.sleep(0.5)
q:push("delayed")
end)
assert(q:pop() == "delayed", "expected a delayed result")
assert(now() - s >= 0.5, "result was not delayed!")

-- pop times out
local ok, err = q:pop(0.5)
assert(err == "timeout", "expected a timeout")
assert(ok == nil)

-- get_size returns queue size
assert(q:get_size() == 0)
q:push(1)
assert(q:get_size() == 1)
q:push(2)
assert(q:get_size() == 2)
q:push(3)
assert(q:get_size() == 3)

-- queue behaves as fifo
assert(q:pop() == 1)
assert(q:pop() == 2)
assert(q:pop() == 3)

-- stopping
q:push(1)
q:push(2)
q:push(3)
assert(q:stop())
local count = 0
local coro = q:add_worker(function(item)
count = count + 1
end)
copas.sleep(0.1)
assert(count == 3, "expected all 3 items handled")
assert(coroutine.status(coro) == "dead", "expected thread to be gone")
-- coro should be GC'able
local weak = setmetatable({}, {__mode="v"})
weak[{}] = coro
coro = nil -- luacheck: ignore
collectgarbage()
collectgarbage()
assert(not next(weak))
-- worker exited, so queue is destroyed now?
ok, err = q:push()
assert(err == "destroyed", "expected queue to be destroyed")
assert(ok == nil)
ok, err = q:pop()
assert(err == "destroyed", "expected queue to be destroyed")
assert(ok == nil)


test_complete = true
end)

assert(test_complete, "test did not complete!")
print("test success!")

0 comments on commit 3a1e01f

Please sign in to comment.