Skip to content

Commit

Permalink
fix(queue): finish will not return until the item is done
Browse files Browse the repository at this point in the history
done meaning: it was handled. Previously, it would return once
the last item was popped, (but not yet handled)

fixes: #168
  • Loading branch information
Tieske committed Mar 6, 2024
1 parent 4a431e6 commit bc6faa7
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
2 changes: 2 additions & 0 deletions docs/index.html
Expand Up @@ -104,6 +104,8 @@ <h2><a name="history"></a>History</h2>
<dd><ul>
<li>Fix: <code>copas.removethread</code> would not remove a sleeping thread immediately (it would not execute, but
would prevent the Copas loop from exiting until the timer expired).</li>
<li>Fix: <code>queue:finish</code> will return after the last item has been handled, not just popped
(if using workers).</li>
</ul></dd>

<dt><strong>Copas 4.7.0</strong> [15/Jan/2023]</dt>
Expand Down
5 changes: 5 additions & 0 deletions docs/reference.html
Expand Up @@ -530,6 +530,11 @@ <h3>Non-blocking data exchange and timer/sleep functions</h3>
<p>Finishes a queue. Calls <code>queue:stop()</code> and then waits for the queue to run
empty (and be destroyed) before returning.</p>

<p>When using "workers" via <code>queue:add_worker()</code>
then this method will return after the worker has finished processing the popped item.
When using your own threads and calling <code>queue:pop()</code>, then this method will
return after the last item has been popped, but not necessarily also processed. </p>

<p>The <code>timeout</code> defaults to 10 seconds
(the default timeout value for a lock), <code>math.huge</code> can be used to wait forever.</p>

Expand Down
33 changes: 32 additions & 1 deletion src/copas/queue.lua
@@ -1,4 +1,5 @@
local copas = require "copas"
local gettime = require("socket").gettime
local Sema = copas.semaphore
local Lock = copas.lock

Expand Down Expand Up @@ -30,6 +31,7 @@ function Queue.new(opts)
self.workers = setmetatable({}, { __mode = "k" })
self.stopping = false
self.worker_id = 0
self.exit_semaphore = Sema.new(10^9)
return self
end

Expand Down Expand Up @@ -105,6 +107,8 @@ end
-- destroyed on a timeout.
function Queue:finish(timeout, no_destroy_on_timeout)
self:stop()
timeout = timeout or self.lock.timeout
local endtime = gettime() + timeout
local _, err = self.lock:get(timeout)
-- the lock never gets released, only destroyed, so we have to check the error string
if err == "timeout" then
Expand All @@ -113,7 +117,31 @@ function Queue:finish(timeout, no_destroy_on_timeout)
end
return nil, err
end
return true

-- if we get here, the lock was destroyed, so the queue is empty, now wait for all workers to exit
if not next(self.workers) then
-- all workers already exited, we're done
return true
end

-- multiple threads can call this "finish" method, so we must check exiting workers
-- one by one.
while true do
local _, err = self.exit_semaphore:take(1, math.max(0, endtime - gettime()))
if err == "destroyed" then
return true -- someone else destroyed/finished it, so we're done
end
if err == "timeout" then
if not no_destroy_on_timeout then
self:destroy()
end
return nil, "timeout"
end
if not next(self.workers) then
self.exit_semaphore:destroy()
return true -- all workers exited, we're done
end
end
end


Expand Down Expand Up @@ -170,6 +198,9 @@ function Queue:add_worker(worker)
worker(item) -- TODO: wrap in errorhandling
end
self.workers[coro] = nil
if self.exit_semaphore then
self.exit_semaphore:give(1)
end
end)

self.workers[coro] = true
Expand Down
58 changes: 57 additions & 1 deletion tests/queue.lua
Expand Up @@ -134,5 +134,61 @@ copas.loop(function()
copas.pause(0.5) -- to activate the worker, which will now be blocked on the q semaphore
q:stop() -- this should exit the idle workers and exit the copas loop
end)

print("test 4 success!")


-- finish a queue while workers are idle
copas.loop(function()
local q = Queue:new()
q:add_worker(function() end)
copas.pause(0.5) -- to activate the worker, which will now be blocked on the q semaphore
q:finish() -- this should exit the idle workers and exit the copas loop
end)
print("test 5 success!")


-- finish doesn't return until all workers are done (finished handling the last queue item)
local result = {}
local passed = true
copas.loop(function()
local q = Queue:new()
q:push(1)
q:push(2)
q:push(3)
for i = 1,2 do -- add 2 workers
q:add_worker(function(n)
table.insert(result, "start item " .. n)
copas.pause(0.5)
table.insert(result, "end item " .. n)
end)
end
-- local s = now()
table.insert(result, "start queue")
copas.pause(0.75)
table.insert(result, "start finish")
local ok, err = q:finish()
table.insert(result, "finished "..tostring(ok).." "..tostring(err))
copas.pause(1)
local expected = {
"start queue",
"start item 1",
"start item 2",
"end item 1",
"start item 3",
"end item 2",
"start finish",
"end item 3",
"finished true nil",
}
for i = 1, math.max(#result, #expected) do
if result[i] ~= expected[i] then
for n = 1, math.max(#result, #expected) do
print(n, result[n], expected[n], result[n] == expected[n] and "" or " <--- failed")
end
passed = false
break
end
end
end)
assert(passed, "test 6 failed!")
print("test 6 success!")

0 comments on commit bc6faa7

Please sign in to comment.