Permalink
Browse files

Refactored Java code generation

* Separate java/model directory
* Use same XML language in C and Java models
* One model = one language
* Put common specs (FSMs) into separate included files
  • Loading branch information...
1 parent a111e85 commit 5fbe8e29f4703b8c03a79c540bf374fe7342cf60 @hintjens hintjens committed Dec 27, 2012
View
@@ -0,0 +1,194 @@
+<class name = "fmq_client" codec = "fmq_msg">
+This is the FILEMQ/1.0 client protocol handler
+<include filename = "license.xml" />
+<include filename = "../../model/fmq_client_fsm.xml" />
+
+<!-- Server and client contexts -->
+<declare>
+// There's no point making these configurable
+private static final int CREDIT_SLICE = 1000000;
+private static final int CREDIT_MINIMUM = (CREDIT_SLICE * 4) + 1;
+</declare>
+
+<self>
+<context>
+private boolean connected; // Are we connected to server?
+private List &lt;Sub&gt; subs; // Subscriptions
+private Sub sub; // Subscription we want to send
+private int credit; // Current credit pending
+private FmqFile file; // File we're writing to
+private Iterator &lt;Sub&gt; subIterator;
+</context>
+
+<construct>
+subs = new ArrayList &lt;Sub&gt; ();
+connected = false;
+</construct>
+
+<destruct>
+for (Sub sub: subs)
+ sub.destroy ();
+</destruct>
+</self>
+
+<!-- Embedded class for subscriptions -->
+<include filename = "fmq_client_sub.xml" />
+
+<action name = "initialize the client">
+next_event = Event.ready_event;
+</action>
+
+<action name = "try security mechanism">
+String login = config.resolve ("security/plain/login", "guest");
+String password = config.resolve ("security/plain/password", "");
+ZFrame frame = FmqSasl.plainEncode (login, password);
+request.setMechanism ("PLAIN");
+request.setResponse (frame);
+</action>
+
+<action name = "connected to server">
+connected = true;
+</action>
+
+<action name = "get first subscription">
+subIterator = subs.iterator ();
+if (subIterator.hasNext ()) {
+ sub = subIterator.next ();
+ next_event = Event.ok_event;
+} else
+ next_event = Event.finished_event;
+</action>
+
+<action name = "get next subscription">
+if (subIterator.hasNext ()) {
+ sub = subIterator.next ();
+ next_event = Event.ok_event;
+} else
+ next_event = Event.finished_event;
+</action>
+
+<action name = "format icanhaz command">
+request.setPath (sub.path);
+// If client app wants full resync, send cache to server
+if (Integer.parseInt (config.resolve ("client/resync", "0")) == 1) {
+ request.insertOptions ("RESYNC", "1");
+ request.setCache (sub.cache ());
+}
+</action>
+
+<action name = "refill credit as needed">
+// If credit has fallen too low, send more credit
+int credit_to_send = 0;
+while (credit &lt; CREDIT_MINIMUM) {
+ credit_to_send += CREDIT_SLICE;
+ credit += CREDIT_SLICE;
+}
+if (credit_to_send > 0) {
+ request.setCredit (credit_to_send);
+ next_event = Event.send_credit_event;
+}
+</action>
+
+<action name = "process the patch">
+String inbox = config.resolve ("client/inbox", ".inbox");
+String filename = reply.filename ();
+
+// Filenames from server must start with slash, which we skip
+assert (filename.startsWith ("/"));
+filename = filename.substring (1);
+
+if (reply.operation () == FmqMsg.FMQ_MSG_FILE_CREATE) {
+ if (file == null) {
+ file = new FmqFile (inbox, filename);
+ if (!file.output ()) {
+ // File not writeable, skip patch
+ file.destroy ();
+ file = null;
+ return;
+ }
+ }
+ // Try to write, ignore errors in this version
+ ZFrame frame = reply.chunk ();
+ FmqChunk chunk = new FmqChunk (frame.getData (), frame.size ());
+ if (chunk.size () > 0) {
+ file.write (chunk, reply.offset ());
+ credit -= chunk.size ();
+ }
+ else {
+ // Zero-sized chunk means end of file, so report back to caller
+ pipe.sendMore ("DELIVER");
+ pipe.sendMore (filename);
+ pipe.send (String.format ("%s/%s", inbox, filename));
+ file.destroy ();
+ file = null;
+ }
+ chunk.destroy ();
+}
+else
+if (reply.operation () == FmqMsg.FMQ_MSG_FILE_DELETE) {
+ zclock_log ("I: delete %s/%s", inbox, filename);
+ FmqFile file = new FmqFile (inbox, filename);
+ file.remove ();
+ file.destroy ();
+ file = null;
+}
+</action>
+
+<action name = "log access denied">
+System.out.println ("W: server denied us access, retrying...");
+</action>
+
+<action name = "log invalid message">
+System.out.println ("E: server claims we sent an invalid message");
+</action>
+
+<action name = "log protocol error">
+System.out.println ("E: protocol error");
+</action>
+
+<action name = "terminate the client">
+connected = false;
+next_event = Event.terminate_event;
+</action>
+
+<method name = "subscribe">
+<argument name = "path" type = "string" />
+// Store subscription along with any previous ones
+// Check we don't already have a subscription for this path
+for (Sub sub: subs) {
+ if (path.equals (sub.path))
+ return;
+}
+// Subscription path must start with '/'
+// We'll do better error handling later
+assert (path.startsWith ("/"));
+
+// New subscription, store it for later replay
+String inbox = config.resolve ("client/inbox", ".inbox");
+sub = new Sub (this, inbox, path);
+subs.add (sub);
+
+// If we're connected, then also send to server
+if (connected)
+ next_event = Event.subscribe_event;
+</method>
+
+<method name = "set inbox">
+<argument name = "path" type = "string" />
+config.setPath ("client/inbox", path);
+</method>
+
+<method name = "set resync">
+<argument name = "enabled" type = "number" />
+// Request resynchronization from server
+config.setPath ("client/resync", enabled > 0 ? "1" :"0");
+</method>
+
+<selftest config = "client_test.cfg">
+<init>
+self.connect ("tcp://localhost:6001");
+try { Thread.sleep (1000); } catch (Exception e) {}
+</init>
+</selftest>
+
+</class>
@@ -0,0 +1,33 @@
+<declare>
+// Subscription in memory
+private static class Sub {
+ private Client client; // Pointer to parent client
+ private String inbox; // Inbox location
+ private String path; // Path we subscribe to
+
+ private Sub (Client client, String inbox, String path)
+ {
+ this.client = client;
+ this.inbox = inbox;
+ this.path = path;
+ }
+
+ private void destroy ()
+ {
+ }
+
+ // Return new cache object for subscription path
+
+ private Map &lt;String, String&gt; cache ()
+ {
+ // Get directory cache for this path
+ FmqDir dir = FmqDir.newFmqDir (path.substring(1), inbox);
+ if (dir != null) {
+ Map &lt;String, String&gt; cache = dir.cache ();
+ dir.destroy ();
+ return cache;
+ }
+ return null;
+ }
+}
+</declare>
Oops, something went wrong.

0 comments on commit 5fbe8e2

Please sign in to comment.