Skip to content

Commit

Permalink
Fixed #10
Browse files Browse the repository at this point in the history
  • Loading branch information
hintjens committed Mar 23, 2014
1 parent 6273349 commit 638b46a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 44 deletions.
12 changes: 7 additions & 5 deletions include/zpipes_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ extern "C" {
typedef struct _zpipes_client_t zpipes_client_t;

// @interface
// Constructor
// Constructor; open ">pipename" for writing, "pipename" for reading
CZMQ_EXPORT zpipes_client_t *
zpipes_client_new (const char *broker_name, const char *pipe_name);

// Destructor
// Destructor; closes pipe
CZMQ_EXPORT void
zpipes_client_destroy (zpipes_client_t **self_p);

// Write chunk of data to pipe
CZMQ_EXPORT void
zpipes_client_write (zpipes_client_t *self, void *data, size_t size);

// Read chunk of data from pipe, blocks until data arrives
// Returns size of chunk read; if less than max_size, truncates
// Read chunk of data from pipe. If timeout is non zero, waits at most
// that many msecs for data. Returns number of bytes read, or zero if
// timeout expired, or if pipe was closed by the writer, and no more
// data is available.
CZMQ_EXPORT size_t
zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size);
zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size, int timeout);

// Self test of this class
CZMQ_EXPORT void
Expand Down
83 changes: 44 additions & 39 deletions src/zpipes_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
zpipes_client.c - simple API for zpipes client applications
Copyright contributors as noted in the AUTHORS file.
This file is part of zbroker, the ZeroMQ broker project.
This file is part of zserver, the ZeroMQ server project.
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
Expand All @@ -26,7 +26,7 @@
struct _zpipes_client_t {
zctx_t *ctx; // Private CZMQ context
char *name; // Name of named zpipe
void *dealer; // Dealer socket to zpipes broker
void *dealer; // Dealer socket to zpipes server
};


Expand All @@ -42,27 +42,30 @@ s_expect_reply (zpipes_client_t *self, int message_id)


// ---------------------------------------------------------------------
// Constructor
// Constructor; open ">pipename" for writing, "pipename" for reading

zpipes_client_t *
zpipes_client_new (const char *broker_name, const char *pipe_name)
zpipes_client_new (const char *server_name, const char *pipe_name)
{
// Create new pipe API instance
zpipes_client_t *self = (zpipes_client_t *) zmalloc (sizeof (zpipes_client_t));
assert (self);

// Create dealer socket and connect to broker IPC port
// Create dealer socket and connect to server IPC port
self->ctx = zctx_new ();
assert (self->ctx);
self->dealer = zsocket_new (self->ctx, ZMQ_DEALER);
if (self->dealer) {
int rc = zsocket_connect (self->dealer, "ipc://@/zpipes/%s", broker_name);
assert (rc == 0);
if (*pipe_name == '>')
zpipes_msg_send_output (self->dealer, pipe_name + 1);
else
zpipes_msg_send_input (self->dealer, pipe_name);
s_expect_reply (self, ZPIPES_MSG_READY);
}
assert (self->dealer);
int rc = zsocket_connect (self->dealer, "ipc://@/zpipes/%s", server_name);
assert (rc == 0);

// Open pipe for reading or writing
if (*pipe_name == '>')
zpipes_msg_send_output (self->dealer, pipe_name + 1);
else
zpipes_msg_send_input (self->dealer, pipe_name);
s_expect_reply (self, ZPIPES_MSG_READY);

return self;
}

Expand Down Expand Up @@ -94,41 +97,43 @@ void
zpipes_client_write (zpipes_client_t *self, void *data, size_t size)
{
assert (self);
if (self->dealer) {
zchunk_t *chunk = zchunk_new (data, size);
assert (chunk);
zpipes_msg_send_store (self->dealer, chunk);
zchunk_destroy (&chunk);
s_expect_reply (self, ZPIPES_MSG_STORED);
}
zchunk_t *chunk = zchunk_new (data, size);
assert (chunk);
zpipes_msg_send_store (self->dealer, chunk);
zchunk_destroy (&chunk);
s_expect_reply (self, ZPIPES_MSG_STORED);
}


// ---------------------------------------------------------------------
// Read chunk of data from pipe, blocks until data arrives
// Returns size of chunk read; if less than max_size, truncates
// Read chunk of data from pipe. If timeout is non zero, waits at most
// that many msecs for data. Returns number of bytes read, or zero if
// timeout expired, or if pipe was closed by the writer, and no more
// data is available.

size_t
zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size)
zpipes_client_read (zpipes_client_t *self, void *data, size_t max_size, int timeout)
{
assert (self);
if (self->dealer) {
// Use timeout of 200 msecs for now
zpipes_msg_send_fetch (self->dealer, 200);
zpipes_msg_t *reply = zpipes_msg_recv (self->dealer);
assert (reply);
assert (zpipes_msg_id (reply) == ZPIPES_MSG_FETCHED);
// Return chunk data

zpipes_msg_send_fetch (self->dealer, timeout);
zpipes_msg_t *reply = zpipes_msg_recv (self->dealer);
if (!reply)
return 0; // Interrupted

size_t bytes = 0;
if (zpipes_msg_id (reply) == ZPIPES_MSG_FETCHED) {
zchunk_t *chunk = zpipes_msg_chunk (reply);
size_t bytes = zchunk_size (chunk);
bytes = zchunk_size (chunk);
if (bytes > max_size)
bytes = max_size;
memcpy (data, zchunk_data (chunk), bytes);
zpipes_msg_destroy (&reply);
return bytes;
}
else
return 0;
bytes = 0; // Timeout or end-of-pipe

zpipes_msg_destroy (&reply);
return bytes;
}


Expand All @@ -152,11 +157,11 @@ zpipes_client_test (bool verbose)

byte buffer [6];
size_t bytes;
bytes = zpipes_client_read (reader, buffer, 6);
bytes = zpipes_client_read (reader, buffer, 6, 200);
assert (bytes == 6);
bytes = zpipes_client_read (reader, buffer, 6);
bytes = zpipes_client_read (reader, buffer, 6, 200);
assert (bytes == 6);
bytes = zpipes_client_read (reader, buffer, 6);
bytes = zpipes_client_read (reader, buffer, 6, 200);
assert (bytes == 6);

zpipes_client_destroy (&writer);
Expand Down

0 comments on commit 638b46a

Please sign in to comment.