Skip to content

Commit

Permalink
enable collecting traces via control api
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Jun 14, 2021
1 parent 9167e98 commit f2d9f02
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 42 deletions.
43 changes: 42 additions & 1 deletion client/client.go
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/url"
"strings"

"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -16,9 +17,13 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/moby/buildkit/util/tracing/otlpgrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -56,7 +61,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
}
if wt, ok := o.(*withTracer); ok {
var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
unary = append(unary, otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators)))
unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators))))
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators)))
}
if wd, ok := o.(*withDialer); ok {
Expand Down Expand Up @@ -106,12 +111,39 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address)
}

c := &Client{
conn: conn,
}

_ = c.setupDelegatedTracing(ctx) // ignore error

return c, nil
}

func (c *Client) setupDelegatedTracing(ctx context.Context) error {
exp, err := detect.Exporter()
if err != nil {
return err
}
if exp == nil {
return nil
}
del, ok := exp.(interface {
SetDelegate(context.Context, sdktrace.SpanExporter) error
})
if !ok {
return nil
}

pd := otlpgrpc.NewDriver(c.conn)
e, err := otlp.NewExporter(ctx, pd)
if err != nil {
return nil
}
return del.SetDelegate(ctx, e)
}

func (c *Client) controlClient() controlapi.ControlClient {
return controlapi.NewControlClient(c.conn)
}
Expand Down Expand Up @@ -207,3 +239,12 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err
// basic dialer
return dialer, nil
}

func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return invoker(ctx, method, req, reply, cc, opts...)
}
return intercept(ctx, method, req, reply, cc, invoker, opts...)
}
}
1 change: 1 addition & 0 deletions cmd/buildctl/main.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/delegated"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
"github.com/moby/buildkit/version"
"github.com/sirupsen/logrus"
Expand Down
7 changes: 7 additions & 0 deletions cmd/buildkitd/main.go
Expand Up @@ -639,6 +639,12 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn),
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}

tc, err := detect.Exporter()
if err != nil {
return nil, err
}

return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
Expand All @@ -647,6 +653,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
ResolveCacheImporterFuncs: remoteCacheImporterFuncs,
CacheKeyStorage: cacheStorage,
Entitlements: cfg.Entitlements,
TraceCollector: tc,
})
}

