diff --git a/agent/collector/collector.go b/agent/collector/collector.go index 4093c6aaca..237ac4eb4e 100644 --- a/agent/collector/collector.go +++ b/agent/collector/collector.go @@ -28,6 +28,12 @@ func WithTraceCache(traceCache TraceCache) CollectorOption { } } +func WithStartRemoteServer(startRemoteServer bool) CollectorOption { + return func(ric *remoteIngesterConfig) { + ric.startRemoteServer = startRemoteServer + } +} + type collector struct { grpcServer stoppable httpServer stoppable @@ -49,7 +55,7 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll opt(&ingesterConfig) } - ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig) + ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.startRemoteServer) if err != nil { return nil, fmt.Errorf("could not start local collector: %w", err) } diff --git a/agent/collector/collector_test.go b/agent/collector/collector_test.go index 4f941dcaac..b14ce182e0 100644 --- a/agent/collector/collector_test.go +++ b/agent/collector/collector_test.go @@ -31,6 +31,7 @@ func TestCollector(t *testing.T) { RemoteServerURL: targetServer.Addr(), }, noopTracer, + collector.WithStartRemoteServer(true), ) require.NoError(t, err) @@ -74,6 +75,7 @@ func TestCollectorWatchingSpansFromTest(t *testing.T) { }, noopTracer, collector.WithTraceCache(cache), + collector.WithStartRemoteServer(true), ) require.NoError(t, err) diff --git a/agent/collector/ingester.go b/agent/collector/ingester.go index 5daf97aa1d..1434554393 100644 --- a/agent/collector/ingester.go +++ b/agent/collector/ingester.go @@ -19,7 +19,7 @@ type stoppable interface { Stop() } -func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig) (otlp.Ingester, error) { +func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) { ingester := &forwardIngester{ BatchTimeout: batchTimeout, RemoteIngester: remoteIngesterConfig, @@ -28,12 +28,14 @@ func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteI traceCache: remoteIngesterConfig.traceCache, } - err := ingester.connectToRemoteServer(ctx) - if err != nil { - return nil, fmt.Errorf("could not connect to remote server: %w", err) - } + if startRemoteServer { + err := ingester.connectToRemoteServer(ctx) + if err != nil { + return nil, fmt.Errorf("could not connect to remote server: %w", err) + } - go ingester.startBatchWorker() + go ingester.startBatchWorker() + } return ingester, nil } @@ -50,9 +52,10 @@ type forwardIngester struct { } type remoteIngesterConfig struct { - URL string - Token string - traceCache TraceCache + URL string + Token string + traceCache TraceCache + startRemoteServer bool } type buffer struct { diff --git a/agent/config/config.go b/agent/config/config.go index 0ec93d5afa..3dedbb71ff 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -14,10 +14,10 @@ type Config struct { Name string `mapstructure:"agent_name"` ServerURL string `mapstructure:"server_url"` - OTLPServer otlpServer `mapstructure:"otlp_server"` + OTLPServer OtlpServer `mapstructure:"otlp_server"` } -type otlpServer struct { +type OtlpServer struct { GRPCPort int `mapstructure:"grpc_port"` HTTPPort int `mapstructure:"http_port"` } diff --git a/agent/example/collector.config.yaml b/agent/example/collector.config.yaml index b761f10e2d..01dd5bf996 100644 --- a/agent/example/collector.config.yaml +++ b/agent/example/collector.config.yaml @@ -19,9 +19,18 @@ exporters: tls: insecure: true + otlp/1: + endpoint: host.docker.internal:4317 + tls: + insecure: true + service: pipelines: traces: receivers: [otlp] processors: [probabilistic_sampler, batch] exporters: [otlp/jaeger] + traces/1: + receivers: [otlp] + processors: [batch] + exporters: [otlp/1] diff --git a/agent/initialization/start.go b/agent/initialization/start.go index 52aac17897..725cd28025 100644 --- a/agent/initialization/start.go +++ b/agent/initialization/start.go @@ -52,7 +52,7 @@ func Start(ctx context.Context, config config.Config) error { return err } - err = startCollector(ctx, config, traceCache) + err = StartCollector(ctx, config, traceCache) if err != nil { return err } @@ -61,14 +61,14 @@ func Start(ctx context.Context, config config.Config) error { return nil } -func startCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error { +func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error { noopTracer := trace.NewNoopTracerProvider().Tracer("noop") collectorConfig := collector.Config{ HTTPPort: config.OTLPServer.HTTPPort, GRPCPort: config.OTLPServer.GRPCPort, } - _, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache)) + _, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache), collector.WithStartRemoteServer(false)) if err != nil { return err } diff --git a/cli/pkg/starter/starter.go b/cli/pkg/starter/starter.go index 9d25c50790..a9cfc1334b 100644 --- a/cli/pkg/starter/starter.go +++ b/cli/pkg/starter/starter.go @@ -100,6 +100,10 @@ func (s *Starter) StartAgent(ctx context.Context, endpoint, name, agentApiKey, u ServerURL: endpoint, APIKey: agentApiKey, Name: name, + OTLPServer: agentConfig.OtlpServer{ + GRPCPort: 4317, + HTTPPort: 4318, + }, } s.ui.Info(fmt.Sprintf("Starting Agent with name %s...", name)) @@ -114,6 +118,11 @@ func (s *Starter) StartAgent(ctx context.Context, endpoint, name, agentApiKey, u return err } + err = initialization.StartCollector(ctx, cfg, cache) + if err != nil { + return err + } + claims, err := s.getTokenClaims(client.SessionConfiguration().AgentIdentification.Token) if err != nil { return err diff --git a/server/app/app.go b/server/app/app.go index d4cd7cb0d3..a0375d9464 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -230,8 +230,10 @@ func (app *App) Start(opts ...appOption) error { tracedbFactory := tracedb.Factory(tracesRepo) - eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager) - registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer) + if app.cfg.OtlpServerEnabled() { + eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager) + registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer) + } testPipeline := buildTestPipeline( pool, diff --git a/server/config/server.go b/server/config/server.go index a68aca3647..e7a22c721e 100644 --- a/server/config/server.go +++ b/server/config/server.go @@ -86,6 +86,11 @@ var serverOptions = options{ defaultValue: "true", description: "enable local data store test connection", }, + { + key: "otlpServer.enabled", + defaultValue: "true", + description: "enable otlp server", + }, } func init() { @@ -169,3 +174,10 @@ func (c *AppConfig) DataStorePipelineTestConnectionEnabled() bool { return c.vp.GetString("dataStorePipelines.testConnection.enabled") == "true" } + +func (c *AppConfig) OtlpServerEnabled() bool { + c.mu.Lock() + defer c.mu.Unlock() + + return c.vp.GetString("otlpServer.enabled") == "true" +}