From e5d9611884187952f7e05f3ee191320c5123c463 Mon Sep 17 00:00:00 2001 From: Andrey Rybkin Date: Fri, 14 Nov 2014 10:06:19 +0300 Subject: [PATCH] Add module for usrloc sync via dmq --- modules/usrloc_dmq/Makefile | 12 + modules/usrloc_dmq/doc/Makefile | 4 + modules/usrloc_dmq/doc/usrloc_dmq.xml | 48 ++ modules/usrloc_dmq/doc/usrloc_dmq_admin.xml | 74 +++ modules/usrloc_dmq/usrloc_dmq.c | 87 ++++ modules/usrloc_dmq/usrloc_sync.c | 544 ++++++++++++++++++++ modules/usrloc_dmq/usrloc_sync.h | 32 ++ 7 files changed, 801 insertions(+) create mode 100644 modules/usrloc_dmq/Makefile create mode 100644 modules/usrloc_dmq/doc/Makefile create mode 100644 modules/usrloc_dmq/doc/usrloc_dmq.xml create mode 100644 modules/usrloc_dmq/doc/usrloc_dmq_admin.xml create mode 100644 modules/usrloc_dmq/usrloc_dmq.c create mode 100644 modules/usrloc_dmq/usrloc_sync.c create mode 100644 modules/usrloc_dmq/usrloc_sync.h diff --git a/modules/usrloc_dmq/Makefile b/modules/usrloc_dmq/Makefile new file mode 100644 index 00000000000..95ffd81c616 --- /dev/null +++ b/modules/usrloc_dmq/Makefile @@ -0,0 +1,12 @@ +include ../../Makefile.defs +auto_gen= +NAME=usrloc_dmq.so + +DEFS+=-DKAMAILIO_MOD_INTERFACE + +SERLIBPATH=../../lib +SER_LIBS+=$(SERLIBPATH)/kmi/kmi +SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1 +SER_LIBS+=$(SERLIBPATH)/kcore/kcore +SER_LIBS+=$(SERLIBPATH)/srutils/srutils +include ../../Makefile.modules diff --git a/modules/usrloc_dmq/doc/Makefile b/modules/usrloc_dmq/doc/Makefile new file mode 100644 index 00000000000..db2e055cc2a --- /dev/null +++ b/modules/usrloc_dmq/doc/Makefile @@ -0,0 +1,4 @@ +docs = usrloc_dmq.xml + +docbook_dir = ../../../docbook +include $(docbook_dir)/Makefile.module diff --git a/modules/usrloc_dmq/doc/usrloc_dmq.xml b/modules/usrloc_dmq/doc/usrloc_dmq.xml new file mode 100644 index 00000000000..565cc8a08cf --- /dev/null +++ b/modules/usrloc_dmq/doc/usrloc_dmq.xml @@ -0,0 +1,48 @@ + + + +%docentities; + +]> + + + + usrloc_dmq Module + &kamailioname; + + + Andrey + Rybkin + bks.tv + rybkin.a@bks.tv +
+ + + +
+
+ + Andrey + Rybkin + bks.tv + rybkin.a@bks.tv +
+ + + +
+
+
+ + 2014 + +
+ + + + + +
diff --git a/modules/usrloc_dmq/doc/usrloc_dmq_admin.xml b/modules/usrloc_dmq/doc/usrloc_dmq_admin.xml new file mode 100644 index 00000000000..29564b53454 --- /dev/null +++ b/modules/usrloc_dmq/doc/usrloc_dmq_admin.xml @@ -0,0 +1,74 @@ + + + +%docentities; + +]> + + + + + &adminguide; + +
+ Overview + + The module add usrloc contacts replication between multiple servers via DMQ module. + +
+
+ Dependencies +
+ &kamailio; Modules + + The following modules must be loaded before this module: + + + + DMQ module must be loaded first.. + USRLOC module must be loaded first.. + + + + +
+
+ Parameters +
+ <varname>enable</varname> (int) + + USRLOC replication + 0 - disabled + 1 - enabled + + + Default value is 0. + + +
+
+ <varname>flag</varname> (int) + + Flag to be used for marking if a contact should be constructed for the DMQ + + + + Default value is 2. + + + + Set <varname>flag</varname> parameter + +... +modparam("usrloc_dmq", "flag", 2) +... + + +
+
+ + + diff --git a/modules/usrloc_dmq/usrloc_dmq.c b/modules/usrloc_dmq/usrloc_dmq.c new file mode 100644 index 00000000000..8291664a0a9 --- /dev/null +++ b/modules/usrloc_dmq/usrloc_dmq.c @@ -0,0 +1,87 @@ +#include +#include "../../sr_module.h" +#include "../../dprint.h" +#include "../../error.h" +#include "../../modules/usrloc/usrloc.h" +#include "../usrloc/ul_callback.h" +#include "../../modules/sl/sl.h" +#include "../../mod_fix.h" + +#include "usrloc_sync.h" + +static int mod_init(void); +static int child_init(int); + +int enable_usrloc = 0; +int usrloc_syncflag = 2; + +MODULE_VERSION + +static param_export_t params[] = { + {"enable", INT_PARAM, &enable_usrloc}, + {"flag", INT_PARAM, &usrloc_syncflag}, + {0, 0, 0} +}; + +struct module_exports exports = { + "usrloc_dmq", /* module name */ + DEFAULT_DLFLAGS, /* dlopen flags */ + 0, /* exported functions */ + params, /* exported parameters */ + 0, /* exported statistics */ + 0, /* exported MI functions */ + 0, /* exported pseudo-variables */ + 0, /* extra processes */ + mod_init, /* module initialization function */ + 0, /* response handling function */ + 0, /* destroy function */ + child_init /* per-child init function */ +}; + + +static int mod_init(void) +{ + LM_ERR("dmq_sync loaded: usrloc=%d\n", enable_usrloc); + + if (enable_usrloc) { + usrloc_dmq_flag = 1 << usrloc_syncflag; + bind_usrloc_t bind_usrloc; + + bind_usrloc = (bind_usrloc_t)find_export("ul_bind_usrloc", 1, 0); + if (!bind_usrloc) { + LM_ERR("can't bind usrloc\n"); + return -1; + } + if (bind_usrloc(&ul) < 0) { + LM_ERR("Can't bind ul\n"); + return -1; + } + if(ul.register_ulcb != NULL) { + if(ul.register_ulcb(ULCB_MAX, ul_cb_contact, 0)< 0) + { + LM_ERR("can not register callback for expired contacts\n"); + return -1; + } + } + if (!usrloc_dmq_initialize()){ + LM_DBG("usrloc_dmq initialized\n"); + } else { + LM_ERR("Error in usrloc_dmq_initialize()\n"); + } + } + return 0; +} + +static int child_init(int rank) +{ + + if (rank == PROC_MAIN) { + LM_ERR("child_init PROC_MAIN\n"); + return 0; + } + if(rank == PROC_INIT || rank == PROC_TCP_MAIN) { + LM_ERR("child_init PROC_INIT\n"); + return 0; + } + return 0; +} diff --git a/modules/usrloc_dmq/usrloc_sync.c b/modules/usrloc_dmq/usrloc_sync.c new file mode 100644 index 00000000000..cffd7f27a69 --- /dev/null +++ b/modules/usrloc_dmq/usrloc_sync.c @@ -0,0 +1,544 @@ +#include "usrloc_sync.h" +#include "../usrloc/usrloc.h" +#include "../usrloc/ul_callback.h" +#include "../usrloc/dlist.h" +#include "../../dprint.h" +#include "../../parser/parse_from.h" +#include "../../parser/parse_addr_spec.h" + +static str usrloc_dmq_content_type = str_init("application/json"); +static str dmq_200_rpl = str_init("OK"); +static str dmq_400_rpl = str_init("Bad Request"); +static str dmq_500_rpl = str_init("Server Internal Error"); + +dmq_api_t usrloc_dmqb; +dmq_peer_t* usrloc_dmq_peer = NULL; +dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0}; + +int usrloc_dmq_send_all(); +int usrloc_dmq_request_sync(); +int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node); +usrloc_api_t ul; + +#define MAX_AOR_LEN 256 +int extract_aor(str* _uri, str* _a, sip_uri_t *_pu) +{ + static char aor_buf[MAX_AOR_LEN]; + sip_uri_t turi; + sip_uri_t *puri; + str *uri; + + memset(aor_buf, 0, MAX_AOR_LEN); + uri=_uri; + + if(_pu!=NULL) + puri = _pu; + else + puri = &turi; + + if (parse_uri(uri->s, uri->len, puri) < 0) { + LM_ERR("failed to parse AoR [%.*s]\n", uri->len, uri->s); + return -1; + } + + if ( (puri->user.len + puri->host.len + 1) > MAX_AOR_LEN) { + LM_ERR("Address Of Record too long\n"); + return -2; + } + + _a->s = aor_buf; + _a->len = puri->user.len; + + if (un_escape(&puri->user, _a) < 0) { + LM_ERR("failed to unescape username\n"); + return -3; + } + + strlower(_a); + + return 0; +} + +int add_contact(str aor, ucontact_info_t* ci) +{ + urecord_t* r; + udomain_t* _d; + ucontact_t* c; + str contact; + int res; + + ul.get_udomain("location", &_d); + ul.lock_udomain(_d, &aor); + res = ul.get_urecord(_d, &aor, &r); + if (res < 0) { + LM_ERR("failed to retrieve record from usrloc\n"); + goto error; + } else if ( res == 0) { + LM_DBG("'%.*s' found in usrloc\n", aor.len, ZSW(aor.s)); + res = ul.get_ucontact_by_instance(r, &aor, ci, &c); + LM_DBG("get_ucontact_by_instance = %d\n", res); + if (res==-1) { + LM_ERR("Invalid cseq\n"); + goto error; + } else if (res > 0 ) { + LM_DBG("Not found contact\n"); + ul.insert_ucontact(r, &contact, ci, &c); + } else if (res == 0) { + LM_DBG("Found contact\n"); + ul.update_ucontact(r, c, ci); + } + } else { + LM_DBG("'%.*s' Not found in usrloc\n", aor.len, ZSW(aor.s)); + ul.insert_urecord(_d, &aor, &r); + LM_DBG("Insert record\n"); + contact.s = ci->c->s; + contact.len = ci->c->len; + ul.insert_ucontact(r, &contact, ci, &c); + LM_DBG("Insert ucontact\n"); + } + + LM_DBG("Release record\n"); + ul.release_urecord(r); + LM_DBG("Unlock udomain\n"); + ul.unlock_udomain(_d, &aor); + return 0; + error: + ul.unlock_udomain(_d, &aor); + return -1; +} + +void usrloc_get_all_ucontact(dmq_node_t* node) +{ + int rval, len=0; + void *buf, *cp; + str c; + str path; + str ruid; + unsigned int aorhash; + struct socket_info* send_sock; + unsigned int flags; + + len = 0; + buf = NULL; + + if (ul.get_all_ucontacts == NULL){ + LM_ERR("ul.get_all_ucontacts is NULL\n"); + goto done; + } + rval = ul.get_all_ucontacts(buf, len, 0, 0, 1); + if (rval<0) { + LM_ERR("failed to fetch contacts\n"); + goto done; + } + if (rval > 0) { + if (buf != NULL) + pkg_free(buf); + len = rval * 2; + buf = pkg_malloc(len); + if (buf == NULL) { + LM_ERR("out of pkg memory\n"); + goto done; + } + rval = ul.get_all_ucontacts(buf, len, 0, 0, 1); + if (rval != 0) { + pkg_free(buf); + goto done; + } + } + if (buf == NULL) + goto done; + cp = buf; + while (1) { + memcpy(&(c.len), cp, sizeof(c.len)); + if (c.len == 0) + break; + c.s = (char*)cp + sizeof(c.len); + cp = (char*)cp + sizeof(c.len) + c.len; + memcpy( &send_sock, cp, sizeof(send_sock)); + cp = (char*)cp + sizeof(send_sock); + memcpy( &flags, cp, sizeof(flags)); + cp = (char*)cp + sizeof(flags); + memcpy( &(path.len), cp, sizeof(path.len)); + path.s = path.len ? ((char*)cp + sizeof(path.len)) : NULL ; + cp = (char*)cp + sizeof(path.len) + path.len; + memcpy( &(ruid.len), cp, sizeof(ruid.len)); + ruid.s = ruid.len ? ((char*)cp + sizeof(ruid.len)) : NULL ; + cp = (char*)cp + sizeof(ruid.len) + ruid.len; + memcpy( &aorhash, cp, sizeof(aorhash)); + cp = (char*)cp + sizeof(aorhash); + + + str aor; + sip_uri_t puri; + urecord_t* r; + udomain_t* _d; + ucontact_t* ptr = 0; + int res; + + if (extract_aor(&c, &aor, &puri) < 0) { + LM_ERR("failed to extract address of record\n"); + continue; + } + ul.get_udomain("location", &_d); + ul.lock_udomain(_d, &aor); + res = ul.get_urecord(_d, &aor, &r); + if (res > 0) { + LM_DBG("'%.*s' Not found in usrloc\n", aor.len, ZSW(aor.s)); + ul.unlock_udomain(_d, &aor); + continue; + } + + LM_DBG("- AoR: %.*s AoRhash=%d Flags=%d\n", aor.len, aor.s, aorhash, flags); + + ptr = r->contacts; + + while (ptr) { + usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node); + ptr = ptr->next; + } + ul.release_urecord(r); + ul.unlock_udomain(_d, &aor); + } + pkg_free(buf); + +done: + c.s = ""; c.len = 0; +} + + +int usrloc_dmq_initialize() +{ + dmq_peer_t not_peer; + + /* load the DMQ API */ + if (dmq_load_api(&usrloc_dmqb)!=0) { + LM_ERR("cannot load dmq api\n"); + return -1; + } else { + LM_DBG("loaded dmq api\n"); + } + not_peer.callback = usrloc_dmq_handle_msg; + not_peer.init_callback = usrloc_dmq_request_sync; + not_peer.description.s = "usrloc"; + not_peer.description.len = 6; + not_peer.peer_id.s = "usrloc"; + not_peer.peer_id.len = 6; + usrloc_dmq_peer = usrloc_dmqb.register_dmq_peer(¬_peer); + if(!usrloc_dmq_peer) { + LM_ERR("error in register_dmq_peer\n"); + goto error; + } else { + LM_DBG("dmq peer registered\n"); + } + return 0; +error: + return -1; +} + + +int usrloc_dmq_send(str* body, dmq_node_t* node) { + if (!usrloc_dmq_peer) { + LM_ERR("dlg_dmq_peer is null!\n"); + return -1; + } + if (node) { + LM_DBG("sending dmq message ...\n"); + usrloc_dmqb.send_message(usrloc_dmq_peer, body, node, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type); + } else { + LM_DBG("sending dmq broadcast...\n"); + usrloc_dmqb.bcast_message(usrloc_dmq_peer, body, 0, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type); + } + return 0; +} + + +/** +* @brief ht dmq callback +*/ +int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node) +{ + int content_length; + str body; + srjson_doc_t jdoc; + srjson_t *it = NULL; + static ucontact_info_t ci; + + int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id, tcpconn_id; + str aor, ruid, c, received, path, callid, user_agent, instance; + + parse_from_header(msg); + body = ((struct to_body*)msg->from->parsed)->uri; + + LM_DBG("dmq message received from %.*s\n", body.len, body.s); + + if(!msg->content_length) { + LM_ERR("no content length header found\n"); + goto invalid; + } + content_length = get_content_length(msg); + if(!content_length) { + LM_DBG("content length is 0\n"); + goto invalid; + } + + body.s = get_body(msg); + body.len = content_length; + + if (!body.s) { + LM_ERR("unable to get body\n"); + goto error; + } + + srjson_InitDoc(&jdoc, NULL); + jdoc.buf = body; + if(jdoc.root == NULL) { + jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s); + if(jdoc.root == NULL) + { + LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s); + goto invalid; + } + } + + for(it=jdoc.root->child; it; it = it->next) + { + if (it->string == NULL) continue; + + if (strcmp(it->string, "action")==0) { + action = it->valueint; + } else if (strcmp(it->string, "aor")==0) { + aor.s = it->valuestring; + aor.len = strlen(aor.s); + } else if (strcmp(it->string, "ruid")==0) { + ruid.s = it->valuestring; + ruid.len = strlen(ruid.s); + } else if (strcmp(it->string, "c")==0) { + c.s = it->valuestring; + c.len = strlen(c.s); + } else if (strcmp(it->string, "received")==0) { + received.s = it->valuestring; + received.len = strlen(received.s); + } else if (strcmp(it->string, "path")==0) { + path.s = it->valuestring; + path.len = strlen(path.s); + } else if (strcmp(it->string, "callid")==0) { + callid.s = it->valuestring; + callid.len = strlen(callid.s); + } else if (strcmp(it->string, "user_agent")==0) { + user_agent.s = it->valuestring; + user_agent.len = strlen(user_agent.s); + } else if (strcmp(it->string, "instance")==0) { + instance.s = it->valuestring; + instance.len = strlen(instance.s); + } else if (strcmp(it->string, "expires")==0) { // + expires = it->valueint; + } else if (strcmp(it->string, "cseq")==0) { + cseq = it->valueint; + } else if (strcmp(it->string, "flags")==0) { + flags = it->valueint; + } else if (strcmp(it->string, "cflags")==0) { + cflags = it->valueint; + } else if (strcmp(it->string, "q")==0) { + q = it->valueint; + } else if (strcmp(it->string, "last_modified")==0) { + last_modified = it->valueint; + } else if (strcmp(it->string, "methods")==0) { + methods = it->valueint; + } else if (strcmp(it->string, "reg_id")==0) { + reg_id = it->valueint; + } else if (strcmp(it->string, "tcpconn_id")==0) { + tcpconn_id = it->valueint; + } else { + LM_ERR("unrecognized field in json object\n"); + } + } + srjson_DestroyDoc(&jdoc); + memset( &ci, 0, sizeof(ucontact_info_t)); + ci.ruid = ruid; + ci.c = &c; + ci.received = received; + ci.path = &path; + ci.expires = expires; + ci.q = q; + ci.callid = &callid; + ci.cseq = cseq; + ci.flags = flags; + ci.flags |= usrloc_dmq_flag; + ci.cflags = cflags; + ci.user_agent = &user_agent; + ci.methods = methods; + ci.instance = instance; + ci.reg_id = reg_id; + ci.tcpconn_id = tcpconn_id; + ci.last_modified = last_modified; + + switch(action) { + case DMQ_UPDATE: + LM_DBG("Received DMQ_UPDATE. Update contact info...\n"); + add_contact(aor, &ci); + break; + case DMQ_RM: + LM_DBG("Received DMQ_RM. Delete contact info...\n"); + break; + case DMQ_SYNC: + LM_DBG("Received DMQ_SYNC. Sending all contacts...\n"); + usrloc_get_all_ucontact(node); + break; + case DMQ_NONE: + LM_DBG("Received DMQ_NONE. Not used...\n"); + break; + + default: goto invalid; + } + + resp->reason = dmq_200_rpl; + resp->resp_code = 200; + return 0; + +invalid: + resp->reason = dmq_400_rpl; + resp->resp_code = 400; + return 0; + +error: + resp->reason = dmq_500_rpl; + resp->resp_code = 500; + return 0; +} + + +int usrloc_dmq_request_sync() { + srjson_doc_t jdoc; + LM_DBG("requesting sync from dmq peers\n"); + srjson_InitDoc(&jdoc, NULL); + + jdoc.root = srjson_CreateObject(&jdoc); + if(jdoc.root==NULL) { + LM_ERR("cannot create json root\n"); + goto error; + } + + srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DMQ_SYNC); + jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); + if(jdoc.buf.s==NULL) { + LM_ERR("unable to serialize data\n"); + goto error; + } + jdoc.buf.len = strlen(jdoc.buf.s); + LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); + if (usrloc_dmq_send(&jdoc.buf, 0)!=0) { + goto error; + } + + jdoc.free_fn(jdoc.buf.s); + jdoc.buf.s = NULL; + srjson_DestroyDoc(&jdoc); + return 0; + +error: + if(jdoc.buf.s!=NULL) { + jdoc.free_fn(jdoc.buf.s); + jdoc.buf.s = NULL; + } + srjson_DestroyDoc(&jdoc); + return -1; +} + +int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) { + srjson_doc_t jdoc; + srjson_InitDoc(&jdoc, NULL); + + int flags; + + jdoc.root = srjson_CreateObject(&jdoc); + if(jdoc.root==NULL) { + LM_ERR("cannot create json root\n"); + goto error; + } + + flags = ptr->flags; + flags &= ~usrloc_dmq_flag; + + srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action); + + srjson_AddStrToObject(&jdoc, jdoc.root, "aor", aor.s, aor.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ptr->ruid.s, ptr->ruid.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "c", ptr->c.s, ptr->c.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "received", ptr->received.s, ptr->received.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "path", ptr->path.s, ptr->path.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "callid", ptr->callid.s, ptr->callid.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "user_agent", ptr->user_agent.s, ptr->user_agent.len); + srjson_AddStrToObject(&jdoc, jdoc.root, "instance", ptr->instance.s, ptr->instance.len); + srjson_AddNumberToObject(&jdoc, jdoc.root, "expires", ptr->expires); + srjson_AddNumberToObject(&jdoc, jdoc.root, "cseq", ptr->cseq); + srjson_AddNumberToObject(&jdoc, jdoc.root, "flags", flags); + srjson_AddNumberToObject(&jdoc, jdoc.root, "cflags", ptr->cflags); + srjson_AddNumberToObject(&jdoc, jdoc.root, "q", ptr->q); + srjson_AddNumberToObject(&jdoc, jdoc.root, "last_modified", ptr->last_modified); + srjson_AddNumberToObject(&jdoc, jdoc.root, "methods", ptr->methods); + srjson_AddNumberToObject(&jdoc, jdoc.root, "reg_id", ptr->reg_id); + srjson_AddNumberToObject(&jdoc, jdoc.root, "tcpconn_id", ptr->tcpconn_id); + + jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); + if(jdoc.buf.s==NULL) { + LM_ERR("unable to serialize data\n"); + goto error; + } + jdoc.buf.len = strlen(jdoc.buf.s); + + LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); + if (usrloc_dmq_send(&jdoc.buf, node)!=0) { + goto error; + } + + jdoc.free_fn(jdoc.buf.s); + jdoc.buf.s = NULL; + srjson_DestroyDoc(&jdoc); + return 0; + +error: + if(jdoc.buf.s!=NULL) { + jdoc.free_fn(jdoc.buf.s); + jdoc.buf.s = NULL; + } + srjson_DestroyDoc(&jdoc); + return -1; +} + +int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code, + dmq_node_t* node, void* param) +{ + LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param); + return 0; +} + +void ul_cb_contact(ucontact_t* ptr, int type, void* param) +{ + str aor; + + LM_DBG("Callback from usrloc with type=%d\n", type); + aor.s = ptr->aor->s; + aor.len = ptr->aor->len; + + if (!(ptr->flags & usrloc_dmq_flag)) { + + switch(type){ + case UL_CONTACT_INSERT: + usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0); + break; + case UL_CONTACT_UPDATE: + usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0); + break; + case UL_CONTACT_DELETE: + //usrloc_dmq_send_contact(ptr, aor, DMQ_RM); + LM_DBG("Contact <%.*s> deleted\n", aor.len, aor.s); + break; + case UL_CONTACT_EXPIRE: + //usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE); + LM_DBG("Contact <%.*s> expired\n", aor.len, aor.s); + break; + } + } else { + LM_DBG("Contact recieved from DMQ... skip\n"); + } +} diff --git a/modules/usrloc_dmq/usrloc_sync.h b/modules/usrloc_dmq/usrloc_sync.h new file mode 100644 index 00000000000..383a62da432 --- /dev/null +++ b/modules/usrloc_dmq/usrloc_sync.h @@ -0,0 +1,32 @@ +#ifndef _DMQ_SYNC_USRLOC_H_ +#define _DMQ_SYNC_USRLOC_H_ + +#include "../dmq/bind_dmq.h" +#include "../../lib/srutils/srjson.h" +#include "../../parser/msg_parser.h" +#include "../../parser/parse_content.h" +#include "../usrloc/usrloc.h" + +int usrloc_dmq_flag; + +extern dmq_api_t usrloc_dmqb; +extern dmq_peer_t* usrloc_dmq_peer; +extern dmq_resp_cback_t usrloc_dmq_resp_callback; +extern rpc_export_t ul_rpc[]; + +usrloc_api_t ul; + +typedef enum { + DMQ_NONE, + DMQ_UPDATE, + DMQ_RM, + DMQ_SYNC, +} usrloc_dmq_action_t; + +int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param); +int usrloc_dmq_initialize(); +int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node); +int usrloc_dmq_request_sync(); +void ul_cb_contact(ucontact_t* c, int type, void* param); + +#endif