diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c index f659a70f320..bd43da72a35 100644 --- a/modules/dialog/dlg_dmq.c +++ b/modules/dialog/dlg_dmq.c @@ -35,6 +35,10 @@ dmq_api_t dlg_dmqb; dmq_peer_t* dlg_dmq_peer = NULL; dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0}; +int dmq_send_all_dlgs(); +int dlg_dmq_request_sync(); + + /** * @brief add notification peer */ @@ -51,7 +55,7 @@ int dlg_dmq_initialize() } not_peer.callback = dlg_dmq_handle_msg; - not_peer.init_callback = NULL; + not_peer.init_callback = dlg_dmq_request_sync; not_peer.description.s = "dialog"; not_peer.description.len = 6; not_peer.peer_id.s = "dialog"; @@ -288,6 +292,10 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp) unref++; break; + case DLG_DMQ_SYNC: + dmq_send_all_dlgs(); + break; + case DLG_DMQ_NONE: break; } @@ -314,6 +322,46 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp) } +int dlg_dmq_request_sync() { + srjson_doc_t jdoc; + + LM_DBG("requesting sync from dmq peers\n"); + + srjson_InitDoc(&jdoc, NULL); + + jdoc.root = srjson_CreateObject(&jdoc); + if(jdoc.root==NULL) { + LM_ERR("cannot create json root\n"); + goto error; + } + + srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC); + jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); + if(jdoc.buf.s==NULL) { + LM_ERR("unable to serialize data\n"); + goto error; + } + jdoc.buf.len = strlen(jdoc.buf.s); + LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); + if (dlg_dmq_broadcast(&jdoc.buf)!=0) { + goto error; + } + + jdoc.free_fn(jdoc.buf.s); + jdoc.buf.s = NULL; + srjson_DestroyDoc(&jdoc); + return 0; + +error: + if(jdoc.buf.s!=NULL) { + jdoc.free_fn(jdoc.buf.s); + jdoc.buf.s = NULL; + } + srjson_DestroyDoc(&jdoc); + return -1; +} + + int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) { srjson_doc_t jdoc, prof_jdoc; @@ -391,6 +439,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl break; case DLG_DMQ_NONE: + case DLG_DMQ_SYNC: break; } if (needlock) @@ -422,6 +471,30 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl } +int dmq_send_all_dlgs() { + int index; + dlg_entry_t entry; + dlg_cell_t *dlg; + + LM_DBG("sending all dialogs \n"); + + for(index = 0; index< d_table->size; index++){ + /* lock the whole entry */ + entry = (d_table->entries)[index]; + dlg_lock( d_table, &entry); + + for(dlg = entry.first; dlg != NULL; dlg = dlg->next){ + dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC; + dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0); + } + + dlg_unlock( d_table, &entry); + } + + return 0; +} + + /** * @brief dmq response callback */ diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h index ac38010ced5..bd1375742cd 100644 --- a/modules/dialog/dlg_dmq.h +++ b/modules/dialog/dlg_dmq.h @@ -38,6 +38,7 @@ typedef enum { DLG_DMQ_UPDATE, DLG_DMQ_STATE, DLG_DMQ_RM, + DLG_DMQ_SYNC, } dlg_dmq_action_t; int dlg_dmq_initialize(); diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c index 30405f1e1dc..515ba49a9a8 100644 --- a/modules/dmq/dmq.c +++ b/modules/dmq/dmq.c @@ -229,7 +229,14 @@ static int mod_init(void) LM_ERR("error in shm_malloc\n"); return -1; } - + + dmq_init_callback_done = shm_malloc(sizeof(int)); + if (!dmq_init_callback_done) { + LM_ERR("no more shm\n"); + return -1; + } + *dmq_init_callback_done = 0; + /** * add the dmq notification peer. * the dmq is a peer itself so that it can receive node notifications diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c index e7704f06ae1..75c1386f032 100644 --- a/modules/dmq/notification_peer.c +++ b/modules/dmq/notification_peer.c @@ -29,6 +29,9 @@ str notification_content_type = str_init("text/plain"); dmq_resp_cback_t notification_callback = {¬ification_resp_callback_f, 0}; +int *dmq_init_callback_done; + + /** * @brief add notification peer */ @@ -186,7 +189,6 @@ int run_init_callbacks() { */ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) { - static int firstrun = 1; int nodes_recv; str* response_body = NULL; int maxforwards = 0; @@ -223,9 +225,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) ¬ification_callback, maxforwards, ¬ification_content_type); } pkg_free(response_body); - if (firstrun) { + if (!*dmq_init_callback_done) { + *dmq_init_callback_done = 1; run_init_callbacks(); - firstrun = 0; } return 0; error: @@ -312,8 +314,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) { int ret; + int nodes_recv; + LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param); - if(code == 408) { + if(code == 200) { + nodes_recv = extract_node_list(node_list, msg); + LM_DBG("received %d new or changed nodes\n", nodes_recv); + if (!*dmq_init_callback_done) { + *dmq_init_callback_done = 1; + run_init_callbacks(); + } + } else if(code == 408) { /* deleting node - the server did not respond */ LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri)); if (STR_EQ(node->orig_uri, dmq_notification_address)) { diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h index 72df4ec1044..ff9871df93f 100644 --- a/modules/dmq/notification_peer.h +++ b/modules/dmq/notification_peer.h @@ -34,6 +34,7 @@ #include "dmq_funcs.h" extern str notification_content_type; +extern int *dmq_init_callback_done; int add_notification_peer(); int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);