Skip to content

Commit

Permalink
tm: implemented t_uac_wait_block rpc command
Browse files Browse the repository at this point in the history
- it blocks while waiting for the reply to return the code and reason
text

(cherry picked from commit dc5a548)
  • Loading branch information
miconda committed Feb 9, 2021
1 parent c3f12eb commit ff34dc4
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 6 deletions.
263 changes: 257 additions & 6 deletions src/modules/tm/rpc_uac.c
Expand Up @@ -23,17 +23,202 @@
#include "../../core/ut.h"
#include "../../core/parser/parse_from.h"
#include "../../core/str_list.h"
#include "../../core/timer_proc.h"
#include "../../core/utils/sruid.h"
#include "ut.h"
#include "dlg.h"
#include "uac.h"
#include "callid.h"



/* RPC substitution char (used in rpc_t_uac headers) */
#define SUBST_CHAR '!'

#define TM_RPC_RESPONSE_LIFETIME 300
#define TM_RPC_RESPONSE_TIMERSTEP 10

void tm_rpc_response_list_clean(unsigned int ticks, void *param);

typedef struct tm_rpc_response {
str ruid;
int flags;
int rcode;
str rtext;
time_t rtime;
struct tm_rpc_response *next;
} tm_rpc_response_t;

typedef struct tm_rpc_response_list {
gen_lock_t rlock;
tm_rpc_response_t *rlist;
} tm_rpc_response_list_t;

static tm_rpc_response_list_t *_tm_rpc_response_list = NULL;

static sruid_t _tm_rpc_sruid;

/**
*
*/
int tm_rpc_response_list_init(void)
{
if(_tm_rpc_response_list != NULL) {
return 0;
}
if(sruid_init(&_tm_rpc_sruid, '-', "tmrp", SRUID_INC)<0) {
LM_ERR("failed to init sruid\n");
return -1;
}
if(sr_wtimer_add(tm_rpc_response_list_clean, 0,
TM_RPC_RESPONSE_TIMERSTEP)<0) {
LM_ERR("failed to register timer routine\n");
return -1;
}
_tm_rpc_response_list = shm_malloc(sizeof(tm_rpc_response_list_t));
if(_tm_rpc_response_list == NULL) {
SHM_MEM_ERROR;
return -1;
}

memset(_tm_rpc_response_list, 0, sizeof(tm_rpc_response_list_t));

lock_init(&_tm_rpc_response_list->rlock);

return 0;
}

/**
*
*/
int tm_rpc_response_list_destroy(void)
{
tm_rpc_response_t *rl0 = NULL;
tm_rpc_response_t *rl1 = NULL;

if(_tm_rpc_response_list == NULL) {
return 0;
}

rl1 = _tm_rpc_response_list->rlist;

while(rl1!=NULL) {
rl0 = rl1;
rl1 = rl1->next;
shm_free(rl0);
}
lock_destroy(&_tm_rpc_response_list->rlock);
shm_free(_tm_rpc_response_list);
_tm_rpc_response_list = NULL;

return 0;
}

/**
*
*/
int tm_rpc_response_list_add(str *ruid, int rcode, str *rtext)
{
size_t rsize = 0;
tm_rpc_response_t *ri = NULL;
if(_tm_rpc_response_list == NULL) {
LM_ERR("rpc response list not initialized\n");
return -1;
}

rsize = sizeof(tm_rpc_response_t) + ruid->len + 2
+ ((rtext!=NULL)?rtext->len:0);

ri = (tm_rpc_response_t*)shm_malloc(rsize);
if(ri==NULL) {
SHM_MEM_ERROR;
return -1;
}
memset(ri, 0, rsize);

ri->ruid.s = (char*)ri + sizeof(tm_rpc_response_t);
ri->ruid.len = ruid->len;
memcpy(ri->ruid.s, ruid->s, ruid->len);
ri->rtime = time(NULL);
ri->rcode = rcode;
if(rtext!=NULL) {
ri->rtext.s = ri->ruid.s + ri->ruid.len + 1;
ri->rtext.len = rtext->len;
memcpy(ri->rtext.s, rtext->s, rtext->len);
}
lock_get(&_tm_rpc_response_list->rlock);
ri->next = _tm_rpc_response_list->rlist;
_tm_rpc_response_list->rlist = ri;
lock_release(&_tm_rpc_response_list->rlock);

return 0;
}

