Skip to content

Commit

Permalink
Fix the RemotelyControlledSampler so that it terminates go-routine on…
Browse files Browse the repository at this point in the history
… Close() (#260)

* Fix the RemotelyControlledSampler so that the pollController goroutine terminates when Close() is called

Signed-off-by: Scott Kidder <scott@mux.com>
Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
skidder authored and yurishkuro committed Apr 4, 2018
1 parent 6cc524e commit 996debd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
55 changes: 40 additions & 15 deletions sampler.go
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/uber/jaeger-client-go/log"
Expand Down Expand Up @@ -373,13 +374,16 @@ func (s *adaptiveSampler) update(strategies *sampling.PerOperationSamplingStrate
// for the appropriate sampling strategy, constructs a corresponding sampler and
// delegates to it for sampling decisions.
type RemotelyControlledSampler struct {
// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
closed int64 // 0 - not closed, 1 - closed

sync.RWMutex
samplerOptions

serviceName string
timer *time.Ticker
manager sampling.SamplingManager
pollStopped sync.WaitGroup
doneChan chan *sync.WaitGroup
}

type httpSamplingManager struct {
Expand All @@ -406,10 +410,9 @@ func NewRemotelyControlledSampler(
sampler := &RemotelyControlledSampler{
samplerOptions: options,
serviceName: serviceName,
timer: time.NewTicker(options.samplingRefreshInterval),
manager: &httpSamplingManager{serverURL: options.samplingServerURL},
doneChan: make(chan *sync.WaitGroup),
}

go sampler.pollController()
return sampler
}
Expand Down Expand Up @@ -449,11 +452,15 @@ func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (boo

// Close implements Close() of Sampler.
func (s *RemotelyControlledSampler) Close() {
s.RLock()
s.timer.Stop()
s.RUnlock()
if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
s.logger.Error("Repeated attempt to close the sampler is ignored")
return
}

s.pollStopped.Wait()
var wg sync.WaitGroup
wg.Add(1)
s.doneChan <- &wg
wg.Wait()
}

// Equal implements Equal() of Sampler.
Expand All @@ -471,15 +478,33 @@ func (s *RemotelyControlledSampler) Equal(other Sampler) bool {
}

// pollController creates a ticker that fires every s.samplingRefreshInterval
// and runs the polling loop (pollControllerWithTicker) with it. The ticker is
// stopped when the polling loop returns.
func (s *RemotelyControlledSampler) pollController() {
// The ticker is local to this call; unit tests call pollControllerWithTicker directly with their own ticker.
ticker := time.NewTicker(s.samplingRefreshInterval)
defer ticker.Stop()
s.pollControllerWithTicker(ticker)
}

// pollControllerWithTicker runs the polling loop using the supplied ticker.
// Every value received from ticker.C triggers a call to s.updateSampler().
// When a *sync.WaitGroup is received from s.doneChan, the loop calls Done()
// on it and returns. This function does not stop the ticker; that is left to
// the caller.
func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) {
for {
select {
case <-ticker.C:
s.updateSampler()
case wg := <-s.doneChan:
// Acknowledge the stop request so that whoever sent wg can stop waiting on it.
wg.Done()
return
}
}
}

func (s *RemotelyControlledSampler) getSampler() Sampler {
s.Lock()
timer := s.timer
s.Unlock()
defer s.Unlock()
return s.sampler
}

for range timer.C {
s.updateSampler()
}
s.pollStopped.Add(1)
// setSampler replaces s.sampler with the given sampler. The assignment is
// performed while holding s.Lock(), which is released on return.
func (s *RemotelyControlledSampler) setSampler(sampler Sampler) {
s.Lock()
defer s.Unlock()
s.sampler = sampler
}

func (s *RemotelyControlledSampler) updateSampler() {
Expand Down
21 changes: 10 additions & 11 deletions sampler_test.go
Expand Up @@ -310,7 +310,7 @@ func TestRemotelyControlledSampler(t *testing.T) {
agent, remoteSampler, metricsFactory := initAgent(t)
defer agent.Close()

initSampler, ok := remoteSampler.sampler.(*ProbabilisticSampler)
initSampler, ok := remoteSampler.getSampler().(*ProbabilisticSampler)
assert.True(t, ok)

agent.AddSamplingStrategy("client app",
Expand All @@ -320,9 +320,9 @@ func TestRemotelyControlledSampler(t *testing.T) {
{Name: "jaeger.sampler_queries", Tags: map[string]string{"result": "ok"}, Value: 1},
{Name: "jaeger.sampler_updates", Tags: map[string]string{"result": "ok"}, Value: 1},
}...)
_, ok = remoteSampler.sampler.(*ProbabilisticSampler)
s1, ok := remoteSampler.getSampler().(*ProbabilisticSampler)
assert.True(t, ok)
assert.NotEqual(t, initSampler, remoteSampler.sampler, "Sampler should have been updated")
assert.NotEqual(t, initSampler, s1, "Sampler should have been updated")

sampled, tags := remoteSampler.IsSampled(TraceID{Low: testMaxID + 10}, testOperationName)
assert.False(t, sampled)
Expand All @@ -331,20 +331,19 @@ func TestRemotelyControlledSampler(t *testing.T) {
assert.True(t, sampled)
assert.Equal(t, testProbabilisticExpectedTags, tags)

remoteSampler.sampler = initSampler
remoteSampler.setSampler(initSampler)

c := make(chan time.Time)
remoteSampler.Lock()
remoteSampler.timer = &time.Ticker{C: c}
remoteSampler.Unlock()
go remoteSampler.pollController()
ticker := &time.Ticker{C: c}
go remoteSampler.pollControllerWithTicker(ticker)

c <- time.Now() // force update based on timer
time.Sleep(10 * time.Millisecond)
remoteSampler.Close()

_, ok = remoteSampler.sampler.(*ProbabilisticSampler)
s2, ok := remoteSampler.getSampler().(*ProbabilisticSampler)
assert.True(t, ok)
assert.NotEqual(t, initSampler, remoteSampler.sampler, "Sampler should have been updated from timer")
assert.NotEqual(t, initSampler, s2, "Sampler should have been updated from timer")

assert.True(t, remoteSampler.Equal(remoteSampler))
}
Expand Down Expand Up @@ -499,7 +498,7 @@ func (c *fakeSamplingManager) GetSamplingStrategy(serviceName string) (*sampling
func TestRemotelyControlledSampler_updateSamplerFromAdaptiveSampler(t *testing.T) {
agent, remoteSampler, metricsFactory := initAgent(t)
defer agent.Close()
remoteSampler.Close() // stop timer-based updates, we want to call them manually
remoteSampler.Close() // close the second time (initAgent already called Close)

strategies := &sampling.PerOperationSamplingStrategies{
DefaultSamplingProbability: testDefaultSamplingProbability,
Expand Down

0 comments on commit 996debd

Please sign in to comment.