Skip to content

Commit

Permalink
* sfacctd: consume sFlow from a Kafka broker (3)
Browse files Browse the repository at this point in the history
* nfacctd: consume NetFlow/IPFIX from a Kafka broker (7)
  • Loading branch information
paololucente committed Jun 16, 2018
1 parent 762caa4 commit f08e15a
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 27 deletions.
29 changes: 16 additions & 13 deletions src/nfacctd.c
Expand Up @@ -586,12 +586,7 @@ int main(int argc,char **argv, char **envp)
}
#ifdef WITH_KAFKA
else if (config.nfacctd_kafka_broker_host) {
p_kafka_init_host(&nfacctd_kafka_host, config.nfacctd_kafka_config_file);
p_kafka_connect_to_consume(&nfacctd_kafka_host);
p_kafka_set_broker(&nfacctd_kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
p_kafka_set_topic(&nfacctd_kafka_host, config.nfacctd_kafka_topic);
p_kafka_set_content_type(&nfacctd_kafka_host, PM_KAFKA_CNT_TYPE_BIN);
p_kafka_manage_consumer(&nfacctd_kafka_host, TRUE);
NF_init_kafka_host(&nfacctd_kafka_host);

config.handle_fragments = TRUE;
init_ip_fragment_handler();
Expand Down Expand Up @@ -1061,15 +1056,9 @@ int main(int argc,char **argv, char **envp)
if (kafka_reconnect) {
/* Close */
p_kafka_manage_consumer(&nfacctd_kafka_host, FALSE);
p_kafka_close(&nfacctd_kafka_host, FALSE);

/* Re-open */
p_kafka_init_host(&nfacctd_kafka_host, config.nfacctd_kafka_config_file);
p_kafka_connect_to_consume(&nfacctd_kafka_host);
p_kafka_set_broker(&nfacctd_kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
p_kafka_set_topic(&nfacctd_kafka_host, config.nfacctd_kafka_topic);
p_kafka_set_content_type(&nfacctd_kafka_host, PM_KAFKA_CNT_TYPE_BIN);
p_kafka_manage_consumer(&nfacctd_kafka_host, TRUE);
NF_init_kafka_host(&nfacctd_kafka_host);

continue;
}
Expand Down Expand Up @@ -2690,3 +2679,17 @@ void nfv9_datalink_frame_section_handler(struct packet_ptrs *pptrs)
}
}
}

#ifdef WITH_KAFKA
void NF_init_kafka_host(void *kh)
{
struct p_kafka_host *kafka_host = kh;

p_kafka_init_host(kafka_host, config.nfacctd_kafka_config_file);
p_kafka_connect_to_consume(kafka_host);
p_kafka_set_broker(kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
p_kafka_set_topic(kafka_host, config.nfacctd_kafka_topic);
p_kafka_set_content_type(kafka_host, PM_KAFKA_CNT_TYPE_BIN);
p_kafka_manage_consumer(kafka_host, TRUE);
}
#endif
4 changes: 4 additions & 0 deletions src/nfacctd.h
Expand Up @@ -521,6 +521,10 @@ EXT u_int8_t get_ipfix_vlen(char *, u_int16_t *);
EXT struct template_cache_entry *nfacctd_offline_read_json_template(char *, char *, int);
EXT void load_templates_from_file(char *);
EXT void save_template(struct template_cache_entry *, char *);

#ifdef WITH_KAFKA
EXT void NF_init_kafka_host(void *);
#endif
#undef EXT

#if (!defined __PKT_HANDLERS_C)
Expand Down
2 changes: 1 addition & 1 deletion src/pmacct-build.h
@@ -1 +1 @@
#define PMACCT_BUILD "20180615-00"
#define PMACCT_BUILD "20180616-00"
29 changes: 16 additions & 13 deletions src/sfacctd.c
Expand Up @@ -598,12 +598,7 @@ int main(int argc,char **argv, char **envp)
}
#ifdef WITH_KAFKA
else if (config.nfacctd_kafka_broker_host) {
p_kafka_init_host(&nfacctd_kafka_host, config.nfacctd_kafka_config_file);
p_kafka_connect_to_consume(&nfacctd_kafka_host);
p_kafka_set_broker(&nfacctd_kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
p_kafka_set_topic(&nfacctd_kafka_host, config.nfacctd_kafka_topic);
p_kafka_set_content_type(&nfacctd_kafka_host, PM_KAFKA_CNT_TYPE_BIN);
p_kafka_manage_consumer(&nfacctd_kafka_host, TRUE);
SF_init_kafka_host(&nfacctd_kafka_host);

config.handle_fragments = TRUE;
init_ip_fragment_handler();
Expand Down Expand Up @@ -1115,15 +1110,9 @@ int main(int argc,char **argv, char **envp)
if (kafka_reconnect) {
/* Close */
p_kafka_manage_consumer(&nfacctd_kafka_host, FALSE);
p_kafka_close(&nfacctd_kafka_host, FALSE);

/* Re-open */
p_kafka_init_host(&nfacctd_kafka_host, config.nfacctd_kafka_config_file);
p_kafka_connect_to_consume(&nfacctd_kafka_host);
p_kafka_set_broker(&nfacctd_kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
p_kafka_set_topic(&nfacctd_kafka_host, config.nfacctd_kafka_topic);
p_kafka_set_content_type(&nfacctd_kafka_host, PM_KAFKA_CNT_TYPE_BIN);
p_kafka_manage_consumer(&nfacctd_kafka_host, TRUE);
SF_init_kafka_host(&nfacctd_kafka_host);

continue;
}
Expand Down Expand Up @@ -2540,3 +2529,17 @@ void sf_flow_sample_hdr_decode(SFSample *sample)
}
}
}

#ifdef WITH_KAFKA
void SF_init_kafka_host(void *kh)
{
struct p_kafka_host *kafka_host = kh;

p_kafka_init_host(kafka_host, config.nfacctd_kafka_config_file);
p_kafka_connect_to_consume(kafka_host);
p_kafka_set_broker(kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
p_kafka_set_topic(kafka_host, config.nfacctd_kafka_topic);
p_kafka_set_content_type(kafka_host, PM_KAFKA_CNT_TYPE_BIN);
p_kafka_manage_consumer(kafka_host, TRUE);
}
#endif
4 changes: 4 additions & 0 deletions src/sfacctd.h
Expand Up @@ -350,6 +350,10 @@ EXT void sfv245_check_counter_log_init(struct packet_ptrs *);
EXT void usage_daemon(char *);
EXT void compute_once();

#ifdef WITH_KAFKA
EXT void SF_init_kafka_host(void *);
#endif

/* global variables */
EXT int sfacctd_counter_backend_methods;
EXT struct bgp_misc_structs *sf_cnt_misc_db;
Expand Down

0 comments on commit f08e15a

Please sign in to comment.