From eaecb97dd7baf3ea220196e799c493b3ca9f5594 Mon Sep 17 00:00:00 2001 From: Julien Chavanton Date: Fri, 7 Dec 2018 10:52:31 -0800 Subject: [PATCH] mqueue: new RPC command mqueue.fetch --- src/modules/mqueue/doc/mqueue.xml | 9 ++++ src/modules/mqueue/doc/mqueue_admin.xml | 22 +++++++++ src/modules/mqueue/mqueue_api.c | 38 +++++++++++++++ src/modules/mqueue/mqueue_api.h | 3 +- src/modules/mqueue/mqueue_mod.c | 64 +++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 1 deletion(-) diff --git a/src/modules/mqueue/doc/mqueue.xml b/src/modules/mqueue/doc/mqueue.xml index 53ce810056b..771d8216341 100644 --- a/src/modules/mqueue/doc/mqueue.xml +++ b/src/modules/mqueue/doc/mqueue.xml @@ -35,11 +35,20 @@ osas@voipembedded.com VoIP Embedded, Inc. + + Julien + Chavanton + jchavanton@gmail.com + 2010 Elena-Ramona Modroiu (asipto.com) + + 2018 + Julien chavanton, Flowroute + diff --git a/src/modules/mqueue/doc/mqueue_admin.xml b/src/modules/mqueue/doc/mqueue_admin.xml index 0ab18f0be09..42542a392cc 100644 --- a/src/modules/mqueue/doc/mqueue_admin.xml +++ b/src/modules/mqueue/doc/mqueue_admin.xml @@ -242,6 +242,28 @@ xlog("L_INFO", "Size of queue is: $var(q_size)\n"); ... &kamcmd; mqueue.get_size xyz ... + + + +
+ mqueue.fetch + + Fetch a key-value pair from a memory queue. + + + Parameters: + + + + name - the name of memory queue + + + + <function>mqueue.fetch</function> usage + +... +&kamcmd; mqueue.fetch xyz +...
diff --git a/src/modules/mqueue/mqueue_api.c b/src/modules/mqueue/mqueue_api.c index 4b961c44b99..d83c3ce6cef 100644 --- a/src/modules/mqueue/mqueue_api.c +++ b/src/modules/mqueue/mqueue_api.c @@ -424,6 +424,44 @@ int pv_get_mqk(struct sip_msg *msg, pv_param_t *param, return pv_get_strval(msg, param, res, &mp->item->key); } +/** + * + */ +str* get_mqk(str *in) +{ + mq_pv_t *mp = NULL; + + if (mq_head_get(in) == NULL) + { + LM_ERR("mqueue not found: %.*s\n", in->len, in->s); + return NULL; + } + + mp = mq_pv_get(in); + if(mp==NULL || mp->item==NULL || mp->item->key.len<=0) + return NULL; + return &mp->item->key; +} + +/** + * + */ +str* get_mqv(str *in) +{ + mq_pv_t *mp = NULL; + + if (mq_head_get(in) == NULL) + { + LM_ERR("mqueue not found: %.*s\n", in->len, in->s); + return NULL; + } + + mp = mq_pv_get(in); + if(mp==NULL || mp->item==NULL || mp->item->val.len<=0) + return NULL; + return &mp->item->val; +} + /** * */ diff --git a/src/modules/mqueue/mqueue_api.h b/src/modules/mqueue/mqueue_api.h index 110bcd03a5a..79996805f74 100644 --- a/src/modules/mqueue/mqueue_api.h +++ b/src/modules/mqueue/mqueue_api.h @@ -35,7 +35,8 @@ int pv_get_mqv(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); int pv_get_mq_size(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); - +str* get_mqk(str *name); +str* get_mqv(str *name); int mq_head_defined(void); void mq_destroy(void); int mq_head_add(str *name, int msize); diff --git a/src/modules/mqueue/mqueue_mod.c b/src/modules/mqueue/mqueue_mod.c index 66ff513ea3e..6d46796ac2e 100644 --- a/src/modules/mqueue/mqueue_mod.c +++ b/src/modules/mqueue/mqueue_mod.c @@ -310,8 +310,72 @@ static const char* mqueue_rpc_get_size_doc[2] = { 0 }; +static void mqueue_rpc_fetch(rpc_t* rpc, void* ctx) +{ + str mqueue_name; + int mqueue_sz = 0; + int ret = 0; + void *th; + str *key = NULL; + str *val = NULL; + + if (rpc->scan(ctx, "S", &mqueue_name) < 1) { + rpc->fault(ctx, 500, "No queue name"); + return; + } + + if(mqueue_name.len <= 0 || mqueue_name.s == NULL) { + LM_ERR("bad mqueue name\n"); + rpc->fault(ctx, 500, "Invalid queue name"); + return; + } + + mqueue_sz = _mq_get_csize(&mqueue_name); + + if(mqueue_sz < 0) { + LM_ERR("no such mqueue\n"); + rpc->fault(ctx, 500, "No such queue"); + return; + } + + ret = mq_head_fetch(&mqueue_name); + if(ret == -2) { + rpc->fault(ctx, 404, "Empty queue"); + return; + } else if(ret <0) { + LM_ERR("mqueue fetch\n"); + rpc->fault(ctx, 500, "Unexpected error (fetch)"); + return; + } + + key = get_mqk(&mqueue_name); + val = get_mqv(&mqueue_name); + + if(!val || !key) { + rpc->fault(ctx, 500, "Unexpected error (result)"); + return; + } + + /* add entry node */ + if(rpc->add(ctx, "{", &th) < 0) { + rpc->fault(ctx, 500, "Internal error root reply"); + return; + } + + if (rpc->struct_add(th, "SS", "key", key, "val", val) < 0) { + rpc->fault(ctx, 500, "Server error appending (key/val)"); + return; + } +} + +static const char* mqueue_rpc_fetch_doc[2] = { + "Fetch an element from the queue.", + 0 +}; + rpc_export_t mqueue_rpc[] = { {"mqueue.get_size", mqueue_rpc_get_size, mqueue_rpc_get_size_doc, 0}, + {"mqueue.fetch", mqueue_rpc_fetch, mqueue_rpc_fetch_doc, 0}, {0, 0, 0, 0} };