diff --git a/src/modules/kazoo/doc/kazoo_admin.xml b/src/modules/kazoo/doc/kazoo_admin.xml index d9a871cca41..6b236ed50a3 100644 --- a/src/modules/kazoo/doc/kazoo_admin.xml +++ b/src/modules/kazoo/doc/kazoo_admin.xml @@ -297,6 +297,28 @@ modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672") +
+ <varname>event_callback</varname>(str) + + The name of the function in the kemi configuration file (embedded + scripting language such as Lua, Python, ...) to be executed instead + of event_route[...] blocks. + + + The function receives a string parameter with the name of the event, + the values can be: 'kazoo:mod-init', 'kazoo:consumer-event'. + + + + Set <varname>event_callback</varname> parameter + + ... + modparam("kazoo", "event_callback", "ksr_kazoo_event") + ... + + +
+ diff --git a/src/modules/kazoo/kazoo.c b/src/modules/kazoo/kazoo.c index ad788e78c00..697ec1d853f 100644 --- a/src/modules/kazoo/kazoo.c +++ b/src/modules/kazoo/kazoo.c @@ -31,6 +31,7 @@ #include "../../lib/srdb1/db.h" #include "../../core/dprint.h" #include "../../core/cfg/cfg_struct.h" +#include "../../core/kemi.h" #include "kz_amqp.h" #include "kz_json.h" @@ -46,6 +47,8 @@ static int mod_init(void); static int mod_child_init(int rank); static int fire_init_event(int rank); +static int fire_init_event_cfg(void); +static int fire_init_event_kemi(void); static void mod_destroy(void); str dbk_node_hostname = { 0, 0 }; @@ -122,6 +125,9 @@ pv_spec_t kz_query_result_spec; str kz_app_name = str_init(NAME); +str kazoo_event_callback = STR_NULL; +int kazoo_kemi_enabled=0; + MODULE_VERSION static tr_export_t mod_trans[] = { @@ -209,6 +215,7 @@ static param_export_t params[] = { {"pua_lock_type", INT_PARAM, &kz_pua_lock_type}, {"amqp_connect_timeout_micro", INT_PARAM, &kz_amqp_connect_timeout_tv.tv_usec}, {"amqp_connect_timeout_sec", INT_PARAM, &kz_amqp_connect_timeout_tv.tv_sec}, + {"event_callback", PARAM_STR, &kazoo_event_callback}, {0, 0, 0} }; @@ -273,6 +280,7 @@ static int mod_init(void) { return -1; } + if(kz_timer_ms > 0) { kz_timer_tv.tv_usec = (kz_timer_ms % 1000) * 1000; kz_timer_tv.tv_sec = kz_timer_ms / 1000; @@ -312,6 +320,16 @@ static int mod_init(void) { } } + sr_kemi_eng_t *keng = NULL; + if(kazoo_event_callback.s!=NULL && kazoo_event_callback.len>0) { + keng = sr_kemi_eng_get(); + if(keng==NULL) { + LM_ERR("failed to find kemi engine\n"); + return -1; + } + kazoo_kemi_enabled=1; + } + int total_workers = dbk_consumer_workers + (dbk_consumer_processes * kz_server_counter) + 2; @@ -341,6 +359,20 @@ static int mod_init(void) { return 0; } +static sr_kemi_t kazoo_kemi_exports[] = { + { str_init("kazoo"), str_init("kazoo_publish"), + SR_KEMIP_INT, ki_kz_amqp_publish, + { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("kazoo"), str_init("kazoo_subscribe"), + SR_KEMIP_INT, ki_kz_amqp_subscribe, + { SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } } +}; + int mod_register(char *path, int *dlflags, void *p1, void *p2) { if(kz_tr_init_buffers()<0) @@ -348,6 +380,9 @@ int mod_register(char *path, int *dlflags, void *p1, void *p2) LM_ERR("failed to initialize transformations buffers\n"); return -1; } + + sr_kemi_modules_add(kazoo_kemi_exports); + return register_trans_mod(path, mod_trans); } @@ -430,16 +465,12 @@ static int mod_child_init(int rank) return 0; } -static int fire_init_event(int rank) +static int fire_init_event_cfg(void) { struct sip_msg *fmsg; struct run_act_ctx ctx; int rtb, rt; - LM_DBG("rank is (%d)\n", rank); - if (rank!=PROC_INIT) - return 0; - rt = route_get(&event_rt, "kazoo:mod-init"); if(rt>=0 && event_rt.rlist[rt]!=NULL) { LM_DBG("executing event_route[kazoo:mod-init] (%d)\n", rt); @@ -461,11 +492,50 @@ static int fire_init_event(int rank) return 0; } +static int fire_init_event_kemi(void) +{ + struct sip_msg *fmsg; + int rtb; + sr_kemi_eng_t *keng = NULL; + + keng = sr_kemi_eng_get(); + if(keng!=NULL) { + str evrtname = str_init("kazoo:mod-init"); + rtb = get_route_type(); + if(faked_msg_init()<0) + return -1; + fmsg = faked_msg_next(); + if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, &kazoo_event_callback, &evrtname)<0) { + LM_ERR("error running event route kemi callback\n"); + } + set_route_type(rtb); + } + else { + LM_ERR("no event route or kemi callback found for execution\n"); + } + + return 0; +} + +static int fire_init_event(int rank) +{ + LM_DBG("rank is (%d)\n", rank); + if (rank!=PROC_INIT) + return 0; + + if (kazoo_kemi_enabled) { + return fire_init_event_kemi(); + } + else { + return fire_init_event_cfg(); + } + + return 0; +} + static void mod_destroy(void) { kz_amqp_destroy(); if (kz_worker_pipes_fds) { shm_free(kz_worker_pipes_fds); } if (kz_worker_pipes) { shm_free(kz_worker_pipes); } } - - diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index 559662a4743..7c9482e2707 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -46,7 +46,7 @@ #include "../../core/receive.h" #include "../../core/action.h" #include "../../core/script_cb.h" - +#include "../../core/kemi.h" #include "kz_amqp.h" #include "kz_json.h" @@ -81,6 +81,9 @@ extern int kz_amqps_verify_hostname; extern pv_spec_t kz_query_timeout_spec; +extern int kazoo_kemi_enabled; +extern str kazoo_event_callback; + const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL }; const amqp_table_t kz_amqp_empty_table = { 0, NULL }; @@ -1215,7 +1218,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags) { - str json_s; + str pl_s; str exchange_s; str routing_key_s; @@ -1229,7 +1232,7 @@ int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, c return -1; } - if (fixup_get_svalue(msg, (gparam_p)payload, &json_s) != 0) { + if (fixup_get_svalue(msg, (gparam_p)payload, &pl_s) != 0) { LM_ERR("cannot get json string value : %s\n", payload); return -1; } @@ -1239,19 +1242,22 @@ int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, c return -1; } - struct json_object *j = json_tokener_parse(json_s.s); - - if (j==NULL) { - LM_ERR("empty or invalid JSON payload : %.*s\n", json_s.len, json_s.s); - return -1; - } - - json_object_put(j); - - return kz_amqp_pipe_send(&exchange_s, &routing_key_s, &json_s ); + return ki_kz_amqp_publish(msg, (char*)&exchange_s, (char*)&routing_key_s, (char*)&pl_s); +}; +int ki_kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload) +{ + char *pl = ((str*)payload)->s; + struct json_object *j = json_tokener_parse(pl); -}; + if (j==NULL) { + LM_ERR("empty or invalid JSON payload : %.*s\n", ((str*)payload)->len, ((str*)payload)->s); + return -1; + } + + json_object_put(j); + return kz_amqp_pipe_send((str*)exchange, (str*)routing_key, (str*)payload); +} int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload) { @@ -1793,10 +1799,21 @@ kz_amqp_exchange_binding_ptr kz_amqp_exchange_binding_from_json(json_object* JOb } int kz_amqp_subscribe(struct sip_msg* msg, char* payload) +{ + str payload_s = STR_NULL; + + if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) { + LM_ERR("cannot get payload value\n"); + return -1; + } + + return ki_kz_amqp_subscribe(msg, (char*)(&payload_s)); +} + +int ki_kz_amqp_subscribe(struct sip_msg* msg, char* payload) { str exchange_s = STR_NULL; str queue_s = STR_NULL; - str payload_s = STR_NULL; str key_s = STR_NULL; str subkey_s = STR_NULL; int no_ack = 1; @@ -1816,12 +1833,9 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) kz_amqp_bind_ptr bind = NULL; kz_amqp_binding_ptr binding = NULL; - if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) { - LM_ERR("cannot get payload value\n"); - return -1; - } + char* pl = ((str*)payload)->s; + json_obj = kz_json_parse(pl); - json_obj = kz_json_parse(payload_s.s); if (json_obj == NULL) return -1; @@ -1873,7 +1887,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) } if(routing == NULL) { - LM_INFO("creating empty routing key : %s\n", payload_s.s); + LM_INFO("creating empty routing key : %s\n", pl); routing = kz_amqp_routing_new(""); } @@ -2331,63 +2345,99 @@ int kz_amqp_consumer_fire_event(char *eventkey) return 0; } -void kz_amqp_consumer_event(kz_amqp_consumer_delivery_ptr Evt) +static void kz_amqp_consumer_event_cfg(kz_amqp_consumer_delivery_ptr Evt, json_obj_ptr json_obj) { - json_obj_ptr json_obj = NULL; - str ev_name = {0, 0}, ev_category = {0, 0}; - char buffer[512]; - char * p; + str ev_name = {0, 0}, ev_category = {0, 0}; + char buffer[512]; + char * p; - eventData = Evt->payload; - if(Evt->routing_key) { - eventKey = Evt->routing_key->s; - } - json_obj = kz_json_parse(Evt->payload); - if (json_obj == NULL) - return; - - char* key = (Evt->event_key == NULL ? dbk_consumer_event_key.s : Evt->event_key); - char* subkey = (Evt->event_subkey == NULL ? dbk_consumer_event_subkey.s : Evt->event_subkey); + char* key = (Evt->event_key == NULL ? dbk_consumer_event_key.s : Evt->event_key); + char* subkey = (Evt->event_subkey == NULL ? dbk_consumer_event_subkey.s : Evt->event_subkey); - json_extract_field(key, ev_category); - if(ev_category.len == 0 && Evt->event_key) { - ev_category.s = Evt->event_key; - ev_category.len = strlen(Evt->event_key); - } + json_extract_field(key, ev_category); + if(ev_category.len == 0 && Evt->event_key) { + ev_category.s = Evt->event_key; + ev_category.len = strlen(Evt->event_key); + } - json_extract_field(subkey, ev_name); - if(ev_name.len == 0 && Evt->event_subkey) { - ev_name.s = Evt->event_subkey; - ev_name.len = strlen(Evt->event_subkey); - } + json_extract_field(subkey, ev_name); + if(ev_name.len == 0 && Evt->event_subkey) { + ev_name.s = Evt->event_subkey; + ev_name.len = strlen(Evt->event_subkey); + } - sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if(kz_amqp_consumer_fire_event(buffer) != 0) { - sprintf(buffer, "kazoo:consumer-event-%.*s",ev_category.len, ev_category.s); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if(kz_amqp_consumer_fire_event(buffer) != 0) { - sprintf(buffer, "kazoo:consumer-event-%s-%s", key, subkey); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; - if(kz_amqp_consumer_fire_event(buffer) != 0) { - sprintf(buffer, "kazoo:consumer-event-%s", key); - for (p=buffer ; *p; ++p) *p = tolower(*p); - for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; + sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s); + for (p=buffer ; *p; ++p) *p = tolower(*p); + for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; + if(kz_amqp_consumer_fire_event(buffer) != 0) { + sprintf(buffer, "kazoo:consumer-event-%.*s",ev_category.len, ev_category.s); + for (p=buffer ; *p; ++p) *p = tolower(*p); + for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; + if(kz_amqp_consumer_fire_event(buffer) != 0) { + sprintf(buffer, "kazoo:consumer-event-%s-%s", key, subkey); + for (p=buffer ; *p; ++p) *p = tolower(*p); + for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; + if(kz_amqp_consumer_fire_event(buffer) != 0) { + sprintf(buffer, "kazoo:consumer-event-%s", key); + for (p=buffer ; *p; ++p) *p = tolower(*p); + for (p=buffer ; *p; ++p) if(*p == '_') *p = '-'; if(kz_amqp_consumer_fire_event(buffer) != 0) { sprintf(buffer, "kazoo:consumer-event"); if(kz_amqp_consumer_fire_event(buffer) != 0) { LM_ERR("kazoo:consumer-event not found\n"); } } - } - } - } + } + } + } +} + +static void kz_amqp_consumer_event_kemi(void) +{ + sr_kemi_eng_t *keng = NULL; + int rtb; + + keng = sr_kemi_eng_get(); + if(keng!=NULL) { + sip_msg_t *msg; + str evrtname = str_init("kazoo:consumer-event"); + + rtb = get_route_type(); + msg = faked_msg_next(); + if(sr_kemi_route(keng, msg, EVENT_ROUTE, &kazoo_event_callback, &evrtname)<0) { + LM_ERR("error running event route kemi callback\n"); + } + + set_route_type(rtb); + } else { + LM_ERR("no event route or kemi callback found for execution\n"); + } + +} + +void kz_amqp_consumer_event(kz_amqp_consumer_delivery_ptr Evt) +{ + json_obj_ptr json_obj = NULL; + + eventData = Evt->payload; + if(Evt->routing_key) { + eventKey = Evt->routing_key->s; + } + + json_obj = kz_json_parse(Evt->payload); + if (json_obj == NULL) + return; + + if (kazoo_kemi_enabled) { + kz_amqp_consumer_event_kemi(); + } + else { + kz_amqp_consumer_event_cfg(Evt, json_obj); + } + if(json_obj) - json_object_put(json_obj); + json_object_put(json_obj); eventData = NULL; eventKey = NULL; diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index 0059a84ea2e..1ca3e5447fe 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -274,9 +274,11 @@ int kz_amqp_add_connection(modparam_t type, void* val); int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload); int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags); +int ki_kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload); int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst); int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload); int kz_amqp_subscribe(struct sip_msg* msg, char* payload); +int ki_kz_amqp_subscribe(struct sip_msg* msg, char* payload); int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue_name, char* routing_key); int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded); int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);