Skip to content

Commit

Permalink
More work on graphite submission using dynamic string construction.
Browse files Browse the repository at this point in the history
  • Loading branch information
jbuchbinder committed Oct 9, 2012
1 parent 7d37fa1 commit dbf8093
Showing 1 changed file with 40 additions and 74 deletions.
114 changes: 40 additions & 74 deletions src/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

#include "json-c/json.h"
#include "uthash/utarray.h"
#include "uthash/utstring.h"
#include "queue.h"
#include "statsd.h"
#include "serialize.h"
Expand Down Expand Up @@ -81,8 +82,8 @@ pthread_t thread_mgmt;
pthread_t thread_flush;
pthread_t thread_queue;
int port = PORT, mgmt_port = MGMT_PORT, ganglia_port = GANGLIA_PORT, flush_interval = FLUSH_INTERVAL;
int debug = 0, friendly = 0, clear_stats = 0, daemonize = 0, enable_gmetric = 0;
char *serialize_file = NULL, *ganglia_host = NULL, *ganglia_spoof = NULL, *ganglia_metric_prefix = NULL, *lock_file = NULL;
int debug = 0, friendly = 0, clear_stats = 0, daemonize = 0, enable_gmetric = 0, enable_graphite = 0;
char *serialize_file = NULL, *ganglia_host = NULL, *ganglia_spoof = NULL, *graphite_host = NULL, *ganglia_metric_prefix = NULL, *lock_file = NULL;
int percentiles[5], num_percentiles = 0;

