Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devel #46

Merged
merged 7 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ $ nimble install zmq
close(responder)
```

#### client
#### client

```nim
import zmq
Expand All @@ -49,3 +49,12 @@ $ nimble install zmq
For more examples demonstrating many functionalities and patterns that ZMQ offers, see the ``tests/`` and ``examples/`` folder.

The examples are commented to better understand how zmq works.


### Log EAGAIN errno

Sometimes EAGAIN error happens in ZMQ context; typically this is a non-ctritical error that can be ignored. Nonetheless, if you desire to log or display such error, it is possible to enable it using the ``enableLogEagain`` and disable it with ``disableLogEagain``.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How hard would it be to make this a runtime switch? Compile-time switches for libraries do not compose.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very easy.

I will make the change.

### Setting default flag as DONTWAIT

The default flag passed to send / receive is NOFLAGS. This can be overriden by defining ``-d:defaultFlagDontWait``
38 changes: 24 additions & 14 deletions tests/async_demo.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ proc asyncpoll() =
puller2,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
assert true
else:
assert false
let
# Avoid using indefinitly blocking proc in async context
res = x.waitForReceive(timeout=10)
if res.msgAvailable:
let
msg = res.msg
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
assert true
else:
assert false
)
# assert message received are correct (should be even integer in string format)
var msglist2 = @["0", "2", "4", "6", "8"]
Expand All @@ -37,13 +42,18 @@ proc asyncpoll() =
puller,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
assert true
else:
assert false
let
# Avoid using indefinitly blocking proc in async context
res = x.waitForReceive(timeout=10)
if res.msgAvailable:
let
msg = res.msg
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
assert true
else:
assert false
)

let
Expand Down
32 changes: 18 additions & 14 deletions tests/tzmq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ proc asyncpoll() =
puller2,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
check true
else:
check false
let res= x.tryReceive()
if res.msgAvailable:
let msg = res.msg
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
check true
else:
check false
)
# Check message received are correct (should be even integer in string format)
var msglist2 = @["0", "2", "4", "6", "8"]
Expand All @@ -234,13 +236,15 @@ proc asyncpoll() =
puller,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
check true
else:
check false
let res = x.tryReceive()
if res.msgAvailable:
let msg = res.msg
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
check true
else:
check false
)

let
Expand Down
3 changes: 1 addition & 2 deletions zmq.nimble
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Package

version = "1.5.1"
version = "1.5.2"
author = "Andreas Rumpf"
description = "ZeroMQ wrapper"
license = "MIT"
Expand All @@ -18,4 +18,3 @@ task buildexamples, "Compile all examples":

task gendoc, "Generate documentation":
exec("nim doc --mm:orc --project --out:docs/ zmq.nim")

15 changes: 11 additions & 4 deletions zmq/asynczmq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,39 @@ proc `=destroy`*(obj: AsyncZPoller) =

proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) =
## Register ZSocket function
## The callback should ideally use non-blocking proc such ``waitForReceive`` or ``tryReceive`` or ``c.receive(DONTWAIT)``
poller.zpoll.register(sock, event)
poller.cb.add(cb)

proc register*(poller: var AsyncZPoller, conn: ZConnection, event: int, cb: AsyncZPollCB) =
## Register ZConnection
## The callback should ideally use non-blocking proc such ``waitForReceive`` or ``tryReceive`` or ``c.receive(DONTWAIT)``
poller.register(conn.socket, event, cb)

proc register*(poller: var AsyncZPoller, item: ZPollItem, cb: AsyncZPollCB) =
## Register ZConnection
## Register ZConnection.
## The callback should use non-blocking proc ``waitForReceive`` with strictly positive timeout or ``tryReceive`` or ``c.receive(DONTWAIT)``
poller.zpoll.items.add(item)
poller.cb.add(cb)

proc initZPoller*(poller: sink ZPoller, cb: AsyncZPollCB) : AsyncZPoller =
## The callback should use non-blocking proc such ``waitForReceive`` or ``tryReceive`` or ``c.receive(DONTWAIT)``
for p in poller.items:
result.register(p, cb)

proc initZPoller*(args: openArray[tuple[item: ZConnection, cb: AsyncZPollCB]], event: cshort): AsyncZPoller =
## Init a ZPoller with all items on the same event
## The callback should use non-blocking proc ``waitForReceive`` with strictly positive timeout or ``tryReceive`` or ``c.receive(DONTWAIT)``
for arg in args:
result.register(arg.item, event, arg.cb)

proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =
## Experimental API. Poll all the ZConnection and execute an async CB when ``event`` occurs.
## The callback should use non-blocking proc ``waitForReceive`` with strictly positive timeout or ``tryReceive`` or ``c.receive(DONTWAIT)``

var timeout = max(2, timeout)
result = newFuture[int]("pollAsync")
var r = poller.zpoll.poll(timeout)
var r = poller.zpoll.poll(timeout div 2)
# ZMQ can't have a timeout smaller than one
if r > 0:
for i in 0..<poller.len():
Expand All @@ -78,7 +86,7 @@ proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =

if hasPendingOperations():
# poll vs drain ?
drain(timeout)
drain(timeout div 2)

result.complete(r)

Expand Down Expand Up @@ -158,4 +166,3 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
# can send without blocking
conn.send(msg, flags)
fut.complete()

54 changes: 38 additions & 16 deletions zmq/connections.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import ./bindings
import std/[strformat]
import std/[strformat, logging]

