diff --git a/src/modules/dispatcher/dispatch.c b/src/modules/dispatcher/dispatch.c index 12442c822da..d33135098aa 100644 --- a/src/modules/dispatcher/dispatch.c +++ b/src/modules/dispatcher/dispatch.c @@ -2653,14 +2653,47 @@ static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int l } } +typedef struct congestion_control_state { + int gw_congested_count; + int gw_normal_count; + int total_congestion_ms; + int enabled; + int apply_rweights; +} congestion_control_state_t; + +int ds_update_weighted_congestion_control(congestion_control_state_t *cc, int weight, ds_latency_stats_t *latency_stats) +{ + int active_weight = 0; + int congestion_ms = latency_stats->estimate - latency_stats->average; + if (weight <= 0) return 0; + if (congestion_ms < 0) congestion_ms = 0; + cc->total_congestion_ms += congestion_ms; + active_weight = weight - congestion_ms; + if (active_weight < 0) active_weight = 0; + if (active_weight == 0) { + cc->gw_congested_count++; + } else { + cc->gw_normal_count++; + } + return active_weight; +} + +void ds_init_congestion_control_state(congestion_control_state_t *cc) +{ + cc->gw_congested_count = 0; + cc->gw_normal_count = 0; + cc->total_congestion_ms = 0; + cc->enabled = 1; + cc->apply_rweights = 0; +} + int ds_update_latency(int group, str *address, int code) { int i = 0; int state = 0; ds_set_t *idx = NULL; - int apply_rweights = 0; - int all_gw_congested = 1; - int total_congestion_ms = 0; + congestion_control_state_t cc; + ds_init_congestion_control_state(&cc); if(_ds_list == NULL || _ds_list_nr <= 0) { LM_ERR("the list is null\n"); @@ -2680,7 +2713,6 @@ int ds_update_latency(int group, str *address, int code) && strncasecmp(ds_dest->uri.s, address->s, address->len) == 0) { struct timeval now; int latency_ms; - int congestion_ms; /* Destination address found, this is the gateway that was pinged. */ state = ds_dest->flags; if (code == 408 && latency_stats->timeout < UINT32_MAX) @@ -2690,43 +2722,28 @@ int ds_update_latency(int group, str *address, int code) + (now.tv_usec - latency_stats->start.tv_usec)/1000; latency_stats_update(latency_stats, latency_ms); - congestion_ms = latency_stats->estimate - latency_stats->average; - if (congestion_ms < 0) congestion_ms = 0; - total_congestion_ms += congestion_ms; - /* Adjusting weight using congestion detection based on latency estimator. */ - if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight) { - int active_weight = ds_dest->attrs.weight - congestion_ms; - if (active_weight <= 0) { - active_weight = 0; - } else { - all_gw_congested = 0; - } + if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) { + int active_weight = ds_update_weighted_congestion_control(&cc, ds_dest->attrs.weight, latency_stats); if (ds_dest->attrs.rweight != active_weight) { - apply_rweights = 1; + cc.apply_rweights = 1; ds_dest->attrs.rweight = active_weight; } LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]cms[%d]\n", latency_stats->count, latency_ms, latency_stats->average, address->len, address->s, - code, ds_dest->attrs.rweight, congestion_ms); + code, ds_dest->attrs.rweight, ds_dest->attrs.weight - active_weight); } - } else { - /* Another gateway in the set, we verify if it is congested. */ - int congestion_ms; - int active_weight; - congestion_ms = latency_stats->estimate - latency_stats->average; - if (congestion_ms < 0) congestion_ms = 0; - total_congestion_ms += congestion_ms; - active_weight = ds_dest->attrs.weight - congestion_ms; - if (active_weight > 0) all_gw_congested = 0; + } else if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) { + /* This is another gateway in the set, we verify if it is congested. */ + ds_update_weighted_congestion_control(&cc, ds_dest->attrs.weight, latency_stats); } - if (!ds_dest->attrs.congestion_control) all_gw_congested = 0; + if (!ds_dest->attrs.congestion_control) cc.enabled = 0; i++; } /* All the GWs are above their congestion threshold, load distribution will now be based on * the ratio of congestion_ms each GW is facing. */ - if (all_gw_congested) { + if (cc.enabled && cc.gw_congested_count > 1 && cc.gw_normal_count == 0) { i = 0; while (i < idx->nr) { int congestion_ms; @@ -2735,21 +2752,21 @@ int ds_update_latency(int group, str *address, int code) ds_latency_stats_t *latency_stats = &ds_dest->latency_stats; congestion_ms = latency_stats->estimate - latency_stats->average; /* We multiply by 2^4 to keep enough precision */ - active_weight = (total_congestion_ms << 4) / congestion_ms; + active_weight = (cc.total_congestion_ms << 4) / congestion_ms; if (ds_dest->attrs.rweight != active_weight) { - apply_rweights = 1; + cc.apply_rweights = 1; ds_dest->attrs.rweight = active_weight; } LM_DBG("all gw congested[%d][%d]latency_avg[%.2f][%.*s]code[%d]rweight[%d/%d:%d]cms[%d]\n", - total_congestion_ms, latency_stats->count, latency_stats->average, - address->len, address->s, code, total_congestion_ms, congestion_ms, + cc.total_congestion_ms, latency_stats->count, latency_stats->average, + ds_dest->uri.len, ds_dest->uri.s, code, cc.total_congestion_ms, congestion_ms, ds_dest->attrs.rweight, congestion_ms); i++; } } lock_release(&idx->lock); - if (apply_rweights) dp_init_relative_weights(idx); + if (cc.enabled && cc.apply_rweights) dp_init_relative_weights(idx); return state; } diff --git a/src/modules/dispatcher/doc/dispatcher_admin.xml b/src/modules/dispatcher/doc/dispatcher_admin.xml index d4ed3261ec0..425ecaf61bd 100644 --- a/src/modules/dispatcher/doc/dispatcher_admin.xml +++ b/src/modules/dispatcher/doc/dispatcher_admin.xml @@ -1241,7 +1241,7 @@ modparam("dispatcher", "reload_delta", 1) Using this algorithm, you can also enable congestion control by setting the - attibute 'cc=1', when 'cc' is enabled the 'rweight' attribute will also be + attibute 'cc=1', when 'cc' is enabled the 'weight' attribute will also be used to control congestion tolerance. When facing congestion the weight of a gateway is lowered by 1 for every ms of estimated congestion, a 'rweight' value of 50 is recommended. See the example "configuring load balancing with