Skip to content

Commit

Permalink
Initial import.
Browse files Browse the repository at this point in the history
  • Loading branch information
rcrowley committed Oct 29, 2011
0 parents commit 972932e
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
*.[68]
*.a
*.swp
_obj
cmd/librato/librato
17 changes: 17 additions & 0 deletions Makefile
@@ -0,0 +1,17 @@
include $(GOROOT)/src/Make.inc

TARG=librato
GOFILES=librato.go

include $(GOROOT)/src/Make.pkg

all: uninstall clean install
make -C cmd/librato uninstall clean install

uninstall:
rm -f $(GOROOT)/pkg/$(GOOS)_$(GOARCH)/$(TARG).a
rm -f $(GOROOT)/pkg/$(GOOS)_$(GOARCH)/github.com/rcrowley/$(TARG).a
rm -rf $(GOROOT)/src/pkg/github.com/rcrowley/$(TARG)
make -C cmd/librato uninstall

.PHONY: all uninstall
11 changes: 11 additions & 0 deletions cmd/librato/Makefile
@@ -0,0 +1,11 @@
include $(GOROOT)/src/Make.inc

TARG=librato
GOFILES=librato.go

include $(GOROOT)/src/Make.cmd

uninstall:
rm -f $(GOROOT)/bin/$(TARG)

.PHONY: uninstall
152 changes: 152 additions & 0 deletions cmd/librato/librato.go
@@ -0,0 +1,152 @@
package main

import (
"bufio"
"flag"
"fmt"
"librato"
"log"
"os"
"regexp"
"strconv"
)

// Storage for flags.
var user, token, source string

// Create a map suitable for use as a custom counter metric from the given
// regular expression match.
func customCounter(match []string) map[string]int64 {
obj := make(map[string]int64)
value, err := strconv.Atoi64(match[3])
if nil == err { obj["value"] = value }
measureTime, err := strconv.Atoi64(match[4])
if nil == err { obj["measure_time"] = measureTime }
return obj
}

// Create a map suitable for use as a custom gauge metric from the given
// regular expression match.
func customGauge(match []string) map[string]int64 {
obj := make(map[string]int64)
value, err := strconv.Atoi64(match[3])
if nil == err { obj["value"] = value }
measureTime, err := strconv.Atoi64(match[4])
if nil == err { obj["measure_time"] = measureTime }
count, err := strconv.Atoi64(match[5])
if nil == err { obj["count"] = count }
sum, err := strconv.Atoi64(match[6])
if nil == err { obj["sum"] = sum }
max, err := strconv.Atoi64(match[7])
if nil == err { obj["max"] = max }
min, err := strconv.Atoi64(match[8])
if nil == err { obj["min"] = min }
sumSquares, err := strconv.Atoi64(match[9])
if nil == err { obj["sum_squares"] = sumSquares }
return obj
}

// Initialize the flags with their default values from the environment.
func init() {
flag.StringVar(
&user,
"u",
os.Getenv("LIBRATO_USER"),
"Librato user",
)
flag.StringVar(
&token,
"t",
os.Getenv("LIBRATO_TOKEN"),
"Librato API token",
)
flag.StringVar(
&source,
"s",
os.Getenv("LIBRATO_SOURCE"),
"metric source",
)
}

func main() {

log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)

flag.Usage = usage
flag.Parse()

// The `user` and `token` flags are required. The `source` flag is not.
if "" == user {
log.Fatalln("no Librato user found in -u or LIBRATO_USER")
}
if "" == token {
log.Fatalln("no Librato API token found in -t or LIBRATO_TOKEN")
}

// Create a Librato Metrics client with the given credentials and source.
m := librato.NewMetrics(user, token, source)

