diff --git a/src/modules/dispatcher/dispatch.c b/src/modules/dispatcher/dispatch.c index 12442c822da..a132019c903 100644 --- a/src/modules/dispatcher/dispatch.c +++ b/src/modules/dispatcher/dispatch.c @@ -78,6 +78,7 @@ #define DS_ALG_CALLLOAD 10 #define DS_ALG_RELWEIGHT 11 #define DS_ALG_PARALLEL 12 +#define DS_ALG_LATENCY 13 /* increment call load */ #define DS_LOAD_INC(dgrp, didx) do { \ @@ -2061,11 +2062,37 @@ int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit, } } - LM_DBG("selected target destinations: %d\n", vstate.cnt); + LM_NOTICE("selected target destinations: %d\n", vstate.cnt); return ret; } +typedef struct sorted_ds { + int idx; + int priority; +} sorted_ds_t; + +int ds_manage_routes_fill_reodered_xavp(sorted_ds_t *ds_sorted, ds_set_t *idx, ds_select_state_t *rstate) +{ + int i; + for(i=0; i < idx->nr && rstate->cnt < rstate->limit; i++) { + if(ds_sorted[i].idx < 0 || ds_skip_dst(idx->dlist[i].flags) + || (ds_use_default != 0 && ds_sorted[i].idx == (idx->nr - 1))) { + continue; + } + if(ds_add_xavp_record(idx, ds_sorted[i].idx, rstate->setid, rstate->alg, + &rstate->lxavp)<0) { + LM_ERR("failed to add destination in the xavp (%d/%d)\n", + ds_sorted[i].idx, rstate->setid); + return -1; + } + LM_ERR("destination added in the xavp (%d/%d)\n", + ds_sorted[i].idx, rstate->setid); + rstate->cnt++; + } + return 0; +} + int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state_t *rstate) { int i; @@ -2125,6 +2152,78 @@ int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state return 0; } + +void ds_sorted_by_priority(sorted_ds_t * sorted_ds, int size) { + int i,ii; + for(i=0;ilast; + int y = 0; + int z = hash; + int active_priority = 0; + sorted_ds_t *ds_sorted = pkg_malloc(sizeof(sorted_ds_t) * idx->nr); + if(ds_sorted == NULL) { + LM_ERR("no more pkg\n"); + return -1; + } + + for(y=0; ynr ;y++) { + int latency_proirity_handicap = 0; + int gw_priority = idx->dlist[z].priority; + int gw_latency = idx->dlist[z].latency_stats.estimate; + int gw_inactive = ds_skip_dst(idx->dlist[z].flags); + if(!gw_inactive) { + if(gw_latency > gw_priority && gw_priority > 0) + latency_proirity_handicap = gw_latency / gw_priority; + idx->dlist[z].attrs.rpriority = gw_priority - latency_proirity_handicap; + if(idx->dlist[z].attrs.rpriority < 1 && gw_priority > 0) + idx->dlist[z].attrs.rpriority = 1; + if(idx->dlist[z].attrs.rpriority > active_priority) { + hash = z; + active_priority = idx->dlist[z].attrs.rpriority; + } + ds_sorted[y].idx = z; + ds_sorted[y].priority = idx->dlist[z].attrs.rpriority; + LM_NOTICE("[active]idx[%d]uri[%.*s]priority[%d-%d=%d]latency[%dms]flag[%d]\n", + z, idx->dlist[z].uri.len, idx->dlist[z].uri.s, + gw_priority, latency_proirity_handicap, + idx->dlist[z].attrs.rpriority, gw_latency, idx->dlist[z].flags); + } else { + ds_sorted[y].idx = -1; + ds_sorted[y].priority = -1; + LM_NOTICE("[inactive]idx[%d]uri[%.*s]priority[%d]latency[%dms]flag[%d]", + z, idx->dlist[z].uri.len, idx->dlist[z].uri.s, + gw_priority, gw_latency, idx->dlist[z].flags); + } + if(ds_use_default != 0 && idx->nr != 1) + z = (z + 1) % (idx->nr - 1); + else + z = (z + 1) % idx->nr; + } + idx->last = hash % idx->nr; + LM_NOTICE("priority[%d]gateway_selected[%d]next_index[%d]\n", active_priority, hash, idx->last); + ds_sorted_by_priority(ds_sorted, idx->nr); + for(y=0; ynr ;y++) { + LM_NOTICE("ds_sorted:idx[%d]priority[%d]\n", ds_sorted[y].idx, ds_sorted[y].priority); + } + ds_manage_routes_fill_reodered_xavp(ds_sorted, idx, rstate); + return hash; +} + /** * */ @@ -2135,6 +2234,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate) ds_set_t *idx = NULL; int ulast = 0; int vlast = 0; + int xavp_filled = 0; if(msg == NULL) { LM_ERR("bad parameters\n"); @@ -2270,6 +2370,13 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate) case DS_ALG_PARALLEL: /* 12 - parallel dispatching */ hash = 0; break; + case DS_ALG_LATENCY: /* 13 - latency optimized round-robin with failover */ + hash = ds_manage_route_algo13(idx, rstate); + if (hash < 0) + return -1; + xavp_filled = 1; + ulast = 1; + break; default: LM_WARN("algo %d not implemented - using first entry...\n", rstate->alg); @@ -2285,7 +2392,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate) i = hash; /* if selected address is inactive, find next active */ - while(ds_skip_dst(idx->dlist[i].flags)) { + while(!xavp_filled && ds_skip_dst(idx->dlist[i].flags)) { if(ds_use_default != 0 && idx->nr != 1) i = (i + 1) % (idx->nr - 1); else @@ -2344,8 +2451,11 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate) return 1; } - if (ds_manage_routes_fill_xavp(hash, idx, rstate) == -1) - return -1; + if(!xavp_filled) { + if(ds_manage_routes_fill_xavp(hash, idx, rstate) == -1){ + return -1; + } + } /* add default dst to last position in XAVP list */ if(ds_use_default != 0 && hash != idx->nr - 1 @@ -2683,12 +2793,22 @@ int ds_update_latency(int group, str *address, int code) int congestion_ms; /* Destination address found, this is the gateway that was pinged. */ state = ds_dest->flags; + if (!(state & DS_PROBING_DST)) { + i++; + continue; + } if (code == 408 && latency_stats->timeout < UINT32_MAX) latency_stats->timeout++; gettimeofday(&now, NULL); latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000 + (now.tv_usec - latency_stats->start.tv_usec)/1000; - latency_stats_update(latency_stats, latency_ms); + if (code != 408) + latency_stats_update(latency_stats, latency_ms); + + LM_NOTICE("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]\n", + latency_stats->count, latency_ms, + latency_stats->average, address->len, address->s, + code, ds_dest->attrs.rweight); congestion_ms = latency_stats->estimate - latency_stats->average; if (congestion_ms < 0) congestion_ms = 0; diff --git a/src/modules/dispatcher/dispatch.h b/src/modules/dispatcher/dispatch.h index dada7ed5d85..2da3121fde8 100644 --- a/src/modules/dispatcher/dispatch.h +++ b/src/modules/dispatcher/dispatch.h @@ -190,6 +190,7 @@ typedef struct _ds_attrs { int congestion_control; str ping_from; str obproxy; + int rpriority; } ds_attrs_t; typedef struct _ds_latency_stats { diff --git a/src/modules/dispatcher/doc/dispatcher_admin.xml b/src/modules/dispatcher/doc/dispatcher_admin.xml index d4ed3261ec0..391479f3985 100644 --- a/src/modules/dispatcher/doc/dispatcher_admin.xml +++ b/src/modules/dispatcher/doc/dispatcher_admin.xml @@ -1261,6 +1261,34 @@ modparam("dispatcher", "reload_delta", 1) making sense in this case. + + + 13 - latency optimized dispatching + + + - The algorithm will load balance using round-robin prioritizing the gateways with the highest priority. + + + - If ds_ping_latency_stats is active the algorithm be able to adjust the priority of the gateway automaticaly, + the priority will be lowered by 1 point every time the latency ms is as high as the priority. + + + <function>latency_optimized_dispatching</function> usage + +Using this simple formula : ADJUSTED_PRIORITY = PRIORITY - (ESTIMATED_LATENCY/PRIORITY) + +GATEWAY | PRIORITY | ESTIMATED | ADJUSTED | LOAD + # | | LATENCY | PRIORITY | DISTRIBUTION + 1 | 30 | 21 | 30 | 33% + 2 | 30 | 91 | 27 | 0% + 3 | 30 | 61 | 28 | 0% + 4 | 30 | 19 | 30 | 33% + 5 | 30 | 32 | 29 | 0% + 6 | 30 | 0 | 30 | 33% + 7 | 30 | 201 | 24 | 0% + + + X - if the algorithm is not implemented, the