.#  GSL template: generates a server engine class from a protocol model
.template 0
#
# Generates a server class for a protocol specification
# This manages ROUTER server talking to DEALER clients
#
# This is a code generator built using the iMatix GSL code generation
# language. See https://github.com/imatix/gsl for details.
#
# Copyright (c) the Contributors as noted in the AUTHORS file.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# If proto_class has the 'virtual' flag set, the server will use the
# appropriate zmsg_t calls transparently
include "zproto_lib.gsl"
include "zproto_dot.gsl"
resolve_includes ()
generate_dot ()
set_defaults ()
# Load message structures for this engine
# The protocol model file name is built from the class attributes
# protocol_dir and protocol_class; the loaded XML tree is kept in
# global.proto and the protocol class name is copied to class.proto.
protocol_file = class.protocol_dir + class.protocol_class + ".xml"
global.proto = xml.load_file (protocol_file)
class.proto = class.protocol_class
# Fall back to "h" and "c" when the model does not specify the
# extensions of the generated header and source files
if !defined (class.header_ext)
class.header_ext = "h"
endif
if !defined (class.source_ext)
class.source_ext = "c"
endif
# Lowercase state/event/action names
# Each name is rewritten in place with its own substituted value, so the
# rest of the script can compare names directly. The optional 'next'
# attribute of an event (the target state) gets the same treatment.
for class.state
state.name = "$(name)"
for event
event.name = "$(name)"
if defined (event.next)
event.next = "$(next)"
endif
for action
action.name = "$(name)"
endfor
endfor
endfor
# Lowercase protocol message names and normalize spaces/hyphens
# (the :c modifier is applied so that message names can be matched against
# the :c form of event names computed later)
for proto.message
message.name = "$(name:c)"
endfor
# Collect all events and actions at class level
# After this pass class.event and class.action each hold one copy of every
# distinct event and action name used anywhere in the state machine,
# including the actions listed in a state's before and after sections.
for class.state
# Separator used when emitting comma-separated lists of states:
# empty for the last state, "," for all others
state.comma = last()?? ""? ","
for event where name <> "*"
event.name = "$(name:c)"
# Mark event as external if it is a protocol message
if count (proto.message, message.name = event.name)
event.external = 1
endif
# Copy event to class if not yet defined there
if count (class.event, name = -1.name) = 0
copy event to class
endif
endfor
# Actions attached to events (including the "*" catch-all event)
for event
for action where count (class.action, name = -1.name) = 0
copy action to class
endfor
endfor
# Actions executed when entering the state
for [before]
for action where count (class.action, name = -1.name) = 0
copy action to class
endfor
endfor
# Actions executed when leaving the state
for [after]
for action where count (class.action, name = -1.name) = 0
copy action to class
endfor
endfor
endfor
# Process super states recursively
# For a state with a non-empty 'inherit' attribute, first resolve the named
# superstate, then copy into the state every superstate event that the
# state does not define itself. The 'inherit' attribute is cleared
# afterwards so that a state is only processed once. An error is echoed
# when no state with the inherited name exists.
# NOTE(review): 'inherit' is only cleared after the recursive call returns,
# so a cyclic inherit chain in the model would presumably recurse without
# end -- confirm whether models are validated elsewhere.
function resolve_inheritance (state)
if my.state.inherit?"" <> ""
for class.state as superstate where name = my.state.inherit
# Make sure the superstate has its own inherited events first
resolve_inheritance (superstate)
for event where count (my.state.event, name = -1.name) = 0
copy event to my.state
endfor
else
echo "E: superstate $(inherit) isn't defined"
endfor
my.state.inherit = ""
endif
endfunction
# Apply inheritance to every state in the model
for class.state
resolve_inheritance (state)
endfor
# Collect prototypes that we need
# Every action except the built-in "send" and "terminate" actions gets a
# class.prototype item, which the engine emits as a forward declaration
# of a void function taking the client context as its only argument.
for class.action
if name <> "send" \
& name <> "terminate"
new class.prototype
prototype.name = "$(action.name:c)"
prototype.ctype = "void"
# Initialised to 0 here; not read anywhere in the visible part of this
# script
prototype.exists = 0
prototype.args = "client_t *self"
endnew
endif
endfor
.endtemplate
.# This is the API for the server
.output "$(class.package_dir)/$(class.name).$(class.header_ext)"
/* =========================================================================
$(class.name) - $(class.title:)
** WARNING *************************************************************
THIS SOURCE FILE IS 100% GENERATED. If you edit this file, you will lose
your changes at the next build cycle. This is great for temporary printf
statements. DO NOT MAKE ANY CHANGES YOU WISH TO KEEP. The correct places
for commits are:
* The XML model used for this code generation: $(filename), or
* The code generation script that built this file: $(script)
************************************************************************
. for class.license
$(string.trim (license.):block )
. endfor
=========================================================================
*/
#ifndef $(CLASS.NAME)_H_INCLUDED
#define $(CLASS.NAME)_H_INCLUDED
.if file.exists ("../include/czmq.h")
#include "czmq.h"
.else
#include <czmq.h>
.endif
#ifdef __cplusplus
extern "C" {
#endif
// @interface
// To work with $(class.name), use the CZMQ zactor API:
//
// Create new $(class.name) instance, passing logging prefix:
//
// zactor_t *$(class.name:c) = zactor_new ($(class.name), "myname");
//
// Destroy $(class.name) instance
//
// zactor_destroy (&$(class.name:c));
//
// Enable verbose logging of commands and activity:
//
// zstr_send ($(class.name:c), "VERBOSE");
//
// Bind $(class.name) to specified endpoint. TCP endpoints may specify
// the port number as "*" to acquire an ephemeral port:
//
// zstr_sendx ($(class.name:c), "BIND", endpoint, NULL);
//
// Return assigned port number, specifically when BIND was done using
// an ephemeral port:
//
// zstr_sendx ($(class.name:c), "PORT", NULL);
// char *command, *port_str;
// zstr_recvx ($(class.name:c), &command, &port_str, NULL);
// assert (streq (command, "PORT"));
//
// Specify configuration file to load, overwriting any previous loaded
// configuration file or options:
//
// zstr_sendx ($(class.name:c), "LOAD", filename, NULL);
//
// Set configuration path value:
//
// zstr_sendx ($(class.name:c), "SET", path, value, NULL);
//
// Save configuration data to config file on disk:
//
// zstr_sendx ($(class.name:c), "SAVE", filename, NULL);
//
// Send zmsg_t instance to $(class.name):
//
// zactor_send ($(class.name:c), &msg);
//
// Receive zmsg_t instance from $(class.name):
//
// zmsg_t *msg = zactor_recv ($(class.name:c));
//
// This is the $(class.name) constructor as a zactor_fn:
//
$(CLASS.EXPORT_MACRO)void
$(class.name) (zsock_t *pipe, void *args);
// Self test of this class
$(CLASS.EXPORT_MACRO)void
$(class.name)_test (bool verbose);
// @end
#ifdef __cplusplus
}
#endif
#endif
.output "$(class.name)_engine.inc"
/* =========================================================================
$(class.name)_engine - $(class.name:) engine
** WARNING *************************************************************
THIS SOURCE FILE IS 100% GENERATED. If you edit this file, you will lose
your changes at the next build cycle. This is great for temporary printf
statements. DO NOT MAKE ANY CHANGES YOU WISH TO KEEP. The correct places
for commits are:
* The XML model used for this code generation: $(filename), or
* The code generation script that built this file: $(script)
************************************************************************
. for class.license
$(string.trim (license.):block )
. endfor
=========================================================================
*/
#ifdef NDEBUG
#undef NDEBUG
#endif
#define ZPROTO_UNUSED(object) (void)object
// ---------------------------------------------------------------------------
// State machine constants
.# One enum value per state in the model, numbered by loop index, followed
.# by the internal zombie state which takes the value (number of states + 1).
typedef enum {
.for class.state
$(name:c)_state = $(index ()),
.endfor
// Internal state to mark zombie clients
zombie_state_ = $(count (class.state) + 1)
} state_t;
.# Values 0 and 1 are reserved for NULL_event and terminate_event; model
.# events are numbered after them (loop index + 1).
typedef enum {
NULL_event = 0,
terminate_event = 1,
.for class.event
.# Separator for the generated list: empty after the last event
. event.comma = last()?? ""? ","
$(name)_event = $(index () + 1)$(comma)
.endfor
} event_t;
//  Names for state machine logging and error reporting. The table is
//  indexed by state_t: entry 0 is a placeholder, the model states follow
//  in enum order, and the final entry names the internal zombie_state_ so
//  that a lookup with any state_t value stays inside the table.
static const char *
s_state_name [] = {
    "(NONE)",
.for class.state
    "$(name)",
.endfor
    "(ZOMBIE)"
};
.# Event names indexed by event_t: placeholder for NULL_event, then
.# "terminate", then the model events in enum order. Events flagged as
.# external (protocol messages) are printed in upper case, other events
.# as they appear in the model.
static const char *
s_event_name [] = {
"(NONE)",
"terminate",
.for class.event
. if defined (event.external)
"$(NAME)"$(comma)
. else
"$(name)"$(comma)
. endif
.endfor
};
// ---------------------------------------------------------------------------
// Context for the whole server task. This embeds the application-level
// server context at its start (the entire structure, not a reference),
// so we can cast a pointer between server_t and s_server_t arbitrarily.
// The 'server' member must therefore stay the first member.
typedef struct {
server_t server; // Application-level server context
zsock_t *pipe; // Socket back to caller API
zsock_t *router; // Socket to talk to clients
int port; // Server port bound to
zloop_t *loop; // Reactor for server sockets
$(proto)_t *message; // Message received or sent
zhash_t *clients; // Clients we're connected to
zhash_t *zombies; // Zombie clients waiting to be free()d
zconfig_t *config; // Configuration tree
uint client_id; // Client identifier counter
size_t timeout; // Default client expiry timeout
bool verbose; // Verbose logging enabled?
char *log_prefix; // Default log prefix
} s_server_t;
// ---------------------------------------------------------------------------
// Context for each connected client. This embeds the application-level
// client context at its start (the entire structure, not a reference),
// so we can cast a pointer between client_t and s_client_t arbitrarily.
// The 'client' member must therefore stay the first member.
typedef struct {
client_t client; // Application-level client context
s_server_t *server; // Parent server context
size_t refcount; // Optional reference counter
char *hashkey; // Key into server->clients hash
zframe_t *routing_id; // Routing_id back to client
uint unique_id; // Client identifier in server
state_t state; // Current state
event_t event; // Current event
event_t next_event; // The next event
event_t exception; // Exception event, if any
int wakeup; // zloop timer for client alarms
void *ticket; // zloop ticket for client timeouts
event_t wakeup_event; // Wake up with this event
char log_prefix [41]; // Log prefix string
} s_client_t;
// Hooks that the application source file must provide; the engine calls
// them at server and client start-up and shutdown, for custom API
// methods, and after a configuration (re)load.
static int
server_initialize (server_t *self);
static void
server_terminate (server_t *self);
static zmsg_t *
server_method (server_t *self, const char *method, zmsg_t *msg);
static void
server_configuration (server_t *self, zconfig_t *config);
static int
client_initialize (client_t *self);
static void
client_terminate (client_t *self);
// Engine-internal functions that are used before they are defined
static void
s_server_config_global (s_server_t *server);
static void
s_client_execute (s_client_t *client, event_t event);
static int
s_client_handle_wakeup (zloop_t *loop, int timer_id, void *argument);
.# The ticket handler is only generated when the model has an "expired" event
.if count (class.event, name = "expired")
static int
s_client_handle_ticket (zloop_t *loop, int timer_id, void *argument);
.endif
static void
s_client_finalize (s_client_t **self_p);
.# One prototype per application action collected by the script section
.for class.prototype
static $(ctype)
$(name) ($(args));
.endfor
// ---------------------------------------------------------------------------
// These methods are an internal API for actions
//  Take an application reference on a client. Reference counting is
//  optional: the engine itself does not need it, but an application that
//  wants to delay the final destruction of a client can hold a reference
//  and release it later with engine_client_put. A null client is ignored.
static void engine_client_get (client_t *client)
{
    if (client) {
        s_client_t *holder = (s_client_t *) client;
        holder->refcount += 1;
    }
}
//  Release the application reference to client. When the last reference
//  is dropped and the engine has already destroyed the client (it is in
//  zombie state), the client is removed from the server's zombies table
//  and finally freed. A null client is ignored. A release without a
//  matching engine_client_get is ignored as well; decrementing in that
//  case would wrap the unsigned counter around.
static void engine_client_put (client_t *client)
{
    if (!client)
        return;
    s_client_t *self = (s_client_t *) client;
    if (self->refcount == 0)
        return;                 //  Unbalanced put; nothing to release
    if (--self->refcount == 0 && self->state == zombie_state_) {
        zhash_delete (self->server->zombies, self->hashkey);
        s_client_finalize (&self);
    }
}
// True when 'c' is a non-null client reference whose context is not in
// the zombie state
#define engine_client_is_valid(c) ((c) && ((s_client_t *)(c))->state != zombie_state_)
//  Queue 'event' as the next event for the client's state machine. At
//  least one action in an internal state has to do this; otherwise the
//  state machine waits for a message on the router socket and treats that
//  as the event. Null and zombie clients are ignored.
static void
engine_set_next_event (client_t *client, event_t event)
{
    if (!engine_client_is_valid (client))
        return;
    ((s_client_t *) client)->next_event = event;
}
//  Raise an exception with 'event': the remaining actions of the current
//  event are skipped and the state machine continues with the actions
//  defined for the exception event. Null and zombie clients are ignored.
static void
engine_set_exception (client_t *client, event_t event)
{
    if (!engine_client_is_valid (client))
        return;
    ((s_client_t *) client)->exception = event;
}
//  Arm a one-shot wakeup alarm that delivers 'event' to the client after
//  'delay' msecs. The next state should handle the wakeup event. An alarm
//  that is still pending is replaced, and the alarm is cancelled when any
//  other event is executed on the client first. Null and zombie clients
//  are ignored.
static void
engine_set_wakeup_event (client_t *client, size_t delay, event_t event)
{
    if (!engine_client_is_valid (client))
        return;

    s_client_t *target = (s_client_t *) client;
    zloop_t *reactor = target->server->loop;
    if (target->wakeup) {
        //  Drop the alarm that is still pending
        zloop_timer_end (reactor, target->wakeup);
        target->wakeup = 0;
    }
    target->wakeup = zloop_timer (reactor, delay, 1, s_client_handle_wakeup, target);
    target->wakeup_event = event;
}
//  Run 'event' through the state machine of the given client. Use this to
//  send events to other clients. Executing the event cancels any wakeup
//  alarm pending on that client. Null and zombie clients are ignored.
static void
engine_send_event (client_t *client, event_t event)
{
    if (!engine_client_is_valid (client))
        return;
    s_client_execute ((s_client_t *) client, event);
}
//  Execute 'event' on all clients known to the server. If you pass a
//  client argument, that client will not receive the broadcast. If you
//  want to pass any arguments, store them in the server context.
//  A null server is ignored.
static void
engine_broadcast_event (server_t *server, client_t *client, event_t event)
{
    if (server) {
        s_server_t *self = (s_server_t *) server;
        //  Walk a separate list of keys rather than the hash itself,
        //  because executing an event can delete clients from the hash
        zlist_t *keys = zhash_keys (self->clients);
        char *key = (char *) zlist_first (keys);
        while (key) {
            s_client_t *target = (s_client_t *) zhash_lookup (self->clients, key);
            //  The lookup yields null when an earlier event of this same
            //  broadcast already removed the client; skip it
            if (target && target != (s_client_t *) client)
                s_client_execute (target, event);
            key = (char *) zlist_next (keys);
        }
        zlist_destroy (&keys);
    }
}
//  Register 'handler' with the server reactor for activity on an actor or
//  zsock. The handler must be a CZMQ zloop reader function; it receives
//  the server as its argument. Passing a null handler removes the socket
//  from the reactor again. A null server is ignored.
static void
engine_handle_socket (server_t *server, void *sock, zloop_reader_fn handler)
{
    if (!server)
        return;

    s_server_t *self = (s_server_t *) server;
    //  Resolve zactor_t -> zsock_t
    if (zactor_is (sock))
        sock = zactor_sock ((zactor_t *) sock);
    else
        assert (zsock_is (sock));

    zsock_t *handle = (zsock_t *) sock;
    if (handler == NULL) {
        zloop_reader_end (self->loop, handle);
        return;
    }
    int rc = zloop_reader (self->loop, handle, handler, self);
    assert (rc == 0);
    zloop_reader_set_tolerant (self->loop, handle);
}
//  Register a monitor function that the server engine calls every
//  'interval' msecs. Returns an identifier that can be passed to
//  engine_cancel_monitor, or -1 when 'server' is null.
static int
engine_set_monitor (server_t *server, size_t interval, zloop_timer_fn monitor)
{
    if (!server)
        return -1;

    s_server_t *self = (s_server_t *) server;
    int timer_id = zloop_timer (self->loop, interval, 0, monitor, self);
    assert (timer_id >= 0);
    return timer_id;
}
//  Cancel the monitor function registered under 'identifier' (the value
//  returned by engine_set_monitor). A null server is ignored.
static void
engine_cancel_monitor (server_t *server, int identifier)
{
    if (!server)
        return;

    s_server_t *self = (s_server_t *) server;
    int rc = zloop_timer_end (self->loop, identifier);
    assert (rc >= 0);
}
//  Set log file prefix; this string will be added to log data, to make
//  log data more searchable. The prefix is formatted as the client's
//  unique id, a colon, and the string padded to 33 characters; the result
//  is cut to fit the client's log_prefix buffer, so longer strings are
//  truncated. A null string is treated as empty. Null and zombie clients
//  are ignored.
static void
engine_set_log_prefix (client_t *client, const char *string)
{
    if (engine_client_is_valid (client)) {
        s_client_t *self = (s_client_t *) client;
        //  unique_id is unsigned, so it is printed with %u
        snprintf (self->log_prefix, sizeof (self->log_prefix),
                  "%6u:%-33s", self->unique_id, string? string: "");
    }
}
//  Store 'value' at 'path' in the server's configuration tree and re-apply
//  the built-in options. The properties this engine uses are:
//  server/verbose, server/timeout, and server/background. You can also
//  configure other arbitrary properties. A null server is ignored.
static void
engine_configure (server_t *server, const char *path, const char *value)
{
    if (!server)
        return;

    s_server_t *self = (s_server_t *) server;
    zconfig_put (self->config, path, value);
    s_server_config_global (self);
}
//  Report whether the server is running in verbose mode. Returns false
//  for a null server.
static bool
engine_verbose (server_t *server)
{
    if (!server)
        return false;
    return ((s_server_t *) server)->verbose;
}
//  Pedantic compilers complain about static functions that are never
//  used. An application need not use every engine_ function, so each one
//  is called here once with null references, which every one of them
//  ignores. It's nasty and horrid and sufficient.
static void
s_satisfy_pedantic_compilers (void)
{
    //  Client API
    engine_client_get (NULL);
    engine_client_put (NULL);
    engine_set_next_event (NULL, NULL_event);
    engine_set_exception (NULL, NULL_event);
    engine_set_wakeup_event (NULL, 0, NULL_event);
    engine_send_event (NULL, NULL_event);
    //  Server API
    engine_broadcast_event (NULL, NULL, NULL_event);
    engine_handle_socket (NULL, 0, NULL);
    engine_set_monitor (NULL, 0, NULL);
    engine_cancel_monitor (NULL, 0);
    //  Logging and configuration
    engine_set_log_prefix (NULL, NULL);
    engine_configure (NULL, NULL, NULL);
    engine_verbose (NULL);
}
//  ---------------------------------------------------------------------------
//  Generic methods on protocol messages
//  TODO: replace with lookup table, since ID is one byte

