Skip to content

Commit

Permalink
Refactor Adaptive Sampling Aggregator & Strategy Store (#5441)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- part of #5389

## Description of the changes
- processor is co-located in strategy_store and aggregator.
- In aggregator to run `generateStrategyResponses`,
`runCalculationLoop`.
- In strategy_store to run `loadProbabilities`,
`runUpdateProbabilitiesLoop`

## How was this change tested?
- `make test`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Pushkar Mishra <pushkarmishra029@gmail.com>
Signed-off-by: pushkarm029 <pushkarmishra029@gmail.com>
  • Loading branch information
Pushkarm029 committed May 21, 2024
1 parent 486f74f commit c9fe891
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 193 deletions.
3 changes: 3 additions & 0 deletions cmd/collector/app/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ type Factory interface {

// CreateStrategyStore initializes the StrategyStore and returns it.
CreateStrategyStore() (StrategyStore, Aggregator, error)

// Close closes the factory
Close() error
}
3 changes: 3 additions & 0 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func main() {
if err := storageFactory.Close(); err != nil {
logger.Error("Failed to close storage factory", zap.Error(err))
}
if err := strategyStoreFactory.Close(); err != nil {
logger.Error("Failed to close sampling strategy store factory", zap.Error(err))
}
})
return nil
},
Expand Down
36 changes: 31 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
span_model "github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand All @@ -35,22 +39,36 @@ type aggregator struct {
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
processor *Processor
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
bgFinished sync.WaitGroup
}

// NewAggregator creates a throughput aggregator that simply emits metrics
// about the number of operations seen over the aggregationInterval.
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator {
func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) {
hostname, err := hostname.AsIdentifier()
if err != nil {
return nil, err
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: interval,
storage: storage,
aggregationInterval: options.CalculationInterval,
processor: processor,
storage: store,
stop: make(chan struct{}),
}
}, nil
}

func (a *aggregator) runAggregationLoop() {
Expand All @@ -61,6 +79,7 @@ func (a *aggregator) runAggregationLoop() {
a.Lock()
a.saveThroughput()
a.currentThroughput = make(serviceOperationThroughput)
a.processor.runCalculation()
a.Unlock()
case <-a.stop:
ticker.Stop()
Expand Down Expand Up @@ -111,10 +130,17 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}

func (a *aggregator) Start() {
go a.runAggregationLoop()
a.processor.Start()

a.bgFinished.Add(1)
go func() {
a.runAggregationLoop()
a.bgFinished.Done()
}()
}

func (a *aggregator) Close() error {
close(a.stop)
a.bgFinished.Wait()
return nil
}
40 changes: 35 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand All @@ -32,8 +35,19 @@ func TestAggregator(t *testing.T) {

mockStorage := &mocks.Store{}
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil)
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
mockEP.On("IsLeader").Return(true)
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -60,15 +74,23 @@ func TestAggregator(t *testing.T) {
func TestIncrementThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 different probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i))
}
assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10)

a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err = NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 of the same probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -79,8 +101,16 @@ func TestIncrementThroughput(t *testing.T) {
func TestLowerboundThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
Expand Down
25 changes: 20 additions & 5 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)
Expand All @@ -38,6 +39,7 @@ type Factory struct {
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
participant *leaderelection.DistributedElectionParticipant
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -66,7 +68,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if ssFactory == nil {
return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling")
}

var err error
f.logger = logger
f.metricsFactory = metricsFactory
Expand All @@ -78,17 +79,31 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if err != nil {
return err
}
f.participant = leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})
f.participant.Start()

return nil
}

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
s := NewStrategyStore(*f.options, f.logger, f.participant, f.store)
a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, f.participant, f.store)
if err != nil {
return nil, nil, err
}
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)

s.Start()
a.Start()
return p, a, nil

return s, a, nil
}

// Closes the factory
func (f *Factory) Close() error {
return f.participant.Close()
}
2 changes: 2 additions & 0 deletions plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestFactory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, store.Close())
require.NoError(t, aggregator.Close())
require.NoError(t, f.Close())
}

func TestBadConfigFail(t *testing.T) {
Expand All @@ -97,6 +98,7 @@ func TestBadConfigFail(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
_, _, err := f.CreateStrategyStore()
require.Error(t, err)
require.NoError(t, f.Close())
}
}

Expand Down

0 comments on commit c9fe891

Please sign in to comment.