diff --git a/src/modules/dispatcher/dispatch.c b/src/modules/dispatcher/dispatch.c
index 28b305352c4..cbda9731e6f 100644
--- a/src/modules/dispatcher/dispatch.c
+++ b/src/modules/dispatcher/dispatch.c
@@ -269,6 +269,9 @@ int ds_set_attrs(ds_dest_t *dest, str *attrs)
for(pit = params_list; pit; pit = pit->next) {
if(pit->name.len == 4 && strncasecmp(pit->name.s, "duid", 4) == 0) {
dest->attrs.duid = pit->body;
+ } else if(pit->name.len == 2
+ && strncasecmp(pit->name.s, "cc", 2) == 0) {
+ str2sint(&pit->body, &dest->attrs.congestion_control);
} else if(pit->name.len == 6
&& strncasecmp(pit->name.s, "weight", 6) == 0) {
str2sint(&pit->body, &dest->attrs.weight);
@@ -520,6 +523,7 @@ int dp_init_relative_weights(ds_set_t *dset)
if(dset == NULL || dset->dlist == NULL)
return -1;
+ lock_get(&dset->lock);
int rw_sum = 0;
/* find the sum of relative weights*/
for(j = 0; j < dset->nr; j++) {
@@ -529,6 +533,7 @@ int dp_init_relative_weights(ds_set_t *dset)
}
if(rw_sum == 0) {
+ lock_release(&dset->lock);
return 0;
}
@@ -540,11 +545,13 @@ int dp_init_relative_weights(ds_set_t *dset)
int current_slice =
dset->dlist[j].attrs.rweight * 100 / rw_sum; //truncate here;
+ LM_DBG("rw_sum[%d][%d][%d]\n",j, rw_sum, current_slice);
for(k = 0; k < current_slice; k++) {
dset->rwlist[t] = (unsigned int)j;
t++;
}
}
+
/* if the array was not completely filled (i.e., the sum of rweights is
* less than 100 due to truncated), then use last address to fill the rest */
unsigned int last_insert =
@@ -557,7 +564,7 @@ int dp_init_relative_weights(ds_set_t *dset)
* sending first 20 calls to it, but ensure that within a 100 calls,
* 20 go to first address */
shuffle_uint100array(dset->rwlist);
-
+ lock_release(&dset->lock);
return 0;
}
@@ -2290,6 +2297,8 @@ static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int l
latency_stats->average = latency;
latency_stats->estimate = latency;
}
+ /* train the average if stable after 10 samples */
+ if (latency_stats->count > 10 && latency_stats->stdev < 0.5) latency_stats->count = 500000;
if (latency_stats->min > latency)
latency_stats->min = latency;
if (latency_stats->max < latency)
@@ -2329,29 +2338,81 @@ int ds_update_latency(int group, str *address, int code)
LM_ERR("destination set [%d] not found\n", group);
return -1;
}
-
- while(i < idx->nr) {
- if(idx->dlist[i].uri.len == address->len
- && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
- == 0) {
-
- /* destination address found */
- state = idx->dlist[i].flags;
- ds_latency_stats_t *latency_stats = &idx->dlist[i].latency_stats;
- if (code == 408 && latency_stats->timeout < UINT32_MAX) {
+ int apply_rweights = 0;
+ int all_gw_congested = 1;
+ int total_congestion_ms = 0;
+ lock_get(&idx->lock);
+ while (i < idx->nr) {
+ ds_dest_t *ds_dest = &idx->dlist[i];
+ ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
+ if (ds_dest->uri.len == address->len
+ && strncasecmp(ds_dest->uri.s, address->s, address->len) == 0) {
+ /* Destination address found, this is the gateway that was pinged. */
+ state = ds_dest->flags;
+ if (code == 408 && latency_stats->timeout < UINT32_MAX)
latency_stats->timeout++;
- } else {
- struct timeval now;
- gettimeofday(&now, NULL);
- int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
- + (now.tv_usec - latency_stats->start.tv_usec)/1000;
- latency_stats_update(latency_stats, latency_ms);
- LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]\n", latency_stats->count, latency_ms,
- latency_stats->average, address->len, address->s, code);
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
+ + (now.tv_usec - latency_stats->start.tv_usec)/1000;
+ latency_stats_update(latency_stats, latency_ms);
+
+ int congestion_ms = latency_stats->estimate - latency_stats->average;
+ if (congestion_ms < 0) congestion_ms = 0;
+ total_congestion_ms += congestion_ms;
+
+ /* Adjusting weight using congestion detection based on latency estimator. */
+ if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight) {
+ int active_weight = ds_dest->attrs.weight - congestion_ms;
+ if (active_weight <= 0) {
+ active_weight = 0;
+ } else {
+ all_gw_congested = 0;
+ }
+ if (ds_dest->attrs.rweight != active_weight) {
+ apply_rweights = 1;
+ ds_dest->attrs.rweight = active_weight;
+ }
+ LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]cms[%d]\n",
+ latency_stats->count, latency_ms,
+ latency_stats->average, address->len, address->s,
+ code, ds_dest->attrs.rweight, congestion_ms);
}
- }
+ } else {
+ /* Another gateway in the set, we verify if it is congested. */
+ int congestion_ms = latency_stats->estimate - latency_stats->average;
+ if (congestion_ms < 0) congestion_ms = 0;
+ total_congestion_ms += congestion_ms;
+ int active_weight = ds_dest->attrs.weight - congestion_ms;
+ if (active_weight > 0) all_gw_congested = 0;
+ }
+ if (!ds_dest->attrs.congestion_control) all_gw_congested = 0;
i++;
}
+ /* All the GWs are above their congestion threshold, load distribution will now be based on
+ * the ratio of congestion_ms each GW is facing. */
+ if (all_gw_congested) {
+ i = 0;
+ while (i < idx->nr) {
+ ds_dest_t *ds_dest = &idx->dlist[i];
+ ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
+ int congestion_ms = latency_stats->estimate - latency_stats->average;
+ /* We multiply by 2^4 to keep enough precision */
+ int active_weight = (total_congestion_ms << 4) / congestion_ms;
+ if (ds_dest->attrs.rweight != active_weight) {
+ apply_rweights = 1;
+ ds_dest->attrs.rweight = active_weight;
+ }
+ LM_DBG("all gw congested[%d][%d]latency_avg[%.2f][%.*s]code[%d]rweight[%d/%d:%d]cms[%d]\n",
+ total_congestion_ms, latency_stats->count, latency_stats->average,
+ address->len, address->s, code, total_congestion_ms, congestion_ms,
+ ds_dest->attrs.rweight, congestion_ms);
+ i++;
+ }
+ }
+
+ lock_release(&idx->lock);
+ if (apply_rweights) dp_init_relative_weights(idx);
return state;
}
@@ -3099,7 +3160,7 @@ ds_set_t *ds_avl_insert(ds_set_t **root, int id, int *setn)
node->id = id;
node->longer = AVL_NEITHER;
*root = node;
-
+ lock_init(&node->lock);
avl_rebalance(rotation_top, id);
(*setn)++;
diff --git a/src/modules/dispatcher/dispatch.h b/src/modules/dispatcher/dispatch.h
index 2ebaa4b3d50..a65c5b90556 100644
--- a/src/modules/dispatcher/dispatch.h
+++ b/src/modules/dispatcher/dispatch.h
@@ -155,6 +155,7 @@ typedef struct _ds_attrs {
int maxload;
int weight;
int rweight;
+ int congestion_control;
} ds_attrs_t;
typedef struct _ds_latency_stats {
@@ -195,6 +196,7 @@ typedef struct _ds_set {
unsigned int rwlist[100];
struct _ds_set *next[2];
int longer;
+ gen_lock_t lock;
} ds_set_t;
/* clang-format on */
diff --git a/src/modules/dispatcher/doc/dispatcher.xml b/src/modules/dispatcher/doc/dispatcher.xml
index b9655851593..8502aec0a3a 100644
--- a/src/modules/dispatcher/doc/dispatcher.xml
+++ b/src/modules/dispatcher/doc/dispatcher.xml
@@ -81,7 +81,7 @@
Alessandro Arrichiello, Hewlett Packard
- 2017
+ 2017, 2018
Julien chavanton, Flowroute
diff --git a/src/modules/dispatcher/doc/dispatcher_admin.xml b/src/modules/dispatcher/doc/dispatcher_admin.xml
index 33238ed2e73..8037184273e 100644
--- a/src/modules/dispatcher/doc/dispatcher_admin.xml
+++ b/src/modules/dispatcher/doc/dispatcher_admin.xml
@@ -1110,6 +1110,19 @@ end
will be distributed as 25/50/25. After third host failing
distribution will be changed to 33/67/0.
+
+ Using this algorithm, you can also enable congestion control by setting the
+ attibute 'cc=1', when 'cc' is enabled the 'rweight' attribute will also be
+ used to control congestion tolerance. When facing congestion the weight of
+ a gateway is lowered by 1 for every ms of estimated congestion, a 'rweight'
+ value of 50 is recommended. See the example "configuring load balancing with
+ congestion detection" bellow.
+
+
+ The congestion estimation is done using an EWMA (see ds_latency_estimator_alpha).
+ If all the gateways in a set are above their congestion threshold(weight), the
+ load distribution is instead done using the ratio of estimated congestion ms.
+
@@ -1150,6 +1163,48 @@ ds_select_dst("1", "$var(a)");
...
ds_select_dst("1", "4", "3");
...
+
+
+
+ configuring load balancing with congestion detection
+
+...
+# sample of SQL provisionning statements
+INSERT INTO "dispatcher"
+VALUES(1,1,'sip:192.168.0.1:5060',0,12,'rweight=50;weight=50;cc=1;','');
+INSERT INTO "dispatcher"
+VALUES(2,1,'sip:192.168.0.2:5060',0,12,'rweight=50;weight=50;cc=1;','');
+...
+modparam("dispatcher", "ds_ping_interval", 1) # ping gateways once/second
+modparam("dispatcher", "ds_ping_latency_stats", 1) # update congestion metrics
+# configure the latency estimator
+modparam("dispatcher", "ds_latency_estimator_alpha", 900)
+...
+if (!ds_select_dst("1", "11")) { # use relative weight based load distribution
+...
+# sample of output from 'kamcmd dispatcher.list'
+DEST: {
+ URI: sip:192.168.0.1:5060
+ FLAGS: AP
+ PRIORITY: 12
+ ATTRS: {
+ BODY: rweight=50;weight=50;cc=1 # configuration values
+ DUID:
+ MAXLOAD: 0
+ WEIGHT: 50
+ RWEIGHT: 50
+ SOCKET:
+ }
+ LATENCY: {
+ AVG: 20.104000
+ STD: 1.273000
+ # estimated congestion is currently 25ms = 45ms(EST) -20ms(AVG)
+ EST: 45.005000
+ MAX: 132
+ TIMEOUT: 3
+ }
+}
+...