Skip to content

Commit

Permalink
Remove numBuckets argument from CreateSamplingStore() API
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Jun 17, 2024
1 parent d8185ac commit b6c45b8
Show file tree
Hide file tree
Showing 26 changed files with 91 additions and 129 deletions.
24 changes: 24 additions & 0 deletions cmd/collector/app/server/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,35 @@
package server

import (
"context"
"testing"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

type mockSamplingProvider struct{}

func (mockSamplingProvider) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
return nil, nil
}

func (mockSamplingProvider) Close() error {
return nil
}

type mockSpanProcessor struct{}

func (*mockSpanProcessor) Close() error {
return nil
}

func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) {
return []bool{}, nil
}

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
43 changes: 0 additions & 43 deletions cmd/collector/app/server/test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/model"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

type storageHost struct {
Expand Down Expand Up @@ -143,7 +143,7 @@ func makeStorageExtension(t *testing.T, memstoreName string) storageHost {
TracerProvider: nooptrace.NewTracerProvider(),
},
},
&jaegerstorage.Config{Memory: map[string]memoryCfg.Configuration{
&jaegerstorage.Config{Memory: map[string]memory.Configuration{
memstoreName: {MaxTraces: 10000},
}})
require.NoError(t, err)
Expand Down
9 changes: 2 additions & 7 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"reflect"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Memory map[string]memory.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Expand All @@ -27,11 +27,6 @@ type Config struct {
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
Expand Down
5 changes: 2 additions & 3 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
Expand Down Expand Up @@ -96,13 +95,13 @@ func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) er
}

func (s *storageExt) Start(ctx context.Context, host component.Host) error {
memStarter := &starter[memoryCfg.Configuration, *memory.Factory]{
memStarter := &starter[memory.Configuration, *memory.Factory]{
ext: s,
storageKind: "memory",
cfg: s.config.Memory,
// memory factory does not return an error, so need to wrap it
builder: func(
cfg memoryCfg.Configuration,
cfg memory.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*memory.Factory, error) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestStorageExtensionConfigError(t *testing.T) {

func TestStorageExtensionNameConflict(t *testing.T) {
storageExtension := makeStorageExtenion(t, &Config{
Memory: map[string]memoryCfg.Configuration{
Memory: map[string]memory.Configuration{
"foo": {MaxTraces: 10000},
},
Badger: map[string]badgerCfg.NamespaceConfig{
Expand Down Expand Up @@ -219,7 +219,7 @@ func makeStorageExtenion(t *testing.T, config *Config) component.Component {

func startStorageExtension(t *testing.T, memstoreName string) component.Component {
config := &Config{
Memory: map[string]memoryCfg.Configuration{
Memory: map[string]memory.Configuration{
memstoreName: {MaxTraces: 10000},
},
}
Expand Down
25 changes: 0 additions & 25 deletions pkg/memory/config/empty_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if err != nil {
return err
}
f.store, err = ssFactory.CreateSamplingStore(f.options.AggregationBuckets)
f.store, err = ssFactory.CreateSamplingStore()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
return mockLock, nil
}

func (m *mockSamplingStoreFactory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
if m.storeFailsWith != nil {
return nil, m.storeFailsWith
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,6 @@ func (*mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
return nil, nil
}

func (*mockSamplingStoreFactory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (*mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return badgerSampling.NewSamplingStore(f.store), nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateLock()
require.NoError(t, err)

_, err = f.CreateSamplingStore(0)
_, err = f.CreateSamplingStore()
require.NoError(t, err)

require.NoError(t, f.Close())
Expand Down
10 changes: 6 additions & 4 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ const (
var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
// TODO does not implement CreateLock !
// _ storage.SamplingStoreFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
)

// Factory implements storage.Factory for Elasticsearch backend.
Expand Down Expand Up @@ -296,7 +298,7 @@ func createSpanWriter(
return writer, nil
}

func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
params := esSampleStore.Params{
Client: f.getPrimaryClient,
Logger: f.logger,
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestElasticsearchFactory(t *testing.T) {
_, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)

_, err = f.CreateSamplingStore(1)
_, err = f.CreateSamplingStore()
require.NoError(t, err)

require.NoError(t, f.Close())
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestCreateTemplateError(t *testing.T) {
assert.Nil(t, w)
require.Error(t, err, "template-error")

s, err := f.CreateSamplingStore(1)
s, err := f.CreateSamplingStore()
assert.Nil(t, s)
require.Error(t, err, "template-error")
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.SpanReader, err = s.factory.CreateSpanReader()
require.NoError(t, err)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
s.SamplingStore, err = s.factory.CreateSamplingStore()
require.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)
s.SamplingStore, err = f.CreateSamplingStore(0)
s.SamplingStore, err = f.CreateSamplingStore()
require.NoError(t, err)
s.initializeDependencyReaderAndWriter(t, f)
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
require.NoError(t, err)
s.DependencyWriter = s.DependencyReader.(dependencystore.Writer)

s.SamplingStore, err = f.CreateSamplingStore(1)
s.SamplingStore, err = f.CreateSamplingStore()
require.NoError(t, err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package memory

// Configuration describes the options to customize the storage behavior
type Configuration struct {
// MaxTraces is the maximum number of traces that can be stored in memory in FIFO manner.
MaxTraces int `mapstructure:"max_traces"`

// SamplingAggregationBuckets is used with adaptive sampling to control how many buckets
// of trace throughput is stored in memory.
SamplingAggregationBuckets int `mapstructure:"sampling_aggregation_buckets"`
}
14 changes: 7 additions & 7 deletions plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -54,12 +53,12 @@ func NewFactory() *Factory {

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
cfg Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) *Factory {
f := NewFactory()
f.configureFromOptions(Options{Configuration: cfg})
f.configureFromOptions(Options{Config: cfg})
_ = f.Initialize(metricsFactory, logger)
return f
}
Expand All @@ -82,7 +81,7 @@ func (f *Factory) configureFromOptions(opts Options) {
// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.store = WithConfiguration(f.options.Configuration)
f.store = WithConfiguration(f.options.Config)
logger.Info("Memory storage initialized", zap.Any("configuration", f.store.defaultConfig))
f.publishOpts()

Expand Down Expand Up @@ -115,8 +114,8 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (*Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return NewSamplingStore(maxBuckets), nil
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return NewSamplingStore(f.options.Config.SamplingAggregationBuckets), nil
}

// CreateLock implements storage.SamplingStoreFactory
Expand All @@ -125,5 +124,6 @@ func (*Factory) CreateLock() (distributedlock.Lock, error) {
}

func (f *Factory) publishOpts() {
safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Configuration.MaxTraces))
safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Config.MaxTraces))
safeexpvar.SetInt("jaeger_storage_memory_sampling_aggregation_buckets", int64(f.options.Config.MaxTraces))
}
Loading

0 comments on commit b6c45b8

Please sign in to comment.