Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): finish will not return until the item is done #169

Merged
merged 1 commit into from Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/index.html
Expand Up @@ -104,6 +104,8 @@ <h2><a name="history"></a>History</h2>
<dd><ul>
<li>Fix: <code>copas.removethread</code> would not remove a sleeping thread immediately (it would not execute, but
would prevent the Copas loop from exiting until the timer expired).</li>
<li>Fix: <code>queue:finish</code> will return after the last item has been handled, not just popped
(if using workers).</li>
</ul></dd>

<dt><strong>Copas 4.7.0</strong> [15/Jan/2023]</dt>
Expand Down
5 changes: 5 additions & 0 deletions docs/reference.html
Expand Up @@ -530,6 +530,11 @@ <h3>Non-blocking data exchange and timer/sleep functions</h3>
<p>Finishes a queue. Calls <code>queue:stop()</code> and then waits for the queue to run
empty (and be destroyed) before returning.</p>

<p>When using "workers" via <code>queue:add_worker()</code>
then this method will return after the worker has finished processing the popped item.
When using your own threads and calling <code>queue:pop()</code>, then this method will
return after the last item has been popped, but not necessarily also processed. </p>

<p>The <code>timeout</code> defaults to 10 seconds
(the default timeout value for a lock), <code>math.huge</code> can be used to wait forever.</p>

Expand Down
33 changes: 32 additions & 1 deletion src/copas/queue.lua
@@ -1,4 +1,5 @@
local copas = require "copas"
local gettime = require("socket").gettime
local Sema = copas.semaphore
local Lock = copas.lock

Expand Down Expand Up @@ -30,6 +31,7 @@ function Queue.new(opts)
self.workers = setmetatable({}, { __mode = "k" })
self.stopping = false
self.worker_id = 0
self.exit_semaphore = Sema.new(10^9)
return self
end

Expand Down Expand Up @@ -105,6 +107,8 @@ end
-- destroyed on a timeout.
function Queue:finish(timeout, no_destroy_on_timeout)
self:stop()
timeout = timeout or self.lock.timeout
local endtime = gettime() + timeout
local _, err = self.lock:get(timeout)
-- the lock never gets released, only destroyed, so we have to check the error string
if err == "timeout" then
Expand All @@ -113,7 +117,31 @@ function Queue:finish(timeout, no_destroy_on_timeout)
end
return nil, err
end
return true

-- if we get here, the lock was destroyed, so the queue is empty, now wait for all workers to exit
if not next(self.workers) then
-- all workers already exited, we're done
return true
end

-- multiple threads can call this "finish" method, so we must check exiting workers
-- one by one.
while true do
local _, err = self.exit_semaphore:take(1, math.max(0, endtime - gettime()))
if err == "destroyed" then
return true -- someone else destroyed/finished it, so we're done
end
if err == "timeout" then
if not no_destroy_on_timeout then
self:destroy()
end
return nil, "timeout"
end
if not next(self.workers) then
self.exit_semaphore:destroy()
return true -- all workers exited, we're done
end
end
end


Expand Down Expand Up @@ -170,6 +198,9 @@ function Queue:add_worker(worker)
worker(item) -- TODO: wrap in errorhandling
end
self.workers[coro] = nil
if self.exit_semaphore then
self.exit_semaphore:give(1)
end
end)

self.workers[coro] = true
Expand Down
58 changes: 57 additions & 1 deletion tests/queue.lua
Expand Up @@ -134,5 +134,61 @@ copas.loop(function()
copas.pause(0.5) -- to activate the worker, which will now be blocked on the q semaphore
q:stop() -- this should exit the idle workers and exit the copas loop
end)

print("test 4 success!")


-- finish a queue while workers are idle
copas.loop(function()
local q = Queue:new()
q:add_worker(function() end)
copas.pause(0.5) -- to activate the worker, which will now be blocked on the q semaphore
q:finish() -- this should exit the idle workers and exit the copas loop
end)
print("test 5 success!")


-- finish doesn't return until all workers are done (finished handling the last queue item)
local result = {}
local passed = true
copas.loop(function()
local q = Queue:new()
q:push(1)
q:push(2)
q:push(3)
for i = 1,2 do -- add 2 workers
q:add_worker(function(n)
table.insert(result, "start item " .. n)
copas.pause(0.5)
table.insert(result, "end item " .. n)
end)
end
-- local s = now()
table.insert(result, "start queue")
copas.pause(0.75)
table.insert(result, "start finish")
local ok, err = q:finish()
table.insert(result, "finished "..tostring(ok).." "..tostring(err))
copas.pause(1)
local expected = {
"start queue",
"start item 1",
"start item 2",
"end item 1",
"start item 3",
"end item 2",
"start finish",
"end item 3",
"finished true nil",
}
for i = 1, math.max(#result, #expected) do
if result[i] ~= expected[i] then
for n = 1, math.max(#result, #expected) do
print(n, result[n], expected[n], result[n] == expected[n] and "" or " <--- failed")
end
passed = false
break
end
end
end)
assert(passed, "test 6 failed!")
print("test 6 success!")