Skip to content

Commit

Permalink
Add support for ZeroMQ.
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkinsw committed Jan 31, 2019
1 parent bcc3480 commit bcd79d0
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 21 deletions.
2 changes: 2 additions & 0 deletions src/cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ struct configuration {
char *print_output_separator;
char *print_output_file;
char *print_latest_file;
int jsonudp_type;
u_int8_t jsonudp_topic;
char *jsonudp_server;
int nfacctd_port;
char *nfacctd_ip;
Expand Down
64 changes: 64 additions & 0 deletions src/cfg_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2831,6 +2831,70 @@ int cfg_key_print_output_separator(char *filename, char *name, char *value_ptr)
return changes;
}

int cfg_key_jsonudp_type(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;
char *endptr;
int value = 0;

lower_string(value_ptr);
if (!strcmp(value_ptr, "udp")) {
value = JSONUDP_TYPE_UDP;
} else if (!strcmp(value_ptr, "zeromq")) {
value = JSONUDP_TYPE_ZEROMQ;
} else {
Log(LOG_ERR, "ERROR: [%s] '%s' is not a valid type.\n", filename, value_ptr);
return ERR;
}

if (!name) {
for (; list; list = list->next, changes++) {
list->cfg.jsonudp_type = value;
}
} else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.jsonudp_type = value;
changes++;
break;
}
}
}
return changes;
}

int cfg_key_jsonudp_topic(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;
char *endptr;
int topic_raw = 0;
u_int8_t topic = 0;

topic_raw = strtoull(value_ptr, &endptr, 10);
if (topic_raw < 0) {
Log(LOG_ERR, "ERROR: [%s] '%s' is not a valid topic id.\n", filename, value_ptr);
return ERR;
} else {
topic = (u_int8_t)topic_raw;
}

if (!name) {
for (; list; list = list->next, changes++) {
list->cfg.jsonudp_topic = topic;
}
} else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.jsonudp_topic = topic;
changes++;
break;
}
}
}
return changes;
}
int cfg_key_jsonudp_server(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
Expand Down
2 changes: 2 additions & 0 deletions src/cfg_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ EXT int cfg_key_print_output_file_append(char *, char *, char *);
EXT int cfg_key_print_output_lock_file(char *, char *, char *);
EXT int cfg_key_print_output_separator(char *, char *, char *);
EXT int cfg_key_print_latest_file(char *, char *, char *);
EXT int cfg_key_jsonudp_type(char *, char *, char *);
EXT int cfg_key_jsonudp_topic(char *, char *, char *);
EXT int cfg_key_jsonudp_server(char *, char *, char *);
EXT int cfg_key_nfacctd_port(char *, char *, char *);
EXT int cfg_key_nfacctd_ip(char *, char *, char *);
Expand Down
107 changes: 86 additions & 21 deletions src/jsonudp_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ int jsonudp_plugin_shutdown_client(int client) {
close(client);
}

void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
{
struct pkt_data *data;
struct sockaddr server_addr;
socklen_t server_addr_len = sizeof(struct sockaddr);
int server_socket = -1;
struct sockaddr udp_server_addr;
socklen_t udp_server_addr_len = sizeof(struct sockaddr);
int udp_server_socket = -1;
struct ports_table pt;
unsigned char *pipebuf;
struct pollfd pfd;
Expand All @@ -127,6 +127,9 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
pid_t core_pid = ((struct channels_list_entry *)ptr)->core_pid;
struct networks_file_data nfd;
#ifdef WITH_ZMQ
struct p_zmq_host zmq_host = {0,};
#endif

unsigned char *rgptr;
int pollagain = TRUE;
Expand All @@ -136,7 +139,11 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
struct primitives_ptrs prim_ptrs;
char *dataptr;

int jsonudp_type_default = JSONUDP_TYPE_UDP;
char *jsonudp_server_default = "127.0.0.1:5001";
#ifdef WITH_ZMQ
u_int8_t jsonudp_topic_default = 1;
#endif

/*
* General plugin setup (Taken from print_plugin).
Expand All @@ -149,23 +156,64 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
P_init_default_values();
P_config_checks();
pipebuf = (unsigned char *) pm_malloc(config.buffer_size);

/*
* Zero out locals.
*/
memset(pipebuf, 0, config.buffer_size);
memset(&udp_server_addr, 0, sizeof(struct sockaddr));

/*
* Build a sockaddr from the jsonudp_server parameter.
* If the user did not specify one, we will use a default.
*/
memset(&server_addr, 0, sizeof(struct sockaddr));

if (config.jsonudp_server == NULL)
if (config.jsonudp_server == NULL) {
Log(LOG_WARNING, "WARNING ( %s/%s ): Using default server (%s).\n",
config.name, config.type, jsonudp_server_default);
config.jsonudp_server = jsonudp_server_default;
}
if (!config.jsonudp_type) {
Log(LOG_WARNING, "WARNING ( %s/%s ): Using default type (%s).\n",
config.name, config.type, jsonudp_type_default);
config.jsonudp_type = jsonudp_type_default;
}
if (!config.jsonudp_topic) {
/*
* Only warn about a default topic if they are going to use ZeroMQ.
*/
#ifdef WITH_ZMQ
if (config.jsonudp_type == JSONUDP_TYPE_ZEROMQ) {
Log(LOG_WARNING, "WARNING ( %s/%s ): Using default topic (%d).\n",
config.name, config.type, jsonudp_topic_default);
}
config.jsonudp_topic = jsonudp_topic_default;
#endif
}

parse_hostport(config.jsonudp_server, &server_addr, &server_addr_len);

if ((server_socket = jsonudp_plugin_create_client(&server_addr)) < 0) {
Log(LOG_ERR, "ERROR ( %s/%s ): Could not connect to the server (%s)."
"Exiting.\n", config.name, config.type, config.jsonudp_server);
if (config.jsonudp_type == JSONUDP_TYPE_UDP) {
parse_hostport(config.jsonudp_server,
&udp_server_addr,
&udp_server_addr_len);
if ((udp_server_socket=jsonudp_plugin_create_client(&udp_server_addr))<0) {
Log(LOG_ERR, "ERROR ( %s/%s ): Could not connect to the server (%s). "
"Exiting.\n", config.name,config.type,config.jsonudp_server);
exit_plugin(1);
}
} else {
#ifndef WITH_ZMQ
Log(LOG_ERR, "ERROR ( %s/%s ): Not compiled with ZeroMQ support. "
"Exiting.\n", config.name,config.type);
exit_plugin(1);
#else
/*
* NB: the library that sets the hostname automatically adds tcp://
* to the start of the host. This is really important to know!
*/
p_zmq_init_pub(&zmq_host, config.jsonudp_server, config.jsonudp_topic);
p_zmq_pub_setup(&zmq_host);
#endif
}

/*
Expand Down Expand Up @@ -237,7 +285,7 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
if (seq == 0) rg_err_count = FALSE;
}
else {
if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
exit_plugin(1); /* we exit silently; something happened at the write end */
}

Expand All @@ -256,7 +304,6 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
config.name, config.type);
}

