diff --git a/src/modules/dmq/bind_dmq.h b/src/modules/dmq/bind_dmq.h index 8b4694e607c..78a09cb0eed 100644 --- a/src/modules/dmq/bind_dmq.h +++ b/src/modules/dmq/bind_dmq.h @@ -16,8 +16,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ diff --git a/src/modules/dmq/dmq.c b/src/modules/dmq/dmq.c index 4fa61d947b2..509c42c405f 100644 --- a/src/modules/dmq/dmq.c +++ b/src/modules/dmq/dmq.c @@ -47,39 +47,35 @@ #include "notification_peer.h" #include "dmqnode.h" -static int mod_init(void); -static int child_init(int); -static void destroy(void); - MODULE_VERSION -int startup_time = 0; -int pid = 0; +int dmq_startup_time = 0; +int dmq_pid = 0; /* module parameters */ -int num_workers = DEFAULT_NUM_WORKERS; -int worker_usleep = 0; +int dmq_num_workers = DEFAULT_NUM_WORKERS; +int dmq_worker_usleep = 0; str dmq_server_address = {0, 0}; str dmq_server_socket = {0, 0}; -struct sip_uri dmq_server_uri; +sip_uri_t dmq_server_uri = {0}; str dmq_notification_address = {0, 0}; -int multi_notify = 0; -struct sip_uri dmq_notification_uri; -int ping_interval = 60; +int dmq_multi_notify = 0; +sip_uri_t dmq_notification_uri = {0}; +int dmq_ping_interval = 60; /* TM bind */ -struct tm_binds tmb; +struct tm_binds tmb = {0}; /* SL API structure */ -sl_api_t slb; +sl_api_t slb = {0}; /** module variables */ str dmq_request_method = str_init("KDMQ"); -dmq_worker_t *workers = NULL; -dmq_peer_list_t *peer_list = 0; +dmq_worker_t *dmq_workers = NULL; +dmq_peer_list_t *dmq_peer_list = 0; /* the list of dmq servers */ -dmq_node_list_t *node_list = NULL; -// the dmq module is a peer itself for receiving notifications regarding nodes +dmq_node_list_t *dmq_node_list = NULL; +/* dmq module is a peer itself for receiving notifications regarding nodes */ dmq_peer_t *dmq_notification_peer = NULL; /** module functions */ @@ -109,12 +105,12 @@ static cmd_export_t cmds[] = { }; static param_export_t params[] = { - {"num_workers", INT_PARAM, &num_workers}, - {"ping_interval", INT_PARAM, &ping_interval}, + {"num_workers", INT_PARAM, &dmq_num_workers}, + {"ping_interval", INT_PARAM, &dmq_ping_interval}, {"server_address", PARAM_STR, &dmq_server_address}, {"notification_address", PARAM_STR, &dmq_notification_address}, - {"multi_notify", INT_PARAM, &multi_notify}, - {"worker_usleep", INT_PARAM, &worker_usleep}, + {"multi_notify", INT_PARAM, &dmq_multi_notify}, + {"worker_usleep", INT_PARAM, &dmq_worker_usleep}, {0, 0, 0} }; @@ -184,15 +180,15 @@ static int mod_init(void) } /* load peer list - the list containing the module callbacks for dmq */ - peer_list = init_peer_list(); - if(peer_list == NULL) { + dmq_peer_list = init_peer_list(); + if(dmq_peer_list == NULL) { LM_ERR("cannot initialize peer list\n"); return -1; } /* load the dmq node list - the list containing the dmq servers */ - node_list = init_dmq_node_list(); - if(node_list == NULL) { + dmq_node_list = init_dmq_node_list(); + if(dmq_node_list == NULL) { LM_ERR("cannot initialize node list\n"); return -1; } @@ -203,7 +199,7 @@ static int mod_init(void) } /* register worker processes - add one because of the ping process */ - register_procs(num_workers); + register_procs(dmq_num_workers); /* check server_address and notification_address are not empty and correct */ if(parse_uri(dmq_server_address.s, dmq_server_address.len, &dmq_server_uri) @@ -230,12 +226,12 @@ static int mod_init(void) } /* allocate workers array */ - workers = shm_malloc(num_workers * sizeof(dmq_worker_t)); - if(workers == NULL) { + dmq_workers = shm_malloc(dmq_num_workers * sizeof(dmq_worker_t)); + if(dmq_workers == NULL) { LM_ERR("error in shm_malloc\n"); return -1; } - memset(workers, 0, num_workers * sizeof(dmq_worker_t)); + memset(dmq_workers, 0, dmq_num_workers * sizeof(dmq_worker_t)); dmq_init_callback_done = shm_malloc(sizeof(int)); if(!dmq_init_callback_done) { @@ -253,16 +249,16 @@ static int mod_init(void) return -1; } - startup_time = (int)time(NULL); + dmq_startup_time = (int)time(NULL); /** * add the ping timer * it pings the servers once in a while so that we know which failed */ - if(ping_interval < MIN_PING_INTERVAL) { - ping_interval = MIN_PING_INTERVAL; + if(dmq_ping_interval < MIN_PING_INTERVAL) { + dmq_ping_interval = MIN_PING_INTERVAL; } - if(register_timer(ping_servers, 0, ping_interval) < 0) { + if(register_timer(ping_servers, 0, dmq_ping_interval) < 0) { LM_ERR("cannot register timer callback\n"); return -1; } @@ -278,8 +274,8 @@ static int child_init(int rank) int i, newpid; if(rank == PROC_INIT) { - for(i = 0; i < num_workers; i++) { - if (init_worker(&workers[i]) < 0) { + for(i = 0; i < dmq_num_workers; i++) { + if (init_worker(&dmq_workers[i]) < 0) { LM_ERR("failed to init struct for worker[%d]\n", i); return -1; } @@ -289,7 +285,7 @@ static int child_init(int rank) if(rank == PROC_MAIN) { /* fork worker processes */ - for(i = 0; i < num_workers; i++) { + for(i = 0; i < dmq_num_workers; i++) { LM_DBG("starting worker process %d\n", i); newpid = fork_process(PROC_RPC, "DMQ WORKER", 0); if(newpid < 0) { @@ -299,7 +295,7 @@ static int child_init(int rank) /* child - this will loop forever */ worker_loop(i); } else { - workers[i].pid = newpid; + dmq_workers[i].pid = newpid; } } /* notification_node - the node from which the Kamailio instance @@ -309,9 +305,9 @@ static int child_init(int rank) * a master in this architecture */ if(dmq_notification_address.s) { - notification_node = + dmq_notification_node = add_server_and_notify(&dmq_notification_address); - if(!notification_node) { + if(!dmq_notification_node) { LM_WARN("cannot retrieve initial nodelist from %.*s\n", STR_FMT(&dmq_notification_address)); } @@ -323,7 +319,7 @@ static int child_init(int rank) return 0; } - pid = my_pid(); + dmq_pid = my_pid(); return 0; } @@ -333,10 +329,10 @@ static int child_init(int rank) static void destroy(void) { /* TODO unregister dmq node, free resources */ - if(dmq_notification_address.s && notification_node && self_node) { - LM_DBG("unregistering node %.*s\n", STR_FMT(&self_node->orig_uri)); - self_node->status = DMQ_NODE_DISABLED; - request_nodelist(notification_node, 1); + if(dmq_notification_address.s && dmq_notification_node && dmq_self_node) { + LM_DBG("unregistering node %.*s\n", STR_FMT(&dmq_self_node->orig_uri)); + dmq_self_node->status = DMQ_NODE_DISABLED; + request_nodelist(dmq_notification_node, 1); } if(dmq_server_socket.s) { pkg_free(dmq_server_socket.s); @@ -349,7 +345,7 @@ static void destroy(void) static void dmq_rpc_list_nodes(rpc_t *rpc, void *c) { void *h; - dmq_node_t *cur = node_list->nodes; + dmq_node_t *cur = dmq_node_list->nodes; char ip[IP6_MAX_STR_SIZE + 1]; while(cur) { diff --git a/src/modules/dmq/dmq.h b/src/modules/dmq/dmq.h index 411e41a5524..dbf5cfb53b5 100644 --- a/src/modules/dmq/dmq.h +++ b/src/modules/dmq/dmq.h @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ @@ -36,18 +36,18 @@ #define DEFAULT_NUM_WORKERS 2 #define MIN_PING_INTERVAL 5 -extern int num_workers; -extern int worker_usleep; -extern dmq_worker_t *workers; +extern int dmq_num_workers; +extern int dmq_worker_usleep; +extern dmq_worker_t *dmq_workers; extern dmq_peer_t *dmq_notification_peer; extern str dmq_server_address; -extern dmq_peer_list_t *peer_list; +extern dmq_peer_list_t *dmq_peer_list; extern str dmq_request_method; extern str dmq_server_socket; -extern struct sip_uri dmq_server_uri; +extern sip_uri_t dmq_server_uri; extern str dmq_notification_address; -extern int multi_notify; -extern struct sip_uri dmq_notification_uri; +extern int dmq_multi_notify; +extern sip_uri_t dmq_notification_uri; /* sl and tm */ extern struct tm_binds tmb; extern sl_api_t slb; diff --git a/src/modules/dmq/dmq_funcs.c b/src/modules/dmq/dmq_funcs.c index 27afbc425e3..f9fb5c78ea0 100644 --- a/src/modules/dmq/dmq_funcs.c +++ b/src/modules/dmq/dmq_funcs.c @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ @@ -30,19 +30,19 @@ dmq_peer_t *register_dmq_peer(dmq_peer_t *peer) { dmq_peer_t *new_peer; - if(!peer_list) { + if(!dmq_peer_list) { LM_ERR("peer list not initialized\n"); return NULL; } - lock_get(&peer_list->lock); - if(search_peer_list(peer_list, peer)) { + lock_get(&dmq_peer_list->lock); + if(search_peer_list(dmq_peer_list, peer)) { LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s, peer->description.len, peer->description.s); - lock_release(&peer_list->lock); + lock_release(&dmq_peer_list->lock); return NULL; } - new_peer = add_peer(peer_list, peer); - lock_release(&peer_list->lock); + new_peer = add_peer(dmq_peer_list, peer); + lock_release(&dmq_peer_list->lock); return new_peer; } @@ -124,8 +124,8 @@ int is_from_remote_node(sip_msg_t *msg) ip = &msg->rcv.src_ip; - lock_get(&node_list->lock); - node = node_list->nodes; + lock_get(&dmq_node_list->lock); + node = dmq_node_list->nodes; while(node) { if(!node->local && ip_addr_cmp(ip, &node->ip_address)) { @@ -135,7 +135,7 @@ int is_from_remote_node(sip_msg_t *msg) node = node->next; } done: - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); return result; } @@ -153,8 +153,8 @@ int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except, { dmq_node_t *node; - lock_get(&node_list->lock); - node = node_list->nodes; + lock_get(&dmq_node_list->lock); + node = dmq_node_list->nodes; while(node) { /* we do not send the message to the following: * - the except node @@ -175,10 +175,10 @@ int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except, } node = node->next; } - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); return 0; error: - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); return -1; } @@ -295,13 +295,13 @@ int ki_dmq_send_message(sip_msg_t *msg, str *peer_str, str *to_str, goto error; } } - dmq_node_t *to_dmq_node = find_dmq_node_uri(node_list, to_str); + dmq_node_t *to_dmq_node = find_dmq_node_uri(dmq_node_list, to_str); if(!to_dmq_node) { LM_ERR("cannot find dmq_node: %.*s\n", to_str->len, to_str->s); goto error; } if(dmq_send_message(destination_peer, body_str, to_dmq_node, - ¬ification_callback, 1, ct_str) + &dmq_notification_resp_callback, 1, ct_str) < 0) { LM_ERR("cannot send dmq message\n"); goto error; @@ -366,7 +366,7 @@ int ki_dmq_bcast_message(sip_msg_t *msg, str *peer_str, str *body_str, goto error; } } - if(bcast_dmq_message(destination_peer, body_str, 0, ¬ification_callback, + if(bcast_dmq_message(destination_peer, body_str, 0, &dmq_notification_resp_callback, 1, ct_str) < 0) { LM_ERR("cannot send dmq message\n"); goto error; @@ -425,8 +425,8 @@ int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode) set_force_socket(msg, sock); } - lock_get(&node_list->lock); - node = node_list->nodes; + lock_get(&dmq_node_list->lock); + node = dmq_node_list->nodes; while(node) { /* we do not send the message to the following: * - ourself @@ -456,10 +456,10 @@ int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode) node = node->next; } - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); return 0; error: - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); return -1; } @@ -504,15 +504,15 @@ void ping_servers(unsigned int ticks, void *param) int ret; LM_DBG("ping_servers\n"); - if(!node_list->nodes - || (node_list->nodes->local && !node_list->nodes->next)) { + if(!dmq_node_list->nodes + || (dmq_node_list->nodes->local && !dmq_node_list->nodes->next)) { LM_DBG("node list is empty - attempt to rebuild from notification " "address\n"); *dmq_init_callback_done = 0; if(dmq_notification_address.s) { - notification_node = + dmq_notification_node = add_server_and_notify(&dmq_notification_address); - if(!notification_node) { + if(!dmq_notification_node) { LM_ERR("cannot retrieve initial nodelist from %.*s\n", STR_FMT(&dmq_notification_address)); } @@ -528,7 +528,7 @@ void ping_servers(unsigned int ticks, void *param) return; } ret = bcast_dmq_message1(dmq_notification_peer, body, NULL, - ¬ification_callback, 1, ¬ification_content_type, 1); + &dmq_notification_resp_callback, 1, &dmq_notification_content_type, 1); pkg_free(body->s); pkg_free(body); if(ret < 0) { diff --git a/src/modules/dmq/dmq_funcs.h b/src/modules/dmq/dmq_funcs.h index a9c6da540c7..4cec59dc372 100644 --- a/src/modules/dmq/dmq_funcs.h +++ b/src/modules/dmq/dmq_funcs.h @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ diff --git a/src/modules/dmq/dmqnode.c b/src/modules/dmq/dmqnode.c index 4ed926ff7cd..2eb3b1c3739 100644 --- a/src/modules/dmq/dmqnode.c +++ b/src/modules/dmq/dmqnode.c @@ -26,8 +26,8 @@ #include "dmqnode.h" #include "dmq.h" -dmq_node_t *self_node; -dmq_node_t *notification_node; +dmq_node_t *dmq_self_node; +dmq_node_t *dmq_notification_node; /* name */ str dmq_node_status_str = str_init("status"); @@ -240,7 +240,7 @@ dmq_node_t *find_dmq_node_uri(dmq_node_list_t *list, str *uri) dmq_node_t *find_dmq_node_uri2(str *uri) { - return find_dmq_node_uri(node_list, uri); + return find_dmq_node_uri(dmq_node_list, uri); } /** diff --git a/src/modules/dmq/dmqnode.h b/src/modules/dmq/dmqnode.h index fb0a9dcb2ee..45cac604a42 100644 --- a/src/modules/dmq/dmqnode.h +++ b/src/modules/dmq/dmqnode.h @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ @@ -59,7 +59,7 @@ typedef struct dmq_node_list } dmq_node_list_t; extern str dmq_node_status_str; -extern dmq_node_list_t *node_list; +extern dmq_node_list_t *dmq_node_list; dmq_node_list_t *init_dmq_node_list(); dmq_node_t *build_dmq_node(str *uri, int shm); @@ -80,7 +80,7 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params); str *dmq_get_status_str(int status); int build_node_str(dmq_node_t *node, char *buf, int buflen); -extern dmq_node_t *self_node; -extern dmq_node_t *notification_node; +extern dmq_node_t *dmq_self_node; +extern dmq_node_t *dmq_notification_node; #endif diff --git a/src/modules/dmq/message.h b/src/modules/dmq/message.h index d47e421c6eb..d31386cb98d 100644 --- a/src/modules/dmq/message.h +++ b/src/modules/dmq/message.h @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ diff --git a/src/modules/dmq/notification_peer.c b/src/modules/dmq/notification_peer.c index 769b0e327bc..4cb44c0dee8 100644 --- a/src/modules/dmq/notification_peer.c +++ b/src/modules/dmq/notification_peer.c @@ -27,8 +27,8 @@ #define MAXDMQURILEN 255 #define MAXDMQHOSTS 30 -str notification_content_type = str_init("text/plain"); -dmq_resp_cback_t notification_callback = {¬ification_resp_callback_f, 0}; +str dmq_notification_content_type = str_init("text/plain"); +dmq_resp_cback_t dmq_notification_resp_callback = {¬ification_resp_callback_f, 0}; int *dmq_init_callback_done = 0; @@ -41,7 +41,7 @@ int add_notification_peer() dmq_peer_t not_peer; memset(¬_peer, 0, sizeof(dmq_peer_t)); - not_peer.callback = dmq_notification_callback; + not_peer.callback = dmq_notification_callback_f; not_peer.init_callback = NULL; not_peer.description.s = "notification_peer"; not_peer.description.len = 17; @@ -53,14 +53,14 @@ int add_notification_peer() goto error; } /* add itself to the node list */ - self_node = add_dmq_node(node_list, &dmq_server_address); - if(!self_node) { + dmq_self_node = add_dmq_node(dmq_node_list, &dmq_server_address); + if(!dmq_self_node) { LM_ERR("error adding self node\n"); goto error; } /* local node - only for self */ - self_node->local = 1; - self_node->status = DMQ_NODE_ACTIVE; + dmq_self_node->local = 1; + dmq_self_node->status = DMQ_NODE_ACTIVE; return 0; error: return -1; @@ -296,8 +296,8 @@ dmq_node_t *add_server_and_notify(str *paddr) * o process list **********/ - if(!multi_notify) { - pfirst = add_dmq_node(node_list, paddr); + if(!dmq_multi_notify) { + pfirst = add_dmq_node(dmq_node_list, paddr); } else { /********** * o init data area @@ -319,8 +319,8 @@ dmq_node_t *add_server_and_notify(str *paddr) for(index = 0; index < host_cnt; index++) { pstr->s = puri_list[index]; pstr->len = strlen(puri_list[index]); - if(!find_dmq_node_uri(node_list, pstr)) { // check for duplicates - pnode = add_dmq_node(node_list, pstr); + if(!find_dmq_node_uri(dmq_node_list, pstr)) { // check for duplicates + pnode = add_dmq_node(dmq_node_list, pstr); if(pnode && !pfirst) { pfirst = pnode; } @@ -436,11 +436,11 @@ int run_init_callbacks() { dmq_peer_t *crt; - if(peer_list == 0) { + if(dmq_peer_list == 0) { LM_WARN("peer list is null\n"); return 0; } - crt = peer_list->peers; + crt = dmq_peer_list->peers; while(crt) { if(crt->init_callback) { crt->init_callback(); @@ -454,7 +454,7 @@ int run_init_callbacks() /** * @brief dmq notification callback */ -int dmq_notification_callback( +int dmq_notification_callback_f( struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node) { int nodes_recv; @@ -474,14 +474,14 @@ int dmq_notification_callback( maxforwards--; } } - nodes_recv = extract_node_list(node_list, msg); + nodes_recv = extract_node_list(dmq_node_list, msg); LM_DBG("received %d new or changed nodes\n", nodes_recv); response_body = build_notification_body(); if(response_body == NULL) { LM_ERR("no response body\n"); goto error; } - resp->content_type = notification_content_type; + resp->content_type = dmq_notification_content_type; resp->reason = dmq_200_rpl; resp->body = *response_body; resp->resp_code = 200; @@ -490,8 +490,8 @@ int dmq_notification_callback( if(nodes_recv > 0 && maxforwards > 0) { /* maxforwards is set to 0 so that the message is will not be in a spiral */ bcast_dmq_message(dmq_notification_peer, response_body, 0, - ¬ification_callback, maxforwards, - ¬ification_content_type); + &dmq_notification_resp_callback, maxforwards, + &dmq_notification_content_type); } pkg_free(response_body); if(dmq_init_callback_done && !*dmq_init_callback_done) { @@ -533,8 +533,8 @@ str *build_notification_body() return NULL; } /* we add each server to the body - each on a different line */ - lock_get(&node_list->lock); - cur_node = node_list->nodes; + lock_get(&dmq_node_list->lock); + cur_node = dmq_node_list->nodes; while(cur_node) { if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) { LM_DBG("body_len = %d - clen = %d\n", body->len, clen); @@ -550,11 +550,11 @@ str *build_notification_body() } cur_node = cur_node->next; } - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); body->len = clen; return body; error: - lock_release(&node_list->lock); + lock_release(&dmq_node_list->lock); pkg_free(body->s); pkg_free(body); return NULL; @@ -573,7 +573,8 @@ int request_nodelist(dmq_node_t *node, int forward) return -1; } ret = bcast_dmq_message1(dmq_notification_peer, body, NULL, - ¬ification_callback, forward, ¬ification_content_type, 1); + &dmq_notification_resp_callback, forward, + &dmq_notification_content_type, 1); pkg_free(body->s); pkg_free(body); return ret; @@ -591,8 +592,8 @@ int notification_resp_callback_f( LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param); if(code == 200) { /* be sure that the node that answered is in active state */ - update_dmq_node_status(node_list, node, DMQ_NODE_ACTIVE); - nodes_recv = extract_node_list(node_list, msg); + update_dmq_node_status(dmq_node_list, node, DMQ_NODE_ACTIVE); + nodes_recv = extract_node_list(dmq_node_list, msg); LM_DBG("received %d new or changed nodes\n", nodes_recv); if(dmq_init_callback_done && !*dmq_init_callback_done) { *dmq_init_callback_done = 1; @@ -601,18 +602,18 @@ int notification_resp_callback_f( } else if(code == 408) { if(STR_EQ(node->orig_uri, dmq_notification_address)) { LM_ERR("not deleting notification_peer\n"); - update_dmq_node_status(node_list, node, DMQ_NODE_PENDING); + update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING); return 0; } if (node->status == DMQ_NODE_DISABLED) { /* deleting node - the server did not respond */ LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri)); - ret = del_dmq_node(node_list, node); + ret = del_dmq_node(dmq_node_list, node); LM_DBG("del_dmq_node returned %d\n", ret); } else { /* put the node in disabled state and wait for the next ping before deleting it */ - update_dmq_node_status(node_list, node, DMQ_NODE_DISABLED); + update_dmq_node_status(dmq_node_list, node, DMQ_NODE_DISABLED); } } return 0; diff --git a/src/modules/dmq/notification_peer.h b/src/modules/dmq/notification_peer.h index 2f0ac42dc6a..e913445f990 100644 --- a/src/modules/dmq/notification_peer.h +++ b/src/modules/dmq/notification_peer.h @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ @@ -31,11 +31,11 @@ #include "peer.h" #include "dmq_funcs.h" -extern str notification_content_type; +extern str dmq_notification_content_type; extern int *dmq_init_callback_done; int add_notification_peer(); -int dmq_notification_callback( +int dmq_notification_callback_f( struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node); int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg); str *build_notification_body(); @@ -44,7 +44,7 @@ int build_node_str(dmq_node_t *node, char *buf, int buflen); * this is acomplished by a KDMQ request * KDMQ notification@server:port * node - the node to send to - * forward - flag that tells if the node receiving the message is allowed to + * forward - flag that tells if the node receiving the message is allowed to * forward the request to its own list */ int request_nodelist(dmq_node_t *node, int forward); @@ -53,6 +53,6 @@ dmq_node_t *add_server_and_notify(str *server_address); /* helper functions */ extern int notification_resp_callback_f( struct sip_msg *msg, int code, dmq_node_t *node, void *param); -extern dmq_resp_cback_t notification_callback; +extern dmq_resp_cback_t dmq_notification_resp_callback; #endif diff --git a/src/modules/dmq/peer.c b/src/modules/dmq/peer.c index 7c2bf298e7f..8311d7a7d83 100644 --- a/src/modules/dmq/peer.c +++ b/src/modules/dmq/peer.c @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ @@ -29,15 +29,15 @@ */ dmq_peer_list_t *init_peer_list() { - dmq_peer_list_t *peer_list; - peer_list = shm_malloc(sizeof(dmq_peer_list_t)); - if(peer_list == NULL) { + dmq_peer_list_t *dmq_peer_list; + dmq_peer_list = shm_malloc(sizeof(dmq_peer_list_t)); + if(dmq_peer_list == NULL) { LM_ERR("no more shm\n"); return NULL; } - memset(peer_list, 0, sizeof(dmq_peer_list_t)); - lock_init(&peer_list->lock); - return peer_list; + memset(dmq_peer_list, 0, sizeof(dmq_peer_list_t)); + lock_init(&dmq_peer_list->lock); + return dmq_peer_list; } /** @@ -90,7 +90,7 @@ dmq_peer_t *find_peer(str peer_id) { dmq_peer_t foo_peer; foo_peer.peer_id = peer_id; - return search_peer_list(peer_list, &foo_peer); + return search_peer_list(dmq_peer_list, &foo_peer); } /** diff --git a/src/modules/dmq/peer.h b/src/modules/dmq/peer.h index d4feba759fa..40a0c82c399 100644 --- a/src/modules/dmq/peer.h +++ b/src/modules/dmq/peer.h @@ -61,7 +61,7 @@ typedef struct dmq_peer_list int count; } dmq_peer_list_t; -extern dmq_peer_list_t *peer_list; +extern dmq_peer_list_t *dmq_peer_list; dmq_peer_list_t *init_peer_list(); dmq_peer_t *search_peer_list(dmq_peer_list_t *peer_list, dmq_peer_t *peer); diff --git a/src/modules/dmq/worker.c b/src/modules/dmq/worker.c index a2ccc636bf9..43d76da3099 100644 --- a/src/modules/dmq/worker.c +++ b/src/modules/dmq/worker.c @@ -79,14 +79,14 @@ void worker_loop(int id) int not_parsed; dmq_node_t *dmq_node = NULL; - worker = &workers[id]; + worker = &dmq_workers[id]; for(;;) { - if(worker_usleep <= 0) { + if(dmq_worker_usleep <= 0) { LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid()); lock_get(&worker->lock); LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid()); } else { - sleep_us(worker_usleep); + sleep_us(dmq_worker_usleep); } /* remove from queue until empty */ @@ -105,7 +105,7 @@ void worker_loop(int id) if(parse_from_header(current_job->msg) < 0) { LM_ERR("bad sip message or missing From hdr\n"); } else { - dmq_node = find_dmq_node_uri(node_list, + dmq_node = find_dmq_node_uri(dmq_node_list, &((struct to_body *)current_job->msg->from->parsed) ->uri); } @@ -185,26 +185,26 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer) new_job.f = peer->callback; new_job.msg = cloned_msg; new_job.orig_peer = peer; - if(!num_workers) { + if(!dmq_num_workers) { LM_ERR("error in add_dmq_job: no workers spawned\n"); goto error; } - if(!workers[0].queue) { + if(!dmq_workers[0].queue) { LM_ERR("workers not (yet) initialized\n"); goto error; } /* initialize the worker with the first one */ - worker = workers; + worker = dmq_workers; /* search for an available worker, or, if not possible, * for the least busy one */ - for(i = 0; i < num_workers; i++) { - if(job_queue_size(workers[i].queue) == 0) { - worker = &workers[i]; + for(i = 0; i < dmq_num_workers; i++) { + if(job_queue_size(dmq_workers[i].queue) == 0) { + worker = &dmq_workers[i]; found_available = 1; break; - } else if(job_queue_size(workers[i].queue) + } else if(job_queue_size(dmq_workers[i].queue) < job_queue_size(worker->queue)) { - worker = &workers[i]; + worker = &dmq_workers[i]; } } if(!found_available) { @@ -215,7 +215,7 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer) if(job_queue_push(worker->queue, &new_job) < 0) { goto error; } - if(worker_usleep <= 0) { + if(dmq_worker_usleep <= 0) { lock_release(&worker->lock); } return 0; @@ -232,7 +232,7 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer) int init_worker(dmq_worker_t *worker) { memset(worker, 0, sizeof(*worker)); - if(worker_usleep <= 0) { + if(dmq_worker_usleep <= 0) { lock_init(&worker->lock); // acquire the lock for the first time - so that dmq_worker_loop blocks lock_get(&worker->lock); diff --git a/src/modules/dmq/worker.h b/src/modules/dmq/worker.h index cb4fce8ccf5..bca4c35b490 100644 --- a/src/modules/dmq/worker.h +++ b/src/modules/dmq/worker.h @@ -15,8 +15,8 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */