Permalink
Browse files

fmq_client allows multiple server connections

  • Loading branch information...
miniway committed Jan 8, 2013
1 parent 32fc373 commit c5f80da498676479599759322d43763b89517640
View
@@ -10,7 +10,7 @@ private static final int CREDIT_SLICE = 1000000;
private static final int CREDIT_MINIMUM = (CREDIT_SLICE * 4) + 1;
</declare>
<client>
<self>
<context>
private boolean connected; // Are we connected to server?
private List &lt;Sub&gt; subs; // Subscriptions
@@ -27,7 +27,18 @@ connected = false;
for (Sub sub: subs)
sub.destroy ();
</destruct>
</client>
</self>
<server>
<context>
private int credit; // Current credit pending
private FmqFile file; // File we're writing to
</context>
<construct>
</construct>
<destruct>
</destruct>
</server>
<!-- Embedded class for subscriptions -->
<include filename = "fmq_client_sub.xml" />
@@ -36,8 +47,8 @@ for (Sub sub: subs)
String login = config.resolve ("security/plain/login", "guest");
String password = config.resolve ("security/plain/password", "");
ZFrame frame = FmqSasl.plainEncode (login, password);
request.setMechanism ("PLAIN");
request.setResponse (frame);
server.request.setMechanism ("PLAIN");
server.request.setResponse (frame);
</action>
<action name = "connected to server">
@@ -48,78 +59,78 @@ connected = true;
subIterator = subs.iterator ();
if (subIterator.hasNext ()) {
sub = subIterator.next ();
next_event = Event.ok_event;
server.next_event = Event.ok_event;
} else
next_event = Event.finished_event;
server.next_event = Event.finished_event;
</action>
<action name = "get next subscription">
if (subIterator.hasNext ()) {
sub = subIterator.next ();
next_event = Event.ok_event;
server.next_event = Event.ok_event;
} else
next_event = Event.finished_event;
server.next_event = Event.finished_event;
</action>
<action name = "format icanhaz command">
request.setPath (sub.path);
server.request.setPath (sub.path);
// If client app wants full resync, send cache to server
if (Integer.parseInt (config.resolve ("client/resync", "0")) == 1) {
request.insertOptions ("RESYNC", "1");
request.setCache (sub.cache ());
server.request.insertOptions ("RESYNC", "1");
server.request.setCache (sub.cache ());
}
</action>
<action name = "refill credit as needed">
// If credit has fallen too low, send more credit
int credit_to_send = 0;
while (credit &lt; CREDIT_MINIMUM) {
while (server.credit &lt; CREDIT_MINIMUM) {
credit_to_send += CREDIT_SLICE;
credit += CREDIT_SLICE;
server.credit += CREDIT_SLICE;
}
if (credit_to_send > 0) {
request.setCredit (credit_to_send);
next_event = Event.send_credit_event;
server.request.setCredit (credit_to_send);
server.next_event = Event.send_credit_event;
}
</action>
<action name = "process the patch">
String inbox = config.resolve ("client/inbox", ".inbox");
String filename = reply.filename ();
String filename = server.reply.filename ();
// Filenames from server must start with slash, which we skip
assert (filename.startsWith ("/"));
filename = filename.substring (1);
if (reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
if (file == null) {
file = new FmqFile (inbox, filename);
if (!file.output ()) {
if (server.reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
if (server.file == null) {
server.file = new FmqFile (inbox, filename);
if (!server.file.output ()) {
// File not writeable, skip patch
file.destroy ();
file = null;
server.file.destroy ();
server.file = null;
return;
}
}
// Try to write, ignore errors in this version
ZFrame frame = reply.chunk ();
ZFrame frame = server.reply.chunk ();
FmqChunk chunk = new FmqChunk (frame.getData (), frame.size ());
if (chunk.size () > 0) {
file.write (chunk, reply.offset ());
credit -= chunk.size ();
server.file.write (chunk, server.reply.offset ());
server.credit -= chunk.size ();
}
else {
// Zero-sized chunk means end of file, so report back to caller
pipe.sendMore ("DELIVER");
pipe.sendMore (filename);
pipe.send (String.format ("%s/%s", inbox, filename));
file.destroy ();
file = null;
server.file.destroy ();
server.file = null;
}
chunk.destroy ();
}
else
if (reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
if (server.reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
zclock_log ("I: delete %s/%s", inbox, filename);
FmqFile file = new FmqFile (inbox, filename);
file.remove ();
@@ -140,11 +151,6 @@ System.out.println ("E: server claims we sent an invalid message");
System.out.println ("E: protocol error");
</action>
<action name = "terminate the server">
connected = false;
next_event = Event.terminate_event;
</action>
<method name = "subscribe">
<argument name = "path" type = "string" />
// Store subscription along with any previous ones
@@ -161,10 +167,6 @@ assert (path.startsWith ("/"));
String inbox = config.resolve ("client/inbox", ".inbox");
sub = new Sub (this, inbox, path);
subs.add (sub);
// If we're connected, then also send to server
if (connected)
next_event = Event.subscribe_event;
</method>
<method name = "set inbox">
@@ -9,7 +9,7 @@ FileMQ protocol server
private static final int CHUNK_SIZE = 1000000;
</declare>
<server>
<self>
<context>
private List &lt;Mount&gt; mounts; // Mount points
private int port; // Server port
@@ -22,7 +22,7 @@ mounts = new ArrayList &lt;Mount&gt; ();
for (Mount mount : mounts)
mount.destroy ();
</destruct>
</server>
</self>
<client>
<context>
Oops, something went wrong.

0 comments on commit c5f80da

Please sign in to comment.