From 34f4c5a39cf4167e4667398dff26922441a7c5a0 Mon Sep 17 00:00:00 2001 From: Victor Seva Date: Wed, 17 May 2023 16:37:04 +0200 Subject: [PATCH] nsq: clang-format for coherent indentation and coding style --- src/modules/nsq/nsq_mod.c | 138 +++++++++++++++++++---------------- src/modules/nsq/nsq_mod.h | 4 +- src/modules/nsq/nsq_reader.c | 78 ++++++++++++-------- src/modules/nsq/nsq_reader.h | 6 +- 4 files changed, 129 insertions(+), 97 deletions(-) diff --git a/src/modules/nsq/nsq_mod.c b/src/modules/nsq/nsq_mod.c index 56eca6ddc88..8a5361cd363 100644 --- a/src/modules/nsq/nsq_mod.c +++ b/src/modules/nsq/nsq_mod.c @@ -30,30 +30,29 @@ MODULE_VERSION static pv_export_t nsq_mod_pvs[] = { - {{"nsqE", (sizeof("nsqE")-1)}, PVT_OTHER, nsq_pv_get_event_payload, 0, 0, 0, 0, 0}, - { {0, 0}, 0, 0, 0, 0, 0, 0, 0 } -}; - -static param_export_t params[]= -{ - {"consumer_workers", INT_PARAM, &nsq_consumer_workers}, - {"max_in_flight", INT_PARAM, &nsq_max_in_flight}, - {"lookupd_address", PARAM_STR, &nsq_lookupd_address}, - {"lookupd_port", INT_PARAM, &lookupd_port}, - {"consumer_use_nsqd", INT_PARAM, &consumer_use_nsqd}, // consume messages from nsqd instead of lookupd - {"topic_channel", PARAM_STRING|USE_FUNC_PARAM, (void*)nsq_add_topic_channel}, - {"nsqd_address", PARAM_STR, &nsqd_address}, - {"nsqd_port", INT_PARAM, &nsqd_port}, - {"consumer_event_key", PARAM_STR, &nsq_event_key}, - {"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key}, - { 0, 0, 0 } -}; + {{"nsqE", (sizeof("nsqE") - 1)}, PVT_OTHER, nsq_pv_get_event_payload, 0, + 0, 0, 0, 0}, + {{0, 0}, 0, 0, 0, 0, 0, 0, 0}}; + +static param_export_t params[] = { + {"consumer_workers", INT_PARAM, &nsq_consumer_workers}, + {"max_in_flight", INT_PARAM, &nsq_max_in_flight}, + {"lookupd_address", PARAM_STR, &nsq_lookupd_address}, + {"lookupd_port", INT_PARAM, &lookupd_port}, + {"consumer_use_nsqd", INT_PARAM, + &consumer_use_nsqd}, // consume messages from nsqd instead of lookupd + {"topic_channel", PARAM_STRING | USE_FUNC_PARAM, + (void *)nsq_add_topic_channel}, + {"nsqd_address", PARAM_STR, &nsqd_address}, + {"nsqd_port", INT_PARAM, &nsqd_port}, + {"consumer_event_key", PARAM_STR, &nsq_event_key}, + {"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key}, {0, 0, 0}}; static void free_tc_list(nsq_topic_channel_t *tcl) { nsq_topic_channel_t *tc, *tc0; tc = tcl; - while (tc) { + while(tc) { tc0 = tc->next; free(tc->topic); free(tc->channel); @@ -65,23 +64,25 @@ static void free_tc_list(nsq_topic_channel_t *tcl) static int nsq_add_topic_channel(modparam_t type, void *val) { - nsq_topic_channel_t* tc; + nsq_topic_channel_t *tc; size_t size; - char *channel = (char*)val; + char *channel = (char *)val; char *topic; char *sep = NULL; sep = strchr(channel, ':'); - if (!sep) { - topic = (char*)val; + if(!sep) { + topic = (char *)val; channel = DEFAULT_CHANNEL; - LM_ERR("delimiter (\":\") not found inside topic_channel param, using default channel [%s]\n", channel); + LM_ERR("delimiter (\":\") not found inside topic_channel param, using " + "default channel [%s]\n", + channel); } else { topic = strsep(&channel, ":"); } size = sizeof(nsq_topic_channel_t); - tc = (nsq_topic_channel_t*)pkg_malloc(size); - if (tc == NULL) { + tc = (nsq_topic_channel_t *)pkg_malloc(size); + if(tc == NULL) { LM_ERR("memory error!\n"); free_tc_list(tc_list); return -1; @@ -98,16 +99,15 @@ static int nsq_add_topic_channel(modparam_t type, void *val) } struct module_exports exports = { - "nsq", - DEFAULT_DLFLAGS, /* dlopen flags */ - 0, /* Exported functions */ - params, /* Exported parameters */ - 0, /* exported MI functions */ - nsq_mod_pvs, /* exported pseudo-variables */ - 0, /* response function*/ - mod_init, /* module initialization function */ - mod_child_init, /* per-child init function */ - mod_destroy /* destroy function */ + "nsq", DEFAULT_DLFLAGS, /* dlopen flags */ + 0, /* Exported functions */ + params, /* Exported parameters */ + 0, /* exported MI functions */ + nsq_mod_pvs, /* exported pseudo-variables */ + 0, /* response function*/ + mod_init, /* module initialization function */ + mod_child_init, /* per-child init function */ + mod_destroy /* destroy function */ }; static int fire_init_event(int rank) @@ -117,20 +117,20 @@ static int fire_init_event(int rank) int rtb, rt; LM_DBG("rank is (%d)\n", rank); - if (rank!=PROC_INIT) + if(rank != PROC_INIT) return 0; rt = route_get(&event_rt, "nsq:mod-init"); - if (rt>=0 && event_rt.rlist[rt]!=NULL) { + if(rt >= 0 && event_rt.rlist[rt] != NULL) { LM_DBG("executing event_route[nsq:mod-init] (%d)\n", rt); - if (faked_msg_init()<0) + if(faked_msg_init() < 0) return -1; fmsg = faked_msg_next(); rtb = get_route_type(); set_route_type(REQUEST_ROUTE); init_run_actions_ctx(&ctx); run_top_route(event_rt.rlist[rt], fmsg, &ctx); - if (ctx.run_flags&DROP_R_F) { + if(ctx.run_flags & DROP_R_F) { LM_ERR("exit due to 'drop' in event route\n"); return -1; } @@ -147,7 +147,7 @@ static int mod_init(void) return -1; } LM_DBG("NSQ Workers per Topic/Channel: %d\n", nsq_consumer_workers); - if (!nsq_topic_channel_counter) { + if(!nsq_topic_channel_counter) { nsq_topic_channel_counter = 1; } LM_DBG("NSQ Total Topic/Channel: %d\n", nsq_topic_channel_counter); @@ -172,17 +172,20 @@ void nsq_consumer_worker_proc(char *topic, char *channel, int max_in_flight) void *ctx = NULL; //(void *)(new TestNsqMsgContext()); static char address[128]; - if (loop == NULL) { + if(loop == NULL) { LM_ERR("cannot get libev loop\n"); } - LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel); + LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", + topic, channel); // setup the reader - rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler); + rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, + nsq_message_handler); rdr->max_in_flight = max_in_flight; - if (consumer_use_nsqd == 0) { - snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s); + if(consumer_use_nsqd == 0) { + snprintf(address, 128, "%.*s", nsq_lookupd_address.len, + nsq_lookupd_address.s); nsq_reader_add_nsqlookupd_endpoint(rdr, address, lookupd_port); } else { snprintf(address, 128, "%.*s", nsqd_address.len, nsqd_address.s); @@ -202,42 +205,48 @@ static int mod_child_init(int rank) int workers = nsq_consumer_workers / nsq_topic_channel_counter; int max_in_flight = 1; - if (nsq_max_in_flight > 1) { + if(nsq_max_in_flight > 1) { max_in_flight = nsq_max_in_flight; } fire_init_event(rank); - if (rank==PROC_INIT || rank==PROC_TCP_MAIN) + if(rank == PROC_INIT || rank == PROC_TCP_MAIN) return 0; - if (rank==PROC_MAIN) { + if(rank == PROC_MAIN) { nsq_topic_channel_t *tc; tc = tc_list; - if (tc == NULL) { + if(tc == NULL) { LM_ERR("topic and channel not set, using defaults\n"); for(i = 0; i < workers; i++) { - pid=fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1); - if (pid<0) + pid = fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1); + if(pid < 0) return -1; /* error */ - if (pid==0){ - if (cfg_child_init()) return -1; - nsq_consumer_worker_proc(DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight); - LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n"); + if(pid == 0) { + if(cfg_child_init()) + return -1; + nsq_consumer_worker_proc( + DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight); + LM_CRIT("nsq_consumer_worker_proc():: worker_process " + "finished without exit!\n"); exit(-1); } } } else { - while (tc) { + while(tc) { for(i = 0; i < workers; i++) { - pid=fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1); - if (pid<0) + pid = fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1); + if(pid < 0) return -1; /* error */ - if (pid==0){ - if (cfg_child_init()) return -1; - nsq_consumer_worker_proc(tc->topic, tc->channel, max_in_flight); - LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n"); + if(pid == 0) { + if(cfg_child_init()) + return -1; + nsq_consumer_worker_proc( + tc->topic, tc->channel, max_in_flight); + LM_CRIT("nsq_consumer_worker_proc():: worker_process " + "finished without exit!\n"); exit(-1); } } @@ -255,6 +264,7 @@ static int mod_child_init(int rank) /** * destroy module function */ -static void mod_destroy(void) { +static void mod_destroy(void) +{ free_tc_list(tc_list); } diff --git a/src/modules/nsq/nsq_mod.h b/src/modules/nsq/nsq_mod.h index 890a5dd154e..59034b8bbe4 100644 --- a/src/modules/nsq/nsq_mod.h +++ b/src/modules/nsq/nsq_mod.h @@ -64,10 +64,10 @@ int nsq_consumer_workers = DBN_DEFAULT_NO_WORKERS; static int mod_init(void); static int mod_child_init(int); -static int nsq_add_topic_channel(modparam_t type, void* val); +static int nsq_add_topic_channel(modparam_t type, void *val); static void free_tc_list(nsq_topic_channel_t *tc_list); static void mod_destroy(void); -int nsq_pv_get_event_payload(struct sip_msg*, pv_param_t*, pv_value_t*); +int nsq_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *); #endif diff --git a/src/modules/nsq/nsq_reader.c b/src/modules/nsq/nsq_reader.c index a8f403bda9c..e3bb2505a49 100644 --- a/src/modules/nsq/nsq_reader.c +++ b/src/modules/nsq/nsq_reader.c @@ -33,9 +33,11 @@ extern json_api_t json_api; extern str nsq_event_key; extern str nsq_event_sub_key; -int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) +int nsq_pv_get_event_payload( + struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { - return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData); + return eventData == NULL ? pv_get_null(msg, param, res) + : pv_get_strzval(msg, param, res, eventData); } int nsq_consumer_fire_event(char *routename) @@ -46,12 +48,12 @@ int nsq_consumer_fire_event(char *routename) LM_DBG("searching event_route[%s]\n", routename); rt = route_get(&event_rt, routename); - if (rt < 0 || event_rt.rlist[rt] == NULL) { + if(rt < 0 || event_rt.rlist[rt] == NULL) { LM_DBG("route %s does not exist\n", routename); return -2; } LM_DBG("executing event_route[%s] (%d)\n", routename, rt); - if (faked_msg_init()<0) { + if(faked_msg_init() < 0) { return -2; } fmsg = faked_msg_next(); @@ -76,40 +78,57 @@ int nsq_consumer_event(char *payload, char *channel, char *topic) eventData = payload; json_obj = json_api.json_parse(payload); - if (json_obj == NULL) { + if(json_obj == NULL) { return ret; } - k = pkg_malloc(nsq_event_key.len+1); + k = pkg_malloc(nsq_event_key.len + 1); memcpy(k, nsq_event_key.s, nsq_event_key.len); k[nsq_event_key.len] = '\0'; json_api.extract_field(json_obj, k, &ev_category); pkg_free(k); - k = pkg_malloc(nsq_event_sub_key.len+1); + k = pkg_malloc(nsq_event_sub_key.len + 1); memcpy(k, nsq_event_sub_key.s, nsq_event_sub_key.len); k[nsq_event_sub_key.len] = '\0'; json_api.extract_field(json_obj, k, &ev_name); pkg_free(k); - snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if (nsq_consumer_fire_event(buffer) != 0) { - snprintf(buffer, 512, "nsq:consumer-event-%.*s", ev_category.len, ev_category.s); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if (nsq_consumer_fire_event(buffer) != 0) { - snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s", nsq_event_key.len, nsq_event_key.s, nsq_event_sub_key.len, nsq_event_sub_key.s); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if (nsq_consumer_fire_event(buffer) != 0) { - snprintf(buffer, 512, "nsq:consumer-event-%.*s", nsq_event_key.len, nsq_event_key.s); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if (nsq_consumer_fire_event(buffer) != 0) { + snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s", ev_category.len, + ev_category.s, ev_name.len, ev_name.s); + for(p = buffer; *p; ++p) + *p = tolower(*p); + for(p = buffer; *p; ++p) + if(*p == '_') + *p = '-'; + if(nsq_consumer_fire_event(buffer) != 0) { + snprintf(buffer, 512, "nsq:consumer-event-%.*s", ev_category.len, + ev_category.s); + for(p = buffer; *p; ++p) + *p = tolower(*p); + for(p = buffer; *p; ++p) + if(*p == '_') + *p = '-'; + if(nsq_consumer_fire_event(buffer) != 0) { + snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s", + nsq_event_key.len, nsq_event_key.s, nsq_event_sub_key.len, + nsq_event_sub_key.s); + for(p = buffer; *p; ++p) + *p = tolower(*p); + for(p = buffer; *p; ++p) + if(*p == '_') + *p = '-'; + if(nsq_consumer_fire_event(buffer) != 0) { + snprintf(buffer, 512, "nsq:consumer-event-%.*s", + nsq_event_key.len, nsq_event_key.s); + for(p = buffer; *p; ++p) + *p = tolower(*p); + for(p = buffer; *p; ++p) + if(*p == '_') + *p = '-'; + if(nsq_consumer_fire_event(buffer) != 0) { snprintf(buffer, 512, "nsq:consumer-event"); - if (nsq_consumer_fire_event(buffer) != 0) { + if(nsq_consumer_fire_event(buffer) != 0) { LM_ERR("nsq:consumer-event not found"); } } @@ -117,7 +136,7 @@ int nsq_consumer_event(char *payload, char *channel, char *topic) } } - if (json_obj) { + if(json_obj) { json_object_put(json_obj); } @@ -126,12 +145,13 @@ int nsq_consumer_event(char *payload, char *channel, char *topic) return ret; } -void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx) +void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, + struct NSQMessage *msg, void *ctx) { int ret = 0; - char *payload = (char*)shm_malloc(msg->body_length + 1); - if (!payload) { + char *payload = (char *)shm_malloc(msg->body_length + 1); + if(!payload) { LM_ERR("error allocating shared memory for payload"); } strncpy(payload, msg->body, msg->body_length); @@ -141,7 +161,7 @@ void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, str buffer_reset(conn->command_buf); - if (ret < 0) { + if(ret < 0) { nsq_requeue(conn->command_buf, msg->id, 100); } else { nsq_finish(conn->command_buf, msg->id); diff --git a/src/modules/nsq/nsq_reader.h b/src/modules/nsq/nsq_reader.h index ba0ffca9ad6..b53681d5509 100644 --- a/src/modules/nsq/nsq_reader.h +++ b/src/modules/nsq/nsq_reader.h @@ -32,10 +32,12 @@ #include "../../core/fmsg.h" #include "nsq.h" -int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); +int nsq_pv_get_event_payload( + struct sip_msg *msg, pv_param_t *param, pv_value_t *res); int nsq_consumer_fire_event(char *routename); int nsq_consumer_event(char *payload, char *channel, char *topic); -void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx); +void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, + struct NSQMessage *msg, void *ctx); #endif /* __NSQ_READER_H_ */