Skip to content

Commit

Permalink
first pass at a graphtie aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Aug 29, 2012
1 parent fcf641a commit 4971db8
Show file tree
Hide file tree
Showing 3 changed files with 363 additions and 17 deletions.
278 changes: 278 additions & 0 deletions gographite_aggregator/gographite_aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// this is a service that receive's TCP connections from multiple gographites,
// it stores and merges the data, and writes out once to graphite

// this is necesary when you run a distributed system where you want to
// a) use gographite close to the source to lower data transfer volumes
// b) aggregate across hosts so you don't have to put the source hostname in all your keys

package main

import (
"bytes"
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"regexp"
"sort"
"strconv"
"syscall"
"bufio"
"time"
)

const VERSION = "0.1"

var signalchan chan os.Signal

type AggregatePacket struct {
Bucket string
Value int
Modifier string
Aggregation string
}

type Percentiles []int

func (a *Percentiles) Set(s string) error {
i, err := strconv.Atoi(s)
if err != nil {
return err
}
*a = append(*a, i)
return nil
}
func (a *Percentiles) String() string {
return fmt.Sprintf("%v", *a)
}

var (
serviceAddress = flag.String("address", ":2004", "TCP service address")
graphiteAddress = flag.String("graphite", "127.0.0.1:2003", "Graphite service address")
flushInterval = flag.Int64("flush-interval", 10, "Flush interval (seconds)")
showVersion = flag.Bool("version", false, "print version string")
percentThreshold = Percentiles{}
)

func init() {
flag.Var(&percentThreshold, "percent-threshold", "Threshold percent (may be given multiple times)")
}

var (
In = make(chan *AggregatePacket, 1000)
counters = make(map[string]int)
gauges = make(map[string]int)
averaged = make(map[string][]int)
minned = make(map[string][]int)
maxed = make(map[string][]int)
)

func monitor() {
ticker := time.NewTicker(time.Duration(*flushInterval) * time.Second)
for {
select {
case sig := <-signalchan:
fmt.Printf("Caught signal %d... shutting down\n", sig)
submit()
return
case <-ticker.C:
submit()
case s := <-In:
if s.Aggregation == "avg" {
_, ok := averaged[s.Bucket]
if !ok {
var t []int
averaged[s.Bucket] = t
}
averaged[s.Bucket] = append(averaged[s.Bucket], s.Value)
} else if s.Aggregation == "min" {
_, ok := minned[s.Bucket]
if !ok {
var t []int
minned[s.Bucket] = t
}
minned[s.Bucket] = append(minned[s.Bucket], s.Value)
} else if s.Aggregation == "max" {
_, ok := maxed[s.Bucket]
if !ok {
var t []int
maxed[s.Bucket] = t
}
maxed[s.Bucket] = append(maxed[s.Bucket], s.Value)
} else if s.Modifier == "g" {
gauges[s.Bucket] = int(s.Value)
} else {
v, ok := counters[s.Bucket]
if !ok || v == -1 {
counters[s.Bucket] = 0
}
counters[s.Bucket] += s.Value
}
}
}
}

func submit() {
client, err := net.Dial("tcp", *graphiteAddress)
if err != nil {
log.Printf("Error dialing", err.Error())
return
}
defer client.Close()

numStats := 0
now := time.Now().Unix()
buffer := bytes.NewBufferString("")
for s, c := range counters {
if c == -1 {
continue
}
fmt.Fprintf(buffer, "%s %d %d\n", s, c, now)
counters[s] = -1
numStats++
}

for g, c := range gauges {
if c == -1 {
continue
}
fmt.Fprintf(buffer, "%s %d %d\n", g, c, now)
gauges[g] = -1
numStats++
}

for u, t := range averaged {
if len(t) >= 0 {
numStats++

var average int64
for _, tt := range t {
average += int64(tt)
}
average = average / int64(len(t))
fmt.Fprintf(buffer, "%s %d %d", u, average, now)

var z []int
averaged[u] = z
}
}

for u, t := range minned {
if len(t) >= 0 {
numStats++
sort.Ints(t)
min := t[0]
fmt.Fprintf(buffer, "%s %d %d", u, min, now)

var z []int
minned[u] = z
}
}

for u, t := range maxed {
if len(t) >= 0 {
numStats++
sort.Ints(t)
max := t[len(t)-1]
fmt.Fprintf(buffer, "%s %d %d", u, max, now)

var z []int
maxed[u] = z
}
}

if numStats == 0 {
return
}
log.Printf("got %d stats", numStats)
fmt.Fprintf(buffer, "statsd.numStats %d %d\n", numStats, now)
data := buffer.Bytes()
client.Write(data)

}

