Navigation Menu

Skip to content

Commit

Permalink
Make ScribeCollector reconnect on write errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Teichman committed Nov 7, 2014
1 parent cadb120 commit c18860c
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 22 deletions.
1 change: 1 addition & 0 deletions AUTHORS
@@ -1 +1,2 @@
Space Monkey, Inc.
Peter Teichman <peter@teichman.org>
35 changes: 35 additions & 0 deletions trace/backoff.go
@@ -0,0 +1,35 @@
package trace

import "time"

// backoff describes a retry delay schedule. Each entry of millis is
// the nominal wait, in milliseconds, for the corresponding attempt;
// attempts past the end of the schedule reuse the final entry, and
// every delay is randomized before use.
type backoff struct {
	millis []int
}

// defaultBackoff is the stock schedule: an immediate first attempt,
// then delays growing to a nominal five seconds.
var defaultBackoff = backoff{
	millis: []int{
		0,
		10, 10,
		100, 100,
		500, 500,
		3000, 3000,
		5000,
	},
}

// duration returns the time to wait before the n'th attempt under
// this backoff policy. The nominal delay is b.millis[n], randomized
// by fudge to avoid thundering herds.
//
// n is clamped to the valid range of the schedule: values past the
// end saturate at the last entry and negative values are treated as
// 0. An empty schedule yields a zero duration instead of panicking.
func (b backoff) duration(n int) time.Duration {
	if len(b.millis) == 0 {
		return 0
	}

	if n < 0 {
		n = 0
	}
	if last := len(b.millis) - 1; n > last {
		n = last
	}

	return time.Duration(fudge(b.millis[n])) * time.Millisecond
}

// fudge randomizes millis around its nominal value. For positive
// millis the result is millis/2 plus a value drawn from
// Rng.Intn(millis), i.e. an integer in the half-open range
// [millis/2, millis/2 + millis), roughly 0.5x to 1.5x the input.
//
// Zero and negative inputs return 0: there is nothing to randomize,
// and a non-positive bound must not be handed to Rng.Intn
// (NOTE(review): assumes Rng follows math/rand semantics, where
// Intn panics for n <= 0 -- confirm against Rng's definition).
func fudge(millis int) int {
	if millis <= 0 {
		return 0
	}

	return millis/2 + Rng.Intn(millis)
}
127 changes: 105 additions & 22 deletions trace/scribe.go
Expand Up @@ -16,7 +16,7 @@ package trace

