From 13351fab04743058a30212f39ec39cc0dca81c08 Mon Sep 17 00:00:00 2001 From: Tyler Moore Date: Mon, 7 Aug 2023 14:13:28 -0400 Subject: [PATCH 1/2] dispatcher: fix weighted sort for n<100 calls - update weighted/relative weighted algos to use probability based sort - calls from 0-99 now follow the weighted distributions - removed the now unused shuffle_uint100array() function --- src/modules/dispatcher/dispatch.c | 291 ++++++++++++------ src/modules/dispatcher/doc/dispatcher.xml | 9 + .../dispatcher/doc/dispatcher_admin.xml | 20 +- 3 files changed, 213 insertions(+), 107 deletions(-) diff --git a/src/modules/dispatcher/dispatch.c b/src/modules/dispatcher/dispatch.c index e8d7886ec9b..c89a1541236 100644 --- a/src/modules/dispatcher/dispatch.c +++ b/src/modules/dispatcher/dispatch.c @@ -131,7 +131,6 @@ static int *ds_next_idx = NULL; static void ds_run_route( struct sip_msg *msg, str *uri, char *route, ds_rctx_t *rctx); -void shuffle_uint100array(unsigned int *arr); int ds_reinit_rweight_on_state_change( int old_state, int new_state, ds_set_t *dset); @@ -597,24 +596,6 @@ int add_dest2list(int id, str uri, int flags, int priority, str *attrs, return -1; } - -/* for internal usage; arr must be arr[100] */ -void shuffle_uint100array(unsigned int *arr) -{ - int k; - int j; - unsigned int t; - if(arr == NULL) - return; - for(j = 0; j < 100; j++) { - k = j + (kam_rand() % (100 - j)); - t = arr[j]; - arr[j] = arr[k]; - arr[k] = t; - } -} - - /** * Initialize the relative weight distribution for a destination set * - fill the array of 0..99 elements where to keep the index of the @@ -623,77 +604,130 @@ void shuffle_uint100array(unsigned int *arr) */ int dp_init_relative_weights(ds_set_t *dset) { - int j; - int k; - int t; - int *ds_dests_flags = NULL; - int *ds_dests_rweights = NULL; - int current_slice; - int rw_sum; - unsigned int last_insert; - + unsigned int i, j, k; + unsigned int counts[dset->nr]; + unsigned int counts_total = 0; + int rw_sum = 0; + unsigned int rw_slices[dset->nr]; + unsigned int rw_slices_total = 0; + unsigned int probability, last_valid_dst; + + /* argument validation */ if(dset == NULL || dset->dlist == NULL || dset->nr < 2) return -1; - /* local copy to avoid synchronization problems */ - ds_dests_flags = pkg_malloc(sizeof(int) * dset->nr); + /* + * heap allocations + * local copies are needed to avoid synchronization problems + */ + int *ds_dests_flags = pkg_malloc(sizeof(int) * dset->nr); if(ds_dests_flags == NULL) { PKG_MEM_ERROR; return -1; } - ds_dests_rweights = pkg_malloc(sizeof(int) * dset->nr); + int *ds_dests_rweights = pkg_malloc(sizeof(int) * dset->nr); if(ds_dests_rweights == NULL) { PKG_MEM_ERROR; pkg_free(ds_dests_flags); return -1; } + /* zero out arrays */ + memset(counts, 0, sizeof(unsigned int) * dset->nr); + memset(rw_slices, 0, sizeof(unsigned int) * dset->nr); /* needed to sync the rwlist access */ lock_get(&dset->lock); - rw_sum = 0; - /* find the sum of relative weights */ - for(j = 0; j < dset->nr; j++) { - ds_dests_flags[j] = dset->dlist[j].flags; - ds_dests_rweights[j] = dset->dlist[j].attrs.rweight; - if(ds_skip_dst(ds_dests_flags[j])) - continue; - rw_sum += ds_dests_rweights[j]; - } - if(rw_sum == 0) - goto ret; + /* find the sum of the weights given and the last valid destination */ + for(i = 0; i < dset->nr; i++) { + ds_dests_flags[i] = dset->dlist[i].flags; + ds_dests_rweights[i] = dset->dlist[i].attrs.rweight; - /* fill the array based on the relative weight of each destination */ - t = 0; - for(j = 0; j < dset->nr; j++) { - if(ds_skip_dst(ds_dests_flags[j])) + /* rweight is zero, destination ignored */ + if(ds_dests_rweights[i] == 0) { + LM_INFO("destination %d in group %d ignored (rweight=0)\n", i, dset->id); continue; + } - current_slice = ds_dests_rweights[j] * 100 / rw_sum; /* truncate here */ - LM_DBG("rw_sum[%d][%d][%d]\n", j, rw_sum, current_slice); - for(k = 0; k < current_slice; k++) { - dset->rwlist[t] = (unsigned int)j; - t++; + /* disabled or inactive destination */ + if(ds_skip_dst(ds_dests_flags[i])) { + LM_INFO("destination %d in group %d ignored (disabled or inactive)\n", i, dset->id); + ds_dests_rweights[i] = 0; + continue; } + + rw_sum += ds_dests_rweights[i]; + last_valid_dst = i; } - /* if the array was not completely filled (i.e., the sum of rweights is - * less than 100 due to truncated), then use last address to fill the rest */ - last_insert = t > 0 ? dset->rwlist[t - 1] : (unsigned int)(dset->nr - 1); - if(t < 100) { - LM_INFO("extra rweight %d for last active destination in group %d\n", - (100 - t), dset->id); + /* calculate the slices */ + if(rw_sum == 0) { + goto ret; } - for(j = t; j < 100; j++) - dset->rwlist[j] = last_insert; - /* shuffle the content of the array in order to mix the selection - * of the addresses (e.g., if first address has weight=20, avoid - * sending first 20 calls to it, but ensure that within a 100 calls, - * 20 go to first address */ - shuffle_uint100array(dset->rwlist); - goto ret; + /* calculdate the slices */ + for(i = 0; i < dset->nr; i++) { + /* slice truncated by integer division */ + rw_slices[i] = ds_dests_rweights[i] * 100 / rw_sum; + rw_slices_total += rw_slices[i]; + } + if(rw_slices_total < 100) { + LM_INFO("extra rweight percentage %d for last destination in group %d\n", + (100 - rw_slices_total), dset->id); + rw_slices[last_valid_dst] = rw_slices[last_valid_dst] + (100 - rw_slices_total); + } + + /* + * load the rweights in order of probablity based on weight + * each "call to distribute" having its probability recalculated + * this will make distribution under n=100 calls adhere to the slice values + * note that first 2 iterations are unwrapped (avoid n/0 & wasted cpu cycles) + */ + dset->rwlist[0] = 0; + counts[0] = 1; + counts_total = 1; + i = 1; + j = 1; + while(i < 100) { + /* skip if the slice is zero */ + if(rw_slices[j] == 0) { + j = (j + 1) % dset->nr; + continue; + } + /* calculate the current percentage of "calls" to this destination */ + probability = (counts[j] * 100) / i; + /* if no other destination can take the "next call" then route it to this destination */ + if(probability == rw_slices[j]) { + for (k = 0; k < dset->nr; k++) { + j = (j + 1) % dset->nr; + if (rw_slices[j] == 0) { + continue; + } + probability = (counts[j] * 100) / i; + if (probability < rw_slices[j]) { + break; + } + } + + dset->rwlist[i] = j; + counts[j] += 1; + counts_total += 1; + i += 1; + j = (j + 1) % dset->nr; + continue; + } + /* skip this destination if proability exceeds weight */ + if(probability > rw_slices[j]) { + j = (j + 1) % dset->nr; + continue; + } + /* still processing this destination, don't move to the next weight just yet */ + dset->rwlist[i] = j; + counts[j] += 1; + counts_total += 1; + i += 1; + } ret: lock_release(&dset->lock); @@ -711,45 +745,108 @@ int dp_init_relative_weights(ds_set_t *dset) */ int dp_init_weights(ds_set_t *dset) { - int j; - int k; - int t; - - if(dset == NULL || dset->dlist == NULL) + unsigned int i, j, k; + unsigned int weights[dset->nr]; + unsigned int weights_total = 0; + unsigned int counts[dset->nr]; + unsigned int counts_total = 0; + unsigned int last_valid_dst, probability; + + if(dset == NULL || dset->dlist == NULL) { return -1; + } /* is weight set for dst list? (first address must have weight!=0) */ - if(dset->dlist[0].attrs.weight == 0) + if(dset->dlist[0].attrs.weight == 0) { return 0; + } - /* first fill the array based on the weight of each destination - * - the weight is the percentage (e.g., if weight=20, the afferent - * address gets its index 20 times in the array) - * - if the sum of weights is more than 100, the addresses over the - * limit are ignored */ - t = 0; - for(j = 0; j < dset->nr; j++) { - for(k = 0; k < dset->dlist[j].attrs.weight; k++) { - if(t >= 100) - goto randomize; - dset->wlist[t] = (unsigned int)j; - t++; + /* if there is only 1 record then optimize out any sorting */ + if(dset->nr == 1) { + memset(dset->wlist, 0, sizeof(unsigned int) * 100); + return 0; + } + + /* zero out arrays */ + memset(weights, 0, sizeof(unsigned int) * dset->nr); + memset(counts, 0, sizeof(unsigned int) * dset->nr); + + /* find the sum of the weights given and the last valid destination */ + for(i = 0; i < dset->nr; i++) { + /* weight is zero, destination ignored */ + if(dset->dlist[i].attrs.weight == 0) { + LM_INFO("destination %d in group %d ignored (weight=0)\n", i, dset->id); + weights[i] = 0; + continue; + } + + /* weights total would exceed 100, destination ignored */ + if(weights_total + dset->dlist[i].attrs.weight > 100) { + LM_INFO("destination %d in group %d ignored (exceeds total weight>100)\n", i, dset->id); + weights[i] = 0; + continue; + } + + weights_total += dset->dlist[i].attrs.weight; + weights[i] = dset->dlist[i].attrs.weight; + last_valid_dst = i; + } + if(weights_total < 100) { + LM_INFO("extra weight %d for last destination in group %d\n", + (100 - weights_total), dset->id); + weights[last_valid_dst] = weights[last_valid_dst] + (100 - weights_total); + } + + /* + * load the weights in order of probablity based on weight + * each "call to distribute" having its probability recalculated + * this will make distribution under n=100 calls adhere to the weighted values + * note that first 2 iterations are unwrapped (avoid n/0 & wasted cpu cycles) + */ + dset->wlist[0] = 0; + counts[0] = 1; + counts_total = 1; + i = 1; + j = 1; + while(i < 100) { + /* skip if the weight is zero */ + if(weights[j] == 0) { + j = (j + 1) % dset->nr; + continue; + } + /* calculate the current percentage of "calls" to this destination */ + probability = (counts[j] * 100) / i; + /* if no other destination can take the "next call" then route it to this destination */ + if(probability == weights[j]) { + for (k = 0; k < dset->nr; k++) { + j = (j + 1) % dset->nr; + if (weights[j] == 0) { + continue; + } + probability = (counts[j] * 100) / i; + if (probability < weights[j]) { + break; + } + } + + dset->wlist[i] = j; + counts[j] += 1; + counts_total += 1; + i += 1; + j = (j + 1) % dset->nr; + continue; + } + /* skip this destination if proability exceeds weight */ + if(probability > weights[j]) { + j = (j + 1) % dset->nr; + continue; } + /* still processing this destination, don't move to the next weight just yet */ + dset->wlist[i] = j; + counts[j] += 1; + counts_total += 1; + i += 1; } - /* if the array was not completely filled (i.e., the sum of weights is - * less than 100), then use last address to fill the rest */ - if(t < 100) { - LM_INFO("extra weight %d for last destination in group %d\n", (100 - t), - dset->id); - } - for(; t < 100; t++) - dset->wlist[t] = (unsigned int)(dset->nr - 1); -randomize: - /* shuffle the content of the array in order to mix the selection - * of the addresses (e.g., if first address has weight=20, avoid - * sending first 20 calls to it, but ensure that within a 100 calls, - * 20 go to first address */ - shuffle_uint100array(dset->wlist); return 0; } diff --git a/src/modules/dispatcher/doc/dispatcher.xml b/src/modules/dispatcher/doc/dispatcher.xml index af040e737d0..de41d7f8a3c 100644 --- a/src/modules/dispatcher/doc/dispatcher.xml +++ b/src/modules/dispatcher/doc/dispatcher.xml @@ -62,6 +62,11 @@ Cabiddu federico.cabiddu@gmail.com + + Tyler + Moore + tmoore@dopensource.com + 2004 @@ -91,6 +96,10 @@ 2020 Federico Cabiddu, Libon + + 2023 + Tyler Moore (devopsec), dOpenSource + diff --git a/src/modules/dispatcher/doc/dispatcher_admin.xml b/src/modules/dispatcher/doc/dispatcher_admin.xml index 86d93ca27ef..91904c51ae1 100644 --- a/src/modules/dispatcher/doc/dispatcher_admin.xml +++ b/src/modules/dispatcher/doc/dispatcher_admin.xml @@ -667,7 +667,7 @@ modparam("dispatcher", "ds_ping_reply_codes", "class=2;code=403;code=488;class=3 Value 0: If set to 0, only the gateways with state PROBING are tested. After a gateway is probed, the PROBING state is cleared in this mode. This means that no probing will be executed at all only if flag in config file is set to 8/PROBING - (please check destination list file syntaxis for more details), it will probe only one time at startup or + (please check destination list file syntaxis for more details), it will probe only one time at startup or after dispatcher reload. @@ -1425,9 +1425,9 @@ ds_select_dst("1", "4", "3"); ... # sample of SQL provisioning statements -INSERT INTO "dispatcher" +INSERT INTO "dispatcher" VALUES(1,1,'sip:192.168.0.1:5060',0,12,'rweight=50;weight=50;cc=1;',''); -INSERT INTO "dispatcher" +INSERT INTO "dispatcher" VALUES(2,1,'sip:192.168.0.2:5060',0,12,'rweight=50;weight=50;cc=1;',''); ... modparam("dispatcher", "ds_ping_interval", 1) # ping gateways once/second @@ -1444,13 +1444,13 @@ DEST: { PRIORITY: 12 ATTRS: { BODY: rweight=50;weight=50;cc=1 # configuration values - DUID: + DUID: MAXLOAD: 0 WEIGHT: 50 RWEIGHT: 50 - SOCKET: - SOCKNAME: - OBPROXY: + SOCKET: + SOCKNAME: + OBPROXY: } LATENCY: { AVG: 20.104000 @@ -2288,9 +2288,9 @@ kamctl rpc dispatcher.hash 4 bob server.com The value represents the percent of calls to be sent to that gateways. The sum must not exceed 100, otherwise the destinations whose weight added to the sum go over 100 are ignored. If the sum is less than 100, - then the last destination is used to fill the missing percentage. See - also the description of the corresponding algorithm parameter for - ds_select_dst(). + then the last non-zero destination is used to fill the missing + percentage. See also the description of the corresponding algorithm + parameter for ds_select_dst(). 'rweight' - used for relative weight based load distribution. It From 972723b29c20ae05be22285ca759414987424a68 Mon Sep 17 00:00:00 2001 From: Tyler Moore Date: Thu, 14 Sep 2023 10:34:30 -0400 Subject: [PATCH 2/2] htable add column packing features - add support for changing the column delimeter in a hash table - add support for changing the column null character --- src/modules/htable/doc/htable.xml | 9 ++++++ src/modules/htable/doc/htable_admin.xml | 37 +++++++++++++++++++++++++ src/modules/htable/ht_api.c | 31 ++++++++++++++++++--- src/modules/htable/ht_api.h | 2 +- src/modules/htable/ht_db.c | 10 +++++-- 5 files changed, 81 insertions(+), 8 deletions(-) diff --git a/src/modules/htable/doc/htable.xml b/src/modules/htable/doc/htable.xml index 96bf75832e6..92469a7a377 100644 --- a/src/modules/htable/doc/htable.xml +++ b/src/modules/htable/doc/htable.xml @@ -39,11 +39,20 @@ Sas osas@voipembedded.com + + Tyler + Moore + tmoore@dopensource.com + 2008-2011 http://www.asipto.com + + 2023 + Tyler Moore (devopsec), dOpenSource + diff --git a/src/modules/htable/doc/htable_admin.xml b/src/modules/htable/doc/htable_admin.xml index 0dd01bea5a0..69ad5e64581 100644 --- a/src/modules/htable/doc/htable_admin.xml +++ b/src/modules/htable/doc/htable_admin.xml @@ -412,6 +412,43 @@ $ kamcmd htable.dump htable peers). Please note, module parameter enable_dmq must also be set in order for this to apply (see below). Default is 0 (no replication). + + + + coldelim - the character delimeter to use when packing the htable. + When set, this parameter changes the column delimeter between columns in a multiple + column hash table. This can be useful when loading JSON data types into a hash table + as they conflict with the default , delimeter, allowing these values + to be parsed in the routing config. See the example below on parsing a JSON array from + an htable with ; as the column delimeter. + + +... +modparam("htable", "htable", "customer=>size=8;dbtable=customer;cols='dids,description';coldelim=';'") +... +$avp(customer) = $sht(customer=>1); +$var(customer_dids) = $(avp(customer){s.select,0,;}); +$var(customer_desc) = $(avp(customer){s.select,1,;}); +... + + + + + colnull - the character to use when packing a NULL value into + the htable from the database. This parameter can be set to the empty string or a + single character. This can be used to simplify checking a single column in a row + for emptiness, in the routing config. The example below shows how one would do that. + + +... +modparam("htable", "htable", "customer=>size=8;dbtable=customer;cols='name,description';colnull=''") +... +$avp(customer) = $sht(customer=>1); +$var(customer_name) = $(avp(customer){s.select,0,;}); +if (!strempty($var(customer_name))) { +... +} + diff --git a/src/modules/htable/ht_api.c b/src/modules/htable/ht_api.c index 888605e4daa..2391cb5c5d0 100644 --- a/src/modules/htable/ht_api.c +++ b/src/modules/htable/ht_api.c @@ -255,7 +255,7 @@ ht_t *ht_get_table(str *name) int ht_add_table(str *name, int autoexp, str *dbtable, str *dbcols, int size, int dbmode, int itype, int_str *ival, int updateexpire, - int dmqreplicate) + int dmqreplicate, char coldelim, char colnull) { unsigned int htid; ht_t *ht; @@ -342,8 +342,8 @@ int ht_add_table(str *name, int autoexp, str *dbtable, str *dbcols, int size, } ht->ncols = c + 1; ht->pack[0] = 'l'; - ht->pack[1] = ','; - ht->pack[2] = '*'; + ht->pack[1] = coldelim; + ht->pack[2] = colnull; } ht->next = _ht_root; @@ -958,6 +958,8 @@ int ht_table_spec(char *spec) unsigned int dbmode = 0; unsigned int updateexpire = 1; unsigned int dmqreplicate = 0; + char coldelim = ','; + char colnull = '*'; str in; str tok; param_t *pit = NULL; @@ -1024,13 +1026,34 @@ int ht_table_spec(char *spec) LM_DBG("htable [%.*s] - dmqreplicate [%u]\n", name.len, name.s, dmqreplicate); + } else if(pit->name.len == 8 + && strncmp(pit->name.s, "coldelim", 8) == 0) { + if(tok.len > 1) + goto error; + + coldelim = tok.s[0]; + LM_DBG("htable [%.*s] - coldelim [%c]\n", name.len, name.s, + coldelim); + } else if(pit->name.len == 7 + && strncmp(pit->name.s, "colnull", 7) == 0) { + if(tok.len > 1) + goto error; + + if(tok.len == 0) { + colnull = '\0'; + } else { + colnull = tok.s[0]; + } + + LM_DBG("htable [%.*s] - colnull [%c]\n", name.len, name.s, + colnull); } else { goto error; } } return ht_add_table(&name, autoexpire, &dbtable, &dbcols, size, dbmode, - itype, &ival, updateexpire, dmqreplicate); + itype, &ival, updateexpire, dmqreplicate, coldelim, colnull); error: LM_ERR("invalid htable parameter [%.*s]\n", in.len, in.s); diff --git a/src/modules/htable/ht_api.h b/src/modules/htable/ht_api.h index d8bdc2aab25..e24a93b1f17 100644 --- a/src/modules/htable/ht_api.h +++ b/src/modules/htable/ht_api.h @@ -88,7 +88,7 @@ typedef struct _ht_pv int ht_add_table(str *name, int autoexp, str *dbtable, str *dbcols, int size, int dbmode, int itype, int_str *ival, int updateexpire, - int dmqreplicate); + int dmqreplicate, char coldelim, char colnull); int ht_init_tables(void); int ht_destroy(void); int ht_set_cell(ht_t *ht, str *name, int type, int_str *val, int mode); diff --git a/src/modules/htable/ht_db.c b/src/modules/htable/ht_db.c index 7e8b8cc6b3c..7a22ff6c484 100644 --- a/src/modules/htable/ht_db.c +++ b/src/modules/htable/ht_db.c @@ -121,7 +121,9 @@ static int ht_pack_values( len = 0; for(c = 1; c < cols; c++) { if(VAL_NULL(&RES_ROWS(db_res)[row].values[c])) { - len += 1; + if(ht->pack[2] != '\0') { + len += 1; + } } else if(RES_ROWS(db_res)[row].values[c].type == DB1_STRING) { len += strlen(RES_ROWS(db_res)[row].values[c].val.string_val); } else if(RES_ROWS(db_res)[row].values[c].type == DB1_STR) { @@ -143,8 +145,10 @@ static int ht_pack_values( p = vbuf; for(c = 1; c < cols; c++) { if(VAL_NULL(&RES_ROWS(db_res)[row].values[c])) { - *p = ht->pack[2]; - p++; + if(ht->pack[2] != '\0') { + *p = ht->pack[2]; + p++; + } } else if(RES_ROWS(db_res)[row].values[c].type == DB1_STRING) { strcpy(p, RES_ROWS(db_res)[row].values[c].val.string_val); p += strlen(RES_ROWS(db_res)[row].values[c].val.string_val);