// Regular expressions for parsing standard input. Valid lines contain
// a literal 'c' or 'g' character to identify the type of the metric, a
// name, and one or more numeric fields. Counters can accomodate up to
// two numeric fields, with the second representing the `measure_time`
// field. Gauges can accommodate up to seven numeric fields, which
// represent, in order, `value`, `measure_time`, `count`, `sum`, `max`,
// `min`, and `sum_squares` as documented by Librato:
// <http://dev.librato.com/v1/post/gauges/:name>.
//
// cg name value
re := regexp.MustCompile("^([cg]) ([^ ]+) ([0-9]+)$")
// c name value measure_time
reCustomCounter := regexp.MustCompile("^(c) ([^ ]+) ([0-9]+) (-|[0-9]+)$")
reCustomGauge := regexp.MustCompile(
// g name value measure_time count sum
"^(g) ([^ ]+) (-|[0-9]+) (-|[0-9]+) (-|[0-9]+) (-|[0-9]+) " +
// max min sum_squares
"(-|[0-9]+) (-|[0-9]+) (-|[0-9]+)$")

// Read standard input line-buffered. Break out of this loop on EOF.
// Log an error message and exit if any other error is encountered.
stdin := bufio.NewReader(os.Stdin)
for {
line, _, err := stdin.ReadLine()
s := string(line)
if os.EOF == err { break }
if nil != err { log.Fatalln(err) }

// Match this line against the regular expressions above. In
// case a line doesn't match, log the line and continue. Get
// the appropriate channel and send the metric.
if match := re.FindStringSubmatch(s); nil != match {
var ch chan int64
switch match[1] {
case "c": ch = m.GetCounter(match[2])
case "g": ch = m.GetGauge(match[2])
}
value, _ := strconv.Atoi64(match[3])
ch <- value
} else if match := reCustomCounter.FindStringSubmatch(s); nil != match {
m.GetCustomCounter(match[2]) <- customCounter(match)
} else if match := reCustomGauge.FindStringSubmatch(s); nil != match {
m.GetCustomGauge(match[2]) <- customGauge(match)
} else { log.Printf("malformed line \"%v\"\n", s) }

}

// Close all metric channels so no new messages may be sent. Wait
// for all outstanding HTTP requests to finish.
//
// This can deadlock in the event that EOF is seen (and hence execution
// arrives here) before a single metric has been sent. The Go runtime
// will detect the deadlock and abort with a nasty stack trace.
m.Close()
m.Wait()

}

func usage() {
fmt.Fprintln(
os.Stderr,
"Usage: librato [-u <user>] [-t <token>] [-s <source>]",
)
}
196 changes: 196 additions & 0 deletions librato.go
@@ -0,0 +1,196 @@
// Go client for Librato Metrics
//
// <https://github.com/rcrowley/go-librato>
package librato

import (
"bytes"
"fmt"
"http"
"io"
"json"
"log"
"os"
"url"
)

// Librato `Metrics` structs encapsulate the credentials used to send metrics
// to the API, the source tag for these metrics, bookkeeping for goroutines,
// and lookup tables for existing metric channels.
type Metrics struct {
user, token, source string
quit, running chan bool
counters, gauges map[string]chan int64
customCounters, customGauges map[string]chan map[string]int64
}

// Create a new `Metrics` struct with the given credentials and source tag.
// Initialize all the channels, maps, and goroutines used internally.
func NewMetrics(user, token, source string) *Metrics {
m := &Metrics{
user, token, source,
make(chan bool), make(chan bool),
make(map[string]chan int64), make(map[string]chan int64),
make(map[string]chan map[string]int64),
make(map[string]chan map[string]int64),
}
go func() {
var n uint
for {
if <-m.running { n++ } else if 0 < n { n-- }
if 0 == n { break }
}
m.quit <- true
}()
return m
}

// TODO Offer a way to do collated POSTs of a bunch of metrics at once.

// Close all metric channels so no new messages may be sent. This is
// a prerequisite to `Wait`ing.
func (m *Metrics) Close() {
for _, ch := range m.counters { close(ch) }
for _, ch := range m.gauges { close(ch) }
for _, ch := range m.customCounters { close(ch) }
for _, ch := range m.customGauges { close(ch) }
}

// Get (possibly by creating) a counter channel by the given name.
func (m *Metrics) GetCounter(name string) chan int64 {
ch, ok := m.counters[name]
if ok { return ch }
return m.NewCounter(name)
}

