Skip to content
Browse files

Merge pull request #16 from miniway/master

java FmqClient allows multiple server connections
  • Loading branch information...
2 parents aec6722 + eb28f82 commit 63d7398c8eda1d01ae9afdc29d9c5f4881c1520e @hintjens hintjens committed Jan 9, 2013
View
2 configure.in
@@ -10,7 +10,7 @@ AC_INIT([filemq],[1.1.0],[zeromq-dev@lists.zeromq.org])
AC_CONFIG_AUX_DIR(config)
AC_CONFIG_MACRO_DIR(config)
-AM_CONFIG_HEADER(src/platform.h)
+AC_CONFIG_HEADERS(src/platform.h)
AM_INIT_AUTOMAKE(tar-ustar)
# This defines PACKAGE_VERSION_... in src/platform.h
View
76 java/model/fmq_client.xml
@@ -10,7 +10,7 @@ private static final int CREDIT_SLICE = 1000000;
private static final int CREDIT_MINIMUM = (CREDIT_SLICE * 4) + 1;
</declare>
-<client>
+<self>
<context>
private boolean connected; // Are we connected to server?
private List &lt;Sub&gt; subs; // Subscriptions
@@ -27,7 +27,18 @@ connected = false;
for (Sub sub: subs)
sub.destroy ();
</destruct>
-</client>
+</self>
+
+<server>
+<context>
+private int credit; // Current credit pending
+private FmqFile file; // File we're writing to
+</context>
+<construct>
+</construct>
+<destruct>
+</destruct>
+</server>
<!-- Embedded class for subscriptions -->
<include filename = "fmq_client_sub.xml" />
@@ -36,8 +47,8 @@ for (Sub sub: subs)
String login = config.resolve ("security/plain/login", "guest");
String password = config.resolve ("security/plain/password", "");
ZFrame frame = FmqSasl.plainEncode (login, password);
-request.setMechanism ("PLAIN");
-request.setResponse (frame);
+server.request.setMechanism ("PLAIN");
+server.request.setResponse (frame);
</action>
<action name = "connected to server">
@@ -48,78 +59,78 @@ connected = true;
subIterator = subs.iterator ();
if (subIterator.hasNext ()) {
sub = subIterator.next ();
- next_event = Event.ok_event;
+ server.next_event = Event.ok_event;
} else
- next_event = Event.finished_event;
+ server.next_event = Event.finished_event;
</action>
<action name = "get next subscription">
if (subIterator.hasNext ()) {
sub = subIterator.next ();
- next_event = Event.ok_event;
+ server.next_event = Event.ok_event;
} else
- next_event = Event.finished_event;
+ server.next_event = Event.finished_event;
</action>
<action name = "format icanhaz command">
-request.setPath (sub.path);
+server.request.setPath (sub.path);
// If client app wants full resync, send cache to server
if (Integer.parseInt (config.resolve ("client/resync", "0")) == 1) {
- request.insertOptions ("RESYNC", "1");
- request.setCache (sub.cache ());
+ server.request.insertOptions ("RESYNC", "1");
+ server.request.setCache (sub.cache ());
}
</action>
<action name = "refill credit as needed">
// If credit has fallen too low, send more credit
int credit_to_send = 0;
-while (credit &lt; CREDIT_MINIMUM) {
+while (server.credit &lt; CREDIT_MINIMUM) {
credit_to_send += CREDIT_SLICE;
- credit += CREDIT_SLICE;
+ server.credit += CREDIT_SLICE;
}
if (credit_to_send > 0) {
- request.setCredit (credit_to_send);
- next_event = Event.send_credit_event;
+ server.request.setCredit (credit_to_send);
+ server.next_event = Event.send_credit_event;
}
</action>
<action name = "process the patch">
String inbox = config.resolve ("client/inbox", ".inbox");
-String filename = reply.filename ();
+String filename = server.reply.filename ();
// Filenames from server must start with slash, which we skip
assert (filename.startsWith ("/"));
filename = filename.substring (1);
-if (reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
- if (file == null) {
- file = new FmqFile (inbox, filename);
- if (!file.output ()) {
+if (server.reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
+ if (server.file == null) {
+ server.file = new FmqFile (inbox, filename);
+ if (!server.file.output ()) {
// File not writeable, skip patch
- file.destroy ();
- file = null;
+ server.file.destroy ();
+ server.file = null;
return;
}
}
// Try to write, ignore errors in this version
- ZFrame frame = reply.chunk ();
+ ZFrame frame = server.reply.chunk ();
FmqChunk chunk = new FmqChunk (frame.getData (), frame.size ());
if (chunk.size () > 0) {
- file.write (chunk, reply.offset ());
- credit -= chunk.size ();
+ server.file.write (chunk, server.reply.offset ());
+ server.credit -= chunk.size ();
}
else {
// Zero-sized chunk means end of file, so report back to caller
pipe.sendMore ("DELIVER");
pipe.sendMore (filename);
pipe.send (String.format ("%s/%s", inbox, filename));
- file.destroy ();
- file = null;
+ server.file.destroy ();
+ server.file = null;
}
chunk.destroy ();
}
else
-if (reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
+if (server.reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
zclock_log ("I: delete %s/%s", inbox, filename);
FmqFile file = new FmqFile (inbox, filename);
file.remove ();
@@ -140,11 +151,6 @@ System.out.println ("E: server claims we sent an invalid message");
System.out.println ("E: protocol error");
</action>
-<action name = "terminate the server">
-connected = false;
-next_event = Event.terminate_event;
-</action>
-
<method name = "subscribe">
<argument name = "path" type = "string" />
// Store subscription along with any previous ones
@@ -161,10 +167,6 @@ assert (path.startsWith ("/"));
String inbox = config.resolve ("client/inbox", ".inbox");
sub = new Sub (this, inbox, path);
subs.add (sub);
-
-// If we're connected, then also send to server
-if (connected)
- next_event = Event.subscribe_event;
</method>
<method name = "set inbox">
View
4 java/model/fmq_server.xml
@@ -9,7 +9,7 @@ FileMQ protocol server
private static final int CHUNK_SIZE = 1000000;
</declare>
-<server>
+<self>
<context>
private List &lt;Mount&gt; mounts; // Mount points
private int port; // Server port
@@ -22,7 +22,7 @@ mounts = new ArrayList &lt;Mount&gt; ();
for (Mount mount : mounts)
mount.destroy ();
</destruct>
-</server>
+</self>
<client>
<context>
View
530 java/src/main/java/org/filemq/FmqClient.java
@@ -43,6 +43,9 @@
// Structure of our front-end API class
public class FmqClient {
+
+ private final static int MAX_SERVERS = 256;
+
ZContext ctx; // CZMQ context
Socket pipe; // Pipe through to client
@@ -157,7 +160,8 @@ public void setResync (long enabled)
start_state (1),
requesting_access_state (2),
subscribing_state (3),
- ready_state (4);
+ ready_state (4),
+ terminated_state (5);
@SuppressWarnings ("unused")
private final int state;
@@ -168,7 +172,6 @@ public void setResync (long enabled)
};
private enum Event {
- terminate_event (-1),
initialize_event (1),
srsly_event (2),
rtfm_event (3),
@@ -233,20 +236,21 @@ private void destroy ()
private static class Client {
// Properties accessible to client actions
- private Event next_event; // Next event
-
+ private boolean connected; // Are we connected to server?
+ private List <Sub> subs; // Subscriptions
+ private Sub sub; // Subscription we want to send
+ private int credit; // Current credit pending
+ private FmqFile file; // File we're writing to
+ private Iterator <Sub> subIterator;
// Properties you should NOT touch
private ZContext ctx; // Own CZMQ context
private Socket pipe; // Socket to back to caller
- private Socket dealer; // Socket to talk to server
+ private final Server [] servers; // Server connections
+ private int nbrServers; // How many connections we have
+ private boolean dirty; // If true, rebuild pollset
private boolean stopped; // Has client stopped?
private FmqConfig config; // Configuration tree
- private State state; // Current state
- private Event event; // Current event
- private FmqMsg request; // Next message to send
- private FmqMsg reply; // Last received reply
private int heartbeat; // Heartbeat interval
- private long expires_at; // Server expires at
private void config ()
{
@@ -258,18 +262,23 @@ private Client (ZContext ctx, Socket pipe)
{
this.ctx = ctx;
this.pipe = pipe;
+ this.servers = new Server [MAX_SERVERS];
this.config = new FmqConfig ("root", null);
config ();
+ subs = new ArrayList <Sub> ();
+ connected = false;
}
private void destroy ()
{
if (config != null)
config.destroy ();
- if (request != null)
- request.destroy ();
- if (reply != null)
- reply.destroy ();
+ for (int serverNbr = 0; serverNbr < nbrServers; serverNbr++) {
+ Server server = servers [serverNbr];
+ server.destory ();
+ }
+ for (Sub sub: subs)
+ sub.destroy ();
}
// Apply configuration tree:
@@ -304,10 +313,6 @@ private void applyConfig ()
String inbox = config.resolve ("client/inbox", ".inbox");
sub = new Sub (this, inbox, path);
subs.add (sub);
-
- // If we're connected, then also send to server
- if (connected)
- next_event = Event.subscribe_event;
}
else
if (section.name ().equals ("set_inbox")) {
@@ -327,101 +332,101 @@ private void applyConfig ()
// Custom actions for state machine
- private void trySecurityMechanism ()
+ private void trySecurityMechanism (Server server)
{
String login = config.resolve ("security/plain/login", "guest");
String password = config.resolve ("security/plain/password", "");
ZFrame frame = FmqSasl.plainEncode (login, password);
- request.setMechanism ("PLAIN");
- request.setResponse (frame);
+ server.request.setMechanism ("PLAIN");
+ server.request.setResponse (frame);
}
- private void connectedToServer ()
+ private void connectedToServer (Server server)
{
connected = true;
}
- private void getFirstSubscription ()
+ private void getFirstSubscription (Server server)
{
- subIterator = subs.iterator ();
- if (subIterator.hasNext ()) {
- sub = subIterator.next ();
- next_event = Event.ok_event;
- } else
- next_event = Event.finished_event;
+ subIterator = subs.iterator ();
+ if (subIterator.hasNext ()) {
+ sub = subIterator.next ();
+ server.next_event = Event.ok_event;
+ } else
+ server.next_event = Event.finished_event;
}
- private void getNextSubscription ()
+ private void getNextSubscription (Server server)
{
- if (subIterator.hasNext ()) {
- sub = subIterator.next ();
- next_event = Event.ok_event;
- } else
- next_event = Event.finished_event;
+ if (subIterator.hasNext ()) {
+ sub = subIterator.next ();
+ server.next_event = Event.ok_event;
+ } else
+ server.next_event = Event.finished_event;
}
- private void formatIcanhazCommand ()
+ private void formatIcanhazCommand (Server server)
{
- request.setPath (sub.path);
+ server.request.setPath (sub.path);
// If client app wants full resync, send cache to server
if (Integer.parseInt (config.resolve ("client/resync", "0")) == 1) {
- request.insertOptions ("RESYNC", "1");
- request.setCache (sub.cache ());
+ server.request.insertOptions ("RESYNC", "1");
+ server.request.setCache (sub.cache ());
}
}
- private void refillCreditAsNeeded ()
+ private void refillCreditAsNeeded (Server server)
{
// If credit has fallen too low, send more credit
int credit_to_send = 0;
- while (credit < CREDIT_MINIMUM) {
+ while (server.credit < CREDIT_MINIMUM) {
credit_to_send += CREDIT_SLICE;
- credit += CREDIT_SLICE;
+ server.credit += CREDIT_SLICE;
}
if (credit_to_send > 0) {
- request.setCredit (credit_to_send);
- next_event = Event.send_credit_event;
+ server.request.setCredit (credit_to_send);
+ server.next_event = Event.send_credit_event;
}
}
- private void processThePatch ()
+ private void processThePatch (Server server)
{
String inbox = config.resolve ("client/inbox", ".inbox");
- String filename = reply.filename ();
+ String filename = server.reply.filename ();
// Filenames from server must start with slash, which we skip
assert (filename.startsWith ("/"));
filename = filename.substring (1);
- if (reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
- if (file == null) {
- file = new FmqFile (inbox, filename);
- if (!file.output ()) {
+ if (server.reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
+ if (server.file == null) {
+ server.file = new FmqFile (inbox, filename);
+ if (!server.file.output ()) {
// File not writeable, skip patch
- file.destroy ();
- file = null;
+ server.file.destroy ();
+ server.file = null;
return;
}
}
// Try to write, ignore errors in this version
- ZFrame frame = reply.chunk ();
+ ZFrame frame = server.reply.chunk ();
FmqChunk chunk = new FmqChunk (frame.getData (), frame.size ());
if (chunk.size () > 0) {
- file.write (chunk, reply.offset ());
- credit -= chunk.size ();
+ server.file.write (chunk, server.reply.offset ());
+ server.credit -= chunk.size ();
}
else {
// Zero-sized chunk means end of file, so report back to caller
pipe.sendMore ("DELIVER");
pipe.sendMore (filename);
pipe.send (String.format ("%s/%s", inbox, filename));
- file.destroy ();
- file = null;
+ server.file.destroy ();
+ server.file = null;
}
chunk.destroy ();
}
else
- if (reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
+ if (server.reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
zclock_log ("I: delete %s/%s", inbox, filename);
FmqFile file = new FmqFile (inbox, filename);
file.remove ();
@@ -430,85 +435,147 @@ private void processThePatch ()
}
}
- private void logAccessDenied ()
+ private void logAccessDenied (Server server)
{
System.out.println ("W: server denied us access, retrying...");
}
- private void logInvalidMessage ()
+ private void logInvalidMessage (Server server)
{
System.out.println ("E: server claims we sent an invalid message");
}
- private void logProtocolError ()
+ private void logProtocolError (Server server)
{
System.out.println ("E: protocol error");
}
- private void terminateTheServer ()
+ private void controlMessage ()
{
- connected = false;
- next_event = Event.terminate_event;
+ ZMsg msg = ZMsg.recvMsg (pipe);
+ String method = msg.popString ();
+ if (method.equals ("SUBSCRIBE")) {
+ String path = msg.popString ();
+ // Store subscription along with any previous ones
+ // Check we don't already have a subscription for this path
+ for (Sub sub: subs) {
+ if (path.equals (sub.path))
+ return;
+ }
+ // Subscription path must start with '/'
+ // We'll do better error handling later
+ assert (path.startsWith ("/"));
+
+ // New subscription, store it for later replay
+ String inbox = config.resolve ("client/inbox", ".inbox");
+ sub = new Sub (this, inbox, path);
+ subs.add (sub);
+ }
+ else
+ if (method.equals ("SET INBOX")) {
+ String path = msg.popString ();
+ config.setPath ("client/inbox", path);
+ }
+ else
+ if (method.equals ("SET RESYNC")) {
+ long enabled = Long.parseLong (msg.popString ());
+ // Request resynchronization from server
+ config.setPath ("client/resync", enabled > 0 ? "1" :"0");
+ }
+ else
+ if (method.equals ("CONFIG")) {
+ String config_file = msg.popString ();
+ config.destroy ();
+ config = FmqConfig.load (config_file);
+ if (config != null)
+ applyConfig ();
+ else {
+ System.out.printf ("E: cannot load config file '%s'\n", config_file);
+ config = new FmqConfig ("root", null);
+ }
+ }
+ else
+ if (method.equals ("SETOPTION")) {
+ String path = msg.popString ();
+ String value = msg.popString ();
+ config.setPath (path, value);
+ config ();
+ }
+ else
+ if (method.equals ("STOP")) {
+ pipe.send ("OK");
+ stopped = true;
+ }
+ else
+ if (method.equals ("CONNECT")) {
+ String endpoint = msg.popString ();
+ if (nbrServers < MAX_SERVERS) {
+ Server server = new Server (ctx, endpoint);
+ servers [nbrServers++] = server;
+ dirty = true;
+ serverExecute (server, Event.initialize_event);
+ } else
+ System.out.printf ("E: too many server connections (max %d)\n", MAX_SERVERS);
+ }
+ msg.destroy ();
+
}
// Execute state machine as long as we have events
-
- private void execute (Event event)
+ private void serverExecute (Server server, Event event)
{
- next_event = event;
- while (next_event != null) {
- event = next_event;
- next_event = null;
- switch (state) {
+ server.next_event = event;
+ while (server.next_event != null) {
+ event = server.next_event;
+ server.next_event = null;
+ switch (server.state) {
case start_state:
if (event == Event.initialize_event) {
- request.setId (FmqMsg.OHAI);
- request.send (dealer);
- request = new FmqMsg (0);
- state = State.requesting_access_state;
+ server.request.setId (FmqMsg.OHAI);
+ server.request.send (server.dealer);
+ server.request = new FmqMsg (0);
+ server.state = State.requesting_access_state;
}
else
if (event == Event.srsly_event) {
- logAccessDenied ();
- terminateTheServer ();
- state = State.start_state;
+ logAccessDenied (server);
+ server.state = State.terminated_state;
}
else
if (event == Event.rtfm_event) {
- logInvalidMessage ();
- terminateTheServer ();
+ logInvalidMessage (server);
+ server.state = State.terminated_state;
}
else {
// Process all other events
- logProtocolError ();
- terminateTheServer ();
+ logProtocolError (server);
+ server.state = State.terminated_state;
}
break;
case requesting_access_state:
if (event == Event.orly_event) {
- trySecurityMechanism ();
- request.setId (FmqMsg.YARLY);
- request.send (dealer);
- request = new FmqMsg (0);
- state = State.requesting_access_state;
+ trySecurityMechanism (server);
+ server.request.setId (FmqMsg.YARLY);
+ server.request.send (server.dealer);
+ server.request = new FmqMsg (0);
+ server.state = State.requesting_access_state;
}
else
if (event == Event.ohai_ok_event) {
- connectedToServer ();
- getFirstSubscription ();
- state = State.subscribing_state;
+ connectedToServer (server);
+ getFirstSubscription (server);
+ server.state = State.subscribing_state;
}
else
if (event == Event.srsly_event) {
- logAccessDenied ();
- terminateTheServer ();
- state = State.start_state;
+ logAccessDenied (server);
+ server.state = State.terminated_state;
}
else
if (event == Event.rtfm_event) {
- logInvalidMessage ();
- terminateTheServer ();
+ logInvalidMessage (server);
+ server.state = State.terminated_state;
}
else {
// Process all other events
@@ -517,213 +584,166 @@ private void execute (Event event)
case subscribing_state:
if (event == Event.ok_event) {
- formatIcanhazCommand ();
- request.setId (FmqMsg.ICANHAZ);
- request.send (dealer);
- request = new FmqMsg (0);
- getNextSubscription ();
- state = State.subscribing_state;
+ formatIcanhazCommand (server);
+ server.request.setId (FmqMsg.ICANHAZ);
+ server.request.send (server.dealer);
+ server.request = new FmqMsg (0);
+ getNextSubscription (server);
+ server.state = State.subscribing_state;
}
else
if (event == Event.finished_event) {
- refillCreditAsNeeded ();
- state = State.ready_state;
+ refillCreditAsNeeded (server);
+ server.state = State.ready_state;
}
else
if (event == Event.srsly_event) {
- logAccessDenied ();
- terminateTheServer ();
- state = State.start_state;
+ logAccessDenied (server);
+ server.state = State.terminated_state;
}
else
if (event == Event.rtfm_event) {
- logInvalidMessage ();
- terminateTheServer ();
+ logInvalidMessage (server);
+ server.state = State.terminated_state;
}
else {
// Process all other events
- logProtocolError ();
- terminateTheServer ();
+ logProtocolError (server);
+ server.state = State.terminated_state;
}
break;
case ready_state:
if (event == Event.cheezburger_event) {
- processThePatch ();
- refillCreditAsNeeded ();
+ processThePatch (server);
+ refillCreditAsNeeded (server);
}
else
if (event == Event.hugz_event) {
- request.setId (FmqMsg.HUGZ_OK);
- request.send (dealer);
- request = new FmqMsg (0);
+ server.request.setId (FmqMsg.HUGZ_OK);
+ server.request.send (server.dealer);
+ server.request = new FmqMsg (0);
}
else
if (event == Event.send_credit_event) {
- request.setId (FmqMsg.NOM);
- request.send (dealer);
- request = new FmqMsg (0);
+ server.request.setId (FmqMsg.NOM);
+ server.request.send (server.dealer);
+ server.request = new FmqMsg (0);
}
else
if (event == Event.icanhaz_ok_event) {
}
else
if (event == Event.srsly_event) {
- logAccessDenied ();
- terminateTheServer ();
- state = State.start_state;
+ logAccessDenied (server);
+ server.state = State.terminated_state;
}
else
if (event == Event.rtfm_event) {
- logInvalidMessage ();
- terminateTheServer ();
+ logInvalidMessage (server);
+ server.state = State.terminated_state;
}
else {
// Process all other events
- logProtocolError ();
- terminateTheServer ();
+ logProtocolError (server);
+ server.state = State.terminated_state;
}
break;
- }
- if (next_event == Event.terminate_event) {
- stopped = true;
+ case terminated_state:
+ if (event == Event.srsly_event) {
+ logAccessDenied (server);
+ server.state = State.terminated_state;
+ }
+ else
+ if (event == Event.rtfm_event) {
+ logInvalidMessage (server);
+ server.state = State.terminated_state;
+ }
+ else {
+ // Process all other events
+ }
break;
- }
- }
- }
-
- // Restart client dialog from zero
- private void restart (String endpoint)
- {
- // Reconnect to new endpoint if specified
- if (endpoint != null) {
- if (dealer != null)
- ctx.destroySocket (dealer);
- dealer = ctx.createSocket (ZMQ.DEALER);
- dealer.connect (endpoint);
+ }
}
- // Clear out any previous request data
- if (request != null)
- request.destroy ();
- request = new FmqMsg (0);
-
- // Restart dialog state machine from zero
- state = State.start_state;
- expires_at = 0;
-
- // Application hook to reinitialize dialog
- // Provides us with an event to kick things off
- initializeTheClient ();
- execute (next_event);
}
- private void controlMessage ()
+ private void serverMessage (Server server)
{
- ZMsg msg = ZMsg.recvMsg (pipe);
- String method = msg.popString ();
- if (method.equals ("SUBSCRIBE")) {
- String path = msg.popString ();
- // Store subscription along with any previous ones
- // Check we don't already have a subscription for this path
- for (Sub sub: subs) {
- if (path.equals (sub.path))
- return;
- }
- // Subscription path must start with '/'
- // We'll do better error handling later
- assert (path.startsWith ("/"));
-
- // New subscription, store it for later replay
- String inbox = config.resolve ("client/inbox", ".inbox");
- sub = new Sub (this, inbox, path);
- subs.add (sub);
-
- // If we're connected, then also send to server
- if (connected)
- next_event = Event.subscribe_event;
- }
+ if (server.reply != null)
+ server.reply.destroy ();
+ server.reply = FmqMsg.recv (server.dealer);
+ if (server.reply == null)
+ return; // Interrupted; do nothing
+ // Any input from server counts as activity
+ server.expires_at = System.currentTimeMillis () + heartbeat * 2;
+
+ if (server.reply.id () == FmqMsg.SRSLY)
+ serverExecute (server, Event.srsly_event);
else
- if (method.equals ("SET INBOX")) {
- String path = msg.popString ();
- config.setPath ("client/inbox", path);
- }
+ if (server.reply.id () == FmqMsg.RTFM)
+ serverExecute (server, Event.rtfm_event);
else
- if (method.equals ("SET RESYNC")) {
- long enabled = Long.parseLong (msg.popString ());
- // Request resynchronization from server
- config.setPath ("client/resync", enabled > 0 ? "1" :"0");
- }
+ if (server.reply.id () == FmqMsg.ORLY)
+ serverExecute (server, Event.orly_event);
else
- if (method.equals ("CONNECT")) {
- String endpoint = msg.popString ();
- restart (endpoint);
- }
+ if (server.reply.id () == FmqMsg.OHAI_OK)
+ serverExecute (server, Event.ohai_ok_event);
else
- if (method.equals ("CONFIG")) {
- String config_file = msg.popString ();
- config.destroy ();
- config = FmqConfig.load (config_file);
- if (config != null)
- applyConfig ();
- else {
- System.out.printf ("E: cannot load config file '%s'\n", config_file);
- config = new FmqConfig ("root", null);
- }
- }
+ if (server.reply.id () == FmqMsg.CHEEZBURGER)
+ serverExecute (server, Event.cheezburger_event);
else
- if (method.equals ("SETOPTION")) {
- String path = msg.popString ();
- String value = msg.popString ();
- config.setPath (path, value);
- config ();
- }
+ if (server.reply.id () == FmqMsg.HUGZ)
+ serverExecute (server, Event.hugz_event);
else
- if (method.equals ("STOP")) {
- pipe.send ("OK");
- stopped = true;
- }
- msg.destroy ();
+ if (server.reply.id () == FmqMsg.ICANHAZ_OK)
+ serverExecute (server, Event.icanhaz_ok_event);
+
+ }
+
+
+
+ }
+ private static class Server {
+ // Properties accessible to server actions
+ private Event next_event; // Next event
+
+ private int credit; // Current credit pending
+ private FmqFile file; // File we're writing to
+ // Properties you should NOT touch
+ private final ZContext ctx; // Own CZMQ context
+ private int index; // Index into client->server_array
+ private Socket dealer; // Socket to back to server
+ private long expires_at; // Connection expires at
+ private State state; // Current state
+ private Event event; // Current event
+ private final String endpoint; // server endpoint
+ private FmqMsg request; // Next message to send
+ private FmqMsg reply; // Last received reply
- if (next_event != null)
- execute (next_event);
+ private Server (ZContext ctx, String endpoint)
+ {
+ this.ctx = ctx;
+ this.endpoint = endpoint;
+ dealer = ctx.createSocket (ZMQ.DEALER);
+ request = new FmqMsg (0);
+ dealer.connect (endpoint);
+ state = State.start_state;
+
}
- private void serverMessage ()
+ private void destory ()
{
+ ctx.destroySocket (dealer);
+ request.destroy ();
if (reply != null)
reply.destroy ();
- reply = FmqMsg.recv (dealer);
- if (reply == null)
- return; // Interrupted; do nothing
-
- if (reply.id () == FmqMsg.SRSLY)
- execute (Event.srsly_event);
- else
- if (reply.id () == FmqMsg.RTFM)
- execute (Event.rtfm_event);
- else
- if (reply.id () == FmqMsg.ORLY)
- execute (Event.orly_event);
- else
- if (reply.id () == FmqMsg.OHAI_OK)
- execute (Event.ohai_ok_event);
- else
- if (reply.id () == FmqMsg.CHEEZBURGER)
- execute (Event.cheezburger_event);
- else
- if (reply.id () == FmqMsg.HUGZ)
- execute (Event.hugz_event);
- else
- if (reply.id () == FmqMsg.ICANHAZ_OK)
- execute (Event.icanhaz_ok_event);
-
- // Any input from server counts as activity
- expires_at = System.currentTimeMillis () + heartbeat * 2;
+
}
}
+
// Finally here's the client thread itself, which polls its two
// sockets and processes incoming messages
@@ -739,9 +759,14 @@ public void run (Object [] args, ZContext ctx, Socket pipe)
Poller items = ctx.getContext ().poller ();
items.register (self.pipe, Poller.POLLIN);
- // Build structure each time since self->dealer can change
- if (self.dealer != null)
- items.register (self.dealer, Poller.POLLIN);
+ int serverNbr = 0;
+ // Rebuild pollset if we need to
+ if (self.dirty) {
+ for (serverNbr = 0; serverNbr < self.nbrServers; serverNbr++) {
+ Server server = self.servers [serverNbr];
+ items.register (server.dealer, Poller.POLLIN);
+ }
+ }
if (items.poll (self.heartbeat) == -1)
break; // Context has been shut down
@@ -751,12 +776,13 @@ public void run (Object [] args, ZContext ctx, Socket pipe)
if (items.pollin (0))
self.controlMessage ();
- if (items.pollin (1))
- self.serverMessage ();
-
- // Check whether server seems dead
- if (self.expires_at > 0 && System.currentTimeMillis () >= self.expires_at)
- self.restart (null);
+ // Here, array of sockets to servers
+ for (serverNbr = 0; serverNbr < self.nbrServers; serverNbr++) {
+ if (items.pollin (serverNbr + 1)) {
+ Server server = self.servers [serverNbr];
+ self.serverMessage (server);
+ }
+ }
}
self.destroy ();
}
View
6 java/src/main/java/org/filemq/FmqMsg.java
@@ -736,13 +736,9 @@ public boolean send (Socket socket)
// Send the OHAI to the socket in one step
public static void sendOhai (
- Socket output,
- String protocol,
- int version)
+ Socket output)
{
FmqMsg self = new FmqMsg (FmqMsg.OHAI);
- self.setProtocol (protocol);
- self.setVersion (version);
self.send (output);
}
View
6 java/src/main/java/org/filemq/FmqServer.java
@@ -480,6 +480,8 @@ private static void clientPing (Map <String, Client> clients , Server server)
private static class Server {
// Properties accessible to client actions
+ private List <Mount> mounts; // Mount points
+ private int port; // Server port
// Properties you should NOT touch
private ZContext ctx; // Own CZMQ context
@@ -511,6 +513,7 @@ private Server (ZContext ctx, Socket pipe)
clients = new HashMap <String, Client> ();
config = new FmqConfig ("root", null);
config ();
+ mounts = new ArrayList <Mount> ();
}
private void destroy ()
@@ -519,6 +522,9 @@ private void destroy ()
config.destroy ();
for (Client c: clients.values ())
c.destroy ();
+ // Destroy mount points
+ for (Mount mount : mounts)
+ mount.destroy ();
}
// Apply configuration tree:
View
263 scripts/client_java.gsl
@@ -51,6 +51,9 @@ import org.zeromq.ZFrame;
// Structure of our front-end API class
public class $(class.name:pascal) {
+
+ private final static int MAX_SERVERS = 256;
+
ZContext ctx; // CZMQ context
Socket pipe; // Pipe through to client
@@ -216,7 +219,6 @@ long $(name)\
};
private enum Event {
- terminate_event (-1),
.for class.event
$(name:c)_event ($(index ()))$(last ()??";" ?",")
.endfor
@@ -257,8 +259,6 @@ long $(name)\
private static class Client {
// Properties accessible to client actions
- private Event next_event; // Next event
-
.for class.self
. for context
$(string.trim (context.?''):block )
@@ -267,15 +267,12 @@ long $(name)\
// Properties you should NOT touch
private ZContext ctx; // Own CZMQ context
private Socket pipe; // Socket to back to caller
- private Socket dealer; // Socket to talk to server
+ private final Server [] servers; // Server connections
+ private int nbrServers; // How many connections we have
+ private boolean dirty; // If true, rebuild pollset
private boolean stopped; // Has client stopped?
private FmqConfig config; // Configuration tree
- private State state; // Current state
- private Event event; // Current event
- private $(codec:pascal) request; // Next message to send
- private $(codec:pascal) reply; // Last received reply
private int heartbeat; // Heartbeat interval
- private long expires_at; // Server expires at
private void config ()
{
@@ -287,6 +284,7 @@ long $(name)\
{
this.ctx = ctx;
this.pipe = pipe;
+ this.servers = new Server [MAX_SERVERS];
this.config = new FmqConfig ("root", null);
config ();
@@ -300,10 +298,10 @@ long $(name)\
{
if (config != null)
config.destroy ();
- if (request != null)
- request.destroy ();
- if (reply != null)
- reply.destroy ();
+ for (int serverNbr = 0; serverNbr < nbrServers; serverNbr++) {
+ Server server = servers [serverNbr];
+ server.destory ();
+ }
.for class.self
. for destruct
$(string.trim (destruct.?''):block )
@@ -353,25 +351,25 @@ long $(name)\
. if switches.animate ?= 1
zclock_log ("C: + send $(MESSAGE:C)");
. endif
- request.setId ($(codec:pascal).$(MESSAGE:C));
+ server.request.setId ($(codec:pascal).$(MESSAGE:C));
.if switches.trace ?= 1
zclock_log ("Send request to server");
- $(codec)_dump (self->request);
+ $(codec)_dump (request);
.endif
- request.send (dealer);
- request = new $(codec:pascal) (0);
+ server.request.send (server.dealer);
+ server.request = new $(codec:pascal) (0);
. else
. if switches.animate ?= 1
zclock_log ("C: + $(name)");
. endif
- $(name:camel) ();
+ $(name:camel) (server);
. if count (class.action, name = -1.name) = 0
. echo 'E: you need to add <action name="$(name)">'
. endif
. endif
. endfor
. if defined (event.next)
- state = State.$(next:c)_state;
+ server.state = State.$(next:c)_state;
. endif
.endmacro
.#
@@ -380,83 +378,12 @@ long $(name)\
// Custom actions for state machine
. endif
- private void $(name:camel) ()
+ private void $(name:camel) (Server server)
{
$(string.trim (action.?''):block )
}
.endfor
- // Execute state machine as long as we have events
-
- private void execute (Event event)
- {
- next_event = event;
- while (next_event != null) {
- event = next_event;
- next_event = null;
-.if switches.animate ?= 1
- zclock_log ("C: %s:", s_state_name [self->state]);
- zclock_log ("C: (%s)", s_event_name [self->event]);
-.endif
- switch (state) {
-.for class.state
- case $(name:c)_state:
-. for event where name <> "$other"
-. if index () > 1
- else
-. endif
- if (event == Event.$(name:c)_event) {
-. output_event_body ()
- }
-. endfor
-. for event where name = "$other"
- else {
- // Process all other events
-. output_event_body ()
- }
-. endfor
- break;
-
-.endfor
- }
-.if switches.animate ?= 1
- zclock_log ("C: -------------------> %s", s_state_name [self->state]);
-.endif
- if (next_event == Event.terminate_event) {
- stopped = true;
- break;
- }
- }
- }
-
- // Restart client dialog from zero
-
- private void restart (String endpoint)
- {
- // Reconnect to new endpoint if specified
- if (endpoint != null) {
- if (dealer != null)
- ctx.destroySocket (dealer);
- dealer = ctx.createSocket (ZMQ.DEALER);
- dealer.connect (endpoint);
- }
- // Clear out any previous request data
- if (request != null)
- request.destroy ();
- request = new $(codec:pascal) (0);
-
- // Restart dialog state machine from zero
-.for class.state where item () = 1
- state = State.$(name:c)_state;
-.endfor
- expires_at = 0;
-
- // Application hook to reinitialize dialog
- // Provides us with an event to kick things off
- initializeTheClient ();
- execute (next_event);
- }
-
private void controlMessage ()
{
ZMsg msg = ZMsg.recvMsg (pipe);
@@ -479,11 +406,6 @@ long $(name)\
}
else
.endfor
- if (method.equals ("CONNECT")) {
- String endpoint = msg.popString ();
- restart (endpoint);
- }
- else
if (method.equals ("CONFIG")) {
String config_file = msg.popString ();
config.destroy ();
@@ -507,38 +429,139 @@ long $(name)\
pipe.send ("OK");
stopped = true;
}
+ else
+ if (method.equals ("CONNECT")) {
+ String endpoint = msg.popString ();
+ if (nbrServers < MAX_SERVERS) {
+ Server server = new Server (ctx, endpoint);
+ servers [nbrServers++] = server;
+ dirty = true;
+ serverExecute (server, Event.initialize_event);
+ } else
+ System.out.printf ("E: too many server connections (max %d)\\n", MAX_SERVERS);
+ }
msg.destroy ();
- if (next_event != null)
- execute (next_event);
}
- private void serverMessage ()
+ // Execute state machine as long as we have events
+ private void serverExecute (Server server, Event event)
{
- if (reply != null)
- reply.destroy ();
- reply = FmqMsg.recv (dealer);
- if (reply == null)
+ server.next_event = event;
+ while (server.next_event != null) {
+ event = server.next_event;
+ server.next_event = null;
+.if switches.animate ?= 1
+ zclock_log ("C: %s:", s_state_name [server.state]);
+ zclock_log ("C: (%s)", s_event_name [server.event]);
+.endif
+ switch (server.state) {
+.for class.state
+ case $(name:c)_state:
+. for event where name <> "$other"
+. if index () > 1
+ else
+. endif
+ if (event == Event.$(name:c)_event) {
+. output_event_body ()
+ }
+. endfor
+. for event where name = "$other"
+ else {
+ // Process all other events
+. output_event_body ()
+ }
+. endfor
+ break;
+
+.endfor
+ }
+.if switches.animate ?= 1
+ zclock_log ("C: -------------------> %s", s_state_name [server.state]);
+.endif
+ }
+ }
+
+ private void serverMessage (Server server)
+ {
+ if (server.reply != null)
+ server.reply.destroy ();
+ server.reply = FmqMsg.recv (server.dealer);
+ if (server.reply == null)
return; // Interrupted; do nothing
.if switches.trace ?= 1
zclock_log ("Received reply from server");
- $(codec)_dump (self->reply);
+ $(codec)_dump (server.reply);
.endif
+ // Any input from server counts as activity
+ server.expires_at = System.currentTimeMillis () + heartbeat * 2;
. for class.event where external ?= 1
. if index () > 1
else
. endif
- if (reply.id () == $(codec:pascal).$(NAME:C))
- execute (Event.$(name:c)_event);
+ if (server.reply.id () == $(codec:pascal).$(NAME:C))
+ serverExecute (server, Event.$(name:c)_event);
. endfor
- // Any input from server counts as activity
- expires_at = System.currentTimeMillis () + heartbeat * 2;
}
+
+
}
+ private static class Server {
+ // Properties accessible to server actions
+ private Event next_event; // Next event
+
+.for class.server
+. for context
+ $(string.trim (context.?''):block )
+. endfor
+.endfor
+ // Properties you should NOT touch
+ private final ZContext ctx; // Own CZMQ context
+ private int index; // Index into client->server_array
+ private Socket dealer; // Socket to back to server
+ private long expires_at; // Connection expires at
+ private State state; // Current state
+ private Event event; // Current event
+ private final String endpoint; // server endpoint
+ private $(codec:pascal) request; // Next message to send
+ private $(codec:pascal) reply; // Last received reply
+
+ private Server (ZContext ctx, String endpoint)
+ {
+ this.ctx = ctx;
+ this.endpoint = endpoint;
+ dealer = ctx.createSocket (ZMQ.DEALER);
+ request = new $(codec:pascal) (0);
+ dealer.connect (endpoint);
+.for class.state where item () = 1
+ state = State.$(name:c)_state;
+.endfor
+.for class.server
+. for construct
+ $(string.trim (construct.?''):block )
+. endfor
+.endfor
+ }
+
+ private void destory ()
+ {
+ ctx.destroySocket (dealer);
+ request.destroy ();
+ if (reply != null)
+ reply.destroy ();
+.for class.server
+. for destruct
+ $(string.trim (destruct.?''):block )
+. endfor
+.endfor
+ }
+
+ }
+
// Finally here's the client thread itself, which polls its two
// sockets and processes incoming messages
@@ -554,9 +577,14 @@ long $(name)\
Poller items = ctx.getContext ().poller ();
items.register (self.pipe, Poller.POLLIN);
- // Build structure each time since self->dealer can change
- if (self.dealer != null)
- items.register (self.dealer, Poller.POLLIN);
+ int serverNbr = 0;
+ // Rebuild pollset if we need to
+ if (self.dirty) {
+ for (serverNbr = 0; serverNbr < self.nbrServers; serverNbr++) {
+ Server server = self.servers [serverNbr];
+ items.register (server.dealer, Poller.POLLIN);
+ }
+ }
if (items.poll (self.heartbeat) == -1)
break; // Context has been shut down
@@ -566,12 +594,13 @@ long $(name)\
if (items.pollin (0))
self.controlMessage ();
- if (items.pollin (1))
- self.serverMessage ();
-
- // Check whether server seems dead
- if (self.expires_at > 0 && System.currentTimeMillis () >= self.expires_at)
- self.restart (null);
+ // Here, array of sockets to servers
+ for (serverNbr = 0; serverNbr < self.nbrServers; serverNbr++) {
+ if (items.pollin (serverNbr + 1)) {
+ Server server = self.servers [serverNbr];
+ self.serverMessage (server);
+ }
+ }
}
self.destroy ();
}
View
2 scripts/codec_c.gsl
@@ -735,7 +735,7 @@ $(class.name)_send ($(class.name)_t **self_p, void *output)
int
$(class.name)_send_$(name) (
void *output\
-.for field
+.for field where !defined (value)
,
. if type = "number"
$(ctype) $(name)\
View
4 scripts/codec_java.gsl
@@ -519,7 +519,7 @@ public class $(ClassName)
public static void send$(Name) (
Socket output\
-.for field
+.for field where !defined (value)
,
. if type = "number"
$(ctype) $(name)\
@@ -538,7 +538,7 @@ public class $(ClassName)
)
{
$(ClassName) self = new $(ClassName) ($(ClassName).$(MESSAGE.NAME));
-.for field
+.for field where !defined (value)
. if type = "number" | type = "octets" | type = "string"
self.set$(Name) ($(name));
. elsif type = "strings"

0 comments on commit 63d7398

Please sign in to comment.
Something went wrong with that request. Please try again.