Skip to content

Commit

Permalink
Rename strategy store to sampling strategy provider (#5634)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- The original naming was confusing, as both static & adaptive
implementations were returning, but not storing the sampling strategies.
There was also another confusion between "strategy store" (aka provider)
and "sampling store" (an actual storage that records throughputs and
calculated strategies but only for adaptive sampling).

## Description of the changes
- Rename packages, interfaces, methods, and variables
- Move `calculationstrategy` package inside `adaptive` as it's not used
anywhere else

## How was this change tested?
- `go run ./cmd/collector` and `go run ./cmd/all-in-one` must compile

Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Jun 17, 2024
1 parent 082d2e0 commit d8185ac
Show file tree
Hide file tree
Showing 60 changed files with 336 additions and 330 deletions.
38 changes: 19 additions & 19 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -68,13 +68,13 @@ func main() {
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
strategyStoreFactoryConfig, err := ss.FactoryConfigFromEnv()
samplingStrategyFactoryConfig, err := ss.FactoryConfigFromEnv()
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory config: %v", err)
log.Fatalf("Cannot initialize sampling strategy factory config: %v", err)
}
strategyStoreFactory, err := ss.NewFactory(*strategyStoreFactoryConfig)
samplingStrategyFactory, err := ss.NewFactory(*samplingStrategyFactoryConfig)
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
log.Fatalf("Cannot initialize sampling strategy factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
Expand Down Expand Up @@ -133,13 +133,13 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create sampling store factory", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v, logger)
if err := strategyStoreFactory.Initialize(collectorMetricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
samplingStrategyFactory.InitFromViper(v, logger)
if err := samplingStrategyFactory.Initialize(collectorMetricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy factory", zap.Error(err))
}
strategyStore, aggregator, err := strategyStoreFactory.CreateStrategyStore()
samplingProvider, samplingAggregator, err := samplingStrategyFactory.CreateStrategyProvider()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
logger.Fatal("Failed to create sampling strategy provider", zap.Error(err))
}

aOpts := new(agentApp.Builder).InitFromViper(v)
Expand All @@ -161,14 +161,14 @@ by default uses only in-memory database.`,

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
SamplingAggregator: samplingAggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
})
if err := c.Start(cOpts); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -238,7 +238,7 @@ by default uses only in-memory database.`,
agentGrpcRep.AddFlags,
collectorFlags.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
samplingStrategyFactory.AddFlags,
metricsReaderFactory.AddFlags,
)

Expand Down
78 changes: 39 additions & 39 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -46,16 +46,16 @@ const (
// Collector returns the collector as a manageable unit of work
type Collector struct {
// required to start a new collector
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
aggregator strategystore.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.Manager
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
samplingProvider samplingstrategy.Provider
samplingAggregator samplingstrategy.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.Manager

// state, read only
hServer *http.Server
Expand All @@ -69,27 +69,27 @@ type Collector struct {

// CollectorParams to construct a new Jaeger Collector.
type CollectorParams struct {
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.Manager
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
SamplingProvider samplingstrategy.Provider
SamplingAggregator samplingstrategy.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.Manager
}

// New constructs a new collector component, ready to be started
func New(params *CollectorParams) *Collector {
return &Collector{
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
samplingProvider: params.SamplingProvider,
samplingAggregator: params.SamplingAggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
}
}

Expand All @@ -104,9 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
}

var additionalProcessors []ProcessSpan
if c.aggregator != nil {
if c.samplingAggregator != nil {
additionalProcessors = append(additionalProcessors, func(span *model.Span, _ /* tenant */ string) {
c.aggregator.HandleRootSpan(span, c.logger)
c.samplingAggregator.HandleRootSpan(span, c.logger)
})
}

Expand All @@ -117,7 +117,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
HostPort: options.GRPC.HostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: options.GRPC.TLS,
SamplingStore: c.strategyStore,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
Expand All @@ -129,13 +129,13 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Logger: c.logger,
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start HTTP server: %w", err)
Expand Down Expand Up @@ -213,8 +213,8 @@ func (c *Collector) Close() error {
}

// aggregator does not exist for all strategy stores. only Close() if exists.
if c.aggregator != nil {
if err := c.aggregator.Close(); err != nil {
if c.samplingAggregator != nil {
if err := c.samplingAggregator.Close(); err != nil {
c.logger.Error("failed to close aggregator.", zap.Error(err))
}
}
Expand Down
72 changes: 36 additions & 36 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func TestNewCollector(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})

collectorOpts := optionsForEphemeralPorts()
Expand All @@ -102,17 +102,17 @@ func TestCollector_StartErrors(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})
err := c.Start(options)
require.Error(t, err)
Expand Down Expand Up @@ -144,13 +144,13 @@ func TestCollector_StartErrors(t *testing.T) {
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct{}
type mockSamplingProvider struct{}

func (*mockStrategyStore) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
func (*mockSamplingProvider) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{}, nil
}

func (*mockStrategyStore) Close() error {
func (*mockSamplingProvider) Close() error {
return nil
}

Expand All @@ -161,17 +161,17 @@ func TestCollector_PublishOpts(t *testing.T) {
metricsFactory := metricstest.NewFactory(time.Second)
defer metricsFactory.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
Expand All @@ -191,19 +191,19 @@ func TestAggregator(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
agg := &mockAggregator{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
SamplingAggregator: agg,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/app/sampling/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ package sampling
import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler is sampling strategy handler for gRPC.
type GRPCHandler struct {
store strategystore.StrategyStore
samplingProvider samplingstrategy.Provider
}

// NewGRPCHandler creates a handler that controls sampling strategies for services.
func NewGRPCHandler(store strategystore.StrategyStore) GRPCHandler {
func NewGRPCHandler(provider samplingstrategy.Provider) GRPCHandler {
return GRPCHandler{
store: store,
samplingProvider: provider,
}
}

// GetSamplingStrategy returns sampling decision from store.
func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return s.store.GetSamplingStrategy(ctx, param.GetServiceName())
return s.samplingProvider.GetSamplingStrategy(ctx, param.GetServiceName())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package strategystore
package samplingstrategy

import (
"testing"
Expand Down
Loading

0 comments on commit d8185ac

Please sign in to comment.