
Fix undefined switch logic when reporter is closed #239

Closed · wants to merge 10 commits

Conversation

black-adder
Contributor

There's a race condition in remoteReporter.Report(): if both the r.closed and r.queue channels are closed, the select statement will pick a case arbitrarily, causing a panic if Report is called after close. This PR keeps r.queue open so that even if the queue is somehow filled after close() is called, it will never be flushed. An atomic flag might have worked here, but I didn't want to add extra synchronization given that the channels already provide it.
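
A minimal, self-contained sketch (not the reporter code itself; the channel names are illustrative) of why this panics: receiving from a closed channel and sending to a closed channel are both "ready" in a select, and Go picks among ready cases pseudo-randomly.

package main

import "fmt"

func main() {
    closed := make(chan struct{})
    queue := make(chan int, 1)
    close(closed)
    close(queue)

    defer func() {
        if r := recover(); r != nil {
            fmt.Println("panicked:", r) // "send on closed channel"
        }
    }()

    // Run this a few times: sometimes it prints "dropped",
    // sometimes it panics, depending on which case select picks.
    select {
    case <-closed:
        fmt.Println("dropped")
    case queue <- 1: // choosing this case panics
        fmt.Println("enqueued") // never reached
    }
}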

Signed-off-by: Won Jun Jang <wjang@uber.com>
@codecov

codecov bot commented Dec 6, 2017

Codecov Report

Merging #239 into master will increase coverage by 0.06%.
The diff coverage is 71.42%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #239      +/-   ##
=========================================
+ Coverage   83.24%   83.3%   +0.06%     
=========================================
  Files          51      51              
  Lines        2691    2696       +5     
=========================================
+ Hits         2240    2246       +6     
  Misses        326     326              
+ Partials      125     124       -1
Impacted Files Coverage Δ
transport/zipkin/http.go 78.68% <50%> (+3.24%) ⬆️
reporter.go 69.9% <76.47%> (+0.29%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 617d557...be27f64. Read the comment docs.

@black-adder
Contributor Author

@alexeykudinkin can you take a look?

reporter.go Outdated
@@ -233,6 +235,12 @@ func (r *remoteReporter) Close() {
// reporting new spans.
func (r *remoteReporter) processQueue() {
timer := time.NewTicker(r.bufferFlushInterval)
close := func() {
Contributor

Not a huge fan of assigning to the built-in function name close. Also, can't you just do defer func() {...}() in this case?

Member

+1, defer func() {...}() should do it
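
For illustration, the suggested shape might look like this (a sketch; only timer comes from the diff above, and the cleanup body is an assumption):

func (r *remoteReporter) processQueue() {
    timer := time.NewTicker(r.bufferFlushInterval)
    // Deferred cleanup runs on every return path and avoids shadowing
    // the built-in close.
    defer func() {
        timer.Stop()
        r.flush() // hypothetical final flush; the real cleanup may differ
    }()
    // ... main loop unchanged ...
}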


// After that, drain the queue
// Cut off report requests still in-flight and drain the queue
// NB we keep r.queue open because it causes a race condition in Report() where since both
Member

why not close r.queue when exiting processQueue()?

Contributor Author

Then we run into the same race condition in Report().

@alexeykudinkin
Contributor

@black-adder I think we can do this much more simply by just ordering the checks and splitting the select in two:

select {
    case <-r.closed:
        r.reporterOptions.logger.Infof("Span not reported since Reporter is already closed: %+v", span)
    case r.queue <- span:
        atomic.AddInt64(&r.queueLength, 1)
    // ...
}

into

select {
    case <-r.closed:
        r.reporterOptions.logger.Infof("Span not reported since Reporter is already closed: %+v", span)
        return
    default:
}

select {
    case r.queue <- span:
        atomic.AddInt64(&r.queueLength, 1)
    // ...
}

@black-adder
Contributor Author

I still think we could potentially run into a race condition (I may be completely wrong). I'm inlining the original Close() function and the newly suggested Report() function to show a potential race condition (although a highly unlikely one):

// From Report()
select {
    case <-r.closed:
        r.reporterOptions.logger.Infof("Span not reported since Reporter is already closed: %+v", span)
        return
    default:
}

// From Close()
close(r.closed)
r.queueDrained.Add(1)
close(r.queue)

// From Report(), this will panic
select {
    case r.queue <- span:
        atomic.AddInt64(&r.queueLength, 1)
    // ...
}

Contributor

@alexeykudinkin left a comment

@black-adder you're right, this very well might happen. If we absolutely need to close r.queue, we need the same kind of reportersDrained wait group, which would let us drain all of the in-flight reports before closing r.queue.
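
The shape of that idea, as a sketch (reportersDrained is the wait group named above; the rest follows the snippets earlier in the thread):

func (r *remoteReporter) Report(span *Span) {
    r.reportersDrained.Add(1) // count this Report as in-flight
    defer r.reportersDrained.Done()
    select {
    case <-r.closed:
        return
    default:
    }
    select {
    case r.queue <- span:
        atomic.AddInt64(&r.queueLength, 1)
    default: // queue full, drop the span
    }
}

// Close() would then do:
//   close(r.closed)
//   r.reportersDrained.Wait() // let in-flight Reports finish
//   close(r.queue)            // no Report can still be sending here

One caveat: sync.WaitGroup requires that an Add starting from a zero counter not race with Wait, so even this is a sketch rather than a watertight fix.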

reporter.go Outdated
@@ -247,12 +255,12 @@ func (r *remoteReporter) processQueue() {
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
}
} else {
Contributor

This is dead code, given that we're not closing r.queue anymore.

Signed-off-by: Won Jun Jang <wjang@uber.com>
@black-adder
Contributor Author

I don't really see a need to close the queue channel, apart from being a bad "citizen".

reporter.go Outdated
@@ -246,13 +248,12 @@ func (r *remoteReporter) processQueue() {
// to reduce the number of gauge stats, we only emit queue length on flush
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
}
} else {
Member

We don't need to read ok in this case; you can remove the if ok.

Signed-off-by: Won Jun Jang <wjang@uber.com>
@yurishkuro
Member

I don't really see a need to close the queue channel, apart from being a bad "citizen".

FWIW, the Ticker we're using for periodic flushes also doesn't close its channel when you stop the ticker.

@black-adder
Contributor Author

I somehow introduced some flakiness, will address

Signed-off-by: Won Jun Jang <wjang@uber.com>
@black-adder
Contributor Author

Hmm, I figured out the cause of the flakiness. The fix has made me reconsider the changes this PR makes.

Essentially, what was happening was:

span1.Finish()
tracer.Close()
span2.Finish() 

From a user's perspective, this should save span1, and it's understandable that span2 isn't saved. However, with the changes in this PR, span1 isn't guaranteed to be persisted. I might have to add a mutex around the whole class, which is unfortunate...

@yurishkuro
Member

Can you explain why span1 may not be saved? The Close() call has a wait group that ensures the queue is drained.

@alexeykudinkin
Contributor

@yurishkuro Because of what I was talking about: Close doesn't wait for outstanding Reports to finish.

@isaachier
Contributor

@alexeykudinkin IDK what you mean. Report calls r.queueDrained.Done() once the report is finished. Close blocks on r.queueDrained.Wait() until the aforementioned Done is called.

@alexeykudinkin
Contributor

@isaachier processQueue does that, not Report

@isaachier
Contributor

My mistake. Do you think adding a wait group to Report would alleviate the problem?

@black-adder
Contributor Author

I'm giving up on this solution and am just going to add a mutex: https://github.com/jaegertracing/jaeger-client-go/pull/240/files
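
For reference, the mutex approach looks roughly like this (a sketch, not the actual #240 diff; the mu and closed fields are assumptions):

func (r *remoteReporter) Report(span *Span) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.closed { // plain bool, guarded by the same mutex Close() takes
        r.reporterOptions.logger.Infof("Span not reported since Reporter is already closed: %+v", span)
        return
    }
    select {
    case r.queue <- span:
        atomic.AddInt64(&r.queueLength, 1)
    default: // queue full, drop the span
    }
}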

@alexeykudinkin
Contributor

@black-adder putting a mutex in will serialize every Report. I will put up a diff without a mutex.

@yurishkuro
Member

I was just looking at the lightstep implementation: it's not using a channel for the span queue but an explicit buffer protected by a mutex (a mutex on the tracer, which allows things like checking whether it's closed). The side effect is that they don't have eager flushing when the packet size is reached; instead they run a flush loop at 500ms frequency.
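
Schematically, that style looks like this (a sketch of the pattern, not lightstep's actual code; Span and send are stand-ins):

type bufferedReporter struct {
    mu     sync.Mutex
    buf    []*Span
    closed bool
}

func (b *bufferedReporter) Report(span *Span) {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.closed {
        return // one lock guards both the buffer and the closed flag
    }
    b.buf = append(b.buf, span)
}

func (b *bufferedReporter) flushLoop() {
    ticker := time.NewTicker(500 * time.Millisecond) // fixed frequency, no eager flush on size
    defer ticker.Stop()
    for range ticker.C {
        b.mu.Lock()
        spans, closed := b.buf, b.closed
        b.buf = nil
        b.mu.Unlock()
        send(spans) // hypothetical sender, called outside the lock
        if closed {
            return
        }
    }
}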

@isaachier
Contributor

@yurishkuro I did the same for the C++ client, but I feel it is very "un-Go" to avoid the channel altogether.

@yurishkuro
Member

it is very "un-Go" to avoid the channel altogether.

Channels are not some free magic; they are typically more expensive than equivalent explicit code with a mutex. Their benefit is their use in select, since Go's mutex does not provide any signaling primitives (like Object.notify() in Java).

Anyway, I am not suggesting we change the implementation completely. We could simplify the logic by using just a single queue with a command pattern. That is, instead of chan *Span we use chan spanQueueCommand, where

type spanQueueCommand struct {
    span  *Span
    close *sync.WaitGroup
    flush *sync.WaitGroup // may replace the flushSignal queue, although not necessary
}

This way we don't need to close the queue from the Close() method; we just send a command with the close field set to a WaitGroup. Any spans that already passed through the Report method are guaranteed to get processed before the close command is read. One gotcha is that because the queue is bounded, Close() may need to try inserting the close command repeatedly in a loop, with a small sleep:

func (r *remoteReporter) Close() {
    queueDrained := &sync.WaitGroup{}
    queueDrained.Add(1)
    iterations := int(r.reporterOptions.CloseTimeout / time.Millisecond)
    for i := 0; i < iterations; i++ {
        select {
        case r.queue <- spanQueueCommand{close: queueDrained}: // no allocation, I assume
            queueDrained.Wait()
            r.sender.Close()
            return
        default:
            time.Sleep(time.Millisecond)
        }
    }
    // maybe log a warning that we weren't able to finish flushing
}
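
On the consuming side, processQueue would then look roughly like this (same assumptions; r.flush() and r.sender.Append() are stand-ins for the existing internals):

func (r *remoteReporter) processQueue() {
    timer := time.NewTicker(r.bufferFlushInterval)
    defer timer.Stop()
    for {
        select {
        case cmd := <-r.queue:
            switch {
            case cmd.close != nil:
                r.flush()        // drain whatever the sender has buffered
                cmd.close.Done() // unblock Close()
                return
            case cmd.flush != nil:
                r.flush()
                cmd.flush.Done()
            default:
                r.sender.Append(cmd.span)
                atomic.AddInt64(&r.queueLength, -1)
            }
        case <-timer.C:
            r.flush()
        }
    }
}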

@alexeykudinkin
Contributor

@black-adder @yurishkuro addressed in #241
