Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Send NAK when the command is filtered out #30

Merged
merged 1 commit into from

2 participants

@hurtonm
Owner

This pull request extends the broker with siganlling so that the client can find out the command was dropped due to filtering.

@hintjens hintjens merged commit 8c7b594 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 30, 2012
  1. @hurtonm
This page is out of date. Refresh to see the latest.
View
6 libmdp/include/mdp_common.h
@@ -33,11 +33,11 @@
// MDP/Client commands, as strings
#define MDPC_REQUEST "\001"
-#define MDPC_PARTIAL "\002"
-#define MDPC_FINAL "\003"
+#define MDPC_REPORT "\002"
+#define MDPC_NAK "\003"
static char *mdpc_commands [] = {
- NULL, "REQUEST", "PARTIAL", "FINAL",
+ NULL, "REQUEST", "REPORT", "NAK",
};
// This is the version of MDP/Worker we implement
View
59 libmdp/src/mdp_broker.c
@@ -79,7 +79,7 @@ static service_t *
static void
s_service_destroy (void *argument);
static void
- s_service_dispatch (service_t *service, zframe_t *sender, zmsg_t *msg);
+ s_service_dispatch (service_t *service);
static void
s_service_enable_command (service_t *self, const char *command);
static void
@@ -183,7 +183,7 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
zlist_append (worker->service->waiting, worker);
worker->service->workers++;
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
- s_service_dispatch (worker->service, NULL, NULL);
+ s_service_dispatch (worker->service);
zframe_destroy (&service_frame);
zclock_log ("worker created");
}
@@ -195,6 +195,7 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_pushstr (msg, worker->service->name);
+ zmsg_pushstr (msg, MDPC_REPORT);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
@@ -280,13 +281,35 @@ s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
// Insert the protocol header and service name, then rewrap envelope.
zmsg_push (msg, zframe_dup (service_frame));
+ zmsg_pushstr (msg, MDPC_REPORT);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, zframe_dup (sender));
zmsg_send (&msg, self->socket);
}
- else
- // Else dispatch the message to the requested service
- s_service_dispatch (service, sender, msg);
+ else {
+ int enabled = 1;
+ if (zmsg_size (msg) >= 1) {
+ zframe_t *cmd_frame = zmsg_first (msg);
+ char *cmd = zframe_strdup (cmd_frame);
+ enabled = s_service_is_command_enabled (service, cmd);
+ free (cmd);
+ }
+
+ // Forward the message to the worker.
+ if (enabled) {
+ zmsg_wrap (msg, zframe_dup (sender));
+ zlist_append (service->requests, msg);
+ s_service_dispatch (service);
+ }
+ // Send a NAK message back to the client.
+ else {
+ zmsg_push (msg, zframe_dup (service_frame));
+ zmsg_pushstr (msg, MDPC_NAK);
+ zmsg_pushstr (msg, MDPC_CLIENT);
+ zmsg_wrap (msg, zframe_dup (sender));
+ zmsg_send (&msg, self->socket);
+ }
+ }
zframe_destroy (&service_frame);
}
@@ -368,33 +391,11 @@ s_service_destroy (void *argument)
}
// The dispatch method sends request to the worker.
-// If no worker is available, we put the request into
-// the queue.
-
static void
-s_service_dispatch (service_t *self, zframe_t *sender, zmsg_t *msg)
+s_service_dispatch (service_t *self)
{
assert (self);
- if (msg) {
- int enabled = 1;
- if (zmsg_size (msg) >= 1) {
- zframe_t *cmd_frame = zmsg_first (msg);
- char *cmd = zframe_strdup (cmd_frame);
- enabled = s_service_is_command_enabled (self, cmd);
- free (cmd);
- }
- // If the command is blacklisted, we drop the message;
- // otherwise, we queue the message.
- if (enabled) {
- // Set reply return address to client sender
- zmsg_wrap (msg, zframe_dup (sender));
- zlist_append (self->requests, msg);
- }
- else
- zmsg_destroy (&msg);
- }
-
s_broker_purge (self->broker);
if (zlist_size (self->waiting) == 0)
return;
@@ -404,7 +405,7 @@ s_service_dispatch (service_t *self, zframe_t *sender, zmsg_t *msg)
zlist_remove (self->waiting, worker);
zmsg_t *msg = (zmsg_t*)zlist_pop (self->requests);
s_worker_send (worker, MDPW_REQUEST, NULL, msg);
- // Workers are scheduled in the RR fashion
+ // Workers are scheduled in the round-robin fashion
zlist_append (self->waiting, worker);
zmsg_destroy (&msg);
}
View
7 libmdp/src/mdp_client.c
@@ -170,11 +170,12 @@ mdp_client_recv (mdp_client_t *self, char **service_p)
// Message format:
// Frame 1: empty frame (delimiter)
// Frame 2: "MDPCxy" (six bytes, MDP/Client x.y)
- // Frame 3: Service name (printable string)
- // Frame 4..n: Application frames
+ // Frame 3: REPORT|NAK
+ // Frame 4: Service name (printable string)
+ // Frame 5..n: Application frames
// We would handle malformed replies better in real code
- assert (zmsg_size (msg) >= 4);
+ assert (zmsg_size (msg) >= 5);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
Something went wrong with that request. Please try again.