diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 8726e05780e..32a5813798d 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -197,9 +197,10 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op static int fixup_set_id(void ** param, int param_no); static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2); static struct rtpp_set * select_rtpp_set(int id_set); -static struct rtpp_node *select_rtpp_node_new(str, str, int); +static struct rtpp_node *select_rtpp_node_new(str, str, int, struct rtpp_node **, int); static struct rtpp_node *select_rtpp_node_old(str, str, int); -static struct rtpp_node *select_rtpp_node(str, str, int); +static struct rtpp_node *select_rtpp_node(str, str, int, struct rtpp_node **, int); +static int is_queried_node(struct rtpp_node *, struct rtpp_node **, int); static int build_rtpp_socks(unsigned int current_rtpp_no); static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); @@ -234,6 +235,7 @@ static int rtpengine_disable_tout = 60; static int rtpengine_allow_op = 0; static int rtpengine_retr = 5; static int rtpengine_tout_ms = 1000; +static struct rtpp_node **queried_nodes_ptr = NULL; static int queried_nodes_limit = MAX_RTPP_TRIED_NODES; static pid_t mypid; static unsigned int myseqn = 0; @@ -387,6 +389,24 @@ struct module_exports exports = { child_init }; +/* check if the node is already queried */ +static int is_queried_node(struct rtpp_node *node, struct rtpp_node **queried_nodes_ptr, int queried_nodes) +{ + int i; + + if (!queried_nodes_ptr) { + return 0; + } + + for (i = 0; i < queried_nodes; i++) { + if (node == queried_nodes_ptr[i]) { + return 1; + } + } + + return 0; +} + /* hide the node from display and disable it permanent */ int rtpengine_delete_node(struct rtpp_node *rtpp_node) { @@ -1906,10 +1926,17 @@ child_init(int rank) rtpp_socks = (int*)pkg_malloc(sizeof(int)*(rtpp_socks_size)); if (!rtpp_socks) { - LM_ERR("no more pkg memory for rtpp_socks\n"); return -1; } + // vector of pointers to queried nodes + queried_nodes_ptr = (struct rtpp_node**)pkg_malloc(queried_nodes_limit * sizeof(struct rtpp_node*)); + if (!queried_nodes_ptr) { + LM_ERR("no more pkg memory for queried_nodes_ptr\n"); + return -1; + } + memset(queried_nodes_ptr, 0, queried_nodes_limit * sizeof(struct rtpp_node*)); + /* Iterate known RTP proxies - create sockets */ if (rtpp_socks_size) { build_rtpp_socks(rtpp_socks_size); @@ -2208,7 +2235,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ bencode_item_t *item, *resp; str callid = STR_NULL, from_tag = STR_NULL, to_tag = STR_NULL, viabranch = STR_NULL; str body = STR_NULL, error = STR_NULL; - int ret, queried_nodes; + int ret, queried_nodes = 0; struct rtpp_node *node; char *cp; pv_value_t pv_val; @@ -2333,14 +2360,14 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ if(msg->id != current_msg_id) active_rtpp_set = default_rtpp_set; - queried_nodes = 0; select_node: do { - if (++queried_nodes > queried_nodes_limit) { + if (queried_nodes >= queried_nodes_limit) { LM_ERR("queried nodes limit reached\n"); goto error; } - node = select_rtpp_node(callid, viabranch, 1); + + node = select_rtpp_node(callid, viabranch, 1, queried_nodes_ptr, queried_nodes); if (!node) { LM_ERR("no available proxies\n"); goto error; @@ -2351,7 +2378,10 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ node->rn_disabled = 1; node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; } + + queried_nodes_ptr[queried_nodes++] = node; } while (cp == NULL); + LM_DBG("proxy reply: %.*s\n", ret, cp); set_rtp_inst_pvar(msg, &node->rn_url); @@ -2366,8 +2396,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ if (!bencode_dictionary_get_strcmp(resp, "result", "error")) { if (!bencode_dictionary_get_str(resp, "error-reason", &error)) { LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp); - } - else { + } else { if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) && (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0)) { @@ -2679,7 +2708,7 @@ static struct rtpp_set * select_rtpp_set(int id_set ){ * run the selection algorithm and return the new selected node */ static struct rtpp_node * -select_rtpp_node_new(str callid, str viabranch, int do_test) +select_rtpp_node_new(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes) { struct rtpp_node* node; unsigned i, sum, sumcut, weight_sum; @@ -2707,7 +2736,7 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) } /* Select only between enabled machines */ - if (!node->rn_disabled) { + if (!node->rn_disabled && !is_queried_node(node, queried_nodes_ptr, queried_nodes)) { weight_sum += node->rn_weight; } } @@ -2752,7 +2781,11 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) if (node->rn_disabled) continue; - /* Found enabled machine */ + /* Select only between not already queried machines */ + if (is_queried_node(node, queried_nodes_ptr, queried_nodes)) + continue; + + /* Found machine */ if (sumcut < node->rn_weight) { lock_release(active_rtpp_set->rset_lock); goto found; @@ -2808,7 +2841,7 @@ select_rtpp_node_old(str callid, str viabranch, int do_test) * the call if some proxies were disabled or enabled (e.g. kamctl command) */ static struct rtpp_node * -select_rtpp_node(str callid, str viabranch, int do_test) +select_rtpp_node(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes) { struct rtpp_node *node = NULL; unsigned int current_rtpp_no; @@ -2837,7 +2870,7 @@ select_rtpp_node(str callid, str viabranch, int do_test) // check node if (!node) { // run the selection algorithm - node = select_rtpp_node_new(callid, viabranch, do_test); + node = select_rtpp_node_new(callid, viabranch, do_test, queried_nodes_ptr, queried_nodes); // check node if (!node) {