Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implemented ICANHAZ 'cache' field

  • Loading branch information...
commit fa3e77454b55ed33719cfe95c1e681f5f68fc15c 1 parent b6556f3
@hintjens hintjens authored
View
4 .gitignore
@@ -31,3 +31,7 @@ fmq_selftest
src/fmqroot/send/*
src/fmqroot/recv/*
src/fmqroot/logs/*
+.cache
+mymusic
+testit
+
View
10 doc/rfc19.txt
@@ -79,7 +79,7 @@ path = string ; Full path or path prefix
options = dictionary
dictionary = size *key-value
key-value = string ; Formatted as name=value
-cache = dictionary ; File SHA1 signatures
+cache = dictionary ; File SHA-1 signatures
; The server confirms the subscription
ICANHAZ-OK = signature %x06
@@ -161,7 +161,13 @@ When the server grants the client access after an OHAI or YARL command, it SHALL
The client MAY subscribe to any number of virtual paths by sending ICANHAZ commands to the server. The client MAY specify a full path, or it MAY specify a partial path, which is used as a prefix match. Paths MUST start with "/", thus the path "/" subscribes to //everything//.
-The options field is reserved for future use. The path does not have to exist in the server. That is, clients can request paths which will exist in the server at some future time.
+The 'path' does not have to exist in the server. That is, clients can request paths which will exist in the server at some future time.
+
+The 'options' field provides additional information to the server. The server SHOULD implement these options:
+
+* {{RESYNC=1}} - if the client sets this, the server SHALL send the full contents of the virtual path to the client.
+
+The 'cache' dictionary field tells the server which files the client already has. The server SHOULD respect this information. Each entry in the 'cache' dictionary is a "filename=digest" key/value pair where the digest SHALL be a SHA-1 digest in printable hexadecimal format. If the filename starts with '/' then it SHOULD start with the path, otherwise the server MUST ignore it. If the filename does not start with '/' then the server SHALL treat it as relative to the path.
++++ The ICANHAZ-OK Command
View
7 include/fmq.h
@@ -36,6 +36,11 @@
#define FMQ_VERSION \
FMQ_MAKE_VERSION(FMQ_VERSION_MAJOR, FMQ_VERSION_MINOR, FMQ_VERSION_PATCH)
+// Maximum length of a path + filename
+#ifndef PATH_MAX
+#define PATH_MAX 1024
+#endif
+
// These are reusable utility classes
// TODO: the generators and required classes should not be part of FMQ
// as such, but a separate project so they can be reused more widely.
@@ -44,7 +49,7 @@
#include "fmq_dir.h"
#include "fmq_patch.h"
#include "fmq_sasl.h"
-#include "fmq_sha.h"
+#include "fmq_hash.h"
#include "fmq_config.h"
// These are specific to the FileMQ implementation
View
6 include/fmq_dir.h
@@ -72,6 +72,12 @@ void
void
fmq_dir_dump (fmq_dir_t *self, int indent);
+// Load directory cache; returns a hash table containing the SHA-1 digests
+// of every file in the tree. The cache is saved between runs in .cache.
+// The caller must destroy the hash table when done with it.
+zhash_t *
+ fmq_dir_cache (fmq_dir_t *self);
+
// Self test of this class
int
fmq_dir_test (bool verbose);
View
8 include/fmq_file.h
@@ -96,6 +96,14 @@ int
void
fmq_file_close (fmq_file_t *self);
+// Return file handle, if opened
+FILE *
+ fmq_file_handle (fmq_file_t *self);
+
+// Return file SHA-1 hash as string; caller has to free it
+char *
+ fmq_file_hash (fmq_file_t *self);
+
// Self test of this class
int
fmq_file_test (bool verbose);
View
32 include/fmq_hash.h
@@ -1,5 +1,5 @@
/* =========================================================================
- fmq_sha - wraps the sha-1.1 library
+ fmq_hash - provides hashing functions (SHA-1 at present)
-------------------------------------------------------------------------
Copyright (c) 1991-2012 iMatix Corporation -- http://www.imatix.com
@@ -22,39 +22,39 @@
=========================================================================
*/
-#ifndef __FMQ_SHA_H_INCLUDED__
-#define __FMQ_SHA_H_INCLUDED__
+#ifndef __FMQ_HASH_H_INCLUDED__
+#define __FMQ_HASH_H_INCLUDED__
#ifdef __cplusplus
extern "C" {
#endif
// Opaque class structure
-typedef struct _fmq_sha_t fmq_sha_t;
+typedef struct _fmq_hash_t fmq_hash_t;
-// Create new SHA object
-fmq_sha_t *
- fmq_sha_new (void);
+// Create new HASH object
+fmq_hash_t *
+ fmq_hash_new (void);
-// Destroy a SHA object
+// Destroy a HASH object
void
- fmq_sha_destroy (fmq_sha_t **self_p);
+ fmq_hash_destroy (fmq_hash_t **self_p);
-// Add buffer into SHA calculation
+// Add buffer into HASH calculation
void
- fmq_sha_update (fmq_sha_t *self, byte *buffer, size_t length);
+ fmq_hash_update (fmq_hash_t *self, byte *buffer, size_t length);
-// Return final SHA hash data
+// Return final HASH hash data
byte *
- fmq_sha_hash_data (fmq_sha_t *self);
+ fmq_hash_data (fmq_hash_t *self);
-// Return final SHA hash size
+// Return final HASH hash size
size_t
- fmq_sha_hash_size (fmq_sha_t *self);
+ fmq_hash_size (fmq_hash_t *self);
// Self test of this class
int
- fmq_sha_test (bool verbose);
+ fmq_hash_test (bool verbose);
#ifdef __cplusplus
}
View
20 include/fmq_msg.h
@@ -40,6 +40,7 @@
ICANHAZ - Client subscribes to a path
path string
options dictionary
+ cache dictionary
ICANHAZ_OK - Server confirms the subscription
NOM - Client sends credit to the server
credit number 8
@@ -127,7 +128,8 @@ int
int
fmq_msg_send_icanhaz (void *output,
char *path,
- zhash_t *options);
+ zhash_t *options,
+ zhash_t *cache);
// Send the ICANHAZ_OK to the output in one step
int
@@ -249,6 +251,22 @@ void
size_t
fmq_msg_options_size (fmq_msg_t *self);
+// Get/set the cache field
+zhash_t *
+ fmq_msg_cache (fmq_msg_t *self);
+void
+ fmq_msg_cache_set (fmq_msg_t *self, zhash_t *cache);
+
+// Get/set a value in the cache dictionary
+char *
+ fmq_msg_cache_string (fmq_msg_t *self, char *key, char *default_value);
+uint64_t
+ fmq_msg_cache_number (fmq_msg_t *self, char *key, uint64_t default_value);
+void
+ fmq_msg_cache_insert (fmq_msg_t *self, char *key, char *format, ...);
+size_t
+ fmq_msg_cache_size (fmq_msg_t *self);
+
// Get/set the credit field
uint64_t
fmq_msg_credit (fmq_msg_t *self);
View
12 include/fmq_patch.h
@@ -61,6 +61,18 @@ fmq_file_t *
fmq_patch_op_t
fmq_patch_op (fmq_patch_t *self);
+// Return patch virtual file name
+char *
+ fmq_patch_virtual (fmq_patch_t *self);
+
+// Set patch virtual file name
+void
+ fmq_patch_virtual_set (fmq_patch_t *self, char *virtual);
+
+// Return hash digest for patch file (create only)
+char *
+ fmq_patch_hashstr (fmq_patch_t *self);
+
// Self test of this class
int
fmq_patch_test (bool verbose);
View
2  model/client_c.gsl
@@ -354,7 +354,7 @@ client_apply_config (client_t *self)
entry = fmq_config_next (entry);
}
.for class.method
- if (streq (fmq_config_name (section), "$(name)")) {
+ if (streq (fmq_config_name (section), "$(name:c)")) {
. for argument
. if type = "string"
char *$(name) = fmq_config_resolve (section, "$(name)", "?");
View
9 model/codec_c.gsl
@@ -505,6 +505,7 @@ $(class.name)_recv (void *input)
. elsif type = "dictionary"
GET_NUMBER1 (hash_size);
self->$(name) = zhash_new ();
+ zhash_autofree (self->$(name));
while (hash_size--) {
char *string;
GET_STRING (string);
@@ -512,7 +513,6 @@ $(class.name)_recv (void *input)
if (value)
*value++ = 0;
zhash_insert (self->$(name), string, strdup (value));
- zhash_freefn (self->$(name), string, free);
free (string);
}
. elsif type = "frame"
@@ -1145,10 +1145,11 @@ $(class.name)_$(name)_insert ($(class.name)_t *self, char *key, char *format, ..
va_end (argptr);
// Store string in hash table
- if (!self->$(name))
+ if (!self->$(name)) {
self->$(name) = zhash_new ();
- if (zhash_insert (self->$(name), key, string) == 0)
- zhash_freefn (self->$(name), key, free);
+ zhash_autofree (self->$(name));
+ }
+ zhash_update (self->$(name), key, string);
}
size_t
View
54 model/fmq_client.xml
@@ -23,6 +23,7 @@ This is the FILEMQ/1.0 client protocol handler
<state name = "subscribing">
<event name = "ok" next = "subscribing">
+ <action name = "format icanhaz command" />
<action name = "send" message = "ICANHAZ" />
<action name = "get next subscription" />
</event>
@@ -40,6 +41,7 @@ This is the FILEMQ/1.0 client protocol handler
<action name = "send" message = "HUGZ-OK" />
</event>
<event name = "subscribe">
+ <action name = "format icanhaz command" />
<action name = "send" message = "ICANHAZ" />
</event>
<event name = "send credit">
@@ -74,6 +76,7 @@ This is the FILEMQ/1.0 client protocol handler
<context>
bool connected; // Are we connected to server?
zlist_t *subs; // Subscriptions
+sub_t *sub; // Subscription we want to send
size_t credit; // Current credit pending
fmq_file_t *file; // File we're writing to
</context>
@@ -113,25 +116,26 @@ self->connected = true;
</action>
<action name = "get first subscription">
-sub_t *sub = (sub_t *) zlist_first (self->subs);
-if (sub) {
- fmq_msg_path_set (self->request, sub->path);
+self->sub = (sub_t *) zlist_first (self->subs);
+if (self->sub)
self->next_event = ok_event;
-}
else
self->next_event = finished_event;
</action>
<action name = "get next subscription">
-sub_t *sub = (sub_t *) zlist_next (self->subs);
-if (sub) {
- fmq_msg_path_set (self->request, sub->path);
+self->sub = (sub_t *) zlist_next (self->subs);
+if (self->sub)
self->next_event = ok_event;
-}
else
self->next_event = finished_event;
</action>
+<action name = "format icanhaz command">
+fmq_msg_path_set (self->request, self->sub->path);
+fmq_msg_cache_set (self->request, zhash_dup (self->sub->cache));
+</action>
+
<action name = "refill credit as needed">
// If credit has fallen too low, send more credit
size_t credit_to_send = 0;
@@ -149,6 +153,10 @@ if (credit_to_send) {
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
char *filename = fmq_msg_filename (self->reply);
+// Filenames from server must start with slash, which we skip
+assert (*filename == '/');
+filename++;
+
if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
if (self->file == NULL) {
zclock_log ("I: create %s/%s", inbox, filename);
@@ -169,6 +177,7 @@ if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
else
// Zero-sized chunk means end of file
fmq_file_destroy (&self->file);
+
fmq_chunk_destroy (&chunk);
}
else
@@ -201,27 +210,30 @@ self->next_event = terminate_event;
<argument name = "path" type = "string" />
// Store subscription along with any previous ones
// Check we don't already have a subscription for this path
-sub_t *sub = (sub_t *) zlist_first (self->subs);
-while (sub) {
- if (streq (path, sub->path))
+self->sub = (sub_t *) zlist_first (self->subs);
+while (self->sub) {
+ if (streq (path, self->sub->path))
return;
- sub = (sub_t *) zlist_next (self->subs);
+ self->sub = (sub_t *) zlist_next (self->subs);
}
// Subscription path must start with '/'
// We'll do better error handling later
assert (*path == '/');
-// New subscription, so store it for later replay
-sub = sub_new (self, path);
-zlist_append (self->subs, sub);
-zclock_log ("I: subscribing directory %s as '%s'",
- fmq_config_resolve (self->config, "client/inbox", ".inbox"), path, path);
-
+// Get directory cache for this path
+char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
+fmq_dir_t *dir = fmq_dir_new (path + 1, inbox);
+zhash_t *cache = dir? fmq_dir_cache (dir): NULL;
+fmq_dir_destroy (&dir);
+
+// New subscription, store it for later replay
+self->sub = sub_new (self, path, cache);
+zlist_append (self->subs, self->sub);
+zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
+
// If we're connected, then also send to server
-if (self->connected) {
- fmq_msg_path_set (self->request, path);
+if (self->connected)
self->next_event = subscribe_event;
-}
</method>
<method name = "set inbox">
View
5 model/fmq_client_sub.xml
@@ -2,13 +2,15 @@
// Subscription in memory
typedef struct {
char *path; // Path we subscribe to
+ zhash_t *cache; // Existing files, as digests
} sub_t;
static sub_t *
-sub_new (client_t *client, char *path)
+sub_new (client_t *client, char *path, zhash_t *cache)
{
sub_t *self = (sub_t *) zmalloc (sizeof (sub_t));
self->path = strdup (path);
+ self->cache = cache; // Take ownership
return self;
}
@@ -18,6 +20,7 @@ sub_destroy (sub_t **self_p)
assert (self_p);
if (*self_p) {
sub_t *self = *self_p;
+ zhash_destroy (&self->cache);
free (self->path);
free (self);
*self_p = NULL;
View
1  model/fmq_msg.xml
@@ -35,6 +35,7 @@ Server grants the client access
Client subscribes to a path
<field name = "path" type = "string" />
<field name = "options" type = "dictionary" />
+ <field name = "cache" type = "dictionary" />
</message>
<message name = "ICANHAZ-OK" id = "6">
View
29 model/fmq_server.xml
@@ -186,17 +186,18 @@ free (password);
<action name = "store client subscription">
// Find mount point with longest match to subscription
char *path = fmq_msg_path (client->request);
-mount_t *mount = (mount_t *) zlist_first (self->mounts);
-mount_t *match = mount;
-while (mount) {
- // If mount->alias is prefix of path and alias is
- // longer than previous best then we have a new best
- if (strncmp (path, mount->alias, strlen (mount->alias)) == 0
- && strlen (mount->alias) > strlen (match->alias))
- match = mount;
- mount = (mount_t *) zlist_next (self->mounts);
+
+mount_t *check = (mount_t *) zlist_first (self->mounts);
+mount_t *mount = check;
+while (check) {
+ // If check->alias is prefix of path and alias is
+ // longer than current mount then we have a new mount
+ if (strncmp (path, check->alias, strlen (check->alias)) == 0
+ && strlen (check->alias) > strlen (mount->alias))
+ mount = check;
+ check = (mount_t *) zlist_next (self->mounts);
}
-mount_sub_store (match, client, path);
+mount_sub_store (mount, client, client->request);
</action>
<action name = "store client credit">
@@ -234,9 +235,8 @@ if (client->patch == NULL) {
client->next_event = finished_event;
return;
}
-// Map filename to logical space
-char *filename = fmq_file_name (fmq_patch_file (client->patch), fmq_patch_path (client->patch));
-fmq_msg_filename_set (client->reply, filename);
+// Get virtual filename from patch
+fmq_msg_filename_set (client->reply, fmq_patch_virtual (client->patch));
// We can process a delete patch right away
if (fmq_patch_op (client->patch) == patch_delete) {
@@ -247,7 +247,8 @@ if (fmq_patch_op (client->patch) == patch_delete) {
// No reliability in this version, assume patch delivered safely
fmq_patch_destroy (&client->patch);
}
-else {
+else
+if (fmq_patch_op (client->patch) == patch_create) {
// Create patch refers to file, open that for input if needed
if (client->file == NULL) {
client->file = fmq_file_dup (fmq_patch_file (client->patch));
View
29 model/fmq_server_mount.xml
@@ -64,7 +64,7 @@ mount_refresh (mount_t *self, server_t *server)
{
bool activity = false;
- // Get latest snapshot and build a patches list if it's activity
+ // Get latest snapshot and build a patches list for any changes
fmq_dir_t *latest = fmq_dir_new (self->location, NULL);
zlist_t *patches = fmq_dir_diff (self->dir, latest);
@@ -72,6 +72,26 @@ mount_refresh (mount_t *self, server_t *server)
fmq_dir_destroy (&self->dir);
self->dir = latest;
+ // Compute virtual file names for all patches
+ fmq_patch_t *patch = (fmq_patch_t *) zlist_first (patches);
+ while (patch) {
+ // Get filename without path
+ char *filename = fmq_file_name (fmq_patch_file (patch),
+ fmq_patch_path (patch));
+ assert (*filename != '/');
+
+ // Now put mount alias in front of that
+ char *virtual = malloc (strlen (self->alias) + strlen (filename) + 2);
+ strcpy (virtual, self->alias);
+ if (virtual [strlen (virtual) - 1] != '/')
+ strcat (virtual, "/");
+ strcat (virtual, filename);
+ fmq_patch_virtual_set (patch, virtual);
+ free (virtual);
+
+ patch = (fmq_patch_t *) zlist_next (patches);
+ }
+
// Copy new patches to clients' patches list
sub_t *sub = (sub_t *) zlist_first (self->subs);
while (sub) {
@@ -92,11 +112,11 @@ mount_refresh (mount_t *self, server_t *server)
// Store subscription for mount point
static void
-mount_sub_store (mount_t *self, client_t *client, char *path)
+mount_sub_store (mount_t *self, client_t *client, fmq_msg_t *request)
{
// Store subscription along with any previous ones
// Coalesce subscriptions that are on same path
-
+ char *path = fmq_msg_path (request);
sub_t *sub = (sub_t *) zlist_first (self->subs);
while (sub) {
if (client == sub->client) {
@@ -107,6 +127,7 @@ mount_sub_store (mount_t *self, client_t *client, char *path)
// If new subscription is superset of old one, remove old
if (strncmp (sub->path, path, strlen (path)) == 0) {
zlist_remove (self->subs, sub);
+ sub_destroy (&sub);
sub = (sub_t *) zlist_first (self->subs);
}
else
@@ -116,7 +137,7 @@ mount_sub_store (mount_t *self, client_t *client, char *path)
sub = (sub_t *) zlist_next (self->subs);
}
// New subscription for this client, append to our list
- sub = sub_new (client, path);
+ sub = sub_new (client, path, fmq_msg_cache (request));
zlist_append (self->subs, sub);
}
View
36 model/fmq_server_sub.xml
@@ -5,21 +5,42 @@
typedef struct {
client_t *client; // Always refers to live client
char *path; // Path client is subscribed to
+ zhash_t *cache; // Client's cache list
} sub_t;
+static int
+s_resolve_cache_path (const char *key, void *item, void *argument);
// --------------------------------------------------------------------------
// Constructor
static sub_t *
-sub_new (client_t *client, char *path)
+sub_new (client_t *client, char *path, zhash_t *cache)
{
sub_t *self = (sub_t *) zmalloc (sizeof (sub_t));
self->client = client;
self->path = strdup (path);
+ self->cache = zhash_dup (cache);
+ zhash_foreach (self->cache, s_resolve_cache_path, self);
return self;
}
+// Cached filenames may be local, in which case prefix them with
+// the subscription path so we can do a consistent match.
+
+static int
+s_resolve_cache_path (const char *key, void *item, void *argument)
+{
+ sub_t *self = (sub_t *) argument;
+ if (*key != '/') {
+ char *new_key = malloc (strlen (self->path) + strlen (key) + 2);
+ sprintf (new_key, "%s/%s", self->path, key);
+ zhash_rename (self->cache, key, new_key);
+ free (new_key);
+ }
+ return 0;
+}
+
// --------------------------------------------------------------------------
// Destructor
@@ -30,24 +51,33 @@ sub_destroy (sub_t **self_p)
assert (self_p);
if (*self_p) {
sub_t *self = *self_p;
+ zhash_destroy (&self->cache);
free (self->path);
free (self);
*self_p = NULL;
}
}
+
// --------------------------------------------------------------------------
// Add patch to sub client patches list
static void
sub_patch_add (sub_t *self, fmq_patch_t *patch)
{
- // Delete duplicate patches
+ // Skip file creation if client already has identical file
+ if (fmq_patch_op (patch) == patch_create) {
+ char *hashstr = zhash_lookup (self->cache, fmq_patch_virtual (patch));
+ if (hashstr && strcasecmp (hashstr, fmq_patch_hashstr (patch)) == 0)
+ return; // Just skip patch for this client
+ }
+ // Remove any previous patches for the same file
fmq_patch_t *existing = (fmq_patch_t *) zlist_first (self->client->patches);
while (existing) {
if (streq (fmq_file_name (fmq_patch_file (patch), NULL),
- fmq_file_name (fmq_patch_file (existing), NULL))) {
+ fmq_file_name (fmq_patch_file (existing), NULL))) {
zlist_remove (self->client->patches, existing);
+ fmq_patch_destroy (&existing);
break;
}
existing = (fmq_patch_t *) zlist_next (self->client->patches);
View
4 model/server_c.gsl
@@ -470,8 +470,8 @@ server_apply_config (server_t *self)
zclock_log (fmq_config_value (entry));
entry = fmq_config_next (entry);
}
-.for class.method
- if (streq (fmq_config_name (section), "$(name)")) {
+.for class.method where count (argument) > 0
+ if (streq (fmq_config_name (section), "$(name:c)")) {
. for argument
. if type = "string"
char *$(name) = fmq_config_resolve (section, "$(name)", "?");
View
12 src/Makefile.am
@@ -12,19 +12,11 @@ libfmq_la_SOURCES = \
fmq_config.c \
fmq_dir.c \
fmq_file.c \
+ fmq_hash.c \
fmq_msg.c \
fmq_patch.c \
fmq_sasl.c \
- fmq_server.c \
- fmq_sha.c \
- ../sha-1.1/sha1.c \
- ../sha-1.1/sha256.c \
- ../sha-1.1/sha384.c \
- ../sha-1.1/sha512.c \
- ../sha-1.1/hmac-sha1.c \
- ../sha-1.1/hmac-sha256.c \
- ../sha-1.1/hmac-sha384.c \
- ../sha-1.1/hmac-sha512.c
+ fmq_server.c
INCLUDES = -I$(top_srcdir)/include
bin_PROGRAMS = filemq track fmq_selftest
View
5 src/client_test.cfg
@@ -4,11 +4,10 @@ client
heartbeat = 1 # Heartbeat to server
inbox = ./fmqroot/recv
+# inbox
+# path = ./some/directory
# subscribe
# path = /logs
-# someopt = comingsoon
-# subscribe
-# path = /photos
security
plain
View
2  src/fmq_chunk.c
@@ -156,7 +156,7 @@ fmq_chunk_data (fmq_chunk_t *self)
// --------------------------------------------------------------------------
-// Read chunk to an open file descriptor
+// Read chunk from an open file descriptor
fmq_chunk_t *
fmq_chunk_read (FILE *handle, size_t bytes)
View
152 src/fmq_client.c
@@ -172,13 +172,15 @@ typedef struct _client_t client_t;
// Subscription in memory
typedef struct {
char *path; // Path we subscribe to
+ zhash_t *cache; // Existing files, as digests
} sub_t;
static sub_t *
-sub_new (client_t *client, char *path)
+sub_new (client_t *client, char *path, zhash_t *cache)
{
sub_t *self = (sub_t *) zmalloc (sizeof (sub_t));
self->path = strdup (path);
+ self->cache = cache; // Take ownership
return self;
}
@@ -188,6 +190,7 @@ sub_destroy (sub_t **self_p)
assert (self_p);
if (*self_p) {
sub_t *self = *self_p;
+ zhash_destroy (&self->cache);
free (self->path);
free (self);
*self_p = NULL;
@@ -201,10 +204,11 @@ sub_destroy (sub_t **self_p)
struct _client_t {
// Properties accessible to client actions
event_t next_event; // Next event
- bool connected; // Are we connected to server?
- zlist_t *subs; // Subscriptions
- size_t credit; // Current credit pending
- fmq_file_t *file; // File we're writing to
+ bool connected; // Are we connected to server?
+ zlist_t *subs; // Subscriptions
+ sub_t *sub; // Subscription we want to send
+ size_t credit; // Current credit pending
+ fmq_file_t *file; // File we're writing to
// Properties you should NOT touch
zctx_t *ctx; // Own CZMQ context
@@ -276,32 +280,35 @@ client_apply_config (client_t *self)
}
if (streq (fmq_config_name (section), "subscribe")) {
char *path = fmq_config_resolve (section, "path", "?");
- // Store subscription along with any previous ones
- // Check we don't already have a subscription for this path
- sub_t *sub = (sub_t *) zlist_first (self->subs);
- while (sub) {
- if (streq (path, sub->path))
- return;
- sub = (sub_t *) zlist_next (self->subs);
- }
- // Subscription path must start with '/'
- // We'll do better error handling later
- assert (*path == '/');
-
- // New subscription, so store it for later replay
- sub = sub_new (self, path);
- zlist_append (self->subs, sub);
- zclock_log ("I: subscribing directory %s as '%s'",
- fmq_config_resolve (self->config, "client/inbox", ".inbox"), path, path);
-
- // If we're connected, then also send to server
- if (self->connected) {
- fmq_msg_path_set (self->request, path);
- self->next_event = subscribe_event;
- }
+ // Store subscription along with any previous ones
+ // Check we don't already have a subscription for this path
+ self->sub = (sub_t *) zlist_first (self->subs);
+ while (self->sub) {
+ if (streq (path, self->sub->path))
+ return;
+ self->sub = (sub_t *) zlist_next (self->subs);
+ }
+ // Subscription path must start with '/'
+ // We'll do better error handling later
+ assert (*path == '/');
+
+ // Get directory cache for this path
+ char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
+ fmq_dir_t *dir = fmq_dir_new (path + 1, inbox);
+ zhash_t *cache = dir? fmq_dir_cache (dir): NULL;
+ fmq_dir_destroy (&dir);
+
+ // New subscription, store it for later replay
+ self->sub = sub_new (self, path, cache);
+ zlist_append (self->subs, self->sub);
+ zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
+
+ // If we're connected, then also send to server
+ if (self->connected)
+ self->next_event = subscribe_event;
}
else
- if (streq (fmq_config_name (section), "set inbox")) {
+ if (streq (fmq_config_name (section), "set_inbox")) {
char *path = fmq_config_resolve (section, "path", "?");
fmq_config_path_set (self->config, "client/inbox", path);
}
@@ -336,25 +343,28 @@ connected_to_server (client_t *self)
static void
get_first_subscription (client_t *self)
{
- sub_t *sub = (sub_t *) zlist_first (self->subs);
- if (sub) {
- fmq_msg_path_set (self->request, sub->path);
- self->next_event = ok_event;
- }
- else
- self->next_event = finished_event;
+ self->sub = (sub_t *) zlist_first (self->subs);
+ if (self->sub)
+ self->next_event = ok_event;
+ else
+ self->next_event = finished_event;
}
static void
get_next_subscription (client_t *self)
{
- sub_t *sub = (sub_t *) zlist_next (self->subs);
- if (sub) {
- fmq_msg_path_set (self->request, sub->path);
- self->next_event = ok_event;
- }
- else
- self->next_event = finished_event;
+ self->sub = (sub_t *) zlist_next (self->subs);
+ if (self->sub)
+ self->next_event = ok_event;
+ else
+ self->next_event = finished_event;
+}
+
+static void
+format_icanhaz_command (client_t *self)
+{
+ fmq_msg_path_set (self->request, self->sub->path);
+ fmq_msg_cache_set (self->request, zhash_dup (self->sub->cache));
}
static void
@@ -378,6 +388,10 @@ process_the_patch (client_t *self)
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
char *filename = fmq_msg_filename (self->reply);
+ // Filenames from server must start with slash, which we skip
+ assert (*filename == '/');
+ filename++;
+
if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
if (self->file == NULL) {
zclock_log ("I: create %s/%s", inbox, filename);
@@ -398,6 +412,7 @@ process_the_patch (client_t *self)
else
// Zero-sized chunk means end of file
fmq_file_destroy (&self->file);
+
fmq_chunk_destroy (&chunk);
}
else
@@ -501,6 +516,7 @@ client_execute (client_t *self, int event)
case subscribing_state:
if (self->event == ok_event) {
+ format_icanhaz_command (self);
fmq_msg_id_set (self->request, FMQ_MSG_ICANHAZ);
fmq_msg_send (&self->request, self->dealer);
self->request = fmq_msg_new (0);
@@ -543,6 +559,7 @@ client_execute (client_t *self, int event)
}
else
if (self->event == subscribe_event) {
+ format_icanhaz_command (self);
fmq_msg_id_set (self->request, FMQ_MSG_ICANHAZ);
fmq_msg_send (&self->request, self->dealer);
self->request = fmq_msg_new (0);
@@ -615,29 +632,32 @@ control_message (client_t *self)
char *method = zmsg_popstr (msg);
if (streq (method, "SUBSCRIBE")) {
char *path = zmsg_popstr (msg);
- // Store subscription along with any previous ones
- // Check we don't already have a subscription for this path
- sub_t *sub = (sub_t *) zlist_first (self->subs);
- while (sub) {
- if (streq (path, sub->path))
- return;
- sub = (sub_t *) zlist_next (self->subs);
- }
- // Subscription path must start with '/'
- // We'll do better error handling later
- assert (*path == '/');
-
- // New subscription, so store it for later replay
- sub = sub_new (self, path);
- zlist_append (self->subs, sub);
- zclock_log ("I: subscribing directory %s as '%s'",
- fmq_config_resolve (self->config, "client/inbox", ".inbox"), path, path);
-
- // If we're connected, then also send to server
- if (self->connected) {
- fmq_msg_path_set (self->request, path);
- self->next_event = subscribe_event;
- }
+ // Store subscription along with any previous ones
+ // Check we don't already have a subscription for this path
+ self->sub = (sub_t *) zlist_first (self->subs);
+ while (self->sub) {
+ if (streq (path, self->sub->path))
+ return;
+ self->sub = (sub_t *) zlist_next (self->subs);
+ }
+ // Subscription path must start with '/'
+ // We'll do better error handling later
+ assert (*path == '/');
+
+ // Get directory cache for this path
+ char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
+ fmq_dir_t *dir = fmq_dir_new (path + 1, inbox);
+ zhash_t *cache = dir? fmq_dir_cache (dir): NULL;
+ fmq_dir_destroy (&dir);
+
+ // New subscription, store it for later replay
+ self->sub = sub_new (self, path, cache);
+ zlist_append (self->subs, self->sub);
+ zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
+
+ // If we're connected, then also send to server
+ if (self->connected)
+ self->next_event = subscribe_event;
free (path);
}
else
View
48 src/fmq_dir.c
@@ -294,6 +294,7 @@ fmq_dir_diff (fmq_dir_t *older, fmq_dir_t *newer)
if (fmq_file_stable (old)) {
// Old file was modified or replaced
// Since we don't check file contents, treat as created
+ // Could better do SHA check on file here
if (fmq_file_time (new) != fmq_file_time (old)
|| fmq_file_size (new) != fmq_file_size (old))
zlist_append (patches, fmq_patch_new (newer->path, new, patch_create));
@@ -428,6 +429,47 @@ fmq_dir_dump (fmq_dir_t *self, int indent)
}
+static void
+s_dir_recache (fmq_dir_t *self, char *root, zhash_t *cache)
+{
+ // Process files in directory
+ fmq_file_t *file = (fmq_file_t *) zlist_first (self->files);
+ while (file) {
+ char *filename = fmq_file_name (file, root);
+ if (zhash_lookup (cache, fmq_file_name (file, root)) == NULL)
+ zhash_insert (cache, filename, fmq_file_hash (file));
+ file = (fmq_file_t *) zlist_next (self->files);
+ }
+
+ // Recurse to subdirectories
+ fmq_dir_t *subdir = (fmq_dir_t *) zlist_first (self->subdirs);
+ while (subdir) {
+ s_dir_recache (subdir, root, cache);
+ subdir = (fmq_dir_t *) zlist_next (self->subdirs);
+ }
+}
+
+
+// --------------------------------------------------------------------------
+// Load directory cache; returns a hash table containing the SHA-1 digests
+// of every file in the tree. The cache is saved between runs in .cache.
+// The caller must destroy the hash table when done with it.
+
+zhash_t *
+fmq_dir_cache (fmq_dir_t *self)
+{
+ assert (self);
+ zhash_t *cache = zhash_new ();
+ char *cache_file = malloc (strlen (self->path) + strlen ("/.cache") + 1);
+ sprintf (cache_file, "%s/.cache", self->path);
+ zhash_load (cache, cache_file);
+ s_dir_recache (self, self->path, cache);
+ zhash_save (cache, cache_file);
+ free (cache_file);
+ return cache;
+}
+
+
// --------------------------------------------------------------------------
// Self test of this class
int
@@ -452,6 +494,12 @@ fmq_dir_test (bool verbose)
fmq_dir_destroy (&older);
fmq_dir_destroy (&newer);
+ // Test directory cache calculation
+ fmq_dir_t *here = fmq_dir_new (".", NULL);
+ zhash_t *cache = fmq_dir_cache (here);
+ fmq_dir_destroy (&here);
+ zhash_destroy (&cache);
+
fmq_dir_t *nosuch = fmq_dir_new ("does-not-exist", NULL);
assert (nosuch == NULL);
View
49 src/fmq_file.c
@@ -409,6 +409,55 @@ fmq_file_close (fmq_file_t *self)
// --------------------------------------------------------------------------
+// Return file handle, if opened
+
+FILE *
+fmq_file_handle (fmq_file_t *self)
+{
+ assert (self);
+ return self->handle;
+}
+
+
+// --------------------------------------------------------------------------
+// Return file SHA-1 hash as string; caller has to free it
+
+char *
+fmq_file_hash (fmq_file_t *self)
+{
+ assert (self);
+
+ int rc = fmq_file_input (self);
+ if (rc == -1)
+ return ""; // Problem reading directory
+
+ // Now calculate hash for file data, chunk by chunk
+ fmq_hash_t *hash = fmq_hash_new ();
+ size_t blocksz = 65535;
+
+ fmq_chunk_t *chunk = fmq_chunk_read (self->handle, blocksz);
+ while (fmq_chunk_size (chunk)) {
+ fmq_hash_update (hash, fmq_chunk_data (chunk), fmq_chunk_size (chunk));
+ fmq_chunk_destroy (&chunk);
+ chunk = fmq_chunk_read (self->handle, blocksz);
+ }
+ fmq_chunk_destroy (&chunk);
+ fmq_file_close (self);
+
+ // Convert to printable string
+ char hex_char [] = "0123456789ABCDEF";
+ char *hashstr = zmalloc (fmq_hash_size (hash) * 2 + 1);
+ int byte_nbr;
+ for (byte_nbr = 0; byte_nbr < fmq_hash_size (hash); byte_nbr++) {
+ hashstr [byte_nbr * 2 + 0] = hex_char [fmq_hash_data (hash) [byte_nbr] >> 4];
+ hashstr [byte_nbr * 2 + 1] = hex_char [fmq_hash_data (hash) [byte_nbr] & 15];
+ }
+ fmq_hash_destroy (&hash);
+ return hashstr;
+}
+
+
+// --------------------------------------------------------------------------
// Self test of this class
int
View
169 src/fmq_msg.c
@@ -42,6 +42,8 @@ struct _fmq_msg_t {
char *path;
zhash_t *options;
size_t options_bytes; // Size of dictionary content
+ zhash_t *cache;
+ size_t cache_bytes; // Size of dictionary content
uint64_t credit;
uint64_t sequence;
byte operation;
@@ -203,6 +205,7 @@ fmq_msg_destroy (fmq_msg_t **self_p)
zframe_destroy (&self->response);
free (self->path);
zhash_destroy (&self->options);
+ zhash_destroy (&self->cache);
free (self->filename);
zhash_destroy (&self->headers);
zframe_destroy (&self->chunk);
@@ -307,6 +310,7 @@ fmq_msg_recv (void *input)
GET_STRING (self->path);
GET_NUMBER1 (hash_size);
self->options = zhash_new ();
+ zhash_autofree (self->options);
while (hash_size--) {
char *string;
GET_STRING (string);
@@ -314,7 +318,18 @@ fmq_msg_recv (void *input)
if (value)
*value++ = 0;
zhash_insert (self->options, string, strdup (value));
- zhash_freefn (self->options, string, free);
+ free (string);
+ }
+ GET_NUMBER1 (hash_size);
+ self->cache = zhash_new ();
+ zhash_autofree (self->cache);
+ while (hash_size--) {
+ char *string;
+ GET_STRING (string);
+ char *value = strchr (string, '=');
+ if (value)
+ *value++ = 0;
+ zhash_insert (self->cache, string, strdup (value));
free (string);
}
break;
@@ -335,6 +350,7 @@ fmq_msg_recv (void *input)
GET_NUMBER8 (self->offset);
GET_NUMBER1 (hash_size);
self->headers = zhash_new ();
+ zhash_autofree (self->headers);
while (hash_size--) {
char *string;
GET_STRING (string);
@@ -342,7 +358,6 @@ fmq_msg_recv (void *input)
if (value)
*value++ = 0;
zhash_insert (self->headers, string, strdup (value));
- zhash_freefn (self->headers, string, free);
free (string);
}
// Get next frame, leave current untouched
@@ -409,6 +424,27 @@ s_options_write (const char *key, void *item, void *argument)
// Count size of key=value pair
static int
+s_cache_count (const char *key, void *item, void *argument)
+{
+ fmq_msg_t *self = (fmq_msg_t *) argument;
+ self->cache_bytes += strlen (key) + 1 + strlen ((char *) item) + 1;
+ return 0;
+}
+
+// Serialize cache key=value pair
+static int
+s_cache_write (const char *key, void *item, void *argument)
+{
+ fmq_msg_t *self = (fmq_msg_t *) argument;
+ char string [STRING_MAX + 1];
+ snprintf (string, STRING_MAX, "%s=%s", key, (char *) item);
+ size_t string_size;
+ PUT_STRING (string);
+ return 0;
+}
+
+// Count size of key=value pair
+static int
s_headers_count (const char *key, void *item, void *argument)
{
fmq_msg_t *self = (fmq_msg_t *) argument;
@@ -487,6 +523,14 @@ fmq_msg_send (fmq_msg_t **self_p, void *output)
zhash_foreach (self->options, s_options_count, self);
}
frame_size += self->options_bytes;
+ // cache is an array of key=value strings
+ frame_size++; // Size is one octet
+ if (self->cache) {
+ self->cache_bytes = 0;
+ // Add up size of dictionary contents
+ zhash_foreach (self->cache, s_cache_count, self);
+ }
+ frame_size += self->cache_bytes;
break;
case FMQ_MSG_ICANHAZ_OK:
@@ -600,6 +644,12 @@ fmq_msg_send (fmq_msg_t **self_p, void *output)
}
else
PUT_NUMBER1 (0); // Empty dictionary
+ if (self->cache != NULL) {
+ PUT_NUMBER1 (zhash_size (self->cache));
+ zhash_foreach (self->cache, s_cache_write, self);
+ }
+ else
+ PUT_NUMBER1 (0); // Empty dictionary
break;
case FMQ_MSG_ICANHAZ_OK:
@@ -774,11 +824,13 @@ int
fmq_msg_send_icanhaz (
void *output,
char *path,
- zhash_t *options)
+ zhash_t *options,
+ zhash_t *cache)
{
fmq_msg_t *self = fmq_msg_new (FMQ_MSG_ICANHAZ);
fmq_msg_path_set (self, path);
fmq_msg_options_set (self, zhash_dup (options));
+ fmq_msg_cache_set (self, zhash_dup (cache));
return fmq_msg_send (&self, output);
}
@@ -933,6 +985,7 @@ fmq_msg_dup (fmq_msg_t *self)
case FMQ_MSG_ICANHAZ:
copy->path = strdup (self->path);
copy->options = zhash_dup (self->options);
+ copy->cache = zhash_dup (self->cache);
break;
case FMQ_MSG_ICANHAZ_OK:
@@ -983,6 +1036,15 @@ s_options_dump (const char *key, void *item, void *argument)
return 0;
}
+// Dump cache key=value pair to stdout
+int
+s_cache_dump (const char *key, void *item, void *argument)
+{
+ fmq_msg_t *self = (fmq_msg_t *) argument;
+ printf (" %s=%s\n", key, (char *) item);
+ return 0;
+}
+
// Dump headers key=value pair to stdout
int
s_headers_dump (const char *key, void *item, void *argument)
@@ -1072,6 +1134,10 @@ fmq_msg_dump (fmq_msg_t *self)
if (self->options)
zhash_foreach (self->options, s_options_dump, self);
printf (" }\n");
+ printf (" cache={\n");
+ if (self->cache)
+ zhash_foreach (self->cache, s_cache_dump, self);
+ printf (" }\n");
break;
case FMQ_MSG_ICANHAZ_OK:
@@ -1457,10 +1523,11 @@ fmq_msg_options_insert (fmq_msg_t *self, char *key, char *format, ...)
va_end (argptr);
// Store string in hash table
- if (!self->options)
+ if (!self->options) {
self->options = zhash_new ();
- if (zhash_insert (self->options, key, string) == 0)
- zhash_freefn (self->options, key, free);
+ zhash_autofree (self->options);
+ }
+ zhash_update (self->options, key, string);
}
size_t
@@ -1471,6 +1538,84 @@ fmq_msg_options_size (fmq_msg_t *self)
// --------------------------------------------------------------------------
+// Get/set the cache field
+
+zhash_t *
+fmq_msg_cache (fmq_msg_t *self)
+{
+ assert (self);
+ return self->cache;
+}
+
+// Greedy function, takes ownership of cache; if you don't want that
+// then use zhash_dup() to pass a copy of cache
+
+void
+fmq_msg_cache_set (fmq_msg_t *self, zhash_t *cache)
+{
+ assert (self);
+ zhash_destroy (&self->cache);
+ self->cache = cache;
+}
+
+// --------------------------------------------------------------------------
+// Get/set a value in the cache dictionary
+
+char *
+fmq_msg_cache_string (fmq_msg_t *self, char *key, char *default_value)
+{
+ assert (self);
+ char *value = NULL;
+ if (self->cache)
+ value = (char *) (zhash_lookup (self->cache, key));
+ if (!value)
+ value = default_value;
+
+ return value;
+}
+
+uint64_t
+fmq_msg_cache_number (fmq_msg_t *self, char *key, uint64_t default_value)
+{
+ assert (self);
+ uint64_t value = default_value;
+ char *string;
+ if (self->cache)
+ string = (char *) (zhash_lookup (self->cache, key));
+ if (string)
+ value = atol (string);
+
+ return value;
+}
+
+void
+fmq_msg_cache_insert (fmq_msg_t *self, char *key, char *format, ...)
+{
+ // Format string into buffer
+ assert (self);
+ va_list argptr;
+ va_start (argptr, format);
+ char *string = (char *) malloc (STRING_MAX + 1);
+ assert (string);
+ vsnprintf (string, STRING_MAX, format, argptr);
+ va_end (argptr);
+
+ // Store string in hash table
+ if (!self->cache) {
+ self->cache = zhash_new ();
+ zhash_autofree (self->cache);
+ }
+ zhash_update (self->cache, key, string);
+}
+
+size_t
+fmq_msg_cache_size (fmq_msg_t *self)
+{
+ return zhash_size (self->cache);
+}
+
+
+// --------------------------------------------------------------------------
// Get/set the credit field
uint64_t
@@ -1631,10 +1776,11 @@ fmq_msg_headers_insert (fmq_msg_t *self, char *key, char *format, ...)
va_end (argptr);
// Store string in hash table
- if (!self->headers)
+ if (!self->headers) {
self->headers = zhash_new ();
- if (zhash_insert (self->headers, key, string) == 0)
- zhash_freefn (self->headers, key, free);
+ zhash_autofree (self->headers);
+ }
+ zhash_update (self->headers, key, string);
}
size_t
@@ -1759,6 +1905,8 @@ fmq_msg_test (bool verbose)
fmq_msg_path_set (self, "Life is short but Now lasts for ever");
fmq_msg_options_insert (self, "Name", "Brutus");
fmq_msg_options_insert (self, "Age", "%d", 43);
+ fmq_msg_cache_insert (self, "Name", "Brutus");
+ fmq_msg_cache_insert (self, "Age", "%d", 43);
fmq_msg_send (&self, output);
self = fmq_msg_recv (input);
@@ -1767,6 +1915,9 @@ fmq_msg_test (bool verbose)
assert (fmq_msg_options_size (self) == 2);
assert (streq (fmq_msg_options_string (self, "Name", "?"), "Brutus"));
assert (fmq_msg_options_number (self, "Age", 0) == 43);
+ assert (fmq_msg_cache_size (self) == 2);
+ assert (streq (fmq_msg_cache_string (self, "Name", "?"), "Brutus"));
+ assert (fmq_msg_cache_number (self, "Age", 0) == 43);
fmq_msg_destroy (&self);
self = fmq_msg_new (FMQ_MSG_ICANHAZ_OK);
View
56 src/fmq_patch.c
@@ -27,11 +27,14 @@
#include "../include/fmq.h"
// Structure of our class
+// If you modify this beware to also change _dup
struct _fmq_patch_t {
char *path; // Directory path
+ char *virtual; // Virtual file name
fmq_file_t *file; // File we refer to
fmq_patch_op_t op; // Operation
+ char *hashstr; // File hash digest
};
@@ -46,9 +49,12 @@ fmq_patch_new (char *path, fmq_file_t *file, fmq_patch_op_t op)
self->path = strdup (path);
self->file = fmq_file_dup (file);
self->op = op;
+ if (self->op == patch_create)
+ self->hashstr = fmq_file_hash (self->file);
return self;
}
+
// --------------------------------------------------------------------------
// Destroy a patch
@@ -59,6 +65,8 @@ fmq_patch_destroy (fmq_patch_t **self_p)
if (*self_p) {
fmq_patch_t *self = *self_p;
free (self->path);
+ free (self->virtual);
+ free (self->hashstr);
fmq_file_destroy (&self->file);
free (self);
*self_p = NULL;
@@ -72,12 +80,19 @@ fmq_patch_destroy (fmq_patch_t **self_p)
fmq_patch_t *
fmq_patch_dup (fmq_patch_t *self)
{
- return fmq_patch_new (self->path, self->file, self->op);
+ fmq_patch_t *copy = (fmq_patch_t *) zmalloc (sizeof (fmq_patch_t));
+ copy->path = strdup (self->path);
+ copy->file = fmq_file_dup (self->file);
+ copy->op = self->op;
+ // Don't recalculate hash when we duplicate patch
+ copy->hashstr = self->hashstr? strdup (self->hashstr): NULL;
+ copy->virtual = self->virtual? strdup (self->virtual): NULL;
+ return copy;
}
// --------------------------------------------------------------------------
-// Return patch file directory oatth
+// Return patch file directory path
char *
fmq_patch_path (fmq_patch_t *self)
@@ -110,6 +125,40 @@ fmq_patch_op (fmq_patch_t *self)
// --------------------------------------------------------------------------
+// Return patch virtual file name
+
+char *
+fmq_patch_virtual (fmq_patch_t *self)
+{
+ assert (self);
+ return self->virtual;
+}
+
+
+// --------------------------------------------------------------------------
+// Set patch virtual file name
+
+void
+fmq_patch_virtual_set (fmq_patch_t *self, char *virtual)
+{
+ assert (self);
+ free (self->virtual);
+ self->virtual = strdup (virtual);
+}
+
+
+// --------------------------------------------------------------------------
+// Return hash digest for patch file (create only)
+
+char *
+fmq_patch_hashstr (fmq_patch_t *self)
+{
+ assert (self);
+ return self->hashstr;
+}
+
+
+// --------------------------------------------------------------------------
// Self test of this class
int
fmq_patch_test (bool verbose)
@@ -122,7 +171,8 @@ fmq_patch_test (bool verbose)
file = fmq_patch_file (patch);
assert (streq (fmq_file_name (file, "."), "bilbo"));
-
+ fmq_patch_virtual_set (patch, "/virtual/path");
+ assert (streq (fmq_patch_virtual (patch), "/virtual/path"));
fmq_patch_destroy (&patch);
printf ("OK\n");
View
3  src/fmq_selftest.c
@@ -44,7 +44,7 @@ int main (int argc, char *argv [])
fmq_dir_test (verbose);
fmq_msg_test (verbose);
fmq_sasl_test (verbose);
- fmq_sha_test (verbose);
+ fmq_hash_test (verbose);
fmq_config_test (verbose);
fmq_server_test (verbose);
fmq_client_test (verbose);
@@ -70,7 +70,6 @@ int main (int argc, char *argv [])
fmq_client_configure (client, "client_test.cfg");
fmq_client_setoption (client, "client/inbox", "./fmqroot/recv");
fmq_client_connect (client, "tcp://localhost:5670");
- fmq_client_subscribe (client, "/");
fmq_client_subscribe (client, "/photos");
fmq_client_subscribe (client, "/logs");
}
View
210 src/fmq_server.c
@@ -227,21 +227,42 @@ server_client_execute (server_t *server, client_t *client, int event);
typedef struct {
client_t *client; // Always refers to live client
char *path; // Path client is subscribed to
+ zhash_t *cache; // Client's cache list
} sub_t;
+static int
+s_resolve_cache_path (const char *key, void *item, void *argument);
// --------------------------------------------------------------------------
// Constructor
static sub_t *
-sub_new (client_t *client, char *path)
+sub_new (client_t *client, char *path, zhash_t *cache)
{
sub_t *self = (sub_t *) zmalloc (sizeof (sub_t));
self->client = client;
self->path = strdup (path);
+ self->cache = zhash_dup (cache);
+ zhash_foreach (self->cache, s_resolve_cache_path, self);
return self;
}
+// Cached filenames may be local, in which case prefix them with
+// the subscription path so we can do a consistent match.
+
+static int
+s_resolve_cache_path (const char *key, void *item, void *argument)
+{
+ sub_t *self = (sub_t *) argument;
+ if (*key != '/') {
+ char *new_key = malloc (strlen (self->path) + strlen (key) + 2);
+ sprintf (new_key, "%s/%s", self->path, key);
+ zhash_rename (self->cache, key, new_key);
+ free (new_key);
+ }
+ return 0;
+}
+
// --------------------------------------------------------------------------
// Destructor
@@ -252,24 +273,33 @@ sub_destroy (sub_t **self_p)
assert (self_p);
if (*self_p) {
sub_t *self = *self_p;
+ zhash_destroy (&self->cache);
free (self->path);
free (self);
*self_p = NULL;
}
}
+
// --------------------------------------------------------------------------
// Add patch to sub client patches list
static void
sub_patch_add (sub_t *self, fmq_patch_t *patch)
{
- // Delete duplicate patches
+ // Skip file creation if client already has identical file
+ if (fmq_patch_op (patch) == patch_create) {
+ char *hashstr = zhash_lookup (self->cache, fmq_patch_virtual (patch));
+ if (hashstr && strcasecmp (hashstr, fmq_patch_hashstr (patch)) == 0)
+ return; // Just skip patch for this client
+ }
+ // Remove any previous patches for the same file
fmq_patch_t *existing = (fmq_patch_t *) zlist_first (self->client->patches);
while (existing) {
if (streq (fmq_file_name (fmq_patch_file (patch), NULL),
- fmq_file_name (fmq_patch_file (existing), NULL))) {
+ fmq_file_name (fmq_patch_file (existing), NULL))) {
zlist_remove (self->client->patches, existing);
+ fmq_patch_destroy (&existing);
break;
}
existing = (fmq_patch_t *) zlist_next (self->client->patches);
@@ -342,7 +372,7 @@ mount_refresh (mount_t *self, server_t *server)
{
bool activity = false;
- // Get latest snapshot and build a patches list if it's activity
+ // Get latest snapshot and build a patches list for any changes
fmq_dir_t *latest = fmq_dir_new (self->location, NULL);
zlist_t *patches = fmq_dir_diff (self->dir, latest);
@@ -350,6 +380,26 @@ mount_refresh (mount_t *self, server_t *server)
fmq_dir_destroy (&self->dir);
self->dir = latest;
+ // Compute virtual file names for all patches
+ fmq_patch_t *patch = (fmq_patch_t *) zlist_first (patches);
+ while (patch) {
+ // Get filename without path
+ char *filename = fmq_file_name (fmq_patch_file (patch),
+ fmq_patch_path (patch));
+ assert (*filename != '/');
+
+ // Now put mount alias in front of that
+ char *virtual = malloc (strlen (self->alias) + strlen (filename) + 2);
+ strcpy (virtual, self->alias);
+ if (virtual [strlen (virtual) - 1] != '/')
+ strcat (virtual, "/");
+ strcat (virtual, filename);
+ fmq_patch_virtual_set (patch, virtual);
+ free (virtual);
+
+ patch = (fmq_patch_t *) zlist_next (patches);
+ }
+
// Copy new patches to clients' patches list
sub_t *sub = (sub_t *) zlist_first (self->subs);
while (sub) {
@@ -370,11 +420,11 @@ mount_refresh (mount_t *self, server_t *server)
// Store subscription for mount point
static void
-mount_sub_store (mount_t *self, client_t *client, char *path)
+mount_sub_store (mount_t *self, client_t *client, fmq_msg_t *request)
{
// Store subscription along with any previous ones
// Coalesce subscriptions that are on same path
-
+ char *path = fmq_msg_path (request);
sub_t *sub = (sub_t *) zlist_first (self->subs);
while (sub) {
if (client == sub->client) {
@@ -385,6 +435,7 @@ mount_sub_store (mount_t *self, client_t *client, char *path)
// If new subscription is superset of old one, remove old
if (strncmp (sub->path, path, strlen (path)) == 0) {
zlist_remove (self->subs, sub);
+ sub_destroy (&sub);
sub = (sub_t *) zlist_first (self->subs);
}
else
@@ -394,7 +445,7 @@ mount_sub_store (mount_t *self, client_t *client, char *path)
sub = (sub_t *) zlist_next (self->subs);
}
// New subscription for this client, append to our list
- sub = sub_new (client, path);
+ sub = sub_new (client, path, fmq_msg_cache (request));
zlist_append (self->subs, sub);
}
@@ -588,7 +639,7 @@ server_apply_config (server_t *self)
zlist_append (self->mounts, mount);
}
else
- if (streq (fmq_config_name (section), "set anonymous")) {
+ if (streq (fmq_config_name (section), "set_anonymous")) {
long access = atoi (fmq_config_resolve (section, "access", ""));
// Enable anonymous access without a config file
fmq_config_path_set (self->config, "security/anonymous", access? "1" :"0");
@@ -714,17 +765,18 @@ store_client_subscription (server_t *self, client_t *client)
{
// Find mount point with longest match to subscription
char *path = fmq_msg_path (client->request);
- mount_t *mount = (mount_t *) zlist_first (self->mounts);
- mount_t *match = mount;
- while (mount) {
- // If mount->alias is prefix of path and alias is
- // longer than previous best then we have a new best
- if (strncmp (path, mount->alias, strlen (mount->alias)) == 0
- && strlen (mount->alias) > strlen (match->alias))
- match = mount;
- mount = (mount_t *) zlist_next (self->mounts);
+
+ mount_t *check = (mount_t *) zlist_first (self->mounts);
+ mount_t *mount = check;
+ while (check) {
+ // If check->alias is prefix of path and alias is
+ // longer than current mount then we have a new mount
+ if (strncmp (path, check->alias, strlen (check->alias)) == 0
+ && strlen (check->alias) > strlen (mount->alias))
+ mount = check;
+ check = (mount_t *) zlist_next (self->mounts);
}
- mount_sub_store (match, client, path);
+ mount_sub_store (mount, client, client->request);
}
static void
@@ -750,67 +802,67 @@ monitor_the_server (server_t *self, client_t *client)
static void
get_next_patch_for_client (server_t *self, client_t *client)
{
- // Get next patch for client if we're not doing one already
- if (client->patch == NULL)
- client->patch = (fmq_patch_t *) zlist_pop (client->patches);
- if (client->patch == NULL) {
- client->next_event = finished_event;
- return;
- }
- // Map filename to logical space
- char *filename = fmq_file_name (fmq_patch_file (client->patch), fmq_patch_path (client->patch));
- fmq_msg_filename_set (client->reply, filename);
-
- // We can process a delete patch right away
- if (fmq_patch_op (client->patch) == patch_delete) {
- fmq_msg_sequence_set (client->reply, client->sequence++);
- fmq_msg_operation_set (client->reply, FMQ_MSG_FILE_DELETE);
- client->next_event = send_delete_event;
-
- // No reliability in this version, assume patch delivered safely
- fmq_patch_destroy (&client->patch);
- }
- else {
- // Create patch refers to file, open that for input if needed
- if (client->file == NULL) {
- client->file = fmq_file_dup (fmq_patch_file (client->patch));
- if (fmq_file_input (client->file)) {
- // File no longer available, skip it
- fmq_patch_destroy (&client->patch);
- fmq_file_destroy (&client->file);
- client->next_event = next_patch_event;
- return;
- }
- client->offset = 0;
- }
- // Get next chunk for file
- fmq_chunk_t *chunk = fmq_file_read (client->file, CHUNK_SIZE, client->offset);
- assert (chunk);
-
- // Check if we have the credit to send chunk
- if (fmq_chunk_size (chunk) <= client->credit) {
- fmq_msg_sequence_set (client->reply, client->sequence++);
- fmq_msg_operation_set (client->reply, FMQ_MSG_FILE_CREATE);
- fmq_msg_offset_set (client->reply, client->offset);
- fmq_msg_chunk_set (client->reply, zframe_new (
- fmq_chunk_data (chunk),
- fmq_chunk_size (chunk)));
-
- client->offset += fmq_chunk_size (chunk);
- client->credit -= fmq_chunk_size (chunk);
- client->next_event = send_chunk_event;
-
- // Zero-sized chunk means end of file
- if (fmq_chunk_size (chunk) == 0) {
- fmq_file_destroy (&client->file);
- fmq_patch_destroy (&client->patch);
- }
- }
- else
- client->next_event = no_credit_event;
-
- fmq_chunk_destroy (&chunk);
- }
+ // Get next patch for client if we're not doing one already
+ if (client->patch == NULL)
+ client->patch = (fmq_patch_t *) zlist_pop (client->patches);
+ if (client->patch == NULL) {
+ client->next_event = finished_event;
+ return;
+ }
+ // Get virtual filename from patch
+ fmq_msg_filename_set (client->reply, fmq_patch_virtual (client->patch));
+
+ // We can process a delete patch right away
+ if (fmq_patch_op (client->patch) == patch_delete) {
+ fmq_msg_sequence_set (client->reply, client->sequence++);
+ fmq_msg_operation_set (client->reply, FMQ_MSG_FILE_DELETE);
+ client->next_event = send_delete_event;
+
+ // No reliability in this version, assume patch delivered safely
+ fmq_patch_destroy (&client->patch);
+ }
+ else
+ if (fmq_patch_op (client->patch) == patch_create) {
+ // Create patch refers to file, open that for input if needed
+ if (client->file == NULL) {
+ client->file = fmq_file_dup (fmq_patch_file (client->patch));
+ if (fmq_file_input (client->file)) {
+ // File no longer available, skip it
+ fmq_patch_destroy (&client->patch);
+ fmq_file_destroy (&client->file);
+ client->next_event = next_patch_event;
+ return;
+ }
+ client->offset = 0;
+ }
+ // Get next chunk for file
+ fmq_chunk_t *chunk = fmq_file_read (client->file, CHUNK_SIZE, client->offset);
+ assert (chunk);
+
+ // Check if we have the credit to send chunk
+ if (fmq_chunk_size (chunk) <= client->credit) {
+ fmq_msg_sequence_set (client->reply, client->sequence++);
+ fmq_msg_operation_set (client->reply, FMQ_MSG_FILE_CREATE);
+ fmq_msg_offset_set (client->reply, client->offset);
+ fmq_msg_chunk_set (client->reply, zframe_new (
+ fmq_chunk_data (chunk),
+ fmq_chunk_size (chunk)));
+
+ client->offset += fmq_chunk_size (chunk);
+ client->credit -= fmq_chunk_size (chunk);
+ client->next_event = send_chunk_event;
+
+ // Zero-sized chunk means end of file
+ if (fmq_chunk_size (chunk) == 0) {
+ fmq_file_destroy (&client->file);
+ fmq_patch_destroy (&client->patch);
+ }
+ }
+ else
+ client->next_event = no_credit_event;
+
+ fmq_chunk_destroy (&chunk);
+ }
}
// Execute state machine as long as we have events
View
34 src/selftest
@@ -6,7 +6,10 @@ cd ../model
./generate
cd ../src
-gcc -g -o fmq_selftest fmq_selftest.c \
+# Valgrind command
+VG="valgrind --tool=memcheck --leak-check=full --show-reachable=yes --suppressions=valgrind.supp"
+
+gcc -g -O2 -o fmq_selftest fmq_selftest.c \
fmq_chunk.c \
fmq_client.c \
fmq_config.c \
@@ -15,32 +18,37 @@ gcc -g -o fmq_selftest fmq_selftest.c \
fmq_msg.c \
fmq_patch.c \
fmq_sasl.c \
- fmq_sha.c \
+ fmq_hash.c \
fmq_server.c \
- ../sha-1.1/sha1.c \
- ${CFLAGS} ${LDFLAGS} -DHAVE_CONFIG_H -lczmq -lzmq
+ ${CFLAGS} ${LDFLAGS} -DHAVE_CONFIG_H -lczmq -lzmq -lcrypto
if [ $? -eq 0 ]; then
if [ "$1" == "-q" ]; then
./fmq_selftest
else
- echo "Starting Valgrind memcheck..."
- valgrind --tool=memcheck --leak-check=full --show-reachable=yes --suppressions=valgrind.supp ./fmq_selftest
+ $VG ./fmq_selftest
fi
fi
rm -f vgcore.*
+[ "$1" == "quick" ] && exit
+
function push_file {
dd if=/dev/urandom of=./fmqroot/$1 bs=$3 count=$2 2> /dev/null
}
# Start server and client
killall -q fmq_selftest -r ".*fmq_selftest.*"
-rm -rf fmqroot/send/* fmqroot/recv/* fmqroot/logs
-mkdir fmqroot/logs fmqroot/send/photos
+#rm -rf fmqroot/send/* fmqroot/recv/* fmqroot/logs
+mkdir -p fmqroot/logs fmqroot/send/photos
-./fmq_selftest -c &
-./fmq_selftest -s &
+if [ "$1" == "deep" ]; then
+ $VG --log-file=client.log ./fmq_selftest -c &
+ $VG --log-file=server.log ./fmq_selftest -s &
+else
+ ./fmq_selftest -c &
+ ./fmq_selftest -s &
+fi
sleep 1
# Push some files into photos and logs
@@ -62,4 +70,8 @@ push_file photos/DSCF0003.jpg 4000 1000
echo "Step 2..."
sleep 3
-killall -q fmq_selftest -r ".*fmq_selftest.*"
+if [ "$1" == "deep" ]; then
+ killall -q valgrind -r "memcheck-.*"
+else
+ killall -q fmq_selftest -r ".*fmq_selftest.*"
+fi
Please sign in to comment.
Something went wrong with that request. Please try again.