Skip to content

Commit

Permalink
dispatcher: algorithm 13
Browse files Browse the repository at this point in the history
- latency optimized round-robin with failover
- optionally congestion can be use instead of latency
  • Loading branch information
jchavanton committed Nov 10, 2020
1 parent e9624bc commit 1c819e0
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 5 deletions.
132 changes: 128 additions & 4 deletions src/modules/dispatcher/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
#define DS_ALG_CALLLOAD 10
#define DS_ALG_RELWEIGHT 11
#define DS_ALG_PARALLEL 12
#define DS_ALG_LATENCY 13

/* increment call load */
#define DS_LOAD_INC(dgrp, didx) do { \
Expand Down Expand Up @@ -2066,6 +2067,32 @@ int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
return ret;
}

typedef struct sorted_ds {
int idx;
int priority;
} sorted_ds_t;

int ds_manage_routes_fill_reodered_xavp(sorted_ds_t *ds_sorted, ds_set_t *idx, ds_select_state_t *rstate)
{
int i;
for(i=0; i < idx->nr && rstate->cnt < rstate->limit; i++) {
if(ds_sorted[i].idx < 0 || ds_skip_dst(idx->dlist[i].flags)
|| (ds_use_default != 0 && ds_sorted[i].idx == (idx->nr - 1))) {
continue;
}
if(ds_add_xavp_record(idx, ds_sorted[i].idx, rstate->setid, rstate->alg,
&rstate->lxavp)<0) {
LM_ERR("failed to add destination in the xavp (%d/%d)\n",
ds_sorted[i].idx, rstate->setid);
return -1;
}
LM_DBG("destination added in the xavp (%d/%d)\n",
ds_sorted[i].idx, rstate->setid);
rstate->cnt++;
}
return 0;
}

int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state_t *rstate)
{
int i;
Expand Down Expand Up @@ -2125,6 +2152,80 @@ int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state
return 0;
}


void ds_sorted_by_priority(sorted_ds_t * sorted_ds, int size) {
int i,ii;
for(i=0;i<size;++i) {
for(ii=1;ii<size;++ii) {
sorted_ds_t temp;
if(sorted_ds[ii-1].priority < sorted_ds[ii].priority) {
temp.idx = sorted_ds[ii].idx;
temp.priority = sorted_ds[ii].priority;
sorted_ds[ii].idx = sorted_ds[ii-1].idx;
sorted_ds[ii].priority = sorted_ds[ii-1].priority;
sorted_ds[ii-1].idx = temp.idx;
sorted_ds[ii-1].priority = temp.priority;
}
}
}
}

int ds_manage_route_algo13(ds_set_t *idx, ds_select_state_t *rstate) {
int hash = idx->last;
int y = 0;
int z = hash;
int active_priority = 0;
sorted_ds_t *ds_sorted = pkg_malloc(sizeof(sorted_ds_t) * idx->nr);
if(ds_sorted == NULL) {
LM_ERR("no more pkg\n");
return -1;
}

for(y=0; y<idx->nr ;y++) {
int latency_proirity_handicap = 0;
ds_dest_t * ds_dest = &idx->dlist[z];
int gw_priority = ds_dest->priority;
int gw_latency = ds_dest->latency_stats.estimate;
int gw_inactive = ds_skip_dst(ds_dest->flags);
// if cc is enabled, the latency is the congestion ms instead of the estimated latency.
if (ds_dest->attrs.congestion_control)
gw_latency = ds_dest->latency_stats.estimate - ds_dest->latency_stats.average;
if(!gw_inactive) {
if(gw_latency > gw_priority && gw_priority > 0)
latency_proirity_handicap = gw_latency / gw_priority;
ds_dest->attrs.rpriority = gw_priority - latency_proirity_handicap;
if(ds_dest->attrs.rpriority < 1 && gw_priority > 0)
ds_dest->attrs.rpriority = 1;
if(ds_dest->attrs.rpriority > active_priority) {
hash = z;
active_priority = ds_dest->attrs.rpriority;
}
ds_sorted[y].idx = z;
ds_sorted[y].priority = ds_dest->attrs.rpriority;
LM_DBG("[active]idx[%d]uri[%.*s]priority[%d-%d=%d]latency[%dms]flag[%d]\n",
z, ds_dest->uri.len, ds_dest->uri.s,
gw_priority, latency_proirity_handicap,
ds_dest->attrs.rpriority, gw_latency, ds_dest->flags);
} else {
ds_sorted[y].idx = -1;
ds_sorted[y].priority = -1;
LM_DBG("[inactive]idx[%d]uri[%.*s]priority[%d]latency[%dms]flag[%d]",
z, ds_dest->uri.len, ds_dest->uri.s,
gw_priority, gw_latency, ds_dest->flags);
}
if(ds_use_default != 0 && idx->nr != 1)
z = (z + 1) % (idx->nr - 1);
else
z = (z + 1) % idx->nr;
}
idx->last = hash % idx->nr;
LM_DBG("priority[%d]gateway_selected[%d]next_index[%d]\n", active_priority, hash, idx->last);
ds_sorted_by_priority(ds_sorted, idx->nr);
ds_manage_routes_fill_reodered_xavp(ds_sorted, idx, rstate);
pkg_free(ds_sorted);
return hash;
}

