Skip to content

Commit

Permalink
Merge db6186d into 34842e9
Browse files Browse the repository at this point in the history
  • Loading branch information
Tieske committed Feb 6, 2022
2 parents 34842e9 + db6186d commit eea386d
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
*.rock
luacov.report.out
luacov.stats.out

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ install:
cp src/copas/timer.lua $(DESTDIR)$(LUA_DIR)/copas/timer.lua
cp src/copas/lock.lua $(DESTDIR)$(LUA_DIR)/copas/lock.lua
cp src/copas/semaphore.lua $(DESTDIR)$(LUA_DIR)/copas/semaphore.lua
cp src/copas/queue.lua $(DESTDIR)$(LUA_DIR)/copas/queue.lua

tests/certs/clientA.pem:
cd ./tests/certs && \
Expand All @@ -50,6 +51,7 @@ test: certs
$(LUA) $(DELIM) $(PKGPATH) tests/limit.lua
$(LUA) $(DELIM) $(PKGPATH) tests/lock.lua
$(LUA) $(DELIM) $(PKGPATH) tests/loop_starter.lua
$(LUA) $(DELIM) $(PKGPATH) tests/queue.lua
$(LUA) $(DELIM) $(PKGPATH) tests/removeserver.lua
$(LUA) $(DELIM) $(PKGPATH) tests/removethread.lua
$(LUA) $(DELIM) $(PKGPATH) tests/request.lua 'http://www.google.com'
Expand Down
1 change: 1 addition & 0 deletions copas-cvs-6.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ build = {
["copas.timer"] = "src/copas/timer.lua",
["copas.lock"] = "src/copas/lock.lua",
["copas.semaphore"] = "src/copas/semaphore.lua",
["copas.queue"] = "src/copas/queue.lua",
},
copy_directories = {
"docs",
Expand Down
7 changes: 7 additions & 0 deletions docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ <h2><a name="dependencies"></a>Dependencies</h2>
<h2><a name="history"></a>History</h2>

<dl class="history">
<dt><strong>Copas x.x.x</strong> [unreleased]</dt>
<dd><ul>
<li>Added: added sempahore:destroy()</li>
<li>Added: queue class, see module "copas.queue"</li>
<li>Added: copas.settimeouts, to set separate timeouts for connect, send, receive</li>
</ul></dd>

<dt><strong>Copas 3.0.0</strong> [12/Nov/2021]</dt>
<dd><ul>
<li>[breaking] Change: copas.addserver() now uses the timeout value as a copas timeout,
Expand Down
102 changes: 97 additions & 5 deletions docs/reference.html
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,21 @@ <h3>Non-blocking data exchange and timer/sleep functions</h3>
<dd>Non-blocking equivalent to the LuaSocket method (after <code>copas.wrap</code>).
</dd>

<dt><strong><code>sock:settimeout(...)</code></strong></dt>
<dd>Equivalent to the LuaSocket method (after <code>copas.wrap</code>).
<dt><strong><code>sock:settimeout([timeout])</code></strong></dt>
<dd>Sets the timeouts (in seconds) for a socket (after <code>copas.wrap</code>).
The default is to not have a timeout and wait indefinitely. This method is compatible
with LuaSocket, but sets all three timeouts to the same value.<br/>
<strong>Important:</strong> this behaviour is the same as LuaSocket, but different from
<code>sock:settimeouts</code>, where <code>nil</code> means 'do not change' the timeout.<br/>
If a timeout is hit, the operation will return <code>nil + "timeout"</code>.
</dd>

<dt><strong><code>sock:settimeouts([connect], [send], [receive])</code></strong></dt>
<dd>Sets the timeouts (in seconds) for a socket (after <code>copas.wrap</code>).
The default is to not have a timeout and wait indefinitely.<br/>
<strong>Important:</strong> this behaviour is different from
<code>sock:settimeout</code>, where <code>nil</code> means 'wait indefinitely'.<br/>
If a timeout is hit, the operation will return <code>nil + "timeout"</code>.
</dd>

<dt><strong><code>sock:sni(...)</code></strong></dt>
Expand All @@ -292,7 +305,8 @@ <h3>Non-blocking data exchange and timer/sleep functions</h3>

<dt><strong><code>lock:destroy()</code></strong></dt>
<dd>Will destroy the lock and release all waiting threads. The result for those
threads will be <code>nil + "destroyed" + wait_time</code>.
threads will be <code>nil + "destroyed" + wait_time</code>, any new call on any
method will return <code>nil + "destroyed"</code> from here on.
</dd>

<dt><strong><code>lock:get([timeout])</code></strong></dt>
Expand All @@ -317,6 +331,71 @@ <h3>Non-blocking data exchange and timer/sleep functions</h3>
<dd>Releases the currently held lock. Returns <code>true</code> or <code>nil + error</code>.
</dd>

<dt><strong><code>queue:add_worker(func)</code></strong></dt>
<dd>Adds a worker that will handle whatever is passed into the queue. Can be called
multiple times to add more workers. The function <code>func</code> is wrapped and added
as a copas thread. The threads automatically exit when the queue is destroyed.
Worker function signature: <code>function(item)</code> (Note: worker functions run
unprotected, so wrap code in an (x)pcall if errors are expected, otherwise the
worker will exit on an error, and queue handling will stop).
Returns the coroutine added, or <code>nil+"destroyed"</code>
</dd>

<dt><strong><code>queue:destroy()</code></strong></dt>
<dd>Destroys a queue immediately. Abandons what is left in the queue.
Releases all waiting calls to <code>queue:pop()</code> with <code>nil+"destroyed"</code>.
Returns <code>true</code>, or <code>nil+"destroyed"</code>
</dd>

<dt><strong><code>queue:finish([timeout], [no_destroy_on_timeout])</code></strong></dt>
<dd>Finishes a queue. Calls <code>queue:stop()</code> and then waits for the queue to run
empty (and be destroyed) before returning.
Parameter <code>no_destroy_on_timeout</code> indicates if the queue is not to be forcefully
destroyed on a timeout (abandonning what ever is left in the queue).
Returns <code>true</code>, or <code>nil+"timeout"</code>, or <code>nil+"destroyed"</code>.
</dd>

<dt><strong><code>queue:get_size()</code></strong></dt>
<dd>Gets the number of items in the queue currently.
Returns <code>number</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>queue:get_workers()</code></strong></dt>
<dd>Returns a list/array of current workers (coroutines) handling the queue.
(only the workers added by <code>queue:add_worker()</code>, and still active,
will be in the list).
Returns <code>list</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>queue.new()</code></strong></dt>
<dd>Creates and returns a new queue.
</dd>

<dt><strong><code>queue:pop([timeout])</code></strong></dt>
<dd>Will pop an item from the queue. If there are no items in the queue it will yield
until there are or a timeout happens (exception is when <code>timeout == 0</code>, then it will
not yield but return immediately, be careful not to create a hanging loop!). Timeout defaults
to the default time-out of a semaphore.
Returns an item, or <code>nil+"timeout"</code>, or <code>nil+"destroyed"</code>.
</dd>

<dt><strong><code>queue:push(item)</code></strong></dt>
<dd>Will push a new item in the queue. Item can be any type, including 'nil'.
Returns <code>true</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>queue:stop()</code></strong></dt>
<dd>Instructs the queue to stop. It will no longer accept calls to <code>queue:push()</code>,
and will call <code>queue:destroy()</code> once the queue is empty.
Returns <code>true</code> or <code>nil + "destroyed"</code>.
</dd>

<dt><strong><code>semaphore:destroy()</code></strong></dt>
<dd>Will destroy the sempahore and release all waiting threads. The result for those
threads will be <code>nil + "destroyed"</code>, any new call on any
method will return <code>nil + "destroyed"</code> from here on.
</dd>

<dt><strong><code>semaphore:get_count()</code></strong></dt>
<dd>Returns the number of resources currently available in the semaphore.
</dd>
Expand Down Expand Up @@ -495,8 +574,21 @@ <h3>Low level Copas functions</h3>
<dd>(deprecated, since UDP sending doesn't block)</dd>

<dt><strong><code>copas.settimeout(skt, [timeout])</code></strong></dt>
<dd>Sets the timeout (in seconds) for a socket. The default is to not have a timeout and wait
indefinitely. If a timeout is hit, the operation will return <code>nil + "timeout"</code>.
<dd>Sets the timeout (in seconds) for a socket. A negative timout or absent timeout (<code>nil</code>)
will wait indefinitely.<br/>
<strong>Important:</strong> this behaviour is the same as LuaSocket, but different from
<code>copas.settimeouts</code>, where <code>nil</code> means 'do not change' the timeout.<br/>
If a timeout is hit, the operation will return <code>nil + "timeout"</code>.
Timeouts are applied on: <code>receive, receivefrom, receivePartial, send, connect, dohandshake</code>.</br>
See <code>copas.useSocketTimeoutErrors()</code> below for alternative error messages.
</dd>

<dt><strong><code>copas.settimeouts(skt, [connect], [send], [receive])</code></strong></dt>
<dd>Sets the timeouts (in seconds) for a socket. The default is to not have a timeout and wait
indefinitely.<br/>
<strong>Important:</strong> this behaviour is different from
<code>copas.settimeout</code>, where <code>nil</code> means 'wait indefinitely'.<br/>
If a timeout is hit, the operation will return <code>nil + "timeout"</code>.
Timeouts are applied on: <code>receive, receivefrom, receivePartial, send, connect, dohandshake</code>.</br>
See <code>copas.useSocketTimeoutErrors()</code> below for alternative error messages.
</dd>
Expand Down
102 changes: 86 additions & 16 deletions src/copas.lua
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,24 @@ local _isSocketTimeout = { -- set of errors indicating a socket-timeout
-------------------------------------------------------------------------------
-- Coroutine based socket timeouts.
-------------------------------------------------------------------------------

local usertimeouts = setmetatable({}, {
local user_timeouts_connect
local user_timeouts_send
local user_timeouts_receive
do
local timeout_mt = {
__mode = "k",
__index = function(self, skt)
-- if there is no timeout found, we insert one automatically,
-- a 10 year timeout as substitute for the default "blocking" should do
self[skt] = 10*365*24*60*60
return self[skt]
end,
})
}

user_timeouts_connect = setmetatable({}, timeout_mt)
user_timeouts_send = setmetatable({}, timeout_mt)
user_timeouts_receive = setmetatable({}, timeout_mt)
end

local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })

Expand Down Expand Up @@ -304,14 +312,19 @@ local sto_timeout, sto_timed_out, sto_change_queue, sto_error do
-- Calling it as `sto_timeout()` will cancel the timeout.
-- @param queue (string) the queue the socket is currently in, must be either "read" or "write"
-- @param skt (socket) the socket on which to operate
-- @param use_connect_to (bool) timeout to use is determined based on queue (read/write) or if this
-- is truthy, it is the connect timeout.
-- @return true
function sto_timeout(skt, queue)
function sto_timeout(skt, queue, use_connect_to)
local co = coroutine.running()
socket_register[co] = skt
operation_register[co] = queue
timeout_flags[co] = nil
if skt then
copas.timeout(usertimeouts[skt], socket_callback)
local to = (use_connect_to and user_timeouts_connect[skt]) or
(queue == "read" and user_timeouts_receive[skt]) or
user_timeouts_send[skt]
copas.timeout(to, socket_callback)
else
copas.timeout(0)
end
Expand Down Expand Up @@ -367,17 +380,53 @@ function copas.close(skt, ...)
end



-- nil or negative is indefinitly
function copas.settimeout(skt, timeout)
timeout = timeout or -1
if type(timeout) ~= "number" then
return nil, "timeout must be 'nil' or a number"
end

return copas.settimeouts(skt, timeout, timeout, timeout)
end

-- negative is indefinitly, nil means do not change
function copas.settimeouts(skt, connect, send, read)

if connect ~= nil and type(connect) ~= "number" then
return nil, "connect timeout must be 'nil' or a number"
end
if connect then
if connect < 0 then
connect = nil
end
user_timeouts_connect[skt] = connect
end


if timeout ~= nil and type(timeout) ~= "number" then
return nil, "timeout must be a 'nil' or a number"
if send ~= nil and type(send) ~= "number" then
return nil, "send timeout must be 'nil' or a number"
end
if send then
if send < 0 then
send = nil
end
user_timeouts_send[skt] = send
end

if timeout and timeout < 0 then
timeout = nil -- negative is same as nil; blocking indefinitely

if read ~= nil and type(read) ~= "number" then
return nil, "read timeout must be 'nil' or a number"
end
if read then
if read < 0 then
read = nil
end
user_timeouts_receive[skt] = read
end

usertimeouts[skt] = timeout

return true
end

Expand Down Expand Up @@ -555,7 +604,7 @@ end
function copas.connect(skt, host, port)
skt:settimeout(0)
local ret, err, tried_more_than_once
sto_timeout(skt, "write")
sto_timeout(skt, "write", true)

repeat
ret, err = skt:connect(host, port)
Expand Down Expand Up @@ -603,7 +652,8 @@ local function ssl_wrap(skt, wrap_params)
return error(err) -- throw a hard error, because we do not want to silently ignore this one!!
end
nskt:settimeout(0) -- non-blocking on the ssl-socket
copas.settimeout(nskt, usertimeouts[skt]) -- copy copas user-timeout to newly wrapped one
copas.settimeouts(nskt, user_timeouts_connect[skt],
user_timeouts_send[skt], user_timeouts_receive[skt]) -- copy copas user-timeout to newly wrapped one
return nskt
end

Expand Down Expand Up @@ -673,7 +723,7 @@ function copas.dohandshake(skt, wrap_params)

local nskt = ssl_wrap(skt, wrap_params)

sto_timeout(nskt, "write")
sto_timeout(nskt, "write", true)
local queue

repeat
Expand Down Expand Up @@ -722,7 +772,7 @@ local _skt_mt_tcp = {
end,

receive = function (self, pattern, prefix)
if usertimeouts[self.socket] == 0 then
if user_timeouts_receive[self.socket] == 0 then
return copas.receivePartial(self.socket, pattern, prefix)
end
return copas.receive(self.socket, pattern, prefix)
Expand All @@ -736,6 +786,10 @@ local _skt_mt_tcp = {
return copas.settimeout(self.socket, time)
end,

settimeouts = function (self, connect, send, receive)
return copas.settimeouts(self.socket, connect, send, receive)
end,

-- TODO: socket.connect is a shortcut, and must be provided with an alternative
-- if ssl parameters are available, it will also include a handshake
connect = function(self, ...)
Expand Down Expand Up @@ -782,6 +836,10 @@ local _skt_mt_tcp = {
return self.socket:sni(names, strict)
end,

settimeout = function (self, time)
return copas.settimeout(self.socket, time)
end,

dohandshake = function(self, wrap_params)
local nskt, err = copas.dohandshake(self.socket, wrap_params or self.ssl_params.wrap)
if not nskt then return nskt, err end
Expand Down Expand Up @@ -818,6 +876,11 @@ _skt_mt_udp.__index.setsockname = function(self, ...) return self.socket:setsock
-- do not close client, as it is also the server for udp.
_skt_mt_udp.__index.close = function(self, ...) return true end

_skt_mt_udp.__index.settimeouts = function (self, connect, send, receive)
return copas.settimeouts(self.socket, connect, send, receive)
end



---
-- Wraps a LuaSocket socket object in an async Copas based socket object.
Expand Down Expand Up @@ -918,7 +981,8 @@ local function _accept(server_skt, handler)
local client_skt = server_skt:accept()
if client_skt then
client_skt:settimeout(0)
copas.settimeout(client_skt, usertimeouts[server_skt]) -- copy server socket timeout settings
copas.settimeouts(client_skt, user_timeouts_connect[server_skt], -- copy server socket timeout settings
user_timeouts_send[server_skt], user_timeouts_receive[server_skt])
local co = coroutine.create(handler)
_doTick(co, client_skt)
end
Expand Down Expand Up @@ -984,7 +1048,13 @@ end
function copas.addthread(handler, ...)
-- create a coroutine that skips the first argument, which is always the socket
-- passed by the scheduler, but `nil` in case of a task/thread
local thread = coroutine.create(function(_, ...) return handler(...) end)
local thread = coroutine.create(function(_, ...)
-- TODO: this should be added to not immediately execute the thread
-- it should only schedule and then return to the calling code
-- Enabling this breaks the "limitset".
-- copas.sleep(0)
return handler(...)
end)
_threads[thread] = true -- register this thread so it can be removed
_doTick (thread, nil, ...)
return thread
Expand Down

0 comments on commit eea386d

Please sign in to comment.