Skip to content

Commit

Permalink
Add librato logging
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Aug 21, 2017
1 parent 2c141b9 commit b330039
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 12 deletions.
4 changes: 4 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type ChannelType string
// AnyChannelType is our empty channel type used when doing lookups without channel type assertions
var AnyChannelType = ChannelType("")

func (ct ChannelType) String() string {
return string(ct)
}

// ChannelUUID is our typing of a channel's UUID
type ChannelUUID struct {
uuid.UUID
Expand Down
3 changes: 3 additions & 0 deletions config/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Courier struct {

MaxWorkers int `default:"32"`

LibratoUsername string `default:""`
LibratoToken string `default:""`

RapidproHandleURL string `default:"https://app.rapidpro.io/handlers/mage/handle_message"`
RapidproToken string `default:"missing_rapidpro_token"`

Expand Down
148 changes: 148 additions & 0 deletions librato/librato.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package librato

import (
"bytes"
"encoding/json"
"net/http"
"strings"
"sync"
"time"

"github.com/nyaruka/courier/utils"
"github.com/sirupsen/logrus"
)

// Default is our default librato collector
var Default *Sender

// NewSender creates a new librato Sender with the passed in parameters
func NewSender(waitGroup *sync.WaitGroup, username string, token string, source string, timeout time.Duration) *Sender {
return &Sender{
waitGroup: waitGroup,
stop: make(chan bool),

buffer: make(chan gauge, 1000),
username: username,
token: token,
source: source,
timeout: timeout,
}
}

// AddGauge can be used to add a new gauge to be sent to librato
func (c *Sender) AddGauge(name string, value float64) {
// if no librato configured, return
if c == nil {
return
}

// our buffer is full, log an error but continue
if len(c.buffer) >= cap(c.buffer) {
logrus.Error("unable to add new gauges, buffer full, you may want to increase your buffer size or decrease your timeout")
return
}

c.buffer <- gauge{Name: strings.ToLower(name), Value: value, MeasureTime: time.Now().Unix()}
}

// Start starts our librato sender, callers can use Stop to stop it
func (c *Sender) Start() {
if c == nil {
return
}

go func() {
c.waitGroup.Add(1)
defer c.waitGroup.Done()
for {
select {
case <-c.stop:
c.flush(1000)
logrus.WithField("comp", "librato").Info("stopped")
return

case <-time.After(c.timeout * time.Second):
c.flush(300)
}
}
}()
}

func (c *Sender) flush(count int) {
if len(c.buffer) <= 0 {
return
}

// build our payload
reqPayload := &payload{
MeasureTime: time.Now().Unix(),
Source: c.source,
Gauges: make([]gauge, 0, len(c.buffer)),
}

// read up to our count of gauges
for i := 0; i < count; i++ {
select {
case g := <-c.buffer:
reqPayload.Gauges = append(reqPayload.Gauges, g)
default:
break
}
}

// send it off
encoded, err := json.Marshal(reqPayload)
if err != nil {
logrus.WithField("comp", "librato").WithError(err).Error("error encoding librato metrics")
return
}

req, err := http.NewRequest("POST", "https://metrics-api.librato.com/v1/metrics", bytes.NewReader(encoded))
if err != nil {
logrus.WithField("comp", "librato").WithError(err).Error("error sending librato metrics")
return
}
req.SetBasicAuth(c.username, c.token)
req.Header.Set("Content-Type", "application/json")
_, err = utils.MakeHTTPRequest(req)

if err != nil {
logrus.WithField("comp", "librato").WithError(err).Error("error sending librato metrics")
return
}

logrus.WithField("comp", "librato").WithField("body", string(encoded)).WithField("count", len(reqPayload.Gauges)).Debug("flushed to librato")
}

// Stop stops our sender, callers can use the WaitGroup used during initialization to block for stop
func (c *Sender) Stop() {
if c == nil {
return
}
close(c.stop)
}

type gauge struct {
Name string `json:"name"`
Value float64 `json:"value"`
MeasureTime int64 `json:"measure_time"`
}

type payload struct {
MeasureTime int64 `json:"measure_time"`
Source string `json:"source"`
Gauges []gauge `json:"gauges"`
}

// Sender is responsible for collecting gauges and sending them in batches to our librato server
type Sender struct {
waitGroup *sync.WaitGroup
stop chan bool

buffer chan gauge

username string
token string
source string
timeout time.Duration
}
11 changes: 9 additions & 2 deletions sender.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package courier

import (
"fmt"
"time"

"github.com/nyaruka/courier/librato"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -171,11 +173,16 @@ func (w *Sender) Send() {
} else {
// send our message
status, err = server.SendMsg(msg)
duration := time.Now().Sub(start)
secondDuration := float64(duration) / float64(time.Second)

if err != nil {
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgErrored)
msgLog.WithError(err).WithField("elapsed", time.Now().Sub(start)).Error("msg errored")
msgLog.WithError(err).WithField("elapsed", duration).Error("msg errored")
librato.Default.AddGauge(fmt.Sprintf("courier.msg_send_error_%s", msg.Channel().ChannelType()), secondDuration)
} else {
msgLog.WithField("elapsed", time.Now().Sub(start)).Info("msg sent")
msgLog.WithField("elapsed", duration).Info("msg sent")
librato.Default.AddGauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration)
}
}

Expand Down
38 changes: 28 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net/http"
"net/http/httputil"
"os"
"sort"
"strings"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/librato"
"github.com/nyaruka/courier/utils"
"github.com/pressly/lg"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -88,6 +90,13 @@ func (s *server) Start() error {
// set our user agent, needs to happen before we do anything so we don't change have threading issues
utils.HTTPUserAgent = fmt.Sprintf("Courier/%s", s.config.Version)

// configure librato if we have configuration options for it
host, _ := os.Hostname()
if s.config.LibratoUsername != "" {
librato.Default = librato.NewSender(s.waitGroup, s.config.LibratoUsername, s.config.LibratoToken, host, 5)
librato.Default.Start()
}

// start our backend
err := s.backend.Start()
if err != nil {
Expand Down Expand Up @@ -161,6 +170,9 @@ func (s *server) Stop() error {
s.stopped = true
close(s.stopChan)

// stop our librato sender
librato.Default.Stop()

// wait for everything to stop
s.waitGroup.Wait()

Expand Down Expand Up @@ -265,15 +277,18 @@ func (s *server) channelUpdateStatusWrapper(handler ChannelHandler, handlerFunc

logs := make([]*ChannelLog, 0, 1)
statuses, err := handlerFunc(channel, ww, r)
elapsed := time.Now().Sub(start)
duration := time.Now().Sub(start)
secondDuration := float64(duration) / float64(time.Second)

// create channel logs for each of our msgs or errors
if err != nil {
WriteError(ww, r, err)
logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), response.String(), elapsed, start))
logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), response.String(), duration, start))
librato.Default.AddGauge(fmt.Sprintf("courier.msg_status_error_%s", channel.ChannelType()), secondDuration)
}