/*
Expand Down Expand Up @@ -244,6 +245,7 @@ void syntax(char *argv[]) {
fprintf(stderr, "\t-s file serialize state to and from file (default disabled)\n");
fprintf(stderr, "\t-G host ganglia host (default disabled)\n");
fprintf(stderr, "\t-g port ganglia port (default 8649)\n");
fprintf(stderr, "\t-R host graphite host (default disabled)\n");
fprintf(stderr, "\t-S spoofhost ganglia spoof host (default statsd:statsd)\n");
fprintf(stderr, "\t-P prefix ganglia metric prefix (default is none)\n");
fprintf(stderr, "\t-l lockfile lock file (only used when daemonizing)\n");
Expand Down Expand Up @@ -273,7 +275,7 @@ int main(int argc, char *argv[]) {

queue_init();

while ((opt = getopt(argc, argv, "dDfhp:m:s:cg:G:F:S:P:l:T:")) != -1) {
while ((opt = getopt(argc, argv, "dDfhp:m:s:cg:G:F:S:P:l:T:R:")) != -1) {
switch (opt) {
case 'd':
printf("Debug enabled.\n");
Expand Down Expand Up @@ -307,6 +309,11 @@ int main(int argc, char *argv[]) {
clear_stats = 1;
printf("Clearing stats on start.\n");
break;
case 'R':
graphite_host = strdup(optarg);
enable_graphite = 1;
printf("Graphite host %s\n", graphite_host);
break;
case 'G':
ganglia_host = strdup(optarg);
enable_gmetric = 1;
Expand Down Expand Up @@ -1051,10 +1058,9 @@ void p_thread_flush(void *ptr) {
long ts = time(NULL);
char *ts_string = ltoa(ts);
int numStats = 0;
#ifdef SEND_GRAPHITE
char *statString = NULL;
#endif
UT_string *statString;

utstring_new(statString);

/* ---------------------------------------------------------------------
Process counter metrics
Expand All @@ -1063,10 +1069,9 @@ void p_thread_flush(void *ptr) {
statsd_counter_t *s_counter, *tmp;
HASH_ITER(hh, counters, s_counter, tmp) {
long double value = s_counter->value / flush_interval;
#ifdef SEND_GRAPHITE
char message[BUFLEN];
sprintf(message, "stats.%s %Lf %ld\nstats_counts_%s %Lf %ld\n", s_counter->key, value, ts, s_counter->key, s_counter->value, ts);
#endif
if (enable_graphite) {
utstring_printf(statString, "stats.%s %Lf %ld\nstats_counts_%s %Lf %ld\n", s_counter->key, value, ts, s_counter->key, s_counter->value, ts);
}
if (enable_gmetric) {
{
char *k = NULL;
Expand All @@ -1080,20 +1085,9 @@ void p_thread_flush(void *ptr) {
if (k) free(k);
}
{
//char *k = malloc(strlen(s_counter->key) + 13);
// sprintf(k, "%s", s_counter->key);
SEND_GMETRIC_DOUBLE(s_counter->key, s_counter->key, s_counter->value, "count");
//if (k) free(k);
}
}
#ifdef SEND_GRAPHITE
if (statString) {
statString = realloc(statString, strlen(statString) + strlen(message));
strcat(statString, message);
} else {
statString = strdup(message);
}
#endif

/* Clear counter after we're done with it */
wait_for_counters_lock();
Expand Down Expand Up @@ -1160,20 +1154,19 @@ void p_thread_flush(void *ptr) {
remove_timers_lock();


#ifdef SEND_GRAPHITE
char message[BUFLEN];
sprintf(message, "stats.timers.%s.mean %f %ld\n"
"stats.timers.%s.upper %f %ld\n"
"stats.timers.%s.upper_%d %f %ld\n"
"stats.timers.%s.lower %f %ld\n"
"stats.timers.%s.count %d %ld\n",
s_timer->key, mean, ts,
s_timer->key, max, ts,
s_timer->key, pctThreshold, maxAtThreshold, ts,
s_timer->key, min, ts,
s_timer->key, s_timer->count, ts
);
#endif
if (enable_graphite) {
utstring_printf(statString, "stats.timers.%s.mean %f %ld\n"
"stats.timers.%s.upper %f %ld\n"
"stats.timers.%s.upper_%d %f %ld\n"
"stats.timers.%s.lower %f %ld\n"
"stats.timers.%s.count %d %ld\n",
s_timer->key, mean, ts,
s_timer->key, max, ts,
s_timer->key, pctThreshold, maxAtThreshold, ts,
s_timer->key, min, ts,
s_timer->key, s_timer->count, ts
);
}

if (enable_gmetric) {
{
Expand Down Expand Up @@ -1205,14 +1198,6 @@ void p_thread_flush(void *ptr) {
SEND_GMETRIC_INT(s_timer->key, k, s_timer->count, "count");
}
}
#ifdef SEND_GRAPHITE
if (statString) {
statString = realloc(statString, strlen(statString) + strlen(message));
strcat(statString, message);
} else {
statString = strdup(message);
}
#endif
}
numStats++;
}
Expand All @@ -1228,10 +1213,9 @@ void p_thread_flush(void *ptr) {
statsd_gauge_t *s_gauge, *tmp;
HASH_ITER(hh, gauges, s_gauge, tmp) {
long double value = s_gauge->value;
#ifdef SEND_GRAPHITE
char message[BUFLEN];
sprintf(message, "stats.%s %Lf %ld\nstats_gauges_%s %Lf %ld\n", s_gauge->key, value, ts, s_gauge->key, s_gauge->value, ts);
#endif
if (enable_graphite) {
utstring_printf(statString, "stats.%s %Lf %ld\nstats_gauges_%s %Lf %ld\n", s_gauge->key, value, ts, s_gauge->key, s_gauge->value, ts);
}
if (enable_gmetric) {
{
char *k = NULL;
Expand All @@ -1251,15 +1235,6 @@ void p_thread_flush(void *ptr) {
//if (k) free(k);
}
}
#ifdef SEND_GRAPHITE
if (statString) {
statString = realloc(statString, strlen(statString) + strlen(message));
strcat(statString, message);
} else {
statString = strdup(message);
}
#endif

numStats++;
}
if (s_gauge) free(s_gauge);
Expand All @@ -1271,36 +1246,27 @@ void p_thread_flush(void *ptr) {
-------------------------------------------------------------------- */

{
#ifdef SEND_GRAPHITE
char *message = malloc(sizeof(char) * BUFLEN);
sprintf(message, "statsd.numStats %d %ld\n", numStats, ts);
#endif
if (enable_graphite) {
utstring_printf(statString, "statsd.numStats %d %ld\n", numStats, ts);
}
if (enable_gmetric) {
SEND_GMETRIC_INT("statsd", "statsd_numstats_collected", numStats, "count");
}
#ifdef SEND_GRAPHITE
if (statString) {
statString = realloc(statString, strlen(statString) + strlen(message));
strcat(statString, message);
} else {
statString = strdup(message);
}
#endif
}

/* TODO: Flush to graphite */
#ifdef SEND_GRAPHITE
printf("Messages:\n%s", statString);
#endif
if (enable_graphite) {
printf("Messages:\n%s", utstring_body(statString));
}

if (enable_gmetric) {
gmetric_close(&gm);
}

if (ts_string) free(ts_string);
#ifdef SEND_GRAPHITE
if (statString) free(statString);
#endif
if (enable_graphite) {
utstring_free(statString);
}
}

syslog(LOG_INFO, "Thread[Flush]: Ending thread %d\n", (int) *((int *) ptr));
Expand Down

0 comments on commit dbf8093

Please sign in to comment.