Skip to content

Commit

Permalink
nsq: clang-format for coherent indentation and coding style
Browse files Browse the repository at this point in the history
  • Loading branch information
linuxmaniac committed May 18, 2023
1 parent 28049ec commit 34f4c5a
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 97 deletions.
138 changes: 74 additions & 64 deletions src/modules/nsq/nsq_mod.c
Expand Up @@ -30,30 +30,29 @@
MODULE_VERSION

static pv_export_t nsq_mod_pvs[] = {
{{"nsqE", (sizeof("nsqE")-1)}, PVT_OTHER, nsq_pv_get_event_payload, 0, 0, 0, 0, 0},
{ {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
};

static param_export_t params[]=
{
{"consumer_workers", INT_PARAM, &nsq_consumer_workers},
{"max_in_flight", INT_PARAM, &nsq_max_in_flight},
{"lookupd_address", PARAM_STR, &nsq_lookupd_address},
{"lookupd_port", INT_PARAM, &lookupd_port},
{"consumer_use_nsqd", INT_PARAM, &consumer_use_nsqd}, // consume messages from nsqd instead of lookupd
{"topic_channel", PARAM_STRING|USE_FUNC_PARAM, (void*)nsq_add_topic_channel},
{"nsqd_address", PARAM_STR, &nsqd_address},
{"nsqd_port", INT_PARAM, &nsqd_port},
{"consumer_event_key", PARAM_STR, &nsq_event_key},
{"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key},
{ 0, 0, 0 }
};
{{"nsqE", (sizeof("nsqE") - 1)}, PVT_OTHER, nsq_pv_get_event_payload, 0,
0, 0, 0, 0},
{{0, 0}, 0, 0, 0, 0, 0, 0, 0}};

static param_export_t params[] = {
{"consumer_workers", INT_PARAM, &nsq_consumer_workers},
{"max_in_flight", INT_PARAM, &nsq_max_in_flight},
{"lookupd_address", PARAM_STR, &nsq_lookupd_address},
{"lookupd_port", INT_PARAM, &lookupd_port},
{"consumer_use_nsqd", INT_PARAM,
&consumer_use_nsqd}, // consume messages from nsqd instead of lookupd
{"topic_channel", PARAM_STRING | USE_FUNC_PARAM,
(void *)nsq_add_topic_channel},
{"nsqd_address", PARAM_STR, &nsqd_address},
{"nsqd_port", INT_PARAM, &nsqd_port},
{"consumer_event_key", PARAM_STR, &nsq_event_key},
{"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key}, {0, 0, 0}};

static void free_tc_list(nsq_topic_channel_t *tcl)
{
nsq_topic_channel_t *tc, *tc0;
tc = tcl;
while (tc) {
while(tc) {
tc0 = tc->next;
free(tc->topic);
free(tc->channel);
Expand All @@ -65,23 +64,25 @@ static void free_tc_list(nsq_topic_channel_t *tcl)

static int nsq_add_topic_channel(modparam_t type, void *val)
{
nsq_topic_channel_t* tc;
nsq_topic_channel_t *tc;
size_t size;
char *channel = (char*)val;
char *channel = (char *)val;
char *topic;
char *sep = NULL;

sep = strchr(channel, ':');
if (!sep) {
topic = (char*)val;
if(!sep) {
topic = (char *)val;
channel = DEFAULT_CHANNEL;
LM_ERR("delimiter (\":\") not found inside topic_channel param, using default channel [%s]\n", channel);
LM_ERR("delimiter (\":\") not found inside topic_channel param, using "
"default channel [%s]\n",
channel);
} else {
topic = strsep(&channel, ":");
}
size = sizeof(nsq_topic_channel_t);
tc = (nsq_topic_channel_t*)pkg_malloc(size);
if (tc == NULL) {
tc = (nsq_topic_channel_t *)pkg_malloc(size);
if(tc == NULL) {
LM_ERR("memory error!\n");
free_tc_list(tc_list);
return -1;
Expand All @@ -98,16 +99,15 @@ static int nsq_add_topic_channel(modparam_t type, void *val)
}

struct module_exports exports = {
"nsq",
DEFAULT_DLFLAGS, /* dlopen flags */
0, /* Exported functions */
params, /* Exported parameters */
0, /* exported MI functions */
nsq_mod_pvs, /* exported pseudo-variables */
0, /* response function*/
mod_init, /* module initialization function */
mod_child_init, /* per-child init function */
mod_destroy /* destroy function */
"nsq", DEFAULT_DLFLAGS, /* dlopen flags */
0, /* Exported functions */
params, /* Exported parameters */
0, /* exported MI functions */
nsq_mod_pvs, /* exported pseudo-variables */
0, /* response function*/
mod_init, /* module initialization function */
mod_child_init, /* per-child init function */
mod_destroy /* destroy function */
};

static int fire_init_event(int rank)
Expand All @@ -117,20 +117,20 @@ static int fire_init_event(int rank)
int rtb, rt;

LM_DBG("rank is (%d)\n", rank);
if (rank!=PROC_INIT)
if(rank != PROC_INIT)
return 0;

rt = route_get(&event_rt, "nsq:mod-init");
if (rt>=0 && event_rt.rlist[rt]!=NULL) {
if(rt >= 0 && event_rt.rlist[rt] != NULL) {
LM_DBG("executing event_route[nsq:mod-init] (%d)\n", rt);
if (faked_msg_init()<0)
if(faked_msg_init() < 0)
return -1;
fmsg = faked_msg_next();
rtb = get_route_type();
set_route_type(REQUEST_ROUTE);
init_run_actions_ctx(&ctx);
run_top_route(event_rt.rlist[rt], fmsg, &ctx);
if (ctx.run_flags&DROP_R_F) {
if(ctx.run_flags & DROP_R_F) {
LM_ERR("exit due to 'drop' in event route\n");
return -1;
}
Expand All @@ -147,7 +147,7 @@ static int mod_init(void)
return -1;
}
LM_DBG("NSQ Workers per Topic/Channel: %d\n", nsq_consumer_workers);
if (!nsq_topic_channel_counter) {
if(!nsq_topic_channel_counter) {
nsq_topic_channel_counter = 1;
}
LM_DBG("NSQ Total Topic/Channel: %d\n", nsq_topic_channel_counter);
Expand All @@ -172,17 +172,20 @@ void nsq_consumer_worker_proc(char *topic, char *channel, int max_in_flight)
void *ctx = NULL; //(void *)(new TestNsqMsgContext());
static char address[128];

if (loop == NULL) {
if(loop == NULL) {
LM_ERR("cannot get libev loop\n");
}

LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n",
topic, channel);
// setup the reader
rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler);
rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL,
nsq_message_handler);
rdr->max_in_flight = max_in_flight;

if (consumer_use_nsqd == 0) {
snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
if(consumer_use_nsqd == 0) {
snprintf(address, 128, "%.*s", nsq_lookupd_address.len,
nsq_lookupd_address.s);
nsq_reader_add_nsqlookupd_endpoint(rdr, address, lookupd_port);
} else {
snprintf(address, 128, "%.*s", nsqd_address.len, nsqd_address.s);
Expand All @@ -202,42 +205,48 @@ static int mod_child_init(int rank)
int workers = nsq_consumer_workers / nsq_topic_channel_counter;
int max_in_flight = 1;

if (nsq_max_in_flight > 1) {
if(nsq_max_in_flight > 1) {
max_in_flight = nsq_max_in_flight;
}

fire_init_event(rank);

if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
if(rank == PROC_INIT || rank == PROC_TCP_MAIN)
return 0;

if (rank==PROC_MAIN) {
if(rank == PROC_MAIN) {
nsq_topic_channel_t *tc;

tc = tc_list;
if (tc == NULL) {
if(tc == NULL) {
LM_ERR("topic and channel not set, using defaults\n");
for(i = 0; i < workers; i++) {
pid=fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1);
if (pid<0)
pid = fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1);
if(pid < 0)
return -1; /* error */
if (pid==0){
if (cfg_child_init()) return -1;
nsq_consumer_worker_proc(DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight);
LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n");
if(pid == 0) {
if(cfg_child_init())
return -1;
nsq_consumer_worker_proc(
DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight);
LM_CRIT("nsq_consumer_worker_proc():: worker_process "
"finished without exit!\n");
exit(-1);
}
}
} else {
while (tc) {
while(tc) {
for(i = 0; i < workers; i++) {
pid=fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1);
if (pid<0)
pid = fork_process(PROC_XWORKER, "NSQ Consumer Worker", 1);
if(pid < 0)
return -1; /* error */
if (pid==0){
if (cfg_child_init()) return -1;
nsq_consumer_worker_proc(tc->topic, tc->channel, max_in_flight);
LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n");
if(pid == 0) {
if(cfg_child_init())
return -1;
nsq_consumer_worker_proc(
tc->topic, tc->channel, max_in_flight);
LM_CRIT("nsq_consumer_worker_proc():: worker_process "
"finished without exit!\n");
exit(-1);
}
}
Expand All @@ -255,6 +264,7 @@ static int mod_child_init(int rank)
/**
* destroy module function
*/
static void mod_destroy(void) {
static void mod_destroy(void)
{
free_tc_list(tc_list);
}
4 changes: 2 additions & 2 deletions src/modules/nsq/nsq_mod.h
Expand Up @@ -64,10 +64,10 @@ int nsq_consumer_workers = DBN_DEFAULT_NO_WORKERS;

static int mod_init(void);
static int mod_child_init(int);
static int nsq_add_topic_channel(modparam_t type, void* val);
static int nsq_add_topic_channel(modparam_t type, void *val);
static void free_tc_list(nsq_topic_channel_t *tc_list);
static void mod_destroy(void);

int nsq_pv_get_event_payload(struct sip_msg*, pv_param_t*, pv_value_t*);
int nsq_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *);

#endif
78 changes: 49 additions & 29 deletions src/modules/nsq/nsq_reader.c
Expand Up @@ -33,9 +33,11 @@ extern json_api_t json_api;
extern str nsq_event_key;
extern str nsq_event_sub_key;

int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
int nsq_pv_get_event_payload(
struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
{
return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData);
return eventData == NULL ? pv_get_null(msg, param, res)
: pv_get_strzval(msg, param, res, eventData);
}

int nsq_consumer_fire_event(char *routename)
Expand All @@ -46,12 +48,12 @@ int nsq_consumer_fire_event(char *routename)

LM_DBG("searching event_route[%s]\n", routename);
rt = route_get(&event_rt, routename);
if (rt < 0 || event_rt.rlist[rt] == NULL) {
if(rt < 0 || event_rt.rlist[rt] == NULL) {
LM_DBG("route %s does not exist\n", routename);
return -2;
}
LM_DBG("executing event_route[%s] (%d)\n", routename, rt);
if (faked_msg_init()<0) {
if(faked_msg_init() < 0) {
return -2;
}
fmsg = faked_msg_next();
Expand All @@ -76,48 +78,65 @@ int nsq_consumer_event(char *payload, char *channel, char *topic)
eventData = payload;

json_obj = json_api.json_parse(payload);
if (json_obj == NULL) {
if(json_obj == NULL) {
return ret;
}

k = pkg_malloc(nsq_event_key.len+1);
k = pkg_malloc(nsq_event_key.len + 1);
memcpy(k, nsq_event_key.s, nsq_event_key.len);
k[nsq_event_key.len] = '\0';
json_api.extract_field(json_obj, k, &ev_category);
pkg_free(k);

k = pkg_malloc(nsq_event_sub_key.len+1);
k = pkg_malloc(nsq_event_sub_key.len + 1);
memcpy(k, nsq_event_sub_key.s, nsq_event_sub_key.len);
k[nsq_event_sub_key.len] = '\0';
json_api.extract_field(json_obj, k, &ev_name);
pkg_free(k);

snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
for (p=buffer ; *p; ++p) *p = tolower(*p);
for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
if (nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s", ev_category.len, ev_category.s);
for (p=buffer ; *p; ++p) *p = tolower(*p);
for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
if (nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s", nsq_event_key.len, nsq_event_key.s, nsq_event_sub_key.len, nsq_event_sub_key.s);
for (p=buffer ; *p; ++p) *p = tolower(*p);
for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
if (nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s", nsq_event_key.len, nsq_event_key.s);
for (p=buffer ; *p; ++p) *p = tolower(*p);
for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
if (nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s", ev_category.len,
ev_category.s, ev_name.len, ev_name.s);
for(p = buffer; *p; ++p)
*p = tolower(*p);
for(p = buffer; *p; ++p)
if(*p == '_')
*p = '-';
if(nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s", ev_category.len,
ev_category.s);
for(p = buffer; *p; ++p)
*p = tolower(*p);
for(p = buffer; *p; ++p)
if(*p == '_')
*p = '-';
if(nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s-%.*s",
nsq_event_key.len, nsq_event_key.s, nsq_event_sub_key.len,
nsq_event_sub_key.s);
for(p = buffer; *p; ++p)
*p = tolower(*p);
for(p = buffer; *p; ++p)
if(*p == '_')
*p = '-';
if(nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event-%.*s",
nsq_event_key.len, nsq_event_key.s);
for(p = buffer; *p; ++p)
*p = tolower(*p);
for(p = buffer; *p; ++p)
if(*p == '_')
*p = '-';
if(nsq_consumer_fire_event(buffer) != 0) {
snprintf(buffer, 512, "nsq:consumer-event");
if (nsq_consumer_fire_event(buffer) != 0) {
if(nsq_consumer_fire_event(buffer) != 0) {
LM_ERR("nsq:consumer-event not found");
}
}
}
}
}

if (json_obj) {
if(json_obj) {
json_object_put(json_obj);
}

Expand All @@ -126,12 +145,13 @@ int nsq_consumer_event(char *payload, char *channel, char *topic)
return ret;
}

void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx)
void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn,
struct NSQMessage *msg, void *ctx)
{
int ret = 0;

char *payload = (char*)shm_malloc(msg->body_length + 1);
if (!payload) {
char *payload = (char *)shm_malloc(msg->body_length + 1);
if(!payload) {
LM_ERR("error allocating shared memory for payload");
}
strncpy(payload, msg->body, msg->body_length);
Expand All @@ -141,7 +161,7 @@ void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, str

buffer_reset(conn->command_buf);

if (ret < 0) {
if(ret < 0) {
nsq_requeue(conn->command_buf, msg->id, 100);
} else {
nsq_finish(conn->command_buf, msg->id);
Expand Down

0 comments on commit 34f4c5a

Please sign in to comment.