From aed3a523ffd68bd2fdac78b9be68335a72aa78ce Mon Sep 17 00:00:00 2001 From: frederic Date: Tue, 16 Feb 2021 10:27:42 +0100 Subject: [PATCH] topos_redis: support SUBSCRIBE dialog --- src/modules/topos_redis/doc/topos_redis.xml | 9 +++ src/modules/topos_redis/topos_redis_storage.c | 75 ++++++++++++++++--- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/modules/topos_redis/doc/topos_redis.xml b/src/modules/topos_redis/doc/topos_redis.xml index f6a5314d825..4634eb7ebdc 100644 --- a/src/modules/topos_redis/doc/topos_redis.xml +++ b/src/modules/topos_redis/doc/topos_redis.xml @@ -23,6 +23,11 @@ Mierla miconda@gmail.com + + Frederic + Gaisnon + frederic.gaisnon@gmail.com + 2017 @@ -32,6 +37,10 @@ 2017 flowroute.com + + 2021 + MomentTech + diff --git a/src/modules/topos_redis/topos_redis_storage.c b/src/modules/topos_redis/topos_redis_storage.c index 47205c8f133..93f02da0fd3 100644 --- a/src/modules/topos_redis/topos_redis_storage.c +++ b/src/modules/topos_redis/topos_redis_storage.c @@ -265,7 +265,12 @@ int tps_redis_insert_dialog(tps_data_t *td) argvlen[argc] = rkey.len; argc++; - lval = (unsigned long)_tps_api.get_dialog_expire(); + if(td->s_method.len==9 && strncmp(td->s_method.s, "SUBSCRIBE", 9)==0) { + lval = (unsigned long)td->expires; + } else { + lval = (unsigned long)_tps_api.get_dialog_expire(); + } + if(lval==0) { return 0; } @@ -297,7 +302,7 @@ int tps_redis_clean_dialogs(void) /** * */ -int tps_redis_insert_invite_branch(tps_data_t *td) +int tps_redis_insert_initial_method_branch(tps_data_t *td) { char* argv[TPS_REDIS_NR_KEYS]; size_t argvlen[TPS_REDIS_NR_KEYS]; @@ -328,8 +333,9 @@ int tps_redis_insert_invite_branch(tps_data_t *td) rp = _tps_redis_cbuf; rkey.len = snprintf(rp, TPS_REDIS_DATA_SIZE-128, - "%.*sINVITE:%.*s:%.*s", + "%.*s%.*s:%.*s:%.*s", _tps_redis_bprefix.len, _tps_redis_bprefix.s, + td->s_method.len, td->s_method.s, td->a_callid.len, td->a_callid.s, td->b_tag.len, td->b_tag.s); if(rkey.len<0 || rkey.len>=TPS_REDIS_DATA_SIZE-128) { @@ -360,8 +366,8 @@ int tps_redis_insert_invite_branch(tps_data_t *td) } return -1; } - LM_DBG("inserting invite branch record for [%.*s] with argc %d\n", - rkey.len, rkey.s, argc); + LM_DBG("inserting %.*s branch record for [%.*s] with argc %d\n", + td->s_method.len, td->s_method.s,rkey.len, rkey.s, argc); freeReplyObject(rrpl); @@ -552,7 +558,7 @@ int tps_redis_clean_branches(void) /** * */ -int tps_redis_load_invite_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) +int tps_redis_load_initial_method_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) { char* argv[TPS_REDIS_NR_KEYS]; size_t argvlen[TPS_REDIS_NR_KEYS]; @@ -588,8 +594,9 @@ int tps_redis_load_invite_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) rp = _tps_redis_cbuf; rkey.len = snprintf(rp, TPS_REDIS_DATA_SIZE, - "%.*sINVITE:%.*s:%.*s", + "%.*s%.*s:%.*s:%.*s", _tps_redis_bprefix.len, _tps_redis_bprefix.s, + md->s_method.len, md->s_method.s, md->a_callid.len, md->a_callid.s, md->b_tag.len, md->b_tag.s); if(rkey.len<0 || rkey.len>=TPS_REDIS_DATA_SIZE) { @@ -733,9 +740,9 @@ int tps_redis_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd, /* load same transaction using Via branch */ xvbranch1 = &md->x_vbranch1; } else { - /* load corresponding INVITE transaction using call-id + to-tag */ - if(tps_redis_load_invite_branch(msg, md, &id)<0) { - LM_ERR("failed to load the INVITE branch value\n"); + /* load corresponding INVITE or SUBSCRIBE transaction using call-id + to-tag */ + if(tps_redis_load_initial_method_branch(msg, md, &id)<0) { + LM_ERR("failed to load the %.*s branch value\n", md->s_method.len, md->s_method.s); return -1; } xvbranch1 = &id.x_vbranch1; @@ -1122,11 +1129,18 @@ int tps_redis_update_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd, } if(md->s_method.len==6 && strncmp(md->s_method.s, "INVITE", 6)==0) { - if(tps_redis_insert_invite_branch(md)<0) { + if(tps_redis_insert_initial_method_branch(md)<0) { LM_ERR("failed to insert INVITE extra branch data\n"); return -1; } } + if(md->s_method.len==9 && strncmp(md->s_method.s, "SUBSCRIBE", 9)==0) { + if(tps_redis_insert_initial_method_branch(md)<0) { + LM_ERR("failed to insert SUBSCRIBE extra branch data\n"); + return -1; + } + } + rsrv = _tps_redis_api.get_server(&_topos_redis_serverid); if(rsrv==NULL) { LM_ERR("cannot find redis server [%.*s]\n", @@ -1209,6 +1223,7 @@ int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd, redisc_server_t *rsrv = NULL; redisReply *rrpl = NULL; int32_t liflags; + unsigned long lval = 0; if(sd->a_uuid.len<=0 && sd->b_uuid.len<=0) { LM_INFO("no uuid for this message\n"); @@ -1297,6 +1312,11 @@ int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd, } } + if (mode & TPS_DBU_TIME) { + lval = (unsigned long)time(NULL); + TPS_REDIS_SET_ARGN(lval, rp, &rval, argc, &td_key_rectime, argv, argvlen); + } + if(argc<=2) { return 0; } @@ -1313,6 +1333,37 @@ int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd, rkey.len, rkey.s, argc); freeReplyObject(rrpl); + if (mode & TPS_DBU_TIME) { + /* reset expire for the key */ + argc = 0; + + argv[argc] = "EXPIRE"; + argvlen[argc] = 6; + argc++; + + argv[argc] = rkey.s; + argvlen[argc] = rkey.len; + argc++; + + lval = (unsigned long)md->expires; + if(lval==0) { + return 0; + } + TPS_REDIS_SET_ARGNV(lval, rp, &rval, argc, argv, argvlen); + + rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen); + if(rrpl==NULL) { + LM_ERR("failed to execute expire redis command\n"); + if(rsrv->ctxRedis->err) { + LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr); + } + return -1; + } + LM_DBG("expire %lu set on dialog record for [%.*s] with argc %d\n", lval, + rkey.len, rkey.s, argc); + freeReplyObject(rrpl); + } + return 0; } @@ -1333,7 +1384,7 @@ int tps_redis_end_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) int32_t liflags; unsigned long lval = 0; - if(md->s_method_id != METHOD_BYE) { + if((md->s_method_id != METHOD_BYE) && !((md->s_method_id == METHOD_SUBSCRIBE) && (md->expires == 0))) { return 0; }