Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Adaptive Sampling Aggregator & Strategy Store #5441

Merged
merged 19 commits into from
May 21, 2024
54 changes: 47 additions & 7 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package adaptive

import (
"errors"
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
span_model "github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand All @@ -35,32 +40,51 @@ type aggregator struct {
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
postAggregator *PostAggregator
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
bgFinished sync.WaitGroup
}

// NewAggregator creates a throughput aggregator that simply emits metrics
// about the number of operations seen over the aggregationInterval.
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator {
func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) {
hostname, err := hostname.AsIdentifier()
if err != nil {
return nil, err
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

postAgg, err := newPostAggregator(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: interval,
storage: storage,
aggregationInterval: options.CalculationInterval,
postAggregator: postAgg,
storage: store,
stop: make(chan struct{}),
}
}, nil
}

func (a *aggregator) runAggregationLoop() {
ticker := time.NewTicker(a.aggregationInterval)

// NB: the first tick will be slightly delayed by the initializeThroughput call.
a.postAggregator.lastCheckedTime = time.Now().Add(a.postAggregator.Delay * -1)
a.postAggregator.initializeThroughput(a.postAggregator.lastCheckedTime)
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ticker.C:
a.Lock()
a.saveThroughput()
a.currentThroughput = make(serviceOperationThroughput)
a.postAggregator.runCalculation()
a.Unlock()
case <-a.stop:
ticker.Stop()
Expand Down Expand Up @@ -111,10 +135,26 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}

func (a *aggregator) Start() {
go a.runAggregationLoop()
a.postAggregator.Start()
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
a.runBackground(a.runAggregationLoop)
}

func (a *aggregator) Close() error {
close(a.stop)
return nil
var errs []error
if err := a.postAggregator.Close(); err != nil {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
errs = append(errs, err)
}
if a.stop != nil {
close(a.stop)
}
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
a.bgFinished.Wait()
return errors.Join(errs...)
}

func (a *aggregator) runBackground(f func()) {
a.bgFinished.Add(1)
go func() {
f()
a.bgFinished.Done()
}()
}
40 changes: 35 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand All @@ -32,8 +35,19 @@ func TestAggregator(t *testing.T) {

mockStorage := &mocks.Store{}
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil)
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
mockEP.On("IsLeader").Return(true)
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -60,15 +74,23 @@ func TestAggregator(t *testing.T) {
func TestIncrementThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 different probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i))
}
assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10)

a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err = NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 of the same probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -79,8 +101,16 @@ func TestIncrementThroughput(t *testing.T) {
func TestLowerboundThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
Expand Down
20 changes: 16 additions & 4 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)
Expand Down Expand Up @@ -83,12 +84,23 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
participant := leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})

ss, err := NewStrategyStore(*f.options, f.logger, participant, f.store)
if err != nil {
return nil, nil, err
}
ss.Start()

a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, participant, f.store)
if err != nil {
return nil, nil, err
}
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)
a.Start()
return p, a, nil

return ss, a, nil
}