From 2625ab3ccdafd8d474018516e6aa36ce48989db3 Mon Sep 17 00:00:00 2001 From: Stefan Mititelu Date: Thu, 5 Nov 2015 16:32:02 +0200 Subject: [PATCH] rtpengine: hash table to keep the selected nodes Shared memory hash table with global hashtable lock. Add state maintaining the selected rtp node, for a given callid. Hashtable entry expiration time configurable using hash_entry_tout modparam. The actual deletion happens on the fly while insert/remove/lookup are called. Updated doku. --- modules/rtpengine/doc/rtpengine_admin.xml | 26 ++ modules/rtpengine/rtpengine.c | 189 ++++++++++--- modules/rtpengine/rtpengine_hash.c | 314 ++++++++++++++++++++++ modules/rtpengine/rtpengine_hash.h | 30 +++ 4 files changed, 523 insertions(+), 36 deletions(-) create mode 100644 modules/rtpengine/rtpengine_hash.c create mode 100644 modules/rtpengine/rtpengine_hash.h diff --git a/modules/rtpengine/doc/rtpengine_admin.xml b/modules/rtpengine/doc/rtpengine_admin.xml index b447387dac2..bcb1c030ea4 100644 --- a/modules/rtpengine/doc/rtpengine_admin.xml +++ b/modules/rtpengine/doc/rtpengine_admin.xml @@ -356,6 +356,32 @@ modparam("rtpproxy", "rtp_inst_pvar", "$avp(RTP_INSTANCE)") +
+ <varname>hash_entry_tout</varname> (string) + + Number of seconds after an rtpengine hash table entry is marked for deletion. + By default, this parameter is set to 120 (seconds). + + + To maintain information about a selected rtp machine node, for a given call, entries are added in a hashtable of (callid, node) pairs. + When "offer" comes, insert new entry in the hastable. + When subsequent commands come, lookup callid and return chosen node. + When "delete" comes, remove old entry from hashtable. + + + NOTE: In the current implementation, the actual deletion happens on the fly, + while insert/remove/lookup the hastable, only for the entries in the insert/remove/lookup path. + + + Set <varname>hash_entry_tout</varname> parameter + +... +modparam("rtpproxy", "hash_entry_tout", "300") +... + + +
+
diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 86211cc2211..e1ec46c8dca 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -78,6 +78,7 @@ #include "../../modules/tm/tm_load.h" #include "rtpengine.h" #include "rtpengine_funcs.h" +#include "rtpengine_hash.h" #include "bencode.h" MODULE_VERSION @@ -187,7 +188,9 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op static int fixup_set_id(void ** param, int param_no); static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2); static struct rtpp_set * select_rtpp_set(int id_set); -static struct rtpp_node *select_rtpp_node(str, int); +static struct rtpp_node *select_rtpp_node_new(str, int, int); +static struct rtpp_node *select_rtpp_node_old(str, int, int); +static struct rtpp_node *select_rtpp_node(str, int, int); static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); @@ -228,6 +231,7 @@ static pid_t mypid; static unsigned int myseqn = 0; static str extra_id_pv_param = {NULL, 0}; static char *setid_avp_param = NULL; +static int hash_entry_tout = 120; static char ** rtpp_strings=0; static int rtpp_sets=0; /*used in rtpengine_set_store()*/ @@ -336,6 +340,7 @@ static param_export_t params[] = { {"rtp_inst_pvar", PARAM_STR, &rtp_inst_pv_param }, {"write_sdp_pv", PARAM_STR, &write_sdp_pvar_str }, {"read_sdp_pv", PARAM_STR, &read_sdp_pvar_str }, + {"hash_entry_tout", INT_PARAM, &hash_entry_tout }, {0, 0, 0} }; @@ -1440,6 +1445,14 @@ mod_init(void) return -1; } + /* init the hastable which keeps the call-id <-> selected_node relation */ + if (!rtpengine_hash_table_init()) { + LM_ERR("rtpengine_hash_table_init() failed!\n"); + return -1; + } else { + LM_DBG("rtpengine_hash_table_init() success!\n"); + } + return 0; } @@ -1579,6 +1592,13 @@ static void mod_destroy(void) } shm_free(rtpp_set_list); + + /* destroy the hastable which keeps the call-id <-> selected_node relation */ + if (!rtpengine_hash_table_destroy()) { + LM_ERR("rtpengine_hash_table_destroy() failed!\n"); + } else { + LM_DBG("rtpengine_hash_table_destroy() success!\n"); + } } @@ -1923,16 +1943,17 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ LM_ERR("queried nodes limit reached\n"); goto error; } - node = select_rtpp_node(callid, 1); + node = select_rtpp_node(callid, 1, op); if (!node) { LM_ERR("no available proxies\n"); goto error; } + cp = send_rtpp_command(node, ng_flags.dict, &ret); - if (cp == NULL) { - node->rn_disabled = 1; - node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; - } + if (cp == NULL) { + node->rn_disabled = 1; + node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; + } } while (cp == NULL); LM_DBG("proxy reply: %.*s\n", ret, cp); @@ -1944,12 +1965,13 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp); goto error; } + if (!bencode_dictionary_get_strcmp(resp, "result", "error")) { if (!bencode_dictionary_get_str(resp, "error-reason", &error)) { LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp); } else { - if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) && + if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) && (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0)) { LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s); @@ -1957,12 +1979,23 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ } LM_ERR("proxy replied with error: %.*s\n", error.len, error.s); } - goto error; + goto error; } if (body_out) *body_out = body; + if (op == OP_DELETE) { + /* Delete the key<->value from the hashtable */ + if (!rtpengine_hash_table_remove(&callid)) { + LM_ERR("rtpengine hash table failed to remove entry for callen=%d callid=%.*s\n", + callid.len, callid.len, callid.s); + } else { + LM_DBG("rtpengine hash table remove entry for callen=%d callid=%.*s\n", + callid.len, callid.len, callid.s); + } + } + return resp; error: @@ -2191,81 +2224,165 @@ static struct rtpp_set * select_rtpp_set(int id_set ){ return rtpp_list; } + /* - * Main balancing routine. This does not try to keep the same proxy for - * the call if some proxies were disabled or enabled; proxy death considered - * too rare. Otherwise we should implement "mature" HA clustering, which is - * too expensive here. + * run the selection algorithm and return the new selected node */ static struct rtpp_node * -select_rtpp_node(str callid, int do_test) +select_rtpp_node_new(str callid, int do_test, int op) { - unsigned sum, sumcut, weight_sum; struct rtpp_node* node; - int was_forced; - - if(!active_rtpp_set){ - LM_ERR("script error -no valid set selected\n"); - return NULL; - } - /* Most popular case: 1 proxy, nothing to calculate */ - if (active_rtpp_set->rtpp_node_count == 1) { - node = active_rtpp_set->rn_first; - if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()) - node->rn_disabled = rtpp_test(node, 1, 0); - return node->rn_disabled ? NULL : node; - } + unsigned i, sum, sumcut, weight_sum; + int was_forced = 0; /* XXX Use quick-and-dirty hashing algo */ - for(sum = 0; callid.len > 0; callid.len--) - sum += callid.s[callid.len - 1]; + for(i = 0; i < callid.len; i++) + sum += callid.s[i]; sum &= 0xff; - was_forced = 0; retry: weight_sum = 0; - for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { + for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { + /* Try to enable if it's time to try. */ if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){ - /* Try to enable if it's time to try. */ node->rn_disabled = rtpp_test(node, 1, 0); } - if (!node->rn_disabled) + + /* Select only between enabled machines */ + if (!node->rn_disabled) { weight_sum += node->rn_weight; + } } + + /* No proxies? Force all to be redetected, if not yet */ if (weight_sum == 0) { - /* No proxies? Force all to be redetected, if not yet */ - if (was_forced) + if (was_forced) { return NULL; + } + was_forced = 1; + for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { node->rn_disabled = rtpp_test(node, 1, 1); } + goto retry; } + + /* sumcut here lays from 0 to weight_sum-1 */ sumcut = sum % weight_sum; + /* - * sumcut here lays from 0 to weight_sum-1. * Scan proxy list and decrease until appropriate proxy is found. */ for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { + /* Select only between enabled machines */ if (node->rn_disabled) continue; + + /* Found enabled machine */ if (sumcut < node->rn_weight) goto found; + + /* Update sumcut if enabled machine */ sumcut -= node->rn_weight; } + /* No node list */ return NULL; + found: if (do_test) { node->rn_disabled = rtpp_test(node, node->rn_disabled, 0); if (node->rn_disabled) goto retry; } + + /* build hash table entry */ + struct rtpengine_hash_entry *entry = shm_malloc(sizeof(struct rtpp_node)); + if (shm_str_dup(&entry->callid, &callid) < 0) { + LM_ERR("rtpengine hash table fail to duplicate calllen=%d callid=%.*s", + callid.len, callid.len, callid.s); + } + entry->node = node; + entry->next = NULL; + entry->tout = get_ticks() + hash_entry_tout; + + /* Insert the key<->entry from the hashtable */ + if (!rtpengine_hash_table_insert(&callid, entry)) { + LM_ERR("rtpengine hash table fail to insert node=%.*s for calllen=%d callid=%.*s", + node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s); + } else { + LM_DBG("rtpengine hash table insert node=%.*s for calllen=%d callid=%.*s\n", + node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s); + } + + /* Return selected node */ return node; } +/* + * lookup the hastable (key=callid value=node) and get the old node + */ +static struct rtpp_node * +select_rtpp_node_old(str callid, int do_test, int op) +{ + struct rtpp_node *node = NULL; + struct rtpengine_hash_entry *entry = NULL; + + entry = rtpengine_hash_table_lookup(&callid); + if (!entry) { + LM_ERR("rtpengine hash table lookup failed to find entry for calllen=%d callid=%.*s\n", + callid.len, callid.len, callid.s); + } else { + LM_DBG("rtpengine hash table lookup find entry for calllen=%d callid=%.*s\n", + callid.len, callid.len, callid.s); + node = entry->node; + } + + if (!node) { + LM_ERR("rtpengine hash table lookup failed to find node for calllen=%d callid=%.*s\n", + callid.len, callid.len, callid.s); + return NULL; + } else { + LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s\n", + node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s); + } + + // if node broke, don't send any message + if (!node->rn_disabled) { + return node; + } else { + LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s, which is disabled!\n", + node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s); + } + + return NULL; +} + +/* + * Main balancing routine. This DO try to keep the same proxy for + * the call if some proxies were disabled or enabled (e.g. kamctl command) + */ +static struct rtpp_node * +select_rtpp_node(str callid, int do_test, int op) +{ + if(!active_rtpp_set) { + LM_ERR("script error - no valid set selected\n"); + return NULL; + } + + // calculate and choose a node + if (op == OP_OFFER) { + // run the selection algorithm + return select_rtpp_node_new(callid, do_test, op); + } else { + // lookup the hastable (key=callid value=node) and get the old node + return select_rtpp_node_old(callid, do_test, op); + } +} + static int get_extra_id(struct sip_msg* msg, str *id_str) { if(msg==NULL || extra_id_pv==NULL || id_str==NULL) { diff --git a/modules/rtpengine/rtpengine_hash.c b/modules/rtpengine/rtpengine_hash.c new file mode 100644 index 00000000000..f23b45db48c --- /dev/null +++ b/modules/rtpengine/rtpengine_hash.c @@ -0,0 +1,314 @@ +#include "rtpengine_hash.h" + +#include "../../str.h" +#include "../../dprint.h" +#include "../../mem/shm_mem.h" +#include "../../locking.h" +#include "../../timer.h" + +static gen_lock_t *rtpengine_hash_lock; +static struct rtpengine_hash_table *rtpengine_hash_table; + +/* from sipwise rtpengine */ +static int str_cmp_str(const str *a, const str *b) { + if (a->len < b->len) + return -1; + if (a->len > b->len) + return 1; + if (a->len == 0 && b->len == 0) + return 0; + return memcmp(a->s, b->s, a->len); +} + +/* from sipwise rtpengine */ +static int str_equal(void *a, void *b) { + return (str_cmp_str((str *) a, (str *) b) == 0); +} + +/* from sipwise rtpengine */ +static unsigned int str_hash(void *ss) { + const str *s = (str*) ss; + unsigned int ret = 5381; + str it = *s; + + while (it.len > 0) { + ret = (ret << 5) + ret + *it.s; + it.s++; + it.len--; + } + + return ret % RTPENGINE_HASH_TABLE_SIZE; +} + +/* rtpengine glib hash API */ +int rtpengine_hash_table_init() { + int i; + + // init hashtable + rtpengine_hash_table = shm_malloc(sizeof(struct rtpengine_hash_table)); + if (!rtpengine_hash_table) { + LM_ERR("no shm left to create rtpengine_hash_table\n"); + return 0; + } + + // init hashtable entry_list heads (never filled) + for (i = 0; i < RTPENGINE_HASH_TABLE_SIZE; i++) { + rtpengine_hash_table->entry_list[i] = shm_malloc(sizeof(struct rtpengine_hash_entry)); + if (!rtpengine_hash_table->entry_list[i]) { + LM_ERR("no shm left to create rtpengine_hash_table->entry_list[%d]\n", i); + return 0; + } + + /* never expire the head of the hashtable index lists */ + rtpengine_hash_table->entry_list[i]->tout = -1; + rtpengine_hash_table->entry_list[i]->next = NULL; + } + + // init lock + rtpengine_hash_lock = lock_alloc(); + if (!rtpengine_hash_lock) { + LM_ERR("no shm left to init rtpengine_hash_table lock"); + return 0; + } + + return 1; +} + +int rtpengine_hash_table_destroy() { + int i; + struct rtpengine_hash_entry *entry, *last_entry; + + // check rtpengine hashtable + if (!rtpengine_hash_table) { + LM_ERR("NULL rtpengine_hash_table"); + return 0; + } + + // destroy hashtable entry_list content + lock_get(rtpengine_hash_lock); + for (i = 0; i < RTPENGINE_HASH_TABLE_SIZE; i++) { + entry = rtpengine_hash_table->entry_list[i]; + while (entry) { + last_entry = entry; + entry = entry->next; + shm_free(last_entry->callid.s); + shm_free(last_entry); + } + } + + // destroy hashtable + shm_free(rtpengine_hash_table); + rtpengine_hash_table = NULL; + lock_release(rtpengine_hash_lock); + + // destroy lock + if (!rtpengine_hash_lock) { + LM_ERR("NULL rtpengine_hash_lock"); + } else { + lock_dealloc(rtpengine_hash_lock); + rtpengine_hash_lock = NULL; + } + + return 1; +} + +int rtpengine_hash_table_insert(void *key, void *value) { + struct rtpengine_hash_entry *entry, *last_entry; + struct rtpengine_hash_entry *new_entry = (struct rtpengine_hash_entry *) value; + unsigned int hash_index; + + // check rtpengine hashtable + if (!rtpengine_hash_table) { + LM_ERR("NULL rtpengine_hash_table"); + return 0; + } + + // get entry list + hash_index = str_hash(key); + entry = rtpengine_hash_table->entry_list[hash_index]; + last_entry = entry; + + // lock + lock_get(rtpengine_hash_lock); + while (entry) { + // if key found, don't add new entry + if (str_equal(&entry->callid, &new_entry->callid)) { + // unlock + lock_release(rtpengine_hash_lock); + LM_ERR("Call id %.*s already in hashtable, ignore new value", entry->callid.len, entry->callid.s); + return 0; + } + + // if expired entry discovered, delete it + if (entry->tout < get_ticks()) { + // set pointers; exclude entry + last_entry->next = entry->next; + + // free current entry; entry points to unknown + shm_free(entry->callid.s); + shm_free(entry); + + // set pointers + entry = last_entry; + } + + // next entry in the list + last_entry = entry; + entry = entry->next; + } + + last_entry->next = new_entry; + + // unlock + lock_release(rtpengine_hash_lock); + + return 1; +} + +int rtpengine_hash_table_remove(void *key) { + struct rtpengine_hash_entry *entry, *last_entry; + unsigned int hash_index; + + // check rtpengine hashtable + if (!rtpengine_hash_table) { + LM_ERR("NULL rtpengine_hash_table"); + return 0; + } + + // get first entry from entry list; jump over unused list head + hash_index = str_hash(key); + entry = rtpengine_hash_table->entry_list[hash_index]; + last_entry = entry; + + // lock + lock_get(rtpengine_hash_lock); + while (entry) { + // if key found, delete entry + if (str_equal(&entry->callid, (str *)key)) { + // free entry + last_entry->next = entry->next; + shm_free(entry->callid.s); + shm_free(entry); + + // unlock + lock_release(rtpengine_hash_lock); + + return 1; + } + + // if expired entry discovered, delete it + if (entry->tout < get_ticks()) { + // set pointers; exclude entry + last_entry->next = entry->next; + + // free current entry; entry points to unknown + shm_free(entry->callid.s); + shm_free(entry); + + // set pointers + entry = last_entry; + } + + last_entry = entry; + entry = entry->next; + } + + // unlock + lock_release(rtpengine_hash_lock); + + return 0; +} + +void* rtpengine_hash_table_lookup(void *key) { + struct rtpengine_hash_entry *entry, *last_entry; + unsigned int hash_index; + + // check rtpengine hashtable + if (!rtpengine_hash_table) { + LM_ERR("NULL rtpengine_hash_table"); + return 0; + } + + // get first entry from entry list; jump over unused list head + hash_index = str_hash(key); + entry = rtpengine_hash_table->entry_list[hash_index]; + last_entry = entry; + + // lock + lock_get(rtpengine_hash_lock); + while (entry) { + // if key found, return entry + if (str_equal(&entry->callid, (str *)key)) { + // unlock + lock_release(rtpengine_hash_lock); + + return entry; + } + + // if expired entry discovered, delete it + if (entry->tout < get_ticks()) { + // set pointers; exclude entry + last_entry->next = entry->next; + + // free current entry; entry points to unknown + shm_free(entry->callid.s); + shm_free(entry); + + // set pointers + entry = last_entry; + } + + last_entry = entry; + entry = entry->next; + } + + // unlock + lock_release(rtpengine_hash_lock); + + return NULL; +} + +// print hash table entries while deleting expired entries +void rtpengine_hash_table_print() { + int i; + struct rtpengine_hash_entry *entry, *last_entry; + + // check rtpengine hashtable + if (!rtpengine_hash_table) { + LM_ERR("NULL rtpengine_hash_table"); + return ; + } + + // lock + lock_get(rtpengine_hash_lock); + + // print hashtable + for (i = 0; i < RTPENGINE_HASH_TABLE_SIZE; i++) { + entry = rtpengine_hash_table->entry_list[i]; + last_entry = entry; + + while (entry) { + // if expired entry discovered, delete it + if (entry->tout < get_ticks()) { + // set pointers; exclude entry + last_entry->next = entry->next; + + // free current entry; entry points to unknown + shm_free(entry->callid.s); + shm_free(entry); + + // set pointers + entry = last_entry; + } else { + LM_DBG("hash_index=%d callid=%.*s tout=%u\n", + i, entry->callid.len, entry->callid.s, entry->tout - get_ticks()); + } + + last_entry = entry; + entry = entry->next; + } + } + + // unlock + lock_release(rtpengine_hash_lock); +} diff --git a/modules/rtpengine/rtpengine_hash.h b/modules/rtpengine/rtpengine_hash.h new file mode 100644 index 00000000000..71a73b45b1e --- /dev/null +++ b/modules/rtpengine/rtpengine_hash.h @@ -0,0 +1,30 @@ +#ifndef _RTPENGINE_HASH_H +#define _RTPENGINE_HASH_H + +#include "../../str.h" + +#define RTPENGINE_HASH_TABLE_SIZE 512 + +/* table entry */ +struct rtpengine_hash_entry { + unsigned int tout; // call timeout + str callid; // call callid + struct rtpp_node *node; // call selected node + + struct rtpengine_hash_entry *next; // next +}; + +/* table */ +struct rtpengine_hash_table { + struct rtpengine_hash_entry *entry_list[RTPENGINE_HASH_TABLE_SIZE]; +}; + + +int rtpengine_hash_table_init(); +int rtpengine_hash_table_destroy(); +int rtpengine_hash_table_insert(void *key, void *value); +int rtpengine_hash_table_remove(void *key); +void* rtpengine_hash_table_lookup(void *key); +void rtpengine_hash_table_print() ; + +#endif