import (
"encoding/base64"
"fmt"
"errors"
"net"
"time"

Expand All @@ -33,8 +33,10 @@ var (
// ScribeCollector matches the TraceCollector interface, but writes directly
// to a connected Scribe socket.
type ScribeCollector struct {
transport *thrift.TFramedTransport
client *scribe.ScribeClient
addr *net.TCPAddr
done chan struct{}

logs chan *scribe.LogEntry
}

// NewScribeCollector creates a ScribeCollector. scribe_addr is the address
Expand All @@ -44,36 +46,91 @@ func NewScribeCollector(scribe_addr string) (*ScribeCollector, error) {
if err != nil {
return nil, err
}
transport := thrift.NewTFramedTransport(
thrift.NewTSocketFromAddrTimeout(sa, 10*time.Second))
err = transport.Open()
if err != nil {
return nil, err

s := ScribeCollector{
addr: sa,
done: make(chan struct{}),
logs: make(chan *scribe.LogEntry, 100),
}

proto := thrift.NewTBinaryProtocolTransport(transport)
return &ScribeCollector{
transport: transport,
client: scribe.NewScribeClientProtocol(transport, proto, proto)}, nil
go s.pumpWrites()

return &s, nil
}

// pumpWrites sends all messages on s.logs to scribe. It creates the
// scribe connections, recreates them when write errors occur, and
// backs off on consecutive connection errors. It returns once s.done
// is signaled.
//
// When a write error occurs, pumpWrites will lose that log entry.
func (s *ScribeCollector) pumpWrites() {
	// failures counts consecutive connection errors and selects the
	// delay from defaultBackoff; it resets after a successful connect.
	var failures int

	for {
		// Wait out the current backoff delay, bailing early on shutdown.
		select {
		case <-s.done:
			return
		case <-time.After(defaultBackoff.duration(failures)):
		}

		conn, err := newScribeConn(s.addr)
		if err != nil {
			logger.Errorf("connect error: %s", err)
			failures++
			continue
		}

		err = writeAll(conn, s.logs, s.done)

		// Always release the transport before reconnecting or
		// exiting; otherwise every write error leaks a socket.
		conn.Close()

		if err == nil {
			// writeAll only returns nil once done has been signaled,
			// so stop here rather than racing the zero-delay timer
			// above and possibly opening another connection.
			return
		}
		logger.Errorf("write error: %s", err)

		// The connect itself succeeded, so retry without delay.
		failures = 0
	}
}

// writeAll forwards entries received on logs to the scribe client of
// c, one entry per Log call. It returns nil when done is signaled and
// the first transport-level error otherwise; the entry being written
// when that error occurs is not retried.
func writeAll(c *scribeConn, logs <-chan *scribe.LogEntry, done <-chan struct{}) error {
	for {
		select {
		case <-done:
			return nil

		case entry := <-logs:
			batch := []*scribe.LogEntry{entry}
			result, err := c.client.Log(batch)
			if err != nil {
				return err
			}

			if result == scribe.ResultCode_OK {
				continue
			}

			// A non-OK result code is reported but the connection
			// stays in use.
			logger.Errorf("scribe result code not OK: %s", result)
		}
	}
}

// Close closes an existing ScribeCollector
func (s *ScribeCollector) Close() error {
return s.transport.Close()
close(s.done)
return nil
}

// CollectSerialized will send a serialized zipkin.Span to the Scribe endpoint
// CollectSerialized buffers a serialized zipkin.Span to be sent to
// the Scribe endpoint. It returns an error and loses the log entry if
// the buffer is full.
func (c *ScribeCollector) CollectSerialized(serialized []byte) error {
rc, err := c.client.Log([]*scribe.LogEntry{
{Category: "zipkin",
Message: base64.StdEncoding.EncodeToString(serialized)}})
if err != nil {
return err
entry := scribe.LogEntry{
Category: "zipkin",
Message: base64.StdEncoding.EncodeToString(serialized),
}
if rc != scribe.ResultCode_OK {
return fmt.Errorf("scribe result code not OK: %s", rc)

select {
case c.logs <- &entry:
return nil
default:
return errors.New("skipping scribe log: buffer full")
}
return nil
}

// Collect will serialize and send a zipkin.Span to the configured Scribe
Expand All @@ -89,4 +146,30 @@ func (c *ScribeCollector) Collect(s *zipkin.Span) {
}
}

// scribeConn bundles the framed Thrift transport of a single scribe
// connection with the client that issues calls over it.
type scribeConn struct {
// transport is the framed transport; Close shuts it down.
transport *thrift.TFramedTransport
// client is used by writeAll to send log entries.
client *scribe.ScribeClient
}

// newScribeConn opens a framed Thrift transport to addr, using a
// 10 second socket timeout, and wraps it in a scribe client speaking
// the binary protocol. It returns the error from opening the
// transport if that fails.
func newScribeConn(addr *net.TCPAddr) (*scribeConn, error) {
	socket := thrift.NewTSocketFromAddrTimeout(addr, 10*time.Second)
	framed := thrift.NewTFramedTransport(socket)
	if err := framed.Open(); err != nil {
		return nil, err
	}

	// The same protocol instance serves as both input and output.
	protocol := thrift.NewTBinaryProtocolTransport(framed)

	return &scribeConn{
		transport: framed,
		client:    scribe.NewScribeClientProtocol(framed, protocol, protocol),
	}, nil
}

// Close shuts down the underlying transport. Any error reported by
// the transport is deliberately discarded.
func (c *scribeConn) Close() {
	_ = c.transport.Close()
}

// Compile-time assertion that *ScribeCollector implements TraceCollector.
var _ TraceCollector = (*ScribeCollector)(nil)

0 comments on commit c18860c

Please sign in to comment.