rg->ptr = (rg->base + status->last_buf_off);
seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
}
Expand All @@ -267,7 +314,7 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
rg->ptr += bufsz;
}
data = (struct pkt_data *) (pipebuf+sizeof(struct ch_buf_hdr));
if (config.debug_internal_msg)
if (config.debug_internal_msg)
Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%llu seq=%u num_entries=%u\n",
config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
((struct ch_buf_hdr *)pipebuf)->num);
Expand Down Expand Up @@ -319,14 +366,32 @@ void jsonudp_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
char *json_output = compose_json_str(json_obj);
ssize_t json_output_len = strlen(json_output);
if (json_output != NULL) {
if (sendto(server_socket,
json_output,
json_output_len, 0,
(struct sockaddr*)&server_addr,
sizeof(struct sockaddr)) != json_output_len) {
Log(LOG_ERR, "ERROR ( %s/%s ): Error forwarding record.\n",
config.name,
config.type);
if (config.jsonudp_type == JSONUDP_TYPE_UDP) {
/*
* send via udp.
*/
if (sendto(udp_server_socket,
json_output,
json_output_len, 0,
(struct sockaddr*)&udp_server_addr,
sizeof(struct sockaddr)) != json_output_len) {
Log(LOG_ERR, "ERROR ( %s/%s ): Error forwarding record "
"via UDP.\n",
config.name,
config.type);
}
} else {
#ifdef WITH_ZMQ
/*
* send via zeromq.
*/
if (!p_zmq_topic_send(&zmq_host, json_output, json_output_len)) {
Log(LOG_ERR, "ERROR ( %s/%s ): Error forwarding record "
"via ZeroMQ.\n",
config.name,
config.type);
}
#endif
}
}
free(json_output);
Expand Down
2 changes: 2 additions & 0 deletions src/pmacct-data.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ static const struct _dictionary_line dictionary[] = {
{"print_preprocess", cfg_key_sql_preprocess},
{"print_preprocess_type", cfg_key_sql_preprocess_type},
{"print_startup_delay", cfg_key_sql_startup_delay},
{"jsonudp_type", cfg_key_jsonudp_type},
{"jsonudp_topic", cfg_key_jsonudp_topic},
{"jsonudp_server", cfg_key_jsonudp_server},
{"mongo_host", cfg_key_sql_host},
{"mongo_table", cfg_key_sql_table},
Expand Down
3 changes: 3 additions & 0 deletions src/pmacct-defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@
#define PRINT_OUTPUT_EVENT 0x00000008
#define PRINT_OUTPUT_AVRO 0x00000010

#define JSONUDP_TYPE_UDP 0x1
#define JSONUDP_TYPE_ZEROMQ 0x2

#define DIRECTION_UNKNOWN 0x00000000
#define DIRECTION_IN 0x00000001
#define DIRECTION_OUT 0x00000002
Expand Down

0 comments on commit bcd79d0

Please sign in to comment.