Skip to content

Commit

Permalink
fix(agent): updating support for otlp datastore (#3177)
Browse files Browse the repository at this point in the history
* fix(agent): updating support for otlp datastore

* PR comments
  • Loading branch information
xoscar committed Sep 22, 2023
1 parent 4349562 commit 94d1c7f
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 17 deletions.
8 changes: 7 additions & 1 deletion agent/collector/collector.go
Expand Up @@ -28,6 +28,12 @@ func WithTraceCache(traceCache TraceCache) CollectorOption {
}
}

func WithStartRemoteServer(startRemoteServer bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.startRemoteServer = startRemoteServer
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -49,7 +55,7 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
opt(&ingesterConfig)
}

ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig)
ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.startRemoteServer)
if err != nil {
return nil, fmt.Errorf("could not start local collector: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions agent/collector/collector_test.go
Expand Up @@ -31,6 +31,7 @@ func TestCollector(t *testing.T) {
RemoteServerURL: targetServer.Addr(),
},
noopTracer,
collector.WithStartRemoteServer(true),
)
require.NoError(t, err)

Expand Down Expand Up @@ -74,6 +75,7 @@ func TestCollectorWatchingSpansFromTest(t *testing.T) {
},
noopTracer,
collector.WithTraceCache(cache),
collector.WithStartRemoteServer(true),
)
require.NoError(t, err)

Expand Down
21 changes: 12 additions & 9 deletions agent/collector/ingester.go
Expand Up @@ -19,7 +19,7 @@ type stoppable interface {
Stop()
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig) (otlp.Ingester, error) {
func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: remoteIngesterConfig,
Expand All @@ -28,12 +28,14 @@ func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteI
traceCache: remoteIngesterConfig.traceCache,
}

err := ingester.connectToRemoteServer(ctx)
if err != nil {
return nil, fmt.Errorf("could not connect to remote server: %w", err)
}
if startRemoteServer {
err := ingester.connectToRemoteServer(ctx)
if err != nil {
return nil, fmt.Errorf("could not connect to remote server: %w", err)
}

go ingester.startBatchWorker()
go ingester.startBatchWorker()
}

return ingester, nil
}
Expand All @@ -50,9 +52,10 @@ type forwardIngester struct {
}

type remoteIngesterConfig struct {
URL string
Token string
traceCache TraceCache
URL string
Token string
traceCache TraceCache
startRemoteServer bool
}

type buffer struct {
Expand Down
4 changes: 2 additions & 2 deletions agent/config/config.go
Expand Up @@ -14,10 +14,10 @@ type Config struct {
Name string `mapstructure:"agent_name"`
ServerURL string `mapstructure:"server_url"`

OTLPServer otlpServer `mapstructure:"otlp_server"`
OTLPServer OtlpServer `mapstructure:"otlp_server"`
}

type otlpServer struct {
type OtlpServer struct {
GRPCPort int `mapstructure:"grpc_port"`
HTTPPort int `mapstructure:"http_port"`
}
Expand Down
9 changes: 9 additions & 0 deletions agent/example/collector.config.yaml
Expand Up @@ -19,9 +19,18 @@ exporters:
tls:
insecure: true

otlp/1:
endpoint: host.docker.internal:4317
tls:
insecure: true

service:
pipelines:
traces:
receivers: [otlp]
processors: [probabilistic_sampler, batch]
exporters: [otlp/jaeger]
traces/1:
receivers: [otlp]
processors: [batch]
exporters: [otlp/1]
6 changes: 3 additions & 3 deletions agent/initialization/start.go
Expand Up @@ -52,7 +52,7 @@ func Start(ctx context.Context, config config.Config) error {
return err
}

err = startCollector(ctx, config, traceCache)
err = StartCollector(ctx, config, traceCache)
if err != nil {
return err
}
Expand All @@ -61,14 +61,14 @@ func Start(ctx context.Context, config config.Config) error {
return nil
}

func startCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error {
func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error {
noopTracer := trace.NewNoopTracerProvider().Tracer("noop")
collectorConfig := collector.Config{
HTTPPort: config.OTLPServer.HTTPPort,
GRPCPort: config.OTLPServer.GRPCPort,
}

_, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache))
_, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache), collector.WithStartRemoteServer(false))
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions cli/pkg/starter/starter.go
Expand Up @@ -100,6 +100,10 @@ func (s *Starter) StartAgent(ctx context.Context, endpoint, name, agentApiKey, u
ServerURL: endpoint,
APIKey: agentApiKey,
Name: name,
OTLPServer: agentConfig.OtlpServer{
GRPCPort: 4317,
HTTPPort: 4318,
},
}

s.ui.Info(fmt.Sprintf("Starting Agent with name %s...", name))
Expand All @@ -114,6 +118,11 @@ func (s *Starter) StartAgent(ctx context.Context, endpoint, name, agentApiKey, u
return err
}

err = initialization.StartCollector(ctx, cfg, cache)
if err != nil {
return err
}

claims, err := s.getTokenClaims(client.SessionConfiguration().AgentIdentification.Token)
if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions server/app/app.go
Expand Up @@ -230,8 +230,10 @@ func (app *App) Start(opts ...appOption) error {

tracedbFactory := tracedb.Factory(tracesRepo)

eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer)
if app.cfg.OtlpServerEnabled() {
eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer)
}

testPipeline := buildTestPipeline(
pool,
Expand Down
12 changes: 12 additions & 0 deletions server/config/server.go
Expand Up @@ -86,6 +86,11 @@ var serverOptions = options{
defaultValue: "true",
description: "enable local data store test connection",
},
{
key: "otlpServer.enabled",
defaultValue: "true",
description: "enable otlp server",
},
}

func init() {
Expand Down Expand Up @@ -169,3 +174,10 @@ func (c *AppConfig) DataStorePipelineTestConnectionEnabled() bool {

return c.vp.GetString("dataStorePipelines.testConnection.enabled") == "true"
}

func (c *AppConfig) OtlpServerEnabled() bool {
c.mu.Lock()
defer c.mu.Unlock()

return c.vp.GetString("otlpServer.enabled") == "true"
}

0 comments on commit 94d1c7f

Please sign in to comment.