Permalink
Browse files

Implemented RESYNC option

  • Loading branch information...
hintjens committed Nov 15, 2012
1 parent fa3e774 commit 4eab7c6da6a8a2d93fa7c8a9a7feb6c3499278d1
View
@@ -165,9 +165,9 @@ The 'path' does not have to exist in the server. That is, clients can request pa
The 'options' field provides additional information to the server. The server SHOULD implement these options:
-* {{RESYNC=1}} - if the client sets this, the server SHALL send the full contents of the virtual path to the client.
+* {{RESYNC=1}} - if the client sets this, the server SHALL send the full contents of the virtual path to the client, except files the client already has, as identified by their SHA-1 digest in the 'cache' field.
-The 'cache' dictionary field tells the server which files the client already has. The server SHOULD respect this information. Each entry in the 'cache' dictionary is a "filename=digest" key/value pair where the digest SHALL be a SHA-1 digest in printable hexadecimal format. If the filename starts with '/' then it SHOULD start with the path, otherwise the server MUST ignore it. If the filename does not start with '/' then the server SHALL treat it as relative to the path.
+When the client specifies the RESYNC option, the 'cache' dictionary field tells the server which files the client already has. Each entry in the 'cache' dictionary is a "filename=digest" key/value pair where the digest SHALL be a SHA-1 digest in printable hexadecimal format. If the filename starts with '/' then it SHOULD start with the path, otherwise the server MUST ignore it. If the filename does not start with '/' then the server SHALL treat it as relative to the path.
++++ The ICANHAZ-OK Command
@@ -211,7 +211,7 @@ The server SHALL respond to an invalid command by sending RTFM. Note that the se
++ Security Aspects
-FILEMQ uses the Simple Authentication and Security Layer (SASL) for authentication and encryption.
+FILEMQ uses the Simple Authentication and Security Layer (SASL) for authentication and encryption. The SHA-1 digest used for file cache identification has no security implications.
++ References
View
@@ -61,6 +61,10 @@ void
void
fmq_client_set_inbox (fmq_client_t *self, const char *path);
+//
+void
+ fmq_client_set_resync (fmq_client_t *self, long enabled);
+
// Self test of this class
int
fmq_client_test (bool verbose);
View
@@ -54,7 +54,11 @@ off_t
// Calculate differences between two versions of a directory tree
zlist_t *
- fmq_dir_diff (fmq_dir_t *older, fmq_dir_t *newer);
+ fmq_dir_diff (fmq_dir_t *older, fmq_dir_t *newer, char *alias);
+
+// Return full contents of directory as a patch list.
+zlist_t *
+ fmq_dir_resync (fmq_dir_t *self, char *alias);
// Return total hierarchy count
size_t
View
@@ -39,7 +39,7 @@ typedef struct _fmq_patch_t fmq_patch_t;
// Create new patch
fmq_patch_t *
- fmq_patch_new (char *path, fmq_file_t *file, fmq_patch_op_t op);
+ fmq_patch_new (char *path, fmq_file_t *file, fmq_patch_op_t op, char *alias);
// Destroy a patch
void
@@ -69,9 +69,13 @@ char *
void
fmq_patch_virtual_set (fmq_patch_t *self, char *virtual);
-// Return hash digest for patch file (create only)
+// Calculate hash digest for file (create only)
+fmq_patch_t *
+ fmq_patch_digest_set (fmq_patch_t *self);
+
+// Return hash digest for patch file
char *
- fmq_patch_hashstr (fmq_patch_t *self);
+ fmq_patch_digest (fmq_patch_t *self);
// Self test of this class
int
View
@@ -67,7 +67,7 @@ void
//
void
- fmq_server_set_anonymous (fmq_server_t *self, long access);
+ fmq_server_set_anonymous (fmq_server_t *self, long enabled);
// Self test of this class
int
View
@@ -497,7 +497,7 @@ control_message (client_t *self)
. elsif type = "number"
char *$(name)_string = zmsg_popstr (msg);
long $(name) = atoi ($(name)_string);
- free ($(name_string));
+ free ($(name)_string);
. endif
. endfor
$(string.trim (method.?''):block )
View
@@ -133,7 +133,11 @@ else
<action name = "format icanhaz command">
fmq_msg_path_set (self->request, self->sub->path);
-fmq_msg_cache_set (self->request, zhash_dup (self->sub->cache));
+// If client app wants full resync, send cache to server
+if (atoi (fmq_config_resolve (self->config, "client/resync", "0")) == 1) {
+ fmq_msg_options_insert (self->request, "RESYNC", "1");
+ fmq_msg_cache_set (self->request, sub_cache (self->sub));
+}
</action>
<action name = "refill credit as needed">
@@ -220,16 +224,11 @@ while (self->sub) {
// We'll do better error handling later
assert (*path == '/');
-// Get directory cache for this path
-char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
-fmq_dir_t *dir = fmq_dir_new (path + 1, inbox);
-zhash_t *cache = dir? fmq_dir_cache (dir): NULL;
-fmq_dir_destroy (&dir);
-
// New subscription, store it for later replay
-self->sub = sub_new (self, path, cache);
-zlist_append (self->subs, self->sub);
+char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
zclock_log ("I: subscribe to %s as %s%s", path, inbox, path);
+self->sub = sub_new (self, inbox, path);
+zlist_append (self->subs, self->sub);
// If we're connected, then also send to server
if (self->connected)
@@ -241,6 +240,12 @@ if (self->connected)
fmq_config_path_set (self->config, "client/inbox", path);
</method>
+<method name = "set resync">
+<argument name = "enabled" type = "number" />
+// Request resynchronization from server
+fmq_config_path_set (self->config, "client/resync", enabled? "1" :"0");
+</method>
+
<selftest config = "client_test.cfg">
<init>
fmq_client_connect (self, "tcp://localhost:6001");
View
@@ -1,16 +1,18 @@
<declare>
// Subscription in memory
typedef struct {
+ client_t *client; // Pointer to parent client
+ char *inbox; // Inbox location
char *path; // Path we subscribe to
- zhash_t *cache; // Existing files, as digests
} sub_t;
static sub_t *
-sub_new (client_t *client, char *path, zhash_t *cache)
+sub_new (client_t *client, char *inbox, char *path)
{
sub_t *self = (sub_t *) zmalloc (sizeof (sub_t));
+ self->client = client;
+ self->inbox = strdup (inbox);
self->path = strdup (path);
- self->cache = cache; // Take ownership
return self;
}
@@ -20,10 +22,22 @@ sub_destroy (sub_t **self_p)
assert (self_p);
if (*self_p) {
sub_t *self = *self_p;
- zhash_destroy (&self->cache);
+ free (self->inbox);
free (self->path);
free (self);
*self_p = NULL;
}
}
+
+// Return new cache object for subscription path
+
+static zhash_t *
+sub_cache (sub_t *self)
+{
+ // Get directory cache for this path
+ fmq_dir_t *dir = fmq_dir_new (self->path + 1, self->inbox);
+ zhash_t *cache = dir? fmq_dir_cache (dir): NULL;
+ fmq_dir_destroy (&dir);
+ return cache;
+}
</declare>
View
@@ -306,9 +306,9 @@ zlist_append (self->mounts, mount);
</method>
<method name = "set anonymous">
-<argument name = "access" type = "number" />
+<argument name = "enabled" type = "number" />
// Enable anonymous access without a config file
-fmq_config_path_set (self->config, "security/anonymous", access? "1" :"0");
+fmq_config_path_set (self->config, "security/anonymous", enabled? "1" :"0");
</method>
<include filename = "fmq_server_selftest.xml" />
View
@@ -66,32 +66,12 @@ mount_refresh (mount_t *self, server_t *server)
// Get latest snapshot and build a patches list for any changes
fmq_dir_t *latest = fmq_dir_new (self->location, NULL);
- zlist_t *patches = fmq_dir_diff (self->dir, latest);
+ zlist_t *patches = fmq_dir_diff (self->dir, latest, self->alias);
// Drop old directory and replace with latest version
fmq_dir_destroy (&self->dir);
self->dir = latest;
- // Compute virtual file names for all patches
- fmq_patch_t *patch = (fmq_patch_t *) zlist_first (patches);
- while (patch) {
- // Get filename without path
- char *filename = fmq_file_name (fmq_patch_file (patch),
- fmq_patch_path (patch));
- assert (*filename != '/');
-
- // Now put mount alias in front of that
- char *virtual = malloc (strlen (self->alias) + strlen (filename) + 2);
- strcpy (virtual, self->alias);
- if (virtual [strlen (virtual) - 1] != '/')
- strcat (virtual, "/");
- strcat (virtual, filename);
- fmq_patch_virtual_set (patch, virtual);
- free (virtual);
-
- patch = (fmq_patch_t *) zlist_next (patches);
- }
-
// Copy new patches to clients' patches list
sub_t *sub = (sub_t *) zlist_first (self->subs);
while (sub) {
@@ -103,6 +83,12 @@ mount_refresh (mount_t *self, server_t *server)
}
sub = (sub_t *) zlist_next (self->subs);
}
+
+ // Destroy patches, they've all been copied
+ while (zlist_size (patches)) {
+ fmq_patch_t *patch = (fmq_patch_t *) zlist_pop (patches);
+ fmq_patch_destroy (&patch);
+ }
zlist_destroy (&patches);
return activity;
}
@@ -139,6 +125,17 @@ mount_sub_store (mount_t *self, client_t *client, fmq_msg_t *request)
// New subscription for this client, append to our list
sub = sub_new (client, path, fmq_msg_cache (request));
zlist_append (self->subs, sub);
+
+ // If client requested resync, send full mount contents now
+ if (fmq_msg_options_number (client->request, "RESYNC", 0) == 1) {
+ zlist_t *patches = fmq_dir_resync (self->dir, self->alias);
+ while (zlist_size (patches)) {
+ fmq_patch_t *patch = (fmq_patch_t *) zlist_pop (patches);
+ sub_patch_add (sub, patch);
+ fmq_patch_destroy (&patch);
+ }
+ zlist_destroy (&patches);
+ }
}
View
@@ -66,22 +66,24 @@ static void
sub_patch_add (sub_t *self, fmq_patch_t *patch)
{
// Skip file creation if client already has identical file
+ fmq_patch_digest_set (patch);
if (fmq_patch_op (patch) == patch_create) {
- char *hashstr = zhash_lookup (self->cache, fmq_patch_virtual (patch));
- if (hashstr && strcasecmp (hashstr, fmq_patch_hashstr (patch)) == 0)
+ char *digest = zhash_lookup (self->cache, fmq_patch_virtual (patch));
+ if (digest && strcasecmp (digest, fmq_patch_digest (patch)) == 0)
return; // Just skip patch for this client
}
// Remove any previous patches for the same file
fmq_patch_t *existing = (fmq_patch_t *) zlist_first (self->client->patches);
while (existing) {
- if (streq (fmq_file_name (fmq_patch_file (patch), NULL),
- fmq_file_name (fmq_patch_file (existing), NULL))) {
+ if (streq (fmq_patch_virtual (patch), fmq_patch_virtual (existing))) {
zlist_remove (self->client->patches, existing);
fmq_patch_destroy (&existing);
break;
}
existing = (fmq_patch_t *) zlist_next (self->client->patches);
}
+ // Track that we've queued patch for client, so we don't do it twice
+ zhash_insert (self->cache, fmq_patch_digest (patch), fmq_patch_virtual (patch));
zlist_append (self->client->patches, fmq_patch_dup (patch));
}
</declare>
View
@@ -1,8 +1,9 @@
# Configure server for plain access
#
client
- heartbeat = 1 # Heartbeat to server
+ heartbeat = 1 # Interval in seconds
inbox = ./fmqroot/recv
+ resync = 1
# inbox
# path = ./some/directory
Oops, something went wrong.

0 comments on commit 4eab7c6

Please sign in to comment.