-
Notifications
You must be signed in to change notification settings - Fork 68
/
start.go
77 lines (64 loc) · 2.19 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package initialization
import (
"context"
"fmt"
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/collector"
"github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"
"go.opentelemetry.io/otel/trace"
)
// NewClient connects to the server at config.ServerURL, identifying the agent
// with config.APIKey and config.Name, and registers the handlers for data
// store connection tests, trigger requests, polling requests and the
// connection-closed notification.
//
// The trigger worker and the poller's in-memory datastore are both built on
// top of traceCache. If the connection attempt fails, its error is returned
// unchanged together with a nil client. This function does not call Start on
// the returned client.
func NewClient(ctx context.Context, config config.Config, traceCache collector.TraceCache) (*client.Client, error) {
	// The local is deliberately not called "client" so the imported package
	// of the same name stays reachable for the rest of the function.
	agentClient, err := client.Connect(
		ctx,
		config.ServerURL,
		client.WithAPIKey(config.APIKey),
		client.WithAgentName(config.Name),
	)
	if err != nil {
		return nil, err
	}

	// Build the workers first, in the same order as before.
	trigger := workers.NewTriggerWorker(agentClient, workers.WithTraceCache(traceCache))
	datastore := poller.NewInMemoryDatastore(traceCache)
	polling := workers.NewPollerWorker(agentClient, workers.WithInMemoryDatastore(datastore))
	connectionTester := workers.NewTestConnectionWorker(agentClient)

	// Report why the server ended the session, then close the client.
	onShutdown := func(_ context.Context, sr *proto.ShutdownRequest) error {
		fmt.Printf("Server terminated the connection with the agent. Reason: %s\n", sr.Reason)
		return agentClient.Close()
	}

	// Wire every handler onto the client.
	agentClient.OnDataStoreTestConnectionRequest(connectionTester.Test)
	agentClient.OnTriggerRequest(trigger.Trigger)
	agentClient.OnPollingRequest(polling.Poll)
	agentClient.OnConnectionClosed(onShutdown)

	return agentClient, nil
}
// Start runs the agent with the given configuration.
//
// It creates one trace cache, builds the client with NewClient, starts the
// client, starts the collector with StartCollector using the same cache, and
// finally calls WaitUntilDisconnected on the client before returning nil.
// The first error from any of these steps is returned unchanged.
func Start(ctx context.Context, config config.Config) error {
	// A single cache instance is handed to both the client and the collector.
	sharedCache := collector.NewTraceCache()

	agentClient, err := NewClient(ctx, config, sharedCache)
	if err != nil {
		return err
	}

	if err = agentClient.Start(ctx); err != nil {
		return err
	}

	// NOTE(review): if the collector fails to start, this returns without
	// calling Close on the already-started client — confirm this is intended.
	if err = StartCollector(ctx, config, sharedCache); err != nil {
		return err
	}

	agentClient.WaitUntilDisconnected()
	return nil
}
// StartCollector starts the collector on the HTTP and gRPC ports taken from
// config.OTLPServer.
//
// The collector is given a tracer obtained from a no-op tracer provider, the
// supplied traceCache, and the WithStartRemoteServer(false) option. The first
// value returned by collector.Start is discarded; only its error is returned
// to the caller.
func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error {
	otlpPorts := collector.Config{
		HTTPPort: config.OTLPServer.HTTPPort,
		GRPCPort: config.OTLPServer.GRPCPort,
	}
	tracer := trace.NewNoopTracerProvider().Tracer("noop")

	_, err := collector.Start(
		ctx,
		otlpPorts,
		tracer,
		collector.WithTraceCache(traceCache),
		collector.WithStartRemoteServer(false),
	)
	return err
}