/**
*
*/
Expand All @@ -2135,6 +2236,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
ds_set_t *idx = NULL;
int ulast = 0;
int vlast = 0;
int xavp_filled = 0;

if(msg == NULL) {
LM_ERR("bad parameters\n");
Expand Down Expand Up @@ -2270,6 +2372,15 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
case DS_ALG_PARALLEL: /* 12 - parallel dispatching */
hash = 0;
break;
case DS_ALG_LATENCY: /* 13 - latency optimized round-robin with failover */
lock_get(&idx->lock);
hash = ds_manage_route_algo13(idx, rstate);
lock_release(&idx->lock);
if (hash < 0)
return -1;
xavp_filled = 1;
ulast = 1;
break;
default:
LM_WARN("algo %d not implemented - using first entry...\n",
rstate->alg);
Expand All @@ -2285,7 +2396,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
i = hash;

/* if selected address is inactive, find next active */
while(ds_skip_dst(idx->dlist[i].flags)) {
while(!xavp_filled && ds_skip_dst(idx->dlist[i].flags)) {
if(ds_use_default != 0 && idx->nr != 1)
i = (i + 1) % (idx->nr - 1);
else
Expand Down Expand Up @@ -2344,8 +2455,11 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
return 1;
}

if (ds_manage_routes_fill_xavp(hash, idx, rstate) == -1)
return -1;
if(!xavp_filled) {
if(ds_manage_routes_fill_xavp(hash, idx, rstate) == -1){
return -1;
}
}

/* add default dst to last position in XAVP list */
if(ds_use_default != 0 && hash != idx->nr - 1
Expand Down Expand Up @@ -2715,12 +2829,22 @@ int ds_update_latency(int group, str *address, int code)
int latency_ms;
/* Destination address found, this is the gateway that was pinged. */
state = ds_dest->flags;
if (!(state & DS_PROBING_DST)) {
i++;
continue;
}
if (code == 408 && latency_stats->timeout < UINT32_MAX)
latency_stats->timeout++;
gettimeofday(&now, NULL);
latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
+ (now.tv_usec - latency_stats->start.tv_usec)/1000;
latency_stats_update(latency_stats, latency_ms);
if (code != 408)
latency_stats_update(latency_stats, latency_ms);

LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]\n",
latency_stats->count, latency_ms,
latency_stats->average, address->len, address->s,
code, ds_dest->attrs.rweight);

/* Adjusting weight using congestion detection based on latency estimator. */
if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) {
Expand Down
1 change: 1 addition & 0 deletions src/modules/dispatcher/dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ typedef struct _ds_attrs {
int congestion_control;
str ping_from;
str obproxy;
int rpriority;
} ds_attrs_t;

typedef struct _ds_latency_stats {
Expand Down
39 changes: 38 additions & 1 deletion src/modules/dispatcher/doc/dispatcher_admin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ modparam("dispatcher", "reload_delta", 1)
</para>
<para>
Using this algorithm, you can also enable congestion control by setting the
attibute 'cc=1', when 'cc' is enabled the 'weight' attribute will also be
attribute 'cc=1', when 'cc' is enabled the 'weight' attribute will also be
used to control congestion tolerance. When facing congestion the weight of
a gateway is lowered by 1 for every ms of estimated congestion, a 'rweight'
value of 50 is recommended. See the example "configuring load balancing with
Expand All @@ -1261,6 +1261,43 @@ modparam("dispatcher", "reload_delta", 1)
making sense in this case.
</para>
</listitem>
<listitem>
<para>
<quote>13</quote> - latency optimized dispatching
</para>
<para>
- The algorithm will load balance using round-robin prioritizing the gateways with the highest priority.
</para>
<para>
- If ds_ping_latency_stats is active the algorithm will adjust the priority of the gateway automatically,
the priority will be lowered by 1 point every time the latency ms is as high as the priority.
</para>
<para>
- If the attribute 'cc=1' is set, the latency used is congestion ms : estimate (current latency ms) - average (normal condition latency ms).
</para>
<example>
<title><function>latency_optimized_dispatching</function> usage</title>
<programlisting format="linespecific">
Using this simple formula :
ADJUSTED_PRIORITY = PRIORITY - (ESTIMATED_LATENCY_MS/PRIORITY)

GATEWAY | PRIORITY | ESTIMATED | ADJUSTED | LOAD
# | | LATENCY | PRIORITY | DISTRIBUTION
1 | 30 | 21 | 30 | 33%
2 | 30 | 91 | 27 | 0%
3 | 30 | 61 | 28 | 0%
4 | 30 | 19 | 30 | 33%
5 | 30 | 32 | 29 | 0%
6 | 30 | 0 | 30 | 33%
7 | 30 | 201 | 24 | 0%


With congestion control the formula becomes :
CONGESTION_MS = CURRENT_LATENCY_MS - NORMAL_CONDITION_LATENCY_MS
ADJUSTED_PRIORITY = PRIORITY - (CONGESTION_MS/PRIORITY)
</programlisting>
</example>
</listitem>
<listitem>
<para>
<quote>X</quote> - if the algorithm is not implemented, the
Expand Down

0 comments on commit 1c819e0

Please sign in to comment.