/**
*
*/
tm_rpc_response_t *tm_rpc_response_list_get(str *ruid)
{
tm_rpc_response_t *ri0 = NULL;
tm_rpc_response_t *ri1 = NULL;

if(_tm_rpc_response_list == NULL) {
LM_ERR("rpc response list not initialized\n");
return NULL;
}

lock_get(&_tm_rpc_response_list->rlock);
ri1 = _tm_rpc_response_list->rlist;
while(ri1!=NULL) {
if(ri1->ruid.len==ruid->len
&& memcmp(ri1->ruid.s, ruid->s, ruid->len)==0) {
if(ri0 == NULL) {
_tm_rpc_response_list->rlist = ri1->next;
} else {
ri0->next = ri1->next;
}
lock_release(&_tm_rpc_response_list->rlock);
return ri1;
}
ri0 = ri1;
ri1 = ri1->next;
}
lock_release(&_tm_rpc_response_list->rlock);
return NULL;
}

/**
*
*/
void tm_rpc_response_list_clean(unsigned int ticks, void *param)
{
tm_rpc_response_t *ri0 = NULL;
tm_rpc_response_t *ri1 = NULL;
time_t tnow;

if(_tm_rpc_response_list == NULL) {
return;
}

tnow = time(NULL);
lock_get(&_tm_rpc_response_list->rlock);
ri1 = _tm_rpc_response_list->rlist;
while(ri1!=NULL) {
if(ri1->rtime < tnow - TM_RPC_RESPONSE_LIFETIME) {
if(ri0 == NULL) {
_tm_rpc_response_list->rlist = ri1->next;
} else {
ri0->next = ri1->next;
}
LM_DBG("freeing item [%.*s]\n", ri1->ruid.len, ri1->ruid.s);
shm_free(ri1);
ri1 = ri0->next;
} else {
ri0 = ri1;
ri1 = ri1->next;
}
}
lock_release(&_tm_rpc_response_list->rlock);
}

/** make sure the rpc user created the msg properly.
* Make sure that the FIFO user created the message
Expand Down Expand Up @@ -393,6 +578,25 @@ static void rpc_uac_callback(struct cell* t, int type, struct tmcb_params* ps)
}


/* t_uac callback */
static void rpc_uac_block_callback(struct cell* t, int type,
struct tmcb_params* ps)
{
str *ruid;
str rtext;

ruid = (str*)(*ps->param);
*ps->param=0;
if (ps->rpl==FAKED_REPLY) {
rtext.s = error_text(ps->code);
rtext.len = strlen(rtext.s);
} else {
rtext = ps->rpl->first_line.u.reply.reason;
}
tm_rpc_response_list_add(ruid, ps->code, &rtext);
shm_free(ruid);
}


/** rpc t_uac version-
* It expects the following list of strings as parameters:
Expand Down Expand Up @@ -433,11 +637,15 @@ static void rpc_t_uac(rpc_t* rpc, void* c, int reply_wait)
dlg_t dlg;
uac_req_t uac_req;
rpc_delayed_ctx_t* dctx;
str *ruid = NULL;
tm_rpc_response_t *ritem = NULL;
int rcount = 0;
void* th = NULL;

body.s=0;
body.len=0;
dctx=0;
if (reply_wait && (rpc->capabilities == 0 ||
if (reply_wait==1 && (rpc->capabilities == 0 ||
!(rpc->capabilities(c) & RPC_DELAYED_REPLY))) {
rpc->fault(c, 600, "Reply wait/async mode not supported"
" by this rpc transport");
Expand Down Expand Up @@ -538,7 +746,7 @@ static void rpc_t_uac(rpc_t* rpc, void* c, int reply_wait)
if(hfb.s!=NULL && hfb.len>0) uac_req.headers=&hfb;
uac_req.body=body.len?&body:0;
uac_req.dialog=&dlg;
if (reply_wait){
if (reply_wait==1){
dctx=rpc->delayed_ctx_new(c);
if (dctx==0){
rpc->fault(c, 500, "internal error: failed to create context");
Expand All @@ -551,22 +759,57 @@ static void rpc_t_uac(rpc_t* rpc, void* c, int reply_wait)
want to still send a reply */
rpc=&dctx->rpc;
c=dctx->reply_ctx;
} else if (reply_wait==2) {
sruid_next(&_tm_rpc_sruid);
uac_req.cb = rpc_uac_block_callback;
ruid = shm_str_dup_block(&_tm_rpc_sruid.uid);
uac_req.cbp = ruid;
uac_req.cb_flags = TMCB_LOCAL_COMPLETED;
}

