Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fmq_client now allows multiple server connections

- initialize action removed from state machine models; no longer used
- client_c.gsl manages multiple server instances independently
- subscriptions no longer sent after connection
- java models & scripts updated a little but not fully
  • Loading branch information...
commit eadd1243c01444f00ac6cf004ce66d163df32889 1 parent dc57e44
Pieter Hintjens hintjens authored
4 include/fmq.h
View
@@ -37,8 +37,8 @@
FMQ_MAKE_VERSION(FMQ_VERSION_MAJOR, FMQ_VERSION_MINOR, FMQ_VERSION_PATCH)
#include <czmq.h>
-#if CZMQ_VERSION < 10302
-# error "FileMQ needs CZMQ/1.3.2 or later"
+#if CZMQ_VERSION < 10304
+# error "FileMQ needs CZMQ/1.3.4 or later"
#endif
// Maximum length of a path + filename
2  include/fmq_client.h
View
@@ -49,7 +49,7 @@ void
void
fmq_client_setoption (fmq_client_t *self, const char *path, const char *value);
-// Open connection to server
+// Create outgoing connection to server
void
fmq_client_connect (fmq_client_t *self, const char *endpoint);
12 java/model/fmq_client.xml
View
@@ -10,7 +10,7 @@ private static final int CREDIT_SLICE = 1000000;
private static final int CREDIT_MINIMUM = (CREDIT_SLICE * 4) + 1;
</declare>
-<self>
+<client>
<context>
private boolean connected; // Are we connected to server?
private List &lt;Sub&gt; subs; // Subscriptions
@@ -19,25 +19,19 @@ private int credit; // Current credit pending
private FmqFile file; // File we're writing to
private Iterator &lt;Sub&gt; subIterator;
</context>
-
<construct>
subs = new ArrayList &lt;Sub&gt; ();
connected = false;
</construct>
-
<destruct>
for (Sub sub: subs)
sub.destroy ();
</destruct>
-</self>
+</client>
<!-- Embedded class for subscriptions -->
<include filename = "fmq_client_sub.xml" />
-<action name = "initialize the client">
-next_event = Event.ready_event;
-</action>
-
<action name = "try security mechanism">
String login = config.resolve ("security/plain/login", "guest");
String password = config.resolve ("security/plain/password", "");
@@ -146,7 +140,7 @@ System.out.println ("E: server claims we sent an invalid message");
System.out.println ("E: protocol error");
</action>
-<action name = "terminate the client">
+<action name = "terminate the server">
connected = false;
next_event = Event.terminate_event;
</action>
16 java/model/fmq_server.xml
View
@@ -9,22 +9,20 @@ FileMQ protocol server
private static final int CHUNK_SIZE = 1000000;
</declare>
-<self>
+<server>
<context>
private List &lt;Mount&gt; mounts; // Mount points
private int port; // Server port
</context>
-
<construct>
mounts = new ArrayList &lt;Mount&gt; ();
</construct>
-
<destruct>
// Destroy mount points
for (Mount mount : mounts)
mount.destroy ();
</destruct>
-</self>
+</server>
<client>
<context>
@@ -35,11 +33,9 @@ private FmqFile file; // Current file we're sending
private long offset; // Offset of next read in file
private long sequence; // Sequence number for chunk
</context>
-
<construct>
patches = new ArrayList &lt;FmqPatch&gt; ();
</construct>
-
<destruct>
for (FmqPatch patch : patches)
patch.destroy ();
@@ -81,13 +77,13 @@ client.next_event = Event.foe_event;
String [] result = new String [2];
String login, password;
if (client.request.mechanism ().equals ("PLAIN")
-&amp;&amp; FmqSasl.plainDecode (client.request.response (), result)) {
+&& FmqSasl.plainDecode (client.request.response (), result)) {
login = result [0];
password = result [1];
FmqConfig account = config.locate ("security/plain/account");
while (account != null) {
if (account.resolve ("login", "").equals (login)
- &amp;&amp; account.resolve ("password", "").equals (password)) {
+ && account.resolve ("password", "").equals (password)) {
client.next_event = Event.friend_event;
break;
}
@@ -105,7 +101,7 @@ for (Mount check : mounts) {
// If check->alias is prefix of path and alias is
// longer than current mount then we have a new mount
if (path.startsWith (check.alias)
- &amp;&amp; check.alias.length () > mount.alias.length ())
+ && check.alias.length () > mount.alias.length ())
mount = check;
}
mount.storeSub (client, client.request);
@@ -177,7 +173,7 @@ if (client.patch.op () == FmqPatch.OP.patch_create) {
assert (chunk != null);
// Check if we have the credit to send chunk
- if (chunk.size () &lt;= client.credit) {
+ if (chunk.size () <= client.credit) {
client.reply.setSequence (client.sequence++);
client.reply.setOperation (FmqMsg.FMQ_MSG_FILE_CREATE);
client.reply.setOffset (client.offset);
2  java/model/fmq_server_sub.xml
View
@@ -49,7 +49,7 @@ private static class Sub {
patch.setDigest ();
if (patch.op () == FmqPatch.OP.patch_create) {
String digest = cache.get (patch.virtual ());
- if (digest != null &amp;&amp; digest.compareToIgnoreCase (patch.digest ()) == 0)
+ if (digest != null && digest.compareToIgnoreCase (patch.digest ()) == 0)
return; // Just skip patch for this client
}
// Remove any previous patches for the same file
55 java/src/main/java/org/filemq/FmqClient.java
View
@@ -169,7 +169,7 @@ public void setResync (long enabled)
private enum Event {
terminate_event (-1),
- ready_event (1),
+ initialize_event (1),
srsly_event (2),
rtfm_event (3),
_other_event (4),
@@ -179,9 +179,8 @@ public void setResync (long enabled)
finished_event (8),
cheezburger_event (9),
hugz_event (10),
- subscribe_event (11),
- send_credit_event (12),
- icanhaz_ok_event (13);
+ send_credit_event (11),
+ icanhaz_ok_event (12);
@SuppressWarnings ("unused")
private final int event;
@@ -236,12 +235,6 @@ private void destroy ()
// Properties accessible to client actions
private Event next_event; // Next event
- private boolean connected; // Are we connected to server?
- private List <Sub> subs; // Subscriptions
- private Sub sub; // Subscription we want to send
- private int credit; // Current credit pending
- private FmqFile file; // File we're writing to
- private Iterator <Sub> subIterator;
// Properties you should NOT touch
private ZContext ctx; // Own CZMQ context
private Socket pipe; // Socket to back to caller
@@ -268,8 +261,6 @@ private Client (ZContext ctx, Socket pipe)
this.config = new FmqConfig ("root", null);
config ();
- subs = new ArrayList <Sub> ();
- connected = false;
}
private void destroy ()
{
@@ -279,8 +270,6 @@ private void destroy ()
request.destroy ();
if (reply != null)
reply.destroy ();
- for (Sub sub: subs)
- sub.destroy ();
}
// Apply configuration tree:
@@ -338,11 +327,6 @@ private void applyConfig ()
// Custom actions for state machine
- private void initializeTheClient ()
- {
- next_event = Event.ready_event;
- }
-
private void trySecurityMechanism ()
{
String login = config.resolve ("security/plain/login", "guest");
@@ -461,7 +445,7 @@ private void logProtocolError ()
System.out.println ("E: protocol error");
}
- private void terminateTheClient ()
+ private void terminateTheServer ()
{
connected = false;
next_event = Event.terminate_event;
@@ -477,7 +461,7 @@ private void execute (Event event)
next_event = null;
switch (state) {
case start_state:
- if (event == Event.ready_event) {
+ if (event == Event.initialize_event) {
request.setId (FmqMsg.OHAI);
request.send (dealer);
request = new FmqMsg (0);
@@ -486,18 +470,18 @@ private void execute (Event event)
else
if (event == Event.srsly_event) {
logAccessDenied ();
- terminateTheClient ();
+ terminateTheServer ();
state = State.start_state;
}
else
if (event == Event.rtfm_event) {
logInvalidMessage ();
- terminateTheClient ();
+ terminateTheServer ();
}
else {
// Process all other events
logProtocolError ();
- terminateTheClient ();
+ terminateTheServer ();
}
break;
@@ -518,13 +502,13 @@ private void execute (Event event)
else
if (event == Event.srsly_event) {
logAccessDenied ();
- terminateTheClient ();
+ terminateTheServer ();
state = State.start_state;
}
else
if (event == Event.rtfm_event) {
logInvalidMessage ();
- terminateTheClient ();
+ terminateTheServer ();
}
else {
// Process all other events
@@ -548,18 +532,18 @@ private void execute (Event event)
else
if (event == Event.srsly_event) {
logAccessDenied ();
- terminateTheClient ();
+ terminateTheServer ();
state = State.start_state;
}
else
if (event == Event.rtfm_event) {
logInvalidMessage ();
- terminateTheClient ();
+ terminateTheServer ();
}
else {
// Process all other events
logProtocolError ();
- terminateTheClient ();
+ terminateTheServer ();
}
break;
@@ -575,13 +559,6 @@ private void execute (Event event)
request = new FmqMsg (0);
}
else
- if (event == Event.subscribe_event) {
- formatIcanhazCommand ();
- request.setId (FmqMsg.ICANHAZ);
- request.send (dealer);
- request = new FmqMsg (0);
- }
- else
if (event == Event.send_credit_event) {
request.setId (FmqMsg.NOM);
request.send (dealer);
@@ -593,18 +570,18 @@ private void execute (Event event)
else
if (event == Event.srsly_event) {
logAccessDenied ();
- terminateTheClient ();
+ terminateTheServer ();
state = State.start_state;
}
else
if (event == Event.rtfm_event) {
logInvalidMessage ();
- terminateTheClient ();
+ terminateTheServer ();
}
else {
// Process all other events
logProtocolError ();
- terminateTheClient ();
+ terminateTheServer ();
}
break;
7 java/src/main/java/org/filemq/FmqServer.java
View
@@ -480,8 +480,6 @@ private static void clientPing (Map <String, Client> clients , Server server)
private static class Server {
// Properties accessible to client actions
- private List <Mount> mounts; // Mount points
- private int port; // Server port
// Properties you should NOT touch
private ZContext ctx; // Own CZMQ context
@@ -495,7 +493,6 @@ private static void clientPing (Map <String, Client> clients , Server server)
private int heartbeat; // Heartbeat for clients
// Server methods
-
private void config ()
{
// Get standard server configuration
@@ -514,7 +511,6 @@ private Server (ZContext ctx, Socket pipe)
clients = new HashMap <String, Client> ();
config = new FmqConfig ("root", null);
config ();
- mounts = new ArrayList <Mount> ();
}
private void destroy ()
@@ -523,9 +519,6 @@ private void destroy ()
config.destroy ();
for (Client c: clients.values ())
c.destroy ();
- // Destroy mount points
- for (Mount mount : mounts)
- mount.destroy ();
}
// Apply configuration tree:
85 model/fmq_client.xml
View
@@ -3,27 +3,22 @@ This is the FILEMQ/1.0 client protocol handler
<include filename = "license.xml" />
<include filename = "fmq_client_fsm.xml" />
-<!-- Server and client contexts -->
<declare>
// There's no point making these configurable
#define CREDIT_SLICE 1000000
#define CREDIT_MINIMUM (CREDIT_SLICE * 4) + 1
</declare>
+<!-- Client and server contexts -->
+<!-- self is the client, server is each outgoing connection -->
<self>
<context>
-bool connected; // Are we connected to server?
zlist_t *subs; // Subscriptions
sub_t *sub; // Subscription we want to send
-size_t credit; // Current credit pending
-fmq_file_t *file; // File we're writing to
</context>
-
<construct>
self->subs = zlist_new ();
-self->connected = false;
</construct>
-
<destruct>
// Destroy subscriptions
while (zlist_size (self->subs)) {
@@ -34,98 +29,104 @@ zlist_destroy (&self->subs);
</destruct>
</self>
+<server>
+<context>
+size_t credit; // Current credit pending
+fmq_file_t *file; // File we're writing to
+</context>
+<construct>
+</construct>
+<destruct>
+</destruct>
+</server>
+
<!-- Embedded class for subscriptions -->
<include filename = "fmq_client_sub.xml" />
-<action name = "initialize the client">
-self->next_event = ready_event;
-</action>
-
<action name = "try security mechanism">
char *login = fmq_config_resolve (self->config, "security/plain/login", "guest");
char *password = fmq_config_resolve (self->config, "security/plain/password", "");
zframe_t *frame = fmq_sasl_plain_encode (login, password);
-fmq_msg_mechanism_set (self->request, "PLAIN");
-fmq_msg_response_set (self->request, frame);
+fmq_msg_mechanism_set (server->request, "PLAIN");
+fmq_msg_response_set (server->request, frame);
</action>
<action name = "connected to server">
-self->connected = true;
</action>
<action name = "get first subscription">
self->sub = (sub_t *) zlist_first (self->subs);
if (self->sub)
- self->next_event = ok_event;
+ server->next_event = ok_event;
else
- self->next_event = finished_event;
+ server->next_event = finished_event;
</action>
<action name = "get next subscription">
self->sub = (sub_t *) zlist_next (self->subs);
if (self->sub)
- self->next_event = ok_event;
+ server->next_event = ok_event;
else
- self->next_event = finished_event;
+ server->next_event = finished_event;
</action>
<action name = "format icanhaz command">
-fmq_msg_path_set (self->request, self->sub->path);
+fmq_msg_path_set (server->request, self->sub->path);
// If client app wants full resync, send cache to server
if (atoi (fmq_config_resolve (self->config, "client/resync", "0")) == 1) {
- fmq_msg_options_insert (self->request, "RESYNC", "1");
- fmq_msg_cache_set (self->request, sub_cache (self->sub));
+ fmq_msg_options_insert (server->request, "RESYNC", "1");
+ fmq_msg_cache_set (server->request, sub_cache (self->sub));
}
</action>
<action name = "refill credit as needed">
// If credit has fallen too low, send more credit
size_t credit_to_send = 0;
-while (self->credit < CREDIT_MINIMUM) {
+while (server->credit < CREDIT_MINIMUM) {
credit_to_send += CREDIT_SLICE;
- self->credit += CREDIT_SLICE;
+ server->credit += CREDIT_SLICE;
}
if (credit_to_send) {
- fmq_msg_credit_set (self->request, credit_to_send);
- self->next_event = send_credit_event;
+ fmq_msg_credit_set (server->request, credit_to_send);
+ server->next_event = send_credit_event;
}
</action>
<action name = "process the patch">
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
-char *filename = fmq_msg_filename (self->reply);
+char *filename = fmq_msg_filename (server->reply);
// Filenames from server must start with slash, which we skip
assert (*filename == '/');
filename++;
-if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
- if (self->file == NULL) {
- self->file = fmq_file_new (inbox, filename);
- if (fmq_file_output (self->file)) {
+if (fmq_msg_operation (server->reply) == FMQ_MSG_FILE_CREATE) {
+ if (server->file == NULL) {
+ server->file = fmq_file_new (inbox, filename);
+ if (fmq_file_output (server->file)) {
// File not writeable, skip patch
- fmq_file_destroy (&self->file);
+ fmq_file_destroy (&server->file);
return;
}
}
// Try to write, ignore errors in this version
- zframe_t *frame = fmq_msg_chunk (self->reply);
+ zframe_t *frame = fmq_msg_chunk (server->reply);
fmq_chunk_t *chunk = fmq_chunk_new (zframe_data (frame), zframe_size (frame));
if (fmq_chunk_size (chunk) > 0) {
- fmq_file_write (self->file, chunk, fmq_msg_offset (self->reply));
- self->credit -= fmq_chunk_size (chunk);
+ fmq_file_write (server->file, chunk, fmq_msg_offset (server->reply));
+ server->credit -= fmq_chunk_size (chunk);
}
else {
// Zero-sized chunk means end of file, so report back to caller
zstr_sendm (self->pipe, "DELIVER");
zstr_sendm (self->pipe, filename);
zstr_sendf (self->pipe, "%s/%s", inbox, filename);
- fmq_file_destroy (&self->file);
+ fmq_file_destroy (&server->file);
}
fmq_chunk_destroy (&chunk);
}
else
-if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_DELETE) {
+if (fmq_msg_operation (server->reply) == FMQ_MSG_FILE_DELETE) {
zclock_log ("I: delete %s/%s", inbox, filename);
fmq_file_t *file = fmq_file_new (inbox, filename);
fmq_file_remove (file);
@@ -145,11 +146,11 @@ puts ("E: server claims we sent an invalid message");
puts ("E: protocol error");
</action>
-<action name = "terminate the client">
-self->connected = false;
-self->next_event = terminate_event;
+<action name = "terminate the server">
+server->next_event = terminate_event;
</action>
+<!-- Must happen before any connects -->
<method name = "subscribe">
<argument name = "path" type = "string" />
// Store subscription along with any previous ones
@@ -169,9 +170,9 @@ char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
self->sub = sub_new (self, inbox, path);
zlist_append (self->subs, self->sub);
-// If we're connected, then also send to server
-if (self->connected)
- self->next_event = subscribe_event;
+<!-- disabled subscription after connect -->
+<!-- if (server->connected) -->
+<!-- server->next_event = subscribe_event; -->
</method>
<method name = "set inbox">
10 model/fmq_client_fsm.xml
View
@@ -1,5 +1,5 @@
<state name = "start">
- <event name = "ready" next = "requesting access">
+ <event name = "initialize" next = "requesting access">
<action name = "send" message ="OHAI" />
</event>
</state>
@@ -36,10 +36,12 @@
<event name = "HUGZ">
<action name = "send" message = "HUGZ-OK" />
</event>
+ <!-- Subscription after connection not supported anymore
<event name = "subscribe">
<action name = "format icanhaz command" />
<action name = "send" message = "ICANHAZ" />
</event>
+ -->
<event name = "send credit">
<action name = "send" message = "NOM" />
</event>
@@ -49,14 +51,14 @@
<state name = "defaults">
<event name = "SRSLY" next = "start">
<action name = "log access denied" />
- <action name = "terminate the client" />
+ <action name = "terminate the server" />
</event>
<event name = "RTFM">
<action name = "log invalid message" />
- <action name = "terminate the client" />
+ <action name = "terminate the server" />
</event>
<event name = "$other">
<action name = "log protocol error" />
- <action name = "terminate the client" />
+ <action name = "terminate the server" />
</event>
</state>
8 model/fmq_client_sub.xml
View
@@ -1,10 +1,14 @@
+<declare type = "forward">
+// Forward declarations
+typedef struct _sub_t sub_t;
+</declare>
<declare>
// Subscription in memory
-typedef struct {
+struct _sub_t {
client_t *client; // Pointer to parent client
char *inbox; // Inbox location
char *path; // Path we subscribe to
-} sub_t;
+};
static sub_t *
sub_new (client_t *client, char *inbox, char *path)
10 model/fmq_server.xml
View
@@ -3,23 +3,21 @@ FileMQ protocol server
<include filename = "license.xml" />
<include filename = "fmq_server_fsm.xml" />
-
-<!-- Server and client contexts -->
-<declare>
+<declare type = "forward">
// There's no point making these configurable
#define CHUNK_SIZE 1000000
</declare>
+<!-- Server and client contexts -->
+<!-- self is the server, client is each incoming connection -->
<self>
<context>
zlist_t *mounts; // Mount points
int port; // Server port
</context>
-
<construct>
self->mounts = zlist_new ();
</construct>
-
<destruct>
// Destroy mount points
while (zlist_size (self->mounts)) {
@@ -39,11 +37,9 @@ fmq_file_t *file; // Current file we're sending
off_t offset; // Offset of next read in file
int64_t sequence; // Sequence number for chunk
</context>
-
<construct>
self->patches = zlist_new ();
</construct>
-
<destruct>
while (zlist_size (self->patches)) {
fmq_patch_t *patch = (fmq_patch_t *) zlist_pop (self->patches);
8 model/fmq_server_mount.xml
View
@@ -1,13 +1,17 @@
+<declare type = "forward">
+// Forward declarations
+typedef struct _mount_t mount_t;
+</declare>
<declare>
// --------------------------------------------------------------------------
// Mount point in memory
-typedef struct {
+struct _mount_t {
char *location; // Physical location
char *alias; // Alias into our tree
fmq_dir_t *dir; // Directory snapshot
zlist_t *subs; // Client subscriptions
-} mount_t;
+};
// --------------------------------------------------------------------------
8 model/fmq_server_sub.xml
View
@@ -1,12 +1,16 @@
+<declare type = "forward">
+// Forward declarations
+typedef struct _sub_t sub_t;
+</declare>
<declare>
// --------------------------------------------------------------------------
// Subscription object
-typedef struct {
+struct _sub_t {
client_t *client; // Always refers to live client
char *path; // Path client is subscribed to
zhash_t *cache; // Client's cache list
-} sub_t;
+};
static int
s_resolve_cache_path (const char *key, void *item, void *argument);
4 model/generate
View
@@ -2,7 +2,3 @@ export PATH=../scripts:$PATH
gsl -q -script:codec_c fmq_msg.xml
gsl -q -script:server_c -trace:0 -animate:0 fmq_server.xml
gsl -q -script:client_c -trace:0 -animate:0 fmq_client.xml
-
-gsl -q -script:codec_java -package:filemq fmq_msg.xml
-gsl -q -script:server_java -package:filemq -trace:0 -animate:0 fmq_server.xml
-gsl -q -script:client_java -package:filemq -trace:0 -animate:0 fmq_client.xml
372 scripts/client_c.gsl
View
@@ -41,7 +41,7 @@ void
void
$(class.name)_setoption ($(class.name)_t *self, const char *path, const char *value);
-// Open connection to server
+// Create outgoing connection to server
void
$(class.name)_connect ($(class.name)_t *self, const char *endpoint);
@@ -173,7 +173,7 @@ $(class.name)_setoption ($(class.name)_t *self, const char *path, const char *va
// --------------------------------------------------------------------------
-// Open connection to server
+// Create outgoing connection to server
void
$(class.name)_connect ($(class.name)_t *self, const char *endpoint)
@@ -309,10 +309,14 @@ s_event_name [] = {
.endif
+// Maximum number of server connections we allow
+#define MAX_SERVERS 256
+
// Forward declarations
typedef struct _client_t client_t;
+typedef struct _server_t server_t;
-.for class.declare
+.for class.declare where type ?= "forward"
$(string.trim (declare.?''):)
.endfor
@@ -322,28 +326,99 @@ $(string.trim (declare.?''):)
struct _client_t {
// Properties accessible to client actions
- event_t next_event; // Next event
.for class.self
. for context
$(string.trim (context.?''):block )
. endfor
.endfor
-
// Properties you should NOT touch
zctx_t *ctx; // Own CZMQ context
void *pipe; // Socket to back to caller
- void *dealer; // Socket to talk to server
- bool stopped; // Has client stopped?
+ server_t *servers [MAX_SERVERS];
+ // Server connections
+ uint nbr_servers; // How many connections we have
+ bool dirty; // If true, rebuild pollset
+ bool stopped; // Is the client stopped?
fmq_config_t *config; // Configuration tree
+ int heartbeat; // Heartbeat interval
+};
+
+// ---------------------------------------------------------------------
+// Context for each server connection
+
+struct _server_t {
+ // Properties accessible to server actions
+ event_t next_event; // Next event
+.for class.server
+. for context
+ $(string.trim (context.?''):block )
+. endfor
+.endfor
+ // Properties you should NOT touch
+ zctx_t *ctx; // Own CZMQ context
+ uint index; // Index into client->server_array
+ void *dealer; // Socket to talk to server
+ int64_t expires_at; // Connection expires at
state_t state; // Current state
event_t event; // Current event
+ char *endpoint; // Server endpoint
$(codec)_t *request; // Next message to send
$(codec)_t *reply; // Last received reply
- int heartbeat; // Heartbeat interval
- int64_t expires_at; // Server expires at
};
static void
+client_server_execute (client_t *self, server_t *server, int event);
+
+.for class.declare where !defined (type)
+$(string.trim (declare.?''):)
+
+.endfor
+
+// Server methods
+
+static server_t *
+server_new (zctx_t *ctx, char *endpoint)
+{
+ server_t *self = (server_t *) zmalloc (sizeof (server_t));
+ self->ctx = ctx;
+ self->endpoint = strdup (endpoint);
+ self->dealer = zsocket_new (self->ctx, ZMQ_DEALER);
+ self->request = $(codec)_new (0);
+ zsocket_connect (self->dealer, endpoint);
+.for class.state where item () = 1
+ self->state = $(name:c)_state;
+.endfor
+.for class.server
+. for construct
+ $(string.trim (construct.?''):block )
+. endfor
+.endfor
+ return self;
+}
+
+static void
+server_destroy (server_t **self_p)
+{
+ assert (self_p);
+ if (*self_p) {
+ server_t *self = *self_p;
+ zsocket_destroy (self->ctx, self->dealer);
+ $(codec)_destroy (&self->request);
+ $(codec)_destroy (&self->reply);
+ free (self->endpoint);
+.for class.server
+. for destruct
+ $(string.trim (destruct.?''):block )
+. endfor
+.endfor
+ free (self);
+ *self_p = NULL;
+ }
+}
+
+// Client methods
+
+static void
client_config_self (client_t *self)
{
// Get standard client configuration
@@ -374,8 +449,11 @@ client_destroy (client_t **self_p)
if (*self_p) {
client_t *self = *self_p;
fmq_config_destroy (&self->config);
- $(codec)_destroy (&self->request);
- $(codec)_destroy (&self->reply);
+ int server_nbr;
+ for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) {
+ server_t *server = self->servers [server_nbr];
+ server_destroy (&server);
+ }
.for class.self
. for destruct
$(string.trim (destruct.?''):block )
@@ -390,7 +468,6 @@ client_destroy (client_t **self_p)
// * apply client configuration
// * print any echo items in top-level sections
// * apply sections that match methods
-
static void
client_apply_config (client_t *self)
{
@@ -403,7 +480,7 @@ client_apply_config (client_t *self)
zclock_log (fmq_config_value (entry));
entry = fmq_config_next (entry);
}
-.for class.method
+.for class.method where count (argument) > 0
if (streq (fmq_config_name (section), "$(name:c)")) {
. for argument
. if type = "string"
@@ -423,31 +500,102 @@ client_apply_config (client_t *self)
client_config_self (self);
}
+// Process message from pipe
+static void
+client_control_message (client_t *self)
+{
+ zmsg_t *msg = zmsg_recv (self->pipe);
+ char *method = zmsg_popstr (msg);
+.for class.method
+ if (streq (method, "$(NAME)")) {
+. for argument
+. if type = "string"
+ char *$(name) = zmsg_popstr (msg);
+. elsif type = "number"
+ char *$(name)_string = zmsg_popstr (msg);
+ long $(name) = atoi ($(name)_string);
+ free ($(name)_string);
+. endif
+. endfor
+ $(string.trim (method.?''):block )
+. for method.return
+ $(string.trim (return.?''):block )
+. endfor
+. for argument where type = "string"
+ free ($(name));
+. endfor
+ }
+ else
+.endfor
+ if (streq (method, "CONFIG")) {
+ char *config_file = zmsg_popstr (msg);
+ fmq_config_destroy (&self->config);
+ self->config = fmq_config_load (config_file);
+ if (self->config)
+ client_apply_config (self);
+ else {
+ printf ("E: cannot load config file '%s'\\n", config_file);
+ self->config = fmq_config_new ("root", NULL);
+ }
+ free (config_file);
+ }
+ else
+ if (streq (method, "SETOPTION")) {
+ char *path = zmsg_popstr (msg);
+ char *value = zmsg_popstr (msg);
+ fmq_config_path_set (self->config, path, value);
+ client_config_self (self);
+ free (path);
+ free (value);
+ }
+ else
+ if (streq (method, "STOP")) {
+ zstr_send (self->pipe, "OK");
+ self->stopped = true;
+ }
+ else
+ if (streq (method, "CONNECT")) {
+ char *endpoint = zmsg_popstr (msg);
+ if (self->nbr_servers < MAX_SERVERS) {
+ server_t *server = server_new (self->ctx, endpoint);
+ self->servers [self->nbr_servers++] = server;
+ self->dirty = true;
+ client_server_execute (self, server, initialize_event);
+ }
+ else
+ printf ("E: too many server connections (max %d)\\n", MAX_SERVERS);
+
+ free (endpoint);
+ }
+ free (method);
+ zmsg_destroy (&msg);
+}
+
.macro output_event_body
. for action
. if name = "send"
. if switches.animate ?= 1
zclock_log ("C: + send $(MESSAGE:C)");
. endif
- $(codec)_id_set (self->request, $(CODEC)_$(MESSAGE:C));
+ $(codec)_id_set (server->request, $(CODEC)_$(MESSAGE:C));
.if switches.trace ?= 1
zclock_log ("Send request to server");
- $(codec)_dump (self->request);
+ $(codec)_dump (server->request);
.endif
- $(codec)_send (&self->request, self->dealer);
- self->request = $(codec)_new (0);
+ $(codec)_send (&server->request, server->dealer);
+ server->request = $(codec)_new (0);
. else
. if switches.animate ?= 1
zclock_log ("C: + $(name)");
. endif
- $(name:c) (self);
+ $(name:c) (self, server);
. if count (class.action, name = -1.name) = 0
. echo 'E: you need to add <action name="$(name)">'
. endif
. endif
. endfor
. if defined (event.next)
- self->state = $(next:c)_state;
+ server->state = $(next:c)_state;
. endif
.endmacro
.#
@@ -457,33 +605,32 @@ client_apply_config (client_t *self)
. endif
static void
-$(name:c) (client_t *self)
+$(name:c) (client_t *self, server_t *server)
{
$(string.trim (action.?''):block )
}
.endfor
// Execute state machine as long as we have events
-
static void
-client_execute (client_t *self, int event)
+client_server_execute (client_t *self, server_t *server, int event)
{
- self->next_event = event;
- while (self->next_event) {
- self->event = self->next_event;
- self->next_event = 0;
+ server->next_event = event;
+ while (server->next_event) {
+ server->event = server->next_event;
+ server->next_event = 0;
.if switches.animate ?= 1
- zclock_log ("C: %s:", s_state_name [self->state]);
- zclock_log ("C: (%s)", s_event_name [self->event]);
+ zclock_log ("C: %s:", s_state_name [server->state]);
+ zclock_log ("C: (%s)", s_event_name [server->event]);
.endif
- switch (self->state) {
+ switch (server->state) {
.for class.state
case $(name:c)_state:
. for event where name <> "$other"
. if index () > 1
else
. endif
- if (self->event == $(name:c)_event) {
+ if (server->event == $(name:c)_event) {
. output_event_body ()
}
. endfor
@@ -498,163 +645,76 @@ client_execute (client_t *self, int event)
.endfor
}
.if switches.animate ?= 1
- zclock_log ("C: -------------------> %s", s_state_name [self->state]);
+ zclock_log ("C: -------------------> %s", s_state_name [server->state]);
.endif
- if (self->next_event == terminate_event) {
- self->stopped = true;
+ if (server->next_event == terminate_event) {
+ // Automatically calls server_destroy
+ // reset state machine
break;
}
}
}
-// Restart client dialog from zero
-
-static void
-client_restart (client_t *self, char *endpoint)
-{
- // Reconnect to new endpoint if specified
- if (endpoint) {
- if (self->dealer)
- zsocket_destroy (self->ctx, self->dealer);
- self->dealer = zsocket_new (self->ctx, ZMQ_DEALER);
- zsocket_connect (self->dealer, endpoint);
- }
- // Clear out any previous request data
- $(codec)_destroy (&self->request);
- self->request = $(codec)_new (0);
-
- // Restart dialog state machine from zero
-.for class.state where item () = 1
- self->state = $(name:c)_state;
-.endfor
- self->expires_at = 0;
-
- // Application hook to reinitialize dialog
- // Provides us with an event to kick things off
- initialize_the_client (self);
- client_execute (self, self->next_event);
-}
-
static void
-control_message (client_t *self)
+client_server_message (client_t *self, server_t *server)
{
- zmsg_t *msg = zmsg_recv (self->pipe);
- char *method = zmsg_popstr (msg);
-.for class.method
- if (streq (method, "$(NAME)")) {
-. for argument
-. if type = "string"
- char *$(name) = zmsg_popstr (msg);
-. elsif type = "number"
- char *$(name)_string = zmsg_popstr (msg);
- long $(name) = atoi ($(name)_string);
- free ($(name)_string);
-. endif
-. endfor
- $(string.trim (method.?''):block )
-. for method.return
- $(string.trim (return.?''):block )
-. endfor
-. for argument where type = "string"
- free ($(name));
-. endfor
- }
- else
-.endfor
- if (streq (method, "CONNECT")) {
- char *endpoint = zmsg_popstr (msg);
- client_restart (self, endpoint);
- free (endpoint);
- }
- else
- if (streq (method, "CONFIG")) {
- char *config_file = zmsg_popstr (msg);
- fmq_config_destroy (&self->config);
- self->config = fmq_config_load (config_file);
- if (self->config)
- client_apply_config (self);
- else {
- printf ("E: cannot load config file '%s'\\n", config_file);
- self->config = fmq_config_new ("root", NULL);
- }
- free (config_file);
- }
- else
- if (streq (method, "SETOPTION")) {
- char *path = zmsg_popstr (msg);
- char *value = zmsg_popstr (msg);
- fmq_config_path_set (self->config, path, value);
- client_config_self (self);
- free (path);
- free (value);
- }
- else
- if (streq (method, "STOP")) {
- zstr_send (self->pipe, "OK");
- self->stopped = true;
- }
- free (method);
- zmsg_destroy (&msg);
-
- if (self->next_event)
- client_execute (self, self->next_event);
-}
-
-static void
-server_message (client_t *self)
-{
- if (self->reply)
- $(codec)_destroy (&self->reply);
- self->reply = $(codec)_recv (self->dealer);
- if (!self->reply)
+ if (server->reply)
+ $(codec)_destroy (&server->reply);
+ server->reply = $(codec)_recv (server->dealer);
+ if (!server->reply)
return; // Interrupted; do nothing
-.if switches.trace ?= 1
+.if switches.trace ?= 1
zclock_log ("Received reply from server");
- $(codec)_dump (self->reply);
+ $(codec)_dump (server->reply);
.endif
+ // Any input from server counts as activity
+ server->expires_at = zclock_time () + self->heartbeat * 3;
. for class.event where external ?= 1
. if index () > 1
else
. endif
- if ($(codec)_id (self->reply) == $(CODEC)_$(NAME:C))
- client_execute (self, $(name:c)_event);
+ if ($(codec)_id (server->reply) == $(CODEC)_$(NAME:C))
+ client_server_execute (self, server, $(name:c)_event);
. endfor
-
- // Any input from server counts as activity
- self->expires_at = zclock_time () + self->heartbeat * 2;
}
-
// Finally here's the client thread itself, which polls its two
// sockets and processes incoming messages
-
static void
client_thread (void *args, zctx_t *ctx, void *pipe)
{
client_t *self = client_new (ctx, pipe);
+ int pollset_size = 1;
+ zmq_pollitem_t pollset [MAX_SERVERS] = {
+ { self->pipe, 0, ZMQ_POLLIN, 0 }
+ };
while (!self->stopped && !zctx_interrupted) {
- // Build structure each time since self->dealer can change
- zmq_pollitem_t items [] = {
- { self->pipe, 0, ZMQ_POLLIN, 0 },
- { self->dealer, 0, ZMQ_POLLIN, 0 }
- };
- int poll_size = self->dealer? 2: 1;
- if (zmq_poll (items, poll_size, self->heartbeat * ZMQ_POLL_MSEC) == -1)
+ // Rebuild pollset if we need to
+ int server_nbr;
+ if (self->dirty) {
+ for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) {
+ pollset [1 + server_nbr].socket = self->servers [server_nbr]->dealer;
+ pollset [1 + server_nbr].events = ZMQ_POLLIN;
+ }
+ pollset_size = 1 + self->nbr_servers;
+ }
+ if (zmq_poll (pollset, pollset_size, self->heartbeat * ZMQ_POLL_MSEC) == -1)
break; // Context has been shut down
// Process incoming messages; either of these can
// throw events into the state machine
- if (items [0].revents & ZMQ_POLLIN)
- control_message (self);
-
- if (items [1].revents & ZMQ_POLLIN)
- server_message (self);
-
- // Check whether server seems dead
- if (self->expires_at && zclock_time () >= self->expires_at)
- client_restart (self, NULL);
+ if (pollset [0].revents & ZMQ_POLLIN)
+ client_control_message (self);
+
+ // Here, array of sockets to servers
+ for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) {
+ if (pollset [1 + server_nbr].revents & ZMQ_POLLIN) {
+ server_t *server = self->servers [server_nbr];
+ client_server_message (self, server);
+ }
+ }
}
client_destroy (&self);
}
73 scripts/server_c.gsl
View
@@ -264,17 +264,25 @@ s_event_name [] = {
.endif
+// Forward declarations
+typedef struct _server_t server_t;
+typedef struct _client_t client_t;
+
+.for class.declare where type ?= "forward"
+$(string.trim (declare.?''):)
+
+.endfor
+
// ---------------------------------------------------------------------
// Context for the server thread
-typedef struct {
- // Properties accessible to client actions
+struct _server_t {
+ // Properties accessible to server actions
.for class.self
. for context
$(string.trim (context.?''):block )
. endfor
.endfor
-
// Properties you should NOT touch
zctx_t *ctx; // Own CZMQ context
void *pipe; // Socket to back to caller
@@ -284,22 +292,21 @@ typedef struct {
fmq_config_t *config; // Configuration tree
int monitor; // Monitor interval
int64_t monitor_at; // Next monitor at this time
- int heartbeat; // Heartbeat for clients
-} server_t;
+ int heartbeat; // Client heartbeat interval
+};
// ---------------------------------------------------------------------
// Context for each client connection
-typedef struct {
+struct _client_t {
// Properties accessible to client actions
- int64_t heartbeat; // Heartbeat interval
+ int heartbeat; // Client heartbeat interval
event_t next_event; // Next event
.for class.client
. for context
$(string.trim (context.?''):block )
. endfor
.endfor
-
// Properties you should NOT touch
void *router; // Socket to client
int64_t heartbeat_at; // Next heartbeat at this time
@@ -310,13 +317,12 @@ typedef struct {
zframe_t *address; // Client address identity
$(codec)_t *request; // Last received request
$(codec)_t *reply; // Reply to send out, if any
-} client_t;
-
+};
static void
server_client_execute (server_t *server, client_t *client, int event);
-.for class.declare
+.for class.declare where !defined (type)
$(string.trim (declare.?''):)
.endfor
@@ -324,13 +330,13 @@ $(string.trim (declare.?''):)
// Client methods
static client_t *
-client_new (char *hashkey, zframe_t *address)
+client_new (zframe_t *address)
{
client_t *self = (client_t *) zmalloc (sizeof (client_t));
- self->hashkey = hashkey;
.for class.state where item () = 1
self->state = $(name:c)_state;
.endfor
+ self->hashkey = zframe_strhex (address);
self->address = zframe_dup (address);
self->reply = $(codec)_new (0);
$(codec)_address_set (self->reply, self->address);
@@ -362,17 +368,12 @@ client_destroy (client_t **self_p)
}
}
+// Callback when we remove client from 'clients' hash table
static void
-client_set_request (client_t *self, $(codec)_t *request)
+client_free (void *argument)
{
- if (self->request)
- $(codec)_destroy (&self->request);
- self->request = request;
-
- // Any input from client counts as heartbeat
- self->heartbeat_at = zclock_time () + self->heartbeat;
- // Any input from client counts as activity
- self->expires_at = zclock_time () + self->heartbeat * 3;
+ client_t *client = (client_t *) argument;
+ client_destroy (&client);
}
// Client hash function that calculates tickless timer
@@ -407,15 +408,6 @@ client_ping (const char *key, void *client, void *argument)
return 0;
}
-// Callback when we remove client from 'clients' hash table
-static void
-client_free (void *argument)
-{
- client_t *client = (client_t *) argument;
- client_destroy (&client);
-}
-
-
// Server methods
static void
@@ -470,7 +462,6 @@ server_destroy (server_t **self_p)
// * apply server configuration
// * print any echo items in top-level sections
// * apply sections that match methods
-
static void
server_apply_config (server_t *self)
{
@@ -503,6 +494,7 @@ server_apply_config (server_t *self)
server_config_self (self);
}
+// Process message from pipe
static void
server_control_message (server_t *self)
{
@@ -601,7 +593,6 @@ $(name:c) (server_t *self, client_t *client)
.endfor
// Execute state machine as long as we have events
-
static void
server_client_execute (server_t *self, client_t *client, int event)
{
@@ -659,16 +650,22 @@ server_client_message (server_t *self)
char *hashkey = zframe_strhex ($(codec)_address (request));
client_t *client = zhash_lookup (self->clients, hashkey);
if (client == NULL) {
- client = client_new (hashkey, $(codec)_address (request));
+ client = client_new ($(codec)_address (request));
client->heartbeat = self->heartbeat;
client->router = self->router;
zhash_insert (self->clients, hashkey, client);
zhash_freefn (self->clients, hashkey, client_free);
}
- else
- free (hashkey);
+ free (hashkey);
+ if (client->request)
+ $(codec)_destroy (&client->request);
+ client->request = request;
- client_set_request (client, request);
+ // Any input from client counts as heartbeat
+ client->heartbeat_at = zclock_time () + client->heartbeat;
+ // Any input from client counts as activity
+ client->expires_at = zclock_time () + client->heartbeat * 3;
+
. for class.event where external ?= 1
. if index () > 1
else
@@ -680,7 +677,6 @@ server_client_message (server_t *self)
// Finally here's the server thread itself, which polls its two
// sockets and processes incoming messages
-
static void
server_thread (void *args, zctx_t *ctx, void *pipe)
{
@@ -689,7 +685,6 @@ server_thread (void *args, zctx_t *ctx, void *pipe)
{ self->pipe, 0, ZMQ_POLLIN, 0 },
{ self->router, 0, ZMQ_POLLIN, 0 }
};
-
self->monitor_at = zclock_time () + self->monitor;
while (!self->stopped && !zctx_interrupted) {
// Calculate tickless timer, up to interval seconds
1  scripts/server_java.gsl
View
@@ -350,7 +350,6 @@ long $(name)\
private int heartbeat; // Heartbeat for clients
// Server methods
-
private void config ()
{
// Get standard server configuration
2  src/filemq.c
View
@@ -51,9 +51,9 @@ int main (int argc, char *argv [])
fmq_server_bind (server, "tcp://*:5670");
fmq_client_t *client = fmq_client_new ();
- fmq_client_connect (client, "tcp://localhost:5670");
fmq_client_set_inbox (client, argv [2]);
fmq_client_subscribe (client, "/");
+ fmq_client_connect (client, "tcp://localhost:5670");
while (!zctx_interrupted)
zclock_sleep (1000);
666 src/fmq_client.c
View
@@ -100,7 +100,7 @@ fmq_client_setoption (fmq_client_t *self, const char *path, const char *value)
// --------------------------------------------------------------------------
-// Open connection to server
+// Create outgoing connection to server
void
fmq_client_connect (fmq_client_t *self, const char *endpoint)
@@ -178,7 +178,7 @@ typedef enum {
typedef enum {
terminate_event = -1,
- ready_event = 1,
+ initialize_event = 1,
srsly_event = 2,
rtfm_event = 3,
_other_event = 4,
@@ -188,25 +188,74 @@ typedef enum {
finished_event = 8,
cheezburger_event = 9,
hugz_event = 10,
- subscribe_event = 11,
- send_credit_event = 12,
- icanhaz_ok_event = 13
+ send_credit_event = 11,
+ icanhaz_ok_event = 12
} event_t;
+// Maximum number of server connections we allow
+#define MAX_SERVERS 256
+
// Forward declarations
typedef struct _client_t client_t;
+typedef struct _server_t server_t;
+
+// Forward declarations
+typedef struct _sub_t sub_t;
+
+
+// ---------------------------------------------------------------------
+// Context for the client thread
+
+struct _client_t {
+ // Properties accessible to client actions
+ zlist_t *subs; // Subscriptions
+ sub_t *sub; // Subscription we want to send
+ // Properties you should NOT touch
+ zctx_t *ctx; // Own CZMQ context
+ void *pipe; // Socket to back to caller
+ server_t *servers [MAX_SERVERS];
+ // Server connections
+ uint nbr_servers; // How many connections we have
+ bool dirty; // If true, rebuild pollset
+ bool stopped; // Is the client stopped?
+ fmq_config_t *config; // Configuration tree
+ int heartbeat; // Heartbeat interval
+};
+
+// ---------------------------------------------------------------------
+// Context for each server connection
+
+struct _server_t {
+ // Properties accessible to server actions
+ event_t next_event; // Next event
+ size_t credit; // Current credit pending
+ fmq_file_t *file; // File we're writing to
+ // Properties you should NOT touch
+ zctx_t *ctx; // Own CZMQ context
+ uint index; // Index into client->server_array
+ void *dealer; // Socket to talk to server
+ int64_t expires_at; // Connection expires at
+ state_t state; // Current state
+ event_t event; // Current event
+ char *endpoint; // Server endpoint
+ fmq_msg_t *request; // Next message to send
+ fmq_msg_t *reply; // Last received reply
+};
+
+static void
+client_server_execute (client_t *self, server_t *server, int event);
// There's no point making these configurable
#define CREDIT_SLICE 1000000
#define CREDIT_MINIMUM (CREDIT_SLICE * 4) + 1
// Subscription in memory
-typedef struct {
+struct _sub_t {
client_t *client; // Pointer to parent client
char *inbox; // Inbox location
char *path; // Path we subscribe to
-} sub_t;
+};
static sub_t *
sub_new (client_t *client, char *inbox, char *path)
@@ -244,31 +293,39 @@ sub_cache (sub_t *self)
}
-// ---------------------------------------------------------------------
-// Context for the client thread
+// Server methods
-struct _client_t {
- // Properties accessible to client actions
- event_t next_event; // Next event
- bool connected; // Are we connected to server?
- zlist_t *subs; // Subscriptions
- sub_t *sub; // Subscription we want to send
- size_t credit; // Current credit pending
- fmq_file_t *file; // File we're writing to
+static server_t *
+server_new (zctx_t *ctx, char *endpoint)
+{
+ server_t *self = (server_t *) zmalloc (sizeof (server_t));
+ self->ctx = ctx;
+ self->endpoint = strdup (endpoint);
+ self->dealer = zsocket_new (self->ctx, ZMQ_DEALER);
+ self->request = fmq_msg_new (0);
+ zsocket_connect (self->dealer, endpoint);
+ self->state = start_state;
- // Properties you should NOT touch
- zctx_t *ctx; // Own CZMQ context
- void *pipe; // Socket to back to caller
- void *dealer; // Socket to talk to server
- bool stopped; // Has client stopped?
- fmq_config_t *config; // Configuration tree
- state_t state; // Current state
- event_t event; // Current event
- fmq_msg_t *request; // Next message to send
- fmq_msg_t *reply; // Last received reply
- int heartbeat; // Heartbeat interval
- int64_t expires_at; // Server expires at
-};
+ return self;
+}
+
+static void
+server_destroy (server_t **self_p)
+{
+ assert (self_p);
+ if (*self_p) {
+ server_t *self = *self_p;
+ zsocket_destroy (self->ctx, self->dealer);
+ fmq_msg_destroy (&self->request);
+ fmq_msg_destroy (&self->reply);
+ free (self->endpoint);
+
+ free (self);
+ *self_p = NULL;
+ }
+}
+
+// Client methods
static void
client_config_self (client_t *self)
@@ -287,7 +344,6 @@ client_new (zctx_t *ctx, void *pipe)
self->config = fmq_config_new ("root", NULL);
client_config_self (self);
self->subs = zlist_new ();
- self->connected = false;
return self;
}
@@ -298,8 +354,11 @@ client_destroy (client_t **self_p)
if (*self_p) {
client_t *self = *self_p;
fmq_config_destroy (&self->config);
- fmq_msg_destroy (&self->request);
- fmq_msg_destroy (&self->reply);
+ int server_nbr;
+ for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) {
+ server_t *server = self->servers [server_nbr];
+ server_destroy (&server);
+ }
// Destroy subscriptions
while (zlist_size (self->subs)) {
sub_t *sub = (sub_t *) zlist_pop (self->subs);
@@ -315,7 +374,6 @@ client_destroy (client_t **self_p)
// * apply client configuration
// * print any echo items in top-level sections
// * apply sections that match methods
-
static void
client_apply_config (client_t *self)
{
@@ -346,10 +404,6 @@ client_apply_config (client_t *self)
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
self->sub = sub_new (self, inbox, path);
zlist_append (self->subs, self->sub);
-
- // If we're connected, then also send to server
- if (self->connected)
- self->next_event = subscribe_event;
}
else
if (streq (fmq_config_name (section), "set_inbox")) {
@@ -367,113 +421,192 @@ client_apply_config (client_t *self)
client_config_self (self);
}
-// Custom actions for state machine
-
+// Process message from pipe
static void
-initialize_the_client (client_t *self)
+client_control_message (client_t *self)
{
- self->next_event = ready_event;
+ zmsg_t *msg = zmsg_recv (self->pipe);
+ char *method = zmsg_popstr (msg);
+ if (streq (method, "SUBSCRIBE")) {
+ char *path = zmsg_popstr (msg);
+ // Store subscription along with any previous ones
+ // Check we don't already have a subscription for this path
+ self->sub = (sub_t *) zlist_first (self->subs);
+ while (self->sub) {
+ if (streq (path, self->sub->path))
+ return;
+ self->sub = (sub_t *) zlist_next (self->subs);
+ }
+ // Subscription path must start with '/'
+ // We'll do better error handling later
+ assert (*path == '/');
+
+ // New subscription, store it for later replay
+ char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
+ self->sub = sub_new (self, inbox, path);
+ zlist_append (self->subs, self->sub);
+ free (path);
+ }
+ else
+ if (streq (method, "SET INBOX")) {
+ char *path = zmsg_popstr (msg);
+ fmq_config_path_set (self->config, "client/inbox", path);
+ free (path);
+ }
+ else
+ if (streq (method, "SET RESYNC")) {
+ char *enabled_string = zmsg_popstr (msg);
+ long enabled = atoi (enabled_string);
+ free (enabled_string);
+ // Request resynchronization from server
+ fmq_config_path_set (self->config, "client/resync", enabled? "1" :"0");
+ }
+ else
+ if (streq (method, "CONFIG")) {
+ char *config_file = zmsg_popstr (msg);
+ fmq_config_destroy (&self->config);
+ self->config = fmq_config_load (config_file);
+ if (self->config)
+ client_apply_config (self);
+ else {
+ printf ("E: cannot load config file '%s'\n", config_file);
+ self->config = fmq_config_new ("root", NULL);
+ }
+ free (config_file);
+ }
+ else
+ if (streq (method, "SETOPTION")) {
+ char *path = zmsg_popstr (msg);
+ char *value = zmsg_popstr (msg);
+ fmq_config_path_set (self->config, path, value);
+ client_config_self (self);
+ free (path);
+ free (value);
+ }
+ else
+ if (streq (method, "STOP")) {
+ zstr_send (self->pipe, "OK");
+ self->stopped = true;
+ }
+ else
+ if (streq (method, "CONNECT")) {
+ char *endpoint = zmsg_popstr (msg);
+ if (self->nbr_servers < MAX_SERVERS) {
+ server_t *server = server_new (self->ctx, endpoint);
+ self->servers [self->nbr_servers++] = server;
+ self->dirty = true;
+ client_server_execute (self, server, initialize_event);
+ }
+ else
+ printf ("E: too many server connections (max %d)\n", MAX_SERVERS);
+
+ free (endpoint);
+ }
+ free (method);
+ zmsg_destroy (&msg);
}
+// Custom actions for state machine
+
static void
-try_security_mechanism (client_t *self)
+try_security_mechanism (client_t *self, server_t *server)
{
char *login = fmq_config_resolve (self->config, "security/plain/login", "guest");
char *password = fmq_config_resolve (self->config, "security/plain/password", "");
zframe_t *frame = fmq_sasl_plain_encode (login, password);
- fmq_msg_mechanism_set (self->request, "PLAIN");
- fmq_msg_response_set (self->request, frame);
+ fmq_msg_mechanism_set (server->request, "PLAIN");
+ fmq_msg_response_set (server->request, frame);
}
static void
-connected_to_server (client_t *self)
+connected_to_server (client_t *self, server_t *server)
{
- self->connected = true;
+
}
static void
-get_first_subscription (client_t *self)
+get_first_subscription (client_t *self, server_t *server)
{
self->sub = (sub_t *) zlist_first (self->subs);
if (self->sub)
- self->next_event = ok_event;
+ server->next_event = ok_event;
else
- self->next_event = finished_event;
+ server->next_event = finished_event;
}
static void
-get_next_subscription (client_t *self)
+get_next_subscription (client_t *self, server_t *server)
{
self->sub = (sub_t *) zlist_next (self->subs);
if (self->sub)
- self->next_event = ok_event;
+ server->next_event = ok_event;
else
- self->next_event = finished_event;
+ server->next_event = finished_event;
}
static void
-format_icanhaz_command (client_t *self)
+format_icanhaz_command (client_t *self, server_t *server)
{
- fmq_msg_path_set (self->request, self->sub->path);
+ fmq_msg_path_set (server->request, self->sub->path);
// If client app wants full resync, send cache to server
if (atoi (fmq_config_resolve (self->config, "client/resync", "0")) == 1) {
- fmq_msg_options_insert (self->request, "RESYNC", "1");
- fmq_msg_cache_set (self->request, sub_cache (self->sub));
+ fmq_msg_options_insert (server->request, "RESYNC", "1");
+ fmq_msg_cache_set (server->request, sub_cache (self->sub));
}
}
static void
-refill_credit_as_needed (client_t *self)
+refill_credit_as_needed (client_t *self, server_t *server)
{
- // If credit has fallen too low, send more credit
- size_t credit_to_send = 0;
- while (self->credit < CREDIT_MINIMUM) {
- credit_to_send += CREDIT_SLICE;
- self->credit += CREDIT_SLICE;
- }
- if (credit_to_send) {
- fmq_msg_credit_set (self->request, credit_to_send);
- self->next_event = send_credit_event;
- }
+ // If credit has fallen too low, send more credit
+ size_t credit_to_send = 0;
+ while (server->credit < CREDIT_MINIMUM) {
+ credit_to_send += CREDIT_SLICE;
+ server->credit += CREDIT_SLICE;
+ }
+ if (credit_to_send) {
+ fmq_msg_credit_set (server->request, credit_to_send);
+ server->next_event = send_credit_event;
+ }
}
static void
-process_the_patch (client_t *self)
+process_the_patch (client_t *self, server_t *server)
{
char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
- char *filename = fmq_msg_filename (self->reply);
+ char *filename = fmq_msg_filename (server->reply);
// Filenames from server must start with slash, which we skip
assert (*filename == '/');
filename++;
- if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_CREATE) {
- if (self->file == NULL) {
- self->file = fmq_file_new (inbox, filename);
- if (fmq_file_output (self->file)) {
+ if (fmq_msg_operation (server->reply) == FMQ_MSG_FILE_CREATE) {
+ if (server->file == NULL) {
+ server->file = fmq_file_new (inbox, filename);
+ if (fmq_file_output (server->file)) {
// File not writeable, skip patch
- fmq_file_destroy (&self->file);
+ fmq_file_destroy (&server->file);
return;
}
}
// Try to write, ignore errors in this version
- zframe_t *frame = fmq_msg_chunk (self->reply);
+ zframe_t *frame = fmq_msg_chunk (server->reply);
fmq_chunk_t *chunk = fmq_chunk_new (zframe_data (frame), zframe_size (frame));
if (fmq_chunk_size (chunk) > 0) {
- fmq_file_write (self->file, chunk, fmq_msg_offset (self->reply));
- self->credit -= fmq_chunk_size (chunk);
+ fmq_file_write (server->file, chunk, fmq_msg_offset (server->reply));
+ server->credit -= fmq_chunk_size (chunk);
}
else {
// Zero-sized chunk means end of file, so report back to caller
zstr_sendm (self->pipe, "DELIVER");
zstr_sendm (self->pipe, filename);
zstr_sendf (self->pipe, "%s/%s", inbox, filename);
- fmq_file_destroy (&self->file);
+ fmq_file_destroy (&server->file);
}
fmq_chunk_destroy (&chunk);
}
else
- if (fmq_msg_operation (self->reply) == FMQ_MSG_FILE_DELETE) {
+ if (fmq_msg_operation (server->reply) == FMQ_MSG_FILE_DELETE) {
zclock_log ("I: delete %s/%s", inbox, filename);
fmq_file_t *file = fmq_file_new (inbox, filename);
fmq_file_remove (file);
@@ -482,89 +615,87 @@ process_the_patch (client_t *self)
}
static void
-log_access_denied (client_t *self)
+log_access_denied (client_t *self, server_t *server)
{
puts ("W: server denied us access, retrying...");
}
static void
-log_invalid_message (client_t *self)
+log_invalid_message (client_t *self, server_t *server)
{
puts ("E: server claims we sent an invalid message");
}
static void
-log_protocol_error (client_t *self)
+log_protocol_error (client_t *self, server_t *server)
{
puts ("E: protocol error");
}
static void
-terminate_the_client (client_t *self)
+terminate_the_server (client_t *self, server_t *server)
{
- self->connected = false;
- self->next_event = terminate_event;
+ server->next_event = terminate_event;
}
// Execute state machine as long as we have events
-
static void
-client_execute (client_t *self, int event)
+client_server_execute (client_t *self, server_t *server, int event)
{
- self->next_event = event;
- while (self->next_event) {
- self->event = self->next_event;
- self->next_event = 0;
- switch (self->state) {
+ server->next_event = event;
+ while (server->next_event) {
+ server->event = server->next_event;
+ server->next_event = 0;
+ switch (server->state) {
case start_state:
- if (self->event == ready_event) {
- fmq_msg_id_set (self->request, FMQ_MSG_OHAI);
- fmq_msg_send (&self->request, self->dealer);
- self->request = fmq_msg_new (0);
- self->state = requesting_access_state;
+ if (server->event == initialize_event) {
+ fmq_msg_id_set (server->request, FMQ_MSG_OHAI);
+ fmq_msg_send (&server->request, server->dealer);
+ server->request = fmq_msg_new (0);
+ server->state = requesting_access_state;
}
else
- if (self->event == srsly_event) {
- log_access_denied (self);
- terminate_the_client (self);
- self->state = start_state;
+ if (server->event == srsly_event) {
+ log_access_denied (self, server);
+ terminate_the_server (self, server);
+ server->state = start_state;
}
else
- if (self->event == rtfm_event) {
- log_invalid_message (self);
- terminate_the_client (self);
+ if (server->event == rtfm_event) {
+ log_invalid_message (self, server);
+ terminate_the_server (self, server);
}
else {
// Process all other events
- log_protocol_error (self);
- terminate_the_client (self);
+ log_protocol_error (self, server);
+ terminate_the_server (self, server);
}
break;
case requesting_access_state:
- if (self->event == orly_event) {
- try_security_mechanism (self);
- fmq_msg_id_set (self->request, FMQ_MSG_YARLY);
- fmq_msg_send (&self->request, self->dealer);
- self->request = fmq_msg_new (0);
- self->state = requesting_access_state;
+ if (server->event == orly_event) {
+ try_security_mechanism (self, server);
+ fmq_msg_id_set (server->request, FMQ_MSG_YARLY);
+ fmq_msg_send (&server->request, server->dealer);
+ server->request = fmq_msg_new (0);
+ server->state = requesting_access_state;
}
else
- if (self->event == ohai_ok_event) {
- connected_to_server (self);
- get_first_subscription (self);
- self->state = subscribing_state;
+ if (server->event == ohai_ok_event) {
+ connected_to_server (self, server);
+ get_first_subscription (self, server);
+ server->state = subscribing_state;
}
else
- if (self->event == srsly_event) {
- log_access_denied (self);
- terminate_the_client (self);
- self->state = start_state;
+ if (server->event == srsly_event) {
+ log_access_denied (self, server);
+ terminate_the_server (self, server);
+ server->state = start_state;
}
else
- if (self->event == rtfm_event) {
- log_invalid_message (self);
- terminate_the_client (self);
+ if (server->event == rtfm_event) {
+ log_invalid_message (self, server);
+ terminate_the_server (self, server);
}
else {
// Process all other events
@@ -572,262 +703,153 @@ client_execute (client_t *self, int event)
break;
case subscribing_state:
- if (self->event == ok_event) {
- format_icanhaz_command (self);
- fmq_msg_id_set (self->request, FMQ_MSG_ICANHAZ);
- fmq_msg_send (&self->request, self->dealer);
- self->request = fmq_msg_new (0);
- get_next_subscription (self);
- self->state = subscribing_state;
+ if (server->event == ok_event) {
+ format_icanhaz_command (self, server);
+ fmq_msg_id_set (server->request, FMQ_MSG_ICANHAZ);
+ fmq_msg_send (&server->request, server->dealer);
+ server->request = fmq_msg_new (0);
+ get_next_subscription (self, server);
+ server->state = subscribing_state;
}
else
- if (self->event == finished_event) {
- refill_credit_as_needed (self);
- self->state = ready_state;
+ if (server->event == finished_event) {
+ refill_credit_as_needed (self, server);
+ server->state = ready_state;
}
else
- if (self->event == srsly_event) {
- log_access_denied (self);
- terminate_the_client (self);
- self->state = start_state;
+ if (server->event == srsly_event) {
+ log_access_denied (self, server);
+ terminate_the_server (self, server);
+ server->state = start_state;
}
else
- if (self->event == rtfm_event) {
- log_invalid_message (self);
- terminate_the_client (self);
+ if (server->event == rtfm_event) {
+ log_invalid_message (self, server);
+ terminate_the_server (self, server);
}
else {
// Process all other events
- log_protocol_error (self);
- terminate_the_client (self);
+ log_protocol_error (self, server);
+ terminate_the_server (self, server);
}
break;
case ready_state:
- if (self->event == cheezburger_event) {
- process_the_patch (self);
- refill_credit_as_needed (self);
- }
- else
- if (self->event == hugz_event) {
- fmq_msg_id_set (self->request, FMQ_MSG_HUGZ_OK);
- fmq_msg_send (&self->request, self->dealer);
- self->request = fmq_msg_new (0);
+ if (server->event == cheezburger_event) {
+ process_the_patch (self, server);
+ refill_credit_as_needed (self, server);
}
else
- if (self->event == subscribe_event) {
- format_icanhaz_command (self);
- fmq_msg_id_set (self->request, FMQ_MSG_ICANHAZ);
- fmq_msg_send (&self->request, self->dealer);
- self->request = fmq_msg_new (0);
+ if (server->event == hugz_event) {
+ fmq_msg_id_set (server->request, FMQ_MSG_HUGZ_OK);
+ fmq_msg_send (&server->request, server->dealer);
+ server->request = fmq_msg_new (0);
}
else
- if (self->event == send_credit_event) {
- fmq_msg_id_set (self->request, FMQ_MSG_NOM);
- fmq_msg_send (&self->request, self->dealer);
- self->request = fmq_msg_new (0);
+ if (server->event == send_credit_event) {
+ fmq_msg_id_set (server->request, FMQ_MSG_NOM);
+ fmq_msg_send (&server->request, server->dealer);
+ server->request = fmq_msg_new (0);
}
else
- if (self->event == icanhaz_ok_event) {
+ if (server->event == icanhaz_ok_event) {
}
else
- if (self->event == srsly_event) {
- log_access_denied (self);
- terminate_the_client (self);
- self->state = start_state;
+ if (server->event == srsly_event) {
+ log_access_denied (self, server);
+ terminate_the_server (self, server);
+ server->state = start_state;
}
else
- if (self->event == rtfm_event) {
- log_invalid_message (self);
- terminate_the_client (self);
+ if (server->event == rtfm_event) {
+ log_invalid_message (self, server);
+ terminate_the_server (self, server);
}
else {
// Process all other events
- log_protocol_error (self);
- terminate_the_client (self);
+ log_protocol_error (self, server);
+ terminate_the_server (self, server);
}
break;
}
- if (self->next_event == terminate_event) {
- self->stopped = true;
+ if (server->next_event == terminate_event) {
+ // Automatically calls server_destroy
+ // reset state machine
break;
}
}
}
-// Restart client dialog from zero
-
static void
-client_restart (client_t *self, char *endpoint)
-{
- // Reconnect to new endpoint if specified
- if (endpoint) {
- if (self->dealer)
- zsocket_destroy (self->ctx, self->dealer);
- self->dealer = zsocket_new (self->ctx, ZMQ_DEALER);
- zsocket_connect (self->dealer, endpoint);
- }
- // Clear out any previous request data
- fmq_msg_destroy (&self->request);
- self->request = fmq_msg_new (0);
-
- // Restart dialog state machine from zero
- self->state = start_state;
- self->expires_at = 0;
-
- // Application hook to reinitialize dialog
- // Provides us with an event to kick things off
- initialize_the_client (self);
- client_execute (self, self->next_event);
-}
-
-static void
-control_message (client_t *self)
+client_server_message (client_t *self, server_t *server)
{
- zmsg_t *msg = zmsg_recv (self->pipe);
- char *method = zmsg_popstr (msg);
- if (streq (method, "SUBSCRIBE")) {
- char *path = zmsg_popstr (msg);
- // Store subscription along with any previous ones
- // Check we don't already have a subscription for this path
- self->sub = (sub_t *) zlist_first (self->subs);
- while (self->sub) {
- if (streq (path, self->sub->path))
- return;
- self->sub = (sub_t *) zlist_next (self->subs);
- }
- // Subscription path must start with '/'
- // We'll do better error handling later
- assert (*path == '/');
-
- // New subscription, store it for later replay
- char *inbox = fmq_config_resolve (self->config, "client/inbox", ".inbox");
- self->sub = sub_new (self, inbox, path);
- zlist_append (self->subs, self->sub);
-
- // If we're connected, then also send to server
- if (self->connected)
- self->next_event = subscribe_event;
- free (path);
- }
- else
- if (streq (method, "SET INBOX")) {
- char *path = zmsg_popstr (msg);
- fmq_config_path_set (self->config, "client/inbox", path);
- free (path);
- }
- else
- if (streq (method, "SET RESYNC")) {
- char *enabled_string = zmsg_popstr (msg);
- long enabled = atoi (enabled_string);
- free (enabled_string);
- // Request resynchronization from server
- fmq_config_path_set (self->config, "client/resync", enabled? "1" :"0");
- }
- else
- if (streq (method, "CONNECT")) {
- char *endpoint = zmsg_popstr (msg);
- client_restart (self, endpoint);
- free (endpoint);
- }
- else
- if (streq (method, "CONFIG")) {
- char *config_file = zmsg_popstr (msg);
- fmq_config_destroy (&self->config);
- self->config = fmq_config_load (config_file);
- if (self->config)
- client_apply_config (self);
- else {
- printf ("E: cannot load config file '%s'\n", config_file);
- self->config = fmq_config_new ("root", NULL);
- }
- free (config_file);
- }
- else
- if (streq (method, "SETOPTION")) {
- char *path = zmsg_popstr (msg);
- char *value = zmsg_popstr (msg);
- fmq_config_path_set (self->config, path, value);
- client_config_self (self);
- free (path);
- free (value);
- }
- else
- if (streq (method, "STOP")) {
- zstr_send (self->pipe, "OK");
- self->stopped = true;
- }
- free (method);
- zmsg_destroy (&msg);
-
- if (self->next_event)
- client_execute (self, self->next_event);
-}
-
-static void
-server_message (client_t *self)
-{
- if (self->reply)
- fmq_msg_destroy (&self->reply);
- self->reply = fmq_msg_recv (self->dealer);
- if (!self->reply)
+ if (server->reply)
+ fmq_msg_destroy (&server->reply);
+ server->reply = fmq_msg_recv (server->dealer);
+ if (!server->reply)
return; // Interrupted; do nothing
+
+ // Any input from server counts as activity
+ server->expires_at = zclock_time () + self->heartbeat * 3;
- if (fmq_msg_id (self->reply) == FMQ_MSG_SRSLY)
- client_execute (self, srsly_event);
+ if (fmq_msg_id (server->reply) == FMQ_MSG_SRSLY)
+ client_server_execute (self, server, srsly_event);
else
- if (fmq_msg_id (self->reply) == FMQ_MSG_RTFM)
- client_execute (self, rtfm_event);
+ if (fmq_msg_id (server->reply) == FMQ_MSG_RTFM)
+ client_server_execute (self, server, rtfm_event);
else
- if (fmq_msg_id (self->reply) == FMQ_MSG_ORLY)
- client_execute (self, orly_event);
+ if (fmq_msg_id (server->reply) == FMQ_MSG_ORLY)
+ client_server_execute (self, server, orly_event);
else
- if (fmq_msg_id (self->reply) == FMQ_MSG_OHAI_OK)
- client_execute (self, ohai_ok_event);
+ if (fmq_msg_id (server->reply) == FMQ_MSG_OHAI_OK)
+ client_server_execute (self, server, ohai_ok_event);
else
- if (fmq_msg_id (self->reply) == FMQ_MSG_CHEEZBURGER)
- client_execute (self, cheezburger_event);
+ if (fmq_msg_id (server->reply) == FMQ_MSG_CHEEZBURGER)
+ client_server_execute (self, server, cheezburger_event);
else
- if (fmq_msg_id (self->reply) == FMQ_MSG_HUGZ)
- client_execute (self, hugz_event);
+ if (fmq_msg_id (server->reply) == FMQ_MSG_HUGZ)
+ client_server_execute (self, server, hugz_event);
else
- if (fmq_msg_id (self->reply) == FMQ_MSG_ICANHAZ_OK)
- client_execute (self, icanhaz_ok_event);
-
- // Any input from server counts as activity
- self->expires_at = zclock_time () + self->heartbeat * 2;
+ if (fmq_msg_id (server->reply) == FMQ_MSG_ICANHAZ_OK)
+ client_server_execute (self, server, icanhaz_ok_event);
}
-
// Finally here's the client thread itself, which polls its two
// sockets and processes incoming messages
-
static void
client_thread (void *args, zctx_t *ctx, void *pipe)
{
client_t *self = client_new (ctx, pipe);
+ int pollset_size = 1;
+ zmq_pollitem_t pollset [MAX_SERVERS] = {
+ { self->pipe, 0, ZMQ_POLLIN, 0 }
+ };
while (!self->stopped && !zctx_interrupted) {
- // Build structure each time since self->dealer can change
- zmq_pollitem_t items [] = {
- { self->pipe, 0, ZMQ_POLLIN, 0 },
- { self->dealer, 0, ZMQ_POLLIN, 0 }
- };
- int poll_size = self->dealer? 2: 1;
- if (zmq_poll (items, poll_size, self->heartbeat * ZMQ_POLL_MSEC) == -1)
+ // Rebuild pollset if we need to
+ int server_nbr;
+ if (self->dirty) {
+ for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) {
+ pollset [1 + server_nbr].socket = self->servers [server_nbr]->dealer;
+ pollset [1 + server_nbr].events = ZMQ_POLLIN;
+ }
+ pollset_size = 1 + self->nbr_servers;
+ }
+ if (zmq_poll (pollset, pollset_size, self->heartbeat * ZMQ_POLL_MSEC) == -1)
break; // Context has been shut down
// Process incoming messages; either of these can
// throw events into the state machine
- if (items [0].revents & ZMQ_POLLIN)
- control_message (self);
-
- if (items [1].revents & ZMQ_POLLIN)
- server_message (self);
-
- // Check whether server seems dead
- if (self->expires_at && zclock_time () >= self->expires_at)
- client_restart (self, NULL);
+ if (pollset [0].revents & ZMQ_POLLIN)