Skip to content

Commit

Permalink
Fix HTTP reporter potential unbounded goroutine creation (#146)
Browse files Browse the repository at this point in the history
* Fix HTTP reporter potential unbounded goroutine creation

Currently the HTTP reporter spawns a new goroutine each time a new batch needs
to be sent to the server. Execution of those goroutines gets serialized through
the `sendMtx`.

This behavior is problematic when the zipkin server is down and leads to
creation of unbounded number of goroutines each of which will wait for its turn
on the `sendMtx` and then fail.

Fix this by creating one send goroutine to handle sending data to the server.
This also removes the need for the `sendMtx` since `sendBatch` will only be
called by the send goroutine.

Signed-off-by: Slavomir Kaslev <kaslevs@vmware.com>

* Add more HTTP reporter test cases

Add HTTP reporter test cases for BatchInterval() and BatchSize() options
behavior.

Signed-off-by: Slavomir Kaslev <kaslevs@vmware.com>
  • Loading branch information
skaslev authored and jcchavezs committed Sep 15, 2019
1 parent e33faeb commit c29478e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 47 deletions.
34 changes: 21 additions & 13 deletions reporter/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ type httpReporter struct {
batchInterval time.Duration
batchSize int
maxBacklog int
sendMtx *sync.Mutex
batchMtx *sync.Mutex
batch []*model.SpanModel
spanC chan *model.SpanModel
sendC chan struct{}
quit chan struct{}
shutdown chan error
reqCallback RequestCallbackFn
Expand Down Expand Up @@ -80,24 +80,35 @@ func (r *httpReporter) loop() {
currentBatchSize := r.append(span)
if currentBatchSize >= r.batchSize {
nextSend = time.Now().Add(r.batchInterval)
go func() {
_ = r.sendBatch()
}()
r.enqueueSend()
}
case <-tickerChan:
if time.Now().After(nextSend) {
nextSend = time.Now().Add(r.batchInterval)
go func() {
_ = r.sendBatch()
}()
r.enqueueSend()
}
case <-r.quit:
r.shutdown <- r.sendBatch()
close(r.sendC)
return
}
}
}

func (r *httpReporter) sendLoop() {
for range r.sendC {
_ = r.sendBatch()
}
r.shutdown <- r.sendBatch()
}

func (r *httpReporter) enqueueSend() {
select {
case r.sendC <- struct{}{}:
default:
// Do nothing if there's a pending send request already
}
}

func (r *httpReporter) append(span *model.SpanModel) (newBatchSize int) {
r.batchMtx.Lock()

Expand All @@ -114,10 +125,6 @@ func (r *httpReporter) append(span *model.SpanModel) (newBatchSize int) {
}

func (r *httpReporter) sendBatch() error {
// in order to prevent sending the same batch twice
r.sendMtx.Lock()
defer r.sendMtx.Unlock()

// Select all current spans in the batch to be sent
r.batchMtx.Lock()
sendBatch := r.batch[:]
Expand Down Expand Up @@ -232,9 +239,9 @@ func NewReporter(url string, opts ...ReporterOption) reporter.Reporter {
maxBacklog: defaultMaxBacklog,
batch: []*model.SpanModel{},
spanC: make(chan *model.SpanModel),
sendC: make(chan struct{}, 1),
quit: make(chan struct{}, 1),
shutdown: make(chan error, 1),
sendMtx: &sync.Mutex{},
batchMtx: &sync.Mutex{},
serializer: reporter.JSONSerializer{},
}
Expand All @@ -244,6 +251,7 @@ func NewReporter(url string, opts ...ReporterOption) reporter.Reporter {
}

go r.loop()
go r.sendLoop()

return &r
}
146 changes: 112 additions & 34 deletions reporter/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,28 @@
package http_test

import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"

"fmt"
"time"

"strings"

"github.com/openzipkin/zipkin-go/idgenerator"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
)

