Skip to content

Commit

Permalink
fix(starvation) prevent high-throughput reading from starving
Browse files Browse the repository at this point in the history
If reading quickly in a loop, other threads could be starved. This
updates starvation prevention to be simpler, and be done both on
receiving and sending.

closes #139
  • Loading branch information
Tieske committed Sep 16, 2022
1 parent a04fef4 commit a0faba0
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ test: certs
$(LUA) $(DELIM) $(PKGPATH) tests/request.lua 'https://www.google.nl' true
$(LUA) $(DELIM) $(PKGPATH) tests/semaphore.lua
$(LUA) $(DELIM) $(PKGPATH) tests/sleep.lua
$(LUA) $(DELIM) $(PKGPATH) tests/starve.lua
$(LUA) $(DELIM) $(PKGPATH) tests/tcptimeout.lua
$(LUA) $(DELIM) $(PKGPATH) tests/timer.lua
$(LUA) $(DELIM) $(PKGPATH) tests/tls-sni.lua
Expand Down
3 changes: 2 additions & 1 deletion docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ <h2><a name="history"></a>History</h2>
non-closed sockets. This could prevent the loop from exiting.</li>
<li>Fix: in debug mode very large data is now truncated when displayed.</li>
<li>Fix: order of <code>copas.addnamedthread</code> args.</li>
</ul></dd>
<li>Fix: the receive methods could starve other threads on high-throughput.</li>
</ul></dd>

<dt><strong>Copas 4.2.0</strong> [06/Sep/2022]</dt>
<dd><ul>
Expand Down
25 changes: 17 additions & 8 deletions src/copas.lua
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@ function copas.receive(client, pattern, part)
repeat
s, err, part = client:receive(pattern, part)

-- guarantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
copas.sleep(0)
end

if s then
current_log[client] = nil
sto_timeout()
Expand Down Expand Up @@ -517,6 +522,11 @@ function copas.receivefrom(client, size)
repeat
s, err, port = client:receivefrom(size) -- upon success err holds ip address

-- garantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
copas.sleep(0)
end

if s then
_reading_log[client] = nil
sto_timeout()
Expand Down Expand Up @@ -548,6 +558,11 @@ function copas.receivePartial(client, pattern, part)
repeat
s, err, part = client:receive(pattern, part)

-- guarantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
copas.sleep(0)
end

if s or (type(pattern) == "number" and part ~= "" and part ~= nil) then
current_log[client] = nil
sto_timeout()
Expand Down Expand Up @@ -590,15 +605,9 @@ function copas.send(client, data, from, to)
repeat
s, err, lastIndex = client:send(data, lastIndex + 1, to)

-- adds extra coroutine swap
-- garantees that high throughput doesn't take other threads to starvation
-- guarantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
current_log[client] = gettime() -- TODO: how to handle this??
if current_log == _writing_log then
coroutine_yield(client, _writing)
else
coroutine_yield(client, _reading)
end
copas.sleep(0)
end

if s then
Expand Down
87 changes: 87 additions & 0 deletions tests/starve.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- tests looping 100% in receive/send
-- Should not prevent other threads from running
--
-- Test should;
-- * sleep incremental, not on absolute time so it slowly diverges if the timer
-- thread is being starved
-- * seconds printed and elapsed should stay very close

local copas = require 'copas'
local socket = require 'socket'

--copas.debug.start()

local body = ("A"):rep(1024*1024*50) -- 50 mb string
local done = 0

local function runtest()
local s1 = socket.bind('*', 49500)
copas.addserver(s1, copas.handler(function(skt)
copas.setsocketname("Server 49500", skt)
copas.setthreadname("Server 49500")
print "Server 49500 accepted incoming connection"
local end_time = socket.gettime() + 30 -- we run for 30 seconds
while end_time > socket.gettime() do
local res, err, _ = skt:receive(1) -- single byte from 50mb chunks
if res == nil and err ~= "timeout" then
print("Server 49500 returned: " .. err)
os.exit(1)
end
end
done = done + 1
print("Server reading port 49500... Done!")
skt:close()
copas.removeserver(s1)
end))

copas.addnamedthread("Client 49500", function()
local skt = socket.tcp()
skt = copas.wrap(skt)
copas.setsocketname("Client 49500", skt)
skt:connect("localhost", 49500)
local last_byte_sent, err, complete
while not complete do
repeat
last_byte_sent, err = skt:send(body, last_byte_sent or 1, -1)
if err == "closed" then
-- server closed connection, so exit, test is finished
complete = true
break
end
if last_byte_sent == nil and err ~= "timeout" then
print("client 49500 returned: " .. err)
os.exit(1)
end
until last_byte_sent == nil or last_byte_sent == #body
end
print("Client writing port 49500... Done!")
skt:close()
done = done + 1
end)

copas.addnamedthread("test timeout thread", function()
local i = 0
local start = socket.gettime()
while done ~= 2 do
copas.sleep(1) -- delta sleep, so it slowly diverges if starved
i = i + 1
local time_passed = socket.gettime()-start
print("slept "..i.." seconds, time passed: ".. time_passed.." seconds")
if math.abs(i - time_passed) > 2 then
print("timer diverged by more than 2 seconds: failed!")
os.exit(1)
end
if i > 60 then
print"timeout"
os.exit(1)
end
end
print "success!"
end)

print("starting loop")
copas.loop()
print("Loop done")
end

runtest()

0 comments on commit a0faba0

Please sign in to comment.