func parseSingleMessage(buf *bytes.Buffer) *AggregatePacket {
var sanitizeRegexp = regexp.MustCompile("[^a-zA-Z0-9\\-_\\.:\\|@]")
var packetRegexp = regexp.MustCompile("([a-zA-Z0-9_\\.]+):([0-9]+)\\|(g|c)(\\|(avg|min|max))?")

s := sanitizeRegexp.ReplaceAllString(buf.String(), "")

for _, item := range packetRegexp.FindAllStringSubmatch(s, -1) {
log.Printf("got %s %v", s, item)

value, err := strconv.Atoi(item[2])
if err != nil {
// todo print out this error
log.Printf("error %s", err.Error())
}
agg := ""
if len(item) == 6 {
agg = item[5]
}

packet := &AggregatePacket{
Bucket: item[1],
Value: value,
Modifier: item[3],
Aggregation: agg,
}
return packet
}
return nil
}

func tcpListener() {
address, _ := net.ResolveTCPAddr("tcp", *serviceAddress)
log.Printf("Listening on %s", address)
listener, err := net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ListenAndServe: %s", err.Error())
}
defer listener.Close()

for {
conn, err := listener.Accept()
if err != nil {
// handle error
continue
}
log.Println("new connection from %s", conn.RemoteAddr().String())
go handleConnection(conn)
}
}

func handleConnection(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
line, isPrefix, err := reader.ReadLine()
if err != nil {
log.Printf("error reading line %s from %s", err.Error(), conn.RemoteAddr().String())
break
}
if isPrefix == true {
log.Printf("got partial line")
break
}
buf := bytes.NewBuffer(line)
packet := parseSingleMessage(buf)
if packet != nil {
In <- packet
}
}
}

func main() {
flag.Parse()
if *showVersion {
fmt.Printf("gographite_aggregator v%s\n", VERSION)
return
}
signalchan = make(chan os.Signal, 1)
signal.Notify(signalchan, syscall.SIGTERM)

go tcpListener()
monitor()
}
30 changes: 30 additions & 0 deletions gographite_aggregator/parsing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"bytes"
"github.com/bmizerany/assert"
"testing"
)

func TestAggregatePacketParse(t *testing.T) {

d := []byte("gaugor:333|g")
packet := parseSingleMessage(bytes.NewBuffer(d))
assert.Equal(t, "gaugor", packet.Bucket)
assert.Equal(t, 333, packet.Value)
assert.Equal(t, "g", packet.Modifier)

d = []byte("gorets:2|g|avg")
packet = parseSingleMessage(bytes.NewBuffer(d))
assert.Equal(t, "gorets", packet.Bucket)
assert.Equal(t, 2, packet.Value)
assert.Equal(t, "g", packet.Modifier)
assert.Equal(t, "avg", packet.Aggregation)

d = []byte("gorets:4|c")
packet = parseSingleMessage(bytes.NewBuffer(d))
assert.Equal(t, "gorets", packet.Bucket)
assert.Equal(t, 4, packet.Value)
assert.Equal(t, "c", packet.Modifier)

}
Loading

0 comments on commit 4971db8

Please sign in to comment.