Skip to content

Commit

Permalink
rtpengine: fix queried_nodes_limit logic
Browse files Browse the repository at this point in the history
Right now, even if the selected node returns error, the same node is still
selected and still queried for maximum of queried_nodes_limit times.

Don't retry to query the previous nodes, upon error returned (i.e.
Parallel session limit reached"). Instead, remember the queried nodes and try
to select between un-queried ones. Thus, rtpengine_offer() will select a proper,
available node which will be inserted in the hashtable and further used.
  • Loading branch information
Stefan Mititelu committed Feb 23, 2016
1 parent b531e17 commit 131d899
Showing 1 changed file with 47 additions and 14 deletions.
61 changes: 47 additions & 14 deletions modules/rtpengine/rtpengine.c
Expand Up @@ -197,9 +197,10 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op
static int fixup_set_id(void ** param, int param_no);
static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
static struct rtpp_set * select_rtpp_set(int id_set);
static struct rtpp_node *select_rtpp_node_new(str, str, int);
static struct rtpp_node *select_rtpp_node_new(str, str, int, struct rtpp_node **, int);
static struct rtpp_node *select_rtpp_node_old(str, str, int);
static struct rtpp_node *select_rtpp_node(str, str, int);
static struct rtpp_node *select_rtpp_node(str, str, int, struct rtpp_node **, int);
static int is_queried_node(struct rtpp_node *, struct rtpp_node **, int);
static int build_rtpp_socks(unsigned int current_rtpp_no);
static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
static int get_extra_id(struct sip_msg* msg, str *id_str);
Expand Down Expand Up @@ -234,6 +235,7 @@ static int rtpengine_disable_tout = 60;
static int rtpengine_allow_op = 0;
static int rtpengine_retr = 5;
static int rtpengine_tout_ms = 1000;
static struct rtpp_node **queried_nodes_ptr = NULL;
static int queried_nodes_limit = MAX_RTPP_TRIED_NODES;
static pid_t mypid;
static unsigned int myseqn = 0;
Expand Down Expand Up @@ -387,6 +389,24 @@ struct module_exports exports = {
child_init
};

/* check if the node is already queried */
static int is_queried_node(struct rtpp_node *node, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
{
int i;

if (!queried_nodes_ptr) {
return 0;
}

for (i = 0; i < queried_nodes; i++) {
if (node == queried_nodes_ptr[i]) {
return 1;
}
}

return 0;
}

/* hide the node from display and disable it permanent */
int rtpengine_delete_node(struct rtpp_node *rtpp_node)
{
Expand Down Expand Up @@ -1906,10 +1926,17 @@ child_init(int rank)

rtpp_socks = (int*)pkg_malloc(sizeof(int)*(rtpp_socks_size));
if (!rtpp_socks) {
LM_ERR("no more pkg memory for rtpp_socks\n");
return -1;
}

// vector of pointers to queried nodes
queried_nodes_ptr = (struct rtpp_node**)pkg_malloc(queried_nodes_limit * sizeof(struct rtpp_node*));
if (!queried_nodes_ptr) {
LM_ERR("no more pkg memory for queried_nodes_ptr\n");
return -1;
}
memset(queried_nodes_ptr, 0, queried_nodes_limit * sizeof(struct rtpp_node*));

/* Iterate known RTP proxies - create sockets */
if (rtpp_socks_size) {
build_rtpp_socks(rtpp_socks_size);
Expand Down Expand Up @@ -2208,7 +2235,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
bencode_item_t *item, *resp;
str callid = STR_NULL, from_tag = STR_NULL, to_tag = STR_NULL, viabranch = STR_NULL;
str body = STR_NULL, error = STR_NULL;
int ret, queried_nodes;
int ret, queried_nodes = 0;
struct rtpp_node *node;
char *cp;
pv_value_t pv_val;
Expand Down Expand Up @@ -2333,14 +2360,14 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
if(msg->id != current_msg_id)
active_rtpp_set = default_rtpp_set;

queried_nodes = 0;
select_node:
do {
if (++queried_nodes > queried_nodes_limit) {
if (queried_nodes >= queried_nodes_limit) {
LM_ERR("queried nodes limit reached\n");
goto error;
}
node = select_rtpp_node(callid, viabranch, 1);

node = select_rtpp_node(callid, viabranch, 1, queried_nodes_ptr, queried_nodes);
if (!node) {
LM_ERR("no available proxies\n");
goto error;
Expand All @@ -2351,7 +2378,10 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
node->rn_disabled = 1;
node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
}

queried_nodes_ptr[queried_nodes++] = node;
} while (cp == NULL);

LM_DBG("proxy reply: %.*s\n", ret, cp);

set_rtp_inst_pvar(msg, &node->rn_url);
Expand All @@ -2366,8 +2396,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
}
else {
} else {
if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
(strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
{
Expand Down Expand Up @@ -2679,7 +2708,7 @@ static struct rtpp_set * select_rtpp_set(int id_set ){
* run the selection algorithm and return the new selected node
*/
static struct rtpp_node *
select_rtpp_node_new(str callid, str viabranch, int do_test)
select_rtpp_node_new(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
{
struct rtpp_node* node;
unsigned i, sum, sumcut, weight_sum;
Expand Down Expand Up @@ -2707,7 +2736,7 @@ select_rtpp_node_new(str callid, str viabranch, int do_test)
}

/* Select only between enabled machines */
if (!node->rn_disabled) {
if (!node->rn_disabled && !is_queried_node(node, queried_nodes_ptr, queried_nodes)) {
weight_sum += node->rn_weight;
}
}
Expand Down Expand Up @@ -2752,7 +2781,11 @@ select_rtpp_node_new(str callid, str viabranch, int do_test)
if (node->rn_disabled)
continue;

/* Found enabled machine */
/* Select only between not already queried machines */
if (is_queried_node(node, queried_nodes_ptr, queried_nodes))
continue;

/* Found machine */
if (sumcut < node->rn_weight) {
lock_release(active_rtpp_set->rset_lock);
goto found;
Expand Down Expand Up @@ -2808,7 +2841,7 @@ select_rtpp_node_old(str callid, str viabranch, int do_test)
* the call if some proxies were disabled or enabled (e.g. kamctl command)
*/
static struct rtpp_node *
select_rtpp_node(str callid, str viabranch, int do_test)
select_rtpp_node(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
{
struct rtpp_node *node = NULL;
unsigned int current_rtpp_no;
Expand Down Expand Up @@ -2837,7 +2870,7 @@ select_rtpp_node(str callid, str viabranch, int do_test)
// check node
if (!node) {
// run the selection algorithm
node = select_rtpp_node_new(callid, viabranch, do_test);
node = select_rtpp_node_new(callid, viabranch, do_test, queried_nodes_ptr, queried_nodes);

// check node
if (!node) {
Expand Down

0 comments on commit 131d899

Please sign in to comment.