From ce710ce13a622c3c62334f2ee045b1983d3122f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pier=C5=9Bcionek?= Date: Sat, 27 Jul 2019 23:07:10 +0200 Subject: [PATCH] async: added support for millisecond resolution sleep - new ms_timer parameter to enable millisecond precision timer - new async_ms_route and async_ms_sleep functions with milliseconds as a param - implementation: Each async_ms_sleep adds an entry to a linked list sorted by expiry time. List is checked every ms_timer ms for expired entries. All expired entries are pushed for execution on a pool of async workers. --- src/modules/async/async_mod.c | 134 +++++++++++++++- src/modules/async/async_sleep.c | 210 +++++++++++++++++++++++++- src/modules/async/async_sleep.h | 8 +- src/modules/async/doc/async_admin.xml | 113 ++++++++++++++ 4 files changed, 454 insertions(+), 11 deletions(-) diff --git a/src/modules/async/async_mod.c b/src/modules/async/async_mod.c index 88affb3bbd3..3989966a4b8 100644 --- a/src/modules/async/async_mod.c +++ b/src/modules/async/async_mod.c @@ -41,15 +41,20 @@ MODULE_VERSION static int async_workers = 1; +static int async_ms_timer = 0; static int mod_init(void); static int child_init(int); static void mod_destroy(void); static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2); +static int w_async_ms_sleep(sip_msg_t *msg, char *sec, char *str2); static int fixup_async_sleep(void **param, int param_no); + static int w_async_route(sip_msg_t *msg, char *rt, char *sec); +static int w_async_ms_route(sip_msg_t *msg, char *rt, char *sec); static int fixup_async_route(void **param, int param_no); + static int w_async_task_route(sip_msg_t *msg, char *rt, char *p2); static int fixup_async_task_route(void **param, int param_no); @@ -60,8 +65,12 @@ struct tm_binds tmb; static cmd_export_t cmds[]={ {"async_route", (cmd_function)w_async_route, 2, fixup_async_route, 0, REQUEST_ROUTE|FAILURE_ROUTE}, + {"async_ms_route", (cmd_function)w_async_ms_route, 2, fixup_async_route, + 0, REQUEST_ROUTE|FAILURE_ROUTE}, {"async_sleep", (cmd_function)w_async_sleep, 1, fixup_async_sleep, 0, REQUEST_ROUTE|FAILURE_ROUTE}, + {"async_ms_sleep", (cmd_function)w_async_ms_sleep, 1, fixup_async_sleep, + 0, REQUEST_ROUTE|FAILURE_ROUTE}, {"async_task_route", (cmd_function)w_async_task_route, 1, fixup_async_task_route, 0, REQUEST_ROUTE|FAILURE_ROUTE}, {0, 0, 0, 0, 0, 0} @@ -69,6 +78,7 @@ static cmd_export_t cmds[]={ static param_export_t params[]={ {"workers", INT_PARAM, &async_workers}, + {"ms_timer", INT_PARAM, &async_ms_timer}, {0, 0, 0} }; @@ -105,7 +115,17 @@ static int mod_init(void) return -1; } - register_basic_timers(async_workers); + if(async_ms_timer == 0) { + LM_INFO("ms_timer is set to 0. Disabling async_ms_sleep and async_ms_route functions\n"); + } else { + if(async_init_ms_timer_list() < 0) { + LM_ERR("cannot initialize internal structure\n"); + return -1; + } + LM_INFO("Enabled async_ms_sleep and async_ms_route functions with resolution of %dms\n", async_ms_timer); + } + + register_basic_timers(async_workers + (async_ms_timer > 0)); return 0; } @@ -131,6 +151,13 @@ static int child_init(int rank) return -1; /* error */ } } + + if((async_ms_timer > 0) && fork_basic_utimer(PROC_TIMER, "ASYNC MOD MILLI TIMER SINGLETON", 1 /*socks flag*/, + async_mstimer_exec, NULL, 1000 * async_ms_timer /*milliseconds*/) + < 0) { + LM_ERR("failed to register millisecond timer singleton as process (%d)\n", i); + return -1; /* error */ + } return 0; } @@ -141,6 +168,7 @@ static int child_init(int rank) static void mod_destroy(void) { async_destroy_timer_list(); + async_destroy_ms_timer_list(); } /** @@ -183,6 +211,46 @@ static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2) return -1; } +/** + * + */ +static int w_async_ms_sleep(sip_msg_t *msg, char *sec, char *str2) +{ + int s; + async_param_t *ap; + + if(msg == NULL) + return -1; + + if(faked_msg_match(msg)) { + LM_ERR("invalid usage for faked message\n"); + return -1; + } + + if(async_workers <= 0) { + LM_ERR("no async mod timer workers (modparam missing?)\n"); + return -1; + } + + ap = (async_param_t *)sec; + if(fixup_get_ivalue(msg, ap->pinterval, &s) != 0) { + LM_ERR("no async sleep time value\n"); + return -1; + } + if(ap->type == 0) { + if(ap->u.paction == NULL || ap->u.paction->next == NULL) { + LM_ERR("cannot be executed as last action in a route block\n"); + return -1; + } + if(async_ms_sleep(msg, s, ap->u.paction->next, NULL) < 0) + return -1; + /* force exit in config */ + return 0; + } + + return -1; +} + /** * */ @@ -243,6 +311,42 @@ int ki_async_route(sip_msg_t *msg, str *rn, int s) return 0; } +/** + * + */ +int ki_async_ms_route(sip_msg_t *msg, str *rn, int s) +{ + cfg_action_t *act = NULL; + int ri; + sr_kemi_eng_t *keng = NULL; + + if(faked_msg_match(msg)) { + LM_ERR("invalid usage for faked message\n"); + return -1; + } + + keng = sr_kemi_eng_get(); + if(keng == NULL) { + ri = route_lookup(&main_rt, rn->s); + if(ri >= 0) { + act = main_rt.rlist[ri]; + if(act == NULL) { + LM_ERR("empty action lists in route block [%.*s]\n", rn->len, + rn->s); + return -1; + } + } else { + LM_ERR("route block not found: %.*s\n", rn->len, rn->s); + return -1; + } + } + + if(async_ms_sleep(msg, s, act, rn) < 0) + return -1; + /* force exit in config */ + return 0; +} + /** * */ @@ -271,6 +375,34 @@ static int w_async_route(sip_msg_t *msg, char *rt, char *sec) return ki_async_route(msg, &rn, s); } +/** + * + */ +static int w_async_ms_route(sip_msg_t *msg, char *rt, char *sec) +{ + int s; + str rn; + + if(msg == NULL) + return -1; + + if(async_workers <= 0) { + LM_ERR("no async mod timer workers\n"); + return -1; + } + + if(fixup_get_svalue(msg, (gparam_t *)rt, &rn) != 0) { + LM_ERR("no async route block name\n"); + return -1; + } + + if(fixup_get_ivalue(msg, (gparam_t *)sec, &s) != 0) { + LM_ERR("no async interval value\n"); + return -1; + } + return ki_async_route(msg, &rn, s); +} + /** * */ diff --git a/src/modules/async/async_sleep.c b/src/modules/async/async_sleep.c index 097237f4c0e..ff1da14639d 100644 --- a/src/modules/async/async_sleep.c +++ b/src/modules/async/async_sleep.c @@ -41,6 +41,14 @@ extern struct tm_binds tmb; /* clang-format off */ +typedef struct async_task_param { + unsigned int tindex; + unsigned int tlabel; + cfg_action_t *ract; + char cbname[ASYNC_CBNAME_SIZE]; + int cbname_len; +} async_task_param_t; + typedef struct async_item { unsigned int tindex; unsigned int tlabel; @@ -51,6 +59,12 @@ typedef struct async_item { struct async_item *next; } async_item_t; +typedef struct async_ms_item { + async_task_t *at; + struct timeval due; + struct async_ms_item *next; +} async_ms_item_t; + typedef struct async_slot { async_item_t *lstart; async_item_t *lend; @@ -58,6 +72,15 @@ typedef struct async_slot { } async_slot_t; #define ASYNC_RING_SIZE 100 +#define MAX_MS_SLEEP 30*1000 +#define MAX_MS_SLEEP_QUEUE 10000 + +static struct async_ms_list { + async_ms_item_t *lstart; + async_ms_item_t *lend; + int len; + gen_lock_t lock; +} *_async_ms_list = NULL; static struct async_list_head { async_slot_t ring[ASYNC_RING_SIZE]; @@ -95,6 +118,32 @@ int async_init_timer_list(void) return 0; } +int async_init_ms_timer_list(void) +{ + _async_ms_list = (struct async_ms_list *)shm_malloc( + sizeof(struct async_ms_list)); + if(_async_ms_list == NULL) { + LM_ERR("no more shm\n"); + return -1; + } + memset(_async_ms_list, 0, sizeof(struct async_ms_list)); + if(lock_init(&_async_ms_list->lock) == 0) { + LM_ERR("cannot init lock \n"); + shm_free(_async_ms_list); + _async_ms_list = 0; + return -1; + } + return 0; +} + +int async_destroy_ms_timer_list(void) +{ + if (_async_ms_list) { + lock_destroy(&_async_ms_list->lock); + } + return 0; +} + int async_destroy_timer_list(void) { int i; @@ -109,6 +158,45 @@ int async_destroy_timer_list(void) return 0; } +int async_insert_item(async_ms_item_t *ai) +{ + struct timeval *due = &ai->due; + + if (unlikely(_async_ms_list == NULL)) + return -1; + lock_get(&_async_ms_list->lock); + // Check if we want to insert in front + if (_async_ms_list->lstart == NULL || timercmp(due, &_async_ms_list->lstart->due, <=)) { + ai->next = _async_ms_list->lstart; + _async_ms_list->lstart = ai; + if (_async_ms_list->lend == NULL) + _async_ms_list->lend = ai; + } else { + // Check if we want to add to the tail + if (_async_ms_list->lend && timercmp(due, &_async_ms_list->lend->due, >)) { + _async_ms_list->lend->next = ai; + _async_ms_list->lend = ai; + } else { + async_ms_item_t *aip; + // Find the place to insert into a sorted timer list + // Most likely head && tail scanarios are covered above + int i = 1; + for (aip = _async_ms_list->lstart; aip->next; aip = aip->next, i++) { + if (timercmp(due, &aip->next->due, <=)) { + ai->next = aip->next; + aip->next = ai; + break; + } + } + } + } + _async_ms_list->len++; + lock_release(&_async_ms_list->lock); + return 0; +} + + + int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname) { int slot; @@ -209,13 +297,37 @@ void async_timer_exec(unsigned int ticks, void *param) } } -typedef struct async_task_param { - unsigned int tindex; - unsigned int tlabel; - cfg_action_t *ract; - char cbname[ASYNC_CBNAME_SIZE]; - int cbname_len; -} async_task_param_t; +void async_mstimer_exec(unsigned int ticks, void *param) +{ + struct timeval now; + gettimeofday(&now, NULL); + + if (_async_ms_list == NULL) + return; + lock_get(&_async_ms_list->lock); + + async_ms_item_t *aip, *next; + int i = 0; + for (aip = _async_ms_list->lstart; aip; aip = next, i++) { + next = aip->next; + if (timercmp(&now, &aip->due, >=)) { + if ((_async_ms_list->lstart = next) == NULL) + _async_ms_list->lend = NULL; + if (async_task_push(aip->at)<0) { + shm_free(aip->at); + } + _async_ms_list->len--; + continue; + } + break; + } + + lock_release(&_async_ms_list->lock); + + return; + +} + /** * @@ -246,6 +358,90 @@ void async_exec_task(void *param) /* param is freed along with the async task strucutre in core */ } +int async_ms_sleep(sip_msg_t *msg, int milliseconds, cfg_action_t *act, str *cbname) +{ + async_ms_item_t *ai; + int dsize; + tm_cell_t *t = 0; + unsigned int tindex; + unsigned int tlabel; + async_task_param_t *atp; + async_task_t *at; + + if(milliseconds <= 0) { + LM_ERR("negative or zero sleep time (%d)\n", milliseconds); + return -1; + } + if(milliseconds >= MAX_MS_SLEEP) { + LM_ERR("max sleep time is %d msec\n", MAX_MS_SLEEP); + return -1; + } + if(_async_ms_list->len >= MAX_MS_SLEEP_QUEUE) { + LM_ERR("max sleep queue length exceeded (%d) \n", MAX_MS_SLEEP_QUEUE); + return -1; + } + if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) { + LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s); + return -1; + } + dsize = sizeof(async_task_t) + sizeof(async_task_param_t) + sizeof(async_ms_item_t); + + at = (async_task_t *)shm_malloc(dsize); + if(at == NULL) { + LM_ERR("no more shm memory\n"); + return -1; + } + memset(at, 0, dsize); + at->param = (char *)at + sizeof(async_task_t); + atp = (async_task_param_t *)at->param; + ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t)); + ai->at = at; + + if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) { + LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s); + return -1; + } + + t = tmb.t_gett(); + if(t == NULL || t == T_UNDEFINED) { + if(tmb.t_newtran(msg) < 0) { + LM_ERR("cannot create the transaction\n"); + return -1; + } + t = tmb.t_gett(); + if(t == NULL || t == T_UNDEFINED) { + LM_ERR("cannot lookup the transaction\n"); + return -1; + } + } + + if(tmb.t_suspend(msg, &tindex, &tlabel) < 0) { + LM_ERR("failed to suspend the processing\n"); + shm_free(ai); + return -1; + } + at->exec = async_exec_task; + at->param = atp; + atp->ract = act; + atp->tindex = tindex; + atp->tlabel = tlabel; + if(cbname && cbname->len>0) { + memcpy(atp->cbname, cbname->s, cbname->len); + atp->cbname[cbname->len] = '\0'; + atp->cbname_len = cbname->len; + } + + struct timeval now, upause; + gettimeofday(&now, NULL); + upause.tv_sec = milliseconds / 1000; + upause.tv_usec = (milliseconds * 1000) % 1000000; + + timeradd(&now, &upause, &ai->due); + async_insert_item(ai); + + return 0; +} + /** * */ diff --git a/src/modules/async/async_sleep.h b/src/modules/async/async_sleep.h index 9fac192de69..7b5d27f34a6 100644 --- a/src/modules/async/async_sleep.h +++ b/src/modules/async/async_sleep.h @@ -39,13 +39,15 @@ typedef struct async_param { /* clang-format on */ int async_init_timer_list(void); - int async_destroy_timer_list(void); - int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname); - void async_timer_exec(unsigned int ticks, void *param); +int async_init_ms_timer_list(void); +int async_destroy_ms_timer_list(void); +int async_ms_sleep(sip_msg_t *msg, int milliseconds, cfg_action_t *act, str *cbname); +void async_mstimer_exec(unsigned int ticks, void *param); + int async_send_task(sip_msg_t *msg, cfg_action_t *act, str *cbname); #endif diff --git a/src/modules/async/doc/async_admin.xml b/src/modules/async/doc/async_admin.xml index d829679b2d6..c07beabfaba 100644 --- a/src/modules/async/doc/async_admin.xml +++ b/src/modules/async/doc/async_admin.xml @@ -85,6 +85,28 @@ ... modparam("async", "workers", 2) ... + + + +
+ <varname>ms_timer</varname> (int) + + Enables millisecond timer for async_ms_sleep() and async_ms_route() functions. + The integer value is the timer resolution in milliseconds. + ms_timer = 1 enables 1 millisecond timer but generates higher load on the system. + ms_timer = 20 enables 20 ms timer. + + + + Default value is 0. + + + + Set <varname>ms_timer</varname> parameter + +... +modparam("async", "ms_timer", 1) +...
@@ -134,6 +156,52 @@ route[RESUME] { exit; } ... + + + +
+ + <function moreinfo="none">async_ms_route(routename, milliseconds)</function> + + + Simulate a sleep of 'milliseconds' and then continue the processing of the SIP + request with the route[routename]. In case of internal errors, the + function returns false, otherwise the function exits the execution of + the script at that moment (return 0 behaviour). + This function works only if the ms_timer parameter has a value greater then 0. + + + The routename parameter can be a static string or a dynamic string + value with config variables. + + + The sleep parameter represent the number of milliseconds to suspend the + processing of a SIP request. Maximum value is 30000 (30 sec). The parameter can be + a static integer or a variable holding an integer. + + + Since the SIP request handling is resumed in another process, + the config file execution state is practically lost. Therefore beware + that the execution of config after resume will end once the + route[routename] is finished. + + + This function can be used from REQUEST_ROUTE. + + + <function>async_ms_route</function> usage + +... +request_route { + ... + async_ms_route("RESUME", "250"); + ... +} +route[RESUME] { + send_reply("404", "Not found"); + exit; +} +...
@@ -167,6 +235,51 @@ exit; +
+ + <function moreinfo="none">async_ms_sleep(milliseconds)</function> + + + Simulate a sleep of 'milliseconds' and then continue the processing of SIP + request with the next action. In case of internal errors, the function + returns false. + This function works only if the ms_timer parameter has a value greater then 0. + + + The sleep parameter represent the number of milliseconds to suspend the + processing of SIP request. Maximum value is 30000 (30 sec). The parameter can be + a static integer or a variable holding an integer. + + + This function can be used from REQUEST_ROUTE. + + + <function>async_ms_sleep</function> usage + +... +route[REQUESTSHAPER] { + $var(res) = http_connect("leakybucket", + "/add?key=$fd", $null, $null,"$avp(delay)"); + $var(d) = $(avp(delay){s.int}); + if ($var(d) > 0) { + # Delay the request by $avp(delay) ms + async_ms_sleep("$var(d)"); + if (!t_relay()) { + sl_reply_error(); + } + exit; + } + # No delay + if (!t_relay()) { + sl_reply_error(); + } + exit; +} +... + + +
+
<function moreinfo="none">async_task_route(routename)</function>