diff --git a/include/zpipes_client.h b/include/zpipes_client.h index fb991c9..9fa3a93 100644 --- a/include/zpipes_client.h +++ b/include/zpipes_client.h @@ -17,11 +17,11 @@ extern "C" { typedef struct _zpipes_client_t zpipes_client_t; // @interface -// Constructor +// Constructor; open ">pipename" for writing, "pipename" for reading CZMQ_EXPORT zpipes_client_t * zpipes_client_new (const char *broker_name, const char *pipe_name); -// Destructor +// Destructor; closes pipe CZMQ_EXPORT void zpipes_client_destroy (zpipes_client_t **self_p); @@ -29,10 +29,12 @@ CZMQ_EXPORT void CZMQ_EXPORT void zpipes_client_write (zpipes_client_t *self, void *data, size_t size); -// Read chunk of data from pipe, blocks until data arrives -// Returns size of chunk read; if less than max_size, truncates +// Read chunk of data from pipe. If timeout is non zero, waits at most +// that many msecs for data. Returns number of bytes read, or zero if +// timeout expired, or if pipe was closed by the writer, and no more +// data is available. CZMQ_EXPORT size_t - zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size); + zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size, int timeout); // Self test of this class CZMQ_EXPORT void diff --git a/src/zpipes_client.c b/src/zpipes_client.c index 34c74fe..5a2c30f 100644 --- a/src/zpipes_client.c +++ b/src/zpipes_client.c @@ -2,7 +2,7 @@ zpipes_client.c - simple API for zpipes client applications Copyright contributors as noted in the AUTHORS file. - This file is part of zbroker, the ZeroMQ broker project. + This file is part of zserver, the ZeroMQ server project. This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this @@ -26,7 +26,7 @@ struct _zpipes_client_t { zctx_t *ctx; // Private CZMQ context char *name; // Name of named zpipe - void *dealer; // Dealer socket to zpipes broker + void *dealer; // Dealer socket to zpipes server }; @@ -42,27 +42,30 @@ s_expect_reply (zpipes_client_t *self, int message_id) // --------------------------------------------------------------------- -// Constructor +// Constructor; open ">pipename" for writing, "pipename" for reading zpipes_client_t * -zpipes_client_new (const char *broker_name, const char *pipe_name) +zpipes_client_new (const char *server_name, const char *pipe_name) { // Create new pipe API instance zpipes_client_t *self = (zpipes_client_t *) zmalloc (sizeof (zpipes_client_t)); assert (self); - - // Create dealer socket and connect to broker IPC port + + // Create dealer socket and connect to server IPC port self->ctx = zctx_new (); + assert (self->ctx); self->dealer = zsocket_new (self->ctx, ZMQ_DEALER); - if (self->dealer) { - int rc = zsocket_connect (self->dealer, "ipc://@/zpipes/%s", broker_name); - assert (rc == 0); - if (*pipe_name == '>') - zpipes_msg_send_output (self->dealer, pipe_name + 1); - else - zpipes_msg_send_input (self->dealer, pipe_name); - s_expect_reply (self, ZPIPES_MSG_READY); - } + assert (self->dealer); + int rc = zsocket_connect (self->dealer, "ipc://@/zpipes/%s", server_name); + assert (rc == 0); + + // Open pipe for reading or writing + if (*pipe_name == '>') + zpipes_msg_send_output (self->dealer, pipe_name + 1); + else + zpipes_msg_send_input (self->dealer, pipe_name); + s_expect_reply (self, ZPIPES_MSG_READY); + return self; } @@ -94,41 +97,43 @@ void zpipes_client_write (zpipes_client_t *self, void *data, size_t size) { assert (self); - if (self->dealer) { - zchunk_t *chunk = zchunk_new (data, size); - assert (chunk); - zpipes_msg_send_store (self->dealer, chunk); - zchunk_destroy (&chunk); - s_expect_reply (self, ZPIPES_MSG_STORED); - } + zchunk_t *chunk = zchunk_new (data, size); + assert (chunk); + zpipes_msg_send_store (self->dealer, chunk); + zchunk_destroy (&chunk); + s_expect_reply (self, ZPIPES_MSG_STORED); } // --------------------------------------------------------------------- -// Read chunk of data from pipe, blocks until data arrives -// Returns size of chunk read; if less than max_size, truncates +// Read chunk of data from pipe. If timeout is non zero, waits at most +// that many msecs for data. Returns number of bytes read, or zero if +// timeout expired, or if pipe was closed by the writer, and no more +// data is available. size_t -zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size) +zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size, int timeout) { assert (self); - if (self->dealer) { - // Use timeout of 200 msecs for now - zpipes_msg_send_fetch (self->dealer, 200); - zpipes_msg_t *reply = zpipes_msg_recv (self->dealer); - assert (reply); - assert (zpipes_msg_id (reply) == ZPIPES_MSG_FETCHED); - // Return chunk data + + zpipes_msg_send_fetch (self->dealer, timeout); + zpipes_msg_t *reply = zpipes_msg_recv (self->dealer); + if (!reply) + return 0; // Interrupted + + size_t bytes = 0; + if (zpipes_msg_id (reply) == ZPIPES_MSG_FETCHED) { zchunk_t *chunk = zpipes_msg_chunk (reply); - size_t bytes = zchunk_size (chunk); + bytes = zchunk_size (chunk); if (bytes > max_size) bytes = max_size; memcpy (data, zchunk_data (chunk), bytes); - zpipes_msg_destroy (&reply); - return bytes; } else - return 0; + bytes = 0; // Timeout or end-of-pipe + + zpipes_msg_destroy (&reply); + return bytes; } @@ -152,11 +157,11 @@ zpipes_client_test (bool verbose) byte buffer [6]; size_t bytes; - bytes = zpipes_client_read (reader, buffer, 6); + bytes = zpipes_client_read (reader, buffer, 6, 200); assert (bytes == 6); - bytes = zpipes_client_read (reader, buffer, 6); + bytes = zpipes_client_read (reader, buffer, 6, 200); assert (bytes == 6); - bytes = zpipes_client_read (reader, buffer, 6); + bytes = zpipes_client_read (reader, buffer, 6, 200); assert (bytes == 6); zpipes_client_destroy (&writer);