From d5c602454a9af6ebce852f44dc88c4efa4bf036f Mon Sep 17 00:00:00 2001 From: lazedo Date: Tue, 27 Jun 2017 23:00:43 +0100 Subject: [PATCH] kazoo: add $kzRK to accdess routing key of the payload --- src/modules/kazoo/kazoo.c | 1 + src/modules/kazoo/kz_amqp.c | 35 +++++++++++++++++++++++------------ src/modules/kazoo/kz_amqp.h | 1 + 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/modules/kazoo/kazoo.c b/src/modules/kazoo/kazoo.c index 2f2d2b9bd55..40535aa82be 100644 --- a/src/modules/kazoo/kazoo.c +++ b/src/modules/kazoo/kazoo.c @@ -129,6 +129,7 @@ static tr_export_t mod_trans[] = { static pv_export_t kz_mod_pvs[] = { {{"kzR", (sizeof("kzR")-1)}, PVT_OTHER, kz_pv_get_last_query_result, 0, 0, 0, 0, 0}, {{"kzE", (sizeof("kzE")-1)}, PVT_OTHER, kz_pv_get_event_payload, 0, 0, 0, 0, 0}, + {{"kzRK", (sizeof("kzRK")-1)}, PVT_OTHER, kz_pv_get_event_routing_key, 0, 0, 0, 0, 0}, { {0, 0}, 0, 0, 0, 0, 0, 0, 0 } }; diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index 00e6cb7f9fb..af6223d6934 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -2281,12 +2281,18 @@ int kz_amqp_send_receive(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd ) } char* eventData = NULL; +char* eventKey = NULL; int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData); } +int kz_pv_get_event_routing_key(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) +{ + return eventKey == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventKey); +} + int kz_amqp_consumer_fire_event(char *eventkey) { sip_msg_t *fmsg; @@ -2314,32 +2320,35 @@ int kz_amqp_consumer_fire_event(char *eventkey) return 0; } -void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey) +void kz_amqp_consumer_event(kz_amqp_consumer_delivery_ptr Evt) { json_obj_ptr json_obj = NULL; str ev_name = {0, 0}, ev_category = {0, 0}; char buffer[512]; char * p; - eventData = payload; + eventData = Evt->payload; + if(Evt->routing_key) { + eventKey = Evt->routing_key->s; + } - json_obj = kz_json_parse(payload); + json_obj = kz_json_parse(Evt->payload); if (json_obj == NULL) return; - char* key = (event_key == NULL ? dbk_consumer_event_key.s : event_key); - char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey); + char* key = (Evt->event_key == NULL ? dbk_consumer_event_key.s : Evt->event_key); + char* subkey = (Evt->event_subkey == NULL ? dbk_consumer_event_subkey.s : Evt->event_subkey); json_extract_field(key, ev_category); - if(ev_category.len == 0 && event_key) { - ev_category.s = event_key; - ev_category.len = strlen(event_key); + if(ev_category.len == 0 && Evt->event_key) { + ev_category.s = Evt->event_key; + ev_category.len = strlen(Evt->event_key); } json_extract_field(subkey, ev_name); - if(ev_name.len == 0 && event_subkey) { - ev_name.s = event_subkey; - ev_name.len = strlen(event_subkey); + if(ev_name.len == 0 && Evt->event_subkey) { + ev_name.s = Evt->event_subkey; + ev_name.len = strlen(Evt->event_subkey); } sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s); @@ -2370,6 +2379,7 @@ void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey) json_object_put(json_obj); eventData = NULL; + eventKey = NULL; } int check_timeout(struct timeval *now, struct timeval *start, struct timeval *timeout) @@ -3190,7 +3200,8 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg) lock_release(&cmd->lock); } } else { - kz_amqp_consumer_event(Evt->payload, Evt->event_key, Evt->event_subkey); +// kz_amqp_consumer_event(Evt->payload, Evt->event_key, Evt->event_subkey); + kz_amqp_consumer_event(Evt); } kz_amqp_free_consumer_delivery(Evt); diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index b4a88fffb70..cb844ca8463 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -293,6 +293,7 @@ int kz_amqp_consumer_worker_proc(int cmd_pipe); int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection); int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); +int kz_pv_get_event_routing_key(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); int kz_pv_get_connection_host(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);