Skip to content

Commit

Permalink
Merge pull request #46 from nim-lang/devel
Browse files Browse the repository at this point in the history
Devel
  • Loading branch information
Araq committed May 28, 2024
2 parents d948370 + 7c60860 commit 6eb2f6e
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 51 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ $ nimble install zmq
close(responder)
```

#### client
#### client

```nim
import zmq
Expand All @@ -49,3 +49,12 @@ $ nimble install zmq
For more examples demonstrating many functionalities and patterns that ZMQ offers, see the ``tests/`` and ``examples/`` folder.

The examples are commented to better understand how zmq works.


### Log EAGAIN errno

Sometimes EAGAIN error happens in ZMQ context; typically this is a non-ctritical error that can be ignored. Nonetheless, if you desire to log or display such error, it is possible to enable it using the ``enableLogEagain`` and disable it with ``disableLogEagain``.

### Setting default flag as DONTWAIT

The default flag passed to send / receive is NOFLAGS. This can be overriden by defining ``-d:defaultFlagDontWait``
38 changes: 24 additions & 14 deletions tests/async_demo.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ proc asyncpoll() =
puller2,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
assert true
else:
assert false
let
# Avoid using indefinitly blocking proc in async context
res = x.waitForReceive(timeout=10)
if res.msgAvailable:
let
msg = res.msg
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
assert true
else:
assert false
)
# assert message received are correct (should be even integer in string format)
var msglist2 = @["0", "2", "4", "6", "8"]
Expand All @@ -37,13 +42,18 @@ proc asyncpoll() =
puller,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
assert true
else:
assert false
let
# Avoid using indefinitly blocking proc in async context
res = x.waitForReceive(timeout=10)
if res.msgAvailable:
let
msg = res.msg
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
assert true
else:
assert false
)

let
Expand Down
32 changes: 18 additions & 14 deletions tests/tzmq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ proc asyncpoll() =
puller2,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
check true
else:
check false
let res= x.tryReceive()
if res.msgAvailable:
let msg = res.msg
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
check true
else:
check false
)
# Check message received are correct (should be even integer in string format)
var msglist2 = @["0", "2", "4", "6", "8"]
Expand All @@ -234,13 +236,15 @@ proc asyncpoll() =
puller,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
check true
else:
check false
let res = x.tryReceive()
if res.msgAvailable:
let msg = res.msg
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
check true
else:
check false
)

let
Expand Down
3 changes: 1 addition & 2 deletions zmq.nimble
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Package

version = "1.5.1"
version = "1.5.2"
author = "Andreas Rumpf"
description = "ZeroMQ wrapper"
license = "MIT"
Expand All @@ -18,4 +18,3 @@ task buildexamples, "Compile all examples":

task gendoc, "Generate documentation":
exec("nim doc --mm:orc --project --out:docs/ zmq.nim")

15 changes: 11 additions & 4 deletions zmq/asynczmq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,39 @@ proc `=destroy`*(obj: AsyncZPoller) =

proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) =
## Register ZSocket function
## The callback should ideally use non-blocking proc such ``waitForReceive`` or ``tryReceive`` or ``c.receive(DONTWAIT)``
poller.zpoll.register(sock, event)
poller.cb.add(cb)

proc register*(poller: var AsyncZPoller, conn: ZConnection, event: int, cb: AsyncZPollCB) =
## Register ZConnection
## The callback should ideally use non-blocking proc such ``waitForReceive`` or ``tryReceive`` or ``c.receive(DONTWAIT)``
poller.register(conn.socket, event, cb)

proc register*(poller: var AsyncZPoller, item: ZPollItem, cb: AsyncZPollCB) =
## Register ZConnection
## Register ZConnection.
## The callback should use non-blocking proc ``waitForReceive`` with strictly positive timeout or ``tryReceive`` or ``c.receive(DONTWAIT)``
poller.zpoll.items.add(item)
poller.cb.add(cb)

proc initZPoller*(poller: sink ZPoller, cb: AsyncZPollCB) : AsyncZPoller =
## The callback should use non-blocking proc such ``waitForReceive`` or ``tryReceive`` or ``c.receive(DONTWAIT)``
for p in poller.items:
result.register(p, cb)

proc initZPoller*(args: openArray[tuple[item: ZConnection, cb: AsyncZPollCB]], event: cshort): AsyncZPoller =
## Init a ZPoller with all items on the same event
## The callback should use non-blocking proc ``waitForReceive`` with strictly positive timeout or ``tryReceive`` or ``c.receive(DONTWAIT)``
for arg in args:
result.register(arg.item, event, arg.cb)

proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =
## Experimental API. Poll all the ZConnection and execute an async CB when ``event`` occurs.
## The callback should use non-blocking proc ``waitForReceive`` with strictly positive timeout or ``tryReceive`` or ``c.receive(DONTWAIT)``

var timeout = max(2, timeout)
result = newFuture[int]("pollAsync")
var r = poller.zpoll.poll(timeout)
var r = poller.zpoll.poll(timeout div 2)
# ZMQ can't have a timeout smaller than one
if r > 0:
for i in 0..<poller.len():
Expand All @@ -78,7 +86,7 @@ proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =

if hasPendingOperations():
# poll vs drain ?
drain(timeout)
drain(timeout div 2)

result.complete(r)

Expand Down Expand Up @@ -158,4 +166,3 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
# can send without blocking
conn.send(msg, flags)
fut.complete()

