diff --git a/client/client.go b/client/client.go index 05164363d601..dca2383c0198 100644 --- a/client/client.go +++ b/client/client.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net" "net/url" + "strings" "github.com/containerd/containerd/defaults" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -16,9 +17,13 @@ import ( "github.com/moby/buildkit/session/grpchijack" "github.com/moby/buildkit/util/appdefaults" "github.com/moby/buildkit/util/grpcerrors" + "github.com/moby/buildkit/util/tracing/detect" + "github.com/moby/buildkit/util/tracing/otlpgrpc" "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/exporters/otlp" "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -56,7 +61,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error } if wt, ok := o.(*withTracer); ok { var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) - unary = append(unary, otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators))) + unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators)))) stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators))) } if wd, ok := o.(*withDialer); ok { @@ -106,12 +111,39 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error if err != nil { return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address) } + c := &Client{ conn: conn, } + + _ = c.setupDelegatedTracing(ctx) // ignore error + return c, nil } +func (c *Client) setupDelegatedTracing(ctx context.Context) error { + exp, err := detect.Exporter() + if err != nil { + return err + } + if exp == nil { + return nil + } + del, ok := exp.(interface { + SetDelegate(context.Context, sdktrace.SpanExporter) error + }) + if !ok { + return nil + } + + pd := otlpgrpc.NewDriver(c.conn) + e, err := otlp.NewExporter(ctx, pd) + if err != nil { + return nil + } + return del.SetDelegate(ctx, e) +} + func (c *Client) controlClient() controlapi.ControlClient { return controlapi.NewControlClient(c.conn) } @@ -207,3 +239,12 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err // basic dialer return dialer, nil } + +func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") { + return invoker(ctx, method, req, reply, cc, opts...) + } + return intercept(ctx, method, req, reply, cc, invoker, opts...) + } +} diff --git a/cmd/buildctl/main.go b/cmd/buildctl/main.go index cfd84701eb80..f324e78cdba9 100644 --- a/cmd/buildctl/main.go +++ b/cmd/buildctl/main.go @@ -13,6 +13,7 @@ import ( "github.com/moby/buildkit/util/appdefaults" "github.com/moby/buildkit/util/profiler" "github.com/moby/buildkit/util/stack" + _ "github.com/moby/buildkit/util/tracing/detect/delegated" _ "github.com/moby/buildkit/util/tracing/detect/jaeger" "github.com/moby/buildkit/version" "github.com/sirupsen/logrus" diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 140f3a1cdb3b..771dd7ef54c9 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -639,6 +639,12 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont "registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn), "local": localremotecache.ResolveCacheImporterFunc(sessionManager), } + + tc, err := detect.Exporter() + if err != nil { + return nil, err + } + return control.NewController(control.Opt{ SessionManager: sessionManager, WorkerController: wc, @@ -647,6 +653,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont ResolveCacheImporterFuncs: remoteCacheImporterFuncs, CacheKeyStorage: cacheStorage, Entitlements: cfg.Entitlements, + TraceCollector: tc, }) } diff --git a/control/control.go b/control/control.go index 048e6966834d..950270bdc520 100644 --- a/control/control.go +++ b/control/control.go @@ -20,9 +20,13 @@ import ( "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/imageutil" "github.com/moby/buildkit/util/throttle" + "github.com/moby/buildkit/util/tracing/transform" "github.com/moby/buildkit/worker" "github.com/pkg/errors" "github.com/sirupsen/logrus" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" + v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -35,9 +39,12 @@ type Opt struct { ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc Entitlements []string + TraceCollector sdktrace.SpanExporter } type Controller struct { // TODO: ControlService + *tracev1.UnimplementedTraceServiceServer + buildCount int64 opt Opt solver *llbsolver.Solver @@ -75,6 +82,7 @@ func NewController(opt Opt) (*Controller, error) { func (c *Controller) Register(server *grpc.Server) error { controlapi.RegisterControlServer(server, c) c.gatewayForwarder.Register(server) + tracev1.RegisterTraceServiceServer(server, c) return nil } @@ -184,6 +192,14 @@ func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Contr return eg2.Wait() } +func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) { + err := c.opt.TraceCollector.ExportSpans(ctx, transform.SpanData(req.GetResourceSpans())) + if err != nil { + return nil, err + } + return &v1.ExportTraceServiceResponse{}, nil +} + func translateLegacySolveRequest(req *controlapi.SolveRequest) error { // translates ExportRef and ExportAttrs to new Exports (v0.4.0) if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" { diff --git a/util/tracing/detect/delegated/delegated.go b/util/tracing/detect/delegated/delegated.go index c535a5bfe983..1d9b42ab41f9 100644 --- a/util/tracing/detect/delegated/delegated.go +++ b/util/tracing/detect/delegated/delegated.go @@ -6,7 +6,6 @@ import ( "github.com/moby/buildkit/util/tracing/detect" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" ) const maxBuffer = 128 @@ -16,23 +15,7 @@ var exp = &Exporter{} func init() { detect.Register("delegated", func() (sdktrace.SpanExporter, error) { return exp, nil - }) -} - -func ExporterFromContext(ctx context.Context) *Exporter { - s := trace.SpanFromContext(ctx) - if s == nil { - return nil - } - t := s.Tracer() - if te, ok := t.(interface { - SpanExporter() sdktrace.SpanExporter - }); ok { - if exp, ok := te.SpanExporter().(*Exporter); ok { - return exp - } - } - return nil + }, 100) } type Exporter struct { @@ -77,7 +60,9 @@ func (e *Exporter) SetDelegate(ctx context.Context, del sdktrace.SpanExporter) e e.delegate = del if len(e.buffer) > 0 { - return e.delegate.ExportSpans(ctx, e.buffer) + err := e.delegate.ExportSpans(ctx, e.buffer) + e.buffer = nil + return err } return nil } diff --git a/util/tracing/detect/detect.go b/util/tracing/detect/detect.go index 4458e23cd80d..575dde7aa885 100644 --- a/util/tracing/detect/detect.go +++ b/util/tracing/detect/detect.go @@ -3,6 +3,7 @@ package detect import ( "context" "os" + "sort" "strconv" "sync" @@ -15,17 +16,26 @@ import ( type ExporterDetector func() (sdktrace.SpanExporter, error) -var detectors map[string]ExporterDetector +type detector struct { + f ExporterDetector + priority int +} + +var detectors map[string]detector var once sync.Once var tracer trace.Tracer +var exporter sdktrace.SpanExporter var closers []func(context.Context) error var err error -func Register(name string, exp ExporterDetector) { +func Register(name string, exp ExporterDetector, priority int) { if detectors == nil { - detectors = map[string]ExporterDetector{} + detectors = map[string]detector{} + } + detectors[name] = detector{ + f: exp, + priority: priority, } - detectors[name] = exp } func detectExporter() (sdktrace.SpanExporter, error) { @@ -37,10 +47,17 @@ func detectExporter() (sdktrace.SpanExporter, error) { } return nil, errors.Errorf("unsupported opentelemetry tracer %v", n) } - return d() + return d.f() } + arr := make([]detector, 0, len(detectors)) for _, d := range detectors { - exp, err := d() + arr = append(arr, d) + } + sort.Slice(arr, func(i, j int) bool { + return arr[i].priority < arr[j].priority + }) + for _, d := range arr { + exp, err := d.f() if err != nil { return nil, err } @@ -74,10 +91,8 @@ func detect() error { sdktp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sp), sdktrace.WithResource(res)) closers = append(closers, sdktp.Shutdown) tp = sdktp - tracer = &tracerWithExporter{ - Tracer: tp.Tracer(""), - exp: exp, - } + tracer = tp.Tracer("") + exporter = exp return nil } @@ -95,6 +110,14 @@ func Tracer() (trace.Tracer, error) { return tracer, nil } +func Exporter() (sdktrace.SpanExporter, error) { + _, err := Tracer() + if err != nil { + return nil, err + } + return exporter, nil +} + func Shutdown(ctx context.Context) error { for _, c := range closers { if err := c(ctx); err != nil { @@ -117,12 +140,3 @@ func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, erro }, ).Detect(ctx) } - -type tracerWithExporter struct { - trace.Tracer - exp sdktrace.SpanExporter -} - -func (t *tracerWithExporter) SpanExporter() sdktrace.SpanExporter { - return t.exp -} diff --git a/util/tracing/detect/jaeger/jaeger.go b/util/tracing/detect/jaeger/jaeger.go index 2554fbef012e..9f7a8c6b2ac5 100644 --- a/util/tracing/detect/jaeger/jaeger.go +++ b/util/tracing/detect/jaeger/jaeger.go @@ -11,7 +11,7 @@ import ( ) func init() { - detect.Register("jaeger", jaegerExporter) + detect.Register("jaeger", jaegerExporter, 11) } func jaegerExporter() (sdktrace.SpanExporter, error) { diff --git a/util/tracing/detect/otlp.go b/util/tracing/detect/otlp.go index b1c9ee4fcd4f..8c34d4f09b11 100644 --- a/util/tracing/detect/otlp.go +++ b/util/tracing/detect/otlp.go @@ -12,7 +12,7 @@ import ( ) func init() { - Register("otlp", otlpExporter) + Register("otlp", otlpExporter, 10) } func otlpExporter() (sdktrace.SpanExporter, error) { diff --git a/util/tracing/otlpgrpc/driver.go b/util/tracing/otlpgrpc/driver.go index 4bfec8ecc478..1ff477edf3e1 100644 --- a/util/tracing/otlpgrpc/driver.go +++ b/util/tracing/otlpgrpc/driver.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "sync" + "time" "google.golang.org/grpc" @@ -91,7 +92,7 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) } ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx) defer cancel() - ctx, tCancel := context.WithTimeout(ctx, 30) + ctx, tCancel := context.WithTimeout(ctx, 30*time.Second) defer tCancel() protoSpans := transform.SpanData(ss)