Expand Down
16 changes: 16 additions & 0 deletions control/control.go
Expand Up @@ -20,9 +20,13 @@ import (
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
Expand All @@ -35,9 +39,12 @@ type Opt struct {
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
Entitlements []string
TraceCollector sdktrace.SpanExporter
}

type Controller struct { // TODO: ControlService
*tracev1.UnimplementedTraceServiceServer

buildCount int64
opt Opt
solver *llbsolver.Solver
Expand Down Expand Up @@ -75,6 +82,7 @@ func NewController(opt Opt) (*Controller, error) {
func (c *Controller) Register(server *grpc.Server) error {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
tracev1.RegisterTraceServiceServer(server, c)
return nil
}

Expand Down Expand Up @@ -184,6 +192,14 @@ func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Contr
return eg2.Wait()
}

func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := c.opt.TraceCollector.ExportSpans(ctx, transform.SpanData(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}

func translateLegacySolveRequest(req *controlapi.SolveRequest) error {
// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" {
Expand Down
23 changes: 4 additions & 19 deletions util/tracing/detect/delegated/delegated.go
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/moby/buildkit/util/tracing/detect"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

const maxBuffer = 128
Expand All @@ -16,23 +15,7 @@ var exp = &Exporter{}
func init() {
detect.Register("delegated", func() (sdktrace.SpanExporter, error) {
return exp, nil
})
}

func ExporterFromContext(ctx context.Context) *Exporter {
s := trace.SpanFromContext(ctx)
if s == nil {
return nil
}
t := s.Tracer()
if te, ok := t.(interface {
SpanExporter() sdktrace.SpanExporter
}); ok {
if exp, ok := te.SpanExporter().(*Exporter); ok {
return exp
}
}
return nil
}, 100)
}

type Exporter struct {
Expand Down Expand Up @@ -77,7 +60,9 @@ func (e *Exporter) SetDelegate(ctx context.Context, del sdktrace.SpanExporter) e
e.delegate = del

if len(e.buffer) > 0 {
return e.delegate.ExportSpans(ctx, e.buffer)
err := e.delegate.ExportSpans(ctx, e.buffer)
e.buffer = nil
return err
}
return nil
}
52 changes: 33 additions & 19 deletions util/tracing/detect/detect.go
Expand Up @@ -3,6 +3,7 @@ package detect
import (
"context"
"os"
"sort"
"strconv"
"sync"

Expand All @@ -15,17 +16,26 @@ import (

type ExporterDetector func() (sdktrace.SpanExporter, error)

var detectors map[string]ExporterDetector
type detector struct {
f ExporterDetector
priority int
}

var detectors map[string]detector
var once sync.Once
var tracer trace.Tracer
var exporter sdktrace.SpanExporter
var closers []func(context.Context) error
var err error

func Register(name string, exp ExporterDetector) {
func Register(name string, exp ExporterDetector, priority int) {
if detectors == nil {
detectors = map[string]ExporterDetector{}
detectors = map[string]detector{}
}
detectors[name] = detector{
f: exp,
priority: priority,
}
detectors[name] = exp
}

func detectExporter() (sdktrace.SpanExporter, error) {
Expand All @@ -37,10 +47,17 @@ func detectExporter() (sdktrace.SpanExporter, error) {
}
return nil, errors.Errorf("unsupported opentelemetry tracer %v", n)
}
return d()
return d.f()
}
arr := make([]detector, 0, len(detectors))
for _, d := range detectors {
exp, err := d()
arr = append(arr, d)
}
sort.Slice(arr, func(i, j int) bool {
return arr[i].priority < arr[j].priority
})
for _, d := range arr {
exp, err := d.f()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -74,10 +91,8 @@ func detect() error {
sdktp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sp), sdktrace.WithResource(res))
closers = append(closers, sdktp.Shutdown)
tp = sdktp
tracer = &tracerWithExporter{
Tracer: tp.Tracer(""),
exp: exp,
}
tracer = tp.Tracer("")
exporter = exp

return nil
}
Expand All @@ -95,6 +110,14 @@ func Tracer() (trace.Tracer, error) {
return tracer, nil
}

func Exporter() (sdktrace.SpanExporter, error) {
_, err := Tracer()
if err != nil {
return nil, err
}
return exporter, nil
}

func Shutdown(ctx context.Context) error {
for _, c := range closers {
if err := c(ctx); err != nil {
Expand All @@ -117,12 +140,3 @@ func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, erro
},
).Detect(ctx)
}

type tracerWithExporter struct {
trace.Tracer
exp sdktrace.SpanExporter
}

func (t *tracerWithExporter) SpanExporter() sdktrace.SpanExporter {
return t.exp
}
2 changes: 1 addition & 1 deletion util/tracing/detect/jaeger/jaeger.go
Expand Up @@ -11,7 +11,7 @@ import (
)

func init() {
detect.Register("jaeger", jaegerExporter)
detect.Register("jaeger", jaegerExporter, 11)
}

func jaegerExporter() (sdktrace.SpanExporter, error) {
Expand Down
2 changes: 1 addition & 1 deletion util/tracing/detect/otlp.go
Expand Up @@ -12,7 +12,7 @@ import (
)

func init() {
Register("otlp", otlpExporter)
Register("otlp", otlpExporter, 10)
}

func otlpExporter() (sdktrace.SpanExporter, error) {
Expand Down
3 changes: 2 additions & 1 deletion util/tracing/otlpgrpc/driver.go
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"google.golang.org/grpc"

Expand Down Expand Up @@ -91,7 +92,7 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
}
ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx)
defer cancel()
ctx, tCancel := context.WithTimeout(ctx, 30)
ctx, tCancel := context.WithTimeout(ctx, 30*time.Second)
defer tCancel()

protoSpans := transform.SpanData(ss)
Expand Down

0 comments on commit f2d9f02

Please sign in to comment.