Skip to content

Commit

Permalink
Send NAK when the command is filtered out
Browse files Browse the repository at this point in the history
  • Loading branch information
hurtonm committed Sep 30, 2012
1 parent a71f4d9 commit 5442f46
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
6 changes: 3 additions & 3 deletions libmdp/include/mdp_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@

// MDP/Client commands, as strings
#define MDPC_REQUEST "\001"
#define MDPC_PARTIAL "\002"
#define MDPC_FINAL "\003"
#define MDPC_REPORT "\002"
#define MDPC_NAK "\003"

static char *mdpc_commands [] = {
NULL, "REQUEST", "PARTIAL", "FINAL",
NULL, "REQUEST", "REPORT", "NAK",
};

// This is the version of MDP/Worker we implement
Expand Down
59 changes: 30 additions & 29 deletions libmdp/src/mdp_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static service_t *
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (service_t *service, zframe_t *sender, zmsg_t *msg);
s_service_dispatch (service_t *service);
static void
s_service_enable_command (service_t *self, const char *command);
static void
Expand Down Expand Up @@ -183,7 +183,7 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
zlist_append (worker->service->waiting, worker);
worker->service->workers++;
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
s_service_dispatch (worker->service, NULL, NULL);
s_service_dispatch (worker->service);
zframe_destroy (&service_frame);
zclock_log ("worker created");
}
Expand All @@ -195,6 +195,7 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_pushstr (msg, worker->service->name);
zmsg_pushstr (msg, MDPC_REPORT);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
Expand Down Expand Up @@ -280,13 +281,35 @@ s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)

// Insert the protocol header and service name, then rewrap envelope.
zmsg_push (msg, zframe_dup (service_frame));
zmsg_pushstr (msg, MDPC_REPORT);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, zframe_dup (sender));
zmsg_send (&msg, self->socket);
}
else
// Else dispatch the message to the requested service
s_service_dispatch (service, sender, msg);
else {
int enabled = 1;
if (zmsg_size (msg) >= 1) {
zframe_t *cmd_frame = zmsg_first (msg);
char *cmd = zframe_strdup (cmd_frame);
enabled = s_service_is_command_enabled (service, cmd);
free (cmd);
}

// Forward the message to the worker.
if (enabled) {
zmsg_wrap (msg, zframe_dup (sender));
zlist_append (service->requests, msg);
s_service_dispatch (service);
}
// Send a NAK message back to the client.
else {
zmsg_push (msg, zframe_dup (service_frame));
zmsg_pushstr (msg, MDPC_NAK);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, zframe_dup (sender));
zmsg_send (&msg, self->socket);
}
}

zframe_destroy (&service_frame);
}
Expand Down Expand Up @@ -368,33 +391,11 @@ s_service_destroy (void *argument)
}

// The dispatch method sends request to the worker.
// If no worker is available, we put the request into
// the queue.

static void
s_service_dispatch (service_t *self, zframe_t *sender, zmsg_t *msg)
s_service_dispatch (service_t *self)
{
assert (self);

if (msg) {
int enabled = 1;
if (zmsg_size (msg) >= 1) {
zframe_t *cmd_frame = zmsg_first (msg);
char *cmd = zframe_strdup (cmd_frame);
enabled = s_service_is_command_enabled (self, cmd);
free (cmd);
}
// If the command is blacklisted, we drop the message;
// otherwise, we queue the message.
if (enabled) {
// Set reply return address to client sender
zmsg_wrap (msg, zframe_dup (sender));
zlist_append (self->requests, msg);
}
else
zmsg_destroy (&msg);
}

s_broker_purge (self->broker);
if (zlist_size (self->waiting) == 0)
return;
Expand All @@ -404,7 +405,7 @@ s_service_dispatch (service_t *self, zframe_t *sender, zmsg_t *msg)
zlist_remove (self->waiting, worker);
zmsg_t *msg = (zmsg_t*)zlist_pop (self->requests);
s_worker_send (worker, MDPW_REQUEST, NULL, msg);
// Workers are scheduled in the RR fashion
// Workers are scheduled in the round-robin fashion
zlist_append (self->waiting, worker);
zmsg_destroy (&msg);
}
Expand Down
7 changes: 4 additions & 3 deletions libmdp/src/mdp_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@ mdp_client_recv (mdp_client_t *self, char **service_p)
// Message format:
// Frame 1: empty frame (delimiter)
// Frame 2: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 3: Service name (printable string)
// Frame 4..n: Application frames
// Frame 3: REPORT|NAK
// Frame 4: Service name (printable string)
// Frame 5..n: Application frames

// We would handle malformed replies better in real code
assert (zmsg_size (msg) >= 4);
assert (zmsg_size (msg) >= 5);

zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
Expand Down

0 comments on commit 5442f46

Please sign in to comment.