// Get (possibly by creating) a custom counter channel by the given name.
func (m *Metrics) GetCustomCounter(name string) chan map[string]int64 {
ch, ok := m.customCounters[name]
if ok { return ch }
return m.NewCustomCounter(name)
}

// Get (possibly by creating) a custom gauge channel by the given name.
func (m *Metrics) GetCustomGauge(name string) chan map[string]int64 {
ch, ok := m.customGauges[name]
if ok { return ch }
return m.NewCustomGauge(name)
}

// Get (possibly by creating) a gauge channel by the given name.
func (m *Metrics) GetGauge(name string) chan int64 {
ch, ok := m.gauges[name]
if ok { return ch }
return m.NewGauge(name)
}

// Create a counter channel by the given name.
func (m *Metrics) NewCounter(name string) chan int64 {
ch := m.newMetric("/counters/%s.json", name)
m.counters[name] = ch
return ch
}

// Create a custom counter channel by the given name.
func (m *Metrics) NewCustomCounter(name string) chan map[string]int64 {
ch := m.newCustomMetric("/counters/%s.json", name)
m.customCounters[name] = ch
return ch
}

// Create a custom gauge channel by the given name.
func (m *Metrics) NewCustomGauge(name string) chan map[string]int64 {
ch := m.newCustomMetric("/gauges/%s.json", name)
m.customGauges[name] = ch
return ch
}

// Create a gauge channel by the given name.
func (m *Metrics) NewGauge(name string) chan int64 {
ch := m.newMetric("/gauges/%s.json", name)
m.gauges[name] = ch
return ch
}

// TODO Offer a way to convert many individual values sent through a channel
// TODO into one API call that sets count, sum, max, min, and sum_squares.

// Create an `http.Request` object. This wrapper prefixes the URL with
// the Librato Metrics API endpoint, sets the `Content-Type` header to
// `application/json`, and sets the `Authorization` header for HTTP Basic
// authentication from the `Metrics` struct.
func (m *Metrics) NewRequest(
method, path string,
body io.Reader,
) (*http.Request, os.Error) {
url := fmt.Sprintf("https://metrics-api.librato.com/v1%s", path)
req, err := http.NewRequest(method, url, body)
if nil != err { return req, err }
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(m.user, m.token)
return req, nil
}

// Wait for all outstanding HTTP requests to finish. This must be called
// after `Close` has been called.
func (m *Metrics) Wait() {
<-m.quit
}

// Serialize an `application/json` request body and do one HTTP roundtrip
// using the `http` package's `DefaultClient`.
func (m *Metrics) do(
format, name string,
body map[string]interface{},
) (*http.Response, os.Error) {
if "" != m.source { body["source"] = m.source }
b, err := json.Marshal(body)
if nil != err { return err }
req, err := m.NewRequest(
"POST",
fmt.Sprintf(format, url.QueryEscape(name)),
bytes.NewBuffer(b),
)
if nil != err { return err }
fmt.Printf("req: %v\n", req)
resp, err := http.DefaultClient.Do(req)
fmt.Printf("resp: %v\n", resp)
return resp, err
}

// Create a metric channel and begin processing messages sent
// to it in a background goroutine.
func (m *Metrics) newMetric(format, name string) chan int64 {
ch := make(chan int64) // TODO Buffer this channel?
go func() {
m.running <- true
for {
v, ok := <-ch
if !ok { break }
body := map[string]interface{} { "value": v }
err := m.do(format, name, body)
if nil != err { log.Println(err) }
}
m.running <- false
}()
return ch
}

// Create a custom metric channel and begin processing messages sent
// to it in a background goroutine.
func (m *Metrics) newCustomMetric(format, name string) chan map[string]int64 {
ch := make(chan map[string]int64) // TODO Buffer this channel?
go func() {
m.running <- true
for {
obj, ok := <-ch
if !ok { break }
body := make(map[string]interface{})
for k, v := range obj { body[k] = v }
err := m.do(format, name, body)
if nil != err { log.Println(err) }
}
m.running <- false
}()
return ch
}

0 comments on commit 972932e

Please sign in to comment.