54 changes: 38 additions & 16 deletions zmq/connections.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import ./bindings
import std/[strformat]
import std/[strformat, logging]

# Unofficial easier-for-Nim API

Expand All @@ -13,7 +13,7 @@ type

ZConnectionImpl* {.pure, final.} = object
## A Zmq connection. Since ``ZContext`` and ``ZSocket`` are pointers, it is highly recommended to **not** copy ``ZConnection``.
context*: ZContext ## Zmq context from C-bindings.
context*: ZContext ## Zmq context from C-bindings.
socket*: ZSocket ## Zmq socket from C-bindings.
ownctx: bool # Boolean indicating if the connection owns the Zmq context
alive: bool # Boolean indicating if the connection has been closed
Expand All @@ -32,16 +32,39 @@ proc zmqError*() {.noinline, noreturn.} =
e.msg = &"Error: {e.error}. " & $strerror(e.error)
raise e

var shouldLogEagainError = false

proc enableLogEagain*() =
## Enable logging EAGAIN error in ZMQ calls
shouldLogEagainError = true

proc disableLogEagain*() =
## Disable logging EAGAIN error in ZMQ calls
shouldLogEagainError = false

proc zmqErrorExceptEAGAIN() =
var e: ref ZmqError
new(e)
e.error = errno()
let errmsg = $strerror(e.error)
if e.error == ZMQ_EAGAIN:
discard
if shouldLogEagainError:
if logging.getHandlers().len() > 0:
warn(errmsg)
else:
echo(errmsg)
else:
discard
else:
e.msg = &"Error: {e.error}. " & $strerror(e.error)
e.msg = &"Error: {e.error}. " & errmsg
raise e

template defaultFlag() : ZSendRecvOptions =
when defined(defaultFlagDontWait):
DONTWAIT
else:
NOFLAGS

#[
# Context related proc
]#
Expand Down Expand Up @@ -135,7 +158,6 @@ proc getsockopt*[T: SomeOrdinal|string](c: ZConnection, option: ZSockOptions): T
## Check http://api.zeromq.org/4-2:zmq-setsockopt
getsockopt[T](c.socket, option)


#[
Destructor
]#
Expand Down Expand Up @@ -305,7 +327,7 @@ proc close*(c: ZConnection, linger: int = 500) =

# Send / Receive
# Send with ZSocket type
proc send*(s: ZSocket, msg: string, flags: ZSendRecvOptions = NOFLAGS) =
proc send*(s: ZSocket, msg: string, flags: ZSendRecvOptions = defaultFlag()) =
## Sends a message through the socket.
var m: ZMsg
if msg_init(m, msg.len) != 0:
Expand All @@ -330,7 +352,7 @@ proc sendAll*(s: ZSocket, msg: varargs[string]) =
inc(i)
s.send(msg[i])

proc send*(c: ZConnection, msg: string, flags: ZSendRecvOptions = NOFLAGS) =
proc send*(c: ZConnection, msg: string, flags: ZSendRecvOptions = defaultFlag()) =
## Sends a message over the connection.
send(c.socket, msg, flags)

Expand All @@ -339,7 +361,7 @@ proc sendAll*(c: ZConnection, msg: varargs[string]) =
sendAll(c.socket, msg)

# receive with ZSocket type
proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
result.moreAvailable = false
result.msgAvailable = false

Expand All @@ -364,7 +386,7 @@ proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila
if msg_close(m) != 0:
zmqError()

proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Set RCVTIMEO for the socket and wait until a message is available.
## This function is blocking.
##
Expand Down Expand Up @@ -392,7 +414,7 @@ proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NO
if shouldUpdateTimeout:
s.setsockopt(RCVTIMEO, curtimeout.cint)

proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Receives a message from a socket in a non-blocking way.
##
## Indicate whether a message was received or EAGAIN occured by ``msgAvailable``
Expand All @@ -406,13 +428,13 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila
if (status and ZMQ_POLLIN) != 0:
result = receiveImpl(s, flags)

proc receive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): string =
proc receive*(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): string =
## Receive a message on socket.
#
## Return an empty string on EAGAIN
receiveImpl(s, flags).msg

proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): seq[string] =
## Receive all parts of a message
##
## If EAGAIN occurs without any data being received, it will be an empty seq
Expand All @@ -425,7 +447,7 @@ proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
else:
expectMessage = false

proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Set RCVTIMEO for the socket and wait until a message is available.
## This function is blocking.
##
Expand All @@ -438,19 +460,19 @@ proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions
## Indicate if more parts are needed to be received by ``moreAvailable``
waitForReceive(c.socket, timeout, flags)

proc tryReceive*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc tryReceive*(c: ZConnection, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Receives a message from a socket in a non-blocking way.
##
## Indicate whether a message was received or EAGAIN occured by ``msgAvailable``
##
## Indicate if more parts are needed to be received by ``moreAvailable``
tryReceive(c.socket, flags)

proc receive*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): string =
proc receive*(c: ZConnection, flags: ZSendRecvOptions = defaultFlag()): string =
## Receive data over the connection
receive(c.socket, flags)

proc receiveAll*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
proc receiveAll*(c: ZConnection, flags: ZSendRecvOptions = defaultFlag()): seq[string] =
## Receive all parts of a message
##
## If EAGAIN occurs without any data being received, it will be an empty seq
Expand Down

0 comments on commit 6eb2f6e

Please sign in to comment.