// create channel logs for each of our msgs
for _, status := range statuses {
logs = append(logs, NewChannelLog(channel, status.ID(), r.Method, url, ww.Status(), err, string(request), response.String(), elapsed, start))
logs = append(logs, NewChannelLog(channel, status.ID(), r.Method, url, ww.Status(), err, string(request), response.String(), duration, start))
librato.Default.AddGauge(fmt.Sprintf("courier.msg_status_%s", channel.ChannelType()), secondDuration)
}

// and write these out
Expand Down Expand Up @@ -305,15 +320,18 @@ func (s *server) channelReceiveMsgWrapper(handler ChannelHandler, handlerFunc Ch

logs := make([]*ChannelLog, 0, 1)
msgs, err := handlerFunc(channel, ww, r)
elapsed := time.Now().Sub(start)
duration := time.Now().Sub(start)
secondDuration := float64(duration) / float64(time.Second)

// create channel logs for each of our msgs or errors
if err != nil {
WriteError(ww, r, err)
logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), elapsed, start))
logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), duration, start))
librato.Default.AddGauge(fmt.Sprintf("courier.msg_receive_error_%s", channel.ChannelType()), secondDuration)
}

// create channel logs for each of our msgs
for _, msg := range msgs {
logs = append(logs, NewChannelLog(channel, msg.ID(), r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), elapsed, start))
logs = append(logs, NewChannelLog(channel, msg.ID(), r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), duration, start))
librato.Default.AddGauge(fmt.Sprintf("courier.msg_receive_%s", channel.ChannelType()), secondDuration)
}

// and write these out
Expand Down

0 comments on commit b330039

Please sign in to comment.