Skip to content
Browse files

Added delivery reports in client

  • Loading branch information...
1 parent 2df2a27 commit 660431d2bc567d685fe7996cf8473c07cc2eba21 @hintjens hintjens committed Nov 29, 2012
Showing with 135 additions and 40 deletions.
  1. +8 −0 include/fmq_client.h
  2. +31 −3 model/client_c.gsl
  3. +6 −5 model/fmq_client.xml
  4. +0 −2 model/fmq_server_mount.xml
  5. +1 −1 model/server_c.gsl
  6. +29 −9 src/fmq_client.c
  7. +18 −2 src/fmq_file.c
  8. +23 −11 src/fmq_selftest.c
  9. +1 −3 src/fmq_server.c
  10. +18 −4 src/track.c
View
8 include/fmq_client.h
@@ -53,6 +53,14 @@ void
void
fmq_client_connect (fmq_client_t *self, const char *endpoint);
+// Wait for message from API
+zmsg_t *
+ fmq_client_recv (fmq_client_t *self);
+
+// Return API pipe handle for polling
+void *
+ fmq_client_handle (fmq_client_t *self);
+
//
void
fmq_client_subscribe (fmq_client_t *self, const char *path);
View
34 model/client_c.gsl
@@ -45,6 +45,14 @@ void
void
$(class.name)_connect ($(class.name)_t *self, const char *endpoint);
+// Wait for message from API
+zmsg_t *
+ $(class.name)_recv ($(class.name)_t *self);
+
+// Return API pipe handle for polling
+void *
+ $(class.name)_handle ($(class.name)_t *self);
+
.for class.method
// $(method.text?'':)
. if return ?= "number"
@@ -175,6 +183,27 @@ $(class.name)_connect ($(class.name)_t *self, const char *endpoint)
}
+// --------------------------------------------------------------------------
+// Wait for message from API
+
+zmsg_t *
+$(class.name)_recv ($(class.name)_t *self)
+{
+ zmsg_t *msg = zmsg_recv (self->pipe);
+ return msg;
+}
+
+
+// --------------------------------------------------------------------------
+// Return API pipe handle for polling
+
+void *
+$(class.name)_handle ($(class.name)_t *self)
+{
+ return self->pipe;
+}
+
+
.for class.method
// --------------------------------------------------------------------------
@@ -599,7 +628,7 @@ static void
client_thread (void *args, zctx_t *ctx, void *pipe)
{
client_t *self = client_new (ctx, pipe);
- while (!self->stopped) {
+ while (!self->stopped && !zctx_interrupted) {
// Build structure each time since self->dealer can change
zmq_pollitem_t items [] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
@@ -618,9 +647,8 @@ client_thread (void *args, zctx_t *ctx, void *pipe)
server_message (self);
// Check whether server seems dead
- if (self->expires_at && zclock_time () >= self->expires_at) {
+ if (self->expires_at && zclock_time () >= self->expires_at)
client_restart (self, NULL);
- }
}
client_destroy (&self);
}
View
11 model/fmq_client.xml
@@ -163,7 +163,6 @@ filename++;
if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
if (self->file == NULL) {
- zclock_log ("I: create %s/%s", inbox, filename);
self->file = fmq_file_new (inbox, filename);
if (fmq_file_output (self->file)) {
// File not writeable, skip patch
@@ -178,10 +177,13 @@ if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
fmq_file_write (self->file, chunk, fmq_msg_offset (self->reply));
self->credit -= fmq_chunk_size (chunk);
}
- else
- // Zero-sized chunk means end of file
+ else {
+ // Zero-sized chunk means end of file, so report back to caller
+ zstr_sendm (self->pipe, "DELIVER");
+ zstr_sendm (self->pipe, filename);
+ zstr_sendf (self->pipe, "%s/%s", inbox, filename);
fmq_file_destroy (&self->file);
-
+ }
fmq_chunk_destroy (&chunk);
}
else
@@ -226,7 +228,6 @@ assert (*path == '/');
// New subscription, store it for later replay
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
-zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
self->sub = sub_new (self, inbox, path);
zlist_append (self->subs, self->sub);
View
2 model/fmq_server_mount.xml
@@ -26,8 +26,6 @@ mount_new (char *location, char *alias)
self->alias = strdup (alias);
self->dir = fmq_dir_new (self->location, NULL);
self->subs = zlist_new ();
- zclock_log ("I: mounted directory %s as '%s' with %zd files",
- self->location, self->alias, self->dir? fmq_dir_count (self->dir): 0);
return self;
}
View
2 model/server_c.gsl
@@ -686,7 +686,7 @@ server_thread (void *args, zctx_t *ctx, void *pipe)
};
self->monitor_at = zclock_time () + self->monitor;
- while (!self->stopped) {
+ while (!self->stopped && !zctx_interrupted) {
// Calculate tickless timer, up to interval seconds
uint64_t tickless = zclock_time () + self->monitor;
zhash_foreach (self->clients, client_tickless, &tickless);
View
38 src/fmq_client.c
@@ -111,6 +111,27 @@ fmq_client_connect (fmq_client_t *self, const char *endpoint)
// --------------------------------------------------------------------------
+// Wait for message from API
+
+zmsg_t *
+fmq_client_recv (fmq_client_t *self)
+{
+ zmsg_t *msg = zmsg_recv (self->pipe);
+ return msg;
+}
+
+
+// --------------------------------------------------------------------------
+// Return API pipe handle for polling
+
+void *
+fmq_client_handle (fmq_client_t *self)
+{
+ return self->pipe;
+}
+
+
+// --------------------------------------------------------------------------
void
fmq_client_subscribe (fmq_client_t *self, const char *path)
@@ -319,7 +340,6 @@ client_apply_config (client_t *self)
// New subscription, store it for later replay
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
- zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
self->sub = sub_new (self, inbox, path);
zlist_append (self->subs, self->sub);
@@ -424,7 +444,6 @@ process_the_patch (client_t *self)
if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
if (self->file == NULL) {
- zclock_log ("I: create %s/%s", inbox, filename);
self->file = fmq_file_new (inbox, filename);
if (fmq_file_output (self->file)) {
// File not writeable, skip patch
@@ -439,10 +458,13 @@ process_the_patch (client_t *self)
fmq_file_write (self->file, chunk, fmq_msg_offset (self->reply));
self->credit -= fmq_chunk_size (chunk);
}
- else
- // Zero-sized chunk means end of file
+ else {
+ // Zero-sized chunk means end of file, so report back to caller
+ zstr_sendm (self->pipe, "DELIVER");
+ zstr_sendm (self->pipe, filename);
+ zstr_sendf (self->pipe, "%s/%s", inbox, filename);
fmq_file_destroy (&self->file);
-
+ }
fmq_chunk_destroy (&chunk);
}
else
@@ -676,7 +698,6 @@ control_message (client_t *self)
// New subscription, store it for later replay
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
- zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
self->sub = sub_new (self, inbox, path);
zlist_append (self->subs, self->sub);
@@ -780,7 +801,7 @@ static void
client_thread (void *args, zctx_t *ctx, void *pipe)
{
client_t *self = client_new (ctx, pipe);
- while (!self->stopped) {
+ while (!self->stopped && !zctx_interrupted) {
// Build structure each time since self->dealer can change
zmq_pollitem_t items [] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
@@ -799,9 +820,8 @@ client_thread (void *args, zctx_t *ctx, void *pipe)
server_message (self);
// Check whether server seems dead
- if (self->expires_at && zclock_time () >= self->expires_at) {
+ if (self->expires_at && zclock_time () >= self->expires_at)
client_restart (self, NULL);
- }
}
client_destroy (&self);
}
View
20 src/fmq_file.c
@@ -495,10 +495,26 @@ fmq_file_test (bool verbose)
assert (chunk);
assert (fmq_chunk_size (chunk) == 1000100);
fmq_chunk_destroy (&chunk);
+
+ // Try some fun with symbolic links
+ fmq_file_t *link = fmq_file_new ("./this/is/a/test", "bilbo.ln");
+ rc = fmq_file_output (link);
+ assert (rc == 0);
+ fprintf (fmq_file_handle (link), "./this/is/a/test/bilbo\n");
+ fmq_file_destroy (&link);
+
+ link = fmq_file_new ("./this/is/a/test", "bilbo.ln");
+ rc = fmq_file_input (link);
+ assert (rc == 0);
+ chunk = fmq_file_read (file, 1000100, 0);
+ assert (chunk);
+ assert (fmq_chunk_size (chunk) == 1000100);
+ fmq_chunk_destroy (&chunk);
+ fmq_file_destroy (&link);
// Remove file and directory
fmq_dir_t *dir = fmq_dir_new ("./this", NULL);
- assert (fmq_dir_size (dir) == 1000100);
+ assert (fmq_dir_size (dir) == 2000200);
fmq_dir_remove (dir, true);
assert (fmq_dir_size (dir) == 0);
fmq_dir_destroy (&dir);
@@ -510,7 +526,7 @@ fmq_file_test (bool verbose)
rc = fmq_file_input (file);
assert (rc == -1);
fmq_file_destroy (&file);
-
+
printf ("OK\n");
return 0;
}
View
34 src/fmq_selftest.c
@@ -53,32 +53,44 @@ int main (int argc, char *argv [])
}
// Else run as FILEMQ server or client
- fmq_client_t *client = NULL;
- fmq_server_t *server = NULL;
-
if (streq (argv [1], "-s")) {
- server = fmq_server_new ();
+ fmq_server_t *server = fmq_server_new ();
fmq_server_configure (server, "server_test.cfg");
fmq_server_publish (server, "./fmqroot/send", "/");
fmq_server_publish (server, "./fmqroot/logs", "/logs");
// We do this last
fmq_server_bind (server, "tcp://*:5670");
+ while (!zctx_interrupted)
+ sleep (1);
+ fmq_server_destroy (&server);
}
else
if (streq (argv [1], "-c")) {
- client = fmq_client_new ();
+ fmq_client_t *client = fmq_client_new ();
fmq_client_configure (client, "client_test.cfg");
fmq_client_setoption (client, "client/inbox", "./fmqroot/recv");
fmq_client_connect (client, "tcp://localhost:5670");
fmq_client_set_resync (client, true);
fmq_client_subscribe (client, "/photos");
fmq_client_subscribe (client, "/logs");
- }
- while (!zctx_interrupted)
- sleep (1);
- puts ("interrupted");
- fmq_server_destroy (&server);
- fmq_client_destroy (&client);
+ while (true) {
+ // Get message from fmq_client API
+ zmsg_t *msg = fmq_client_recv (client);
+ if (!msg)
+ break; // Interrupted
+ char *command = zmsg_popstr (msg);
+ if (streq (command, "DELIVER")) {
+ char *filename = zmsg_popstr (msg);
+ char *fullname = zmsg_popstr (msg);
+ printf ("I: received %s (%s)\n", filename, fullname);
+ free (filename);
+ free (fullname);
+ }
+ free (command);
+ zmsg_destroy (&msg);
+ }
+ fmq_client_destroy (&client);
+ }
return 0;
}
View
4 src/fmq_server.c
@@ -341,8 +341,6 @@ mount_new (char *location, char *alias)
self->alias = strdup (alias);
self->dir = fmq_dir_new (self->location, NULL);
self->subs = zlist_new ();
- zclock_log ("I: mounted directory %s as '%s' with %zd files",
- self->location, self->alias, self->dir? fmq_dir_count (self->dir): 0);
return self;
}
@@ -1144,7 +1142,7 @@ server_thread (void *args, zctx_t *ctx, void *pipe)
};
self->monitor_at = zclock_time () + self->monitor;
- while (!self->stopped) {
+ while (!self->stopped && !zctx_interrupted) {
// Calculate tickless timer, up to interval seconds
uint64_t tickless = zclock_time () + self->monitor;
zhash_foreach (self->clients, client_tickless, &tickless);
View
22 src/track.c
@@ -13,6 +13,8 @@
This file is part of FILEMQ, see http://filemq.org.
+ Distributed under the MIT/X11 license:
+
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
@@ -54,11 +56,23 @@ int main (int argc, char *argv [])
fmq_client_set_inbox (client, argv [2]);
fmq_client_set_resync (client, true);
fmq_client_subscribe (client, "/");
-
- while (!zctx_interrupted)
- sleep (1);
- puts ("interrupted");
+ while (true) {
+ // Get message from fmq_client API
+ zmsg_t *msg = fmq_client_recv (client);
+ if (!msg)
+ break; // Interrupted
+ char *command = zmsg_popstr (msg);
+ if (streq (command, "DELIVER")) {
+ char *filename = zmsg_popstr (msg);
+ char *fullname = zmsg_popstr (msg);
+ printf ("I: received %s (%s)\n", filename, fullname);
+ free (filename);
+ free (fullname);
+ }
+ free (command);
+ zmsg_destroy (&msg);
+ }
fmq_server_destroy (&server);
fmq_client_destroy (&client);
return 0;

0 comments on commit 660431d

Please sign in to comment.
Something went wrong with that request. Please try again.