From 851e61063dabe58508d1b909042647415ab14be7 Mon Sep 17 00:00:00 2001 From: Charles Chance Date: Wed, 20 Dec 2017 15:19:53 +0000 Subject: [PATCH] dmq: improve bus stability and reduce unnecessary state transfer - prevents split cluster in certain scenarios (e.g. GH issue #1349) - add 'pending' state for new, locally added nodes, until confirmed - continue to probe nodes marked as inactive/disabled so that they are eventually removed (but still exclude from normal replication) --- src/modules/dmq/dmq_funcs.c | 19 +++++++++++++------ src/modules/dmq/dmq_funcs.h | 3 +++ src/modules/dmq/dmqnode.c | 28 +++++++++++++++++++++++++++- src/modules/dmq/dmqnode.h | 2 ++ src/modules/dmq/notification_peer.c | 24 ++++++++++++++---------- 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/modules/dmq/dmq_funcs.c b/src/modules/dmq/dmq_funcs.c index a7fa31e1002..27afbc425e3 100644 --- a/src/modules/dmq/dmq_funcs.c +++ b/src/modules/dmq/dmq_funcs.c @@ -147,8 +147,9 @@ int is_from_remote_node(sip_msg_t *msg) * except - we do not send the message to this node * resp_cback - a response callback that gets called when the transaction is complete */ -int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except, - dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type) +int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except, + dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type, + int incl_inactive) { dmq_node_t *node; @@ -158,10 +159,10 @@ int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except, /* we do not send the message to the following: * - the except node * - itself - * - any inactive nodes + * - any inactive nodes (unless incl_inactive is specified) */ if((except && cmp_dmq_node(node, except)) || node->local - || node->status != DMQ_NODE_ACTIVE) { + || (node->status != DMQ_NODE_ACTIVE && !incl_inactive)) { LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri)); node = node->next; continue; @@ -181,6 +182,12 @@ int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except, return -1; } +int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except, + dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type) +{ + return bcast_dmq_message1(peer, body, except, resp_cback, max_forwards, content_type, 0); +} + /** * @brief send a dmq message * @@ -520,8 +527,8 @@ void ping_servers(unsigned int ticks, void *param) LM_ERR("could not build notification body\n"); return; } - ret = bcast_dmq_message(dmq_notification_peer, body, NULL, - ¬ification_callback, 1, ¬ification_content_type); + ret = bcast_dmq_message1(dmq_notification_peer, body, NULL, + ¬ification_callback, 1, ¬ification_content_type, 1); pkg_free(body->s); pkg_free(body); if(ret < 0) { diff --git a/src/modules/dmq/dmq_funcs.h b/src/modules/dmq/dmq_funcs.h index b7c36a5106f..a9c6da540c7 100644 --- a/src/modules/dmq/dmq_funcs.h +++ b/src/modules/dmq/dmq_funcs.h @@ -55,6 +55,9 @@ int cfg_dmq_is_from_node(struct sip_msg *msg, char *p1, char *p2); dmq_peer_t *register_dmq_peer(dmq_peer_t *peer); int dmq_send_message(dmq_peer_t *peer, str *body, dmq_node_t *node, dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type); +int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except, + dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type, + int incl_inactive); int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except, dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type); diff --git a/src/modules/dmq/dmqnode.c b/src/modules/dmq/dmqnode.c index 05428b0aff7..6437a5b206f 100644 --- a/src/modules/dmq/dmqnode.c +++ b/src/modules/dmq/dmqnode.c @@ -35,6 +35,7 @@ str dmq_node_status_str = str_init("status"); str dmq_node_active_str = str_init("active"); str dmq_node_disabled_str = str_init("disabled"); str dmq_node_timeout_str = str_init("timeout"); +str dmq_node_pending_str = str_init("pending"); /** * @brief get the string status of the node @@ -51,6 +52,9 @@ str *get_status_str(int status) case DMQ_NODE_TIMEOUT: { return &dmq_node_timeout_str; } + case DMQ_NODE_PENDING: { + return &dmq_node_pending_str; + } default: { return 0; } @@ -119,6 +123,8 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params) node->status = DMQ_NODE_TIMEOUT; } else if(STR_EQ(*status, dmq_node_disabled_str)) { node->status = DMQ_NODE_DISABLED; + } else if(STR_EQ(*status, dmq_node_pending_str)) { + node->status = DMQ_NODE_PENDING; } else { LM_ERR("invalid status parameter: %.*s\n", STR_FMT(status)); goto error; @@ -134,7 +140,7 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params) */ int set_default_dmq_node_params(dmq_node_t *node) { - node->status = DMQ_NODE_ACTIVE; + node->status = DMQ_NODE_PENDING; return 0; } @@ -366,6 +372,26 @@ dmq_node_t *add_dmq_node(dmq_node_list_t *list, str *uri) return NULL; } +/** + * @brief update status of existing dmq node + */ +int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status) +{ + dmq_node_t *cur; + lock_get(&list->lock); + cur = list->nodes; + while(cur) { + if(cmp_dmq_node(cur, node)) { + cur->status = status; + lock_release(&list->lock); + return 1; + } + cur = cur->next; + } + lock_release(&list->lock); + return 0; +} + /** * @brief build dmq node string */ diff --git a/src/modules/dmq/dmqnode.h b/src/modules/dmq/dmqnode.h index f7ceefb1da8..9c8630367c9 100644 --- a/src/modules/dmq/dmqnode.h +++ b/src/modules/dmq/dmqnode.h @@ -37,6 +37,7 @@ #define DMQ_NODE_ACTIVE 1 << 1 #define DMQ_NODE_TIMEOUT 1 << 2 #define DMQ_NODE_DISABLED 1 << 3 +#define DMQ_NODE_PENDING 1 << 4 typedef struct dmq_node { @@ -69,6 +70,7 @@ dmq_node_t *find_dmq_node_uri(dmq_node_list_t *list, str *uri); dmq_node_t *find_dmq_node_uri2(str *uri); int del_dmq_node(dmq_node_list_t *list, dmq_node_t *node); int cmp_dmq_node(dmq_node_t *node, dmq_node_t *cmpnode); +int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status); dmq_node_t *shm_dup_node(dmq_node_t *node); void destroy_dmq_node(dmq_node_t *node, int shm); void shm_free_node(dmq_node_t *node); diff --git a/src/modules/dmq/notification_peer.c b/src/modules/dmq/notification_peer.c index 979f776525a..c8e659c7213 100644 --- a/src/modules/dmq/notification_peer.c +++ b/src/modules/dmq/notification_peer.c @@ -60,6 +60,7 @@ int add_notification_peer() } /* local node - only for self */ self_node->local = 1; + self_node->status = DMQ_NODE_ACTIVE; return 0; error: return -1; @@ -411,7 +412,7 @@ int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg) update_list->nodes = cur; update_list->count++; total_nodes++; - } else if(find->uri.params.s && ret->status != find->status) { + } else if(!ret->local && find->uri.params.s && ret->status != find->status) { LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri), ret->status, find->status); ret->status = find->status; @@ -533,16 +534,18 @@ str *build_notification_body() lock_get(&node_list->lock); cur_node = node_list->nodes; while(cur_node) { - LM_DBG("body_len = %d - clen = %d\n", body->len, clen); - /* body->len - clen - 2 bytes left to write - including the \r\n */ - slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2); - if(slen < 0) { - LM_ERR("cannot build_node_string\n"); - goto error; + if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) { + LM_DBG("body_len = %d - clen = %d\n", body->len, clen); + /* body->len - clen - 2 bytes left to write - including the \r\n */ + slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2); + if(slen < 0) { + LM_ERR("cannot build_node_string\n"); + goto error; + } + clen += slen; + body->s[clen++] = '\r'; + body->s[clen++] = '\n'; } - clen += slen; - body->s[clen++] = '\r'; - body->s[clen++] = '\n'; cur_node = cur_node->next; } lock_release(&node_list->lock); @@ -597,6 +600,7 @@ int notification_resp_callback_f( STR_FMT(&node->orig_uri)); if(STR_EQ(node->orig_uri, dmq_notification_address)) { LM_ERR("not deleting notification_peer\n"); + update_dmq_node_status(node_list, node, DMQ_NODE_PENDING); return 0; } ret = del_dmq_node(node_list, node);