Skip to content

Commit

Permalink
Remove numBuckets argument from CreateSamplingStore() API
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Jun 16, 2024
1 parent 252980d commit e0efd2b
Show file tree
Hide file tree
Showing 23 changed files with 63 additions and 83 deletions.
9 changes: 2 additions & 7 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"reflect"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Memory map[string]memory.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Expand All @@ -27,11 +27,6 @@ type Config struct {
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
Expand Down
5 changes: 2 additions & 3 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
Expand Down Expand Up @@ -96,13 +95,13 @@ func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) er
}

func (s *storageExt) Start(ctx context.Context, host component.Host) error {
memStarter := &starter[memoryCfg.Configuration, *memory.Factory]{
memStarter := &starter[memory.Configuration, *memory.Factory]{

Check warning on line 98 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L98

Added line #L98 was not covered by tests
ext: s,
storageKind: "memory",
cfg: s.config.Memory,
// memory factory does not return an error, so need to wrap it
builder: func(
cfg memoryCfg.Configuration,
cfg memory.Configuration,

Check warning on line 104 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L104

Added line #L104 was not covered by tests
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*memory.Factory, error) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestStorageExtensionConfigError(t *testing.T) {

func TestStorageExtensionNameConflict(t *testing.T) {
storageExtension := makeStorageExtenion(t, &Config{
Memory: map[string]memoryCfg.Configuration{
Memory: map[string]memory.Configuration{
"foo": {MaxTraces: 10000},
},
Badger: map[string]badgerCfg.NamespaceConfig{
Expand Down Expand Up @@ -219,7 +219,7 @@ func makeStorageExtenion(t *testing.T, config *Config) component.Component {

func startStorageExtension(t *testing.T, memstoreName string) component.Component {
config := &Config{
Memory: map[string]memoryCfg.Configuration{
Memory: map[string]memory.Configuration{
memstoreName: {MaxTraces: 10000},
},
}
Expand Down
25 changes: 0 additions & 25 deletions pkg/memory/config/empty_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if err != nil {
return err
}
f.store, err = ssFactory.CreateSamplingStore(f.options.AggregationBuckets)
f.store, err = ssFactory.CreateSamplingStore()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
return mockLock, nil
}

func (m *mockSamplingStoreFactory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
if m.storeFailsWith != nil {
return nil, m.storeFailsWith
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,6 @@ func (*mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
return nil, nil
}

func (*mockSamplingStoreFactory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (*mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return badgerSampling.NewSamplingStore(f.store), nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateLock()
require.NoError(t, err)

_, err = f.CreateSamplingStore(0)
_, err = f.CreateSamplingStore()
require.NoError(t, err)

require.NoError(t, f.Close())
Expand Down
10 changes: 6 additions & 4 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ const (
var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
// TODO does not implement CreateLock !
// _ storage.SamplingStoreFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
)

// Factory implements storage.Factory for Elasticsearch backend.
Expand Down Expand Up @@ -296,7 +298,7 @@ func createSpanWriter(
return writer, nil
}

func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
params := esSampleStore.Params{
Client: f.getPrimaryClient,
Logger: f.logger,
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestElasticsearchFactory(t *testing.T) {
_, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)

_, err = f.CreateSamplingStore(1)
_, err = f.CreateSamplingStore()
require.NoError(t, err)

require.NoError(t, f.Close())
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestCreateTemplateError(t *testing.T) {
assert.Nil(t, w)
require.Error(t, err, "template-error")

s, err := f.CreateSamplingStore(1)
s, err := f.CreateSamplingStore()
assert.Nil(t, s)
require.Error(t, err, "template-error")
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.SpanReader, err = s.factory.CreateSpanReader()
require.NoError(t, err)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
s.SamplingStore, err = s.factory.CreateSamplingStore()
require.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)
s.SamplingStore, err = f.CreateSamplingStore(0)
s.SamplingStore, err = f.CreateSamplingStore()
require.NoError(t, err)
s.initializeDependencyReaderAndWriter(t, f)
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
require.NoError(t, err)
s.DependencyWriter = s.DependencyReader.(dependencystore.Writer)

s.SamplingStore, err = f.CreateSamplingStore(1)
s.SamplingStore, err = f.CreateSamplingStore()
require.NoError(t, err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package memory

// Configuration describes the options to customize the storage behavior
type Configuration struct {
// MaxTraces is the maximum number of traces that can be stored in memory in FIFO manner.
MaxTraces int `mapstructure:"max_traces"`

// SamplingAggregationBuckets is used with adaptive sampling to control how many buckets
// of trace throughput is stored in memory.
SamplingAggregationBuckets int `mapstructure:"sampling_aggregation_buckets"`
}
14 changes: 7 additions & 7 deletions plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -54,12 +53,12 @@ func NewFactory() *Factory {

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
cfg Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) *Factory {
f := NewFactory()
f.configureFromOptions(Options{Configuration: cfg})
f.configureFromOptions(Options{Config: cfg})

Check warning on line 61 in plugin/storage/memory/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/memory/factory.go#L61

Added line #L61 was not covered by tests
_ = f.Initialize(metricsFactory, logger)
return f
}
Expand All @@ -82,7 +81,7 @@ func (f *Factory) configureFromOptions(opts Options) {
// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.store = WithConfiguration(f.options.Configuration)
f.store = WithConfiguration(f.options.Config)
logger.Info("Memory storage initialized", zap.Any("configuration", f.store.defaultConfig))
f.publishOpts()

Expand Down Expand Up @@ -115,8 +114,8 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (*Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return NewSamplingStore(maxBuckets), nil
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return NewSamplingStore(f.options.Config.SamplingAggregationBuckets), nil

Check warning on line 118 in plugin/storage/memory/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/memory/factory.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

// CreateLock implements storage.SamplingStoreFactory
Expand All @@ -125,5 +124,6 @@ func (*Factory) CreateLock() (distributedlock.Lock, error) {
}

func (f *Factory) publishOpts() {
safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Configuration.MaxTraces))
safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Config.MaxTraces))
safeexpvar.SetInt("jaeger_storage_memory_sampling_aggregation_buckets", int64(f.options.Config.MaxTraces))
}
9 changes: 4 additions & 5 deletions plugin/storage/memory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
memCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/storage"
)
Expand All @@ -44,7 +43,7 @@ func TestMemoryStorageFactory(t *testing.T) {
depReader, err := f.CreateDependencyReader()
require.NoError(t, err)
assert.Equal(t, f.store, depReader)
samplingStore, err := f.CreateSamplingStore(2)
samplingStore, err := f.CreateSamplingStore()
require.NoError(t, err)
assert.Equal(t, 2, samplingStore.(*SamplingStore).maxBuckets)
lock, err := f.CreateLock()
Expand All @@ -57,15 +56,15 @@ func TestWithConfiguration(t *testing.T) {
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--memory.max-traces=100"})
f.InitFromViper(v, zap.NewNop())
assert.Equal(t, 100, f.options.Configuration.MaxTraces)
assert.Equal(t, 100, f.options.Config.MaxTraces)
}

func TestNewFactoryWithConfig(t *testing.T) {
cfg := memCfg.Configuration{
cfg := Configuration{
MaxTraces: 42,
}
f := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
assert.Equal(t, cfg, f.options.Configuration)
assert.Equal(t, cfg, f.options.Config)
}

func TestPublishOpts(t *testing.T) {
Expand Down
13 changes: 6 additions & 7 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -36,7 +35,7 @@ type Store struct {
sync.RWMutex
// Each tenant gets a copy of default config.
// In the future this can be extended to contain per-tenant configuration.
defaultConfig config.Configuration
defaultConfig Configuration
perTenant map[string]*Tenant
}

Expand All @@ -48,24 +47,24 @@ type Tenant struct {
services map[string]struct{}
operations map[string]map[spanstore.Operation]struct{}
deduper adjuster.Adjuster
config config.Configuration
config Configuration
index int
}

// NewStore creates an unbounded in-memory store
func NewStore() *Store {
return WithConfiguration(config.Configuration{MaxTraces: 0})
return WithConfiguration(Configuration{MaxTraces: 0})
}

// WithConfiguration creates a new in memory storage based on the given configuration
func WithConfiguration(configuration config.Configuration) *Store {
func WithConfiguration(config Configuration) *Store {
return &Store{
defaultConfig: configuration,
defaultConfig: config,
perTenant: make(map[string]*Tenant),
}
}

func newTenant(cfg config.Configuration) *Tenant {
func newTenant(cfg Configuration) *Tenant {
return &Tenant{
ids: make([]*model.TraceID, cfg.MaxTraces),
traces: map[model.TraceID]*model.Trace{},
Expand Down
3 changes: 1 addition & 2 deletions plugin/storage/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -170,7 +169,7 @@ func TestStoreWriteSpan(t *testing.T) {

func TestStoreWithLimit(t *testing.T) {
maxTraces := 100
store := WithConfiguration(config.Configuration{MaxTraces: maxTraces})
store := WithConfiguration(Configuration{MaxTraces: maxTraces})

for i := 0; i < maxTraces*2; i++ {
id := model.NewTraceID(1, uint64(i))
Expand Down
Loading

0 comments on commit e0efd2b

Please sign in to comment.