Skip to content

Commit

Permalink
dialog: DMQ-sync dialogs with peers on startup
Browse files Browse the repository at this point in the history
Use DMQ's init_callback() to request the peers to send all dialogs.
  • Loading branch information
gaaf committed Sep 4, 2014
1 parent 6ceddd9 commit 144737c
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 6 deletions.
75 changes: 74 additions & 1 deletion modules/dialog/dlg_dmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ dmq_api_t dlg_dmqb;
dmq_peer_t* dlg_dmq_peer = NULL;
dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};

int dmq_send_all_dlgs();
int dlg_dmq_request_sync();


/**
* @brief add notification peer
*/
Expand All @@ -51,7 +55,7 @@ int dlg_dmq_initialize()
}

not_peer.callback = dlg_dmq_handle_msg;
not_peer.init_callback = NULL;
not_peer.init_callback = dlg_dmq_request_sync;
not_peer.description.s = "dialog";
not_peer.description.len = 6;
not_peer.peer_id.s = "dialog";
Expand Down Expand Up @@ -288,6 +292,10 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
unref++;
break;

case DLG_DMQ_SYNC:
dmq_send_all_dlgs();
break;

case DLG_DMQ_NONE:
break;
}
Expand All @@ -314,6 +322,46 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
}


int dlg_dmq_request_sync() {
srjson_doc_t jdoc;

LM_DBG("requesting sync from dmq peers\n");

srjson_InitDoc(&jdoc, NULL);

jdoc.root = srjson_CreateObject(&jdoc);
if(jdoc.root==NULL) {
LM_ERR("cannot create json root\n");
goto error;
}

srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC);
jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
if(jdoc.buf.s==NULL) {
LM_ERR("unable to serialize data\n");
goto error;
}
jdoc.buf.len = strlen(jdoc.buf.s);
LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
goto error;
}

jdoc.free_fn(jdoc.buf.s);
jdoc.buf.s = NULL;
srjson_DestroyDoc(&jdoc);
return 0;

error:
if(jdoc.buf.s!=NULL) {
jdoc.free_fn(jdoc.buf.s);
jdoc.buf.s = NULL;
}
srjson_DestroyDoc(&jdoc);
return -1;
}


int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {

srjson_doc_t jdoc, prof_jdoc;
Expand Down Expand Up @@ -391,6 +439,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
break;

case DLG_DMQ_NONE:
case DLG_DMQ_SYNC:
break;
}
if (needlock)
Expand Down Expand Up @@ -422,6 +471,30 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
}


int dmq_send_all_dlgs() {
int index;
dlg_entry_t entry;
dlg_cell_t *dlg;

LM_DBG("sending all dialogs \n");

for(index = 0; index< d_table->size; index++){
/* lock the whole entry */
entry = (d_table->entries)[index];
dlg_lock( d_table, &entry);

for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
}

dlg_unlock( d_table, &entry);
}

return 0;
}


/**
* @brief dmq response callback
*/
Expand Down
1 change: 1 addition & 0 deletions modules/dialog/dlg_dmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef enum {
DLG_DMQ_UPDATE,
DLG_DMQ_STATE,
DLG_DMQ_RM,
DLG_DMQ_SYNC,
} dlg_dmq_action_t;

int dlg_dmq_initialize();
Expand Down
9 changes: 8 additions & 1 deletion modules/dmq/dmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,14 @@ static int mod_init(void)
LM_ERR("error in shm_malloc\n");
return -1;
}


dmq_init_callback_done = shm_malloc(sizeof(int));
if (!dmq_init_callback_done) {
LM_ERR("no more shm\n");
return -1;
}
*dmq_init_callback_done = 0;

/**
* add the dmq notification peer.
* the dmq is a peer itself so that it can receive node notifications
Expand Down
19 changes: 15 additions & 4 deletions modules/dmq/notification_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
str notification_content_type = str_init("text/plain");
dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};

int *dmq_init_callback_done;


/**
* @brief add notification peer
*/
Expand Down Expand Up @@ -186,7 +189,6 @@ int run_init_callbacks() {
*/
int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
{
static int firstrun = 1;
int nodes_recv;
str* response_body = NULL;
int maxforwards = 0;
Expand Down Expand Up @@ -223,9 +225,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
&notification_callback, maxforwards, &notification_content_type);
}
pkg_free(response_body);
if (firstrun) {
if (!*dmq_init_callback_done) {
*dmq_init_callback_done = 1;
run_init_callbacks();
firstrun = 0;
}
return 0;
error:
Expand Down Expand Up @@ -312,8 +314,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
dmq_node_t* node, void* param)
{
int ret;
int nodes_recv;

LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
if(code == 408) {
if(code == 200) {
nodes_recv = extract_node_list(node_list, msg);
LM_DBG("received %d new or changed nodes\n", nodes_recv);
if (!*dmq_init_callback_done) {
*dmq_init_callback_done = 1;
run_init_callbacks();
}
} else if(code == 408) {
/* deleting node - the server did not respond */
LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
if (STR_EQ(node->orig_uri, dmq_notification_address)) {
Expand Down
1 change: 1 addition & 0 deletions modules/dmq/notification_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "dmq_funcs.h"

extern str notification_content_type;
extern int *dmq_init_callback_done;

int add_notification_peer();
int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
Expand Down

0 comments on commit 144737c

Please sign in to comment.