ret = t_uac(&uac_req);

if (ret <= 0) {
err_ret = err2reason_phrase(ret, &sip_error, err_buf,
sizeof(err_buf), "RPC/UAC") ;
if (err_ret > 0 )
{
if (err_ret > 0 ) {
rpc->fault(c, sip_error, "%s", err_buf);
} else {
rpc->fault(c, 500, "RPC/UAC error");
}
if (dctx)
if (dctx) {
rpc->delayed_ctx_close(dctx);
}
if(ruid) {
shm_free(ruid);
}
goto error01;
}

if(reply_wait==2) {
while(ritem==NULL && rcount<800) {
sleep_us(100000);
rcount++;
ritem = tm_rpc_response_list_get(&_tm_rpc_sruid.uid);
}
if(ritem == NULL) {
rpc->fault(c, 500, "No response");
} else {
/* add structure node */
if (rpc->add(c, "{", &th) < 0) {
rpc->fault(c, 500, "Structure error");
} else {
if(rpc->struct_add(th, "dS",
"code", ritem->rcode,
"text", &ritem->rtext)<0) {
rpc->fault(c, 500, "Fields error");
return;
}
}
shm_free(ritem);
}
}

error01:
if (hfb.s) pkg_free(hfb.s);
error:
Expand All @@ -590,6 +833,14 @@ void rpc_t_uac_wait(rpc_t* rpc, void* c)
rpc_t_uac(rpc, c, 1);
}

/** t_uac with blocking for reply waiting.
* @see rpc_t_uac.
*/
void rpc_t_uac_wait_block(rpc_t* rpc, void* c)
{
rpc_t_uac(rpc, c, 2);
}


static int t_uac_check_msg(struct sip_msg* msg,
str* method, str* body,
Expand Down
4 changes: 4 additions & 0 deletions src/modules/tm/rpc_uac.h
Expand Up @@ -21,8 +21,12 @@
#include "../../core/rpc.h"


int tm_rpc_response_list_init(void);
int tm_rpc_response_list_destroy(void);

void rpc_t_uac_start(rpc_t* rpc, void* c);
void rpc_t_uac_wait(rpc_t* rpc, void* c);
void rpc_t_uac_wait_block(rpc_t* rpc, void* c);

int t_uac_send(str *method, str *ruri, str *nexthop, str *send_socket,
str *headers, str *body);
Expand Down
13 changes: 13 additions & 0 deletions src/modules/tm/tm.c
Expand Up @@ -723,6 +723,11 @@ static int mod_init(void)
return -1;
}

if(tm_rpc_response_list_init()<0) {
LM_ERR("failed to init rpc\n");
return -1;
}

if(on_sl_reply_name.s!=NULL && on_sl_reply_name.len>0) {
keng = sr_kemi_eng_get();
if(keng==NULL) {
Expand Down Expand Up @@ -2767,6 +2772,13 @@ static const char* rpc_t_uac_wait_doc[2] = {
0
};

static const char* rpc_t_uac_wait_block_doc[2] = {
"starts a tm uac and waits for the final reply in blocking mode, using a"
" list of string parameters: method, ruri, dst_uri send_sock, headers"
" (CRLF separated) and body (optional)",
0
};

static const char* tm_rpc_list_doc[2] = {
"List transactions.",
0
Expand All @@ -2787,6 +2799,7 @@ static rpc_export_t tm_rpc[] = {
{"tm.hash_stats", tm_rpc_hash_stats, tm_rpc_hash_stats_doc, 0},
{"tm.t_uac_start", rpc_t_uac_start, rpc_t_uac_start_doc, 0 },
{"tm.t_uac_wait", rpc_t_uac_wait, rpc_t_uac_wait_doc, RET_ARRAY},
{"tm.t_uac_wait_block", rpc_t_uac_wait_block, rpc_t_uac_wait_block_doc, 0},
{"tm.list", tm_rpc_list, tm_rpc_list_doc, RET_ARRAY},
{"tm.clean", tm_rpc_clean, tm_rpc_clean_doc, 0},
{0, 0, 0, 0}
Expand Down

0 comments on commit ff34dc4

Please sign in to comment.