//  Map the ID of a received protocol message to the state machine event of
//  the same name. Messages for which the state machine defines no event
//  map to terminate_event.
.#  A case is generated only for protocol messages that have exactly one
.#  matching class-level event. Each case returns directly, so no break
.#  statement is emitted after it.
static event_t
s_protocol_event ($(proto)_t *message)
{
    assert (message);
    switch ($(proto)_id (message)) {
.   for proto.message where count (class.event, event.name = message.name) = 1
        case $(PROTO)_$(NAME):
            return $(name)_event;
.   endfor
        default:
            //  Invalid $(proto)_t
            return terminate_event;
    }
}
// ---------------------------------------------------------------------------
// Client methods
// Create the context for a client seen for the first time. The hash key
// is the hex form of the routing ID; the routing ID frame itself is
// duplicated, so the caller keeps ownership of the frame it passed in.
static s_client_t *
s_client_new (s_server_t *server, zframe_t *routing_id)
{
s_client_t *self = (s_client_t *) zmalloc (sizeof (s_client_t));
assert (self);
// The application context must sit at the start of the structure
assert ((s_client_t *) &self->client == self);
self->server = server;
self->hashkey = zframe_strhex (routing_id);
self->routing_id = zframe_dup (routing_id);
self->unique_id = server->client_id++;
engine_set_log_prefix (&self->client, server->log_prefix);
// Fill in the properties the engine maintains in the application context
self->client.server = (server_t *) server;
self->client.message = server->message;
// propagate client identifier
self->client.unique_id = self->unique_id;
.if count (class.event, name = "expired")
// If expiry timers are being used, create client ticket
if (server->timeout)
self->ticket = zloop_ticket (server->loop, s_client_handle_ticket, self);
.endif
// Start in the first state of the model, with no event pending
.for class.state where item () = 1
self->state = $(name:c)_state;
.endfor
self->event = NULL_event;
// Give application chance to initialize and set next event
client_initialize (&self->client);
return self;
}
//  Tear down a client: stop its wakeup timer and expiry ticket and release
//  its routing ID. If the application holds no reference, the client is
//  freed at once and *self_p is set to null. Otherwise the client is
//  parked in the server's zombies table in zombie state, and *self_p is
//  left unchanged.
static void
s_client_destroy (s_client_t **self_p)
{
    assert (self_p);
    s_client_t *doomed = *self_p;
    if (doomed == NULL)
        return;

    s_server_t *owner = doomed->server;
    if (doomed->wakeup)
        zloop_timer_end (owner->loop, doomed->wakeup);
    if (doomed->ticket)
        zloop_ticket_delete (owner->loop, doomed->ticket);
    zframe_destroy (&doomed->routing_id);

    //  Provide visual clue if application misuses client reference
    engine_set_log_prefix (&doomed->client, "*** TERMINATED ***");

    if (doomed->refcount) {
        //  Still referenced by the application; keep it as a zombie
        zhash_insert (owner->zombies, doomed->hashkey, doomed);
        doomed->state = zombie_state_;
    }
    else
        s_client_finalize (self_p);
}
//  Final step of client destruction: free the hash key, let the
//  application clean up its part of the context, free the context itself
//  and null the caller's pointer.
static void
s_client_finalize (s_client_t **self_p)
{
    s_client_t *victim = *self_p;

    free (victim->hashkey);
    client_terminate (&victim->client);
    free (victim);

    *self_p = NULL;
}
// Callback when we remove client from 'clients' hash table
static void
s_client_free (void *argument)
{
s_client_t *client = (s_client_t *) argument;
s_client_destroy (&client);
}
.# Emits the C code for every action child of the current scope. Each
.# action is wrapped in a test of self->exception, so once an action
.# raises an exception the remaining actions are skipped. Three kinds of
.# action are generated:
.#   send       sets the ID and routing ID on the server's message object
.#              and sends it on the router socket (via a zmsg_t when the
.#              protocol is flagged virtual)
.#   terminate  sets the next event to terminate_event
.#   other      calls the application function named after the action
.macro output_action_body ()
. for action
if (!self->exception) {
. if name = "send"
// send $(MESSAGE:C)
if (self->server->verbose)
zsys_debug ("%s: $ $(name) $(MESSAGE:C)",
self->log_prefix);
$(proto)_set_id (self->server->message, $(PROTO)_$(MESSAGE:C));
. if switches.trace ?= 1
zsys_debug ("%s: Send message to client", self->log_prefix);
$(proto)_print (self->server->message);
. endif
$(proto)_set_routing_id (self->server->message, self->routing_id);
.# Optional application hook, called just before the message is sent
. if defined (class.send_tracer)
$(class.send_tracer) ((server_t *) self->server);
. endif
. if global.proto.virtual ?= 1
zmsg_t* msg = zmsg_new();
$(proto)_send (self->server->message, msg);
zmsg_send(&msg,self->server->router);
. else
$(proto)_send (self->server->message, self->server->router);
. endif
. elsif name = "terminate"
// terminate
if (self->server->verbose)
zsys_debug ("%s: $ $(name)", self->log_prefix);
self->next_event = terminate_event;
. else
// $(name)
if (self->server->verbose)
zsys_debug ("%s: $ $(name)", self->log_prefix);
$(name:c) (&self->client);
. endif
}
. endfor
.endmacro
.
.# Emits the state transition for the current event, if the event names a
.# next state: first the 'after' actions of the state being left, then the
.# assignment of the new state (skipped at run time when an exception is
.# pending), then the 'before' actions of the state being entered.
.macro output_state_change ()
. if defined (event.next)
. for state.[after]
. output_action_body ()
. endfor
if (!self->exception)
self->state = $(next:c)_state;
.# Look up the target state in the model to reach its 'before' actions
. my.next_state = class->state ("$(name:c)" = "$(event.next:c)")
. for my.next_state.[before]
. output_action_body ()
. endfor
. endif
.endmacro
// Execute state machine as long as we have events
.# The switch body is generated from the model: one case per state, and in
.# each case an if/else chain with one branch per named event. A "*" event
.# in the model becomes the catch-all branch of the chain; a state without
.# a "*" event gets a final branch that logs the unhandled event and
.# asserts.
static void
s_client_execute (s_client_t *self, event_t event)
{
self->next_event = event;
// Cancel wakeup timer, if any was pending
if (self->wakeup) {
zloop_timer_end (self->server->loop, self->wakeup);
self->wakeup = 0;
}
// Actions may set next_event; keep going until no event is left
while (self->next_event > 0) {
self->event = self->next_event;
self->next_event = NULL_event;
self->exception = NULL_event;
if (self->server->verbose) {
zsys_debug ("%s: %s:",
self->log_prefix, s_state_name [self->state]);
zsys_debug ("%s: %s",
self->log_prefix, s_event_name [self->event]);
}
switch (self->state) {
.for class.state
. if index () > 1
. endif
case $(name:c)_state:
. for event where name <> "*"
. if index () > 1
else
. endif
if (self->event == $(name)_event) {
. output_action_body ()
. output_state_change ()
}
. endfor
.# Catch-all branch: opens with "else" unless the "*" event is the first
.# event item of the state, in which case there is no preceding "if"
. for event where name = "*"
. if item () > 1
else {
. else
{
. endif
// Handle unexpected protocol events
. output_action_body ()
. output_state_change ()
}
. else
else {
// Handle unexpected internal events
zsys_warning ("%s: unhandled event %s in %s",
self->log_prefix,
s_event_name [self->event],
s_state_name [self->state]);
assert (false);
}
. endfor
break;
.endfor
case zombie_state_:
// A zombie client has already been destroyed; refuse to run it
zsys_error ("%p: zombie client context used", self);
return;
}
// If we had an exception event, interrupt normal programming
if (self->exception) {
if (self->server->verbose)
zsys_debug ("%s: ! %s",
self->log_prefix, s_event_name [self->exception]);
self->next_event = self->exception;
}
if (self->next_event == terminate_event) {
// Automatically calls s_client_destroy
// (self must not be used after this point, hence the break)
zhash_delete (self->server->clients, self->hashkey);
break;
}
else
if (self->server->verbose)
zsys_debug ("%s: > %s",
self->log_prefix, s_state_name [self->state]);
}
}
.#  The ticket handler is only generated when the model has an "expired" event
.if count (class.event, name = "expired")
//  zloop callback when client ticket expires. Clears the client's ticket
//  reference and runs the expired event through its state machine.
//  Always returns 0, so the reactor keeps running.
static int
s_client_handle_ticket (zloop_t *loop, int timer_id, void *argument)
{
    ZPROTO_UNUSED(loop);
    ZPROTO_UNUSED(timer_id);
    s_client_t *self = (s_client_t *) argument;
    self->ticket = NULL;        //  Ticket is now dead
    s_client_execute (self, expired_event);
    return 0;
}
.endif
//  zloop callback when client wakeup timer expires. Delivers the event
//  that was stored when the alarm was set to the client's state machine.
//  Always returns 0, so the reactor keeps running.
static int
s_client_handle_wakeup (zloop_t *loop, int timer_id, void *argument)
{
    ZPROTO_UNUSED(loop);
    ZPROTO_UNUSED(timer_id);
    s_client_t *sleeper = (s_client_t *) argument;
    event_t alarm = sleeper->wakeup_event;
    s_client_execute (sleeper, alarm);
    return 0;
}
//  Server methods

//  Apply the built-in server options from the configuration tree:
//  server/verbose, server/timeout and server/background.
static void
s_server_config_global (s_server_t *self)
{
    //  This function only ever switches verbose on: if it is not set
    //  already, check whether the config tree wants it
    if (!self->verbose) {
        const char *verbose = zconfig_get (self->config, "server/verbose", "0");
        if (atoi (verbose))
            self->verbose = true;
    }
    //  Default client timeout is 60 seconds
    const char *timeout = zconfig_get (self->config, "server/timeout", "60000");
    self->timeout = atoi (timeout);
    zloop_set_ticket_delay (self->loop, self->timeout);

    //  Unless the server runs in the background, log to stdout
    const char *background = zconfig_get (self->config, "server/background", "0");
    if (atoi (background) == 0)
        zsys_set_logstream (stdout);
}
// Create the server context: router socket, message object, client and
// zombie tables, empty configuration tree and reactor. The application
// part of the context is initialized last, via server_initialize.
static s_server_t *
s_server_new (zsock_t *pipe)
{
s_server_t *self = (s_server_t *) zmalloc (sizeof (s_server_t));
assert (self);
// The application context must sit at the start of the structure
assert ((s_server_t *) &self->server == self);
self->pipe = pipe;
self->router = zsock_new (ZMQ_ROUTER);
assert (self->router);
// By default the socket will discard outgoing messages above the
// HWM of 1,000. This isn't helpful for high-volume streaming. We
// will use a unbounded queue here. If applications need to guard
// against queue overflow, they should use a credit-based flow
// control scheme.
zsock_set_unbounded (self->router);
self->message = $(proto)_new ();
self->clients = zhash_new ();
self->zombies = zhash_new ();
self->config = zconfig_new ("root", NULL);
self->loop = zloop_new ();
// Start the client identifier counter at a random value
srandom ((unsigned int) zclock_time ());
self->client_id = randof (1000);
.if switches.trace ?= 1
zloop_set_verbose (self->loop, true);
.endif
s_server_config_global (self);
// Initialize application server context
self->server.pipe = self->pipe;
self->server.config = self->config;
server_initialize (&self->server);
s_satisfy_pedantic_compilers ();
return self;
}
//  Destroy the server context and everything it owns, then null the
//  caller's pointer. Does nothing when *self_p is already null.
static void
s_server_destroy (s_server_t **self_p)
{
    assert (self_p);
    s_server_t *self = *self_p;
    if (!self)
        return;

    $(proto)_destroy (&self->message);
    //  Destroy clients before destroying the server
    zhash_destroy (&self->clients);

    //  Forcibly destroy any zombie clients as we are shutting down
    s_client_t *zombie;
    for (zombie = (s_client_t *) zhash_first (self->zombies);
         zombie != NULL;
         zombie = (s_client_t *) zhash_next (self->zombies))
        s_client_finalize (&zombie);
    zhash_destroy (&self->zombies);

    server_terminate (&self->server);
    zsock_destroy (&self->router);
    zconfig_destroy (&self->config);
    zloop_destroy (&self->loop);
    free (self);
    *self_p = NULL;
}
// Apply service-specific configuration tree:
// * apply server configuration
// * print any echo items in top-level sections
// * apply sections that match methods
// The children of the config section named after this class are walked
// in order; "echo", "bind" and (with libzmq 4 or later) "security"
// sections are recognized, anything else is skipped. The built-in global
// options are re-applied at the end.
static void
s_server_config_service (s_server_t *self)
{
// Apply echo commands and class methods
zconfig_t *section = zconfig_locate (self->config, "$(name)");
if (section)
section = zconfig_child (section);
while (section) {
if (streq (zconfig_name (section), "echo"))
zsys_notice ("%s", zconfig_value (section));
else
if (streq (zconfig_name (section), "bind")) {
// Bind the router socket and remember the resulting port
char *endpoint = zconfig_get (section, "endpoint", "?");
self->port = zsock_bind (self->router, "%s", endpoint);
if (self->port == -1)
zsys_warning ("could not bind to %s (%s)", endpoint, zmq_strerror (zmq_errno ()));
}
#if (ZMQ_VERSION_MAJOR >= 4)
else
if (streq (zconfig_name (section), "security")) {
// Only the "null" and "plain" mechanisms are handled here
char *mechanism = zconfig_get (section, "mechanism", "null");
char *domain = zconfig_get (section, "domain", NULL);
if (domain)
zsock_set_zap_domain (self->router, domain);
if (streq (mechanism, "null")) {
zsys_notice ("server is using NULL security");
}
else
if (streq (mechanism, "plain")) {
zsys_notice ("server is using PLAIN security");
zsock_set_plain_server (self->router, 1);
}
else
zsys_warning ("mechanism=%s is not supported", mechanism);
}
#endif
section = zconfig_next (section);
}
s_server_config_global (self);
}
// Process message from pipe
// Reactor callback for the actor pipe. The first frame of the message is
// the method name; the built-in methods are VERBOSE, $TERM, BIND, PORT,
// LOAD (or CONFIGURE), SET and SAVE. Any other method is passed to the
// application's server_method hook. Returns -1 to end the reactor (on
// interrupt or $TERM), otherwise 0.
// NOTE(review): arguments popped from the message (endpoint, filename,
// path, value) are used without a null check, so a command sent with too
// few frames presumably passes null to the calls below -- confirm whether
// callers are trusted to send complete commands.
static int
s_server_handle_pipe (zloop_t *loop, zsock_t *reader, void *argument)
{
ZPROTO_UNUSED(loop);
ZPROTO_UNUSED(reader);
s_server_t *self = (s_server_t *) argument;
zmsg_t *msg = zmsg_recv (self->pipe);
if (!msg)
return -1; // Interrupted; exit zloop
. if switches.trace ?= 1
zsys_debug ("API message:");
zmsg_print (msg);
. endif
char *method = zmsg_popstr (msg);
if (self->verbose)
zsys_debug ("%s: API command=%s", self->log_prefix, method);
if (streq (method, "VERBOSE"))
self->verbose = true;
else
if (streq (method, "$TERM")) {
// Shutdown the engine
zstr_free (&method);
zmsg_destroy (&msg);
return -1;
}
else
if (streq (method, "BIND")) {
// Bind to a specified endpoint, which may use an ephemeral port
char *endpoint = zmsg_popstr (msg);
self->port = zsock_bind (self->router, "%s", endpoint);
if (self->port == -1)
zsys_warning ("could not bind to %s", endpoint);
zstr_free (&endpoint);
}
else
if (streq (method, "PORT")) {
// Return PORT + port number from the last bind, if any
zstr_sendm (self->pipe, "PORT");
zstr_sendf (self->pipe, "%d", self->port);
}
else // Deprecated method name
if (streq (method, "LOAD") || streq (method, "CONFIGURE")) {
// Replace the current configuration tree with the one loaded from
// file; if loading fails, continue with a fresh empty tree
char *filename = zmsg_popstr (msg);
zconfig_destroy (&self->config);
self->config = zconfig_load (filename);
if (self->config) {
s_server_config_service (self);
self->server.config = self->config;
server_configuration (&self->server, self->config);
}
else {
zsys_warning ("cannot load config file '%s'", filename);
self->config = zconfig_new ("root", NULL);
}
zstr_free (&filename);
}
else
if (streq (method, "SET")) {
// Set one configuration value and re-apply the built-in options
char *path = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
zconfig_put (self->config, path, value);
if (streq (path, "server/animate")) {
zsys_warning ("'%s' is deprecated, use VERBOSE command instead", path);
self->verbose = (atoi (value) == 1);
}
s_server_config_global (self);
zstr_free (&value);
zstr_free (&path);
}
else
if (streq (method, "SAVE")) {
char *filename = zmsg_popstr (msg);
if (zconfig_save (self->config, filename))
zsys_warning ("cannot save config file '%s'", filename);
zstr_free (&filename);
}
else {
// Execute custom method
zmsg_t *reply = server_method (&self->server, method, msg);
// If reply isn't null, send it to caller
zmsg_send (&reply, self->pipe);
}
zstr_free (&method);
zmsg_destroy (&msg);
return 0;
}
//  Handle a protocol message from the client
//  Reactor callback for the router socket. Reads every message that is
//  ready, finds or creates the client context for the sender's routing ID,
//  resets the client's expiry ticket and feeds the message to the client's
//  state machine as an event. Returns -1 to end the reactor when a receive
//  is interrupted, otherwise 0.
static int
s_server_handle_protocol (zloop_t *loop, zsock_t *reader, void *argument)
{
    ZPROTO_UNUSED(loop);
    ZPROTO_UNUSED(reader);
    s_server_t *self = (s_server_t *) argument;
    //  We process as many messages as we can, to reduce the overhead
    //  of polling and the reactor:
    while (zsock_events (self->router) & ZMQ_POLLIN) {
.   if global.proto.virtual ?= 1
        zframe_t *routing_id = NULL;
        //  If the socket is a router, save the routing ID
        if (zsock_type (self->router) == ZMQ_ROUTER) {
            routing_id = zframe_recv (self->router);
            if (!routing_id || !zsock_rcvmore (self->router)) {
                zsys_warning ("$(class.name): no routing ID");
                zframe_destroy (&routing_id);
                return -1;          //  Interrupted or malformed
            }
        }
        zmsg_t *recvmsg = zmsg_recv (self->router);
        if (!recvmsg) {
            zframe_destroy (&routing_id);
            return -1;              //  Interrupted; exit zloop
        }
        //  NOTE(review): ownership of recvmsg after this call is not
        //  visible from here -- confirm that the codec's recv takes it
        int rc = $(proto)_recv (self->message, recvmsg);
        if (rc == -1) {
            zframe_destroy (&routing_id);
            return -1;              //  Interrupted; exit zloop
        }
        if (rc == -2) {
            zframe_destroy (&routing_id);
            continue;               //  Malformed; discard the message
        }
        //  For router sockets, set the routing ID of this message based
        //  on the value we saved; the saved frame is released afterwards
        if (routing_id) {
            $(proto)_set_routing_id (self->message, routing_id);
            zframe_destroy (&routing_id);
        }
.   else
        int rc = $(proto)_recv (self->message, self->router);
        if (rc == -1)
            return -1;              //  Interrupted; exit zloop
.   endif

.   if defined (class.receive_tracer)
        $(class.receive_tracer) ((server_t *) self);
.   endif
        //  TODO: use binary hashing on routing_id
        char *hashkey = zframe_strhex ($(proto)_routing_id (self->message));
        s_client_t *client = (s_client_t *) zhash_lookup (self->clients, hashkey);
        if (client == NULL) {
            //  First message from this peer; the hash table destroys the
            //  client when its entry is deleted
            client = s_client_new (self, $(proto)_routing_id (self->message));
            zhash_insert (self->clients, hashkey, client);
            zhash_freefn (self->clients, hashkey, s_client_free);
        }
        free (hashkey);
.   if switches.trace ?= 1
        zsys_debug ("%d: Client message", client->unique_id);
        $(proto)_print (self->message);
.   endif

        //  Any input from client counts as activity
        if (client->ticket)
            zloop_ticket_reset (self->loop, client->ticket);

        if (rc == -2) {
.if count (class.event, name = "malformed")
            s_client_execute (client, malformed_event);
            continue;
.else
            continue;               //  Malformed, but malformed_event doesn't exist
                                    //  -> discard the message
.endif
        }
        //  Pass to client state machine
        s_client_execute (client, s_protocol_event (self->message));
    }
    return 0;
}
//  Monitor callback: reload the server configuration when the config file
//  has changed, and re-apply it to the engine and the application. Always
//  returns 0, so the reactor keeps running.
static int
s_watch_server_config (zloop_t *loop, int timer_id, void *argument)
{
    ZPROTO_UNUSED(loop);
    ZPROTO_UNUSED(timer_id);
    s_server_t *self = (s_server_t *) argument;

    if (!zconfig_has_changed (self->config))
        return 0;
    if (zconfig_reload (&self->config) != 0)
        return 0;

    s_server_config_service (self);
    self->server.config = self->config;
    server_configuration (&self->server, self->config);
    zsys_notice ("reloaded configuration from %s",
                 zconfig_filename (self->config));
    return 0;
}
//  ---------------------------------------------------------------------------
//  This is the server actor, which polls its two sockets and processes
//  incoming messages. It runs until the reactor ends, then destroys the
//  server context.
void
$(class.name) (zsock_t *pipe, void *args)
{
    //  Create the server context and signal the caller
    s_server_t *self = s_server_new (pipe);
    assert (self);
    zsock_signal (pipe, 0);

    //  Actor argument may be a string used for logging
    if (args)
        self->log_prefix = (char *) args;
    else
        self->log_prefix = (char *) "";

    server_t *server = (server_t *) self;
    //  Set-up server monitor to watch for config file changes
    engine_set_monitor (server, 1000, s_watch_server_config);
    //  Set up handler for the two main sockets the server uses
    engine_handle_socket (server, self->pipe, s_server_handle_pipe);
    engine_handle_socket (server, self->router, s_server_handle_protocol);

    //  Run reactor until there's a termination signal
    zloop_start (self->loop);

    //  Reactor has ended
    s_server_destroy (&self);
}
.# Generate source file first time only
.source_file = "$(class.name).$(class.source_ext)"
.if !file.exists (source_file)
. output source_file
/* =========================================================================
$(class.name) - $(class.name:)
. for class.license
$(string.trim (license.):block )
. endfor
=========================================================================
*/
/*
@header
Description of class for man page.
@discuss
Detailed discussion of the class, if any.
@end
*/
.if defined (class.project_header)
#include "$(class.project_header)"
.endif
// TODO: Change these to match your project's needs
#include "$(class.package_dir)/$(class.protocol_class).$(class.header_ext)"
#include "$(class.package_dir)/$(class.name).$(class.header_ext)"
// ---------------------------------------------------------------------------
// Forward declarations for the two main classes we use here
typedef struct _server_t server_t;
typedef struct _client_t client_t;
// This structure defines the context for each running server. Store
// whatever properties and structures you need for the server. A pointer
// to it is passed as 'self' to the server_* functions in this file
// (server_initialize, server_terminate, server_method and
// server_configuration).
struct _server_t {
// These properties must always be present in the server_t
// and are set by the generated engine; do not modify them!
zsock_t *pipe; // Actor pipe back to caller
zconfig_t *config; // Current loaded configuration
// TODO: Add any properties you need here
};
// ---------------------------------------------------------------------------
// This structure defines the state for each client connection. It will
// be passed to each action in the 'self' argument, and to the
// client_initialize and client_terminate functions in this file.
struct _client_t {
// These properties must always be present in the client_t
// and are set by the generated engine; do not modify them!
server_t *server; // Reference to parent server
$(proto)_t *message; // Message in and out
uint unique_id; // Client identifier (for correlation purposes with the engine)
// TODO: Add specific properties for your application
};
// Include the generated server engine
#include "$(class.name)_engine.inc"
//  Set up the application properties of a new server instance.
//  Returns 0 on success, or -1 if there was an error.

static int
server_initialize (server_t *self)
{
    ZPROTO_UNUSED(self);
    //  Construct properties here
    return 0;
}
//  Release the application properties of a server instance

static void
server_terminate (server_t *self)
{
    ZPROTO_UNUSED(self);
    //  Destroy properties here
}
//  Handle a server API method; the returned message, if any, is the reply.
//  This skeleton ignores every method and returns no reply.

static zmsg_t *
server_method (server_t *self, const char *method, zmsg_t *msg)
{
    ZPROTO_UNUSED(self);
    ZPROTO_UNUSED(method);
    ZPROTO_UNUSED(msg);
    return NULL;
}
//  Called with the current configuration tree so that the application
//  can apply it. This skeleton does nothing with it.

static void
server_configuration (server_t *self, zconfig_t *config)
{
    ZPROTO_UNUSED(self);
    ZPROTO_UNUSED(config);
    //  Apply new configuration
}
//  Set up the application properties of a new client connection and
//  optionally engine_set_next_event (). Returns 0 on success, -1 on error.

static int
client_initialize (client_t *self)
{
    ZPROTO_UNUSED(self);
    //  Construct properties here
    return 0;
}
//  Release the application properties of a client connection

static void
client_terminate (client_t *self)
{
    ZPROTO_UNUSED(self);
    //  Destroy properties here
}
//  ---------------------------------------------------------------------------
//  Self test of this class: starts the server actor, binds it to an ipc
//  endpoint, connects a DEALER socket to it, then shuts everything down.

void
$(class.name)_test (bool verbose)
{
    printf (" * $(class.name): ");
    if (verbose) {
        printf ("\\n");
    }
    // @selftest
    zactor_t *server = zactor_new ($(class.name), (char*)"server");
    if (verbose) {
        zstr_send (server, "VERBOSE");
    }
    zstr_sendx (server, "BIND", "ipc://@/$(class.name)", NULL);

    zsock_t *client = zsock_new (ZMQ_DEALER);
    assert (client);
    zsock_set_rcvtimeo (client, 2000);
    zsock_connect (client, "ipc://@/$(class.name)");

    // TODO: fill this out
    $(proto)_t *request = $(proto)_new ();
    $(proto)_destroy (&request);

    zsock_destroy (&client);
    zactor_destroy (&server);
    // @end
    printf ("OK\\n");
}
.endif
.close
.template 0
#   Append missing prototypes to source file
#
#   Pass 1: scan the existing source file and flag every prototype whose
#   function name already appears at the start of a line followed by " (".
input = file.open (source_file)
xline = file.read (input)
while defined (xline)
    #   Look for function declarations
    if regexp.match ("^(\\w+) \\(", xline, token)
        for class.prototype where name = token
            prototype.exists = 1
        endfor
    endif
    #   The trailing '?' lets the read yield undefined at end of file,
    #   which terminates the loop
    xline = file.read (input)?
endwhile
file.close (input)

#   Pass 2: append a stub for each prototype not found in the source file
append source_file
for class.prototype where exists = 0
    echo "Generating stub for $(name)..."
    >
    >
    >//  ---------------------------------------------------------------------------
    >//  $(name)
    >//
    >
    >static $(ctype)
    >$(name) ($(args))
    >{
    #   Emit a default return statement matching the stub's return type
    if ctype = "long"
    >    return 0;
    elsif ctype = "char *"
    #   The statement must end with a semicolon or the stub will not compile
    >    return NULL;
    else
    >
    endif
    >}
endfor
.endtemplate
You can’t perform that action at this time.