diff --git a/src/modules/dmq_usrloc/dmq_usrloc.c b/src/modules/dmq_usrloc/dmq_usrloc.c index 3814d755851..4e36d8e292d 100644 --- a/src/modules/dmq_usrloc/dmq_usrloc.c +++ b/src/modules/dmq_usrloc/dmq_usrloc.c @@ -37,6 +37,8 @@ static int child_init(int); int dmq_usrloc_enable = 0; int _dmq_usrloc_sync = 1; int _dmq_usrloc_batch_size = 0; +int _dmq_usrloc_batch_msg_contacts = 1; +int _dmq_usrloc_batch_msg_size = 60000; int _dmq_usrloc_batch_usleep = 0; str _dmq_usrloc_domain = str_init("location"); @@ -47,6 +49,8 @@ MODULE_VERSION static param_export_t params[] = { {"enable", INT_PARAM, &dmq_usrloc_enable}, {"sync", INT_PARAM, &_dmq_usrloc_sync}, + {"batch_msg_contacts", INT_PARAM, &_dmq_usrloc_batch_msg_contacts}, + {"batch_msg_size", INT_PARAM, &_dmq_usrloc_batch_msg_size}, {"batch_size", INT_PARAM, &_dmq_usrloc_batch_size}, {"batch_usleep", INT_PARAM, &_dmq_usrloc_batch_usleep}, {"usrloc_domain", PARAM_STR, &_dmq_usrloc_domain}, @@ -74,6 +78,15 @@ static int mod_init(void) bind_usrloc_t bind_usrloc; LM_INFO("dmq usrloc replication mode = %d\n", dmq_usrloc_enable); + if(_dmq_usrloc_batch_msg_size > 60000) { + LM_ERR("batch_msg_size too high[%d] setting to [60000]\n", _dmq_usrloc_batch_msg_size); + _dmq_usrloc_batch_msg_size = 60000; + } + if(_dmq_usrloc_batch_msg_contacts > 150) { + LM_ERR("batch_msg_contacts too high[%d] setting to [150]\n", _dmq_usrloc_batch_msg_contacts); + _dmq_usrloc_batch_msg_contacts = 150; + } + if (dmq_usrloc_enable) { bind_usrloc = (bind_usrloc_t)find_export("ul_bind_usrloc", 1, 0); diff --git a/src/modules/dmq_usrloc/doc/dmq_usrloc.xml b/src/modules/dmq_usrloc/doc/dmq_usrloc.xml index 9c35f91d091..c6b36acda72 100644 --- a/src/modules/dmq_usrloc/doc/dmq_usrloc.xml +++ b/src/modules/dmq_usrloc/doc/dmq_usrloc.xml @@ -39,6 +39,21 @@ 2014 + + Julien + Chavanton + flowroute.com + jchavanton@gmail.com +
+ + + +
+
+ + + 2017 + diff --git a/src/modules/dmq_usrloc/doc/dmq_usrloc_admin.xml b/src/modules/dmq_usrloc/doc/dmq_usrloc_admin.xml index c6fbd228a12..81be4308a45 100644 --- a/src/modules/dmq_usrloc/doc/dmq_usrloc_admin.xml +++ b/src/modules/dmq_usrloc/doc/dmq_usrloc_admin.xml @@ -174,6 +174,54 @@ modparam("dmq_usrloc", "batch_size", 4000) ... modparam("dmq_usrloc", "batch_usleep", 1000) ... + + + +
+ <varname>batch_msg_size</varname> (int) + + The parameter controls the size of the messages during a sync + This is to make sure the messages are never larger then 65536 (the maximum datagram size) + + Note that batch_msg_contacts will also be checked + + + Default value is 60000. + Maximum value is 60000. + + + + Set <varname>batch_msg_size</varname> parameter + +... +modparam("dmq_usrloc", "batch_msg_contacts", 50) # 50 contacts / message +modparam("dmq_usrloc", "batch_msg_size", 500000) +# with this config, when doing a full sync, each message will be sent a soon as the body is larger 50K or contains 50 contacts +... + + +
+
+ <varname>batch_msg_contacts</varname> (int) + + The parameter controls the amount of contact per message/transaction during a sync + + Note that batch_msg_size will also be checked + + + Default value is 1. + Maximum value is 150. + + + + Set <varname>batch_msg_contacts</varname> parameter + +... +modparam("dmq_usrloc", "batch_msg_contacts", 50) # 50 contacts / message +modparam("dmq_usrloc", "batch_size", 10000) # 10000 contacts / batch +modparam("dmq_usrloc", "batch_usleep", 500000) # one batch every 500ms +# syncing 20K contacts/second with 50 contacts/message +...
diff --git a/src/modules/dmq_usrloc/usrloc_sync.c b/src/modules/dmq_usrloc/usrloc_sync.c index 5a11e82970d..cbc1d6db472 100644 --- a/src/modules/dmq_usrloc/usrloc_sync.c +++ b/src/modules/dmq_usrloc/usrloc_sync.c @@ -18,6 +18,9 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * + * Edited : + * + * Copyright (C) 2017 Julien Chavanton, Flowroute */ #include "usrloc_sync.h" @@ -41,10 +44,14 @@ dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0}; int usrloc_dmq_send_all(); int usrloc_dmq_request_sync(); int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node); +int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node); +void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node); #define MAX_AOR_LEN 256 extern int _dmq_usrloc_sync; +extern int _dmq_usrloc_batch_msg_contacts; +extern int _dmq_usrloc_batch_msg_size; extern int _dmq_usrloc_batch_size; extern int _dmq_usrloc_batch_usleep; extern str _dmq_usrloc_domain; @@ -240,7 +247,11 @@ void usrloc_get_all_ucontact(dmq_node_t* node) LM_DBG("- AoR: %.*s AoRhash=%d Flags=%d\n", aor.len, aor.s, aorhash, flags); while (ptr) { - usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node); + if (_dmq_usrloc_batch_msg_contacts >1) { + usrloc_dmq_send_multi_contact(ptr, aor, DMQ_UPDATE, node); + } else { + usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node); + } n++; ptr = ptr->next; } @@ -254,6 +265,7 @@ void usrloc_get_all_ucontact(dmq_node_t* node) } } dmq_usrloc_free(buf); + usrloc_dmq_send_multi_contact_flush(node); // send any remaining contacts done: c.s = ""; c.len = 0; @@ -307,69 +319,17 @@ int usrloc_dmq_send(str* body, dmq_node_t* node) { return 0; } - -/** - * @brief ht dmq callback - */ -int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node) -{ - int content_length; - str body; - srjson_doc_t jdoc; - srjson_t *it = NULL; +static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) { static ucontact_info_t ci; - - unsigned int action, expires, cseq, flags, cflags, q, last_modified, - methods, reg_id; - str aor=STR_NULL, ruid=STR_NULL, c=STR_NULL, received=STR_NULL, - path=STR_NULL, callid=STR_NULL, user_agent=STR_NULL, instance=STR_NULL; + srjson_t *it = NULL; + unsigned int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id; + str aor=STR_NULL, ruid=STR_NULL, c=STR_NULL, received=STR_NULL, path=STR_NULL, + callid=STR_NULL, user_agent=STR_NULL, instance=STR_NULL; action = expires = cseq = flags = cflags = q = last_modified = methods = reg_id = 0; - srjson_InitDoc(&jdoc, NULL); - if(parse_from_header(msg)<0) { - LM_ERR("failed to parse from header\n"); - goto invalid; - } - body = ((struct to_body*)msg->from->parsed)->uri; - - LM_DBG("dmq message received from %.*s\n", body.len, body.s); - - if(parse_headers(msg, HDR_EOH_F, 0)<0) { - LM_ERR("failed to parse the headers\n"); - goto invalid; - } - if(!msg->content_length) { - LM_ERR("no content length header found\n"); - goto invalid; - } - content_length = get_content_length(msg); - if(!content_length) { - LM_DBG("content length is 0\n"); - goto invalid; - } - - body.s = get_body(msg); - body.len = content_length; - - if (!body.s) { - LM_ERR("unable to get body\n"); - goto error; - } - - jdoc.buf = body; - if(jdoc.root == NULL) { - jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s); - if(jdoc.root == NULL) - { - LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s); - goto invalid; - } - } - - for(it=jdoc.root->child; it; it = it->next) - { + for(it=jdoc_action; it; it = it->next) { if (it->string == NULL) continue; if (strcmp(it->string, "action")==0) { @@ -398,7 +358,7 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* } else if (strcmp(it->string, "instance")==0) { instance.s = it->valuestring; instance.len = strlen(instance.s); - } else if (strcmp(it->string, "expires")==0) { // + } else if (strcmp(it->string, "expires")==0) { expires = SRJSON_GET_UINT(it); } else if (strcmp(it->string, "cseq")==0) { cseq = SRJSON_GET_UINT(it); @@ -454,7 +414,71 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* LM_DBG("Received DMQ_NONE. Not used...\n"); break; default: + return 0; + } + return 1; +} + + +/** + * @brief ht dmq callback + */ +int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node) +{ + int content_length; + str body; + srjson_doc_t jdoc; + + srjson_InitDoc(&jdoc, NULL); + if (parse_from_header(msg)<0) { + LM_ERR("failed to parse from header\n"); + goto invalid; + } + body = ((struct to_body*)msg->from->parsed)->uri; + + LM_DBG("dmq message received from %.*s\n", body.len, body.s); + + if (parse_headers(msg, HDR_EOH_F, 0)<0) { + LM_ERR("failed to parse the headers\n"); + goto invalid; + } + if (!msg->content_length) { + LM_ERR("no content length header found\n"); + goto invalid; + } + content_length = get_content_length(msg); + if (!content_length) { + LM_DBG("content length is 0\n"); + goto invalid; + } + + body.s = get_body(msg); + body.len = content_length; + + if (!body.s) { + LM_ERR("unable to get body\n"); + goto error; + } + + jdoc.buf = body; + if (jdoc.root == NULL) { + jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s); + if (jdoc.root == NULL) { + LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s); goto invalid; + } + } + + if (strcmp(jdoc.root->child->string, "multi")==0) { + LM_DBG("request [%s]\n", jdoc.root->child->string); + srjson_t *jdoc_actions = jdoc.root->child->child; + srjson_t *it = NULL; + for(it=jdoc_actions; it; it = it->next) { + LM_DBG("action [%s]\n", jdoc_actions->child->string); + if (!usrloc_dmq_execute_action(it->child, node)) goto invalid; + } + } else { + if (!usrloc_dmq_execute_action(jdoc.root->child, node)) goto invalid; } srjson_DestroyDoc(&jdoc); @@ -517,6 +541,149 @@ int usrloc_dmq_request_sync() { return -1; } +/* while prt append json string + * */ + +/* Multi contacts */ +typedef struct jdoc_contact_group { + int count; + int size; + srjson_doc_t jdoc; + srjson_t *jdoc_contacts; +} jdoc_contact_group_t; + +static jdoc_contact_group_t jdoc_contact_group; + +static void usrloc_dmq_contacts_group_init(void) { + if (jdoc_contact_group.jdoc.root) + return; + jdoc_contact_group.count = 0; + jdoc_contact_group.size = 12; // {"multi":{}} + srjson_InitDoc(&jdoc_contact_group.jdoc, NULL); + LM_DBG("init multi contacts batch. \n"); + jdoc_contact_group.jdoc.root = srjson_CreateObject(&jdoc_contact_group.jdoc); + if (jdoc_contact_group.jdoc.root==NULL) + LM_ERR("cannot create json root ! \n"); + jdoc_contact_group.jdoc_contacts = srjson_CreateObject(&jdoc_contact_group.jdoc); + if (jdoc_contact_group.jdoc_contacts==NULL) { + LM_ERR("cannot create json contacts ! \n"); + srjson_DestroyDoc(&jdoc_contact_group.jdoc); + } +} + +static void usrloc_dmq_contacts_group_send(dmq_node_t* node) { + if (jdoc_contact_group.count == 0) + return; + srjson_doc_t *jdoc = &jdoc_contact_group.jdoc; + srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts; + + srjson_AddItemToObject(jdoc, jdoc->root, "multi", jdoc_contacts); + + LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root)); + jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root); + if(jdoc->buf.s==NULL) { + LM_ERR("unable to serialize data\n"); + goto error; + } + jdoc->buf.len = strlen(jdoc->buf.s); + + LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s); + if (usrloc_dmq_send(&jdoc->buf, node)!=0) { + LM_ERR("unable to send data\n"); + goto error; + } + + jdoc->free_fn(jdoc->buf.s); + jdoc->buf.s = NULL; + srjson_DestroyDoc(jdoc); + return; + +error: + if(jdoc->buf.s!=NULL) { + jdoc->free_fn(jdoc->buf.s); + jdoc->buf.s = NULL; + } + srjson_DestroyDoc(jdoc); + return; +} + +void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node) { + usrloc_dmq_contacts_group_send(node); + usrloc_dmq_contacts_group_init(); +} + +int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) { + + usrloc_dmq_contacts_group_init(); + + srjson_doc_t *jdoc = &jdoc_contact_group.jdoc; + srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts; + + int flags; + flags = ptr->flags; + flags &= ~FL_RPL; + + srjson_t * jdoc_contact = srjson_CreateObject(jdoc); + if(!jdoc_contact) { + LM_ERR("cannot create json root\n"); + return -1; + } + LM_DBG("group size[%d]\n", jdoc_contact_group.size); + jdoc_contact_group.size += 188; // json overhead ("":{"action":,"aor":"","ruid":"","c":""...) + + srjson_AddNumberToObject(jdoc, jdoc_contact, "action", action); + jdoc_contact_group.size += snprintf(NULL,0,"%d", action); + + srjson_AddStrToObject(jdoc, jdoc_contact, "aor", aor.s, aor.len); + jdoc_contact_group.size += aor.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "ruid", ptr->ruid.s, ptr->ruid.len); + jdoc_contact_group.size += ptr->ruid.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "c", ptr->c.s, ptr->c.len); + jdoc_contact_group.size += ptr->c.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "received", ptr->received.s, ptr->received.len); + jdoc_contact_group.size += ptr->received.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "path", ptr->path.s, ptr->path.len); + jdoc_contact_group.size += ptr->path.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "callid", ptr->callid.s, ptr->callid.len); + jdoc_contact_group.size += ptr->callid.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "user_agent", ptr->user_agent.s, ptr->user_agent.len); + jdoc_contact_group.size += ptr->user_agent.len; + srjson_AddStrToObject(jdoc, jdoc_contact, "instance", ptr->instance.s, ptr->instance.len); + jdoc_contact_group.size += ptr->instance.len; + srjson_AddNumberToObject(jdoc, jdoc_contact, "expires", ptr->expires); + jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->expires); + srjson_AddNumberToObject(jdoc, jdoc_contact, "cseq", ptr->cseq); + jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cseq); + srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", flags); + jdoc_contact_group.size += snprintf(NULL,0,"%d", flags); + srjson_AddNumberToObject(jdoc, jdoc_contact, "cflags", ptr->cflags); + jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cflags); + srjson_AddNumberToObject(jdoc, jdoc_contact, "q", ptr->q); + jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->q); + srjson_AddNumberToObject(jdoc, jdoc_contact, "last_modified", ptr->last_modified); + jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->last_modified); + srjson_AddNumberToObject(jdoc, jdoc_contact, "methods", ptr->methods); + jdoc_contact_group.size += snprintf(NULL,0,"%u", ptr->methods); + srjson_AddNumberToObject(jdoc, jdoc_contact, "reg_id", ptr->reg_id); + jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->reg_id); + + char idx[5]; + jdoc_contact_group.count++; + jdoc_contact_group.size += snprintf(idx,5,"%d", jdoc_contact_group.count); + srjson_AddItemToObject(jdoc, jdoc_contacts, idx, jdoc_contact); + + if (jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts || jdoc_contact_group.size >= _dmq_usrloc_batch_msg_size) { + LM_DBG("sending group count[%d]size[%d]", jdoc_contact_group.count, jdoc_contact_group.size); + usrloc_dmq_contacts_group_send(node); + usrloc_dmq_contacts_group_init(); + } + + return 0; +} + + + + int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) { srjson_doc_t jdoc; srjson_InitDoc(&jdoc, NULL); @@ -524,7 +691,7 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no int flags; jdoc.root = srjson_CreateObject(&jdoc); - if(jdoc.root==NULL) { + if(!jdoc.root) { LM_ERR("cannot create json root\n"); goto error; }