Skip to content

Commit

Permalink
dispatcher: latency statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
jchavanton committed Sep 14, 2017
1 parent 90ab9e4 commit 12a25ed
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 2 deletions.
87 changes: 87 additions & 0 deletions src/modules/dispatcher/dispatch.c
Expand Up @@ -31,6 +31,7 @@
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>

#include "../../core/ut.h"
#include "../../core/trim.h"
Expand Down Expand Up @@ -85,6 +86,8 @@ static int *_ds_ping_active = NULL;

extern int ds_force_dst;
extern str ds_event_callback;
extern int ds_ping_latency_stats;
extern float ds_latency_estimator_alpha;

static db_func_t ds_dbf;
static db1_con_t *ds_db_handle = NULL;
Expand Down Expand Up @@ -2271,6 +2274,85 @@ int ds_mark_dst(struct sip_msg *msg, int state)
return (ret == 0) ? 1 : -1;
}


/**
 * Fold one latency sample into the running statistics of a destination.
 *
 * Keeps track of min/max, a running mean with its standard deviation
 * (online/Welford-style update) and a short term EWMA estimate.
 *
 * @param latency_stats statistics record, updated in place
 * @param latency new sample (ds_update_latency() passes milliseconds)
 */
static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int latency) {
	float current_average, current_q;

	/* after 2^21 samples, ~24 days at 1s interval, the sample count is
	 * frozen and the average becomes a weighted moving average */
	if (latency_stats->count < 2097152) {
		latency_stats->count++;
	} else {
		/* the divisor no longer grows, so the sum of squared deviations
		 * must forget one sample's share before a new one is added;
		 * otherwise it grows forever and stdev drifts upwards */
		latency_stats->last_q -= latency_stats->last_q / latency_stats->count;
	}

	/* first sample: seed every statistic with it */
	if (latency_stats->count == 1) {
		latency_stats->stdev = 0.0f;
		latency_stats->last_q = 0.0f;
		latency_stats->max = latency;
		latency_stats->min = latency;
		latency_stats->average = latency;
		latency_stats->estimate = latency;
	}
	if (latency_stats->min > latency)
		latency_stats->min = latency;
	if (latency_stats->max < latency)
		latency_stats->max = latency;

	/* standard deviation of the average/weighted moving average */
	if (latency_stats->count > 1) {
		current_average = latency_stats->average
				+ (latency - latency_stats->average) / latency_stats->count;
		/* uses both the previous and the updated mean */
		current_q = latency_stats->last_q
				+ (latency - latency_stats->average) * (latency - current_average);
		latency_stats->average = current_average;
		latency_stats->last_q = current_q;
		latency_stats->stdev = sqrt(current_q / (latency_stats->count - 1));
	}

	/* exponentially weighted moving average; the plain average is used
	 * until 10 samples have been collected */
	if (latency_stats->count < 10) {
		latency_stats->estimate = latency_stats->average;
	} else {
		latency_stats->estimate = latency_stats->estimate * ds_latency_estimator_alpha
				+ latency * (1 - ds_latency_estimator_alpha);
	}
}

/**
 * Record the outcome of a ping for a destination of a dispatcher set.
 *
 * Every destination in set `group` whose URI equals `address`
 * (same length, case-insensitive) is updated: reply code 408 is counted
 * as a timeout, any other code produces a latency sample measured from
 * latency_stats.start up to now.
 *
 * @param group id of the destination set
 * @param address URI of the destination that was pinged
 * @param code reply code the ping finished with
 * @return flags of the last matching destination, 0 if none matched,
 *         -1 if the list is empty or the set is not found
 */
int ds_update_latency(int group, str *address, int code)
{
	int i = 0;
	int state = 0;
	ds_set_t *idx = NULL;

	if(_ds_list == NULL || _ds_list_nr <= 0) {
		LM_ERR("the list is null\n");
		return -1;
	}

	/* get the index of the set */
	if(ds_get_index(group, *crt_idx, &idx) != 0) {
		LM_ERR("destination set [%d] not found\n", group);
		return -1;
	}

	while(i < idx->nr) {
		if(idx->dlist[i].uri.len == address->len
				&& strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
						   == 0) {

			/* destination address found */
			state = idx->dlist[i].flags;
			ds_latency_stats_t *latency_stats = &idx->dlist[i].latency_stats;
			if (code == 408) {
				/* a timed out ping has no round-trip time: only count it,
				 * saturating at UINT32_MAX instead of falling through to
				 * the latency branch once the counter is full */
				if (latency_stats->timeout < UINT32_MAX)
					latency_stats->timeout++;
			} else {
				struct timeval now;
				gettimeofday(&now, NULL);
				/* elapsed time since the ping was sent, in milliseconds */
				int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
						+ (now.tv_usec - latency_stats->start.tv_usec)/1000;
				latency_stats_update(latency_stats, latency_ms);
				LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]\n", latency_stats->count, latency_ms,
						latency_stats->average, address->len, address->s, code);
			}
		}
		i++;
	}
	return state;
}


