diff --git a/modules/ims_registrar_pcscf/async_reginfo.c b/modules/ims_registrar_pcscf/async_reginfo.c index fe34164c5ab..240372265a5 100644 --- a/modules/ims_registrar_pcscf/async_reginfo.c +++ b/modules/ims_registrar_pcscf/async_reginfo.c @@ -45,7 +45,13 @@ #include #include "async_reginfo.h" +#include "../pua/pua.h" +#include "../pua/send_publish.h" +#include "../pua/pua_bind.h" + +extern pua_api_t pua; +extern int reginfo_queue_size_threshold; reginfo_event_list_t *reginfo_event_list = 0; int init_reginfo_event_list() @@ -62,6 +68,7 @@ int init_reginfo_event_list() return 0; } reginfo_event_list->lock = lock_init(reginfo_event_list->lock); + reginfo_event_list->size = 0; sem_new(reginfo_event_list->empty, 0); //pre-locked - as we assume list is empty at start @@ -83,17 +90,132 @@ void destroy_reginfo_event_list() shm_free(reginfo_event_list); } -reginfo_event_t* new_reginfo_event (int event) +reginfo_event_t* new_reginfo_event (int event, str *publ_body, str *publ_id, str *publ_content_type, str *subs_remote_target, str *subs_watcher_uri, + str *subs_contact, str *subs_outbound_proxy, int expires, int flag, int source_flag, int reg_info_event, str *extra_headers, str *pres_uri) { - reginfo_event_t *new_event = shm_malloc(sizeof(reginfo_event_t)); + char *p; + int len; + reginfo_event_t *new_event; + + len = sizeof(reginfo_event_t); + if(publ_body){ + len += publ_body->len; + } + if(publ_id){ + len += publ_id->len; + } + if(publ_content_type){ + len += publ_content_type->len; + } + if(subs_remote_target){ + len += subs_remote_target->len; + } + if(subs_watcher_uri){ + len += subs_watcher_uri->len; + } + if(subs_contact){ + len += subs_contact->len; + } + if(subs_outbound_proxy){ + len += subs_outbound_proxy->len; + } + if(extra_headers){ + len += extra_headers->len; + } + if(pres_uri){ + len += pres_uri->len; + } + + LM_DBG("Shm alloc'ing %d for new reg info event\n", len); + new_event = (reginfo_event_t*) shm_malloc(len); + if (!new_event) { LM_ERR("No more shm mem\n"); return NULL; } + memset(new_event, 0, len); + + p = (char*) (new_event + 1); + + if(publ_body) { + LM_DBG("publ_body [%.*s]\n", publ_body->len, publ_body->s); + new_event->publ_body.s = p; + new_event->publ_body.len = publ_body->len; + memcpy(p, publ_body->s, publ_body->len); + p += publ_body->len; + } + if(publ_id) { + LM_DBG("publ_id [%.*s]\n", publ_id->len, publ_id->s); + new_event->publ_id.s = p; + new_event->publ_id.len = publ_id->len; + memcpy(p, publ_id->s, publ_id->len); + p += publ_id->len; + } + if(publ_content_type) { + LM_DBG("publ_content_type [%.*s]\n", publ_content_type->len, publ_content_type->s); + new_event->publ_content_type.s = p; + new_event->publ_content_type.len = publ_content_type->len; + memcpy(p, publ_content_type->s, publ_content_type->len); + p += publ_content_type->len; + } + if(subs_remote_target) { + LM_DBG("subs_remote_target [%.*s]\n", subs_remote_target->len, subs_remote_target->s); + new_event->subs_remote_target.s = p; + new_event->subs_remote_target.len = subs_remote_target->len; + memcpy(p, subs_remote_target->s, subs_remote_target->len); + p += subs_remote_target->len; + } + if(subs_watcher_uri) { + LM_DBG("subs_watcher_uri [%.*s]\n", subs_watcher_uri->len, subs_watcher_uri->s); + new_event->subs_watcher_uri.s = p; + new_event->subs_watcher_uri.len = subs_watcher_uri->len; + memcpy(p, subs_watcher_uri->s, subs_watcher_uri->len); + p += subs_watcher_uri->len; + } + if(subs_contact) { + LM_DBG("subs_contact [%.*s]\n", subs_contact->len, subs_contact->s); + new_event->subs_contact.s = p; + new_event->subs_contact.len = subs_contact->len; + memcpy(p, subs_contact->s, subs_contact->len); + p += subs_contact->len; + } + if(subs_outbound_proxy) { + LM_DBG("subs_outbound_proxy [%.*s]\n", subs_outbound_proxy->len, subs_outbound_proxy->s); + new_event->subs_outbound_proxy.s = p; + new_event->subs_outbound_proxy.len = subs_outbound_proxy->len; + memcpy(p, subs_outbound_proxy->s, subs_outbound_proxy->len); + p += subs_outbound_proxy->len; + } + if(extra_headers) { + LM_DBG("extra_headers [%.*s]\n", extra_headers->len, extra_headers->s); + new_event->extra_headers.s = p; + new_event->extra_headers.len = extra_headers->len; + memcpy(p, extra_headers->s, extra_headers->len); + p += extra_headers->len; + } + if(pres_uri) { + LM_DBG("pres_uri [%.*s]\n", pres_uri->len, pres_uri->s); + new_event->pres_uri.s = p; + new_event->pres_uri.len = pres_uri->len; + memcpy(p, pres_uri->s, pres_uri->len); + p += pres_uri->len; + } + + if (p != (((char*) new_event) + len)) { + LM_CRIT("buffer overflow\n"); + shm_free(new_event); + return 0; + } + + new_event->expires = expires; + new_event->flag = flag; + new_event->reg_info_event = reg_info_event; + new_event->sourge_flag = source_flag; + new_event->registered = time(NULL); new_event->event = event; new_event->next = 0; - + return new_event; } @@ -106,6 +228,12 @@ void push_reginfo_event(reginfo_event_t* event) reginfo_event_list->tail->next = event; reginfo_event_list->tail = event; } + reginfo_event_list->size++; + + if(reginfo_queue_size_threshold > 0 && reginfo_event_list->size > reginfo_queue_size_threshold) { + LM_WARN("Reginfo queue is size [%d] and has exceed reginfo_queue_size_threshold of [%d]", reginfo_event_list->size, reginfo_queue_size_threshold); + } + sem_release(reginfo_event_list->empty); lock_release(reginfo_event_list->lock); } @@ -128,6 +256,9 @@ reginfo_event_t* pop_reginfo_event() reginfo_event_list->tail = 0; } ev->next = 0; //make sure whoever gets this cant access our list + + reginfo_event_list->size--; + lock_release(reginfo_event_list->lock); return ev; @@ -143,18 +274,75 @@ void free_reginfo_event(reginfo_event_t* ev) void reginfo_event_process() { + publ_info_t publ; + subs_info_t subs; reginfo_event_t *ev; for (;;) { - LM_DBG("POPPING REGINFO EVENT\n"); + LM_DBG("POPPING REGINFO EVENT\n"); ev = pop_reginfo_event(); LM_DBG("PROCESSING REGINFO EVENT with event [%d]\n", ev->event); switch (ev->event) { case REG_EVENT_PUBLISH: - LM_DBG("Sending out-of-band publish\n"); + LM_DBG("Sending out-of-band publish with pres_uri [%.*s] publ_id [%.*s] publ_content_type [%.*s] extra_headers [%.*s]" + "expires [%d] event [%d]\n", + ev->pres_uri.len, ev->pres_uri.s, ev->publ_id.len, ev->publ_id.s, ev->publ_content_type.len, ev->publ_content_type.s, + ev->extra_headers.len, ev->extra_headers.s, ev->expires, ev->reg_info_event); + LM_DBG("publ_body [%.*s] \n", + ev->publ_body.len, ev->publ_body.s); + + memset(&publ, 0, sizeof(publ_info_t)); + publ.pres_uri = &ev->pres_uri; + publ.body = &ev->publ_body; + publ.id = ev->publ_id; + publ.content_type = ev->publ_content_type; + publ.expires = ev->expires; + + /* make UPDATE_TYPE, as if this "publish dialog" is not found + by pua it will fallback to INSERT_TYPE anyway */ + publ.flag |= ev->flag; + publ.source_flag |= ev->sourge_flag; + publ.event |= ev->reg_info_event; + publ.extra_headers = &ev->extra_headers; + + if (pua.send_publish(&publ) < 0) { + LM_ERR("Error while sending publish\n"); + } break; case REG_EVENT_SUBSCRIBE: - LM_DBG("Sending out-of-band subscribe\n"); + memset(&subs, 0, sizeof(subs_info_t)); + + subs.remote_target = &ev->subs_remote_target; + subs.pres_uri= &ev->pres_uri; + subs.watcher_uri= &ev->subs_watcher_uri; + subs.expires = ev->expires; + + subs.source_flag= ev->sourge_flag; + subs.event= ev->reg_info_event; + subs.contact= &ev->subs_contact; + subs.extra_headers = &ev->extra_headers; + + if(ev->subs_outbound_proxy.s) { + subs.outbound_proxy= &ev->subs_outbound_proxy; + } + + subs.flag|= ev->flag; + + + LM_DBG("Sending out-of-band subscribe with remote_target [%.*s] pres_uri [%.*s] subs_watcher_uri [%.*s] subs_contact [%.*s] extra_headers [%.*s] " + "expires [%d] event [%d] flag [%d] source_flag [%d]\n", + subs.remote_target->len, subs.remote_target->s, subs.pres_uri->len, subs.pres_uri->s, subs.watcher_uri->len, subs.watcher_uri->s, + subs.contact->len, subs.contact->s, subs.extra_headers->len, subs.extra_headers->s, subs.expires, subs.event, subs.flag, subs.source_flag); + if(subs.outbound_proxy) { + LM_DBG("subs_outbound_proxy [%.*s]\n", + subs.outbound_proxy->len, subs.outbound_proxy->s); + } + + + if(pua.send_subscribe(&subs)< 0) { + LM_ERR("while sending subscribe\n"); + } + break; default: LM_ERR("Unknown REG event.....ignoring\n"); diff --git a/modules/ims_registrar_pcscf/async_reginfo.h b/modules/ims_registrar_pcscf/async_reginfo.h index 0db662ccc32..dcae1b5c3a8 100644 --- a/modules/ims_registrar_pcscf/async_reginfo.h +++ b/modules/ims_registrar_pcscf/async_reginfo.h @@ -56,10 +56,24 @@ typedef struct _reginfo_event{ int event; /* event id */ time_t registered; /* time event was added to list - useful if we want to report on things that have taken too long to process */ + str publ_body; + str publ_id; + str publ_content_type; + str subs_remote_target; + str subs_watcher_uri; + str subs_contact; + str subs_outbound_proxy; + int expires; + int flag; + int sourge_flag; + int reg_info_event; + str extra_headers; + str pres_uri; struct _reginfo_event *next; } reginfo_event_t; typedef struct { + int size; gen_lock_t *lock; reginfo_event_t *head; reginfo_event_t *tail; @@ -70,7 +84,8 @@ typedef struct { int init_reginfo_event_list(); void destroy_reginfo_event_list(); -reginfo_event_t* new_reginfo_event (int event); /*create new event*/ +reginfo_event_t* new_reginfo_event (int event, str *publ_body, str *publ_id, str *publ_content_type, str *subs_remote_target, str *subs_watcher_uri, + str *subs_contact, str *subs_outbound_proxy, int expires, int flag, int source_flag, int reg_info_event, str *extra_headers, str *pres_uri); void push_reginfo_event(reginfo_event_t* event); /*add event to stack*/ reginfo_event_t* pop_reginfo_event(); /*pop next (head) event off list*/ void free_reginfo_event(reginfo_event_t*); /*free memory allocated for event*/ diff --git a/modules/ims_registrar_pcscf/reg_mod.c b/modules/ims_registrar_pcscf/reg_mod.c index a987a0488f9..0e27bb95ffb 100644 --- a/modules/ims_registrar_pcscf/reg_mod.c +++ b/modules/ims_registrar_pcscf/reg_mod.c @@ -58,16 +58,18 @@ #include "../../lib/kcore/statistics.h" #include "../../modules/sl/sl.h" #include "../../mod_fix.h" +#include "../../cfg/cfg_struct.h" /* Bindings to PUA */ #include "../pua/pua_bind.h" #include "notify.h" +#include "async_reginfo.h" + #include "reg_mod.h" #include "save.h" #include "service_routes.h" #include "lookup.h" - MODULE_VERSION usrloc_api_t ul; /**!< Structure containing pointers to usrloc functions*/ @@ -92,6 +94,7 @@ unsigned int pending_reg_expires = 30; /**!< parameter for expiry time of a pe int is_registered_fallback2ip = 0; +int reginfo_queue_size_threshold = 0; /**Threshold for size of reginfo queue after which a warning is logged */ char* rcv_avp_param = 0; @@ -165,6 +168,7 @@ static param_export_t params[] = { {"ignore_contact_rxport_check", INT_PARAM, &ignore_contact_rxport_check }, {"ignore_reg_state", INT_PARAM, &ignore_reg_state }, {"force_icscf_uri", PARAM_STR, &force_icscf_uri }, + {"reginfo_queue_size_threshold", INT_PARAM, ®info_queue_size_threshold }, // {"store_profile_dereg", INT_PARAM, &store_data_on_dereg}, {0, 0, 0} }; @@ -232,6 +236,9 @@ static int mod_init(void) { bind_usrloc_t bind_usrloc; bind_pua_t bind_pua; + /*register space for event processor*/ + register_procs(1); + if (!fix_parameters()) goto error; /* bind the SL API */ @@ -284,6 +291,13 @@ static int mod_init(void) { return -1; } LM_DBG("Successfully bound to PUA module\n"); + + /*init cdb cb event list*/ + if (!init_reginfo_event_list()) { + LM_ERR("unable to initialise reginfo_event_list\n"); + return -1; + } + LM_DBG("Successfully initialised reginfo_event_list\n"); } return 0; @@ -299,6 +313,20 @@ static void mod_destroy(void) static int child_init(int rank) { + + LM_DBG("Initialization of module in child [%d] \n", rank); + if (rank == PROC_MAIN) { + LM_DBG("Creating RegInfo Event Processor process\n"); + int pid = fork_process(PROC_SIPINIT, "RegInfo Event Processor", 1); + if (pid < 0) + return -1; //error + if (pid == 0) { + if (cfg_child_init()) + return -1; //error + reginfo_event_process(); + } + } + if (rank == PROC_MAIN || rank == PROC_TCP_MAIN) return 0; if (rank == 1) { diff --git a/modules/ims_registrar_pcscf/subscribe.c b/modules/ims_registrar_pcscf/subscribe.c index 9c859dfb0c8..68f788f84ae 100644 --- a/modules/ims_registrar_pcscf/subscribe.c +++ b/modules/ims_registrar_pcscf/subscribe.c @@ -32,6 +32,7 @@ #include "../pua/pua.h" #include "../pua/pua_bind.h" +#include "async_reginfo.h" extern pua_api_t pua; @@ -44,14 +45,16 @@ int reginfo_subscribe_real(struct sip_msg* msg, pv_elem_t* uri, str* service_rou str uri_str = {0, 0}; char uri_buf[512]; int uri_buf_len = 512; - subs_info_t subs; + //subs_info_t subs; str p_asserted_identity_header; + reginfo_event_t *new_event; + str *subs_outbound_proxy = 0; int len = strlen(P_ASSERTED_IDENTITY_HDR_PREFIX) + pcscf_uri.len + 1 + CRLF_LEN; p_asserted_identity_header.s = (char *)pkg_malloc( len ); if ( p_asserted_identity_header.s == NULL ) { LM_ERR( "insert_asserted_identity: pkg_malloc %d bytes failed", len ); - return -1; + goto error; } memcpy(p_asserted_identity_header.s, P_ASSERTED_IDENTITY_HDR_PREFIX, strlen(P_ASSERTED_IDENTITY_HDR_PREFIX)); @@ -64,11 +67,8 @@ int reginfo_subscribe_real(struct sip_msg* msg, pv_elem_t* uri, str* service_rou p_asserted_identity_header.len += CRLF_LEN; if (pv_printf(msg, uri, uri_buf, &uri_buf_len) < 0) { - LM_ERR("cannot print uri into the format\n"); - if (p_asserted_identity_header.s) { - pkg_free(p_asserted_identity_header.s); - } - return -1; + LM_ERR("cannot print uri into the format\n"); + goto error; } uri_str.s = uri_buf; uri_str.len = uri_buf_len; @@ -77,34 +77,31 @@ int reginfo_subscribe_real(struct sip_msg* msg, pv_elem_t* uri, str* service_rou LM_DBG("Subscribing to %.*s\n", uri_str.len, uri_str.s); - memset(&subs, 0, sizeof(subs_info_t)); - - subs.remote_target = &uri_str; - subs.pres_uri= &uri_str; - subs.watcher_uri= &pcscf_uri; - subs.expires = expires; - - subs.source_flag= REGINFO_SUBSCRIBE; - subs.event= REGINFO_EVENT; - subs.contact= &pcscf_uri; - subs.extra_headers = &p_asserted_identity_header; - if(force_icscf_uri.s && force_icscf_uri.len) { - subs.outbound_proxy= &force_icscf_uri; + subs_outbound_proxy= &force_icscf_uri; } - subs.flag|= UPDATE_TYPE; + new_event = new_reginfo_event(REG_EVENT_SUBSCRIBE, 0, 0, 0, &uri_str, &pcscf_uri, &pcscf_uri, + subs_outbound_proxy, expires, UPDATE_TYPE, REGINFO_SUBSCRIBE, REGINFO_EVENT, &p_asserted_identity_header, &uri_str); - if(pua.send_subscribe(&subs)< 0) { - LM_ERR("while sending subscribe\n"); - } + if (!new_event) { + LM_ERR("Unable to create event for cdp callback\n"); + goto error; + } + //push the new event onto the stack (FIFO) + push_reginfo_event(new_event); if (p_asserted_identity_header.s) { pkg_free(p_asserted_identity_header.s); } return 1; + + error: + + if (p_asserted_identity_header.s) { + pkg_free(p_asserted_identity_header.s); + } + return -1; + } - - - diff --git a/modules/ims_registrar_pcscf/ul_callback.c b/modules/ims_registrar_pcscf/ul_callback.c index 119f3c9ad46..0691bf2e850 100644 --- a/modules/ims_registrar_pcscf/ul_callback.c +++ b/modules/ims_registrar_pcscf/ul_callback.c @@ -48,6 +48,7 @@ #include "../pua/send_publish.h" #include "../pua/pua_bind.h" +#include "async_reginfo.h" #include @@ -185,12 +186,14 @@ str* build_reginfo_partial(ppublic_t *impu, struct pcontact* c, int type) { #define P_ASSERTED_IDENTITY_HDR_PREFIX "P-Asserted-Identity: <" int send_partial_publish(ppublic_t *impu, struct pcontact *c, int type) { - publ_info_t publ; + //publ_info_t publ; str content_type; int id_buf_len; char id_buf[512]; str p_asserted_identity_header; - + str publ_id; + reginfo_event_t *new_event; + content_type.s = "application/reginfo+xml"; content_type.len = 23; @@ -220,30 +223,31 @@ int send_partial_publish(ppublic_t *impu, struct pcontact *c, int type) goto error; } LM_DBG("XML-Body:\n%.*s\n", body->len, body->s); - - memset(&publ, 0, sizeof(publ_info_t)); - publ.pres_uri = &impu->public_identity; - publ.body = body; + id_buf_len = snprintf(id_buf, sizeof(id_buf), "IMSPCSCF_PUBLISH.%.*s", c->aor.len, c->aor.s); - publ.id.s = id_buf; - publ.id.len = id_buf_len; - publ.content_type = content_type; - publ.expires = 3600; - - /* make UPDATE_TYPE, as if this "publish dialog" is not found - by pua it will fallback to INSERT_TYPE anyway */ - publ.flag |= UPDATE_TYPE; - publ.source_flag |= REGINFO_PUBLISH; - publ.event |= REGINFO_EVENT; - publ.extra_headers = &p_asserted_identity_header; - - if (pua.send_publish(&publ) < 0) { - LM_ERR("Error while sending publish\n"); - } + publ_id.s = id_buf; + publ_id.len = id_buf_len; + + new_event = new_reginfo_event(REG_EVENT_PUBLISH, body, &publ_id, &content_type, 0, 0, + 0, 0, 3600, UPDATE_TYPE, REGINFO_PUBLISH, REGINFO_EVENT, &p_asserted_identity_header, &impu->public_identity); + + if (!new_event) { + LM_ERR("Unable to create event for cdp callback\n"); + goto error; + } + //push the new event onto the stack (FIFO) + push_reginfo_event(new_event); + if (p_asserted_identity_header.s) { pkg_free(p_asserted_identity_header.s); } + if (body) { + if (body->s) + xmlFree(body->s); + pkg_free(body); + } + return 1; error: