Skip to content

Commit

Permalink
rtpengine: hash table to keep the selected nodes
Browse files Browse the repository at this point in the history
Shared memory hash table with global hashtable lock.
Add state maintaining the selected rtp node, for a given callid.
Hashtable entry expiration time configurable using hash_entry_tout modparam.
The actual deletion happens on the fly while insert/remove/lookup are called.
Updated doku.
  • Loading branch information
Stefan Mititelu committed Nov 5, 2015
1 parent a66e220 commit 2625ab3
Show file tree
Hide file tree
Showing 4 changed files with 523 additions and 36 deletions.
26 changes: 26 additions & 0 deletions modules/rtpengine/doc/rtpengine_admin.xml
Expand Up @@ -356,6 +356,32 @@ modparam("rtpproxy", "rtp_inst_pvar", "$avp(RTP_INSTANCE)")
</example>
</section>

<section id="rtpengine.p.hash_entry_tout">
<title><varname>hash_entry_tout</varname> (string)</title>
<para>
Number of seconds after an rtpengine hash table entry is marked for deletion.
By default, this parameter is set to 120 (seconds).
</para>
<para>
To maintain information about a selected rtp machine node, for a given call, entries are added in a hashtable of (callid, node) pairs.
When "offer" comes, insert new entry in the hastable.
When subsequent commands come, lookup callid and return chosen node.
When "delete" comes, remove old entry from hashtable.
</para>
<para>
NOTE: In the current implementation, the actual deletion happens <emphasis>on the fly</emphasis>,
while insert/remove/lookup the hastable, <emphasis>only</emphasis> for the entries in the insert/remove/lookup path.
</para>
<example>
<title>Set <varname>hash_entry_tout</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("rtpproxy", "hash_entry_tout", "300")
...
</programlisting>
</example>
</section>

</section>

<section>
Expand Down
189 changes: 153 additions & 36 deletions modules/rtpengine/rtpengine.c
Expand Up @@ -78,6 +78,7 @@
#include "../../modules/tm/tm_load.h"
#include "rtpengine.h"
#include "rtpengine_funcs.h"
#include "rtpengine_hash.h"
#include "bencode.h"

MODULE_VERSION
Expand Down Expand Up @@ -187,7 +188,9 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op
static int fixup_set_id(void ** param, int param_no);
static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
static struct rtpp_set * select_rtpp_set(int id_set);
static struct rtpp_node *select_rtpp_node(str, int);
static struct rtpp_node *select_rtpp_node_new(str, int, int);
static struct rtpp_node *select_rtpp_node_old(str, int, int);
static struct rtpp_node *select_rtpp_node(str, int, int);
static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
static int get_extra_id(struct sip_msg* msg, str *id_str);

Expand Down Expand Up @@ -228,6 +231,7 @@ static pid_t mypid;
static unsigned int myseqn = 0;
static str extra_id_pv_param = {NULL, 0};
static char *setid_avp_param = NULL;
static int hash_entry_tout = 120;

static char ** rtpp_strings=0;
static int rtpp_sets=0; /*used in rtpengine_set_store()*/
Expand Down Expand Up @@ -336,6 +340,7 @@ static param_export_t params[] = {
{"rtp_inst_pvar", PARAM_STR, &rtp_inst_pv_param },
{"write_sdp_pv", PARAM_STR, &write_sdp_pvar_str },
{"read_sdp_pv", PARAM_STR, &read_sdp_pvar_str },
{"hash_entry_tout", INT_PARAM, &hash_entry_tout },
{0, 0, 0}
};

Expand Down Expand Up @@ -1440,6 +1445,14 @@ mod_init(void)
return -1;
}

/* init the hastable which keeps the call-id <-> selected_node relation */
if (!rtpengine_hash_table_init()) {
LM_ERR("rtpengine_hash_table_init() failed!\n");
return -1;
} else {
LM_DBG("rtpengine_hash_table_init() success!\n");
}

return 0;
}

Expand Down Expand Up @@ -1579,6 +1592,13 @@ static void mod_destroy(void)
}

shm_free(rtpp_set_list);

/* destroy the hastable which keeps the call-id <-> selected_node relation */
if (!rtpengine_hash_table_destroy()) {
LM_ERR("rtpengine_hash_table_destroy() failed!\n");
} else {
LM_DBG("rtpengine_hash_table_destroy() success!\n");
}
}


Expand Down Expand Up @@ -1923,16 +1943,17 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
LM_ERR("queried nodes limit reached\n");
goto error;
}
node = select_rtpp_node(callid, 1);
node = select_rtpp_node(callid, 1, op);
if (!node) {
LM_ERR("no available proxies\n");
goto error;
}

cp = send_rtpp_command(node, ng_flags.dict, &ret);
if (cp == NULL) {
node->rn_disabled = 1;
node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
}
if (cp == NULL) {
node->rn_disabled = 1;
node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
}
} while (cp == NULL);
LM_DBG("proxy reply: %.*s\n", ret, cp);

Expand All @@ -1944,25 +1965,37 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
goto error;
}

if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
}
else {
if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
(strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
{
LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s);
goto select_node;
}
LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
}
goto error;
goto error;
}

if (body_out)
*body_out = body;

if (op == OP_DELETE) {
/* Delete the key<->value from the hashtable */
if (!rtpengine_hash_table_remove(&callid)) {
LM_ERR("rtpengine hash table failed to remove entry for callen=%d callid=%.*s\n",
callid.len, callid.len, callid.s);
} else {
LM_DBG("rtpengine hash table remove entry for callen=%d callid=%.*s\n",
callid.len, callid.len, callid.s);
}
}

return resp;

error:
Expand Down Expand Up @@ -2191,81 +2224,165 @@ static struct rtpp_set * select_rtpp_set(int id_set ){

return rtpp_list;
}

/*
* Main balancing routine. This does not try to keep the same proxy for
* the call if some proxies were disabled or enabled; proxy death considered
* too rare. Otherwise we should implement "mature" HA clustering, which is
* too expensive here.
* run the selection algorithm and return the new selected node
*/
static struct rtpp_node *
select_rtpp_node(str callid, int do_test)
select_rtpp_node_new(str callid, int do_test, int op)
{
unsigned sum, sumcut, weight_sum;
struct rtpp_node* node;
int was_forced;

if(!active_rtpp_set){
LM_ERR("script error -no valid set selected\n");
return NULL;
}
/* Most popular case: 1 proxy, nothing to calculate */
if (active_rtpp_set->rtpp_node_count == 1) {
node = active_rtpp_set->rn_first;
if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks())
node->rn_disabled = rtpp_test(node, 1, 0);
return node->rn_disabled ? NULL : node;
}
unsigned i, sum, sumcut, weight_sum;
int was_forced = 0;

/* XXX Use quick-and-dirty hashing algo */
for(sum = 0; callid.len > 0; callid.len--)
sum += callid.s[callid.len - 1];
for(i = 0; i < callid.len; i++)
sum += callid.s[i];
sum &= 0xff;

was_forced = 0;
retry:
weight_sum = 0;
for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {

for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
/* Try to enable if it's time to try. */
if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
/* Try to enable if it's time to try. */
node->rn_disabled = rtpp_test(node, 1, 0);
}
if (!node->rn_disabled)

/* Select only between enabled machines */
if (!node->rn_disabled) {
weight_sum += node->rn_weight;
}
}

/* No proxies? Force all to be redetected, if not yet */
if (weight_sum == 0) {
/* No proxies? Force all to be redetected, if not yet */
if (was_forced)
if (was_forced) {
return NULL;
}

was_forced = 1;

for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
node->rn_disabled = rtpp_test(node, 1, 1);
}

goto retry;
}

/* sumcut here lays from 0 to weight_sum-1 */
sumcut = sum % weight_sum;

/*
* sumcut here lays from 0 to weight_sum-1.
* Scan proxy list and decrease until appropriate proxy is found.
*/
for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
/* Select only between enabled machines */
if (node->rn_disabled)
continue;

/* Found enabled machine */
if (sumcut < node->rn_weight)
goto found;

/* Update sumcut if enabled machine */
sumcut -= node->rn_weight;
}

/* No node list */
return NULL;

found:
if (do_test) {
node->rn_disabled = rtpp_test(node, node->rn_disabled, 0);
if (node->rn_disabled)
goto retry;
}

/* build hash table entry */
struct rtpengine_hash_entry *entry = shm_malloc(sizeof(struct rtpp_node));
if (shm_str_dup(&entry->callid, &callid) < 0) {
LM_ERR("rtpengine hash table fail to duplicate calllen=%d callid=%.*s",
callid.len, callid.len, callid.s);
}
entry->node = node;
entry->next = NULL;
entry->tout = get_ticks() + hash_entry_tout;

/* Insert the key<->entry from the hashtable */
if (!rtpengine_hash_table_insert(&callid, entry)) {
LM_ERR("rtpengine hash table fail to insert node=%.*s for calllen=%d callid=%.*s",
node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
} else {
LM_DBG("rtpengine hash table insert node=%.*s for calllen=%d callid=%.*s\n",
node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
}

/* Return selected node */
return node;
}

/*
* lookup the hastable (key=callid value=node) and get the old node
*/
static struct rtpp_node *
select_rtpp_node_old(str callid, int do_test, int op)
{
struct rtpp_node *node = NULL;
struct rtpengine_hash_entry *entry = NULL;

entry = rtpengine_hash_table_lookup(&callid);
if (!entry) {
LM_ERR("rtpengine hash table lookup failed to find entry for calllen=%d callid=%.*s\n",
callid.len, callid.len, callid.s);
} else {
LM_DBG("rtpengine hash table lookup find entry for calllen=%d callid=%.*s\n",
callid.len, callid.len, callid.s);
node = entry->node;
}

if (!node) {
LM_ERR("rtpengine hash table lookup failed to find node for calllen=%d callid=%.*s\n",
callid.len, callid.len, callid.s);
return NULL;
} else {
LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s\n",
node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
}

// if node broke, don't send any message
if (!node->rn_disabled) {
return node;
} else {
LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s, which is disabled!\n",
node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
}

return NULL;
}

/*
* Main balancing routine. This DO try to keep the same proxy for
* the call if some proxies were disabled or enabled (e.g. kamctl command)
*/
static struct rtpp_node *
select_rtpp_node(str callid, int do_test, int op)
{
if(!active_rtpp_set) {
LM_ERR("script error - no valid set selected\n");
return NULL;
}

// calculate and choose a node
if (op == OP_OFFER) {
// run the selection algorithm
return select_rtpp_node_new(callid, do_test, op);
} else {
// lookup the hastable (key=callid value=node) and get the old node
return select_rtpp_node_old(callid, do_test, op);
}
}

static int
get_extra_id(struct sip_msg* msg, str *id_str) {
if(msg==NULL || extra_id_pv==NULL || id_str==NULL) {
Expand Down

0 comments on commit 2625ab3

Please sign in to comment.