Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

progress: add log limits and clipping #1754

Merged
merged 4 commits into from Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion client/client.go
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"net/url"

"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
controlapi "github.com/moby/buildkit/api/services/control"
Expand All @@ -30,7 +31,10 @@ type ClientOpt interface{}

// New returns a new buildkit client. Address can be empty for the system-default address.
func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error) {
gopts := []grpc.DialOption{}
gopts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
}
needDialer := true
needWithInsecure := true

Expand Down
84 changes: 50 additions & 34 deletions control/control.go
Expand Up @@ -305,40 +305,56 @@ func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Con
if !ok {
return nil
}
sr := controlapi.StatusResponse{}
for _, v := range ss.Vertexes {
sr.Vertexes = append(sr.Vertexes, &controlapi.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
})
}
for _, v := range ss.Statuses {
sr.Statuses = append(sr.Statuses, &controlapi.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Current: v.Current,
Total: v.Total,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for _, v := range ss.Logs {
sr.Logs = append(sr.Logs, &controlapi.VertexLog{
Vertex: v.Vertex,
Stream: int64(v.Stream),
Msg: v.Data,
Timestamp: v.Timestamp,
})
}
if err := stream.SendMsg(&sr); err != nil {
return err
logSize := 0
retry := false
for {
sr := controlapi.StatusResponse{}
for _, v := range ss.Vertexes {
sr.Vertexes = append(sr.Vertexes, &controlapi.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
})
}
for _, v := range ss.Statuses {
sr.Statuses = append(sr.Statuses, &controlapi.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Current: v.Current,
Total: v.Total,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for i, v := range ss.Logs {
sr.Logs = append(sr.Logs, &controlapi.VertexLog{
Vertex: v.Vertex,
Stream: int64(v.Stream),
Msg: v.Data,
Timestamp: v.Timestamp,
})
logSize += len(v.Data)
// avoid logs growing big and split apart if they do
if logSize > 1024*1024 {
ss.Vertexes = nil
ss.Statuses = nil
ss.Logs = ss.Logs[i+1:]
retry = true
break
}
}
if err := stream.SendMsg(&sr); err != nil {
return err
}
if !retry {
break
}
}
}
})
Expand Down
104 changes: 88 additions & 16 deletions util/progress/logs/logs.go
Expand Up @@ -2,15 +2,26 @@ package logs

import (
"context"
"fmt"
"io"
"math"
"os"
"strconv"
"sync"
"time"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/units"
)

var defaultMaxLogSize = 1024 * 1024
var defaultMaxLogSpeed = 100 * 1024 // per second

var configCheckOnce sync.Once

func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser) {
return newStreamWriter(ctx, 1, printOutput), newStreamWriter(ctx, 2, printOutput)
}
Expand All @@ -21,31 +32,92 @@ func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.Write
pw: pw,
stream: stream,
printOutput: printOutput,
created: time.Now(),
}
}

type streamWriter struct {
pw progress.Writer
stream int
printOutput bool
pw progress.Writer
stream int
printOutput bool
created time.Time
size int
clipping bool
clipReasonSpeed bool
}

func (sw *streamWriter) Write(dt []byte) (int, error) {
sw.pw.Write(identity.NewID(), client.VertexLog{
Stream: sw.stream,
Data: append([]byte{}, dt...),
func (sw *streamWriter) checkLimit(n int) int {
configCheckOnce.Do(func() {
maxLogSize, err := strconv.ParseInt(os.Getenv("BUILDKIT_STEP_LOG_MAX_SIZE"), 10, 32)
AkihiroSuda marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
defaultMaxLogSize = int(maxLogSize)
}
maxLogSpeed, err := strconv.ParseInt(os.Getenv("BUILDKIT_STEP_LOG_MAX_SPEED"), 10, 32)
if err == nil {
defaultMaxLogSpeed = int(maxLogSpeed)
}
})
if sw.printOutput {
switch sw.stream {
case 1:
return os.Stdout.Write(dt)
case 2:
return os.Stderr.Write(dt)
default:
return 0, errors.Errorf("invalid stream %d", sw.stream)

oldSize := sw.size
sw.size += n

maxSize := -1
if defaultMaxLogSpeed != -1 {
maxSize = int(math.Ceil(time.Since(sw.created).Seconds())) * defaultMaxLogSpeed
sw.clipReasonSpeed = true
}
if maxSize > defaultMaxLogSize {
maxSize = defaultMaxLogSize
sw.clipReasonSpeed = false
}
if maxSize < oldSize {
return 0
}

if maxSize != -1 {
if sw.size > maxSize {
return maxSize - oldSize
}
}
return n
}

func (sw *streamWriter) clipLimitMessage() string {
if sw.clipReasonSpeed {
return fmt.Sprintf("%#g/s", units.Bytes(defaultMaxLogSpeed))
}
return fmt.Sprintf("%#g", units.Bytes(defaultMaxLogSize))
}

func (sw *streamWriter) Write(dt []byte) (int, error) {
oldSize := len(dt)
dt = append([]byte{}, dt[:sw.checkLimit(len(dt))]...)

if sw.clipping && oldSize == len(dt) {
sw.clipping = false
}
if !sw.clipping && oldSize != len(dt) {
dt = append(dt, []byte(fmt.Sprintf("\n[output clipped, log limit %s reached]\n", sw.clipLimitMessage()))...)
sw.clipping = true
}

if len(dt) != 0 {
sw.pw.Write(identity.NewID(), client.VertexLog{
Stream: sw.stream,
Data: dt,
})
if sw.printOutput {
switch sw.stream {
case 1:
return os.Stdout.Write(dt)
case 2:
return os.Stderr.Write(dt)
default:
return 0, errors.Errorf("invalid stream %d", sw.stream)
}
}
}
return len(dt), nil
return oldSize, nil
}

func (sw *streamWriter) Close() error {
Expand Down