Skip to content

Commit

Permalink
client: add meter provider to client
Browse files Browse the repository at this point in the history
This adds a meter provider option to the buildkit client for client-side
metrics to be reported. The session filesync will report the duration of
filesync operations.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
  • Loading branch information
jsternberg committed Dec 27, 2023
1 parent 686c0ad commit 8e27915
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 36 deletions.
69 changes: 46 additions & 23 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand All @@ -34,6 +36,7 @@ import (
type Client struct {
conn *grpc.ClientConn
sessionDialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
meterProvider metric.MeterProvider
}

type ClientOpt interface {
Expand All @@ -51,36 +54,37 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
var unary []grpc.UnaryClientInterceptor
var stream []grpc.StreamClientInterceptor

var customTracer bool // allows manually setting disabling tracing even if tracer in context
var tracerProvider trace.TracerProvider
var tracerDelegate TracerDelegate
var sessionDialer func(context.Context, string, map[string][]string) (net.Conn, error)
var customDialOptions []grpc.DialOption
var creds *withCredentials
var (
customTracer bool // allows manually setting disabling tracing even if tracer in context
tracerProvider trace.TracerProvider
tracerDelegate TracerDelegate
sessionDialer func(context.Context, string, map[string][]string) (net.Conn, error)
customDialOptions []grpc.DialOption
creds *withCredentials
meterProvider metric.MeterProvider
)

for _, o := range opts {
if credInfo, ok := o.(*withCredentials); ok {
switch o := o.(type) {
case *withCredentials:
if creds == nil {
creds = &withCredentials{}
}
creds = creds.merge(credInfo)
}
if wt, ok := o.(*withTracer); ok {
creds = creds.merge(o)
case *withTracer:
customTracer = true
tracerProvider = wt.tp
}
if wd, ok := o.(*withDialer); ok {
gopts = append(gopts, grpc.WithContextDialer(wd.dialer))
tracerProvider = o.tp
case *withDialer:
gopts = append(gopts, grpc.WithContextDialer(o.dialer))
needDialer = false
}
if wt, ok := o.(*withTracerDelegate); ok {
tracerDelegate = wt
}
if sd, ok := o.(*withSessionDialer); ok {
sessionDialer = sd.dialer
}
if opt, ok := o.(*withGRPCDialOption); ok {
customDialOptions = append(customDialOptions, opt.opt)
case *withTracerDelegate:
tracerDelegate = o
case *withSessionDialer:
sessionDialer = o.dialer
case *withGRPCDialOption:
customDialOptions = append(customDialOptions, o.opt)
case *withMeterProvider:
meterProvider = o.mp
}
}

Expand Down Expand Up @@ -150,9 +154,14 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address)
}

if meterProvider == nil {
meterProvider = noop.NewMeterProvider()
}

c := &Client{
conn: conn,
sessionDialer: sessionDialer,
meterProvider: meterProvider,
}

if tracerDelegate != nil {
Expand Down Expand Up @@ -180,6 +189,9 @@ func (c *Client) ContentClient() contentapi.ContentClient {
}

func (c *Client) Dialer() session.Dialer {
if c.sessionDialer != nil {
return c.sessionDialer
}
return grpchijack.Dialer(c.ControlClient())
}

Expand Down Expand Up @@ -397,3 +409,14 @@ func (*withGRPCDialOption) isClientOpt() {}
func WithGRPCDialOption(opt grpc.DialOption) ClientOpt {
return &withGRPCDialOption{opt}
}

type withMeterProvider struct {
mp metric.MeterProvider
}

func (*withMeterProvider) isClientOpt() {}

// WithMeterProvider configures the MeterProvider for this client.
func WithMeterProvider(mp metric.MeterProvider) ClientOpt {
return &withMeterProvider{mp}
}
10 changes: 3 additions & 7 deletions client/solve.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/moby/buildkit/session"
sessioncontent "github.com/moby/buildkit/session/content"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/solver/pb"
spb "github.com/moby/buildkit/sourcepolicy/pb"
"github.com/moby/buildkit/util/bklog"
Expand Down Expand Up @@ -142,7 +141,8 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG

if !opt.SessionPreInitialized {
if len(syncedDirs) > 0 {
s.Allow(filesync.NewFSSyncProvider(syncedDirs))
s.Allow(filesync.NewFSSyncProvider(syncedDirs,
filesync.WithMeterProvider(c.meterProvider)))
}

for _, a := range opt.Session {
Expand Down Expand Up @@ -214,11 +214,7 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
}

eg.Go(func() error {
sd := c.sessionDialer
if sd == nil {
sd = grpchijack.Dialer(c.ControlClient())
}
return s.Run(statusContext, sd)
return s.Run(statusContext, c.Dialer())
})
}

Expand Down
69 changes: 63 additions & 6 deletions session/filesync/filesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (
"os"
"strconv"
"strings"
"time"
"unicode"

"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/bklog"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -27,12 +32,18 @@ const (
keyFollowPaths = "followpaths"
keyDirName = "dir-name"
keyExporterMetaPrefix = "exporter-md-"
meterName = "github.com/moby/buildkit/session/filesync"
)

type fsSyncProvider struct {
dirs DirSource
p progressCb
doneCh chan error
dirs DirSource
p progressCb
doneCh chan error
metrics *fsSyncProviderMetrics
}

type FSSyncProviderOption interface {
apply(sp *fsSyncProvider)
}

type DirSource interface {
Expand All @@ -49,10 +60,15 @@ func (dirs StaticDirSource) LookupDir(name string) (fsutil.FS, bool) {
}

// NewFSSyncProvider creates a new provider for sending files from client
func NewFSSyncProvider(dirs DirSource) session.Attachable {
return &fsSyncProvider{
dirs: dirs,
func NewFSSyncProvider(dirs DirSource, opts ...FSSyncProviderOption) session.Attachable {
sp := &fsSyncProvider{
dirs: dirs,
metrics: newFSSyncProviderMetrics(noop.NewMeterProvider()),
}
for _, opt := range opts {
opt.apply(sp)
}
return sp
}

func (sp *fsSyncProvider) Register(server *grpc.Server) {
Expand All @@ -67,6 +83,16 @@ func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error {
}

func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) (retErr error) {
defer func(start time.Time) {
dur := int64(time.Since(start) / time.Millisecond)
sp.metrics.Duration.Record(stream.Context(), dur,
metric.WithAttributes(
attribute.String("method", method),
attribute.Bool("error", retErr != nil),
),
)
}(time.Now())

var pr *protocol
for _, p := range supportedProtocols {
if method == p.name {
Expand Down Expand Up @@ -130,6 +156,37 @@ func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan
sp.doneCh = doneCh
}

type fsSyncProviderOptionFunc func(sp *fsSyncProvider)

func (opt fsSyncProviderOptionFunc) apply(sp *fsSyncProvider) {
opt(sp)
}

func WithMeterProvider(mp metric.MeterProvider) FSSyncProviderOption {
return fsSyncProviderOptionFunc(func(sp *fsSyncProvider) {
sp.metrics = newFSSyncProviderMetrics(mp)
})
}

type fsSyncProviderMetrics struct {
Duration metric.Int64Histogram
}

func newFSSyncProviderMetrics(mp metric.MeterProvider) *fsSyncProviderMetrics {
meter := mp.Meter(meterName)

var err error
metrics := &fsSyncProviderMetrics{}
metrics.Duration, err = meter.Int64Histogram("filesync.duration",
metric.WithDescription("Measures the duration of filesync operations."),
metric.WithUnit("ms"),
)
if err != nil {
otel.Handle(err)
}
return metrics
}

type progressCb func(int, bool)

type protocol struct {
Expand Down

0 comments on commit 8e27915

Please sign in to comment.