Permalink
Browse files

Various cleanups

* Fixed various memory issues (take today's CZMQ)
* Change server monitor default to 1sec from 5secs
* Allow API configuration of server & client timeouts
  • Loading branch information...
1 parent 660431d commit 3128a17c1f58f2529ffcd290e1d03eb9ee6b6e67 @hintjens hintjens committed Dec 8, 2012
Showing with 107 additions and 34 deletions.
  1. +8 −0 include/fmq_msg.h
  2. +11 −5 model/client_c.gsl
  3. +4 −1 model/codec_c.gsl
  4. +1 −0 model/fmq_msg.xml
  5. +14 −9 model/server_c.gsl
  6. +11 −5 src/fmq_client.c
  7. +7 −2 src/fmq_file.c
  8. +37 −3 src/fmq_msg.c
  9. +14 −9 src/fmq_server.c
View
8 include/fmq_msg.h
@@ -50,6 +50,7 @@
operation number 1
filename string
offset number 8
+ eof number 1
headers dictionary
chunk frame
HUGZ - Client or server sends a heartbeat
@@ -148,6 +149,7 @@ int
byte operation,
char *filename,
uint64_t offset,
+ byte eof,
zhash_t *headers,
zframe_t *chunk);
@@ -297,6 +299,12 @@ uint64_t
void
fmq_msg_offset_set (fmq_msg_t *self, uint64_t offset);
+// Get/set the eof field
+byte
+ fmq_msg_eof (fmq_msg_t *self);
+void
+ fmq_msg_eof_set (fmq_msg_t *self, byte eof);
+
// Get/set the headers field
zhash_t *
fmq_msg_headers (fmq_msg_t *self);
View
16 model/client_c.gsl
@@ -343,14 +343,22 @@ struct _client_t {
int64_t expires_at; // Server expires at
};
+static void
+client_config_self (client_t *self)
+{
+ // Get standard client configuration
+ self->heartbeat = atoi (
+ fmq_config_resolve (self->config, "client/heartbeat", "1")) * 1000;
+}
+
static client_t *
client_new (zctx_t *ctx, void *pipe)
{
client_t *self = (client_t *) zmalloc (sizeof (client_t));
self->ctx = ctx;
self->pipe = pipe;
self->config = fmq_config_new ("root", NULL);
- self->heartbeat = 1000; // 1 second by default
+ client_config_self (self);
.for class.self
. for construct
$(string.trim (construct.?''):block )
@@ -386,10 +394,6 @@ client_destroy (client_t **self_p)
static void
client_apply_config (client_t *self)
{
- // Get standard client configuration
- self->heartbeat = atoi (
- fmq_config_resolve (self->config, "client/heartbeat", "1")) * 1000;
-
// Apply echo commands and class methods
fmq_config_t *section = fmq_config_child (self->config);
while (section) {
@@ -416,6 +420,7 @@ client_apply_config (client_t *self)
.endfor
section = fmq_config_next (section);
}
+ client_config_self (self);
}
.macro output_event_body
@@ -579,6 +584,7 @@ control_message (client_t *self)
char *path = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
fmq_config_path_set (self->config, path, value);
+ client_config_self (self);
free (path);
free (value);
}
View
5 model/codec_c.gsl
@@ -501,6 +501,7 @@ $(class.name)_recv (void *input)
char *string;
GET_STRING (string);
zlist_append (self->$(name), string);
+ free (string);
}
. elsif type = "dictionary"
GET_NUMBER1 (hash_size);
@@ -512,7 +513,7 @@ $(class.name)_recv (void *input)
char *value = strchr (string, '=');
if (value)
*value++ = 0;
- zhash_insert (self->$(name), string, strdup (value));
+ zhash_insert (self->$(name), string, value);
free (string);
}
. elsif type = "frame"
@@ -1072,6 +1073,7 @@ $(class.name)_$(name)_append ($(class.name)_t *self, char *format, ...)
zlist_autofree (self->$(name));
}
zlist_append (self->$(name), string);
+ free (string);
}
size_t
@@ -1150,6 +1152,7 @@ $(class.name)_$(name)_insert ($(class.name)_t *self, char *key, char *format, ..
zhash_autofree (self->$(name));
}
zhash_update (self->$(name), key, string);
+ free (string);
}
size_t
View
1 model/fmq_msg.xml
@@ -54,6 +54,7 @@ The server sends a file chunk
<field name = "operation" type = "number" size = "1" />
<field name = "filename" type = "string" />
<field name = "offset" type = "number" size = "8" />
+ <field name = "eof" type = "number" size = "1" />
<field name = "headers" type = "dictionary" />
<field name = "chunk" type = "frame" />
</message>
View
23 model/server_c.gsl
@@ -418,6 +418,17 @@ client_free (void *argument)
// Server methods
+static void
+server_config_self (server_t *self)
+{
+ // Get standard server configuration
+ self->monitor = atoi (
+ fmq_config_resolve (self->config, "server/monitor", "1")) * 1000;
+ self->heartbeat = atoi (
+ fmq_config_resolve (self->config, "server/heartbeat", "1")) * 1000;
+ self->monitor_at = zclock_time () + self->monitor;
+}
+
static server_t *
server_new (zctx_t *ctx, void *pipe)
{
@@ -427,8 +438,7 @@ server_new (zctx_t *ctx, void *pipe)
self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
self->clients = zhash_new ();
self->config = fmq_config_new ("root", NULL);
- self->monitor = 5000; // 5 seconds by default
- self->heartbeat = 1000; // 1 second by default
+ server_config_self (self);
.for class.self
. for construct
$(string.trim (construct.?''):block )
@@ -464,13 +474,6 @@ server_destroy (server_t **self_p)
static void
server_apply_config (server_t *self)
{
- // Get standard server configuration
- self->monitor = atoi (
- fmq_config_resolve (self->config, "server/monitor", "5")) * 1000;
- self->heartbeat = atoi (
- fmq_config_resolve (self->config, "server/heartbeat", "1")) * 1000;
- self->monitor_at = zclock_time () + self->monitor;
-
// Apply echo commands and class methods
fmq_config_t *section = fmq_config_child (self->config);
while (section) {
@@ -497,6 +500,7 @@ server_apply_config (server_t *self)
.endfor
section = fmq_config_next (section);
}
+ server_config_self (self);
}
static void
@@ -542,6 +546,7 @@ server_control_message (server_t *self)
char *path = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
fmq_config_path_set (self->config, path, value);
+ server_config_self (self);
free (path);
free (value);
}
View
16 src/fmq_client.c
@@ -270,14 +270,22 @@ struct _client_t {
int64_t expires_at; // Server expires at
};
+static void
+client_config_self (client_t *self)
+{
+ // Get standard client configuration
+ self->heartbeat = atoi (
+ fmq_config_resolve (self->config, "client/heartbeat", "1")) * 1000;
+}
+
static client_t *
client_new (zctx_t *ctx, void *pipe)
{
client_t *self = (client_t *) zmalloc (sizeof (client_t));
self->ctx = ctx;
self->pipe = pipe;
self->config = fmq_config_new ("root", NULL);
- self->heartbeat = 1000; // 1 second by default
+ client_config_self (self);
self->subs = zlist_new ();
self->connected = false;
return self;
@@ -311,10 +319,6 @@ client_destroy (client_t **self_p)
static void
client_apply_config (client_t *self)
{
- // Get standard client configuration
- self->heartbeat = atoi (
- fmq_config_resolve (self->config, "client/heartbeat", "1")) * 1000;
-
// Apply echo commands and class methods
fmq_config_t *section = fmq_config_child (self->config);
while (section) {
@@ -360,6 +364,7 @@ client_apply_config (client_t *self)
}
section = fmq_config_next (section);
}
+ client_config_self (self);
}
// Custom actions for state machine
@@ -744,6 +749,7 @@ control_message (client_t *self)
char *path = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
fmq_config_path_set (self->config, path, value);
+ client_config_self (self);
free (path);
free (value);
}
View
9 src/fmq_file.c
@@ -39,6 +39,7 @@ struct _fmq_file_t {
// Other properties
bool exists; // true if file exists
bool stable; // true if file is stable
+ bool eof; // true if at end of file
FILE *handle; // Read/write handle
};
@@ -354,7 +355,7 @@ fmq_file_output (fmq_file_t *self)
// --------------------------------------------------------------------------
// Read chunk from file at specified position
-// Zero-sized chunk means we're at the end of the file
+// If this was the last chunk, sets self->eof
// Null chunk means there was another error
fmq_chunk_t *
@@ -373,7 +374,11 @@ fmq_file_read (fmq_file_t *self, size_t bytes, off_t offset)
if (rc == -1)
return NULL;
- return fmq_chunk_read (self->handle, bytes);
+ self->eof = false;
+ fmq_chunk_t *chunk = fmq_chunk_read (self->handle, bytes);
+ if (chunk)
+ self->eof = fmq_chunk_size (chunk) < bytes;
+ return chunk;
}
View
40 src/fmq_msg.c
@@ -49,6 +49,7 @@ struct _fmq_msg_t {
byte operation;
char *filename;
uint64_t offset;
+ byte eof;
zhash_t *headers;
size_t headers_bytes; // Size of dictionary content
zframe_t *chunk;
@@ -286,6 +287,7 @@ fmq_msg_recv (void *input)
char *string;
GET_STRING (string);
zlist_append (self->mechanisms, string);
+ free (string);
}
// Get next frame, leave current untouched
if (!zsocket_rcvmore (input))
@@ -317,7 +319,7 @@ fmq_msg_recv (void *input)
char *value = strchr (string, '=');
if (value)
*value++ = 0;
- zhash_insert (self->options, string, strdup (value));
+ zhash_insert (self->options, string, value);
free (string);
}
GET_NUMBER1 (hash_size);
@@ -329,7 +331,7 @@ fmq_msg_recv (void *input)
char *value = strchr (string, '=');
if (value)
*value++ = 0;
- zhash_insert (self->cache, string, strdup (value));
+ zhash_insert (self->cache, string, value);
free (string);
}
break;
@@ -348,6 +350,7 @@ fmq_msg_recv (void *input)
free (self->filename);
GET_STRING (self->filename);
GET_NUMBER8 (self->offset);
+ GET_NUMBER1 (self->eof);
GET_NUMBER1 (hash_size);
self->headers = zhash_new ();
zhash_autofree (self->headers);
@@ -357,7 +360,7 @@ fmq_msg_recv (void *input)
char *value = strchr (string, '=');
if (value)
*value++ = 0;
- zhash_insert (self->headers, string, strdup (value));
+ zhash_insert (self->headers, string, value);
free (string);
}
// Get next frame, leave current untouched
@@ -554,6 +557,8 @@ fmq_msg_send (fmq_msg_t **self_p, void *output)
frame_size += strlen (self->filename);
// offset is a 8-byte integer
frame_size += 8;
+ // eof is a 1-byte integer
+ frame_size += 1;
// headers is an array of key=value strings
frame_size++; // Size is one octet
if (self->headers) {
@@ -669,6 +674,7 @@ fmq_msg_send (fmq_msg_t **self_p, void *output)
else
PUT_NUMBER1 (0); // Empty string
PUT_NUMBER8 (self->offset);
+ PUT_NUMBER1 (self->eof);
if (self->headers != NULL) {
PUT_NUMBER1 (zhash_size (self->headers));
zhash_foreach (self->headers, s_headers_write, self);
@@ -873,6 +879,7 @@ fmq_msg_send_cheezburger (
byte operation,
char *filename,
uint64_t offset,
+ byte eof,
zhash_t *headers,
zframe_t *chunk)
{
@@ -881,6 +888,7 @@ fmq_msg_send_cheezburger (
fmq_msg_operation_set (self, operation);
fmq_msg_filename_set (self, filename);
fmq_msg_offset_set (self, offset);
+ fmq_msg_eof_set (self, eof);
fmq_msg_headers_set (self, zhash_dup (headers));
fmq_msg_chunk_set (self, zframe_dup (chunk));
return fmq_msg_send (&self, output);
@@ -1001,6 +1009,7 @@ fmq_msg_dup (fmq_msg_t *self)
copy->operation = self->operation;
copy->filename = strdup (self->filename);
copy->offset = self->offset;
+ copy->eof = self->eof;
copy->headers = zhash_dup (self->headers);
copy->chunk = zframe_dup (self->chunk);
break;
@@ -1159,6 +1168,7 @@ fmq_msg_dump (fmq_msg_t *self)
else
printf (" filename=\n");
printf (" offset=%ld\n", (long) self->offset);
+ printf (" eof=%ld\n", (long) self->eof);
printf (" headers={\n");
if (self->headers)
zhash_foreach (self->headers, s_headers_dump, self);
@@ -1360,6 +1370,7 @@ fmq_msg_mechanisms_append (fmq_msg_t *self, char *format, ...)
zlist_autofree (self->mechanisms);
}
zlist_append (self->mechanisms, string);
+ free (string);
}
size_t
@@ -1528,6 +1539,7 @@ fmq_msg_options_insert (fmq_msg_t *self, char *key, char *format, ...)
zhash_autofree (self->options);
}
zhash_update (self->options, key, string);
+ free (string);
}
size_t
@@ -1606,6 +1618,7 @@ fmq_msg_cache_insert (fmq_msg_t *self, char *key, char *format, ...)
zhash_autofree (self->cache);
}
zhash_update (self->cache, key, string);
+ free (string);
}
size_t
@@ -1713,6 +1726,24 @@ fmq_msg_offset_set (fmq_msg_t *self, uint64_t offset)
// --------------------------------------------------------------------------
+// Get/set the eof field
+
+byte
+fmq_msg_eof (fmq_msg_t *self)
+{
+ assert (self);
+ return self->eof;
+}
+
+void
+fmq_msg_eof_set (fmq_msg_t *self, byte eof)
+{
+ assert (self);
+ self->eof = eof;
+}
+
+
+// --------------------------------------------------------------------------
// Get/set the headers field
zhash_t *
@@ -1781,6 +1812,7 @@ fmq_msg_headers_insert (fmq_msg_t *self, char *key, char *format, ...)
zhash_autofree (self->headers);
}
zhash_update (self->headers, key, string);
+ free (string);
}
size_t
@@ -1943,6 +1975,7 @@ fmq_msg_test (bool verbose)
fmq_msg_operation_set (self, 123);
fmq_msg_filename_set (self, "Life is short but Now lasts for ever");
fmq_msg_offset_set (self, 123);
+ fmq_msg_eof_set (self, 123);
fmq_msg_headers_insert (self, "Name", "Brutus");
fmq_msg_headers_insert (self, "Age", "%d", 43);
fmq_msg_chunk_set (self, zframe_new ("Captcha Diem", 12));
@@ -1954,6 +1987,7 @@ fmq_msg_test (bool verbose)
assert (fmq_msg_operation (self) == 123);
assert (streq (fmq_msg_filename (self), "Life is short but Now lasts for ever"));
assert (fmq_msg_offset (self) == 123);
+ assert (fmq_msg_eof (self) == 123);
assert (fmq_msg_headers_size (self) == 2);
assert (streq (fmq_msg_headers_string (self, "Name", "?"), "Brutus"));
assert (fmq_msg_headers_number (self, "Age", 0) == 43);
View
23 src/fmq_server.c
@@ -571,6 +571,17 @@ client_free (void *argument)
// Server methods
+static void
+server_config_self (server_t *self)
+{
+ // Get standard server configuration
+ self->monitor = atoi (
+ fmq_config_resolve (self->config, "server/monitor", "1")) * 1000;
+ self->heartbeat = atoi (
+ fmq_config_resolve (self->config, "server/heartbeat", "1")) * 1000;
+ self->monitor_at = zclock_time () + self->monitor;
+}
+
static server_t *
server_new (zctx_t *ctx, void *pipe)
{
@@ -580,8 +591,7 @@ server_new (zctx_t *ctx, void *pipe)
self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
self->clients = zhash_new ();
self->config = fmq_config_new ("root", NULL);
- self->monitor = 5000; // 5 seconds by default
- self->heartbeat = 1000; // 1 second by default
+ server_config_self (self);
self->mounts = zlist_new ();
return self;
}
@@ -614,13 +624,6 @@ server_destroy (server_t **self_p)
static void
server_apply_config (server_t *self)
{
- // Get standard server configuration
- self->monitor = atoi (
- fmq_config_resolve (self->config, "server/monitor", "5")) * 1000;
- self->heartbeat = atoi (
- fmq_config_resolve (self->config, "server/heartbeat", "1")) * 1000;
- self->monitor_at = zclock_time () + self->monitor;
-
// Apply echo commands and class methods
fmq_config_t *section = fmq_config_child (self->config);
while (section) {
@@ -649,6 +652,7 @@ server_apply_config (server_t *self)
}
section = fmq_config_next (section);
}
+ server_config_self (self);
}
static void
@@ -697,6 +701,7 @@ server_control_message (server_t *self)
char *path = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
fmq_config_path_set (self->config, path, value);
+ server_config_self (self);
free (path);
free (value);
}

0 comments on commit 3128a17

Please sign in to comment.