/**
* Get state for given destination
*/
Expand Down Expand Up @@ -2735,6 +2817,8 @@ static void ds_options_callback(
uri.len = t->to.len - 8;
LM_DBG("OPTIONS-Request was finished with code %d (to %.*s, group %d)\n",
ps->code, uri.len, uri.s, group);
if (ds_ping_latency_stats)
ds_update_latency(group, &uri, ps->code);
/* ps->code contains the result-code of the request.
*
* We accept both a "200 OK" or the configured reply as a valid response */
Expand Down Expand Up @@ -2805,6 +2889,9 @@ void ds_ping_set(ds_set_t *node)
&& ds_default_socket.len > 0) {
uac_r.ssock = &ds_default_socket;
}

gettimeofday(&node->dlist[j].latency_stats.start, NULL);

if(tmb.t_request(&uac_r, &node->dlist[j].uri, &node->dlist[j].uri,
&ds_ping_from, &ds_outbound_proxy)
< 0) {
Expand Down
13 changes: 13 additions & 0 deletions src/modules/dispatcher/dispatch.h
Expand Up @@ -156,12 +156,25 @@ typedef struct _ds_attrs {
int rweight;
} ds_attrs_t;

/* Per-destination latency statistics, fed by the ping replies.
 * Latency values are in milliseconds. */
typedef struct _ds_latency_stats {
struct timeval start; // time taken right before the ping request is sent
int min; // smallest latency sample seen
int max; // largest latency sample seen
float average; // weighted average, estimate of the last few weeks
float stdev; // last standard deviation
float estimate; // short term estimate, EWMA exponential weighted moving average
float last_q; // q for N-1 (running sum of squared deviations used for stdev)
int32_t count; // number of samples, no longer incremented past 2097152 (2^21)
uint32_t timeout; // number of pings that finished with code 408
} ds_latency_stats_t;

typedef struct _ds_dest {
str uri;
int flags;
int priority;
int dload;
ds_attrs_t attrs;
ds_latency_stats_t latency_stats;
struct socket_info * sock;
struct ip_addr ip_address; /*!< IP-Address of the entry */
unsigned short int port; /*!< Port of the URI */
Expand Down
28 changes: 27 additions & 1 deletion src/modules/dispatcher/dispatcher.c
Expand Up @@ -103,6 +103,9 @@ int inactive_threshold = 1; /* number of replied requests, before a destination
str ds_ping_method = str_init("OPTIONS");
str ds_ping_from = str_init("sip:dispatcher@localhost");
static int ds_ping_interval = 0;
int ds_ping_latency_stats = 0;
int ds_latency_estimator_alpha_i = 900;
float ds_latency_estimator_alpha = 0.9f;
int ds_probing_mode = DS_PROBE_NONE;

static str ds_ping_reply_codes_str= STR_NULL;
Expand Down Expand Up @@ -242,6 +245,8 @@ static param_export_t params[]={
{"ds_ping_method", PARAM_STR, &ds_ping_method},
{"ds_ping_from", PARAM_STR, &ds_ping_from},
{"ds_ping_interval", INT_PARAM, &ds_ping_interval},
{"ds_ping_latency_stats", INT_PARAM, &ds_ping_latency_stats},
{"ds_latency_estimator_alpha", INT_PARAM, &ds_latency_estimator_alpha_i},
{"ds_ping_reply_codes", PARAM_STR, &ds_ping_reply_codes_str},
{"ds_probing_mode", INT_PARAM, &ds_probing_mode},
{"ds_hash_size", INT_PARAM, &ds_hash_size},
Expand Down Expand Up @@ -527,7 +532,12 @@ static int mod_init(void)
return -1;
}
}

if (ds_latency_estimator_alpha_i > 0 && ds_latency_estimator_alpha_i < 1000) {
ds_latency_estimator_alpha = ds_latency_estimator_alpha_i/1000.0f;
} else {
LM_ERR("invalid ds_latency_estimator_alpha must be between 0 and 1000,"
" using default[%.3f]\n", ds_latency_estimator_alpha);
}
return 0;
}

Expand Down Expand Up @@ -1184,6 +1194,7 @@ int ds_rpc_print_set(ds_set_t *node, rpc_t *rpc, void *ctx, void *rpc_handle)
void *sh;
void *vh;
void *wh;
void *lh; // latency stats handle
int j;
char c[3];
str data = STR_NULL;
Expand Down Expand Up @@ -1248,6 +1259,21 @@ int ds_rpc_print_set(ds_set_t *node, rpc_t *rpc, void *ctx, void *rpc_handle)
return -1;
}
}
if (ds_ping_latency_stats) {
if(rpc->struct_add(vh, "{", "LATENCY", &lh) < 0) {
rpc->fault(ctx, 500, "Internal error creating dest");
return -1;
}
if (rpc->struct_add(lh, "fffdd", "AVG", node->dlist[j].latency_stats.average,
"STD", node->dlist[j].latency_stats.stdev,
"EST", node->dlist[j].latency_stats.estimate,
"MAX", node->dlist[j].latency_stats.max,
"TIMEOUT", node->dlist[j].latency_stats.timeout)
< 0) {
rpc->fault(ctx, 500, "Internal error creating dest struct");
return -1;
}
}
}

return 0;
Expand Down
9 changes: 9 additions & 0 deletions src/modules/dispatcher/doc/dispatcher.xml
Expand Up @@ -54,6 +54,11 @@
<email>martingil.luis@gmail.com</email>
</address>
</editor>
<editor>
<firstname>Julien</firstname>
<surname>Chavanton</surname>
<email>jchavanton@gmail.com</email>
</editor>
</authorgroup>
<copyright>
<year>2004</year>
Expand All @@ -75,6 +80,10 @@
<year>2015</year>
<holder>Alessandro Arrichiello, Hewlett Packard</holder>
</copyright>
<copyright>
<year>2017</year>
<holder>Julien Chavanton, Flowroute</holder>
</copyright>
</bookinfo>
<toc></toc>

Expand Down
75 changes: 74 additions & 1 deletion src/modules/dispatcher/doc/dispatcher_admin.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding='ISO-8859-1'?>
<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [

<!-- Include general documentation entities -->
<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
%docentities;
Expand Down Expand Up @@ -706,6 +706,79 @@ modparam("dispatcher", "force_dst", 1)
</example>
</section>

<section id="dispatcher.p.ds_ping_latency_stats">
<title><varname>ds_ping_latency_stats</varname> (int)</title>
<para>
Enable latency measurement when pinging nodes
</para>

<itemizedlist>
<listitem>
<para>If set to 0, disable latency measurement.</para>
</listitem>
<listitem>
<para>If set to 1, enable latency measurement.
</para>
</listitem>
</itemizedlist>
<para>
<emphasis>
Default value is <quote>0</quote>.
</emphasis>
</para>
<example>
<title>accessing the metrics</title>
<programlisting format="linespecific">
# using the command :
kamcmd dispatcher.list
...
DEST: {
URI: sip:1.2.3.4
FLAGS: AX
PRIORITY: 9
LATENCY: {
AVG: 24.250000 # weighted moving average for the last few weeks
STD: 1.035000 # standard deviation of AVG
EST: 25.000000 # short term estimate, see parameter: ds_latency_estimator_alpha
MAX: 26 # maximum value seen
TIMEOUT: 0 # count of ping timeouts
}
}
...
</programlisting>
</example>
<example>
<title>Set the <quote>ds_ping_latency_stats</quote> parameter</title>
<programlisting format="linespecific">
...
modparam("dispatcher", "ds_ping_latency_stats", 1)
...
</programlisting>
</example>
</section>
<section id="dispatcher.p.ds_latency_estimator_alpha">
<title><varname>ds_latency_estimator_alpha</varname> (int)</title>
<para>
The value used to control the memory of the EWMA ("exponential weighted moving average") estimator,
i.e. the speed at which the older samples are dampened.
A good explanation can be found here: http://www.itl.nist.gov/div898/handbook/pmc/section3/pmc324.htm
Because Kamailio doesn't support float parameter types, the value in the parameter is divided by 1000 and stored as float.
For example, if you want to set the alpha to be 0.75, use value 750 here.
</para>
<para>
<emphasis>
Default value is <quote>900 => 0.9</quote>.
</emphasis>
</para>
<example>
<title>Set the <quote>ds_latency_estimator_alpha</quote> parameter</title>
<programlisting format="linespecific">
...
modparam("dispatcher", "ds_latency_estimator_alpha", 900)
...
</programlisting>
</example>
</section>
<section id="dispatcher.p.ds_hash_size">
<title><varname>ds_hash_size</varname> (int)</title>
<para>
Expand Down

0 comments on commit 12a25ed

Please sign in to comment.