diff --git a/.travis.yml b/.travis.yml index f0431a6fb..4093e1889 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,5 @@ language: go sudo: false - go: - 1.4 - 1.5 @@ -16,8 +15,8 @@ script: - go fmt ./... - go vet ./... - go test -i -race ./... -- GOMAXPROCS=1 go test -v -race ./... -- GOMAXPROCS=1 go test -v -covermode=count -coverprofile=coverage.out +- go test -v -race ./... +- go test -v -covermode=count -coverprofile=coverage.out - $HOME/gopath/bin/goveralls -coverprofile coverage.out -service travis-ci env: global: diff --git a/nats.go b/nats.go index ccf86ae85..22c201ec8 100644 --- a/nats.go +++ b/nats.go @@ -1081,15 +1081,18 @@ func (nc *Conn) flusher() { defer nc.wg.Done() // snapshot the bw and conn since they can change from underneath of us. + nc.mu.Lock() bw := nc.bw conn := nc.conn + fch := nc.fch + nc.mu.Unlock() if conn == nil || bw == nil { return } for { - if _, ok := <-nc.fch; !ok { + if _, ok := <-fch; !ok { return } nc.mu.Lock() @@ -1619,6 +1622,9 @@ func (nc *Conn) resetPendingFlush() { // This will clear any pending flush calls and release pending calls. func (nc *Conn) clearPendingFlushCalls() { + nc.mu.Lock() + defer nc.mu.Unlock() + // Clear any queued pongs, e.g. pending flush calls. for _, ch := range nc.pongs { if ch != nil { diff --git a/test/sub_test.go b/test/sub_test.go index 405dfba6a..649f95b96 100644 --- a/test/sub_test.go +++ b/test/sub_test.go @@ -221,9 +221,14 @@ func TestAsyncErrHandler(t *testing.T) { ch := make(chan bool) + aeCalled := false + nc.Opts.AsyncErrorCB = func(c *nats.Conn, s *nats.Subscription, e error) { // Suppress additional calls - nc.Opts.AsyncErrorCB = nil + if aeCalled { + return + } + aeCalled = true if s != sub { t.Fatal("Did not receive proper subscription")