diff --git a/lib/srdb1/schema/kamailio-rtpengine.xml b/lib/srdb1/schema/kamailio-rtpengine.xml new file mode 100644 index 00000000000..1958bec293b --- /dev/null +++ b/lib/srdb1/schema/kamailio-rtpengine.xml @@ -0,0 +1,12 @@ + + + %entities; +]> + + + RTPEngine + + diff --git a/lib/srdb1/schema/rtpengine.xml b/lib/srdb1/schema/rtpengine.xml new file mode 100644 index 00000000000..78a54c38c7e --- /dev/null +++ b/lib/srdb1/schema/rtpengine.xml @@ -0,0 +1,60 @@ + + +%entities; + +]> + + + rtpengine + 1 + &MYSQL_TABLE_TYPE; + + This table is used by the rtpengine module. It contains the sets of rtpengine instances used for proxying media between endpoints. More information about the rtpengine module can be found at: &KAMAILIO_MOD_DOC;rtpengine.html + + + + + setid + unsigned int + 10 + RTPEngine instance set ID + 0 + + + + + url + string + 64 + RTPEngine instance socket URL + + + + weight + unsigned int + 10 + RTPEngine instance weight + 1 + + + + + disabled + int + 1 + RTPEngine instance state + 0 + + + + + rtpengine_nodes + + + + + +
diff --git a/modules/rtpengine/doc/rtpengine_admin.xml b/modules/rtpengine/doc/rtpengine_admin.xml index 8b623c8c112..6875e5fa35b 100644 --- a/modules/rtpengine/doc/rtpengine_admin.xml +++ b/modules/rtpengine/doc/rtpengine_admin.xml @@ -221,6 +221,11 @@ modparam("rtpengine", "rtpengine_tout_ms", 2000) This is useful when deactivating a node for maintanance and reject new sessions but allow current ones to finish. + The behaviour is the same for a rtpengine deleted table node. + When the node is deleted from the table and the table reloaded (see nh_reload_rtpp) the node actually is disabled(permanent) and hidden for display. + Next time the same node will be added in the table, and the content reloaded, it will be updated and re-displayed. + + Default value is 0 to keep the current behaviour. @@ -294,7 +299,7 @@ modparam("rtpengine", "extra_id_pv", "$avp(extra_id)") -
+
<varname>setid_avp</varname> (string) The parameter defines an AVP that, if set, @@ -453,13 +458,199 @@ modparam("rtpengine", "hash_table_size", "123") ... modparam("rtpengine", "hash_table_tout", "300") ... + + +
+ + + + +
+ <varname>db_url</varname> (string) + + The rtpengine datablase url. If present and valid, it activates database mode. + Node information is read from database, not from config. + + + By default, the datablase url is NULL (not set). + + + Set <varname>db_url</varname> parameter + +... +modparam("rtpengine", "db_url", "mysql://pass@localhost/db") +... + + +
+ + +
+ <varname>table_name</varname> (string) + + The rtpengine table name. If database mode is activated (i.e. valid db_url), + set the name of rtpengine table, on startup. + + + By default, the rtpengine table name is "rtpengine". + + + NOTE: One needs to add the version of the rtpengine table in the version table. + The current version is version 1. + + + Set <varname>table_name</varname> parameter + +... +modparam("rtpengine", "table_name", "rtpengine_table_name") +... + + + + + Setup <varname>rtpengine</varname> table + +mysql> describe rtpengine; ++----------+------------------+------+-----+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++----------+------------------+------+-----+---------+-------+ +| setid | int(10) unsigned | NO | | NULL | | +| url | varchar(256) | NO | | NULL | | +| weight | int(10) unsigned | NO | | NULL | | +| disabled | int(11) | NO | | NULL | | ++----------+------------------+------+-----+---------+-------+ + +mysql> select * from rtpengine; ++-------+---------------------------+--------+----------+ +| setid | url | weight | disabled | ++-------+---------------------------+--------+----------+ +| 0 | udp:rtpproxy1.domain:8800 | 100 | 0 | +| 0 | udp:rtpproxy2.domain:8800 | 200 | 1 | ++-------+---------------------------+--------+----------+ + +mysql> select * from version; ++---------------------------+---------------+ +| table_name | table_version | ++---------------------------+---------------+ +| rtpengine | 1 | ++---------------------------+---------------+
+
+ <varname>setid_col</varname> (string) + + Column name in the rtpengine table. If database mode is activated (i.e. valid db_url), + set the setid of rtp nodes according to this column, on startup. + The MySQL value for this column should be INT UNSIGNED. + + + By default, the column name is "setid". + + + Set <varname>setid_col</varname> parameter + +... +modparam("rtpengine", "setid_col", "setid_column_name") +... + + +
+ + +
+ <varname>url_col</varname> (string) + + Column name in the rtpengine table. If database mode is activated (i.e. valid db_url), + set the url of rtp nodes according to this column, on startup. + The MySQL value for this column should be VARCHAR. + + + By default, the column name is "url". + + + Set <varname>url_col</varname> parameter + +... +modparam("rtpengine", "url_col", "url_column_name") +... + +
+ +
+ <varname>weight_col</varname> (string) + + Column name in the rtpengine table. If database mode is activated (i.e. valid db_url), + set the weight of rtp nodes according to this column, on startup. The column value has + priority over the URL weight. + The MySQL value for this column should be INT UNSIGNED. + + + By default, the column name is "weight". + + + Set <varname>weight_col</varname> parameter + +... +modparam("rtpengine", "weight_col", "weight_column_name") +... + + +
+ + +
+ <varname>disabled_col</varname> (string) + + Column name in the rtpengine table. If database mode is activated (i.e. valid db_url), + set the state of rtp nodes according to this column, on startup. + The MySQL value for this column should be INT. + + + By default, the column name is "disabled". + + + Set <varname>disabled_col</varname> parameter + +... +modparam("rtpengine", "disabled_col", "disabled_column_name") +... + + +
+ + +
+ <varname>setid_default</varname> (string) + + The default set of nodes to be used. + + + By default, the setid is 0. + + + NOTE that if setid_avp is configured, this value will be ignored and + the active set will be chosen according to the setid_avp. + + + Set <varname>setid_default</varname> parameter + +... +modparam("rtpengine", "setid_default", 11) +... + + +
+ +
+ + + +
Functions
@@ -1067,6 +1258,30 @@ $ &ctltool; fifo nh_ping_rtpp all
+ +
+ <function moreinfo="none">nh_reload_rtpp</function> + + Reloads the database node table content if configured. + Returns specific message related to success, failure and no db_url configured. + + + NOTE: The current behaviour updates the nodes state or creates new ones or + hides old ones, based on the database content. If allow_op modparam is enabled, + the sessions are still allowed to finish for the hidden old nodes. + + + + <function moreinfo="none">nh_reload_rtpp</function> usage + +... +$ &ctltool; fifo nh_reload_rtpp +... + + +
+ +
<function moreinfo="none">nh_show_hash_total</function> diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index bba10efc4b7..178225897a4 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -91,70 +91,74 @@ MODULE_VERSION #endif /* NAT UAC test constants */ -#define NAT_UAC_TEST_C_1918 0x01 -#define NAT_UAC_TEST_RCVD 0x02 -#define NAT_UAC_TEST_V_1918 0x04 -#define NAT_UAC_TEST_S_1918 0x08 -#define NAT_UAC_TEST_RPORT 0x10 +#define NAT_UAC_TEST_C_1918 0x01 +#define NAT_UAC_TEST_RCVD 0x02 +#define NAT_UAC_TEST_V_1918 0x04 +#define NAT_UAC_TEST_S_1918 0x08 +#define NAT_UAC_TEST_RPORT 0x10 -#define DEFAULT_RTPP_SET_ID 0 -#define MAX_RTPP_TRIED_NODES 50 -#define MI_SET_NATPING_STATE "nh_enable_ping" -#define MI_DEFAULT_NATPING_STATE 1 - -#define MI_MIN_RECHECK_TICKS 0 -#define MI_MAX_RECHECK_TICKS (unsigned int)-1 +#define DEFAULT_RTPP_SET_ID 0 +#define MAX_RTPP_TRIED_NODES 50 +#define MI_SET_NATPING_STATE "nh_enable_ping" +#define MI_DEFAULT_NATPING_STATE 1 #define MI_ENABLE_RTP_PROXY "nh_enable_rtpp" #define MI_SHOW_RTP_PROXIES "nh_show_rtpp" -#define MI_PING_RTP_PROXY "nh_ping_rtpp" -#define MI_SHOW_HASH_TOTAL "nh_show_hash_total" - -#define MI_RTP_PROXY_NOT_FOUND "RTP proxy not found" -#define MI_RTP_PROXY_NOT_FOUND_LEN (sizeof(MI_RTP_PROXY_NOT_FOUND)-1) -#define MI_PING_DISABLED "NATping disabled from script" -#define MI_PING_DISABLED_LEN (sizeof(MI_PING_DISABLED)-1) -#define MI_DISABLED_PERMANENT "1 (permanent)" -#define MI_DISABLED_PERMANENT_LEN (sizeof(MI_DISABLED_PERMANENT)-1) -#define MI_SET "set" -#define MI_SET_LEN (sizeof(MI_SET)-1) -#define MI_INDEX "index" +#define MI_PING_RTP_PROXY "nh_ping_rtpp" +#define MI_SHOW_HASH_TOTAL "nh_show_hash_total" +#define MI_RELOAD_RTP_PROXY "nh_reload_rtpp" + +#define MI_DB_NOT_FOUND "RTP database not found" +#define MI_DB_NOT_FOUND_LEN (sizeof(MI_DB_NOT_FOUND)-1) +#define MI_DB_ERR "Error reloading from RTP database" +#define MI_DB_ERR_LEN (sizeof(MI_DB_ERR)-1) +#define MI_DB_OK "Success reloading from RTP database" +#define MI_DB_OK_LEN (sizeof(MI_DB_OK)-1) +#define MI_RTP_PROXY_NOT_FOUND "RTP proxy not found" +#define MI_RTP_PROXY_NOT_FOUND_LEN (sizeof(MI_RTP_PROXY_NOT_FOUND)-1) +#define MI_PING_DISABLED "NAT ping disabled from script" +#define MI_PING_DISABLED_LEN (sizeof(MI_PING_DISABLED)-1) +#define MI_DISABLED_PERMANENT "1(permanent)" +#define MI_DISABLED_PERMANENT_LEN (sizeof(MI_DISABLED_PERMANENT)-1) +#define MI_SET "set" +#define MI_SET_LEN (sizeof(MI_SET)-1) +#define MI_INDEX "index" #define MI_INDEX_LEN (sizeof(MI_INDEX)-1) -#define MI_ENABLED "enabled" +#define MI_ENABLED "enabled" #define MI_ENABLED_LEN (sizeof(MI_ENABLED)-1) -#define MI_DISABLED "disabled" +#define MI_DISABLED "disabled" #define MI_DISABLED_LEN (sizeof(MI_DISABLED)-1) -#define MI_WEIGHT "weight" +#define MI_WEIGHT "weight" #define MI_WEIGHT_LEN (sizeof(MI_WEIGHT)-1) #define MI_RECHECK_TICKS "recheck_ticks" #define MI_RECHECK_T_LEN (sizeof(MI_RECHECK_TICKS)-1) -#define MI_ERROR "Error when adding rtpp node details" -#define MI_ERROR_LEN (sizeof(MI_ERROR)-1) -#define MI_ALL "all" -#define MI_ALL_LEN (sizeof(MI_ALL)-1) -#define MI_ENABLE "enable" -#define MI_ENABLE_LEN (sizeof(MI_ENABLE)-1) -#define MI_DISABLE "disable" -#define MI_DISABLE_LEN (sizeof(MI_DISABLE)-1) -#define MI_PING "ping" -#define MI_PING_LEN (sizeof(MI_PING)-1) -#define MI_SUCCESS "success" -#define MI_SUCCESS_LEN (sizeof(MI_SUCCESS)-1) -#define MI_FAIL "fail" -#define MI_FAIL_LEN (sizeof(MI_FAIL)-1) +#define MI_ERROR "Error when adding rtpp node details" +#define MI_ERROR_LEN (sizeof(MI_ERROR)-1) +#define MI_ALL "all" +#define MI_ALL_LEN (sizeof(MI_ALL)-1) +#define MI_ENABLE "enable" +#define MI_ENABLE_LEN (sizeof(MI_ENABLE)-1) +#define MI_DISABLE "disable" +#define MI_DISABLE_LEN (sizeof(MI_DISABLE)-1) +#define MI_PING "ping" +#define MI_PING_LEN (sizeof(MI_PING)-1) +#define MI_SUCCESS "success" +#define MI_SUCCESS_LEN (sizeof(MI_SUCCESS)-1) +#define MI_FAIL "fail" +#define MI_FAIL_LEN (sizeof(MI_FAIL)-1) #define MI_HASH_ENTRIES "entries" #define MI_HASH_ENTRIES_LEN (sizeof(MI_HASH_ENTRIES)-1) -#define MI_HASH_ENTRIES_FAIL "Fail to get entry details" -#define MI_HASH_ENTRIES_FAIL_LEN (sizeof(MI_HASH_ENTRIES_FAIL)-1) +#define MI_HASH_ENTRIES_FAIL "Fail to get entry details" +#define MI_HASH_ENTRIES_FAIL_LEN (sizeof(MI_HASH_ENTRIES_FAIL)-1) -#define MI_FOUND_ALL 2 -#define MI_FOUND_ONE 1 -#define MI_FOUND_NONE 0 +#define MI_FOUND_ALL 2 +#define MI_FOUND_ONE 1 +#define MI_FOUND_NONE 0 -#define CPORT "22222" +#define CPORT "22222" enum rtpe_operation { OP_OFFER = 1, @@ -196,11 +200,12 @@ static struct rtpp_set * select_rtpp_set(int id_set); static struct rtpp_node *select_rtpp_node_new(str, str, int); static struct rtpp_node *select_rtpp_node_old(str, str, int); static struct rtpp_node *select_rtpp_node(str, str, int); +static int build_rtpp_socks(unsigned int current_rtpp_no); static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); static int rtpengine_set_store(modparam_t type, void * val); -static int rtpengine_add_rtpengine_set( char * rtp_proxies); +static int rtpengine_add_rtpengine_set(char * rtp_proxies, unsigned int weight, int disabled, unsigned int ticks); static int mod_init(void); static int child_init(int); @@ -210,10 +215,7 @@ static int get_ip_type(char *str_addr); static int get_ip_scope(char *str_addr); // useful for link-local ipv6 static int bind_force_send_ip(int sock_idx); -static int add_rtpp_node_info(struct mi_node *node, - struct rtpp_node *crt_rtpp, - struct rtpp_set *rtpp_list); - +static int add_rtpp_node_info(struct mi_node *node, struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list); static int rtpp_test_ping(struct rtpp_node *node); /* Pseudo-Variables */ @@ -225,6 +227,7 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root* cmd_tree, void* param static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param); static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param); static struct mi_root* mi_show_hash_total(struct mi_root* cmd_tree, void* param); +static struct mi_root* mi_reload_rtp_proxy(struct mi_root* cmd_tree, void* param); static int rtpengine_disable_tout = 60; @@ -238,6 +241,7 @@ static str extra_id_pv_param = {NULL, 0}; static char *setid_avp_param = NULL; static int hash_table_tout = 3600; static int hash_table_size = 256; +static int setid_default = DEFAULT_RTPP_SET_ID; static char ** rtpp_strings=0; static int rtpp_sets=0; /*used in rtpengine_set_store()*/ @@ -256,17 +260,19 @@ static str rtp_inst_pv_param = {NULL, 0}; static pv_spec_t *rtp_inst_pvar = NULL; /* array with the sockets used by rtpporxy (per process)*/ -static unsigned int rtpp_no = 0; +static unsigned int *rtpp_no = 0; +static gen_lock_t *rtpp_no_lock = 0; static int *rtpp_socks = 0; +static unsigned int rtpp_socks_size = 0; -static int setid_avp_type; +static int setid_avp_type; static int_str setid_avp; -static str write_sdp_pvar_str = {NULL, 0}; -static pv_spec_t* write_sdp_pvar = NULL; +static str write_sdp_pvar_str = {NULL, 0}; +static pv_spec_t *write_sdp_pvar = NULL; -static str read_sdp_pvar_str = {NULL, 0}; -static pv_spec_t* read_sdp_pvar = NULL; +static str read_sdp_pvar_str = {NULL, 0}; +static pv_spec_t *read_sdp_pvar = NULL; #define RTPENGINE_SESS_LIMIT_MSG "Parallel session limit reached" #define RTPENGINE_SESS_LIMIT_MSG_LEN (sizeof(RTPENGINE_SESS_LIMIT_MSG)-1) @@ -288,46 +294,46 @@ unsigned int *natping_state=0; static pv_elem_t *extra_id_pv = NULL; static cmd_export_t cmds[] = { - {"set_rtpengine_set", (cmd_function)set_rtpengine_set_f, 1, + {"set_rtpengine_set", (cmd_function)set_rtpengine_set_f, 1, fixup_set_id, 0, ANY_ROUTE}, - {"set_rtpengine_set", (cmd_function)set_rtpengine_set_f, 2, + {"set_rtpengine_set", (cmd_function)set_rtpengine_set_f, 2, fixup_set_id, 0, ANY_ROUTE}, - {"start_recording", (cmd_function)start_recording_f, 0, + {"start_recording", (cmd_function)start_recording_f, 0, 0, 0, ANY_ROUTE }, - {"rtpengine_offer", (cmd_function)rtpengine_offer1_f, 0, + {"rtpengine_offer", (cmd_function)rtpengine_offer1_f, 0, 0, 0, ANY_ROUTE}, - {"rtpengine_offer", (cmd_function)rtpengine_offer1_f, 1, + {"rtpengine_offer", (cmd_function)rtpengine_offer1_f, 1, fixup_spve_null, 0, ANY_ROUTE}, - {"rtpengine_answer", (cmd_function)rtpengine_answer1_f, 0, + {"rtpengine_answer", (cmd_function)rtpengine_answer1_f, 0, 0, 0, ANY_ROUTE}, - {"rtpengine_answer", (cmd_function)rtpengine_answer1_f, 1, + {"rtpengine_answer", (cmd_function)rtpengine_answer1_f, 1, fixup_spve_null, 0, ANY_ROUTE}, - {"rtpengine_manage", (cmd_function)rtpengine_manage1_f, 0, + {"rtpengine_manage", (cmd_function)rtpengine_manage1_f, 0, 0, 0, ANY_ROUTE}, - {"rtpengine_manage", (cmd_function)rtpengine_manage1_f, 1, + {"rtpengine_manage", (cmd_function)rtpengine_manage1_f, 1, fixup_spve_null, 0, ANY_ROUTE}, - {"rtpengine_delete", (cmd_function)rtpengine_delete1_f, 0, + {"rtpengine_delete", (cmd_function)rtpengine_delete1_f, 0, 0, 0, ANY_ROUTE}, - {"rtpengine_delete", (cmd_function)rtpengine_delete1_f, 1, + {"rtpengine_delete", (cmd_function)rtpengine_delete1_f, 1, fixup_spve_null, 0, ANY_ROUTE}, {0, 0, 0, 0, 0, 0} }; static pv_export_t mod_pvs[] = { - {{"rtpstat", (sizeof("rtpstat")-1)}, /* RTP-Statistics */ - PVT_OTHER, pv_get_rtpstat_f, 0, 0, 0, 0, 0}, - {{0, 0}, 0, 0, 0, 0, 0, 0, 0} + {{"rtpstat", (sizeof("rtpstat")-1)}, /* RTP-Statistics */ + PVT_OTHER, pv_get_rtpstat_f, 0, 0, 0, 0, 0}, + {{0, 0}, 0, 0, 0, 0, 0, 0, 0} }; static param_export_t params[] = { @@ -338,17 +344,21 @@ static param_export_t params[] = { {"rtpengine_tout_ms", INT_PARAM, &rtpengine_tout_ms }, {"rtpengine_allow_op", INT_PARAM, &rtpengine_allow_op }, {"queried_nodes_limit", INT_PARAM, &queried_nodes_limit }, - {"db_url", PARAM_STR, &rtpp_db_url }, - {"table_name", PARAM_STR, &rtpp_table_name }, - {"url_col", PARAM_STR, &rtpp_url_col }, - {"extra_id_pv", PARAM_STR, &extra_id_pv_param }, - {"setid_avp", PARAM_STRING, &setid_avp_param }, - {"force_send_interface", PARAM_STRING, &force_send_ip_str }, - {"rtp_inst_pvar", PARAM_STR, &rtp_inst_pv_param }, - {"write_sdp_pv", PARAM_STR, &write_sdp_pvar_str }, - {"read_sdp_pv", PARAM_STR, &read_sdp_pvar_str }, + {"db_url", PARAM_STR, &rtpp_db_url }, + {"table_name", PARAM_STR, &rtpp_table_name }, + {"setid_col", PARAM_STR, &rtpp_setid_col }, + {"url_col", PARAM_STR, &rtpp_url_col }, + {"weight_col", PARAM_STR, &rtpp_weight_col }, + {"disabled_col", PARAM_STR, &rtpp_disabled_col }, + {"extra_id_pv", PARAM_STR, &extra_id_pv_param }, + {"setid_avp", PARAM_STRING, &setid_avp_param }, + {"force_send_interface", PARAM_STRING, &force_send_ip_str }, + {"rtp_inst_pvar", PARAM_STR, &rtp_inst_pv_param }, + {"write_sdp_pv", PARAM_STR, &write_sdp_pvar_str }, + {"read_sdp_pv", PARAM_STR, &read_sdp_pvar_str }, {"hash_table_tout", INT_PARAM, &hash_table_tout }, {"hash_table_size", INT_PARAM, &hash_table_size }, + {"setid_default", INT_PARAM, &setid_default }, {0, 0, 0} }; @@ -356,7 +366,8 @@ static mi_export_t mi_cmds[] = { {MI_ENABLE_RTP_PROXY, mi_enable_rtp_proxy, 0, 0, 0}, {MI_SHOW_RTP_PROXIES, mi_show_rtp_proxy, 0, 0, 0}, {MI_PING_RTP_PROXY, mi_ping_rtp_proxy, 0, 0, 0}, - {MI_SHOW_HASH_TOTAL, mi_show_hash_total, 0, 0, 0}, + {MI_SHOW_HASH_TOTAL, mi_show_hash_total, 0, 0, 0}, + {MI_RELOAD_RTP_PROXY, mi_reload_rtp_proxy, 0, 0, 0}, { 0, 0, 0, 0, 0} }; @@ -376,6 +387,49 @@ struct module_exports exports = { child_init }; +/* hide the node from display and disable it permanent */ +int rtpengine_delete_node(struct rtpp_node *rtpp_node) +{ + rtpp_node->rn_displayed = 0; + rtpp_node->rn_disabled = MI_MAX_RECHECK_TICKS; + + return 1; +} + + +int rtpengine_delete_node_set(struct rtpp_set *rtpp_list) +{ + struct rtpp_node *rtpp_node; + + lock_get(rtpp_list->rset_lock); + for(rtpp_node = rtpp_list->rn_first; rtpp_node != NULL; + rtpp_node = rtpp_node->rn_next) { + rtpengine_delete_node(rtpp_node); + } + lock_release(rtpp_list->rset_lock); + + return 1; +} + + +int rtpengine_delete_node_all() +{ + struct rtpp_set *rtpp_list; + + if (!rtpp_set_list) { + return 1; + } + + lock_get(rtpp_set_list->rset_head_lock); + for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL; + rtpp_list = rtpp_list->rset_next) { + rtpengine_delete_node_set(rtpp_list); + } + lock_release(rtpp_set_list->rset_head_lock); + + return 1; +} + static int get_ip_type(char *str_addr) { @@ -511,6 +565,9 @@ static int bind_force_send_ip(int sock_idx) return 0; } +static inline int str_cmp(const str *a , const str *b) { + return ! (a->len == b->len && ! strncmp(a->s, b->s, a->len)); +} static inline int str_eq(const str *p, const char *q) { int l = strlen(q); @@ -520,6 +577,7 @@ static inline int str_eq(const str *p, const char *q) { return 0; return 1; } + static inline str str_prefix(const str *p, const char *q) { str ret; ret.s = NULL; @@ -553,8 +611,7 @@ static int rtpengine_set_store(modparam_t type, void * val){ return -1; } } else {/*realloc to make room for the current set*/ - rtpp_strings = (char**)pkg_realloc(rtpp_strings, - (rtpp_sets+1)* sizeof(char*)); + rtpp_strings = (char**)pkg_realloc(rtpp_strings, (rtpp_sets+1)* sizeof(char*)); if(!rtpp_strings){ LM_ERR("no pkg memory left\n"); return -1; @@ -577,6 +634,28 @@ static int rtpengine_set_store(modparam_t type, void * val){ return 0; } +struct rtpp_node *get_rtpp_node(struct rtpp_set *rtpp_list, str *url) +{ + struct rtpp_node *rtpp_node; + + if (rtpp_list == NULL) { + return NULL; + } + + lock_get(rtpp_list->rset_lock); + rtpp_node = rtpp_list->rn_first; + while (rtpp_node) { + if (str_cmp(&rtpp_node->rn_url, url) == 0) { + lock_release(rtpp_list->rset_lock); + return rtpp_node; + } + rtpp_node = rtpp_node->rn_next; + } + lock_release(rtpp_list->rset_lock); + + return NULL; +} + struct rtpp_set *get_rtpp_set(int set_id) { struct rtpp_set * rtpp_list; @@ -585,15 +664,15 @@ struct rtpp_set *get_rtpp_set(int set_id) if (set_id < DEFAULT_RTPP_SET_ID ) { - LM_ERR(" invalid rtpproxy set value [%d]\n", - set_id); + LM_ERR(" invalid rtpproxy set value [%d]\n", set_id); return NULL; } my_current_id = set_id; /*search for the current_id*/ + lock_get(rtpp_set_list->rset_head_lock); rtpp_list = rtpp_set_list ? rtpp_set_list->rset_first : 0; - while( rtpp_list != 0 && rtpp_list->id_set!=my_current_id) + while (rtpp_list != 0 && rtpp_list->id_set!=my_current_id) rtpp_list = rtpp_list->rset_next; if (rtpp_list==NULL) @@ -601,11 +680,29 @@ struct rtpp_set *get_rtpp_set(int set_id) rtpp_list = shm_malloc(sizeof(struct rtpp_set)); if(!rtpp_list) { + lock_release(rtpp_set_list->rset_head_lock); LM_ERR("no shm memory left to create new rtpproxy set %d\n", my_current_id); return NULL; } memset(rtpp_list, 0, sizeof(struct rtpp_set)); rtpp_list->id_set = my_current_id; + rtpp_list->rset_lock = lock_alloc(); + if (!rtpp_list->rset_lock) { + lock_release(rtpp_set_list->rset_head_lock); + LM_ERR("no shm memory left to create rtpproxy set lock\n"); + shm_free(rtpp_list); + rtpp_list = NULL; + return NULL; + } + if (lock_init(rtpp_list->rset_lock) == 0) { + lock_release(rtpp_set_list->rset_head_lock); + LM_ERR("could not init rtpproxy set lock\n"); + lock_dealloc((void*)rtpp_list->rset_lock); + rtpp_list->rset_lock = NULL; + shm_free(rtpp_list); + rtpp_list = NULL; + return NULL; + } new_list = 1; } else { @@ -614,15 +711,6 @@ struct rtpp_set *get_rtpp_set(int set_id) if (new_list) { - if(!rtpp_set_list){/*initialize the list of set*/ - rtpp_set_list = shm_malloc(sizeof(struct rtpp_set_head)); - if(!rtpp_set_list){ - LM_ERR("no shm memory left to create list of proxysets\n"); - return NULL; - } - memset(rtpp_set_list, 0, sizeof(struct rtpp_set_head)); - } - /*update the list of set info*/ if (!rtpp_set_list->rset_first) { @@ -640,22 +728,27 @@ struct rtpp_set *get_rtpp_set(int set_id) default_rtpp_set = rtpp_list; } } + lock_release(rtpp_set_list->rset_head_lock); + return rtpp_list; } -int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy) +int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy, + unsigned int weight, int disabled, unsigned int ticks, int isDB) { /* Make rtp proxies list. */ char *p, *p1, *p2, *plim; struct rtpp_node *pnode; - int weight; + struct rtpp_node *rtpp_node; + unsigned int local_weight, port; + str s1; p = rtpproxy; plim = p + strlen(p); for(;;) { - weight = 1; + local_weight = weight; while (*p && isspace((int)*p)) ++p; if (p >= plim) @@ -665,33 +758,49 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy) ++p; if (p <= p1) break; /* may happen??? */ - /* Have weight specified? If yes, scan it */ - p2 = memchr(p1, '=', p - p1); - if (p2 != NULL) { - weight = strtoul(p2 + 1, NULL, 10); - } else { - p2 = p; + p2 = p; + + /* if called for database, consider simple, single char *URL */ + /* if called for config, consider weight URL */ + if (!isDB) { + /* Have weight specified? If yes, scan it */ + p2 = memchr(p1, '=', p - p1); + if (p2 != NULL) { + local_weight = strtoul(p2 + 1, NULL, 10); + } else { + p2 = p; + } } + pnode = shm_malloc(sizeof(struct rtpp_node)); if (pnode == NULL) { LM_ERR("no shm memory left\n"); return -1; } memset(pnode, 0, sizeof(*pnode)); - pnode->idx = rtpp_no++; - pnode->rn_recheck_ticks = 0; - pnode->rn_weight = weight; + + lock_get(rtpp_no_lock); + pnode->idx = *rtpp_no; + + if (ticks == MI_MAX_RECHECK_TICKS) { + pnode->rn_recheck_ticks = ticks; + } else { + pnode->rn_recheck_ticks = ticks + get_ticks(); + } + pnode->rn_weight = local_weight; pnode->rn_umode = 0; - pnode->rn_disabled = 0; + pnode->rn_disabled = disabled; + pnode->rn_displayed = 1; pnode->rn_url.s = shm_malloc(p2 - p1 + 1); if (pnode->rn_url.s == NULL) { + lock_release(rtpp_no_lock); shm_free(pnode); LM_ERR("no shm memory left\n"); return -1; } memmove(pnode->rn_url.s, p1, p2 - p1); - pnode->rn_url.s[p2 - p1] = 0; - pnode->rn_url.len = p2-p1; + pnode->rn_url.s[p2 - p1] = 0; + pnode->rn_url.len = p2-p1; /* Leave only address in rn_address */ pnode->rn_address = pnode->rn_url.s; @@ -704,6 +813,63 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy) } else if (strncasecmp(pnode->rn_address, "unix:", 5) == 0) { pnode->rn_umode = 0; pnode->rn_address += 5; + } else { + lock_release(rtpp_no_lock); + LM_WARN("Node address must start with 'udp:' or 'udp6:' or 'unix:'. Ignore '%s'.\n", pnode->rn_address); + shm_free(pnode->rn_url.s); + shm_free(pnode); + + if (!isDB) { + continue; + } else { + return 0; + } + } + + /* Check the rn_address is 'hostname:port' */ + /* Check the rn_address port is valid */ + p1 = strchr(pnode->rn_address, ':'); + if (p1 != NULL) { + p1++; + } + + if (p1 != NULL && p1 != '\0') { + s1.s = p1; + s1.len = strlen(p1); + if (str2int(&s1, &port) < 0 || port > 0xFFFF) { + lock_release(rtpp_no_lock); + LM_WARN("Node address must end with a valid port number. Ignore '%s'.\n", pnode->rn_address); + shm_free(pnode->rn_url.s); + shm_free(pnode); + + if (!isDB) { + continue; + } else { + return 0; + } + } + } + + /* If node found in set, update it */ + rtpp_node = get_rtpp_node(rtpp_list, &pnode->rn_url); + + lock_get(rtpp_list->rset_lock); + if (rtpp_node) { + rtpp_node->rn_disabled = pnode->rn_disabled; + rtpp_node->rn_displayed = pnode->rn_displayed; + rtpp_node->rn_recheck_ticks = pnode->rn_recheck_ticks; + rtpp_node->rn_weight = pnode->rn_weight; + lock_release(rtpp_list->rset_lock); + lock_release(rtpp_no_lock); + + shm_free(pnode->rn_url.s); + shm_free(pnode); + + if (!isDB) { + continue; + } else { + return 0; + } } if (rtpp_list->rn_first == NULL) { @@ -714,15 +880,25 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy) rtpp_list->rn_last = pnode; rtpp_list->rtpp_node_count++; + lock_release(rtpp_list->rset_lock); + + *rtpp_no = *rtpp_no + 1; + lock_release(rtpp_no_lock); + + if (!isDB) { + continue; + } else { + return 0; + } } return 0; } -/* 0-succes - * -1 - erorr +/* 0 - succes + * -1 - erorr * */ -static int rtpengine_add_rtpengine_set( char * rtp_proxies) +static int rtpengine_add_rtpengine_set(char * rtp_proxies, unsigned int weight, int disabled, unsigned int ticks) { char *p,*p2; struct rtpp_set * rtpp_list; @@ -775,7 +951,8 @@ static int rtpengine_add_rtpengine_set( char * rtp_proxies) if (rtpp_list != NULL) { - if (add_rtpengine_socks(rtpp_list, rtp_proxies) != 0) + + if (add_rtpengine_socks(rtpp_list, rtp_proxies, weight, disabled, ticks, 0) != 0) goto error; else return 0; @@ -833,41 +1010,40 @@ static int fixup_set_id(void ** param, int param_no) static int rtpp_test_ping(struct rtpp_node *node) { - bencode_buffer_t bencbuf; - bencode_item_t *dict; - char *cp; - int ret; - - if (bencode_buffer_init(&bencbuf)) { - return -1; - } - dict = bencode_dictionary(&bencbuf); - bencode_dictionary_add_string(dict, "command", command_strings[OP_PING]); - - if (bencbuf.error) { - goto error; - } - - cp = send_rtpp_command(node, dict, &ret); - if (!cp) { - goto error; - } - - dict = bencode_decode_expect(&bencbuf, cp, ret, BENCODE_DICTIONARY); - if (!dict || bencode_dictionary_get_strcmp(dict, "result", "pong")) { - goto error; - } - - bencode_buffer_free(&bencbuf); - return 0; + bencode_buffer_t bencbuf; + bencode_item_t *dict; + char *cp; + int ret; + + if (bencode_buffer_init(&bencbuf)) { + return -1; + } + dict = bencode_dictionary(&bencbuf); + bencode_dictionary_add_string(dict, "command", command_strings[OP_PING]); + + if (bencbuf.error) { + goto error; + } + + cp = send_rtpp_command(node, dict, &ret); + if (!cp) { + goto error; + } + + dict = bencode_decode_expect(&bencbuf, cp, ret, BENCODE_DICTIONARY); + if (!dict || bencode_dictionary_get_strcmp(dict, "result", "pong")) { + goto error; + } + + bencode_buffer_free(&bencbuf); + return 0; error: - bencode_buffer_free(&bencbuf); - return -1; + bencode_buffer_free(&bencbuf); + return -1; } -static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, - void *param ) +static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, void *param) { struct mi_node *node, *crt_node; struct rtpp_set *rtpp_list; @@ -885,7 +1061,7 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, enable = 0; if (rtpp_set_list == NULL) { - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); } node = cmd_tree->node.kids; @@ -919,49 +1095,58 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, return init_mi_tree(400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN); } - /* found a matching all - show all rtpp */ - if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) == 0) { - found = MI_FOUND_ALL; - } + /* found a matching all - show all rtpp */ + if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) == 0) { + found = MI_FOUND_ALL; + } + lock_get(rtpp_set_list->rset_head_lock); for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL; - rtpp_list = rtpp_list->rset_next) { + rtpp_list = rtpp_list->rset_next) { + lock_get(rtpp_list->rset_lock); for(crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL; - crt_rtpp = crt_rtpp->rn_next) { - - /* found a matching rtpp - show it */ - if (found == MI_FOUND_ALL || - (crt_rtpp->rn_url.len == rtpp_url.len && - strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) { - /* do ping when try to enable the rtpp */ - if (enable) { - - /* if ping success, enable the rtpp and reset ticks */ - if (rtpp_test_ping(crt_rtpp) == 0) { - crt_rtpp->rn_disabled = 0; - crt_rtpp->rn_recheck_ticks = MI_MIN_RECHECK_TICKS; - - /* if ping fail, disable the rtpps but _not_ permanently*/ - } else { - crt_rtpp->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; - crt_rtpp->rn_disabled = 1; - found_rtpp_disabled = 1; - } - - /* do not ping when disable the rtpp; disable it permanenty */ - } else { + crt_rtpp = crt_rtpp->rn_next) { + + if (!crt_rtpp->rn_displayed) { + continue; + } + + /* found a matching rtpp - show it */ + if (found == MI_FOUND_ALL || + (crt_rtpp->rn_url.len == rtpp_url.len && + strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) { + + /* do ping when try to enable the rtpp */ + if (enable) { + + /* if ping success, enable the rtpp and reset ticks */ + if (rtpp_test_ping(crt_rtpp) == 0) { + crt_rtpp->rn_disabled = 0; + crt_rtpp->rn_recheck_ticks = MI_MIN_RECHECK_TICKS; + + /* if ping fail, disable the rtpps but _not_ permanently*/ + } else { + crt_rtpp->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; + crt_rtpp->rn_disabled = 1; + found_rtpp_disabled = 1; + } + + /* do not ping when disable the rtpp; disable it permanenty */ + } else { crt_rtpp->rn_disabled = 1; - crt_rtpp->rn_recheck_ticks = MI_MAX_RECHECK_TICKS; - } + crt_rtpp->rn_recheck_ticks = MI_MAX_RECHECK_TICKS; + } - if (found == MI_FOUND_NONE) { - found = MI_FOUND_ONE; - found_rtpp = crt_rtpp; - } - } + if (found == MI_FOUND_NONE) { + found = MI_FOUND_ONE; + found_rtpp = crt_rtpp; + } + } } + lock_release(rtpp_list->rset_lock); } + lock_release(rtpp_set_list->rset_head_lock); root = init_mi_tree(200, MI_OK_S, MI_OK_LEN); if (!root) { @@ -971,49 +1156,45 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, node = &root->node; switch (found) { - case MI_FOUND_ALL: - snode.s = MI_ALL; - snode.len = MI_ALL_LEN; - break; - case MI_FOUND_ONE: - snode.s = found_rtpp->rn_url.s; - snode.len = found_rtpp->rn_url.len; - break; - default: - if (root) { - free_mi_tree(root); - } - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); - } - - svalue.s = MI_SUCCESS; - svalue.len = MI_SUCCESS_LEN; - - if (enable) { - sattr.s = MI_ENABLE; - sattr.len = MI_ENABLE_LEN; - - if (found_rtpp_disabled) { - svalue.s = MI_FAIL; - svalue.len = MI_FAIL_LEN; - } - } else { - sattr.s = MI_DISABLE; - sattr.len = MI_DISABLE_LEN; - } - - if (!(crt_node = add_mi_node_child(node, 0, - snode.s, snode.len, - 0, 0)) ) { - LM_ERR("cannot add the child node to the tree\n"); - goto error; - } - - if ((attr = add_mi_attr(crt_node, MI_DUP_VALUE, - sattr.s, sattr.len, - svalue.s, svalue.len)) == 0) { - LM_ERR("cannot add attributes to the node\n"); - goto error; + case MI_FOUND_ALL: + snode.s = MI_ALL; + snode.len = MI_ALL_LEN; + break; + case MI_FOUND_ONE: + snode.s = found_rtpp->rn_url.s; + snode.len = found_rtpp->rn_url.len; + break; + default: + if (root) { + free_mi_tree(root); + } + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + } + + svalue.s = MI_SUCCESS; + svalue.len = MI_SUCCESS_LEN; + + if (enable) { + sattr.s = MI_ENABLE; + sattr.len = MI_ENABLE_LEN; + + if (found_rtpp_disabled) { + svalue.s = MI_FAIL; + svalue.len = MI_FAIL_LEN; + } + } else { + sattr.s = MI_DISABLE; + sattr.len = MI_DISABLE_LEN; + } + + if (!(crt_node = add_mi_node_child(node, 0, snode.s, snode.len, 0, 0))) { + LM_ERR("cannot add the child node to the tree\n"); + goto error; + } + + if ((attr = add_mi_attr(crt_node, MI_DUP_VALUE, sattr.s, sattr.len, svalue.s, svalue.len)) == 0) { + LM_ERR("cannot add attributes to the node\n"); + goto error; } @@ -1021,7 +1202,7 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, error: if (root) { - free_mi_tree(root); + free_mi_tree(root); } return init_mi_tree(404, MI_ERROR, MI_ERROR_LEN); } @@ -1038,17 +1219,15 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, goto _error;\ }\ if(((_child) = add_mi_node_child((_parent), MI_DUP_VALUE, (_name), \ - (_name_len), (_string), (_len)) ) == 0)\ + (_name_len), (_string), (_len))) == 0)\ goto _error;\ }while(0); -static int add_rtpp_node_info (struct mi_node *node, - struct rtpp_node *crt_rtpp, - struct rtpp_set *rtpp_list) +static int add_rtpp_node_info (struct mi_node *node, struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list) { int id_len, len; - int rtpp_ticks; + int rtpp_ticks; struct mi_node *crt_node, *child; struct mi_attr *attr; char *string, *id; @@ -1056,55 +1235,52 @@ static int add_rtpp_node_info (struct mi_node *node, string = id = 0; id = int2str(rtpp_list->id_set, &id_len); - if(!id) { - LM_ERR("cannot convert set id\n"); - goto error; + if (!id) { + LM_ERR("cannot convert set id\n"); + goto error; } - if(!(crt_node = add_mi_node_child(node, 0, crt_rtpp->rn_url.s, - crt_rtpp->rn_url.len, 0,0)) ) { - LM_ERR("cannot add the child node to the tree\n"); - goto error; + if (!(crt_node = add_mi_node_child(node, 0, crt_rtpp->rn_url.s, crt_rtpp->rn_url.len, 0,0))) { + LM_ERR("cannot add the child node to the tree\n"); + goto error; } LM_DBG("adding node name %s \n",crt_rtpp->rn_url.s ); - if((attr = add_mi_attr(crt_node, MI_DUP_VALUE, MI_SET, MI_SET_LEN, - id, id_len))== 0) { - LM_ERR("cannot add attributes to the node\n"); - goto error; + if ((attr = add_mi_attr(crt_node, MI_DUP_VALUE, MI_SET, MI_SET_LEN, id, id_len)) == 0) { + LM_ERR("cannot add attributes to the node\n"); + goto error; } add_rtpp_node_int_info(crt_node, MI_INDEX, MI_INDEX_LEN, - crt_rtpp->idx, child, len, string, error); + crt_rtpp->idx, child, len, string, error); if ((1 == crt_rtpp->rn_disabled ) && (crt_rtpp->rn_recheck_ticks == MI_MAX_RECHECK_TICKS)) { - if( !(child = add_mi_node_child(crt_node, MI_DUP_VALUE, MI_DISABLED, MI_DISABLED_LEN, - MI_DISABLED_PERMANENT, MI_DISABLED_PERMANENT_LEN))) { - LM_ERR("cannot add disabled (permanent) message\n"); - goto error; - } - } - else { - add_rtpp_node_int_info(crt_node, MI_DISABLED, MI_DISABLED_LEN, - crt_rtpp->rn_disabled, child, len, string, error); + if (!(child = add_mi_node_child(crt_node, MI_DUP_VALUE, MI_DISABLED, MI_DISABLED_LEN, + MI_DISABLED_PERMANENT, MI_DISABLED_PERMANENT_LEN))) { + LM_ERR("cannot add disabled (permanent) message\n"); + goto error; + } + } else { + add_rtpp_node_int_info(crt_node, MI_DISABLED, MI_DISABLED_LEN, + crt_rtpp->rn_disabled, child, len, string, error); } add_rtpp_node_int_info(crt_node, MI_WEIGHT, MI_WEIGHT_LEN, - crt_rtpp->rn_weight, child, len, string, error); + crt_rtpp->rn_weight, child, len, string, error); if (crt_rtpp->rn_recheck_ticks == MI_MAX_RECHECK_TICKS) { - if( !(child = add_mi_node_child(crt_node, MI_DUP_VALUE, - MI_RECHECK_TICKS, MI_RECHECK_T_LEN, - "N/A", sizeof("N/A") - 1))) { - LM_ERR("cannot add MAX recheck_ticks value\n"); - goto error; - } + if (!(child = add_mi_node_child(crt_node, MI_DUP_VALUE, + MI_RECHECK_TICKS, MI_RECHECK_T_LEN, + "N/A", sizeof("N/A") - 1))) { + LM_ERR("cannot add MAX recheck_ticks value\n"); + goto error; + } } else { - rtpp_ticks = crt_rtpp->rn_recheck_ticks - get_ticks(); - rtpp_ticks = rtpp_ticks < 0 ? 0 : rtpp_ticks; - add_rtpp_node_int_info(crt_node, MI_RECHECK_TICKS, MI_RECHECK_T_LEN, - rtpp_ticks, child, len, string, error); + rtpp_ticks = crt_rtpp->rn_recheck_ticks - get_ticks(); + rtpp_ticks = rtpp_ticks < 0 ? 0 : rtpp_ticks; + add_rtpp_node_int_info(crt_node, MI_RECHECK_TICKS, MI_RECHECK_T_LEN, + rtpp_ticks, child, len, string, error); } return 0; @@ -1125,7 +1301,7 @@ static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param) found = MI_FOUND_NONE; if (rtpp_set_list == NULL) { - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); } node = cmd_tree->node.kids; @@ -1139,7 +1315,7 @@ static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param) rtpp_url = node->value; if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) != 0 && rtpp_set_list == NULL) { - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); } node = node->next; @@ -1155,42 +1331,52 @@ static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param) node = &root->node; - /* found a matching all - show all rtpp */ - if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) == 0) { - found = MI_FOUND_ALL; - } + /* found a matching all - show all rtpp */ + if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) == 0) { + found = MI_FOUND_ALL; + } + lock_get(rtpp_set_list->rset_head_lock); for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL; - rtpp_list = rtpp_list->rset_next) { + rtpp_list = rtpp_list->rset_next) { + lock_get(rtpp_list->rset_lock); for(crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL; - crt_rtpp = crt_rtpp->rn_next) { + crt_rtpp = crt_rtpp->rn_next) { - /* found a matching rtpp - show it */ - if (found == MI_FOUND_ALL || - (crt_rtpp->rn_url.len == rtpp_url.len && - strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) { + if (!crt_rtpp->rn_displayed) { + continue; + } - if (add_rtpp_node_info(node, crt_rtpp, rtpp_list) < 0) { - goto error; - } + /* found a matching rtpp - show it */ + if (found == MI_FOUND_ALL || + (crt_rtpp->rn_url.len == rtpp_url.len && + strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) { - if (found == MI_FOUND_NONE) { - found = MI_FOUND_ONE; - } - } + if (add_rtpp_node_info(node, crt_rtpp, rtpp_list) < 0) { + lock_release(rtpp_list->rset_lock); + lock_release(rtpp_set_list->rset_head_lock); + goto error; + } + + if (found == MI_FOUND_NONE) { + found = MI_FOUND_ONE; + } + } } + lock_release(rtpp_list->rset_lock); } + lock_release(rtpp_set_list->rset_head_lock); switch (found) { - case MI_FOUND_ALL: - case MI_FOUND_ONE: - break; - default: - if (root) { - free_mi_tree(root); - } - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + case MI_FOUND_ALL: + case MI_FOUND_ONE: + break; + default: + if (root) { + free_mi_tree(root); + } + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); } return root; @@ -1218,7 +1404,7 @@ static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param) found_rtpp = NULL; if (rtpp_set_list == NULL) { - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); } node = cmd_tree->node.kids; @@ -1237,36 +1423,44 @@ static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param) return init_mi_tree( 400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN); } - /* found a matching all - ping all rtpp */ - if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) == 0) { - found = MI_FOUND_ALL; - } + /* found a matching all - ping all rtpp */ + if (strncmp(MI_ALL, rtpp_url.s, MI_ALL_LEN) == 0) { + found = MI_FOUND_ALL; + } + lock_get(rtpp_set_list->rset_head_lock); for (rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL; - rtpp_list = rtpp_list->rset_next) { + rtpp_list = rtpp_list->rset_next) { + lock_get(rtpp_list->rset_lock); for (crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL; - crt_rtpp = crt_rtpp->rn_next) { + crt_rtpp = crt_rtpp->rn_next) { - /* found a matching rtpp - ping it */ - if (found == MI_FOUND_ALL || - (crt_rtpp->rn_url.len == rtpp_url.len && - strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) { + if (!crt_rtpp->rn_displayed) { + continue; + } + + /* found a matching rtpp - ping it */ + if (found == MI_FOUND_ALL || + (crt_rtpp->rn_url.len == rtpp_url.len && + strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) { - /* if ping fail */ - if (rtpp_test_ping(crt_rtpp) < 0) { - crt_rtpp->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; - found_rtpp_disabled = 1; - crt_rtpp->rn_disabled = 1; - } + /* if ping fail */ + if (rtpp_test_ping(crt_rtpp) < 0) { + crt_rtpp->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; + found_rtpp_disabled = 1; + crt_rtpp->rn_disabled = 1; + } - if (found == MI_FOUND_NONE) { - found = MI_FOUND_ONE; - found_rtpp = crt_rtpp; - } - } - } + if (found == MI_FOUND_NONE) { + found = MI_FOUND_ONE; + found_rtpp = crt_rtpp; + } + } + } + lock_release(rtpp_list->rset_lock); } + lock_release(rtpp_set_list->rset_head_lock); root = init_mi_tree(200, MI_OK_S, MI_OK_LEN); if (!root) { @@ -1277,51 +1471,47 @@ static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param) node = &root->node; switch (found) { - case MI_FOUND_ALL: - snode.s = MI_ALL; - snode.len = MI_ALL_LEN; - break; - case MI_FOUND_ONE: - snode.s = found_rtpp->rn_url.s; - snode.len = found_rtpp->rn_url.len; - break; - default: - if (root) { - free_mi_tree(root); - } - return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); - } - - sattr.s = MI_PING; - sattr.len = MI_PING_LEN; - - if (found_rtpp_disabled) { - svalue.s = MI_FAIL; - svalue.len = MI_FAIL_LEN; - } else { - svalue.s = MI_SUCCESS; - svalue.len = MI_SUCCESS_LEN; - } - - if (!(crt_node = add_mi_node_child(node, 0, - snode.s, snode.len, - 0, 0)) ) { - LM_ERR("cannot add the child node to the tree\n"); - goto error; - } - - if ((attr = add_mi_attr(crt_node, MI_DUP_VALUE, - sattr.s, sattr.len, - svalue.s, svalue.len)) == 0) { - LM_ERR("cannot add attributes to the node\n"); - goto error; + case MI_FOUND_ALL: + snode.s = MI_ALL; + snode.len = MI_ALL_LEN; + break; + case MI_FOUND_ONE: + snode.s = found_rtpp->rn_url.s; + snode.len = found_rtpp->rn_url.len; + break; + default: + if (root) { + free_mi_tree(root); + } + return init_mi_tree(404, MI_RTP_PROXY_NOT_FOUND, MI_RTP_PROXY_NOT_FOUND_LEN); + } + + sattr.s = MI_PING; + sattr.len = MI_PING_LEN; + + if (found_rtpp_disabled) { + svalue.s = MI_FAIL; + svalue.len = MI_FAIL_LEN; + } else { + svalue.s = MI_SUCCESS; + svalue.len = MI_SUCCESS_LEN; + } + + if (!(crt_node = add_mi_node_child(node, 0, snode.s, snode.len, 0, 0))) { + LM_ERR("cannot add the child node to the tree\n"); + goto error; + } + + if ((attr = add_mi_attr(crt_node, MI_DUP_VALUE, sattr.s, sattr.len, svalue.s, svalue.len)) == 0) { + LM_ERR("cannot add attributes to the node\n"); + goto error; } return root; error: if (root) { - free_mi_tree(root); + free_mi_tree(root); } return init_mi_tree(404, MI_ERROR, MI_ERROR_LEN); } @@ -1344,7 +1534,7 @@ static struct mi_root* mi_show_hash_total(struct mi_root* cmd_tree, void* param) node = &root->node; // Create new node and add it to the roots's kids - if(!(crt_node = add_mi_node_child(node, MI_DUP_NAME, "total", strlen("total"), 0, 0))) { + if (!(crt_node = add_mi_node_child(node, MI_DUP_NAME, "total", strlen("total"), 0, 0))) { LM_ERR("cannot add the child node to the tree\n"); goto error; } @@ -1363,12 +1553,54 @@ static struct mi_root* mi_show_hash_total(struct mi_root* cmd_tree, void* param) error: if (root) { - free_mi_tree(root); + free_mi_tree(root); } return init_mi_tree(404, MI_HASH_ENTRIES_FAIL, MI_HASH_ENTRIES_FAIL_LEN); } +static struct mi_root* +mi_reload_rtp_proxy(struct mi_root* cmd_tree, void* param) +{ + struct mi_root *root = NULL; + unsigned int current_rtpp_no; + + if (rtpp_db_url.s == NULL) { + // no database + root = init_mi_tree(404, MI_DB_NOT_FOUND, MI_DB_NOT_FOUND_LEN); + if (!root) { + LM_ERR("the MI tree cannot be initialized!\n"); + return 0; + } + } else { + if (init_rtpproxy_db() < 0) { + // fail reloading from database + root = init_mi_tree(404, MI_DB_ERR, MI_DB_ERR_LEN); + if (!root) { + LM_ERR("the MI tree cannot be initialized!\n"); + return 0; + } + } else { + lock_get(rtpp_no_lock); + current_rtpp_no = *rtpp_no; + lock_release(rtpp_no_lock); + + if (rtpp_socks_size != current_rtpp_no) { + build_rtpp_socks(current_rtpp_no); + } + + // success reloading from database + root = init_mi_tree(200, MI_DB_OK, MI_DB_OK_LEN); + if (!root) { + LM_ERR("the MI tree cannot be initialized!\n"); + return 0; + } + } + } + + return root; +} + static int mod_init(void) @@ -1384,15 +1616,50 @@ mod_init(void) return -1; } - /* any rtpproxy configured? */ - if(rtpp_set_list) - default_rtpp_set = select_rtpp_set(DEFAULT_RTPP_SET_ID); + rtpp_no = (unsigned int*)shm_malloc(sizeof(unsigned int)); + if (!rtpp_no) { + LM_ERR("no more shm memory for rtpp_no\n"); + return -1; + } + *rtpp_no = 0; + + rtpp_no_lock = lock_alloc(); + if (!rtpp_no_lock) { + LM_ERR("no more shm memory for rtpp_no_lock\n"); + return -1; + } + + if (lock_init(rtpp_no_lock) == 0) { + LM_ERR("could not init rtpp_no_lock\n"); + return -1; + } + + /* initialize the list of set; mod_destroy does shm_free() if fail */ + if (!rtpp_set_list) { + rtpp_set_list = shm_malloc(sizeof(struct rtpp_set_head)); + if(!rtpp_set_list){ + LM_ERR("no shm memory left to create list of proxysets\n"); + return -1; + } + memset(rtpp_set_list, 0, sizeof(struct rtpp_set_head)); + + rtpp_set_list->rset_head_lock = lock_alloc(); + if (!rtpp_set_list->rset_head_lock) { + LM_ERR("no shm memory left to create list of proxysets lock\n"); + return -1; + } + + if (lock_init(rtpp_set_list->rset_head_lock) == 0) { + LM_ERR("could not init rtpproxy list of proxysets lock\n"); + return -1; + } + } if (rtpp_db_url.s == NULL) { /* storing the list of rtp proxy sets in shared memory*/ for(i=0;itype != PVT_AVP) && - (rtp_inst_pvar->type != PVT_XAVP) && - (rtp_inst_pvar->type != PVT_SCRIPTVAR))) { - LM_ERR("Invalid pvar name <%.*s>\n", rtp_inst_pv_param.len, rtp_inst_pv_param.s); - return -1; + ((rtp_inst_pvar->type != PVT_AVP) && + (rtp_inst_pvar->type != PVT_XAVP) && + (rtp_inst_pvar->type != PVT_SCRIPTVAR))) { + LM_ERR("Invalid pvar name <%.*s>\n", rtp_inst_pv_param.len, rtp_inst_pv_param.s); + return -1; } } @@ -1436,27 +1707,25 @@ mod_init(void) } if (setid_avp_param) { - s.s = setid_avp_param; s.len = strlen(s.s); - avp_spec = pv_cache_get(&s); - if (avp_spec==NULL || (avp_spec->type != PVT_AVP)) { - LM_ERR("malformed or non AVP definition <%s>\n", - setid_avp_param); - return -1; - } - if (pv_get_avp_name(0, &(avp_spec->pvp), &setid_avp, - &avp_flags) != 0) { - LM_ERR("invalid AVP definition <%s>\n", setid_avp_param); - return -1; - } - setid_avp_type = avp_flags; + s.s = setid_avp_param; s.len = strlen(s.s); + avp_spec = pv_cache_get(&s); + if (avp_spec==NULL || (avp_spec->type != PVT_AVP)) { + LM_ERR("malformed or non AVP definition <%s>\n", setid_avp_param); + return -1; + } + if (pv_get_avp_name(0, &(avp_spec->pvp), &setid_avp, &avp_flags) != 0) { + LM_ERR("invalid AVP definition <%s>\n", setid_avp_param); + return -1; + } + setid_avp_type = avp_flags; } if (write_sdp_pvar_str.len > 0) { write_sdp_pvar = pv_cache_get(&write_sdp_pvar_str); if (write_sdp_pvar == NULL - || (write_sdp_pvar->type != PVT_AVP && write_sdp_pvar->type != PVT_SCRIPTVAR) ) { + || (write_sdp_pvar->type != PVT_AVP && write_sdp_pvar->type != PVT_SCRIPTVAR) ) { LM_ERR("write_sdp_pv: not a valid AVP or VAR definition <%.*s>\n", - write_sdp_pvar_str.len, write_sdp_pvar_str.s); + write_sdp_pvar_str.len, write_sdp_pvar_str.s); return -1; } } @@ -1464,9 +1733,9 @@ mod_init(void) if (read_sdp_pvar_str.len > 0) { read_sdp_pvar = pv_cache_get(&read_sdp_pvar_str); if (read_sdp_pvar == NULL - || (read_sdp_pvar->type != PVT_AVP && read_sdp_pvar->type != PVT_SCRIPTVAR) ) { + || (read_sdp_pvar->type != PVT_AVP && read_sdp_pvar->type != PVT_SCRIPTVAR) ) { LM_ERR("read_sdp_pv: not a valid AVP or VAR definition <%.*s>\n", - read_sdp_pvar_str.len, read_sdp_pvar_str.s); + read_sdp_pvar_str.len, read_sdp_pvar_str.s); return -1; } } @@ -1482,14 +1751,14 @@ mod_init(void) if (load_tm_api( &tmb ) < 0) { LM_DBG("could not load the TM-functions - answer-offer model" - " auto-detection is disabled\n"); + " auto-detection is disabled\n"); memset(&tmb, 0, sizeof(struct tm_binds)); } /* Determine IP addr type (IPv4 or IPv6 allowed) */ force_send_ip_af = get_ip_type(force_send_ip_str); if (force_send_ip_af != AF_INET && force_send_ip_af != AF_INET6 && - strlen(force_send_ip_str) > 0) { + strlen(force_send_ip_str) > 0) { LM_ERR("%s is an unknown address\n", force_send_ip_str); return -1; } @@ -1502,14 +1771,19 @@ mod_init(void) LM_DBG("rtpengine_hash_table_init(%d) success!\n", hash_table_size); } + /* select the default set */ + default_rtpp_set = select_rtpp_set(setid_default); + if (!default_rtpp_set) { + LM_NOTICE("Default rtpp set %d NOT found\n", setid_default); + } else { + LM_DBG("Default rtpp set %d found\n", setid_default); + } + return 0; } - -static int -child_init(int rank) -{ - int n; +static int build_rtpp_socks(unsigned int current_rtpp_no) { + int n, i; char *cp; struct addrinfo hints, *res; struct rtpp_set *rtpp_list; @@ -1518,22 +1792,26 @@ child_init(int rank) int ip_mtu_discover = IP_PMTUDISC_DONT; #endif - if(rtpp_set_list==NULL ) - return 0; - - /* Iterate known RTP proxies - create sockets */ - mypid = getpid(); + // close current sockets + for (i = 0; i < rtpp_socks_size; i++) { + if (rtpp_socks[i] >= 0) { + close(rtpp_socks[i]); + } + } - rtpp_socks = (int*)pkg_malloc( sizeof(int)*rtpp_no ); - if (rtpp_socks==NULL) { - LM_ERR("no more pkg memory\n"); + rtpp_socks_size = current_rtpp_no; + rtpp_socks = (int*)pkg_realloc(rtpp_socks, sizeof(int)*(rtpp_socks_size)); + if (!rtpp_socks) { + LM_ERR("no more pkg memory for rtpp_socks\n"); return -1; } - for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != 0; - rtpp_list = rtpp_list->rset_next){ + lock_get(rtpp_set_list->rset_head_lock); + for (rtpp_list = rtpp_set_list->rset_first; rtpp_list != 0; + rtpp_list = rtpp_list->rset_next) { - for (pnode=rtpp_list->rn_first; pnode!=0; pnode = pnode->rn_next){ + lock_get(rtpp_list->rset_lock); + for (pnode=rtpp_list->rn_first; pnode!=0; pnode = pnode->rn_next) { char *hostname; if (pnode->rn_umode == 0) { @@ -1548,7 +1826,8 @@ child_init(int rank) hostname = (char*)pkg_malloc(sizeof(char) * (strlen(pnode->rn_address) + 1)); if (hostname==NULL) { LM_ERR("no more pkg memory\n"); - return -1; + rtpp_socks[pnode->idx] = -1; + continue; } strcpy(hostname, pnode->rn_address); @@ -1567,22 +1846,23 @@ child_init(int rank) if ((n = getaddrinfo(hostname, cp, &hints, &res)) != 0) { LM_ERR("%s\n", gai_strerror(n)); pkg_free(hostname); - return -1; + rtpp_socks[pnode->idx] = -1; + continue; } pkg_free(hostname); rtpp_socks[pnode->idx] = socket((pnode->rn_umode == 6) - ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); + ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); if (rtpp_socks[pnode->idx] == -1) { LM_ERR("can't create socket\n"); freeaddrinfo(res); - return -1; + continue; } #ifdef IP_MTU_DISCOVER setsockopt(rtpp_socks[pnode->idx], IPPROTO_IP, - IP_MTU_DISCOVER, &ip_mtu_discover, - sizeof(ip_mtu_discover)); + IP_MTU_DISCOVER, &ip_mtu_discover, + sizeof(ip_mtu_discover)); #endif if (bind_force_send_ip(pnode->idx) == -1) { @@ -1590,7 +1870,7 @@ child_init(int rank) close(rtpp_socks[pnode->idx]); rtpp_socks[pnode->idx] = -1; freeaddrinfo(res); - return -1; + continue; } if (connect(rtpp_socks[pnode->idx], res->ai_addr, res->ai_addrlen) == -1) { @@ -1598,13 +1878,41 @@ child_init(int rank) close(rtpp_socks[pnode->idx]); rtpp_socks[pnode->idx] = -1; freeaddrinfo(res); - return -1; + continue; } freeaddrinfo(res); rptest: pnode->rn_disabled = rtpp_test(pnode, 0, 1); } + lock_release(rtpp_list->rset_lock); + } + lock_release(rtpp_set_list->rset_head_lock); + + return 0; +} + +static int +child_init(int rank) +{ + if(!rtpp_set_list) + return 0; + + mypid = getpid(); + + lock_get(rtpp_no_lock); + rtpp_socks_size = *rtpp_no; + lock_release(rtpp_no_lock); + + rtpp_socks = (int*)pkg_malloc(sizeof(int)*(rtpp_socks_size)); + if (!rtpp_socks) { + LM_ERR("no more pkg memory for rtpp_socks\n"); + return -1; + } + + /* Iterate known RTP proxies - create sockets */ + if (rtpp_socks_size) { + build_rtpp_socks(rtpp_socks_size); } return 0; @@ -1620,11 +1928,39 @@ static void mod_destroy(void) if (natping_state) shm_free(natping_state); - if(rtpp_set_list == NULL) + if (rtpp_no) { + shm_free(rtpp_no); + rtpp_no = NULL; + } + + if (rtpp_no_lock) { + lock_destroy(rtpp_no_lock); + lock_dealloc(rtpp_no_lock); + rtpp_no_lock = NULL; + } + + if (!rtpp_set_list) { + return; + } + + if (!rtpp_set_list->rset_head_lock) { + shm_free(rtpp_set_list); + rtpp_set_list = NULL; return; + } + lock_get(rtpp_set_list->rset_head_lock); for(crt_list = rtpp_set_list->rset_first; crt_list != NULL; ){ + if (!crt_list->rset_lock) { + last_list = crt_list; + crt_list = last_list->rset_next; + shm_free(last_list); + last_list = NULL; + continue; + } + + lock_get(crt_list->rset_lock); for(crt_rtpp = crt_list->rn_first; crt_rtpp != NULL; ){ if(crt_rtpp->rn_url.s) @@ -1634,13 +1970,25 @@ static void mod_destroy(void) crt_rtpp = last_rtpp->rn_next; shm_free(last_rtpp); } - last_list = crt_list; crt_list = last_list->rset_next; + lock_release(crt_list->rset_lock); + + lock_destroy(last_list->rset_lock); + lock_dealloc((void*)last_list->rset_lock); + last_list->rset_lock = NULL; + shm_free(last_list); + last_list = NULL; } + lock_release(rtpp_set_list->rset_head_lock); + + lock_destroy(rtpp_set_list->rset_head_lock); + lock_dealloc((void*)rtpp_set_list->rset_head_lock); + rtpp_set_list->rset_head_lock = NULL; shm_free(rtpp_set_list); + rtpp_set_list = NULL; /* destroy the hastable which keeps the call-id <-> selected_node relation */ if (!rtpengine_hash_table_destroy()) { @@ -2018,11 +2366,11 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_ if (!bencode_dictionary_get_strcmp(resp, "result", "error")) { if (!bencode_dictionary_get_str(resp, "error-reason", &error)) { - LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp); + LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp); } else { if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) && - (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0)) + (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0)) { LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s); goto select_node; @@ -2093,7 +2441,7 @@ rtpp_test(struct rtpp_node *node, int isdisabled, int force) int ret; if(node->rn_recheck_ticks == MI_MAX_RECHECK_TICKS){ - LM_DBG("rtpp %s disabled for ever\n", node->rn_url.s); + LM_DBG("rtpp %s disabled for ever\n", node->rn_url.s); return 1; } if (force == 0) { @@ -2114,8 +2462,8 @@ rtpp_test(struct rtpp_node *node, int isdisabled, int force) cp = send_rtpp_command(node, dict, &ret); if (!cp) { - node->rn_disabled = 1; - node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; + node->rn_disabled = 1; + node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; LM_ERR("proxy did not respond to ping\n"); goto error; } @@ -2127,13 +2475,13 @@ rtpp_test(struct rtpp_node *node, int isdisabled, int force) } LM_INFO("rtp proxy <%s> found, support for it %senabled\n", - node->rn_url.s, force == 0 ? "re-" : ""); + node->rn_url.s, force == 0 ? "re-" : ""); bencode_buffer_free(&bencbuf); return 0; benc_error: - LM_ERR("out of memory - bencode failed\n"); + LM_ERR("out of memory - bencode failed\n"); error: bencode_buffer_free(&bencbuf); return 1; @@ -2148,7 +2496,7 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen) static char buf[0x10000]; struct pollfd fds[1]; struct iovec *v; - str out = STR_NULL; + str out = STR_NULL; v = bencode_iovec(dict, &vcnt, 1, 0); if (!v) { @@ -2162,7 +2510,7 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen) memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_LOCAL; strncpy(addr.sun_path, node->rn_address, - sizeof(addr.sun_path) - 1); + sizeof(addr.sun_path) - 1); #ifdef HAVE_SOCKADDR_SA_LEN addr.sun_len = strlen(addr.sun_path); #endif @@ -2200,7 +2548,7 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen) fds[0].revents = 0; /* Drain input buffer */ while ((poll(fds, 1, 0) == 1) && - ((fds[0].revents & POLLIN) != 0)) { + ((fds[0].revents & POLLIN) != 0)) { recv(rtpp_socks[node->idx], buf, sizeof(buf) - 1, 0); fds[0].revents = 0; } @@ -2211,12 +2559,12 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen) len = writev(rtpp_socks[node->idx], v, vcnt + 1); } while (len == -1 && (errno == EINTR || errno == ENOBUFS)); if (len <= 0) { - bencode_get_str(bencode_dictionary_get(dict, "command"), &out); + bencode_get_str(bencode_dictionary_get(dict, "command"), &out); LM_ERR("can't send command \"%.*s\" to RTP proxy <%s>\n", out.len, out.s, node->rn_url.s); goto badproxy; } while ((poll(fds, 1, rtpengine_tout_ms) == 1) && - (fds[0].revents & POLLIN) != 0) { + (fds[0].revents & POLLIN) != 0) { do { len = recv(rtpp_socks[node->idx], buf, sizeof(buf)-1, 0); } while (len == -1 && errno == EINTR); @@ -2225,7 +2573,7 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen) goto badproxy; } if (len >= (v[0].iov_len - 1) && - memcmp(buf, v[0].iov_base, (v[0].iov_len - 1)) == 0) { + memcmp(buf, v[0].iov_base, (v[0].iov_len - 1)) == 0) { len -= (v[0].iov_len - 1); cp += (v[0].iov_len - 1); if (len != 0) { @@ -2261,16 +2609,24 @@ static struct rtpp_set * select_rtpp_set(int id_set ){ struct rtpp_set * rtpp_list; /*is it a valid set_id?*/ - if(!rtpp_set_list || !rtpp_set_list->rset_first){ - LM_ERR("no rtp_proxy configured\n"); + if (!rtpp_set_list) { + LM_ERR("no rtpp_set_list\n"); + return 0; + } + + lock_get(rtpp_set_list->rset_head_lock); + if (!rtpp_set_list->rset_first) { + LM_ERR("no rtpp_set_list->rset_first\n"); + lock_release(rtpp_set_list->rset_head_lock); return 0; } - for(rtpp_list=rtpp_set_list->rset_first; rtpp_list!=0 && - rtpp_list->id_set!=id_set; rtpp_list=rtpp_list->rset_next); - if(!rtpp_list){ + for (rtpp_list=rtpp_set_list->rset_first; rtpp_list!=0 && + rtpp_list->id_set!=id_set; rtpp_list=rtpp_list->rset_next); + if (!rtpp_list) { LM_ERR(" script error-invalid id_set to be selected\n"); } + lock_release(rtpp_set_list->rset_head_lock); return rtpp_list; } @@ -2294,7 +2650,13 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) retry: weight_sum = 0; + lock_get(active_rtpp_set->rset_lock); for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { + /* Select only between displayed machines */ + if (!node->rn_displayed) { + continue; + } + /* Try to enable if it's time to try. */ if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){ node->rn_disabled = rtpp_test(node, 1, 0); @@ -2305,6 +2667,7 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) weight_sum += node->rn_weight; } } + lock_release(active_rtpp_set->rset_lock); /* No proxies? Force all to be redetected, if not yet */ if (weight_sum == 0) { @@ -2314,9 +2677,16 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) was_forced = 1; + lock_get(active_rtpp_set->rset_lock); for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { + /* Select only between displayed machines */ + if (!node->rn_displayed) { + continue; + } + node->rn_disabled = rtpp_test(node, 1, 1); } + lock_release(active_rtpp_set->rset_lock); goto retry; } @@ -2327,27 +2697,40 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) /* * Scan proxy list and decrease until appropriate proxy is found. */ + lock_get(active_rtpp_set->rset_lock); for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) { + /* Select only between displayed machines */ + if (!node->rn_displayed) { + continue; + } + /* Select only between enabled machines */ if (node->rn_disabled) continue; /* Found enabled machine */ - if (sumcut < node->rn_weight) + if (sumcut < node->rn_weight) { + lock_release(active_rtpp_set->rset_lock); goto found; + } /* Update sumcut if enabled machine */ sumcut -= node->rn_weight; } + lock_release(active_rtpp_set->rset_lock); /* No node list */ return NULL; found: if (do_test) { + lock_get(active_rtpp_set->rset_lock); node->rn_disabled = rtpp_test(node, node->rn_disabled, 0); - if (node->rn_disabled) + if (node->rn_disabled) { + lock_release(active_rtpp_set->rset_lock); goto retry; + } + lock_release(active_rtpp_set->rset_lock); } /* build the entry */ @@ -2357,10 +2740,10 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) callid.len, callid.len, callid.s, viabranch.len, viabranch.s); return node; } - memset(entry, 0, sizeof(struct rtpengine_hash_entry)); + memset(entry, 0, sizeof(struct rtpengine_hash_entry)); /* fill the entry */ - if (callid.s && callid.len > 0) { + if (callid.s && callid.len > 0) { if (shm_str_dup(&entry->callid, &callid) < 0) { LM_ERR("rtpengine hash table fail to duplicate calllen=%d callid=%.*s\n", callid.len, callid.len, callid.s); @@ -2368,7 +2751,7 @@ select_rtpp_node_new(str callid, str viabranch, int do_test) return node; } } - if (viabranch.s && viabranch.len > 0) { + if (viabranch.s && viabranch.len > 0) { if (shm_str_dup(&entry->viabranch, &viabranch) < 0) { LM_ERR("rtpengine hash table fail to duplicate calllen=%d viabranch=%.*s\n", callid.len, viabranch.len, viabranch.s); @@ -2425,8 +2808,22 @@ static struct rtpp_node * select_rtpp_node(str callid, str viabranch, int do_test) { struct rtpp_node *node = NULL; + unsigned int current_rtpp_no; + + lock_get(rtpp_no_lock); + current_rtpp_no = *rtpp_no; + lock_release(rtpp_no_lock); + + if (rtpp_socks_size != current_rtpp_no) { + build_rtpp_socks(current_rtpp_no); + } - if(!active_rtpp_set) { + if (!active_rtpp_set) { + default_rtpp_set = select_rtpp_set(setid_default); + active_rtpp_set = default_rtpp_set; + } + + if (!active_rtpp_set) { LM_ERR("script error - no valid set selected\n"); return NULL; } @@ -2469,11 +2866,11 @@ select_rtpp_node(str callid, str viabranch, int do_test) static int get_extra_id(struct sip_msg* msg, str *id_str) { - if(msg==NULL || extra_id_pv==NULL || id_str==NULL) { + if (msg == NULL || extra_id_pv == NULL || id_str == NULL) { LM_ERR("bad parameters\n"); return -1; } - if (pv_printf_s(msg, extra_id_pv, id_str)<0) { + if (pv_printf_s(msg, extra_id_pv, id_str) < 0) { LM_ERR("cannot print the additional id\n"); return -1; } @@ -2485,36 +2882,34 @@ get_extra_id(struct sip_msg* msg, str *id_str) { static int set_rtpengine_set_from_avp(struct sip_msg *msg, int direction) { - struct usr_avp *avp; - int_str setid_val; - - if ((setid_avp_param == NULL) || - (avp = search_first_avp(setid_avp_type, setid_avp, &setid_val, 0)) - == NULL) - { - if (direction == 1 || !selected_rtpp_set_2) - active_rtpp_set = selected_rtpp_set_1; - else - active_rtpp_set = selected_rtpp_set_2; - return 1; - } - - if (avp->flags&AVP_VAL_STR) { - LM_ERR("setid_avp must hold an integer value\n"); - return -1; - } + struct usr_avp *avp; + int_str setid_val; - active_rtpp_set = select_rtpp_set(setid_val.n); - if(active_rtpp_set == NULL) { - LM_ERR("could not locate rtpproxy set %d\n", setid_val.n); - return -1; - } + if ((setid_avp_param == NULL) || + (avp = search_first_avp(setid_avp_type, setid_avp, &setid_val, 0)) == NULL) { + if (direction == 1 || !selected_rtpp_set_2) + active_rtpp_set = selected_rtpp_set_1; + else + active_rtpp_set = selected_rtpp_set_2; + return 1; + } + + if (avp->flags&AVP_VAL_STR) { + LM_ERR("setid_avp must hold an integer value\n"); + return -1; + } + + active_rtpp_set = select_rtpp_set(setid_val.n); + if(active_rtpp_set == NULL) { + LM_ERR("could not locate rtpproxy set %d\n", setid_val.n); + return -1; + } - LM_DBG("using rtpengine set %d\n", setid_val.n); + LM_DBG("using rtpengine set %d\n", setid_val.n); - current_msg_id = msg->id; + current_msg_id = msg->id; - return 1; + return 1; } static int rtpengine_delete(struct sip_msg *msg, const char *flags) { @@ -2529,7 +2924,7 @@ static int rtpengine_rtpp_set_wrap(struct sip_msg *msg, int (*func)(struct sip_m body_intermediate.s = NULL; if (set_rtpengine_set_from_avp(msg, direction) == -1) - return -1; + return -1; more = 1; if (!selected_rtpp_set_2 || selected_rtpp_set_2 == selected_rtpp_set_1) @@ -2544,7 +2939,7 @@ static int rtpengine_rtpp_set_wrap(struct sip_msg *msg, int (*func)(struct sip_m direction = (direction == 1) ? 2 : 1; if (set_rtpengine_set_from_avp(msg, direction) == -1) - return -1; + return -1; ret = func(msg, data, 0); body_intermediate.s = NULL; @@ -2597,23 +2992,25 @@ set_rtpengine_set_n(struct sip_msg *msg, rtpp_set_link_t *rtpl, struct rtpp_set } current_msg_id = msg->id; + lock_get((*out)->rset_lock); node = (*out)->rn_first; while (node != NULL) { - if (node->rn_disabled == 0) nb_active_nodes++; - node = node->rn_next; + if (node->rn_disabled == 0) nb_active_nodes++; + node = node->rn_next; } + lock_release((*out)->rset_lock); if ( nb_active_nodes > 0 ) { LM_DBG("rtpp: selected proxy set ID %d with %d active nodes.\n", - current_msg_id, nb_active_nodes); + current_msg_id, nb_active_nodes); return nb_active_nodes; } else { LM_WARN("rtpp: selected proxy set ID %d but it has no active node.\n", - current_msg_id); + current_msg_id); return -2; } } @@ -2651,8 +3048,8 @@ rtpengine_manage(struct sip_msg *msg, const char *flags) int method; int nosdp; - if(msg->cseq==NULL && ((parse_headers(msg, HDR_CSEQ_F, 0)==-1) - || (msg->cseq==NULL))) + if (msg->cseq==NULL && ((parse_headers(msg, HDR_CSEQ_F, 0)==-1) || + (msg->cseq==NULL))) { LM_ERR("no CSEQ header\n"); return -1; @@ -2660,19 +3057,19 @@ rtpengine_manage(struct sip_msg *msg, const char *flags) method = get_cseq(msg)->method_id; - if(!(method==METHOD_INVITE || method==METHOD_ACK || method==METHOD_CANCEL - || method==METHOD_BYE || method==METHOD_UPDATE)) + if (!(method==METHOD_INVITE || method==METHOD_ACK || method==METHOD_CANCEL + || method==METHOD_BYE || method==METHOD_UPDATE)) return -1; - if(method==METHOD_CANCEL || method==METHOD_BYE) + if (method==METHOD_CANCEL || method==METHOD_BYE) return rtpengine_delete(msg, flags); - if(msg->msg_flags & FL_SDP_BODY) + if (msg->msg_flags & FL_SDP_BODY) nosdp = 0; else nosdp = parse_sdp(msg); - if(msg->first_line.type == SIP_REQUEST) { + if (msg->first_line.type == SIP_REQUEST) { if(method==METHOD_ACK && nosdp==0) return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0); if(method==METHOD_UPDATE && nosdp==0) @@ -2686,16 +3083,16 @@ rtpengine_manage(struct sip_msg *msg, const char *flags) return rtpengine_delete(msg, flags); return rtpengine_offer_answer(msg, flags, OP_OFFER, 0); } - } else if(msg->first_line.type == SIP_REPLY) { - if(msg->first_line.u.reply.statuscode>=300) + } else if (msg->first_line.type == SIP_REPLY) { + if (msg->first_line.u.reply.statuscode>=300) return rtpengine_delete(msg, flags); - if(nosdp==0) { - if(method==METHOD_UPDATE) + if (nosdp==0) { + if (method==METHOD_UPDATE) return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0); - if(tmb.t_gett==NULL || tmb.t_gett()==NULL + if (tmb.t_gett==NULL || tmb.t_gett()==NULL || tmb.t_gett()==T_UNDEFINED) return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0); - if(tmb.t_gett()->uas.request->msg_flags & FL_SDP_BODY) + if (tmb.t_gett()->uas.request->msg_flags & FL_SDP_BODY) return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0); return rtpengine_offer_answer(msg, flags, OP_OFFER, 0); } @@ -2872,8 +3269,7 @@ static int rtpengine_rtpstat_wrap(struct sip_msg *msg, void *d, int more) { * Returns the current RTP-Statistics from the RTP-Proxy */ static int -pv_get_rtpstat_f(struct sip_msg *msg, pv_param_t *param, - pv_value_t *res) +pv_get_rtpstat_f(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { void *parms[2]; @@ -2894,8 +3290,7 @@ set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri) { val.flags = PV_VAL_STR; val.rs = *uri; - if (rtp_inst_pvar->setf(msg, &rtp_inst_pvar->pvp, (int)EQ_T, &val) < 0) - { + if (rtp_inst_pvar->setf(msg, &rtp_inst_pvar->pvp, (int)EQ_T, &val) < 0) { LM_ERR("Failed to add RTP Engine URI to pvar\n"); return -1; } diff --git a/modules/rtpengine/rtpengine.h b/modules/rtpengine/rtpengine.h index 71ce2645485..bb9114e9b63 100644 --- a/modules/rtpengine/rtpengine.h +++ b/modules/rtpengine/rtpengine.h @@ -26,47 +26,61 @@ #include "bencode.h" #include "../../str.h" +#define MI_MIN_RECHECK_TICKS 0 +#define MI_MAX_RECHECK_TICKS ((unsigned int)-1) + struct rtpp_node { unsigned int idx; /* overall index */ - str rn_url; /* unparsed, deletable */ - int rn_umode; - char *rn_address; /* substring of rn_url */ - int rn_disabled; /* found unaccessible? */ - unsigned rn_weight; /* for load balancing */ + str rn_url; /* unparsed, deletable */ + int rn_umode; + char *rn_address; /* substring of rn_url */ + int rn_disabled; /* found unaccessible? */ + unsigned int rn_weight; /* for load balancing */ + unsigned int rn_displayed; /* for delete at db reload */ unsigned int rn_recheck_ticks; - int rn_rep_supported; - int rn_ptl_supported; + int rn_rep_supported; + int rn_ptl_supported; struct rtpp_node *rn_next; }; -struct rtpp_set{ +struct rtpp_set { unsigned int id_set; - unsigned weight_sum; + unsigned int weight_sum; unsigned int rtpp_node_count; - int set_disabled; + int set_disabled; unsigned int set_recheck_ticks; struct rtpp_node *rn_first; struct rtpp_node *rn_last; - struct rtpp_set *rset_next; + struct rtpp_set *rset_next; + gen_lock_t *rset_lock; }; -struct rtpp_set_head{ +struct rtpp_set_head { struct rtpp_set *rset_first; struct rtpp_set *rset_last; + gen_lock_t *rset_head_lock; }; +struct rtpp_node *get_rtpp_node(struct rtpp_set *rtpp_list, str *url); struct rtpp_set *get_rtpp_set(int set_id); -int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy); +int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy, unsigned int weight, int disabled, unsigned int ticks, int isDB); + +int rtpengine_delete_node(struct rtpp_node *rtpp_node); +int rtpengine_delete_node_set(struct rtpp_set *rtpp_list); +int rtpengine_delete_node_all(); int init_rtpproxy_db(void); extern str rtpp_db_url; extern str rtpp_table_name; +extern str rtpp_setid_col; extern str rtpp_url_col; +extern str rtpp_weight_col; +extern str rtpp_disabled_col; #endif diff --git a/modules/rtpengine/rtpengine_db.c b/modules/rtpengine/rtpengine_db.c index 82c64bb4a1a..495118221c9 100644 --- a/modules/rtpengine/rtpengine_db.c +++ b/modules/rtpengine/rtpengine_db.c @@ -31,9 +31,11 @@ static db_func_t rtpp_dbf; static db1_con_t *rtpp_db_handle = NULL; str rtpp_db_url = {NULL, 0}; -str rtpp_table_name = str_init("rtpproxy"); -str rtpp_set_id_col = str_init("set_id"); +str rtpp_table_name = str_init("rtpengine"); +str rtpp_setid_col = str_init("setid"); str rtpp_url_col = str_init("url"); +str rtpp_weight_col = str_init("weight"); +str rtpp_disabled_col = str_init("disabled"); static int rtpp_connect_db(void) { @@ -63,13 +65,15 @@ static int rtpp_load_db(void) db1_res_t *res = NULL; db_val_t *values = NULL; db_row_t *rows = NULL; - db_key_t query_cols[] = {&rtpp_set_id_col, &rtpp_url_col}; + db_key_t query_cols[] = {&rtpp_setid_col, &rtpp_url_col, &rtpp_weight_col, &rtpp_disabled_col}; str url; - int set_id; + int setid, disabled; + unsigned int weight, ticks; + /* int weight, flags; */ int n_rows = 0; - int n_cols = 2; + int n_cols = 4; if (rtpp_db_handle == NULL) { @@ -87,6 +91,8 @@ static int rtpp_load_db(void) return -1; } + rtpengine_delete_node_all(); + n_rows = RES_ROW_N(res); rows = RES_ROWS(res); if (n_rows == 0) @@ -94,26 +100,35 @@ static int rtpp_load_db(void) LM_WARN("No rtpproxy instances in database\n"); return 0; } + for (i=0; ifrom_domainDB1_STR updatedDB1_INT + + + rtpengine + mysql + setidDB1_INT + urlDB1_STR + weightDB1_INT + disabledDB1_INT + rtpproxy @@ -3531,6 +3540,29 @@ + + rtpengine + show + rtpengine + DB1_QUERY + + setid + url + weight + disabled + + + add + rtpengine + DB1_INSERT + + setid + url + weight + disabled + + + rtpproxy show diff --git a/utils/kamctl/xhttp_pi/rtpengine-mod b/utils/kamctl/xhttp_pi/rtpengine-mod new file mode 100644 index 00000000000..e97e6043ef8 --- /dev/null +++ b/utils/kamctl/xhttp_pi/rtpengine-mod @@ -0,0 +1,23 @@ + + rtpengine + show + rtpengine + DB1_QUERY + + setid + url + weight + disabled + + + add + rtpengine + DB1_INSERT + + setid + url + weight + disabled + + + diff --git a/utils/kamctl/xhttp_pi/rtpengine-table b/utils/kamctl/xhttp_pi/rtpengine-table new file mode 100644 index 00000000000..4edb001b0a4 --- /dev/null +++ b/utils/kamctl/xhttp_pi/rtpengine-table @@ -0,0 +1,9 @@ + + + rtpengine + mysql + setidDB1_INT + urlDB1_STR + weightDB1_INT + disabledDB1_INT +