Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: pushkarm029 <pushkarmishra029@gmail.com>
  • Loading branch information
Pushkarm029 committed May 21, 2024
1 parent f115849 commit df0bcea
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 67 deletions.
1 change: 1 addition & 0 deletions cmd/collector/app/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ type Factory interface {
// CreateStrategyStore initializes the StrategyStore and returns it.
CreateStrategyStore() (StrategyStore, Aggregator, error)

// Close closes the factory
Close() error
}
17 changes: 9 additions & 8 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,9 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if ssFactory == nil {
return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling")
}

var err error
f.logger = logger
f.metricsFactory = metricsFactory
f.participant = leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})
f.lock, err = ssFactory.CreateLock()
if err != nil {
return err
Expand All @@ -85,21 +79,28 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if err != nil {
return err
}
f.participant = leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})
f.participant.Start()

return nil
}

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
f.participant.Start()
p, err := NewStrategyStore(*f.options, f.logger, f.participant, f.store)
if err != nil {
return nil, nil, err

Check warning on line 96 in plugin/sampling/strategystore/adaptive/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/sampling/strategystore/adaptive/factory.go#L96

Added line #L96 was not covered by tests
}
p.Start()
a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, f.participant, f.store)
if err != nil {
return nil, nil, err
}

p.Start()
a.Start()

return p, a, nil
Expand Down
2 changes: 2 additions & 0 deletions plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestFactory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, store.Close())
require.NoError(t, aggregator.Close())
require.NoError(t, f.Close())
}

func TestBadConfigFail(t *testing.T) {
Expand All @@ -97,6 +98,7 @@ func TestBadConfigFail(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
_, _, err := f.CreateStrategyStore()
require.Error(t, err)
require.NoError(t, f.Close())
}
}

Expand Down
82 changes: 23 additions & 59 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,9 @@ func TestRunCalculationLoop(t *testing.T) {
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(testThroughputs(), nil)
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"),
mockStorage.On("InsertProbabilitiesAndQPS", mock.AnythingOfType("string"), mock.AnythingOfType("model.ServiceOperationProbabilities"),
mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage())
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(errTestStorage())
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
Expand All @@ -357,29 +358,35 @@ func TestRunCalculationLoop(t *testing.T) {
FollowerLeaseRefreshInterval: time.Second,
BucketsForCalculation: 10,
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
agg, err := NewAggregator(cfg, logger, metrics.NullFactory, mockEP, mockStorage)
require.NoError(t, err)
p.Start()
agg.Start()
defer agg.Close()

for i := 0; i < 1000; i++ {
strategy, _ := p.GetSamplingStrategy(context.Background(), "svcA")
if len(strategy.OperationSampling.PerOperationStrategies) != 0 {
agg.(*aggregator).Lock()
probabilities := agg.(*aggregator).processor.probabilities
agg.(*aggregator).Unlock()
if len(probabilities) != 0 {
break
}
time.Sleep(time.Millisecond)
}
p.Close()

strategy, err := p.GetSamplingStrategy(context.Background(), "svcA")
require.NoError(t, err)
assert.Len(t, strategy.OperationSampling.PerOperationStrategies, 2)
probabilities := agg.(*aggregator).processor.probabilities
require.Len(t, probabilities["svcA"], 2)
}

func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
logger, logBuffer := testutils.NewLogger()
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(nil, errTestStorage())
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
mockStorage.On("InsertProbabilitiesAndQPS", mock.AnythingOfType("string"), mock.AnythingOfType("model.ServiceOperationProbabilities"),
mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage())
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(errTestStorage())

mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
Expand All @@ -390,12 +397,9 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
AggregationBuckets: 2,
BucketsForCalculation: 10,
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
agg, err := NewAggregator(cfg, logger, metrics.NullFactory, mockEP, mockStorage)
require.NoError(t, err)
p.shutdown = make(chan struct{})
defer close(p.shutdown)
go p.runCalculationLoop()

agg.Start()
for i := 0; i < 1000; i++ {
// match logs specific to getThroughputErrMsg. We expect to see more than 2, once during
// initialization and one or more times during the loop.
Expand All @@ -406,13 +410,14 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
}
match, errMsg := testutils.LogMatcher(2, getThroughputErrMsg, logBuffer.Lines())
assert.True(t, match, errMsg)
require.NoError(t, agg.Close())
}

func TestLoadProbabilities(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)

p := &Processor{storage: mockStorage}
p := &StrategyStore{storage: mockStorage}
require.Nil(t, p.probabilities)
p.loadProbabilities()
require.NotNil(t, p.probabilities)
Expand All @@ -426,7 +431,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) {
mockEP.On("Close").Return(nil)
mockEP.On("IsLeader").Return(false)

p := &Processor{
p := &StrategyStore{
storage: mockStorage,
shutdown: make(chan struct{}),
followerRefreshInterval: time.Millisecond,
Expand Down Expand Up @@ -480,7 +485,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) {
AggregationBuckets: 1,
Delay: time.Second * 10,
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
p, err := NewStrategyStore(cfg, logger, mockEP, mockStorage)
require.NoError(t, err)
p.Start()

Expand Down Expand Up @@ -557,7 +562,7 @@ func TestGenerateStrategyResponses(t *testing.T) {
"GET": 0.5,
},
}
p := &Processor{
p := &StrategyStore{
probabilities: probabilities,
Options: Options{
InitialSamplingProbability: 0.001,
Expand Down Expand Up @@ -853,44 +858,3 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) {
p.probabilities = probabilities
p.qps = qps
}

func TestErrors(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(nil, nil)

cfg := Options{
TargetSamplesPerSecond: 1.0,
DeltaTolerance: 0.1,
InitialSamplingProbability: 0.001,
CalculationInterval: time.Millisecond * 5,
AggregationBuckets: 2,
Delay: time.Millisecond * 5,
LeaderLeaseRefreshInterval: time.Millisecond,
FollowerLeaseRefreshInterval: time.Second,
BucketsForCalculation: 10,
}

// start errors
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(errors.New("bad"))
mockEP.On("Close").Return(errors.New("also bad"))
mockEP.On("IsLeader").Return(false)

p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
require.Error(t, p.Start())
require.Error(t, p.Close())

// close errors
mockEP = &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(errors.New("still bad"))
mockEP.On("IsLeader").Return(false)

p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
require.NoError(t, p.Start())
require.Error(t, p.Close())
}
9 changes: 9 additions & 0 deletions plugin/sampling/strategystore/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ func TestNewFactory(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, mockSSFactory, zap.NewNop()))
_, _, err = f.CreateStrategyStore()
require.NoError(t, err)
require.NoError(t, f.Close())

// force the mock to return errors
mock.retError = true
require.EqualError(t, f.Initialize(metrics.NullFactory, mockSSFactory, zap.NewNop()), "error initializing store")
_, _, err = f.CreateStrategyStore()
require.EqualError(t, err, "error creating store")
require.EqualError(t, f.Close(), "error closing store")

// request something that doesn't exist
f.StrategyStoreType = "doesntexist"
Expand Down Expand Up @@ -144,6 +146,13 @@ func (f *mockFactory) Initialize(metricsFactory metrics.Factory, ssFactory stora
return nil
}

func (f *mockFactory) Close() error {
if f.retError {
return errors.New("error closing store")
}
return nil
}

type mockSamplingStoreFactory struct{}

func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
Expand Down

0 comments on commit df0bcea

Please sign in to comment.