Skip to content

Commit

Permalink
dmq: improve bus stability and reduce unnecessary state transfer
Browse files Browse the repository at this point in the history
- prevents split cluster in certain scenarios (e.g. GH issue #1349)
- add 'pending' state for new, locally added nodes, until confirmed
- continue to probe nodes marked as inactive/disabled so that they
  are eventually removed (but still exclude from normal replication)
  • Loading branch information
charlesrchance committed Dec 20, 2017
1 parent db0b73b commit 851e610
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 17 deletions.
19 changes: 13 additions & 6 deletions src/modules/dmq/dmq_funcs.c
Expand Up @@ -147,8 +147,9 @@ int is_from_remote_node(sip_msg_t *msg)
* except - we do not send the message to this node
* resp_cback - a response callback that gets called when the transaction is complete
*/
int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type)
int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type,
int incl_inactive)
{
dmq_node_t *node;

Expand All @@ -158,10 +159,10 @@ int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
/* we do not send the message to the following:
* - the except node
* - itself
* - any inactive nodes
* - any inactive nodes (unless incl_inactive is specified)
*/
if((except && cmp_dmq_node(node, except)) || node->local
|| node->status != DMQ_NODE_ACTIVE) {
|| (node->status != DMQ_NODE_ACTIVE && !incl_inactive)) {
LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
node = node->next;
continue;
Expand All @@ -181,6 +182,12 @@ int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
return -1;
}

int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type)
{
return bcast_dmq_message1(peer, body, except, resp_cback, max_forwards, content_type, 0);
}

/**
* @brief send a dmq message
*
Expand Down Expand Up @@ -520,8 +527,8 @@ void ping_servers(unsigned int ticks, void *param)
LM_ERR("could not build notification body\n");
return;
}
ret = bcast_dmq_message(dmq_notification_peer, body, NULL,
&notification_callback, 1, &notification_content_type);
ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
&notification_callback, 1, &notification_content_type, 1);
pkg_free(body->s);
pkg_free(body);
if(ret < 0) {
Expand Down
3 changes: 3 additions & 0 deletions src/modules/dmq/dmq_funcs.h
Expand Up @@ -55,6 +55,9 @@ int cfg_dmq_is_from_node(struct sip_msg *msg, char *p1, char *p2);
dmq_peer_t *register_dmq_peer(dmq_peer_t *peer);
int dmq_send_message(dmq_peer_t *peer, str *body, dmq_node_t *node,
dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type);
int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type,
int incl_inactive);
int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type);

Expand Down
28 changes: 27 additions & 1 deletion src/modules/dmq/dmqnode.c
Expand Up @@ -35,6 +35,7 @@ str dmq_node_status_str = str_init("status");
str dmq_node_active_str = str_init("active");
str dmq_node_disabled_str = str_init("disabled");
str dmq_node_timeout_str = str_init("timeout");
str dmq_node_pending_str = str_init("pending");

/**
* @brief get the string status of the node
Expand All @@ -51,6 +52,9 @@ str *get_status_str(int status)
case DMQ_NODE_TIMEOUT: {
return &dmq_node_timeout_str;
}
case DMQ_NODE_PENDING: {
return &dmq_node_pending_str;
}
default: {
return 0;
}
Expand Down Expand Up @@ -119,6 +123,8 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params)
node->status = DMQ_NODE_TIMEOUT;
} else if(STR_EQ(*status, dmq_node_disabled_str)) {
node->status = DMQ_NODE_DISABLED;
} else if(STR_EQ(*status, dmq_node_pending_str)) {
node->status = DMQ_NODE_PENDING;
} else {
LM_ERR("invalid status parameter: %.*s\n", STR_FMT(status));
goto error;
Expand All @@ -134,7 +140,7 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params)
*/
int set_default_dmq_node_params(dmq_node_t *node)
{
node->status = DMQ_NODE_ACTIVE;
node->status = DMQ_NODE_PENDING;
return 0;
}

Expand Down Expand Up @@ -366,6 +372,26 @@ dmq_node_t *add_dmq_node(dmq_node_list_t *list, str *uri)
return NULL;
}

/**
* @brief update status of existing dmq node
*/
int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status)
{
dmq_node_t *cur;
lock_get(&list->lock);
cur = list->nodes;
while(cur) {
if(cmp_dmq_node(cur, node)) {
cur->status = status;
lock_release(&list->lock);
return 1;
}
cur = cur->next;
}
lock_release(&list->lock);
return 0;
}

/**
* @brief build dmq node string
*/
Expand Down
2 changes: 2 additions & 0 deletions src/modules/dmq/dmqnode.h
Expand Up @@ -37,6 +37,7 @@
#define DMQ_NODE_ACTIVE 1 << 1
#define DMQ_NODE_TIMEOUT 1 << 2
#define DMQ_NODE_DISABLED 1 << 3
#define DMQ_NODE_PENDING 1 << 4

typedef struct dmq_node
{
Expand Down Expand Up @@ -69,6 +70,7 @@ dmq_node_t *find_dmq_node_uri(dmq_node_list_t *list, str *uri);
dmq_node_t *find_dmq_node_uri2(str *uri);
int del_dmq_node(dmq_node_list_t *list, dmq_node_t *node);
int cmp_dmq_node(dmq_node_t *node, dmq_node_t *cmpnode);
int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status);
dmq_node_t *shm_dup_node(dmq_node_t *node);
void destroy_dmq_node(dmq_node_t *node, int shm);
void shm_free_node(dmq_node_t *node);
Expand Down
24 changes: 14 additions & 10 deletions src/modules/dmq/notification_peer.c
Expand Up @@ -60,6 +60,7 @@ int add_notification_peer()
}
/* local node - only for self */
self_node->local = 1;
self_node->status = DMQ_NODE_ACTIVE;
return 0;
error:
return -1;
Expand Down Expand Up @@ -411,7 +412,7 @@ int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
update_list->nodes = cur;
update_list->count++;
total_nodes++;
} else if(find->uri.params.s && ret->status != find->status) {
} else if(!ret->local && find->uri.params.s && ret->status != find->status) {
LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri),
ret->status, find->status);
ret->status = find->status;
Expand Down Expand Up @@ -533,16 +534,18 @@ str *build_notification_body()
lock_get(&node_list->lock);
cur_node = node_list->nodes;
while(cur_node) {
LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
/* body->len - clen - 2 bytes left to write - including the \r\n */
slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
if(slen < 0) {
LM_ERR("cannot build_node_string\n");
goto error;
if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
/* body->len - clen - 2 bytes left to write - including the \r\n */
slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
if(slen < 0) {
LM_ERR("cannot build_node_string\n");
goto error;
}
clen += slen;
body->s[clen++] = '\r';
body->s[clen++] = '\n';
}
clen += slen;
body->s[clen++] = '\r';
body->s[clen++] = '\n';
cur_node = cur_node->next;
}
lock_release(&node_list->lock);
Expand Down Expand Up @@ -597,6 +600,7 @@ int notification_resp_callback_f(
STR_FMT(&node->orig_uri));
if(STR_EQ(node->orig_uri, dmq_notification_address)) {
LM_ERR("not deleting notification_peer\n");
update_dmq_node_status(node_list, node, DMQ_NODE_PENDING);
return 0;
}
ret = del_dmq_node(node_list, node);
Expand Down

0 comments on commit 851e610

Please sign in to comment.