Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add ngx_zeromq_{send,recv}msg helper functions.

Change-Id: I0c3a07ca485a58dc135512cefbe381f89f2d9a62
Signed-off-by: Piotr Sikora <piotr.sikora@frickle.com>
  • Loading branch information...
commit 2df71eb124ed3f1f78c7ddee4288a7cfa9259c51 1 parent 3afca3f
@PiotrSikora PiotrSikora authored
Showing with 90 additions and 73 deletions.
  1. +90 −73 src/ngx_event_zeromq.c
View
163 src/ngx_event_zeromq.c
@@ -51,6 +51,9 @@ static void ngx_zeromq_log_error(ngx_log_t *log, const char *text);
static ngx_int_t ngx_zeromq_ready(void *zmq, ngx_event_t *ev, const char *what,
uint32_t want);
+static ssize_t ngx_zeromq_sendmsg(void *zmq, ngx_event_t *ev, zmq_msg_t *msg,
+ int flags);
+static ssize_t ngx_zeromq_recvmsg(void *zmq, ngx_event_t *ev, zmq_msg_t *msg);
static ssize_t ngx_zeromq_send_part(void *zmq, ngx_event_t *wev, u_char *buf,
size_t size, int flags);
@@ -333,57 +336,105 @@ ngx_zeromq_ready(void *zmq, ngx_event_t *ev, const char *what, uint32_t want)
static ssize_t
-ngx_zeromq_send_part(void *zmq, ngx_event_t *wev, u_char *buf, size_t size,
- int flags)
+ngx_zeromq_sendmsg(void *zmq, ngx_event_t *ev, zmq_msg_t *msg, int flags)
{
- zmq_msg_t zmq_msg;
-
- if (zmq_msg_init_size(&zmq_msg, size) == -1) {
- ngx_log_error(NGX_LOG_ALERT, wev->log, 0,
- "zmq_msg_init_size(%uz) failed (%d: %s)",
- size, ngx_errno, zmq_strerror(ngx_errno));
- goto failed_zmq;
- }
+ size_t size;
- ngx_memcpy(zmq_msg_data(&zmq_msg), buf, size);
+ size = zmq_msg_size(msg);
for (;;) {
- if (zmq_sendmsg(zmq, &zmq_msg, ZMQ_DONTWAIT|flags) == -1) {
+ if (zmq_sendmsg(zmq, msg, ZMQ_DONTWAIT|flags) == -1) {
if (ngx_errno == NGX_EINTR) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, wev->log, 0,
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"zmq_send: interrupted");
- wev->ready = 0;
+ ev->ready = 0;
continue;
}
- ngx_zeromq_log_error(wev->log, "zmq_sendmsg()");
- goto failed;
+ ngx_zeromq_log_error(ev->log, "zmq_sendmsg()");
+
+ ev->error = 1;
+ return NGX_ERROR;
}
break;
}
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, wev->log, 0,
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"zmq_send: %uz eom:%d", size, flags != ZMQ_SNDMORE);
- if (zmq_msg_close(&zmq_msg) == -1) {
- ngx_zeromq_log_error(wev->log, "zmq_msg_close()");
- goto failed_zmq;
+ return size;
+}
+
+
+static ssize_t
+ngx_zeromq_recvmsg(void *zmq, ngx_event_t *ev, zmq_msg_t *msg)
+{
+ int64_t more;
+ size_t msize;
+
+ for (;;) {
+ if (zmq_recvmsg(zmq, msg, ZMQ_DONTWAIT) == -1) {
+
+ if (ngx_errno == NGX_EINTR) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "zmq_recv: interrupted");
+ ev->ready = 0;
+ continue;
+ }
+
+ ngx_zeromq_log_error(ev->log, "zmq_recvmsg()");
+
+ ev->error = 1;
+ return NGX_ERROR;
+ }
+
+ break;
}
- return size;
+ msize = sizeof(int64_t);
-failed:
+ if (zmq_getsockopt(zmq, ZMQ_RCVMORE, &more, &msize) == -1) {
+ ngx_zeromq_log_error(ev->log, "zmq_getsockopt(ZMQ_RCVMORE)");
+
+ ev->error = 1;
+ return NGX_ERROR;
+ }
+
+ ev->eof = more ? 0 : 1;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "zmq_recv: %uz eom:%d", zmq_msg_size(msg), ev->eof);
+
+ return zmq_msg_size(msg);
+}
+
+
+static ssize_t
+ngx_zeromq_send_part(void *zmq, ngx_event_t *wev, u_char *buf, size_t size,
+ int flags)
+{
+ zmq_msg_t zmq_msg;
+ ssize_t n;
+
+ if (zmq_msg_init_size(&zmq_msg, size) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, wev->log, 0,
+ "zmq_msg_init_size(%uz) failed (%d: %s)",
+ size, ngx_errno, zmq_strerror(ngx_errno));
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(zmq_msg_data(&zmq_msg), buf, size);
+
+ n = ngx_zeromq_sendmsg(zmq, wev, &zmq_msg, flags);
if (zmq_msg_close(&zmq_msg) == -1) {
ngx_zeromq_log_error(wev->log, "zmq_msg_close()");
+ return NGX_ERROR;
}
-failed_zmq:
-
- wev->error = 1;
- return NGX_ERROR;
+ return n;
}
@@ -446,71 +497,37 @@ static ssize_t
ngx_zeromq_recv_part(void *zmq, ngx_event_t *rev, u_char *buf, size_t size)
{
zmq_msg_t zmq_msg;
- int64_t more;
- size_t msize;
+ ssize_t n;
if (zmq_msg_init(&zmq_msg) == -1) {
ngx_zeromq_log_error(rev->log, "zmq_msg_init()");
- goto failed_zmq;
+ return NGX_ERROR;
}
- for (;;) {
- if (zmq_recvmsg(zmq, &zmq_msg, ZMQ_DONTWAIT) == -1) {
-
- if (ngx_errno == NGX_EINTR) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, rev->log, 0,
- "zmq_recv: interrupted");
- rev->ready = 0;
- continue;
- }
-
- ngx_zeromq_log_error(rev->log, "zmq_recvmsg()");
- goto failed;
- }
-
- break;
+ n = ngx_zeromq_recvmsg(zmq, rev, &zmq_msg);
+ if (n < 0) {
+ goto done;
}
- if (zmq_msg_size(&zmq_msg) > size) {
+ if ((size_t) n > size) {
ngx_log_error(NGX_LOG_ALERT, rev->log, 0,
"zmq_recv: ZeroMQ message part too big (%uz) to fit"
- " into buffer (%uz)", zmq_msg_size(&zmq_msg), size);
- goto failed;
- }
-
- msize = sizeof(int64_t);
-
- if (zmq_getsockopt(zmq, ZMQ_RCVMORE, &more, &msize) == -1) {
- ngx_zeromq_log_error(rev->log, "zmq_getsockopt(ZMQ_RCVMORE)");
- goto failed;
- }
-
- rev->eof = more ? 0 : 1;
+ " into buffer (%uz)", n, size);
- size = zmq_msg_size(&zmq_msg);
-
- ngx_memcpy(buf, zmq_msg_data(&zmq_msg), size);
-
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, rev->log, 0,
- "zmq_recv: %uz eom:%d", size, rev->eof);
-
- if (zmq_msg_close(&zmq_msg) == -1) {
- ngx_zeromq_log_error(rev->log, "zmq_msg_close()");
- goto failed_zmq;
+ n = NGX_ERROR;
+ goto done;
}
- return size;
+ ngx_memcpy(buf, zmq_msg_data(&zmq_msg), n);
-failed:
+done:
if (zmq_msg_close(&zmq_msg) == -1) {
ngx_zeromq_log_error(rev->log, "zmq_msg_close()");
+ return NGX_ERROR;
}
-failed_zmq:
-
- rev->error = 1;
- return NGX_ERROR;
+ return n;
}
Please sign in to comment.
Something went wrong with that request. Please try again.