# Unofficial easier-for-Nim API

Expand All @@ -13,7 +13,7 @@ type

ZConnectionImpl* {.pure, final.} = object
## A Zmq connection. Since ``ZContext`` and ``ZSocket`` are pointers, it is highly recommended to **not** copy ``ZConnection``.
context*: ZContext ## Zmq context from C-bindings.
context*: ZContext ## Zmq context from C-bindings.
socket*: ZSocket ## Zmq socket from C-bindings.
ownctx: bool # Boolean indicating if the connection owns the Zmq context
alive: bool # Boolean indicating if the connection has been closed
Expand All @@ -32,16 +32,39 @@ proc zmqError*() {.noinline, noreturn.} =
e.msg = &"Error: {e.error}. " & $strerror(e.error)
raise e

var shouldLogEagainError = false

proc enableLogEagain*() =
## Enable logging EAGAIN error in ZMQ calls
shouldLogEagainError = true

proc disableLogEagain*() =
## Disable logging EAGAIN error in ZMQ calls
shouldLogEagainError = false

proc zmqErrorExceptEAGAIN() =
var e: ref ZmqError
new(e)
e.error = errno()
let errmsg = $strerror(e.error)
if e.error == ZMQ_EAGAIN:
discard
if shouldLogEagainError:
if logging.getHandlers().len() > 0:
warn(errmsg)
else:
echo(errmsg)
else:
discard
else:
e.msg = &"Error: {e.error}. " & $strerror(e.error)
e.msg = &"Error: {e.error}. " & errmsg
raise e

template defaultFlag() : ZSendRecvOptions =
when defined(defaultFlagDontWait):
DONTWAIT
else:
NOFLAGS

#[
# Context related proc
]#
Expand Down Expand Up @@ -135,7 +158,6 @@ proc getsockopt*[T: SomeOrdinal|string](c: ZConnection, option: ZSockOptions): T
## Check http://api.zeromq.org/4-2:zmq-setsockopt
getsockopt[T](c.socket, option)


#[
Destructor
]#
Expand Down Expand Up @@ -305,7 +327,7 @@ proc close*(c: ZConnection, linger: int = 500) =

# Send / Receive
# Send with ZSocket type
proc send*(s: ZSocket, msg: string, flags: ZSendRecvOptions = NOFLAGS) =
proc send*(s: ZSocket, msg: string, flags: ZSendRecvOptions = defaultFlag()) =
## Sends a message through the socket.
var m: ZMsg
if msg_init(m, msg.len) != 0:
Expand All @@ -330,7 +352,7 @@ proc sendAll*(s: ZSocket, msg: varargs[string]) =
inc(i)
s.send(msg[i])

proc send*(c: ZConnection, msg: string, flags: ZSendRecvOptions = NOFLAGS) =
proc send*(c: ZConnection, msg: string, flags: ZSendRecvOptions = defaultFlag()) =
## Sends a message over the connection.
send(c.socket, msg, flags)

Expand All @@ -339,7 +361,7 @@ proc sendAll*(c: ZConnection, msg: varargs[string]) =
sendAll(c.socket, msg)

# receive with ZSocket type
proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
result.moreAvailable = false
result.msgAvailable = false

Expand All @@ -364,7 +386,7 @@ proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila
if msg_close(m) != 0:
zmqError()

proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Set RCVTIMEO for the socket and wait until a message is available.
## This function is blocking.
##
Expand Down Expand Up @@ -392,7 +414,7 @@ proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NO
if shouldUpdateTimeout:
s.setsockopt(RCVTIMEO, curtimeout.cint)

proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Receives a message from a socket in a non-blocking way.
##
## Indicate whether a message was received or EAGAIN occured by ``msgAvailable``
Expand All @@ -406,13 +428,13 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila
if (status and ZMQ_POLLIN) != 0:
result = receiveImpl(s, flags)

proc receive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): string =
proc receive*(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): string =
## Receive a message on socket.
#
## Return an empty string on EAGAIN
receiveImpl(s, flags).msg

proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = defaultFlag()): seq[string] =
## Receive all parts of a message
##
## If EAGAIN occurs without any data being received, it will be an empty seq
Expand All @@ -425,7 +447,7 @@ proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
else:
expectMessage = false

proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Set RCVTIMEO for the socket and wait until a message is available.
## This function is blocking.
##
Expand All @@ -438,19 +460,19 @@ proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions
## Indicate if more parts are needed to be received by ``moreAvailable``
waitForReceive(c.socket, timeout, flags)

proc tryReceive*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
proc tryReceive*(c: ZConnection, flags: ZSendRecvOptions = defaultFlag()): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] =
## Receives a message from a socket in a non-blocking way.
##
## Indicate whether a message was received or EAGAIN occured by ``msgAvailable``
##
## Indicate if more parts are needed to be received by ``moreAvailable``
tryReceive(c.socket, flags)

proc receive*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): string =
proc receive*(c: ZConnection, flags: ZSendRecvOptions = defaultFlag()): string =
## Receive data over the connection
receive(c.socket, flags)

proc receiveAll*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
proc receiveAll*(c: ZConnection, flags: ZSendRecvOptions = defaultFlag()): seq[string] =
## Receive all parts of a message
##
## If EAGAIN occurs without any data being received, it will be an empty seq
Expand Down