diff --git a/src/modules/dmq_usrloc/dmq_usrloc.c b/src/modules/dmq_usrloc/dmq_usrloc.c index b7fbed94062..d7715ff10e4 100644 --- a/src/modules/dmq_usrloc/dmq_usrloc.c +++ b/src/modules/dmq_usrloc/dmq_usrloc.c @@ -48,30 +48,28 @@ usrloc_api_t dmq_ul; MODULE_VERSION -static param_export_t params[] = { - {"enable", INT_PARAM, &dmq_usrloc_enable}, - {"sync", INT_PARAM, &_dmq_usrloc_sync}, - {"replicate_socket_info", INT_PARAM, &_dmq_usrloc_replicate_socket_info}, - {"batch_msg_contacts", INT_PARAM, &_dmq_usrloc_batch_msg_contacts}, - {"batch_msg_size", INT_PARAM, &_dmq_usrloc_batch_msg_size}, - {"batch_size", INT_PARAM, &_dmq_usrloc_batch_size}, - {"batch_usleep", INT_PARAM, &_dmq_usrloc_batch_usleep}, - {"usrloc_domain", PARAM_STR, &_dmq_usrloc_domain}, - {"usrloc_delete", INT_PARAM, &_dmq_usrloc_delete}, - {0, 0, 0} -}; +static param_export_t params[] = {{"enable", INT_PARAM, &dmq_usrloc_enable}, + {"sync", INT_PARAM, &_dmq_usrloc_sync}, + {"replicate_socket_info", INT_PARAM, + &_dmq_usrloc_replicate_socket_info}, + {"batch_msg_contacts", INT_PARAM, &_dmq_usrloc_batch_msg_contacts}, + {"batch_msg_size", INT_PARAM, &_dmq_usrloc_batch_msg_size}, + {"batch_size", INT_PARAM, &_dmq_usrloc_batch_size}, + {"batch_usleep", INT_PARAM, &_dmq_usrloc_batch_usleep}, + {"usrloc_domain", PARAM_STR, &_dmq_usrloc_domain}, + {"usrloc_delete", INT_PARAM, &_dmq_usrloc_delete}, {0, 0, 0}}; struct module_exports exports = { - "dmq_usrloc", /* module name */ - DEFAULT_DLFLAGS, /* dlopen flags */ - 0, /* exported functions */ - params, /* exported parameters */ - 0, /* RPC method exports */ - 0, /* exported pseudo-variables */ - 0, /* response handling function */ - mod_init, /* module initialization function */ - child_init, /* per-child init function */ - 0 /* module destroy function */ + "dmq_usrloc", /* module name */ + DEFAULT_DLFLAGS, /* dlopen flags */ + 0, /* exported functions */ + params, /* exported parameters */ + 0, /* RPC method exports */ + 0, /* exported pseudo-variables */ + 0, /* response handling function */ + mod_init, /* module initialization function */ + child_init, /* per-child init function */ + 0 /* module destroy function */ }; @@ -81,33 +79,34 @@ static int mod_init(void) LM_INFO("dmq usrloc replication mode = %d\n", dmq_usrloc_enable); if(_dmq_usrloc_batch_msg_size > 60000) { - LM_ERR("batch_msg_size too high[%d] setting to [60000]\n", _dmq_usrloc_batch_msg_size); + LM_ERR("batch_msg_size too high[%d] setting to [60000]\n", + _dmq_usrloc_batch_msg_size); _dmq_usrloc_batch_msg_size = 60000; } if(_dmq_usrloc_batch_msg_contacts > 150) { - LM_ERR("batch_msg_contacts too high[%d] setting to [150]\n", _dmq_usrloc_batch_msg_contacts); + LM_ERR("batch_msg_contacts too high[%d] setting to [150]\n", + _dmq_usrloc_batch_msg_contacts); _dmq_usrloc_batch_msg_contacts = 150; } - if (dmq_usrloc_enable) { + if(dmq_usrloc_enable) { bind_usrloc = (bind_usrloc_t)find_export("ul_bind_usrloc", 1, 0); - if (!bind_usrloc) { + if(!bind_usrloc) { LM_ERR("can't bind usrloc\n"); return -1; } - if (bind_usrloc(&dmq_ul) < 0) { + if(bind_usrloc(&dmq_ul) < 0) { LM_ERR("Can't bind ul\n"); return -1; } if(dmq_ul.register_ulcb != NULL) { - if(dmq_ul.register_ulcb(ULCB_MAX, dmq_ul_cb_contact, 0)< 0) - { + if(dmq_ul.register_ulcb(ULCB_MAX, dmq_ul_cb_contact, 0) < 0) { LM_ERR("can not register callback for expired contacts\n"); return -1; } } - if (!usrloc_dmq_initialize()){ + if(!usrloc_dmq_initialize()) { LM_DBG("dmq_usrloc initialized\n"); } else { LM_ERR("Error in dmq_usrloc_initialize()\n"); @@ -119,7 +118,7 @@ static int mod_init(void) static int child_init(int rank) { - if (rank == PROC_MAIN) { + if(rank == PROC_MAIN) { LM_DBG("child_init PROC_MAIN\n"); return 0; } diff --git a/src/modules/dmq_usrloc/usrloc_sync.c b/src/modules/dmq_usrloc/usrloc_sync.c index 657db6cb7cb..61342cca2fe 100644 --- a/src/modules/dmq_usrloc/usrloc_sync.c +++ b/src/modules/dmq_usrloc/usrloc_sync.c @@ -33,21 +33,23 @@ #include "../../core/parser/parse_addr_spec.h" static str usrloc_dmq_content_type = str_init("application/json"); -static str dmq_200_rpl = str_init("OK"); -static str dmq_400_rpl = str_init("Bad Request"); -static str dmq_500_rpl = str_init("Server Internal Error"); +static str dmq_200_rpl = str_init("OK"); +static str dmq_400_rpl = str_init("Bad Request"); +static str dmq_500_rpl = str_init("Server Internal Error"); static int *usrloc_dmq_recv = 0; dmq_api_t usrloc_dmqb; -dmq_peer_t* usrloc_dmq_peer = NULL; +dmq_peer_t *usrloc_dmq_peer = NULL; dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0}; int usrloc_dmq_send_all(); int usrloc_dmq_request_sync(); -int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node); -int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node); -void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node); +int usrloc_dmq_send_contact( + ucontact_t *ptr, str aor, int action, dmq_node_t *node); +int usrloc_dmq_send_multi_contact( + ucontact_t *ptr, str aor, int action, dmq_node_t *node); +void usrloc_dmq_send_multi_contact_flush(dmq_node_t *node); #define MAX_AOR_LEN 256 @@ -60,15 +62,15 @@ extern int _dmq_usrloc_batch_usleep; extern str _dmq_usrloc_domain; extern int _dmq_usrloc_delete; -static int add_contact(str aor, ucontact_info_t* ci) +static int add_contact(str aor, ucontact_info_t *ci) { - urecord_t* r = NULL; - udomain_t* _d; - ucontact_t* c = NULL; + urecord_t *r = NULL; + udomain_t *_d; + ucontact_t *c = NULL; str contact; int res; - if (dmq_ul.get_udomain(_dmq_usrloc_domain.s, &_d) < 0) { + if(dmq_ul.get_udomain(_dmq_usrloc_domain.s, &_d) < 0) { LM_ERR("Failed to get domain\n"); return -1; } @@ -77,10 +79,11 @@ static int add_contact(str aor, ucontact_info_t* ci) LM_DBG("ci->ruid: %.*s\n", ci->ruid.len, ci->ruid.s); LM_DBG("aorhash: %i\n", dmq_ul.get_aorhash(&aor)); - if (ci->ruid.len > 0) { + if(ci->ruid.len > 0) { // Search by ruid, if possible - res = dmq_ul.get_urecord_by_ruid(_d, dmq_ul.get_aorhash(&aor), &ci->ruid, &r, &c); - if (res == 0) { + res = dmq_ul.get_urecord_by_ruid( + _d, dmq_ul.get_aorhash(&aor), &ci->ruid, &r, &c); + if(res == 0) { LM_DBG("Found contact\n"); dmq_ul.update_ucontact(r, c, ci); LM_DBG("Release record\n"); @@ -93,22 +96,22 @@ static int add_contact(str aor, ucontact_info_t* ci) dmq_ul.lock_udomain(_d, &aor); res = dmq_ul.get_urecord(_d, &aor, &r); - if (res < 0) { + if(res < 0) { LM_ERR("failed to retrieve record from usrloc\n"); goto error; - } else if ( res == 0) { + } else if(res == 0) { LM_DBG("'%.*s' found in usrloc\n", aor.len, ZSW(aor.s)); res = dmq_ul.get_ucontact(r, ci->c, ci->callid, ci->path, ci->cseq, &c); LM_DBG("get_ucontact = %d\n", res); - if (res==-1) { + if(res == -1) { LM_ERR("Invalid cseq\n"); goto error; - } else if (res > 0 ) { + } else if(res > 0) { LM_DBG("Not found contact\n"); contact.s = ci->c->s; contact.len = ci->c->len; dmq_ul.insert_ucontact(r, &contact, ci, &c); - } else if (res == 0) { + } else if(res == 0) { LM_DBG("Found contact\n"); dmq_ul.update_ucontact(r, c, ci); } @@ -132,24 +135,25 @@ static int add_contact(str aor, ucontact_info_t* ci) return -1; } -static int delete_contact(str aor, ucontact_info_t* ci) +static int delete_contact(str aor, ucontact_info_t *ci) { - udomain_t* _d; - urecord_t* r; - ucontact_t* c; + udomain_t *_d; + urecord_t *r; + ucontact_t *c; - if (dmq_ul.get_udomain(_dmq_usrloc_domain.s, &_d) < 0) { + if(dmq_ul.get_udomain(_dmq_usrloc_domain.s, &_d) < 0) { LM_ERR("Failed to get domain\n"); return -1; } /* it locks the udomain on success */ - if (dmq_ul.get_urecord_by_ruid(_d, dmq_ul.get_aorhash(&aor), - &ci->ruid, &r, &c) != 0) { + if(dmq_ul.get_urecord_by_ruid( + _d, dmq_ul.get_aorhash(&aor), &ci->ruid, &r, &c) + != 0) { LM_DBG("AOR/Contact [%.*s] not found\n", aor.len, aor.s); return -1; } - if (dmq_ul.delete_ucontact(r, c) != 0) { + if(dmq_ul.delete_ucontact(r, c) != 0) { dmq_ul.unlock_udomain(_d, &aor); LM_WARN("could not delete contact\n"); return -1; @@ -160,96 +164,97 @@ static int delete_contact(str aor, ucontact_info_t* ci) return 0; } -#define dmq_usrloc_malloc malloc -#define dmq_usrloc_free free +#define dmq_usrloc_malloc malloc +#define dmq_usrloc_free free -void usrloc_get_all_ucontact(dmq_node_t* node) +void usrloc_get_all_ucontact(dmq_node_t *node) { - int rval, len=0; + int rval, len = 0; void *buf, *cp; str c, recv; str path; str ruid; unsigned int aorhash; - struct socket_info* send_sock; + struct socket_info *send_sock; unsigned int flags; len = 0; buf = NULL; str aor; - urecord_t* r; - udomain_t* _d; - ucontact_t* ptr = 0; + urecord_t *r; + udomain_t *_d; + ucontact_t *ptr = 0; int res; int n; - if (dmq_ul.get_all_ucontacts == NULL){ + if(dmq_ul.get_all_ucontacts == NULL) { LM_ERR("dmq_ul.get_all_ucontacts is NULL\n"); goto done; } - if (dmq_ul.get_udomain(_dmq_usrloc_domain.s, &_d) < 0) { + if(dmq_ul.get_udomain(_dmq_usrloc_domain.s, &_d) < 0) { LM_ERR("Failed to get domain\n"); goto done; } rval = dmq_ul.get_all_ucontacts(buf, len, 0, 0, 1, 0); - if (rval<0) { + if(rval < 0) { LM_ERR("failed to fetch contacts\n"); goto done; } - if (rval > 0) { + if(rval > 0) { len = rval * 2; buf = dmq_usrloc_malloc(len); - if (buf == NULL) { + if(buf == NULL) { PKG_MEM_ERROR; goto done; } rval = dmq_ul.get_all_ucontacts(buf, len, 0, 0, 1, 0); - if (rval != 0) { + if(rval != 0) { dmq_usrloc_free(buf); goto done; } } - if (buf == NULL) + if(buf == NULL) goto done; cp = buf; n = 0; - while (1) { + while(1) { memcpy(&(c.len), cp, sizeof(c.len)); - if (c.len == 0) + if(c.len == 0) break; - c.s = (char*)cp + sizeof(c.len); - cp = (char*)cp + sizeof(c.len) + c.len; + c.s = (char *)cp + sizeof(c.len); + cp = (char *)cp + sizeof(c.len) + c.len; memcpy(&(recv.len), cp, sizeof(recv.len)); - recv.s = (char*)cp + sizeof(recv.len); - cp = (char*)cp + sizeof(recv.len) + recv.len; - memcpy( &send_sock, cp, sizeof(send_sock)); - cp = (char*)cp + sizeof(send_sock); - memcpy( &flags, cp, sizeof(flags)); - cp = (char*)cp + sizeof(flags); - memcpy( &(path.len), cp, sizeof(path.len)); - path.s = path.len ? ((char*)cp + sizeof(path.len)) : NULL ; - cp = (char*)cp + sizeof(path.len) + path.len; - memcpy( &(ruid.len), cp, sizeof(ruid.len)); - ruid.s = ruid.len ? ((char*)cp + sizeof(ruid.len)) : NULL ; - cp = (char*)cp + sizeof(ruid.len) + ruid.len; - memcpy( &aorhash, cp, sizeof(aorhash)); - cp = (char*)cp + sizeof(aorhash); + recv.s = (char *)cp + sizeof(recv.len); + cp = (char *)cp + sizeof(recv.len) + recv.len; + memcpy(&send_sock, cp, sizeof(send_sock)); + cp = (char *)cp + sizeof(send_sock); + memcpy(&flags, cp, sizeof(flags)); + cp = (char *)cp + sizeof(flags); + memcpy(&(path.len), cp, sizeof(path.len)); + path.s = path.len ? ((char *)cp + sizeof(path.len)) : NULL; + cp = (char *)cp + sizeof(path.len) + path.len; + memcpy(&(ruid.len), cp, sizeof(ruid.len)); + ruid.s = ruid.len ? ((char *)cp + sizeof(ruid.len)) : NULL; + cp = (char *)cp + sizeof(ruid.len) + ruid.len; + memcpy(&aorhash, cp, sizeof(aorhash)); + cp = (char *)cp + sizeof(aorhash); r = 0; ptr = 0; res = dmq_ul.get_urecord_by_ruid(_d, aorhash, &ruid, &r, &ptr); - if (res < 0) { + if(res < 0) { LM_DBG("'%.*s' Not found in usrloc\n", ruid.len, ZSW(ruid.s)); continue; } aor = r->aor; - LM_DBG("- AoR: %.*s AoRhash=%d Flags=%d\n", aor.len, aor.s, aorhash, flags); + LM_DBG("- AoR: %.*s AoRhash=%d Flags=%d\n", aor.len, aor.s, aorhash, + flags); - while (ptr) { - if (_dmq_usrloc_batch_msg_contacts >1) { + while(ptr) { + if(_dmq_usrloc_batch_msg_contacts > 1) { usrloc_dmq_send_multi_contact(ptr, aor, DMQ_UPDATE, node); } else { usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node); @@ -259,8 +264,8 @@ void usrloc_get_all_ucontact(dmq_node_t* node) } dmq_ul.release_urecord(r); dmq_ul.unlock_udomain(_d, &aor); - if(_dmq_usrloc_batch_size>0 && _dmq_usrloc_batch_usleep>0) { - if(n>=_dmq_usrloc_batch_size) { + if(_dmq_usrloc_batch_size > 0 && _dmq_usrloc_batch_usleep > 0) { + if(n >= _dmq_usrloc_batch_size) { n = 0; sleep_us(_dmq_usrloc_batch_usleep); } @@ -270,7 +275,8 @@ void usrloc_get_all_ucontact(dmq_node_t* node) usrloc_dmq_send_multi_contact_flush(node); // send any remaining contacts done: - c.s = ""; c.len = 0; + c.s = ""; + c.len = 0; } @@ -279,7 +285,7 @@ int usrloc_dmq_initialize() dmq_peer_t not_peer; /* load the DMQ API */ - if (dmq_load_api(&usrloc_dmqb)!=0) { + if(dmq_load_api(&usrloc_dmqb) != 0) { LM_ERR("cannot load dmq api\n"); return -1; } else { @@ -304,12 +310,13 @@ int usrloc_dmq_initialize() } -int usrloc_dmq_send(str* body, dmq_node_t* node) { - if (!usrloc_dmq_peer) { +int usrloc_dmq_send(str *body, dmq_node_t *node) +{ + if(!usrloc_dmq_peer) { LM_ERR("dlg_dmq_peer is null!\n"); return -1; } - if (node) { + if(node) { LM_DBG("sending dmq message ...\n"); usrloc_dmqb.send_message(usrloc_dmq_peer, body, node, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type); @@ -321,95 +328,106 @@ int usrloc_dmq_send(str* body, dmq_node_t* node) { return 0; } -static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) { +static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t *node) +{ static ucontact_info_t ci; srjson_t *it = NULL; - struct socket_info* sock=0; + struct socket_info *sock = 0; unsigned int action, expires, cseq, flags, cflags, q, last_modified, - methods, reg_id, server_id, port, proto; - str aor=STR_NULL, ruid=STR_NULL, received=STR_NULL, instance=STR_NULL, sockname=STR_NULL; - static str host=STR_NULL, c=STR_NULL, callid=STR_NULL, path=STR_NULL, user_agent=STR_NULL; + methods, reg_id, server_id, port, proto; + str aor = STR_NULL, ruid = STR_NULL, received = STR_NULL, + instance = STR_NULL, sockname = STR_NULL; + static str host = STR_NULL, c = STR_NULL, callid = STR_NULL, + path = STR_NULL, user_agent = STR_NULL; - action = expires = cseq = flags = cflags = q = last_modified - = methods = reg_id = server_id = port = proto = 0; + action = expires = cseq = flags = cflags = q = last_modified = methods = + reg_id = server_id = port = proto = 0; - for(it=jdoc_action; it; it = it->next) { - if (it->string == NULL) continue; + for(it = jdoc_action; it; it = it->next) { + if(it->string == NULL) + continue; - if (strcmp(it->string, "action")==0) { + if(strcmp(it->string, "action") == 0) { action = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "aor")==0) { + } else if(strcmp(it->string, "aor") == 0) { aor.s = it->valuestring; aor.len = strlen(aor.s); - } else if (strcmp(it->string, "ruid")==0) { + } else if(strcmp(it->string, "ruid") == 0) { ruid.s = it->valuestring; ruid.len = strlen(ruid.s); - } else if (strcmp(it->string, "c")==0) { + } else if(strcmp(it->string, "c") == 0) { c.s = it->valuestring; c.len = strlen(c.s); - } else if (strcmp(it->string, "received")==0) { + } else if(strcmp(it->string, "received") == 0) { received.s = it->valuestring; received.len = strlen(received.s); - } else if (_dmq_usrloc_replicate_socket_info==DMQ_USRLOC_REPLICATE_SOCKET && strcmp(it->string, "sock")==0) { - if (parse_phostport( it->valuestring, &host.s, &host.len, - (int*)&port, (int*)&proto)!=0) { + } else if(_dmq_usrloc_replicate_socket_info + == DMQ_USRLOC_REPLICATE_SOCKET + && strcmp(it->string, "sock") == 0) { + if(parse_phostport(it->valuestring, &host.s, &host.len, + (int *)&port, (int *)&proto) + != 0) { LM_ERR("bad socket <%s>\n", it->valuestring); return 0; } - sock = grep_sock_info( &host, (unsigned short)port, proto); - if (sock==0) { + sock = grep_sock_info(&host, (unsigned short)port, proto); + if(sock == 0) { LM_DBG("non-local socket <%s>...ignoring\n", it->valuestring); } else { - sock = grep_sock_info( &host, (unsigned short)port, proto); + sock = grep_sock_info(&host, (unsigned short)port, proto); sock->sock_str.s = it->valuestring; sock->sock_str.len = strlen(sock->sock_str.s); } - } else if (_dmq_usrloc_replicate_socket_info==DMQ_USRLOC_REPLICATE_SOCKNAME && strcmp(it->string, "sockname")==0) { + } else if(_dmq_usrloc_replicate_socket_info + == DMQ_USRLOC_REPLICATE_SOCKNAME + && strcmp(it->string, "sockname") == 0) { sockname.s = it->valuestring; sockname.len = strlen(sockname.s); sock = ksr_get_socket_by_name(&sockname); - if (sock==0) { - LM_DBG("socket with name <%s> not known ... ignoring\n", it->valuestring); + if(sock == 0) { + LM_DBG("socket with name <%s> not known ... ignoring\n", + it->valuestring); } - } else if (strcmp(it->string, "path")==0) { + } else if(strcmp(it->string, "path") == 0) { path.s = it->valuestring; path.len = strlen(path.s); - } else if (strcmp(it->string, "callid")==0) { + } else if(strcmp(it->string, "callid") == 0) { callid.s = it->valuestring; callid.len = strlen(callid.s); - } else if (strcmp(it->string, "user_agent")==0) { + } else if(strcmp(it->string, "user_agent") == 0) { user_agent.s = it->valuestring; user_agent.len = strlen(user_agent.s); - } else if (strcmp(it->string, "instance")==0) { + } else if(strcmp(it->string, "instance") == 0) { instance.s = it->valuestring; instance.len = strlen(instance.s); - } else if (strcmp(it->string, "expires")==0) { + } else if(strcmp(it->string, "expires") == 0) { expires = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "cseq")==0) { + } else if(strcmp(it->string, "cseq") == 0) { cseq = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "flags")==0) { + } else if(strcmp(it->string, "flags") == 0) { flags = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "cflags")==0) { + } else if(strcmp(it->string, "cflags") == 0) { cflags = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "q")==0) { + } else if(strcmp(it->string, "q") == 0) { q = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "last_modified")==0) { + } else if(strcmp(it->string, "last_modified") == 0) { last_modified = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "methods")==0) { + } else if(strcmp(it->string, "methods") == 0) { methods = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "reg_id")==0) { + } else if(strcmp(it->string, "reg_id") == 0) { reg_id = SRJSON_GET_UINT(it); - } else if (strcmp(it->string, "server_id")==0) { - server_id = SRJSON_GET_UINT(it); + } else if(strcmp(it->string, "server_id") == 0) { + server_id = SRJSON_GET_UINT(it); } else { LM_ERR("unrecognized field in json object\n"); } } - memset( &ci, 0, sizeof(ucontact_info_t)); + memset(&ci, 0, sizeof(ucontact_info_t)); ci.ruid = ruid; ci.c = &c; ci.received = received; - if (_dmq_usrloc_replicate_socket_info & (DMQ_USRLOC_REPLICATE_SOCKET|DMQ_USRLOC_REPLICATE_SOCKNAME)) + if(_dmq_usrloc_replicate_socket_info + & (DMQ_USRLOC_REPLICATE_SOCKET | DMQ_USRLOC_REPLICATE_SOCKNAME)) ci.sock = sock; ci.path = &path; ci.expires = expires; @@ -448,11 +466,12 @@ static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) { return 1; } -static int init_usrloc_dmq_recv() { - if (!usrloc_dmq_recv) { +static int init_usrloc_dmq_recv() +{ + if(!usrloc_dmq_recv) { LM_DBG("Initializing usrloc_dmq_recv for pid (%d)\n", my_pid()); - usrloc_dmq_recv = (int*)pkg_malloc(sizeof(int)); - if (!usrloc_dmq_recv) { + usrloc_dmq_recv = (int *)pkg_malloc(sizeof(int)); + if(!usrloc_dmq_recv) { PKG_MEM_ERROR; return -1; } @@ -464,37 +483,38 @@ static int init_usrloc_dmq_recv() { /** * @brief ht dmq callback */ -int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node) +int usrloc_dmq_handle_msg( + struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *node) { int content_length; str body; srjson_doc_t jdoc; - if (!usrloc_dmq_recv && init_usrloc_dmq_recv() < 0) { + if(!usrloc_dmq_recv && init_usrloc_dmq_recv() < 0) { return 0; } *usrloc_dmq_recv = 1; srjson_InitDoc(&jdoc, NULL); - if (parse_from_header(msg)<0) { + if(parse_from_header(msg) < 0) { LM_ERR("failed to parse from header\n"); goto invalid; } - body = ((struct to_body*)msg->from->parsed)->uri; + body = ((struct to_body *)msg->from->parsed)->uri; LM_DBG("dmq message received from %.*s\n", body.len, body.s); - if (parse_headers(msg, HDR_EOH_F, 0)<0) { + if(parse_headers(msg, HDR_EOH_F, 0) < 0) { LM_ERR("failed to parse the headers\n"); goto invalid; } - if (!msg->content_length) { + if(!msg->content_length) { LM_ERR("no content length header found\n"); goto invalid; } content_length = get_content_length(msg); - if (!content_length) { + if(!content_length) { LM_DBG("content length is 0\n"); goto invalid; } @@ -502,30 +522,32 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* body.s = get_body(msg); body.len = content_length; - if (!body.s) { + if(!body.s) { LM_ERR("unable to get body\n"); goto error; } jdoc.buf = body; - if (jdoc.root == NULL) { + if(jdoc.root == NULL) { jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s); - if (jdoc.root == NULL) { + if(jdoc.root == NULL) { LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s); goto invalid; } } - if (strcmp(jdoc.root->child->string, "multi")==0) { + if(strcmp(jdoc.root->child->string, "multi") == 0) { LM_DBG("request [%s]\n", jdoc.root->child->string); srjson_t *jdoc_actions = jdoc.root->child->child; srjson_t *it = NULL; - for(it=jdoc_actions; it; it = it->next) { + for(it = jdoc_actions; it; it = it->next) { LM_DBG("action [%s]\n", jdoc_actions->child->string); - if (!usrloc_dmq_execute_action(it->child, node)) goto invalid; + if(!usrloc_dmq_execute_action(it->child, node)) + goto invalid; } } else { - if (!usrloc_dmq_execute_action(jdoc.root->child, node)) goto invalid; + if(!usrloc_dmq_execute_action(jdoc.root->child, node)) + goto invalid; } *usrloc_dmq_recv = 0; @@ -550,30 +572,31 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* } -int usrloc_dmq_request_sync() { +int usrloc_dmq_request_sync() +{ srjson_doc_t jdoc; - if(_dmq_usrloc_sync==0) + if(_dmq_usrloc_sync == 0) return 0; LM_DBG("requesting sync from dmq peers\n"); srjson_InitDoc(&jdoc, NULL); jdoc.root = srjson_CreateObject(&jdoc); - if(jdoc.root==NULL) { + if(jdoc.root == NULL) { LM_ERR("cannot create json root\n"); goto error; } srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DMQ_SYNC); jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); - if(jdoc.buf.s==NULL) { + if(jdoc.buf.s == NULL) { LM_ERR("unable to serialize data\n"); goto error; } jdoc.buf.len = strlen(jdoc.buf.s); LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); - if (usrloc_dmq_send(&jdoc.buf, 0)!=0) { + if(usrloc_dmq_send(&jdoc.buf, 0) != 0) { goto error; } @@ -583,7 +606,7 @@ int usrloc_dmq_request_sync() { return 0; error: - if(jdoc.buf.s!=NULL) { + if(jdoc.buf.s != NULL) { jdoc.free_fn(jdoc.buf.s); jdoc.buf.s = NULL; } @@ -595,7 +618,8 @@ int usrloc_dmq_request_sync() { * */ /* Multi contacts */ -typedef struct jdoc_contact_group { +typedef struct jdoc_contact_group +{ int count; int size; srjson_doc_t jdoc; @@ -604,25 +628,29 @@ typedef struct jdoc_contact_group { static jdoc_contact_group_t jdoc_contact_group; -static void usrloc_dmq_contacts_group_init(void) { - if (jdoc_contact_group.jdoc.root) +static void usrloc_dmq_contacts_group_init(void) +{ + if(jdoc_contact_group.jdoc.root) return; jdoc_contact_group.count = 0; jdoc_contact_group.size = 12; // {"multi":{}} srjson_InitDoc(&jdoc_contact_group.jdoc, NULL); LM_DBG("init multi contacts batch. \n"); - jdoc_contact_group.jdoc.root = srjson_CreateObject(&jdoc_contact_group.jdoc); - if (jdoc_contact_group.jdoc.root==NULL) + jdoc_contact_group.jdoc.root = + srjson_CreateObject(&jdoc_contact_group.jdoc); + if(jdoc_contact_group.jdoc.root == NULL) LM_ERR("cannot create json root ! \n"); - jdoc_contact_group.jdoc_contacts = srjson_CreateObject(&jdoc_contact_group.jdoc); - if (jdoc_contact_group.jdoc_contacts==NULL) { + jdoc_contact_group.jdoc_contacts = + srjson_CreateObject(&jdoc_contact_group.jdoc); + if(jdoc_contact_group.jdoc_contacts == NULL) { LM_ERR("cannot create json contacts ! \n"); srjson_DestroyDoc(&jdoc_contact_group.jdoc); } } -static void usrloc_dmq_contacts_group_send(dmq_node_t* node) { - if (jdoc_contact_group.count == 0) +static void usrloc_dmq_contacts_group_send(dmq_node_t *node) +{ + if(jdoc_contact_group.count == 0) return; srjson_doc_t *jdoc = &jdoc_contact_group.jdoc; srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts; @@ -631,14 +659,14 @@ static void usrloc_dmq_contacts_group_send(dmq_node_t* node) { LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root)); jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root); - if(jdoc->buf.s==NULL) { + if(jdoc->buf.s == NULL) { LM_ERR("unable to serialize data\n"); goto error; } jdoc->buf.len = strlen(jdoc->buf.s); LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s); - if (usrloc_dmq_send(&jdoc->buf, node)!=0) { + if(usrloc_dmq_send(&jdoc->buf, node) != 0) { LM_ERR("unable to send data\n"); goto error; } @@ -649,7 +677,7 @@ static void usrloc_dmq_contacts_group_send(dmq_node_t* node) { return; error: - if(jdoc->buf.s!=NULL) { + if(jdoc->buf.s != NULL) { jdoc->free_fn(jdoc->buf.s); jdoc->buf.s = NULL; } @@ -657,79 +685,96 @@ static void usrloc_dmq_contacts_group_send(dmq_node_t* node) { return; } -void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node) { +void usrloc_dmq_send_multi_contact_flush(dmq_node_t *node) +{ usrloc_dmq_contacts_group_send(node); usrloc_dmq_contacts_group_init(); } -int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) { +int usrloc_dmq_send_multi_contact( + ucontact_t *ptr, str aor, int action, dmq_node_t *node) +{ usrloc_dmq_contacts_group_init(); srjson_doc_t *jdoc = &jdoc_contact_group.jdoc; srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts; - srjson_t * jdoc_contact = srjson_CreateObject(jdoc); + srjson_t *jdoc_contact = srjson_CreateObject(jdoc); if(!jdoc_contact) { LM_ERR("cannot create json root\n"); return -1; } LM_DBG("group size[%d]\n", jdoc_contact_group.size); - jdoc_contact_group.size += 201; // json overhead ("":{"action":,"aor":"","ruid":"","c":""...) + jdoc_contact_group.size += + 201; // json overhead ("":{"action":,"aor":"","ruid":"","c":""...) srjson_AddNumberToObject(jdoc, jdoc_contact, "action", action); - jdoc_contact_group.size += snprintf(NULL,0,"%d", action); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", action); srjson_AddStrToObject(jdoc, jdoc_contact, "aor", aor.s, aor.len); jdoc_contact_group.size += aor.len; - srjson_AddStrToObject(jdoc, jdoc_contact, "ruid", ptr->ruid.s, ptr->ruid.len); + srjson_AddStrToObject( + jdoc, jdoc_contact, "ruid", ptr->ruid.s, ptr->ruid.len); jdoc_contact_group.size += ptr->ruid.len; srjson_AddStrToObject(jdoc, jdoc_contact, "c", ptr->c.s, ptr->c.len); jdoc_contact_group.size += ptr->c.len; - srjson_AddStrToObject(jdoc, jdoc_contact, "received", ptr->received.s, ptr->received.len); + srjson_AddStrToObject( + jdoc, jdoc_contact, "received", ptr->received.s, ptr->received.len); jdoc_contact_group.size += ptr->received.len; - if (_dmq_usrloc_replicate_socket_info==DMQ_USRLOC_REPLICATE_SOCKET && ptr->sock!=NULL && ptr->sock->sock_str.s!=NULL) { - srjson_AddStrToObject(jdoc, jdoc_contact, "sock", ptr->sock->sock_str.s, ptr->sock->sock_str.len); + if(_dmq_usrloc_replicate_socket_info == DMQ_USRLOC_REPLICATE_SOCKET + && ptr->sock != NULL && ptr->sock->sock_str.s != NULL) { + srjson_AddStrToObject(jdoc, jdoc_contact, "sock", ptr->sock->sock_str.s, + ptr->sock->sock_str.len); jdoc_contact_group.size += ptr->sock->sock_str.len; - } - else if (_dmq_usrloc_replicate_socket_info==DMQ_USRLOC_REPLICATE_SOCKNAME && ptr->sock!=NULL && ptr->sock->sockname.s!=NULL) { - srjson_AddStrToObject(jdoc, jdoc_contact, "sockname", ptr->sock->sockname.s, ptr->sock->sockname.len); + } else if(_dmq_usrloc_replicate_socket_info == DMQ_USRLOC_REPLICATE_SOCKNAME + && ptr->sock != NULL && ptr->sock->sockname.s != NULL) { + srjson_AddStrToObject(jdoc, jdoc_contact, "sockname", + ptr->sock->sockname.s, ptr->sock->sockname.len); jdoc_contact_group.size += ptr->sock->sockname.len; } - srjson_AddStrToObject(jdoc, jdoc_contact, "path", ptr->path.s, ptr->path.len); + srjson_AddStrToObject( + jdoc, jdoc_contact, "path", ptr->path.s, ptr->path.len); jdoc_contact_group.size += ptr->path.len; - srjson_AddStrToObject(jdoc, jdoc_contact, "callid", ptr->callid.s, ptr->callid.len); + srjson_AddStrToObject( + jdoc, jdoc_contact, "callid", ptr->callid.s, ptr->callid.len); jdoc_contact_group.size += ptr->callid.len; - srjson_AddStrToObject(jdoc, jdoc_contact, "user_agent", ptr->user_agent.s, ptr->user_agent.len); + srjson_AddStrToObject(jdoc, jdoc_contact, "user_agent", ptr->user_agent.s, + ptr->user_agent.len); jdoc_contact_group.size += ptr->user_agent.len; - srjson_AddStrToObject(jdoc, jdoc_contact, "instance", ptr->instance.s, ptr->instance.len); + srjson_AddStrToObject( + jdoc, jdoc_contact, "instance", ptr->instance.s, ptr->instance.len); jdoc_contact_group.size += ptr->instance.len; srjson_AddNumberToObject(jdoc, jdoc_contact, "expires", ptr->expires); - jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->expires); + jdoc_contact_group.size += snprintf(NULL, 0, "%.0lf", (double)ptr->expires); srjson_AddNumberToObject(jdoc, jdoc_contact, "cseq", ptr->cseq); - jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cseq); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", ptr->cseq); srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", ptr->flags); - jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->flags); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", ptr->flags); srjson_AddNumberToObject(jdoc, jdoc_contact, "cflags", ptr->cflags); - jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cflags); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", ptr->cflags); srjson_AddNumberToObject(jdoc, jdoc_contact, "q", ptr->q); - jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->q); - srjson_AddNumberToObject(jdoc, jdoc_contact, "last_modified", ptr->last_modified); - jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->last_modified); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", ptr->q); + srjson_AddNumberToObject( + jdoc, jdoc_contact, "last_modified", ptr->last_modified); + jdoc_contact_group.size += + snprintf(NULL, 0, "%.0lf", (double)ptr->last_modified); srjson_AddNumberToObject(jdoc, jdoc_contact, "methods", ptr->methods); - jdoc_contact_group.size += snprintf(NULL,0,"%u", ptr->methods); + jdoc_contact_group.size += snprintf(NULL, 0, "%u", ptr->methods); srjson_AddNumberToObject(jdoc, jdoc_contact, "reg_id", ptr->reg_id); - jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->reg_id); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", ptr->reg_id); srjson_AddNumberToObject(jdoc, jdoc_contact, "server_id", ptr->server_id); - jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->server_id); + jdoc_contact_group.size += snprintf(NULL, 0, "%d", ptr->server_id); char idx[5]; jdoc_contact_group.count++; - jdoc_contact_group.size += snprintf(idx,5,"%d", jdoc_contact_group.count); + jdoc_contact_group.size += snprintf(idx, 5, "%d", jdoc_contact_group.count); srjson_AddItemToObject(jdoc, jdoc_contacts, idx, jdoc_contact); - if (jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts || jdoc_contact_group.size >= _dmq_usrloc_batch_msg_size) { - LM_DBG("sending group count[%d]size[%d]", jdoc_contact_group.count, jdoc_contact_group.size); + if(jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts + || jdoc_contact_group.size >= _dmq_usrloc_batch_msg_size) { + LM_DBG("sending group count[%d]size[%d]", jdoc_contact_group.count, + jdoc_contact_group.size); usrloc_dmq_contacts_group_send(node); usrloc_dmq_contacts_group_init(); } @@ -738,9 +783,9 @@ int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node } - - -int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) { +int usrloc_dmq_send_contact( + ucontact_t *ptr, str aor, int action, dmq_node_t *node) +{ srjson_doc_t jdoc; srjson_InitDoc(&jdoc, NULL); @@ -755,36 +800,44 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no srjson_AddStrToObject(&jdoc, jdoc.root, "aor", aor.s, aor.len); srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ptr->ruid.s, ptr->ruid.len); srjson_AddStrToObject(&jdoc, jdoc.root, "c", ptr->c.s, ptr->c.len); - srjson_AddStrToObject(&jdoc, jdoc.root, "received", ptr->received.s, ptr->received.len); - if (_dmq_usrloc_replicate_socket_info==DMQ_USRLOC_REPLICATE_SOCKET && ptr->sock!=NULL) { - srjson_AddStrToObject(&jdoc, jdoc.root, "sock", ptr->sock->sock_str.s, ptr->sock->sock_str.len); - } - else if (_dmq_usrloc_replicate_socket_info==DMQ_USRLOC_REPLICATE_SOCKNAME && ptr->sock!=NULL && ptr->sock->sockname.s!=NULL) { - srjson_AddStrToObject(&jdoc, jdoc.root, "sockname", ptr->sock->sockname.s, ptr->sock->sockname.len); + srjson_AddStrToObject( + &jdoc, jdoc.root, "received", ptr->received.s, ptr->received.len); + if(_dmq_usrloc_replicate_socket_info == DMQ_USRLOC_REPLICATE_SOCKET + && ptr->sock != NULL) { + srjson_AddStrToObject(&jdoc, jdoc.root, "sock", ptr->sock->sock_str.s, + ptr->sock->sock_str.len); + } else if(_dmq_usrloc_replicate_socket_info == DMQ_USRLOC_REPLICATE_SOCKNAME + && ptr->sock != NULL && ptr->sock->sockname.s != NULL) { + srjson_AddStrToObject(&jdoc, jdoc.root, "sockname", + ptr->sock->sockname.s, ptr->sock->sockname.len); } srjson_AddStrToObject(&jdoc, jdoc.root, "path", ptr->path.s, ptr->path.len); - srjson_AddStrToObject(&jdoc, jdoc.root, "callid", ptr->callid.s, ptr->callid.len); - srjson_AddStrToObject(&jdoc, jdoc.root, "user_agent", ptr->user_agent.s, ptr->user_agent.len); - srjson_AddStrToObject(&jdoc, jdoc.root, "instance", ptr->instance.s, ptr->instance.len); + srjson_AddStrToObject( + &jdoc, jdoc.root, "callid", ptr->callid.s, ptr->callid.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "user_agent", ptr->user_agent.s, + ptr->user_agent.len); + srjson_AddStrToObject( + &jdoc, jdoc.root, "instance", ptr->instance.s, ptr->instance.len); srjson_AddNumberToObject(&jdoc, jdoc.root, "expires", ptr->expires); srjson_AddNumberToObject(&jdoc, jdoc.root, "cseq", ptr->cseq); srjson_AddNumberToObject(&jdoc, jdoc.root, "flags", ptr->flags); srjson_AddNumberToObject(&jdoc, jdoc.root, "cflags", ptr->cflags); srjson_AddNumberToObject(&jdoc, jdoc.root, "q", ptr->q); - srjson_AddNumberToObject(&jdoc, jdoc.root, "last_modified", ptr->last_modified); + srjson_AddNumberToObject( + &jdoc, jdoc.root, "last_modified", ptr->last_modified); srjson_AddNumberToObject(&jdoc, jdoc.root, "methods", ptr->methods); srjson_AddNumberToObject(&jdoc, jdoc.root, "reg_id", ptr->reg_id); srjson_AddNumberToObject(&jdoc, jdoc.root, "server_id", ptr->server_id); jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); - if(jdoc.buf.s==NULL) { + if(jdoc.buf.s == NULL) { LM_ERR("unable to serialize data\n"); goto error; } jdoc.buf.len = strlen(jdoc.buf.s); LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); - if (usrloc_dmq_send(&jdoc.buf, node)!=0) { + if(usrloc_dmq_send(&jdoc.buf, node) != 0) { goto error; } @@ -794,7 +847,7 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no return 0; error: - if(jdoc.buf.s!=NULL) { + if(jdoc.buf.s != NULL) { jdoc.free_fn(jdoc.buf.s); jdoc.buf.s = NULL; } @@ -802,14 +855,14 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no return -1; } -int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code, - dmq_node_t* node, void* param) +int usrloc_dmq_resp_callback_f( + struct sip_msg *msg, int code, dmq_node_t *node, void *param) { LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param); return 0; } -void dmq_ul_cb_contact(ucontact_t* ptr, int type, void* param) +void dmq_ul_cb_contact(ucontact_t *ptr, int type, void *param) { str aor; @@ -817,13 +870,13 @@ void dmq_ul_cb_contact(ucontact_t* ptr, int type, void* param) aor.s = ptr->aor->s; aor.len = ptr->aor->len; - if (!usrloc_dmq_recv && init_usrloc_dmq_recv() < 0) { + if(!usrloc_dmq_recv && init_usrloc_dmq_recv() < 0) { return; } - if (!*usrloc_dmq_recv) { + if(!*usrloc_dmq_recv) { LM_DBG("Replicating local update to other nodes...\n"); - switch(type){ + switch(type) { case UL_CONTACT_INSERT: usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0); break; @@ -831,7 +884,7 @@ void dmq_ul_cb_contact(ucontact_t* ptr, int type, void* param) usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0); break; case UL_CONTACT_DELETE: - if (_dmq_usrloc_delete >= 1) { + if(_dmq_usrloc_delete >= 1) { usrloc_dmq_send_contact(ptr, aor, DMQ_RM, 0); } break; diff --git a/src/modules/dmq_usrloc/usrloc_sync.h b/src/modules/dmq_usrloc/usrloc_sync.h index 5c2c914aa13..cb55f3bb344 100644 --- a/src/modules/dmq_usrloc/usrloc_sync.h +++ b/src/modules/dmq_usrloc/usrloc_sync.h @@ -34,17 +34,20 @@ extern usrloc_api_t dmq_ul; -typedef enum { +typedef enum +{ DMQ_NONE, DMQ_UPDATE, DMQ_RM, DMQ_SYNC, } usrloc_dmq_action_t; -int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param); +int usrloc_dmq_resp_callback_f( + struct sip_msg *msg, int code, dmq_node_t *node, void *param); int usrloc_dmq_initialize(); -int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node); +int usrloc_dmq_handle_msg( + struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *node); int usrloc_dmq_request_sync(); -void dmq_ul_cb_contact(ucontact_t* c, int type, void* param); +void dmq_ul_cb_contact(ucontact_t *c, int type, void *param); #endif