Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Rework to include simpler sampling scheme and time measurement

  • Loading branch information...
commit 6d0d58df8fcbcc141ebc3681c4879cde3f8a0143 1 parent 5fd8384
@sflow authored
View
10 configure.ac
@@ -488,13 +488,13 @@ AC_PATH_PROG([XSLTPROC], [xsltproc], "no")
AM_CONDITIONAL([BUILD_SPECIFICATIONS],
[test "x$enable_docs" != "xno" -a "x$XML2RFC" != "xno" -a "x$XSLTPROC" != "xno"])
-dnl Option to include sFlow
+dnl Option to disable sFlow
AC_ARG_ENABLE(sflow,
- [AS_HELP_STRING([--enable-sflow],[Include sFlow instrumentation])])
-if test "x$enable_sflow" = "xyes"; then
- AC_DEFINE([ENABLE_SFLOW],1,[Set to nonzero if you want to include sFlow])
+ [AS_HELP_STRING([--disable-sflow],[Disable sFlow instrumentation])])
+if test "x$enable_sflow" != "xno"; then
+ AC_DEFINE([ENABLE_SFLOW],1,[Set to nonzero for sFlow])
fi
-AM_CONDITIONAL([BUILD_SFLOW],[test "x$enable_sflow" = "xyes"])
+AM_CONDITIONAL([BUILD_SFLOW],[test "x$enable_sflow" != "xno"])
dnl Let the compiler be a bit more picky. Please note that you cannot
dnl specify these flags to the compiler before AC_CHECK_FUNCS, because
View
146 memcached.c
@@ -47,10 +47,10 @@
#include <sysexits.h>
#include <stddef.h>
-#ifdef ENABLE_SFLOW
+/* include sflow_mc.h even if ENABLE_SFLOW is not defined
+ so that the SFLOW_SAMPLE macro can be defined to do
+ something or be a no-op */
#include "sflow_mc.h"
-static SFMC sFlow;
-#endif
/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
#ifndef IOV_MAX
@@ -623,6 +623,7 @@ static void conn_set_state(conn *c, enum conn_states state) {
if (state == conn_write || state == conn_mwrite) {
MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
+ SFLOW_SAMPLE(SFMC_CMD_OTHER, c, NULL, 0, 0, -1, -1); // catch-all
}
}
}
@@ -811,19 +812,8 @@ static void complete_nread_ascii(conn *c) {
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
ret = store_item(it, comm, c);
-
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_ASCII,
- sflow_map_nread(c->cmd),
- ITEM_key(it),
- it->nkey,
- SFLOW_TOKENS_UNKNOWN,
- (ret == STORED) ? it->nbytes : 0,
- SFLOW_DURATION_UNKNOWN,
- sflow_map_status(ret));
- }
-#endif
+
+ SFLOW_SAMPLE(SFMC_CMD_OTHER, c, ITEM_key(it), it->nkey, 0, (ret==STORED) ? it->nbytes : 0, ret);
#ifdef ENABLE_DTRACE
uint64_t cas = ITEM_get_cas(it);
@@ -1044,19 +1034,7 @@ static void complete_incr_bin(conn *c) {
it = item_get(key, nkey);
-
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_BINARY,
- SFMC_CMD_INCR,
- key,
- nkey,
- SFLOW_TOKENS_UNKNOWN,
- it ? it->nbytes : 0,
- SFLOW_DURATION_UNKNOWN,
- SFMC_OP_OK);
- }
-#endif
+ SFLOW_SAMPLE(SFMC_CMD_INCR, c, key, nkey, 0, it ? it->nbytes : 0, EXISTS);
if (it && (c->binary_header.request.cas == 0 ||
c->binary_header.request.cas == ITEM_get_cas(it))) {
@@ -1141,19 +1119,8 @@ static void complete_update_bin(conn *c) {
*(ITEM_data(it) + it->nbytes - 1) = '\n';
ret = store_item(it, c->cmd, c);
-
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_BINARY,
- sflow_map_nread(c->cmd),
- ITEM_key(it),
- it->nkey,
- SFLOW_TOKENS_UNKNOWN,
- (ret == STORED) ? it->nbytes : 0,
- SFLOW_DURATION_UNKNOWN,
- sflow_map_status(ret));
- }
-#endif
+
+ SFLOW_SAMPLE(SFMC_CMD_OTHER, c, ITEM_key(it), it->nkey, 0, (ret == STORED) ? it->nbytes : 0, ret);
#ifdef ENABLE_DTRACE
uint64_t cas = ITEM_get_cas(it);
@@ -1224,18 +1191,9 @@ static void process_bin_get(conn *c) {
}
it = item_get(key, nkey);
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_BINARY,
- SFMC_CMD_GET,
- key,
- nkey,
- SFLOW_TOKENS_UNKNOWN,
- it ? it->nbytes : 0,
- SFLOW_DURATION_UNKNOWN,
- it ? SFMC_OP_OK : SFMC_OP_NOT_FOUND);
- }
-#endif
+
+ SFLOW_SAMPLE(SFMC_CMD_GET, c, key, nkey, 0, it ? it->nbytes : 0, it ? EXISTS : NOT_FOUND);
+
if (it) {
/* the length has two unnecessary bytes ("\r\n") */
uint16_t keylen = 0;
@@ -1417,18 +1375,7 @@ static void process_bin_stat(conn *c) {
fprintf(stderr, "\n");
}
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_BINARY,
- SFMC_CMD_STATS,
- NULL,
- 0,
- SFLOW_TOKENS_UNKNOWN,
- 0,
- SFLOW_DURATION_UNKNOWN,
- SFMC_OP_OK);
- }
-#endif
+ SFLOW_SAMPLE(SFMC_CMD_STATS, c, NULL, 0, 0, 0, -1);
if (nkey == 0) {
/* request all statistics */
@@ -1745,6 +1692,7 @@ static void dispatch_bin_command(conn *c) {
}
MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
+ SFLOW_SAMPLE_TEST(c);
c->noreply = true;
/* binprot supports 16bit keys, but internals are still 8bit */
@@ -2051,18 +1999,7 @@ static void process_bin_flush(conn *c) {
exptime = ntohl(req->message.body.expiration);
}
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_BINARY,
- SFMC_CMD_FLUSH,
- NULL,
- 0,
- SFLOW_TOKENS_UNKNOWN,
- 0,
- SFLOW_DURATION_UNKNOWN,
- SFMC_OP_OK);
- }
-#endif
+ SFLOW_SAMPLE(SFMC_CMD_FLUSH, c, NULL, 0, 0, 0, -1);
set_current_time();
@@ -2100,18 +2037,7 @@ static void process_bin_delete(conn *c) {
it = item_get(key, nkey);
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_BINARY,
- SFMC_CMD_DELETE,
- key,
- nkey,
- SFLOW_TOKENS_UNKNOWN,
- it ? it->nbytes : 0,
- SFLOW_DURATION_UNKNOWN,
- it ? SFMC_OP_OK : SFMC_OP_NOT_FOUND);
- }
-#endif
+ SFLOW_SAMPLE(SFMC_CMD_DELETE, c, key, nkey, 0, it ? it->nbytes : 0, it ? EXISTS : NOT_FOUND);
if (it) {
uint64_t cas = ntohll(req->message.header.request.cas);
@@ -2649,18 +2575,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
stats_prefix_record_get(key, nkey, NULL != it);
}
-#ifdef ENABLE_SFLOW
- if(SFLOW_SAMPLE_TEST(c)) {
- sflow_sample(&sFlow, c, SFMC_PROT_ASCII,
- return_cas ? SFMC_CMD_GETS : SFMC_CMD_GET,
- key,
- nkey,
- ntokens - 2,
- it ? it->nbytes : 0,
- SFLOW_DURATION_UNKNOWN,
- it ? SFMC_OP_OK : SFMC_OP_NOT_FOUND);
- }
-#endif
+ SFLOW_SAMPLE(return_cas ? SFMC_CMD_GETS : SFMC_CMD_GET, c, key, nkey, ntokens-2, it ? it->nbytes : 0, it ? EXISTS : NOT_FOUND);
if (it) {
if (i >= c->isize) {
@@ -2763,6 +2678,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
if(key_token->value != NULL) {
ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
key_token = tokens;
+ SFLOW_SAMPLE_TEST(c);
}
} while(key_token->value != NULL);
@@ -3069,6 +2985,7 @@ static void process_command(conn *c, char *command) {
assert(c != NULL);
MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
+ SFLOW_SAMPLE_TEST(c);
if (settings.verbose > 1)
fprintf(stderr, "<%d %s\n", c->sfd, command);
@@ -4157,9 +4074,7 @@ static void clock_handler(const int fd, const short which, void *arg) {
evtimer_add(&clockevent, &t);
set_current_time();
-#ifdef ENABLE_SFLOW
- sflow_tick(&sFlow);
-#endif
+ SFLOW_TICK(current_time);
}
static void usage(void) {
@@ -4211,10 +4126,6 @@ static void usage(void) {
#ifdef ENABLE_SASL
printf("-S Turn on Sasl authentication\n");
#endif
-#ifdef ENABLE_SFLOW
- printf("-o sflow=on|off - enable sFlow monitoring\n");
- printf("-o sflowconfig=<file> - config file (default /etc/sflow.auto)\n");
-#endif
return;
}
@@ -4460,9 +4371,6 @@ int main (int argc, char **argv) {
"B:" /* Binding protocol */
"I:" /* Max item size */
"S" /* Sasl ON */
-#ifdef ENABLE_SFLOW
- "o:" /* other var=val setting */
-#endif
))) {
switch (c) {
case 'a':
@@ -4625,15 +4533,6 @@ int main (int argc, char **argv) {
#endif
settings.sasl = true;
break;
-#ifdef ENABLE_SFLOW
- case 'o': /* other var=val arg */
- if(strchr(optarg, '=') == NULL) {
- fprintf(stderr, "-o option expects <var>=<value> arg\n");
- exit(EX_USAGE);
- }
- sflow_processVarValueOption(&sFlow, optarg);
- break;
-#endif
default:
fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1;
@@ -4757,11 +4656,6 @@ int main (int argc, char **argv) {
assoc_init();
conn_init();
slabs_init(settings.maxbytes, settings.factor, preallocate);
-#ifdef ENABLE_SFLOW
- if(sFlow.enabled) {
- sflow_init(&sFlow);
- }
-#endif
/*
* ignore SIGPIPE signals; we can use errno == EPIPE if we
View
9 memcached.h
@@ -319,13 +319,13 @@ typedef struct {
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
#ifdef ENABLE_SFLOW
- uint32_t sflow_skip;
- uint32_t sflow_last_skip;
+ uint32_t sflow_sample_pool;
+ uint32_t sflow_random;
#endif
} LIBEVENT_THREAD;
#ifdef ENABLE_SFLOW
-uint32_t sflow_skip_init(uint32_t *thread_skips);
+uint32_t sflow_sample_pool_aggregate(void); // in thread.c
#endif
typedef struct {
@@ -423,6 +423,9 @@ struct conn {
int keylen;
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
+#ifdef ENABLE_SFLOW
+ struct timeval sflow_start_time;
+#endif
};
View
589 sflow_api.c
@@ -1,17 +1,28 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/* Copyright (c) 2002-2010 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* Copyright (c) 2002-2011 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
/* http://www.inmon.com/technology/sflowlicense.txt */
#include "sflow_api.h"
+/* internal fns */
+static void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent);
+static void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi);
+static void sfl_poller_init(SFLPoller *poller, SFLAgent *agent, SFLDataSource_instance *pdsi, void *magic, getCountersFn_t getCountersFn);
+static void sfl_receiver_tick(SFLReceiver *receiver, time_t now);
+static void sfl_poller_tick(SFLPoller *poller, time_t now);
+static int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs);
+static int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs);
+static void sfl_agent_error(SFLAgent *agent, char *modName, char *msg);
+
+#define SFL_ALLOC malloc
+#define SFL_FREE free
+
/* ===================================================*/
/* ===================== AGENT =======================*/
static void * sflAlloc(SFLAgent *agent, size_t bytes);
static void sflFree(SFLAgent *agent, void *obj);
-static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler);
-static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler);
/*_________________---------------------------__________________
_________________ alloc and free __________________
@@ -36,7 +47,7 @@ static void sflFree(SFLAgent *agent, void *obj)
*/
#define MAX_ERRMSG_LEN 1000
-void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
+static void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
{
char errm[MAX_ERRMSG_LEN];
sprintf(errm, "sfl_agent_error: %s: %s\n", modName, msg);
@@ -47,21 +58,6 @@ void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
}
}
-void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg)
-{
- char errm[MAX_ERRMSG_LEN];
- sprintf(errm, "sfl_agent_sysError: %s: %s (errno = %d - %s)\n",
- modName,
- msg,
- errno,
- strerror(errno));
- if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
- else {
- fprintf(stderr, "%s\n", errm);
- fflush(stderr);
- }
-}
-
/*________________--------------------------__________________
________________ sfl_agent_init __________________
----------------__________________________------------------
@@ -69,7 +65,7 @@ void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg)
void sfl_agent_init(SFLAgent *agent,
SFLAddress *myIP, /* IP address of this agent in net byte order */
- uint32_t subId, /* agent_sub_id */
+ uint32_t subId, /* agent_sub_id */
time_t bootTime, /* agent boot time */
time_t now, /* time now */
void *magic, /* ptr to pass back in logging and alloc fns */
@@ -90,16 +86,6 @@ void sfl_agent_init(SFLAgent *agent,
agent->freeFn = freeFn;
agent->errorFn = errorFn;
agent->sendFn = sendFn;
-
-#ifdef SFLOW_DO_SOCKET
- if(sendFn == NULL) {
- /* open the socket - really need one for v4 and another for v6? */
- if((agent->receiverSocket4 = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
- sfl_agent_sysError(agent, "agent", "IPv4 socket open failed");
- if((agent->receiverSocket6 = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) == -1)
- sfl_agent_sysError(agent, "agent", "IPv6 socket open failed");
- }
-#endif
}
/*_________________---------------------------__________________
@@ -136,12 +122,6 @@ void sfl_agent_release(SFLAgent *agent)
rcv = nextRcv;
}
agent->receivers = NULL;
-
-#ifdef SFLOW_DO_SOCKET
- /* close the sockets */
- if(agent->receiverSocket4 > 0) close(agent->receiverSocket4);
- if(agent->receiverSocket6 > 0) close(agent->receiverSocket6);
-#endif
}
/*_________________---------------------------__________________
@@ -152,14 +132,11 @@ void sfl_agent_release(SFLAgent *agent)
void sfl_agent_tick(SFLAgent *agent, time_t now)
{
SFLReceiver *rcv;
- SFLSampler *sm;
SFLPoller *pl;
agent->now = now;
/* receivers use ticks to flush send data */
for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt) sfl_receiver_tick(rcv, now);
- /* samplers use ticks to decide when they are sampling too fast */
- for( sm = agent->samplers; sm != NULL; sm = sm->nxt) sfl_sampler_tick(sm, now);
/* pollers use ticks to decide when to ask for counters */
for( pl = agent->pollers; pl != NULL; pl = pl->nxt) sfl_poller_tick(pl, now);
}
@@ -212,7 +189,7 @@ static int sfl_dsi_compare(SFLDataSource_instance *pdsi1, SFLDataSource_instance
SFLSampler *sfl_agent_addSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
{
- SFLSampler *newsm, *prev, *sm, *test;
+ SFLSampler *newsm, *prev, *sm;
prev = NULL;
sm = agent->samplers;
@@ -228,17 +205,6 @@ SFLSampler *sfl_agent_addSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
if(prev) prev->nxt = newsm;
else agent->samplers = newsm;
newsm->nxt = sm;
-
- /* see if we should go in the ifIndex jumpTable */
- if(SFL_DS_CLASS(newsm->dsi) == 0) {
- test = sfl_agent_getSamplerByIfIndex(agent, SFL_DS_INDEX(newsm->dsi));
- if(test && (SFL_DS_INSTANCE(newsm->dsi) < SFL_DS_INSTANCE(test->dsi))) {
- /* replace with this new one because it has a lower ds_instance number */
- sfl_agent_jumpTableRemove(agent, test);
- test = NULL;
- }
- if(test == NULL) sfl_agent_jumpTableAdd(agent, newsm);
- }
return newsm;
}
@@ -270,217 +236,6 @@ SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
return newpl;
}
-/*_________________---------------------------__________________
- _________________ sfl_agent_removeSampler __________________
- -----------------___________________________------------------
-*/
-
-int sfl_agent_removeSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
-{
- SFLSampler *prev, *sm;
-
- /* find it, unlink it and free it */
- for(prev = NULL, sm = agent->samplers; sm != NULL; prev = sm, sm = sm->nxt) {
- if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) {
- if(prev == NULL) agent->samplers = sm->nxt;
- else prev->nxt = sm->nxt;
- sfl_agent_jumpTableRemove(agent, sm);
- sflFree(agent, sm);
- return 1;
- }
- }
- /* not found */
- return 0;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_removePoller __________________
- -----------------___________________________------------------
-*/
-
-int sfl_agent_removePoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
-{
- SFLPoller *prev, *pl;
- /* find it, unlink it and free it */
- for(prev = NULL, pl = agent->pollers; pl != NULL; prev = pl, pl = pl->nxt) {
- if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) {
- if(prev == NULL) agent->pollers = pl->nxt;
- else prev->nxt = pl->nxt;
- sflFree(agent, pl);
- return 1;
- }
- }
- /* not found */
- return 0;
-}
-
-/*_________________--------------------------------__________________
- _________________ sfl_agent_jumpTableAdd __________________
- -----------------________________________________------------------
-*/
-
-static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler)
-{
- uint32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
- sampler->hash_nxt = agent->jumpTable[hashIndex];
- agent->jumpTable[hashIndex] = sampler;
-}
-
-/*_________________--------------------------------__________________
- _________________ sfl_agent_jumpTableRemove __________________
- -----------------________________________________------------------
-*/
-
-static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler)
-{
- uint32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
- SFLSampler *search = agent->jumpTable[hashIndex], *prev = NULL;
- for( ; search != NULL; prev = search, search = search->hash_nxt) if(search == sampler) break;
- if(search) {
- /* found - unlink */
- if(prev) prev->hash_nxt = search->hash_nxt;
- else agent->jumpTable[hashIndex] = search->hash_nxt;
- search->hash_nxt = NULL;
- }
-}
-
-/*_________________--------------------------------__________________
- _________________ sfl_agent_getSamplerByIfIndex __________________
- -----------------________________________________------------------
- fast lookup (pointers cached in hash table). If there are multiple
- sampler instances for a given ifIndex, then this fn will return
- the one with the lowest instance number. Since the samplers
- list is sorted, this means the other instances will be accesible
- by following the sampler->nxt pointer (until the ds_class
- or ds_index changes). This is helpful if you need to offer
- the same flowSample to multiple samplers.
-*/
-
-SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, uint32_t ifIndex)
-{
- SFLSampler *search = agent->jumpTable[ifIndex % SFL_HASHTABLE_SIZ];
- for( ; search != NULL; search = search->hash_nxt) if(SFL_DS_INDEX(search->dsi) == ifIndex) break;
- return search;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_getSampler __________________
- -----------------___________________________------------------
-*/
-
-SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
-{
- SFLSampler *sm;
-
- /* find it and return it */
- for( sm = agent->samplers; sm != NULL; sm = sm->nxt)
- if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) return sm;
- /* not found */
- return NULL;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_getPoller __________________
- -----------------___________________________------------------
-*/
-
-SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
-{
- SFLPoller *pl;
-
- /* find it and return it */
- for( pl = agent->pollers; pl != NULL; pl = pl->nxt)
- if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) return pl;
- /* not found */
- return NULL;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_getReceiver __________________
- -----------------___________________________------------------
-*/
-
-SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, uint32_t receiverIndex)
-{
- SFLReceiver *rcv;
-
- uint32_t rcvIdx = 0;
- for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt)
- if(receiverIndex == ++rcvIdx) return rcv;
-
- /* not found - ran off the end of the table */
- return NULL;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_getNextSampler __________________
- -----------------___________________________------------------
-*/
-
-SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
-{
- /* return the one lexograpically just after it - assume they are sorted
- correctly according to the lexographical ordering of the object ids */
- SFLSampler *sm = sfl_agent_getSampler(agent, pdsi);
- return sm ? sm->nxt : NULL;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_getNextPoller __________________
- -----------------___________________________------------------
-*/
-
-SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
-{
- /* return the one lexograpically just after it - assume they are sorted
- correctly according to the lexographical ordering of the object ids */
- SFLPoller *pl = sfl_agent_getPoller(agent, pdsi);
- return pl ? pl->nxt : NULL;
-}
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_getNextReceiver __________________
- -----------------___________________________------------------
-*/
-
-SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, uint32_t receiverIndex)
-{
- return sfl_agent_getReceiver(agent, receiverIndex + 1);
-}
-
-
-/*_________________---------------------------__________________
- _________________ sfl_agent_resetReceiver __________________
- -----------------___________________________------------------
-*/
-
-void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver)
-{
- SFLReceiver *rcv;
- SFLSampler *sm;
- SFLPoller *pl;
-
- /* tell samplers and pollers to stop sending to this receiver */
- /* first get his receiverIndex */
- uint32_t rcvIdx = 0;
- for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt) {
- rcvIdx++; /* thanks to Diego Valverde for pointing out this bugfix */
- if(rcv == receiver) {
- /* now tell anyone that is using it to stop */
- for( sm = agent->samplers; sm != NULL; sm = sm->nxt)
- if(sfl_sampler_get_sFlowFsReceiver(sm) == rcvIdx) sfl_sampler_set_sFlowFsReceiver(sm, 0);
-
- for( pl = agent->pollers; pl != NULL; pl = pl->nxt)
- if(sfl_poller_get_sFlowCpReceiver(pl) == rcvIdx) sfl_poller_set_sFlowCpReceiver(pl, 0);
-
- break;
- }
- }
-}
-
-
-
-
/* ===================================================*/
/* ===================== SAMPLER =====================*/
@@ -489,7 +244,7 @@ void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver)
-----------------__________________________------------------
*/
-void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi)
+static void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi)
{
/* copy the dsi in case it points to sampler->dsi, which we are about to clear.
(Thanks to Jagjit Choudray of Force 10 Networks for pointing out this bug) */
@@ -510,7 +265,6 @@ void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instan
sampler->dsi = dsi;
/* set defaults */
- sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, SFL_DEFAULT_HEADER_SIZE);
sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, SFL_DEFAULT_SAMPLING_RATE);
}
@@ -519,29 +273,6 @@ void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instan
-----------------__________________________------------------
*/
-static void resetSampler(SFLSampler *sampler)
-{
- SFLDataSource_instance dsi = sampler->dsi;
- sfl_sampler_init(sampler, sampler->agent, &dsi);
-}
-
-/*_________________---------------------------__________________
- _________________ MIB access __________________
- -----------------___________________________------------------
-*/
-uint32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler) {
- return sampler->sFlowFsReceiver;
-}
-
-void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, uint32_t sFlowFsReceiver) {
- sampler->sFlowFsReceiver = sFlowFsReceiver;
- if(sFlowFsReceiver == 0) resetSampler(sampler);
- else {
- /* retrieve and cache a direct pointer to my receiver */
- sampler->myReceiver = sfl_agent_getReceiver(sampler->agent, sampler->sFlowFsReceiver);
- }
-}
-
uint32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler) {
return sampler->sFlowFsPacketSamplingRate;
}
@@ -552,30 +283,6 @@ void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, uint32_t sFl
sampler->skip = sfl_random(sFlowFsPacketSamplingRate);
}
-uint32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler) {
- return sampler->sFlowFsMaximumHeaderSize;
-}
-
-void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, uint32_t sFlowFsMaximumHeaderSize) {
- sampler->sFlowFsMaximumHeaderSize = sFlowFsMaximumHeaderSize;
-}
-
-/* call this to set a maximum samples-per-second threshold. If the sampler reaches this
- threshold it will automatically back off the sampling rate. A value of 0 disables the
- mechanism */
-
-void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, uint32_t samplesPerSecond) {
- sampler->backoffThreshold = samplesPerSecond;
-}
-
-uint32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler) {
- return sampler->backoffThreshold;
-}
-
-uint32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler) {
- return sampler->samplesLastTick;
-}
-
/*_________________---------------------------------__________________
_________________ sequence number reset __________________
-----------------_________________________________------------------
@@ -584,26 +291,6 @@ uint32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler) {
*/
void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler) { sampler->flowSampleSeqNo = 0; }
-
-/*_________________---------------------------__________________
- _________________ sfl_sampler_tick __________________
- -----------------___________________________------------------
-*/
-
-void sfl_sampler_tick(SFLSampler *sampler, time_t now)
-{
- if(sampler->backoffThreshold && sampler->samplesThisTick > sampler->backoffThreshold) {
- /* automatic backoff. If using hardware sampling then this is where you have to */
- /* call out to change the sampling rate and make sure that any other registers/variables */
- /* that hold this value are updated. */
- sampler->sFlowFsPacketSamplingRate *= 2;
- }
- sampler->samplesLastTick = sampler->samplesThisTick;
- sampler->samplesThisTick = 0;
-}
-
-
-
/*_________________------------------------------__________________
_________________ sfl_sampler_writeFlowSample __________________
-----------------______________________________------------------
@@ -612,20 +299,16 @@ void sfl_sampler_tick(SFLSampler *sampler, time_t now)
void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs)
{
if(fs == NULL) return;
- sampler->samplesThisTick++;
/* increment the sequence number */
fs->sequence_number = ++sampler->flowSampleSeqNo;
/* copy the other header fields in */
-#ifdef SFL_USE_32BIT_INDEX
- fs->ds_class = SFL_DS_CLASS(sampler->dsi);
- fs->ds_index = SFL_DS_INDEX(sampler->dsi);
-#else
fs->source_id = SFL_DS_DATASOURCE(sampler->dsi);
-#endif
/* the sampling rate may have been set already. */
if(fs->sampling_rate == 0) fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;
/* the samplePool may be maintained upstream too. */
if( fs->sample_pool == 0) fs->sample_pool = sampler->samplePool;
+ /* and the same for the drop event counter */
+ if(fs->drops == 0) fs->drops = sampler->dropEvents;
/* sent to my receiver */
if(sampler->myReceiver) sfl_receiver_writeFlowSample(sampler->myReceiver, fs);
}
@@ -647,22 +330,8 @@ void sfl_random_init(uint32_t seed) {
SFLRandom = seed;
}
-/*_________________---------------------------__________________
- _________________ sfl_sampler_takeSample __________________
- -----------------___________________________------------------
-*/
-
-int sfl_sampler_takeSample(SFLSampler *sampler)
-{
- /* increment the samplePool */
- sampler->samplePool++;
-
- if(--sampler->skip == 0) {
- /* reached zero. Set the next skip and return true. */
- sampler->skip = sfl_random((2 * sampler->sFlowFsPacketSamplingRate) - 1);
- return 1;
- }
- return 0;
+uint32_t sfl_sampler_next_skip(SFLSampler *sampler) {
+ return sfl_random((2 * sampler->sFlowFsPacketSamplingRate) - 1);
}
@@ -675,7 +344,7 @@ int sfl_sampler_takeSample(SFLSampler *sampler)
-----------------__________________________------------------
*/
-void sfl_poller_init(SFLPoller *poller,
+static void sfl_poller_init(SFLPoller *poller,
SFLAgent *agent,
SFLDataSource_instance *pdsi,
void *magic, /* ptr to pass back in getCountersFn() */
@@ -701,33 +370,10 @@ void sfl_poller_init(SFLPoller *poller,
poller->getCountersFn = getCountersFn;
}
-/*_________________--------------------------__________________
- _________________ reset __________________
- -----------------__________________________------------------
-*/
-
-static void resetPoller(SFLPoller *poller)
-{
- SFLDataSource_instance dsi = poller->dsi;
- sfl_poller_init(poller, poller->agent, &dsi, poller->magic, poller->getCountersFn);
-}
-
/*_________________---------------------------__________________
_________________ MIB access __________________
-----------------___________________________------------------
*/
-uint32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller) {
- return poller->sFlowCpReceiver;
-}
-
-void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, uint32_t sFlowCpReceiver) {
- poller->sFlowCpReceiver = sFlowCpReceiver;
- if(sFlowCpReceiver == 0) resetPoller(poller);
- else {
- /* retrieve and cache a direct pointer to my receiver */
- poller->myReceiver = sfl_agent_getReceiver(poller->agent, poller->sFlowCpReceiver);
- }
-}
uint32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller) {
return (uint32_t)poller->sFlowCpInterval;
@@ -736,8 +382,8 @@ uint32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller) {
void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, uint32_t sFlowCpInterval) {
poller->sFlowCpInterval = sFlowCpInterval;
/* Set the countersCountdown to be a randomly selected value between 1 and
- sFlowCpInterval. That way the counter polling would be desynchronised
- (on a 200-port switch, polling all the counters in one second could be harmful). */
+ sFlowCpInterval. That way the counter polling would be desynchronised even
+ if everything came up at the same instant. */
poller->countersCountdown = sfl_random(sFlowCpInterval);
}
@@ -754,10 +400,9 @@ void sfl_poller_resetCountersSeqNo(SFLPoller *poller) { poller->countersSampleS
-----------------___________________________------------------
*/
-void sfl_poller_tick(SFLPoller *poller, time_t now)
+static void sfl_poller_tick(SFLPoller *poller, time_t now)
{
if(poller->countersCountdown == 0) return; /* counters retrieval was not enabled */
- if(poller->sFlowCpReceiver == 0) return;
if(--poller->countersCountdown == 0) {
if(poller->getCountersFn != NULL) {
@@ -782,12 +427,7 @@ void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE
{
/* fill in the rest of the header fields, and send to the receiver */
cs->sequence_number = ++poller->countersSampleSeqNo;
-#ifdef SFL_USE_32BIT_INDEX
- cs->ds_class = SFL_DS_CLASS(poller->dsi);
- cs->ds_index = SFL_DS_INDEX(poller->dsi);
-#else
cs->source_id = SFL_DS_DATASOURCE(poller->dsi);
-#endif
/* sent to my receiver */
if(poller->myReceiver) sfl_receiver_writeCountersSample(poller->myReceiver, cs);
}
@@ -804,16 +444,13 @@ static void sendSample(SFLReceiver *receiver);
static void receiverError(SFLReceiver *receiver, char *errm);
static void putNet32(SFLReceiver *receiver, uint32_t val);
static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
-#ifdef SFLOW_DO_SOCKET
-static void initSocket(SFLReceiver *receiver);
-#endif
/*_________________--------------------------__________________
_________________ sfl_receiver_init __________________
-----------------__________________________------------------
*/
-void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
+static void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
{
/* first clear everything */
memset(receiver, 0, sizeof(*receiver));
@@ -823,74 +460,16 @@ void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
/* set defaults */
receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
- receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
-#ifdef SFLOW_DO_SOCKET
- /* initialize the socket address */
- initSocket(receiver);
-#endif
/* prepare to receive the first sample */
resetSampleCollector(receiver);
}
-/*_________________---------------------------__________________
- _________________ reset __________________
- -----------------___________________________------------------
-
- called on timeout, or when owner string is cleared
-*/
-
-static void resetReceiver(SFLReceiver *receiver) {
- /* ask agent to tell samplers and pollers to stop sending samples */
- sfl_agent_resetReceiver(receiver->agent, receiver);
- /* reinitialize */
- sfl_receiver_init(receiver, receiver->agent);
-}
-
-#ifdef SFLOW_DO_SOCKET
-/*_________________---------------------------__________________
- _________________ initSocket __________________
- -----------------___________________________------------------
-*/
-
-static void initSocket(SFLReceiver *receiver) {
- if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
- struct sockaddr_in6 *sa6 = &receiver->receiver6;
- sa6->sin6_port = htons((uint16_t)receiver->sFlowRcvrPort);
- sa6->sin6_family = AF_INET6;
- sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
- }
- else {
- struct sockaddr_in *sa4 = &receiver->receiver4;
- sa4->sin_port = htons((uint16_t)receiver->sFlowRcvrPort);
- sa4->sin_family = AF_INET;
- sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
- }
-}
-#endif
-
/*_________________----------------------------------------_____________
_________________ MIB Vars _____________
-----------------________________________________________-------------
*/
-
-char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
- return receiver->sFlowRcvrOwner;
-}
-void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
- receiver->sFlowRcvrOwner = sFlowRcvrOwner;
- if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
- /* reset condition! owner string was cleared */
- resetReceiver(receiver);
- }
-}
-time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
- return receiver->sFlowRcvrTimeout;
-}
-void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
- receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
-}
uint32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
return receiver->sFlowRcvrMaximumDatagramSize;
}
@@ -899,40 +478,16 @@ void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, uint32
if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
receiver->sFlowRcvrMaximumDatagramSize = mdz;
}
-SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
- return &receiver->sFlowRcvrAddress;
-}
-void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
- if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; /* structure copy */
-#ifdef SFLOW_DO_SOCKET
- initSocket(receiver);
-#endif
-}
-uint32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
- return receiver->sFlowRcvrPort;
-}
-void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, uint32_t sFlowRcvrPort) {
- receiver->sFlowRcvrPort = sFlowRcvrPort;
- /* update the socket structure */
-#ifdef SFLOW_DO_SOCKET
- initSocket(receiver);
-#endif
-}
/*_________________---------------------------__________________
_________________ sfl_receiver_tick __________________
-----------------___________________________------------------
*/
-void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
+static void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
{
/* if there are any samples to send, flush them now */
if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
- /* check the timeout */
- if(receiver->sFlowRcvrTimeout && (uint32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
- /* count down one tick and reset if we reach 0 */
- if(--receiver->sFlowRcvrTimeout == 0) resetReceiver(receiver);
- }
}
/*_________________-----------------------------__________________
@@ -1028,13 +583,8 @@ static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs
{
SFLFlow_sample_element *elem;
uint32_t elemSiz;
-#ifdef SFL_USE_32BIT_INDEX
- uint siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
- sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
-#else
uint32_t siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
sample_pool, drops, input, output, number of elements */
-#endif
/* hard code the wire-encoding sizes, in case the structures are expanded to be 64-bit aligned */
@@ -1069,7 +619,7 @@ static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs
-----------------_______________________________------------------
*/
-int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
+static int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
{
int packedSize;
SFLFlow_sample_element *elem;
@@ -1079,9 +629,7 @@ int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs
if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
/* check in case this one sample alone is too big for the datagram */
- /* in fact - if it is even half as big then we should ditch it. Very */
- /* important to avoid overruning the packet buffer. */
- if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
+ if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize - 32)) {
receiverError(receiver, "flow sample too big for datagram");
return -1;
}
@@ -1093,36 +641,15 @@ int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs
receiver->sampleCollector.numSamples++;
-#ifdef SFL_USE_32BIT_INDEX
- putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
-#else
putNet32(receiver, SFLFLOW_SAMPLE);
-#endif
-
putNet32(receiver, packedSize - 8); /* don't include tag and len */
putNet32(receiver, fs->sequence_number);
-
-#ifdef SFL_USE_32BIT_INDEX
- putNet32(receiver, fs->ds_class);
- putNet32(receiver, fs->ds_index);
-#else
putNet32(receiver, fs->source_id);
-#endif
-
putNet32(receiver, fs->sampling_rate);
putNet32(receiver, fs->sample_pool);
putNet32(receiver, fs->drops);
-
-#ifdef SFL_USE_32BIT_INDEX
- putNet32(receiver, fs->inputFormat);
putNet32(receiver, fs->input);
- putNet32(receiver, fs->outputFormat);
putNet32(receiver, fs->output);
-#else
- putNet32(receiver, fs->input);
- putNet32(receiver, fs->output);
-#endif
-
putNet32(receiver, fs->num_elements);
for(elem = fs->elements; elem != NULL; elem = elem->nxt) {
@@ -1182,12 +709,7 @@ static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_
SFLCounters_sample_element *elem;
uint32_t elemSiz;
-#ifdef SFL_USE_32BIT_INDEX
- uint siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
-#else
uint32_t siz = 20; /* tag, length, sequence_number, source_id, number of elements */
-#endif
-
cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
for( elem = cs->elements; elem != NULL; elem = elem->nxt) {
cs->num_elements++;
@@ -1220,7 +742,7 @@ static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_
-----------------__________________________________------------------
*/
-int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
+static int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
{
int packedSize;
SFLCounters_sample_element *elem;
@@ -1244,22 +766,10 @@ int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_
receiver->sampleCollector.numSamples++;
-#ifdef SFL_USE_32BIT_INDEX
- putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
-#else
putNet32(receiver, SFLCOUNTERS_SAMPLE);
-#endif
-
putNet32(receiver, packedSize - 8); /* tag and length not included */
putNet32(receiver, cs->sequence_number);
-
-#ifdef SFL_USE_32BIT_INDEX
- putNet32(receiver, cs->ds_class);
- putNet32(receiver, cs->ds_index);
-#else
putNet32(receiver, cs->source_id);
-#endif
-
putNet32(receiver, cs->num_elements);
for(elem = cs->elements; elem != NULL; elem = elem->nxt) {
@@ -1331,16 +841,6 @@ int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_
return packedSize;
}
-/*_________________---------------------------------__________________
- _________________ sfl_receiver_samplePacketsSent __________________
- -----------------_________________________________------------------
-*/
-
-uint32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
-{
- return receiver->sampleCollector.packetSeqNo;
-}
-
/*_________________---------------------------__________________
_________________ sendSample __________________
-----------------___________________________------------------
@@ -1366,33 +866,6 @@ static void sendSample(SFLReceiver *receiver)
receiver,
(u_char *)receiver->sampleCollector.data,
receiver->sampleCollector.pktlen);
- else {
-#ifdef SFLOW_DO_SOCKET
- /* send it myself */
- if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
- uint32_t soclen = sizeof(struct sockaddr_in6);
- int result = sendto(agent->receiverSocket6,
- receiver->sampleCollector.data,
- receiver->sampleCollector.pktlen,
- 0,
- (struct sockaddr *)&receiver->receiver6,
- soclen);
- if(result == -1 && errno != EINTR) sfl_agent_sysError(agent, "receiver", "IPv6 socket sendto error");
- if(result == 0) sfl_agent_error(agent, "receiver", "IPv6 socket sendto returned 0");
- }
- else {
- uint32_t soclen = sizeof(struct sockaddr_in);
- int result = sendto(agent->receiverSocket4,
- receiver->sampleCollector.data,
- receiver->sampleCollector.pktlen,
- 0,
- (struct sockaddr *)&receiver->receiver4,
- soclen);
- if(result == -1 && errno != EINTR) sfl_agent_sysError(agent, "receiver", "socket sendto error");
- if(result == 0) sfl_agent_error(agent, "receiver", "socket sendto returned 0");
- }
-#endif
- }
/* reset for the next time */
resetSampleCollector(receiver);
View
478 sflow_api.h
@@ -1,27 +1,10 @@
-/* Copyright (c) 2002-2010 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* Copyright (c) 2002-2011 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
/* http://www.inmon.com/technology/sflowlicense.txt */
-/*
- sFlow Agent Library
- ===================
- sflow_api.h - API for sflow agent library.
- sflow.h - structure definitions for sFlow.
- sflow_api.c - Agent, Sampler, Poller and Receiver
-
- Neil McKee
- InMon Corp.
- http://www.inmon.com
- email: neil.mckee@inmon.com
-*/
-
#ifndef SFLOW_API_H
#define SFLOW_API_H 1
-/* define SFLOW_DO_SOCKET to 1 if you want the agent
- to send the packets itself, otherwise set the sendFn
- callback in sfl_agent_init.*/
-/* #define SFLOW_DO_SOCKET */
-
#include <stdio.h>
#include <stdlib.h>
#ifndef _WIN32
@@ -32,37 +15,298 @@
#include <string.h>
#include <sys/types.h>
-
-#ifdef SFLOW_DO_SOCKET
-#include <sys/socket.h>
-#include <netinet/in_systm.h>
-#include <netinet/in.h>
-#include <netinet/ip.h>
+/* ================== sFlow v5 structure definitions (www.sflow.org) ==================*/
+
+typedef struct {
+ uint32_t addr;
+} SFLIPv4;
+
+typedef struct {
+ u_char addr[16];
+} SFLIPv6;
+
+typedef union _SFLAddress_value {
+ SFLIPv4 ip_v4;
+ SFLIPv6 ip_v6;
+} SFLAddress_value;
+
+enum SFLAddress_type {
+ SFLADDRESSTYPE_UNDEFINED = 0,
+ SFLADDRESSTYPE_IP_V4 = 1,
+ SFLADDRESSTYPE_IP_V6 = 2
+};
+
+typedef struct _SFLAddress {
+ uint32_t type; /* enum SFLAddress_type */
+ SFLAddress_value address;
+} SFLAddress;
+
+enum SFL_DSCLASS {
+ SFL_DSCLASS_IFINDEX=0,
+ SFL_DSCLASS_VLAN=1,
+ SFL_DSCLASS_PHYSICAL_ENTITY=2,
+ SFL_DSCLASS_LOGICAL_ENTITY=3
+};
+
+/* Packet header data */
+
+#define SFL_DEFAULT_HEADER_SIZE 128
+#define SFL_DEFAULT_COLLECTOR_PORT 6343
+#define SFL_DEFAULT_SAMPLING_RATE 400
+#define SFL_DEFAULT_POLLING_INTERVAL 30
+
+/* Extended data types */
+
+typedef struct _SFLString {
+ uint32_t len;
+ char *str;
+} SFLString;
+
+/* Extended socket information,
+ Must be filled in for all application transactions associated with a network socket
+ Omit if transaction associated with non-network IPC */
+
+/* IPv4 Socket */
+/* opaque = flow_data; enterprise = 0; format = 2100 */
+typedef struct _SFLExtended_socket_ipv4 {
+ uint32_t protocol; /* IP Protocol (e.g. TCP = 6, UDP = 17) */
+ SFLIPv4 local_ip; /* local IP address */
+ SFLIPv4 remote_ip; /* remote IP address */
+ uint32_t local_port; /* TCP/UDP local port number or equivalent */
+ uint32_t remote_port; /* TCP/UDP remote port number of equivalent */
+} SFLExtended_socket_ipv4;
+
+#define XDRSIZ_SFLEXTENDED_SOCKET4 20
+
+/* IPv6 Socket */
+/* opaque = flow_data; enterprise = 0; format = 2101 */
+typedef struct _SFLExtended_socket_ipv6 {
+ uint32_t protocol; /* IP Protocol (e.g. TCP = 6, UDP = 17) */
+ SFLIPv6 local_ip; /* local IP address */
+ SFLIPv6 remote_ip; /* remote IP address */
+ uint32_t local_port; /* TCP/UDP local port number or equivalent */
+ uint32_t remote_port; /* TCP/UDP remote port number of equivalent */
+} SFLExtended_socket_ipv6;
+
+#define XDRSIZ_SFLEXTENDED_SOCKET6 44
+
+typedef enum {
+ SFMC_PROT_OTHER = 0,
+ SFMC_PROT_ASCII = 1,
+ SFMC_PROT_BINARY = 2,
+} SFLMemcache_prot;
+
+#if 0 /* this one moved to sflow_mc.h to expose it to memcached.c */
+typedef enum {
+ SFMC_CMD_OTHER = 0,
+ SFMC_CMD_SET = 1,
+ SFMC_CMD_ADD = 2,
+ SFMC_CMD_REPLACE = 3,
+ SFMC_CMD_APPEND = 4,
+ SFMC_CMD_PREPEND = 5,
+ SFMC_CMD_CAS = 6,
+ SFMC_CMD_GET = 7,
+ SFMC_CMD_GETS = 8,
+ SFMC_CMD_INCR = 9,
+ SFMC_CMD_DECR = 10,
+ SFMC_CMD_DELETE = 11,
+ SFMC_CMD_STATS = 12,
+ SFMC_CMD_FLUSH = 13,
+ SFMC_CMD_VERSION = 14,
+ SFMC_CMD_QUIT = 15,
+} SFLMemcache_cmd;
#endif
-#include "sflow.h"
-
-/*
- uncomment this preprocessor flag (or compile with -DSFL_USE_32BIT_INDEX)
- if your ds_index numbers can ever be >= 2^30-1 (i.e. >= 0x3FFFFFFF)
-*/
-/* #define SFL_USE_32BIT_INDEX */
-
-
-/* Used to combine ds_class, ds_index and instance into
- a single 64-bit number like this:
- __________________________________
- | cls| index | instance |
- ----------------------------------
-
- but now is opened up to a 12-byte struct to ensure
- that ds_index has a full 32-bit field, and to make
- accessing the components simpler. The macros have
- the same behavior as before, so this change should
- be transparent. The only difference is that these
- objects are now passed around by reference instead
- of by value, and the comparison is done using a fn.
-*/
+typedef enum {
+ SFMC_OP_UNKNOWN = 0,
+ SFMC_OP_OK = 1,
+ SFMC_OP_ERROR = 2,
+ SFMC_OP_CLIENT_ERROR = 3,
+ SFMC_OP_SERVER_ERROR = 4,
+ SFMC_OP_STORED = 5,
+ SFMC_OP_NOT_STORED = 6,
+ SFMC_OP_EXISTS = 7,
+ SFMC_OP_NOT_FOUND = 8,
+ SFMC_OP_DELETED = 9,
+} SFLMemcache_operation_status;
+
+typedef struct _SFLSampled_memcache {
+ uint32_t protocol; /* SFLMemcache_prot */
+ uint32_t command; /* SFLMemcache_cmd */
+ SFLString key; /* up to 255 chars */
+ uint32_t nkeys;
+ uint32_t value_bytes;
+ uint32_t duration_uS;
+ uint32_t status; /* SFLMemcache_operation_status */
+} SFLSampled_memcache;
+
+
+enum SFLFlow_type_tag {
+ /* enterprise = 0, format = ... */
+ SFLFLOW_EX_SOCKET4 = 2100,
+ SFLFLOW_EX_SOCKET6 = 2101,
+ SFLFLOW_MEMCACHE = 2200,
+};
+
+typedef union _SFLFlow_type {
+ SFLSampled_memcache memcache;
+ SFLExtended_socket_ipv4 socket4;
+ SFLExtended_socket_ipv6 socket6;
+} SFLFlow_type;
+
+typedef struct _SFLFlow_sample_element {
+ struct _SFLFlow_sample_element *nxt;
+ uint32_t tag; /* SFLFlow_type_tag */
+ uint32_t length;
+ SFLFlow_type flowType;
+} SFLFlow_sample_element;
+
+enum SFL_sample_tag {
+ SFLFLOW_SAMPLE = 1, /* enterprise = 0 : format = 1 */
+ SFLCOUNTERS_SAMPLE = 2, /* enterprise = 0 : format = 2 */
+};
+
+/* Format of a single flow sample */
+
+typedef struct _SFLFlow_sample {
+ /* uint32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 1 */
+ /* uint32_t length; */
+ uint32_t sequence_number; /* Incremented with each flow sample
+ generated */
+ uint32_t source_id; /* fsSourceId */
+ uint32_t sampling_rate; /* fsPacketSamplingRate */
+ uint32_t sample_pool; /* Total number of packets that could have been
+ sampled (i.e. packets skipped by sampling
+ process + total number of samples) */
+ uint32_t drops; /* Number of times a packet was dropped due to
+ lack of resources */
+ uint32_t input; /* SNMP ifIndex of input interface.
+ 0 if interface is not known. */
+ uint32_t output; /* SNMP ifIndex of output interface,
+ 0 if interface is not known.
+ Set most significant bit to indicate
+ multiple destination interfaces
+ (i.e. in case of broadcast or multicast)
+ and set lower order bits to indicate
+ number of destination interfaces.
+ Examples:
+ 0x00000002 indicates ifIndex = 2
+ 0x00000000 ifIndex unknown.
+ 0x80000007 indicates a packet sent
+ to 7 interfaces.
+ 0x80000000 indicates a packet sent to
+ an unknown number of
+ interfaces greater than 1.*/
+ uint32_t num_elements;
+ SFLFlow_sample_element *elements;
+} SFLFlow_sample;
+
+
+/* Counter types */
+
+#define XDRSIZ_SFLHOST_VRT_NIO_COUNTERS 40
+
+typedef struct _SFLMemcache_counters {
+ uint32_t uptime; /* Number of seconds this server has been running */
+ uint32_t rusage_user; /* Accumulated user time for this process (ms)*/
+ uint32_t rusage_system; /* Accumulated system time for this process (ms)*/
+ uint32_t curr_connections; /* Number of open connections */
+ uint32_t total_connections; /* Total number of connections opened since
+ the server started running */
+ uint32_t connection_structures; /* Number of connection structures
+ allocated by the server */
+ uint32_t cmd_get; /* Cumulative number of retrieval requests */
+ uint32_t cmd_set; /* Cumulative number of storage requests */
+ uint32_t cmd_flush; /* */
+ uint32_t get_hits; /* Number of keys that have been requested and
+ found present */
+ uint32_t get_misses; /* Number of items that have been requested
+ and not found */
+ uint32_t delete_misses;
+ uint32_t delete_hits;
+ uint32_t incr_misses;
+ uint32_t incr_hits;
+ uint32_t decr_misses;
+ uint32_t decr_hits;
+ uint32_t cas_misses;
+ uint32_t cas_hits;
+ uint32_t cas_badval;
+ uint32_t auth_cmds;
+ uint32_t auth_errors;
+ uint64_t bytes_read;
+ uint64_t bytes_written;
+ uint32_t limit_maxbytes;
+ uint32_t accepting_conns;
+ uint32_t listen_disabled_num;
+ uint32_t threads;
+ uint32_t conn_yields;
+ uint64_t bytes;
+ uint32_t curr_items;
+ uint32_t total_items;
+ uint32_t evictions;
+} SFLMemcache_counters;
+
+#define XDRSIZ_SFLMEMCACHE_COUNTERS (36*4)
+
+/* Counters data */
+
+enum SFLCounters_type_tag {
+ /* enterprise = 0, format = ... */
+ SFLCOUNTERS_MEMCACHE = 2200, /* memcached counters */
+};
+
+typedef union _SFLCounters_type {
+ SFLMemcache_counters memcache;
+} SFLCounters_type;
+
+typedef struct _SFLCounters_sample_element {
+ struct _SFLCounters_sample_element *nxt; /* linked list */
+ uint32_t tag; /* SFLCounters_type_tag */
+ uint32_t length;
+ SFLCounters_type counterBlock;
+} SFLCounters_sample_element;
+
+typedef struct _SFLCounters_sample {
+ /* uint32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 2 */
+ /* uint32_t length; */
+ uint32_t sequence_number; /* Incremented with each counters sample
+ generated by this source_id */
+ uint32_t source_id; /* fsSourceId */
+ uint32_t num_elements;
+ SFLCounters_sample_element *elements;
+} SFLCounters_sample;
+
+#define SFLADD_ELEMENT(_sm, _el) do { (_el)->nxt = (_sm)->elements; (_sm)->elements = (_el); } while(0)
+
+/* Format of a sample datagram */
+
+enum SFLDatagram_version {
+ SFLDATAGRAM_VERSION2 = 2,
+ SFLDATAGRAM_VERSION4 = 4,
+ SFLDATAGRAM_VERSION5 = 5
+};
+
+typedef struct _SFLSample_datagram_hdr {
+ uint32_t datagram_version; /* (enum SFLDatagram_version) = VERSION5 = 5 */
+ SFLAddress agent_address; /* IP address of sampling agent */
+ uint32_t sub_agent_id; /* Used to distinguishing between datagram
+ streams from separate agent sub entities
+ within an device. */
+ uint32_t sequence_number; /* Incremented with each sample datagram
+ generated */
+ uint32_t uptime; /* Current time (in milliseconds since device
+ last booted). Should be set as close to
+ datagram transmission time as possible.*/
+ uint32_t num_records; /* Number of tag-len-val flow/counter records to follow */
+} SFLSample_datagram_hdr;
+
+
+/* =============== sFlow agent API ==================*/
+
+#define SFL_MAX_DATAGRAM_SIZE 1500
+#define SFL_MIN_DATAGRAM_SIZE 200
+#define SFL_DEFAULT_DATAGRAM_SIZE 1400
+#define SFL_DATA_PAD 400
typedef struct _SFLDataSource_instance {
uint32_t ds_class;
@@ -70,17 +314,9 @@ typedef struct _SFLDataSource_instance {
uint32_t ds_instance;
} SFLDataSource_instance;
-#ifdef SFL_USE_32BIT_INDEX
-#define SFL_FLOW_SAMPLE_TYPE SFLFlow_sample_expanded
-#define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample_expanded
-#else
#define SFL_FLOW_SAMPLE_TYPE SFLFlow_sample
#define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample
-/* if index numbers are not going to use all 32 bits, then we can use
- the more compact encoding, with the dataSource class and index merged */
#define SFL_DS_DATASOURCE(dsi) (((dsi).ds_class << 24) + (dsi).ds_index)
-#endif
-
#define SFL_DS_INSTANCE(dsi) (dsi).ds_instance
#define SFL_DS_CLASS(dsi) (dsi).ds_class
#define SFL_DS_INDEX(dsi) (dsi).ds_index
@@ -105,96 +341,62 @@ struct _SFLAgent; /* forward decl */
typedef struct _SFLReceiver {
struct _SFLReceiver *nxt;
- /* MIB fields */
- char *sFlowRcvrOwner;
- time_t sFlowRcvrTimeout;
uint32_t sFlowRcvrMaximumDatagramSize;
- SFLAddress sFlowRcvrAddress;
- uint32_t sFlowRcvrPort;
uint32_t sFlowRcvrDatagramVersion;
- /* public fields */
- struct _SFLAgent *agent; /* pointer to my agent */
- /* private fields */
+ struct _SFLAgent *agent;
SFLSampleCollector sampleCollector;
-#ifdef SFLOW_DO_SOCKET
- struct sockaddr_in receiver4;
- struct sockaddr_in6 receiver6;
-#endif
} SFLReceiver;
typedef struct _SFLSampler {
- /* for linked list */
struct _SFLSampler *nxt;
- /* for hash lookup table */
- struct _SFLSampler *hash_nxt;
- /* MIB fields */
SFLDataSource_instance dsi;
- uint32_t sFlowFsReceiver;
uint32_t sFlowFsPacketSamplingRate;
- uint32_t sFlowFsMaximumHeaderSize;
- /* public fields */
- struct _SFLAgent *agent; /* pointer to my agent */
- void *userData; /* can be useful to hang something else here */
- /* private fields */
+ struct _SFLAgent *agent;
SFLReceiver *myReceiver;
uint32_t skip;
uint32_t samplePool;
+ uint32_t dropEvents;
uint32_t flowSampleSeqNo;
- /* rate checking */
- uint32_t samplesThisTick;
- uint32_t samplesLastTick;
- uint32_t backoffThreshold;
} SFLSampler;
/* declare */
struct _SFLPoller;
-typedef void (*getCountersFn_t)(void *magic, /* callback to get counters */
+typedef void (*getCountersFn_t)(void *magic, /* callback to get counters */
struct _SFLPoller *sampler, /* called with self */
SFL_COUNTERS_SAMPLE_TYPE *cs); /* struct to fill in */
typedef struct _SFLPoller {
- /* for linked list */
struct _SFLPoller *nxt;
- /* MIB fields */
SFLDataSource_instance dsi;
- uint32_t sFlowCpReceiver;
time_t sFlowCpInterval;
- /* public fields */
- struct _SFLAgent *agent; /* pointer to my agent */
+ struct _SFLAgent *agent;
void *magic; /* ptr to pass back in getCountersFn() */
- void *userData; /* can be useful to hang something else here */
getCountersFn_t getCountersFn;
- /* private fields */
SFLReceiver *myReceiver;
time_t countersCountdown;
uint32_t countersSampleSeqNo;
} SFLPoller;
-typedef void *(*allocFn_t)(void *magic, /* callback to allocate space on heap */
+typedef void *(*allocFn_t)(void *magic, /* callback to allocate space on heap */
struct _SFLAgent *agent, /* called with self */
size_t bytes); /* bytes requested */
-typedef int (*freeFn_t)(void *magic, /* callback to free space on heap */
+typedef int (*freeFn_t)(void *magic, /* callback to free space on heap */
struct _SFLAgent *agent, /* called with self */
void *obj); /* obj to free */
-typedef void (*errorFn_t)(void *magic, /* callback to log error message */
+typedef void (*errorFn_t)(void *magic, /* callback to log error message */
struct _SFLAgent *agent, /* called with self */
char *msg); /* error message */
-typedef void (*sendFn_t)(void *magic, /* optional override fn to send packet */
+typedef void (*sendFn_t)(void *magic, /* optional override fn to send packet */
struct _SFLAgent *agent,
SFLReceiver *receiver,
u_char *pkt,
uint32_t pktLen);
-
-/* prime numbers are good for hash tables */
-#define SFL_HASHTABLE_SIZ 199
-
typedef struct _SFLAgent {
- SFLSampler *jumpTable[SFL_HASHTABLE_SIZ]; /* fast lookup table for samplers (by ifIndex) */
SFLSampler *samplers; /* the list of samplers */
SFLPoller *pollers; /* the list of samplers */
SFLReceiver *receivers; /* the array of receivers */
@@ -207,16 +409,12 @@ typedef struct _SFLAgent {
freeFn_t freeFn;
errorFn_t errorFn;
sendFn_t sendFn;
-#ifdef SFLOW_DO_SOCKET
- int receiverSocket4;
- int receiverSocket6;
-#endif
} SFLAgent;
/* call this at the start with a newly created agent */
void sfl_agent_init(SFLAgent *agent,
SFLAddress *myIP, /* IP address of this agent */
- uint32_t subId, /* agent_sub_id */
+ uint32_t subId, /* agent_sub_id */
time_t bootTime, /* agent boot time */
time_t now, /* time now */
void *magic, /* ptr to pass back in logging and alloc fns */
@@ -237,26 +435,6 @@ SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
/* call this to create receivers */
SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent);
-/* call this to remove samplers */
-int sfl_agent_removeSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
-
-/* call this to remove pollers */
-int sfl_agent_removePoller(SFLAgent *agent, SFLDataSource_instance *pdsi);
-
-/* note: receivers should not be removed. Typically the receivers
- list will be created at init time and never changed */
-
-/* call these fns to retrieve sampler, poller or receiver (e.g. for SNMP GET or GETNEXT operation) */
-SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
-SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
-SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi);
-SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi);
-SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, uint32_t receiverIndex);
-SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, uint32_t receiverIndex);
-
-/* jump table access - for performance */
-SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, uint32_t ifIndex);
-
/* random number generator - used by sampler and poller */
uint32_t sfl_random(uint32_t mean);
void sfl_random_init(uint32_t seed);
@@ -264,26 +442,12 @@ void sfl_random_init(uint32_t seed);
/* call these functions to GET and SET MIB values */
/* receiver */
-char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver);
-void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner);
-time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver);
-void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout);
uint32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver);
void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, uint32_t sFlowRcvrMaximumDatagramSize);
-SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver);
-void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress);
-uint32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver);
-void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, uint32_t sFlowRcvrPort);
/* sampler */
-uint32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler);
-void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, uint32_t sFlowFsReceiver);
uint32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler);
void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, uint32_t sFlowFsPacketSamplingRate);
-uint32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler);
-void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, uint32_t sFlowFsMaximumHeaderSize);
/* poller */
-uint32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller);
-void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, uint32_t sFlowCpReceiver);
uint32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller);
void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, uint32_t sFlowCpInterval);
@@ -294,18 +458,9 @@ void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler);
/* call this to indicate a discontinuity with one or more of the counters so that the
sflow collector will ignore the next delta */
void sfl_poller_resetCountersSeqNo(SFLPoller *poller);
-
-/* software sampling: call this with every packet - returns non-zero if the packet
- should be sampled (in which case you then call sfl_sampler_writeFlowSample()) */
-int sfl_sampler_takeSample(SFLSampler *sampler);
-
-/* call this to set a maximum samples-per-second threshold. If the sampler reaches this
- threshold it will automatically back off the sampling rate. A value of 0 disables the
- mechanism */
-void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, uint32_t samplesPerSecond);
-uint32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler);
-uint32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler);
+/* You can use this one if you are managing the skip countdown elsewhere */
+uint32_t sfl_sampler_next_skip(SFLSampler *sampler);
/* call this once per second (N.B. not on interrupt stack i.e. not hard real-time) */
void sfl_agent_tick(SFLAgent *agent, time_t now);
@@ -319,29 +474,4 @@ void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE
/* call this to deallocate resources */
void sfl_agent_release(SFLAgent *agent);
-
-/* internal fns */
-
-void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent);
-void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi);
-void sfl_poller_init(SFLPoller *poller, SFLAgent *agent, SFLDataSource_instance *pdsi, void *magic, getCountersFn_t getCountersFn);
-
-
-void sfl_receiver_tick(SFLReceiver *receiver, time_t now);
-void sfl_poller_tick(SFLPoller *poller, time_t now);
-void sfl_sampler_tick(SFLSampler *sampler, time_t now);
-
-int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs);
-int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs);
-
-void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver);
-
-void sfl_agent_error(SFLAgent *agent, char *modName, char *msg);
-void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg);
-
-uint32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver);
-
-#define SFL_ALLOC malloc
-#define SFL_FREE free
-
#endif /* SFLOW_API_H */
View
673 sflow_mc.c
@@ -1,4 +1,5 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#include "config.h"
#include "memcached.h"
#include <sys/stat.h>
#include <sys/socket.h>
@@ -20,29 +21,71 @@
#include <limits.h>
#include <sysexits.h>
#include <stddef.h>
+#include <syslog.h>
#include "sflow_mc.h"
-static int sfmc_debug = 0;
-static void sfmc_log(int syslogType, char *fmt, ...)
-{
- va_list args;
- va_start(args, fmt);
- if(sfmc_debug) {
- vfprintf(stderr, fmt, args);
- fprintf(stderr, "\n");
- }
- else {
- vsyslog(syslogType, fmt, args);
- }
-}
+#include "sflow_api.h"
+#define SFMC_VERSION "0.91"
+#define SFMC_DEFAULT_CONFIGFILE "/etc/hsflowd.auto"
+#define SFMC_SEPARATORS " \t\r\n="
+/* SFMC_MAX LINE LEN must be enough to hold the whole list of targets */
+#define SFMC_MAX_LINELEN 1024
+#define SFMC_MAX_COLLECTORS 10
+
+typedef struct _SFMCCollector {
+ struct sockaddr sa;
+ SFLAddress addr;
+ uint16_t port;
+ uint16_t priority;
+} SFMCCollector;
+
+typedef struct _SFMCConfig {
+ int error;
+ uint32_t sampling_n;
+ uint32_t polling_secs;
+ SFLAddress agentIP;
+ uint32_t num_collectors;
+ SFMCCollector collectors[SFMC_MAX_COLLECTORS];
+} SFMCConfig;
+
+typedef struct _SFMC {
+ /* sampling parameters */
+ uint32_t sflow_random_seed;
+ uint32_t sflow_random_threshold;
+ /* the sFlow agent */
+ SFLAgent *agent;
+ /* need mutex when building sample */
+ pthread_mutex_t *mutex;
+ /* time */
+ struct timeval start_time;
+ rel_time_t tick;
+ /* config */
+ char *configFile;
+ time_t configFile_modTime;
+ SFMCConfig *config;
+ uint32_t configTests;
+ /* UDP send sockets */
+ int socket4;
+ int socket6;
+} SFMC;
+
+#define SFMC_ATOMIC_FETCH_ADD(_c, _inc) __sync_fetch_and_add(&(_c), (_inc))
+#define SFMC_ATOMIC_INC(_c) SFMC_ATOMIC_FETCH_ADD((_c), 1)
+#define SFMC_ATOMIC_DEC(_c) SFMC_ATOMIC_FETCH_ADD((_c), -1)
+
+#define SFLOW_DURATION_UNKNOWN 0
+
+/* file-scoped globals */
+static SFMC sfmc;
+
+static void sflow_init(SFMC *sm);
static void *sfmc_calloc(size_t bytes)
{
void *mem = calloc(1, bytes);
if(mem == NULL) {
- sfmc_log(LOG_ERR, "calloc() failed : %s", strerror(errno));
- // if(sfmc_debug) malloc_stats();
+ perror("sfmc_calloc");
exit(EXIT_FAILURE);
}
return mem;
@@ -50,7 +93,7 @@ static void *sfmc_calloc(size_t bytes)
static bool lockOrDie(pthread_mutex_t *sem) {
if(sem && pthread_mutex_lock(sem) != 0) {
- sfmc_log(LOG_ERR, "failed to lock semaphore!");
+ perror("lockOrDie");
exit(EXIT_FAILURE);
}
return true;
@@ -58,7 +101,7 @@ static bool lockOrDie(pthread_mutex_t *sem) {
static bool releaseOrDie(pthread_mutex_t *sem) {
if(sem && pthread_mutex_unlock(sem) != 0) {
- sfmc_log(LOG_ERR, "failed to unlock semaphore!");
+ perror("releaseOrDie");
exit(EXIT_FAILURE);
}
return true;
@@ -80,183 +123,309 @@ static int sfmc_cb_free(void *magic, SFLAgent *agent, void *obj)
static void sfmc_cb_error(void *magic, SFLAgent *agent, char *msg)
{
- sfmc_log(LOG_ERR, "sflow agent error: %s", msg);
+ perror(msg);
}
static void sfmc_cb_counters(void *magic, SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs)
{
SFMC *sm = (SFMC *)poller->magic;
- SEMLOCK_DO(sm->mutex) {
-
- if(sm->config == NULL) {
- /* config is disabled */
- return;
- }
- if(sm->config->polling_secs == 0) {
- /* polling is off */
- return;
- }
-
- sfmc_log(LOG_INFO, "in sfmc_cb_counters!");
-
- SFLCounters_sample_element mcElem = { 0 };
- mcElem.tag = SFLCOUNTERS_MEMCACHE;
+ if(sm->config == NULL ||
+ sm->config->polling_secs == 0) {
+ /* not configured */
+ return;
+ }
- struct thread_stats thread_stats;
- threadlocal_stats_aggregate(&thread_stats);
- struct slab_stats slab_stats;
- slab_stats_aggregate(&thread_stats, &slab_stats);
+ SFLCounters_sample_element mcElem = { 0 };
+ mcElem.tag = SFLCOUNTERS_MEMCACHE;
+ struct thread_stats thread_stats;
+ threadlocal_stats_aggregate(&thread_stats);
+ struct slab_stats slab_stats;
+ slab_stats_aggregate(&thread_stats, &slab_stats);
+
#ifndef WIN32
- struct rusage usage;
- getrusage(RUSAGE_SELF, &usage);
+ struct rusage usage;
+ getrusage(RUSAGE_SELF, &usage);
#endif /* !WIN32 */
- STATS_LOCK();
- mcElem.counterBlock.memcache.uptime = current_time;
+ STATS_LOCK();
+ mcElem.counterBlock.memcache.uptime = sm->tick;
#ifdef WIN32
- mcElem.counterBlock.memcache.rusage_user = 0xFFFFFFFF;
- mcElem.counterBlock.memcache.rusage_system = 0xFFFFFFFF;
+ mcElem.counterBlock.memcache.rusage_user = 0xFFFFFFFF;
+ mcElem.counterBlock.memcache.rusage_system = 0xFFFFFFFF;
#else
- mcElem.counterBlock.memcache.rusage_user = (usage.ru_utime.tv_sec * 1000) + (usage.ru_utime.tv_usec / 1000);
- mcElem.counterBlock.memcache.rusage_system = (usage.ru_stime.tv_sec * 1000) + (usage.ru_stime.tv_usec / 1000);
+ mcElem.counterBlock.memcache.rusage_user = (usage.ru_utime.tv_sec * 1000) + (usage.ru_utime.tv_usec / 1000);
+ mcElem.counterBlock.memcache.rusage_system = (usage.ru_stime.tv_sec * 1000) + (usage.ru_stime.tv_usec / 1000);
#endif /* WIN32 */
- mcElem.counterBlock.memcache.curr_connections = stats.curr_conns - 1;
- mcElem.counterBlock.memcache.total_connections = stats.total_conns;
- mcElem.counterBlock.memcache.connection_structures = stats.conn_structs;
- mcElem.counterBlock.memcache.cmd_get = thread_stats.get_cmds;
- mcElem.counterBlock.memcache.cmd_set = slab_stats.set_cmds;
- mcElem.counterBlock.memcache.cmd_flush = thread_stats.flush_cmds;
- mcElem.counterBlock.memcache.get_hits = slab_stats.get_hits;
- mcElem.counterBlock.memcache.get_misses = thread_stats.get_misses;
- mcElem.counterBlock.memcache.delete_misses = thread_stats.delete_misses;
- mcElem.counterBlock.memcache.delete_hits = slab_stats.delete_hits;
- mcElem.counterBlock.memcache.incr_misses = thread_stats.incr_misses;
- mcElem.counterBlock.memcache.incr_hits = slab_stats.incr_hits;
- mcElem.counterBlock.memcache.decr_misses = thread_stats.decr_misses;
- mcElem.counterBlock.memcache.decr_hits = slab_stats.decr_hits;
- mcElem.counterBlock.memcache.cas_misses = thread_stats.cas_misses;
- mcElem.counterBlock.memcache.cas_hits = slab_stats.cas_hits;
- mcElem.counterBlock.memcache.cas_badval = slab_stats.cas_badval;
- mcElem.counterBlock.memcache.auth_cmds = thread_stats.auth_cmds;
- mcElem.counterBlock.memcache.auth_errors = thread_stats.auth_errors;
- mcElem.counterBlock.memcache.bytes_read = thread_stats.bytes_read;
- mcElem.counterBlock.memcache.bytes_written = thread_stats.bytes_written;
- mcElem.counterBlock.memcache.limit_maxbytes = settings.maxbytes;
- mcElem.counterBlock.memcache.accepting_conns = stats.accepting_conns;
- mcElem.counterBlock.memcache.listen_disabled_num = stats.listen_disabled_num;
- mcElem.counterBlock.memcache.threads = settings.num_threads;
- mcElem.counterBlock.memcache.conn_yields = thread_stats.conn_yields;
- STATS_UNLOCK();
- SFLADD_ELEMENT(cs, &mcElem);
+ mcElem.counterBlock.memcache.curr_connections = stats.curr_conns - 1;
+ mcElem.counterBlock.memcache.total_connections = stats.total_conns;
+ mcElem.counterBlock.memcache.connection_structures = stats.conn_structs;
+ mcElem.counterBlock.memcache.cmd_get = thread_stats.get_cmds;
+ mcElem.counterBlock.memcache.cmd_set = slab_stats.set_cmds;
+ mcElem.counterBlock.memcache.cmd_flush = thread_stats.flush_cmds;
+ mcElem.counterBlock.memcache.get_hits = slab_stats.get_hits;
+ mcElem.counterBlock.memcache.get_misses = thread_stats.get_misses;
+ mcElem.counterBlock.memcache.delete_misses = thread_stats.delete_misses;
+ mcElem.counterBlock.memcache.delete_hits = slab_stats.delete_hits;
+ mcElem.counterBlock.memcache.incr_misses = thread_stats.incr_misses;
+ mcElem.counterBlock.memcache.incr_hits = slab_stats.incr_hits;
+ mcElem.counterBlock.memcache.decr_misses = thread_stats.decr_misses;
+ mcElem.counterBlock.memcache.decr_hits = slab_stats.decr_hits;
+ mcElem.counterBlock.memcache.cas_misses = thread_stats.cas_misses;
+ mcElem.counterBlock.memcache.cas_hits = slab_stats.cas_hits;
+ mcElem.counterBlock.memcache.cas_badval = slab_stats.cas_badval;
+ mcElem.counterBlock.memcache.auth_cmds = thread_stats.auth_cmds;
+ mcElem.counterBlock.memcache.auth_errors = thread_stats.auth_errors;
+ mcElem.counterBlock.memcache.bytes_read = thread_stats.bytes_read;
+ mcElem.counterBlock.memcache.bytes_written = thread_stats.bytes_written;
+ mcElem.counterBlock.memcache.limit_maxbytes = settings.maxbytes;
+ mcElem.counterBlock.memcache.accepting_conns = stats.accepting_conns;
+ mcElem.counterBlock.memcache.listen_disabled_num = stats.listen_disabled_num;
+ mcElem.counterBlock.memcache.threads = settings.num_threads;
+ mcElem.counterBlock.memcache.conn_yields = thread_stats.conn_yields;
+ STATS_UNLOCK();
+ SFLADD_ELEMENT(cs, &mcElem);
+ SEMLOCK_DO(sm->mutex) {
sfl_poller_writeCountersSample(poller, cs);
}
}
-void sflow_sample(SFMC *sm, struct conn *c, SFLMemcache_prot prot, SFLMemcache_cmd cmd, char *key, size_t keylen, uint32_t nkeys, size_t value_bytes, uint32_t duration_uS, uint32_t status)
-{
- SEMLOCK_DO(sm->mutex) {
+static SFLMemcache_prot sflow_map_protocol(enum protocol prot) {
+ SFLMemcache_prot sflprot = SFMC_PROT_OTHER;
+ switch(prot) {
+ case ascii_prot: sflprot = SFMC_PROT_ASCII; break;
+ case binary_prot: sflprot = SFMC_PROT_BINARY; break;
+ case negotiating_prot:
+ default: break;
+ }
+ return sflprot;
+}
+
+static SFLMemcache_operation_status sflow_map_status(int ret) {
+ SFLMemcache_operation_status sflret = SFMC_OP_UNKNOWN;
+ switch(ret) {
+ case STORED: sflret = SFMC_OP_STORED; break;
+ case EXISTS: sflret = SFMC_OP_EXISTS; break;
+ case NOT_FOUND: sflret = SFMC_OP_NOT_FOUND; break;
+ case NOT_STORED: sflret = SFMC_OP_NOT_STORED; break;
+ }
+ return sflret;
+}
+
+
+static SFLMemcache_cmd sflow_map_ascii_op(int op) {
+ SFLMemcache_cmd sflcmd = SFMC_CMD_OTHER;
+ switch(op) {
+ case NREAD_ADD: sflcmd=SFMC_CMD_ADD; break;
+ case NREAD_REPLACE: sflcmd = SFMC_CMD_REPLACE; break;
+ case NREAD_APPEND: sflcmd = SFMC_CMD_APPEND; break;
+ case NREAD_PREPEND: sflcmd = SFMC_CMD_PREPEND; break;
+ case NREAD_SET: sflcmd = SFMC_CMD_SET; break;
+ case NREAD_CAS: sflcmd = SFMC_CMD_CAS; break;
+ /* SFMC_CMD_GET */
+ /* SFMC_CMD_GETS */
+ /* SFMC_CMD_INCR */
+ /* SFMC_CMD_DECR */
+ /* SFMC_CMD_DELETE */
+ /* SFMC_CMD_STATS */
+ /* SFMC_CMD_FLUSH */
+ /* SFMC_CMD_VERSION */
+ /* SFMC_CMD_QUIT */
+ default:
+ break;
+ }
+ return sflcmd;
+}
+
+static SFLMemcache_cmd sflow_map_binary_cmd(int cmd) {
+ SFLMemcache_cmd sflcmd = SFMC_CMD_OTHER;
+ switch(cmd) {
+ case PROTOCOL_BINARY_CMD_GET: sflcmd = SFMC_CMD_GET; break;
+ case PROTOCOL_BINARY_CMD_SET: sflcmd = SFMC_CMD_SET; break;
+ case PROTOCOL_BINARY_CMD_ADD: sflcmd = SFMC_CMD_ADD; break;
+ case PROTOCOL_BINARY_CMD_REPLACE: sflcmd = SFMC_CMD_REPLACE; break;
+ case PROTOCOL_BINARY_CMD_DELETE: sflcmd = SFMC_CMD_DELETE; break;
+ case PROTOCOL_BINARY_CMD_INCREMENT: sflcmd = SFMC_CMD_INCR; break;
+ case PROTOCOL_BINARY_CMD_DECREMENT: sflcmd = SFMC_CMD_DECR; break;
+ case PROTOCOL_BINARY_CMD_QUIT: sflcmd = SFMC_CMD_QUIT; break;
+ case PROTOCOL_BINARY_CMD_FLUSH: sflcmd = SFMC_CMD_FLUSH; break;
+ case PROTOCOL_BINARY_CMD_GETQ: break;
+ case PROTOCOL_BINARY_CMD_NOOP: break;
+ case PROTOCOL_BINARY_CMD_VERSION: sflcmd = SFMC_CMD_VERSION; break;
+ case PROTOCOL_BINARY_CMD_GETK: break;
+ case PROTOCOL_BINARY_CMD_GETKQ: break;
+ case PROTOCOL_BINARY_CMD_APPEND: sflcmd = SFMC_CMD_APPEND; break;
+ case PROTOCOL_BINARY_CMD_PREPEND: sflcmd = SFMC_CMD_PREPEND; break;
+ case PROTOCOL_BINARY_CMD_STAT: sflcmd = SFMC_CMD_STATS; break;
+ case PROTOCOL_BINARY_CMD_SETQ: break;
+ case PROTOCOL_BINARY_CMD_ADDQ: break;
+ case PROTOCOL_BINARY_CMD_REPLACEQ: break;
+ case PROTOCOL_BINARY_CMD_DELETEQ: break;
+ case PROTOCOL_BINARY_CMD_INCREMENTQ: break;
+ case PROTOCOL_BINARY_CMD_DECREMENTQ: break;
+ case PROTOCOL_BINARY_CMD_QUITQ: break;
+ case PROTOCOL_BINARY_CMD_FLUSHQ: break;
+ case PROTOCOL_BINARY_CMD_APPENDQ: break;
+ case PROTOCOL_BINARY_CMD_PREPENDQ: break;
+ //case PROTOCOL_BINARY_CMD_VERBOSITY: break;
+ //case PROTOCOL_BINARY_CMD_TOUCH: break;
+ //case PROTOCOL_BINARY_CMD_GAT: break;
+ //case PROTOCOL_BINARY_CMD_GATQ: break;
+ case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: break;
+ case PROTOCOL_BINARY_CMD_SASL_AUTH: break;
+ case PROTOCOL_BINARY_CMD_SASL_STEP: break;
+ default:
+ break;
+ }
+ return sflcmd;
+}
+
+/* This is the 32-bit PRNG recommended in G. Marsaglia, "Xorshift RNGs",
+ * _Journal of Statistical Software_ 8:14 (July 2003). According to the paper,
+ * it has a period of 2**32 - 1 and passes almost all tests of randomness. It
+ * is currently also used for sFlow sampling in the Open vSwitch project
+ * at http://www.openvswitch.org.
+ */
+void sflow_sample_test(struct conn *c) {
+ if(unlikely(!sfmc.sflow_random_seed)) {
+ /* sampling not configured */
+ return;
+ }
+ c->thread->sflow_sample_pool++;
+ uint32_t seed = c->thread->sflow_random;
+ if(unlikely(seed == 0)) {
+ /* initialize random number generation */
+ seed = sfmc.sflow_random_seed ^ c->thread->thread_id;
+ }
+ seed ^= seed << 13;
+ seed ^= seed >> 17;
+ seed ^= seed << 5;
+ c->thread->sflow_random = seed;
+ if(unlikely(seed <= sfmc.sflow_random_threshold)) {
+ /* Relax. We are out of the critical path now. */
+ /* since we are sampling at the start of the transaction
+ all we have to do here is record the wall-clock time.
+ The rest is done at the end of the transaction.
+ We could use clock_gettime(CLOCK_REALTIME) here to get
+ nanosecond resolution but it is not always implemented
+ as efficiently as gettimeofday and it's not clear that
+ that we can really do better than microsecond accuracy
+ anyway. */
+ gettimeofday(&c->sflow_start_time, NULL);
+ }
+}
- if(sm->config == NULL) {
- /* config is disabled */
- return;
+void sflow_sample(SFLMemcache_cmd command, struct conn *c, const void *key, size_t keylen, uint32_t nkeys, size_t value_bytes, int status)
+{
+ SFMC *sm = &sfmc;
+ if(sm->config == NULL ||
+ sm->config->sampling_n == 0 ||
+ sm->agent == NULL ||
+ sm->agent->samplers == NULL) {
+ /* sFlow not configured yet - may be waiting for DNS-SD request */
+ return;
+ }
+ SFLSampler *sampler = sm->agent->samplers;
+ struct timeval timenow,elapsed;
+ gettimeofday(&timenow, NULL);
+ timersub(&timenow, &c->sflow_start_time, &elapsed);
+ timerclear(&c->sflow_start_time);
+
+ SFL_FLOW_SAMPLE_TYPE fs = { 0 };
+
+ /* have to add up the pool from all the threads */
+ fs.sample_pool = sflow_sample_pool_aggregate();
+
+ /* indicate that I am the server by setting the
+ destination interface to 0x3FFFFFFF=="internal"
+ and leaving the source interface as 0=="unknown" */
+ fs.output = 0x3FFFFFFF;
+
+ SFLFlow_sample_element mcopElem = { 0 };
+ mcopElem.tag = SFLFLOW_MEMCACHE;
+ mcopElem.flowType.memcache.protocol = sflow_map_protocol(c->protocol);
+
+ // sometimes we pass the command in explicitly
+ // otherwise we allow it to be inferred
+ if(command == SFMC_CMD_OTHER) {
+ if(c->protocol == binary_prot) {
+ /* binary protocol has c->cmd */
+ command = sflow_map_binary_cmd(c->cmd);
+ }
+ else {
+ /* ascii protocol - infer cmd from the store_op */
+ /* actually this older version stores it in c->cmd too */
+ command = sflow_map_ascii_op(c->cmd);
}
+ }
+ mcopElem.flowType.memcache.command = command;
+
+ mcopElem.flowType.memcache.key.str = (char *)key;
+ mcopElem.flowType.memcache.key.len = (key ? keylen : 0);
+ mcopElem.flowType.memcache.nkeys = (nkeys == 0) ? 1 : nkeys;
+ mcopElem.flowType.memcache.value_bytes = value_bytes;
+ mcopElem.flowType.memcache.duration_uS = (elapsed.tv_sec * 1000000) + elapsed.tv_usec;
+ mcopElem.flowType.memcache.status = sflow_map_status(status);
+ SFLADD_ELEMENT(&fs, &mcopElem);
+
+ SFLFlow_sample_element socElem = { 0 };
+
+ if(c->transport == tcp_transport ||
+ c->transport == udp_transport) {
+ /* add a socket structure */
+ struct sockaddr_storage localsoc;
+ socklen_t localsoclen = sizeof(localsoc);
+ struct sockaddr_storage peersoc;
+ socklen_t peersoclen = sizeof(peersoc);
- if(sm->config->sampling_n == 0) {
- /* sampling is off */
- return;
+ /* ask the fd for the local socket - may have wildcards, but
+ at least we may learn the local port */
+ getsockname(c->sfd, (struct sockaddr *)&localsoc, &localsoclen);
+ /* for tcp the socket can tell us the peer info */
+ if(c->transport == tcp_transport) {
+ getpeername(c->sfd, (struct sockaddr *)&peersoc, &peersoclen);
}
-
- SFLSampler *sampler = sm->agent->samplers;
- if(sampler == NULL) {
- return;
+ else {
+ /* for UDP the peer can be different for every packet, but
+ this info is capture in the recvfrom() and given to us */
+ memcpy(&peersoc, &c->request_addr, c->request_addr_size);
}
- /* update the all-important sample_pool */
- sampler->samplePool += c->thread->sflow_last_skip;
-
- SFL_FLOW_SAMPLE_TYPE fs = { 0 };
-
- /* indicate that I am the server by setting the
- destination interface to 0x3FFFFFFF=="internal"
- and leaving the source interface as 0=="unknown" */
- fs.output = 0x3FFFFFFF;
+ /* two possibilities here... */
+ struct sockaddr_in *soc4 = (struct sockaddr_in *)&peersoc;
+ struct sockaddr_in6 *soc6 = (struct sockaddr_in6 *)&peersoc;
- sfmc_log(LOG_INFO, "in sfmc_sample_operation!");
-
- SFLFlow_sample_element mcopElem = { 0 };
- mcopElem.tag = SFLFLOW_MEMCACHE;
- mcopElem.flowType.memcache.protocol = prot;
- mcopElem.flowType.memcache.command = cmd;
- mcopElem.flowType.memcache.key.str = key;
- mcopElem.flowType.memcache.key.len = (key ? keylen : 0);
- mcopElem.flowType.memcache.nkeys = (nkeys == SFLOW_TOKENS_UNKNOWN) ? 1 : nkeys;
- mcopElem.flowType.memcache.value_bytes = value_bytes;
- mcopElem.flowType.memcache.duration_uS = duration_uS;
- mcopElem.flowType.memcache.status = status;
- SFLADD_ELEMENT(&fs, &mcopElem);
-
- SFLFlow_sample_element socElem = { 0 };
-
- if(c->transport == tcp_transport ||
- c->transport == udp_transport) {
- /* add a socket structure */
- struct sockaddr_storage localsoc;
- socklen_t localsoclen = sizeof(localsoc);
- struct sockaddr_storage peersoc;
- socklen_t peersoclen = sizeof(peersoc);
-
- /* ask the fd for the local socket - may have wildcards, but
- at least we may learn the local port */
- getsockname(c->sfd, (struct sockaddr *)&localsoc, &localsoclen);
- /* for tcp the socket can tell us the peer info */
- if(c->transport == tcp_transport) {
- getpeername(c->sfd, (struct sockaddr *)&peersoc, &peersoclen);
- }
- else {
- /* for UDP the peer can be different for every packet, but
- this info is capture in the recvfrom() and given to us */
- memcpy(&peersoc, &c->request_addr, c->request_addr_size);
- }
-
- /* two possibilities here... */
- struct sockaddr_in *soc4 = (struct sockaddr_in *)&peersoc;
- struct sockaddr_in6 *soc6 = (struct sockaddr_in6 *)&peersoc;
-
- if(peersoclen == sizeof(*soc4) && soc4->sin_family == AF_INET) {
- struct sockaddr_in *lsoc4 = (struct sockaddr_in *)&localsoc;
- socElem.tag = SFLFLOW_EX_SOCKET4;
- socElem.flowType.socket4.protocol = (c->transport == tcp_transport ? 6 : 17);
-