Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[adaptive] Separate Provider and Aggregator settings #5639

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -68,13 +68,13 @@ func main() {
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
strategyStoreFactoryConfig, err := ss.FactoryConfigFromEnv()
samplingStrategyFactoryConfig, err := ss.FactoryConfigFromEnv()
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory config: %v", err)
log.Fatalf("Cannot initialize sampling strategy factory config: %v", err)
}
strategyStoreFactory, err := ss.NewFactory(*strategyStoreFactoryConfig)
samplingStrategyFactory, err := ss.NewFactory(*samplingStrategyFactoryConfig)
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
log.Fatalf("Cannot initialize sampling strategy factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
Expand Down Expand Up @@ -133,13 +133,13 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create sampling store factory", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v, logger)
if err := strategyStoreFactory.Initialize(collectorMetricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
samplingStrategyFactory.InitFromViper(v, logger)
if err := samplingStrategyFactory.Initialize(collectorMetricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy factory", zap.Error(err))
}
strategyStore, aggregator, err := strategyStoreFactory.CreateStrategyStore()
samplingProvider, samplingAggregator, err := samplingStrategyFactory.CreateStrategyProvider()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
logger.Fatal("Failed to create sampling strategy provider", zap.Error(err))
}

aOpts := new(agentApp.Builder).InitFromViper(v)
Expand All @@ -161,14 +161,14 @@ by default uses only in-memory database.`,

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
SamplingAggregator: samplingAggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
})
if err := c.Start(cOpts); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -238,7 +238,7 @@ by default uses only in-memory database.`,
agentGrpcRep.AddFlags,
collectorFlags.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
samplingStrategyFactory.AddFlags,
metricsReaderFactory.AddFlags,
)

Expand Down
78 changes: 39 additions & 39 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -46,16 +46,16 @@ const (
// Collector returns the collector as a manageable unit of work
type Collector struct {
// required to start a new collector
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
aggregator strategystore.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.Manager
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
samplingProvider samplingstrategy.Provider
samplingAggregator samplingstrategy.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.Manager

// state, read only
hServer *http.Server
Expand All @@ -69,27 +69,27 @@ type Collector struct {

// CollectorParams to construct a new Jaeger Collector.
type CollectorParams struct {
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.Manager
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
SamplingProvider samplingstrategy.Provider
SamplingAggregator samplingstrategy.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.Manager
}

// New constructs a new collector component, ready to be started
func New(params *CollectorParams) *Collector {
return &Collector{
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
samplingProvider: params.SamplingProvider,
samplingAggregator: params.SamplingAggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
}
}

Expand All @@ -104,9 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
}

var additionalProcessors []ProcessSpan
if c.aggregator != nil {
if c.samplingAggregator != nil {
additionalProcessors = append(additionalProcessors, func(span *model.Span, _ /* tenant */ string) {
c.aggregator.HandleRootSpan(span, c.logger)
c.samplingAggregator.HandleRootSpan(span, c.logger)
})
}

Expand All @@ -117,7 +117,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
HostPort: options.GRPC.HostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: options.GRPC.TLS,
SamplingStore: c.strategyStore,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
Expand All @@ -129,13 +129,13 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Logger: c.logger,
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start HTTP server: %w", err)
Expand Down Expand Up @@ -213,8 +213,8 @@ func (c *Collector) Close() error {
}

// aggregator does not exist for all strategy stores. only Close() if exists.
if c.aggregator != nil {
if err := c.aggregator.Close(); err != nil {
if c.samplingAggregator != nil {
if err := c.samplingAggregator.Close(); err != nil {
c.logger.Error("failed to close aggregator.", zap.Error(err))
}
}
Expand Down
72 changes: 36 additions & 36 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func TestNewCollector(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})

collectorOpts := optionsForEphemeralPorts()
Expand All @@ -102,17 +102,17 @@ func TestCollector_StartErrors(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})
err := c.Start(options)
require.Error(t, err)
Expand Down Expand Up @@ -144,13 +144,13 @@ func TestCollector_StartErrors(t *testing.T) {
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct{}
type mockSamplingProvider struct{}

func (*mockStrategyStore) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
func (*mockSamplingProvider) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{}, nil
}

func (*mockStrategyStore) Close() error {
func (*mockSamplingProvider) Close() error {
return nil
}

Expand All @@ -161,17 +161,17 @@ func TestCollector_PublishOpts(t *testing.T) {
metricsFactory := metricstest.NewFactory(time.Second)
defer metricsFactory.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
Expand All @@ -191,19 +191,19 @@ func TestAggregator(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
agg := &mockAggregator{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
SamplingAggregator: agg,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/app/sampling/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ package sampling
import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler is sampling strategy handler for gRPC.
type GRPCHandler struct {
store strategystore.StrategyStore
samplingProvider samplingstrategy.Provider
}

// NewGRPCHandler creates a handler that controls sampling strategies for services.
func NewGRPCHandler(store strategystore.StrategyStore) GRPCHandler {
func NewGRPCHandler(provider samplingstrategy.Provider) GRPCHandler {
return GRPCHandler{
store: store,
samplingProvider: provider,
}
}

// GetSamplingStrategy returns sampling decision from store.
func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return s.store.GetSamplingStrategy(ctx, param.GetServiceName())
return s.samplingProvider.GetSamplingStrategy(ctx, param.GetServiceName())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package strategystore
package samplingstrategy

import (
"testing"
Expand Down
Loading
Loading