func TestSpanIsBeingReported(t *testing.T) {
func generateSpans(n int) []*model.SpanModel {
spans := make([]*model.SpanModel, n)
idGen := idgenerator.NewRandom64()
traceID := idGen.TraceID()

nSpans := 2
var aSpans []model.SpanModel
var eSpans []string

for i := 0; i < nSpans; i++ {
span := model.SpanModel{
for i := 0; i < n; i++ {
spans[i] = &model.SpanModel{
SpanContext: model.SpanContext{
TraceID: traceID,
ID: idGen.SpanID(traceID),
Expand All @@ -48,44 +45,125 @@ func TestSpanIsBeingReported(t *testing.T) {
Kind: model.Client,
Timestamp: time.Now(),
}

aSpans = append(aSpans, span)
eSpans = append(
eSpans,
fmt.Sprintf(
`{"timestamp":%d,"traceId":"%s","id":"%s","name":"%s","kind":"%s"}`,
span.Timestamp.Round(time.Microsecond).UnixNano()/1e3,
span.SpanContext.TraceID,
span.SpanContext.ID,
span.Name,
span.Kind,
),
)
}

eSpansPayload := fmt.Sprintf("[%s]", strings.Join(eSpans, ","))
return spans
}

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func newTestServer(t *testing.T, spans []*model.SpanModel, serializer reporter.SpanSerializer, onReceive func(int)) *httptest.Server {
sofar := 0
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("expected 'POST' request, got '%s'", r.Method)
}

aSpanPayload, err := ioutil.ReadAll(r.Body)
aPayload, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
t.Errorf("unexpected error: %v", err)
}

if eSpansPayload != string(aSpanPayload) {
t.Errorf("unexpected span payload \nwant %s, \nhave %s\n", eSpansPayload, string(aSpanPayload))
var aSpans []*model.SpanModel
err = json.Unmarshal(aPayload, &aSpans)
if err != nil {
t.Errorf("failed to parse json payload: %v", err)
}
eSpans := spans[sofar : sofar+len(aSpans)]
sofar += len(aSpans)
onReceive(len(aSpans))

ePayload, err := serializer.Serialize(eSpans)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if !bytes.Equal(aPayload, ePayload) {
t.Errorf("unexpected span payload\nhave %s\nwant %s", string(aPayload), string(ePayload))
}
}))
}

func TestSpanIsBeingReported(t *testing.T) {
serializer := reporter.JSONSerializer{}

var numSpans int64
eNumSpans := 2
spans := generateSpans(eNumSpans)
ts := newTestServer(t, spans, serializer, func(num int) { atomic.AddInt64(&numSpans, int64(num)) })
defer ts.Close()

rep := zipkinhttp.NewReporter(ts.URL, zipkinhttp.Serializer(serializer))
for _, span := range spans {
rep.Send(*span)
}
rep.Close()

aNumSpans := int(atomic.LoadInt64(&numSpans))
if aNumSpans != eNumSpans {
t.Errorf("unexpected number of spans received\nhave: %d, want: %d", aNumSpans, eNumSpans)
}
}

func TestSpanIsReportedOnTime(t *testing.T) {
serializer := reporter.JSONSerializer{}
batchInterval := 200 * time.Millisecond

var numSpans int64
eNumSpans := 2
spans := generateSpans(eNumSpans)
ts := newTestServer(t, spans, serializer, func(num int) { atomic.AddInt64(&numSpans, int64(num)) })
defer ts.Close()

rep := zipkinhttp.NewReporter(ts.URL)
defer rep.Close()
rep := zipkinhttp.NewReporter(ts.URL,
zipkinhttp.Serializer(serializer),
zipkinhttp.BatchInterval(batchInterval))

for _, span := range spans {
rep.Send(*span)
}

time.Sleep(3 * batchInterval / 2)

aNumSpans := int(atomic.LoadInt64(&numSpans))
if aNumSpans != eNumSpans {
t.Errorf("unexpected number of spans received\nhave: %d, want: %d", aNumSpans, eNumSpans)
}

rep.Close()
}

func TestSpanIsReportedAfterBatchSize(t *testing.T) {
serializer := reporter.JSONSerializer{}
batchSize := 2

var numSpans int64
eNumSpans := 6
spans := generateSpans(eNumSpans)
ts := newTestServer(t, spans, serializer, func(num int) { atomic.AddInt64(&numSpans, int64(num)) })
defer ts.Close()

rep := zipkinhttp.NewReporter(ts.URL,
zipkinhttp.Serializer(serializer),
zipkinhttp.BatchSize(batchSize))

for _, span := range spans[:batchSize] {
rep.Send(*span)
}

time.Sleep(100 * time.Millisecond)

aNumSpans := int(atomic.LoadInt64(&numSpans))
if aNumSpans != batchSize {
t.Errorf("unexpected number of spans received\nhave: %d, want: %d", aNumSpans, batchSize)
}

for _, span := range spans[batchSize:] {
rep.Send(*span)
}

rep.Close()

for _, span := range aSpans {
rep.Send(span)
aNumSpans = int(atomic.LoadInt64(&numSpans))
if aNumSpans != eNumSpans {
t.Errorf("unexpected number of spans received\nhave: %d, want: %d", aNumSpans, eNumSpans)
}
}

0 comments on commit c29478e

Please sign in to comment.