From 77c25dd08dfab03bfb636704cca85d6a6cf71a03 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Mon, 21 Feb 2022 20:04:02 +0100 Subject: [PATCH] Add exponential backoff to profile writes and debuginfo upload Most of the other things are gofumpt formattings and a couple of improvements --- cmd/debug-info/main.go | 9 ++++--- cmd/parca-agent/main.go | 23 ++++++++--------- go.mod | 2 +- pkg/agent/write_client.go | 45 ++++++++++++++++++++++------------ pkg/agent/write_client_test.go | 5 ++-- pkg/debuginfo/debuginfo.go | 19 +++++++++++++- pkg/profiler/profiler.go | 2 +- pkg/target/manager.go | 7 +++--- pkg/target/profiler_pool.go | 7 +++--- 9 files changed, 76 insertions(+), 43 deletions(-) diff --git a/cmd/debug-info/main.go b/cmd/debug-info/main.go index 1717c77019..a13f970fbc 100644 --- a/cmd/debug-info/main.go +++ b/cmd/debug-info/main.go @@ -69,8 +69,8 @@ func main() { logger := logger.NewLogger(flags.LogLevel, logger.LogFormatLogfmt, "") var ( - g run.Group - dc debuginfo.Client = debuginfo.NewNoopClient() + g run.Group + debugInfoClient = debuginfo.NewNoopClient() ) if len(flags.Upload.StoreAddress) > 0 { @@ -82,10 +82,10 @@ func main() { } defer conn.Close() - dc = parcadebuginfo.NewDebugInfoClient(conn) + debugInfoClient = parcadebuginfo.NewDebugInfoClient(conn) } - die := debuginfo.NewExtractor(logger, dc, flags.TempDir) + die := debuginfo.NewExtractor(logger, debugInfoClient, flags.TempDir) ctx, cancel := context.WithCancel(context.Background()) switch kongCtx.Command() { @@ -171,6 +171,7 @@ func main() { g.Add(run.SignalHandler(ctx, os.Interrupt, os.Kill)) if err := g.Run(); err != nil { level.Error(logger).Log("err", err) + os.Exit(1) } level.Info(logger).Log("msg", "done!") } diff --git a/cmd/parca-agent/main.go b/cmd/parca-agent/main.go index 04560aedd9..d86c3ed19e 100644 --- a/cmd/parca-agent/main.go +++ b/cmd/parca-agent/main.go @@ -119,8 +119,8 @@ func main() { collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), ) - var wc 
profilestorepb.ProfileStoreServiceClient = agent.NewNoopProfileStoreClient() - var dc debuginfo.Client = debuginfo.NewNoopClient() + profileStoreClient := agent.NewNoopProfileStoreClient() + debugInfoClient := debuginfo.NewNoopClient() if len(flags.StoreAddress) > 0 { conn, err := grpcConn(reg, flags) @@ -130,16 +130,17 @@ func main() { } // Initialize actual clients with the connection. - wc = profilestorepb.NewProfileStoreServiceClient(conn) - dc = parcadebuginfo.NewDebugInfoClient(conn) + profileStoreClient = profilestorepb.NewProfileStoreServiceClient(conn) + debugInfoClient = parcadebuginfo.NewDebugInfoClient(conn) } ksymCache := ksym.NewKsymCache(logger) var ( - configs discovery.Configs - bwc = agent.NewBatchWriteClient(logger, wc) - listener = agent.NewProfileListener(logger, bwc) + configs discovery.Configs + // TODO(Sylfrena): Make ticker duration configurable + batchWriteClient = agent.NewBatchWriteClient(logger, profileStoreClient, 10*time.Second) + profileListener = agent.NewProfileListener(logger, batchWriteClient) ) if flags.Kubernetes { @@ -158,7 +159,7 @@ func main() { } externalLabels := getExternalLabels(flags.ExternalLabel, flags.Node) - tm := target.NewManager(logger, reg, externalLabels, ksymCache, listener, dc, flags.ProfilingDuration, flags.TempDir) + tm := target.NewManager(logger, reg, externalLabels, ksymCache, profileListener, debugInfoClient, flags.ProfilingDuration, flags.TempDir) mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) mux.HandleFunc("/debug/pprof/", pprof.Index) @@ -251,7 +252,7 @@ func main() { ctx, cancel := context.WithTimeout(ctx, time.Second*11) defer cancel() - profile, err := listener.NextMatchingProfile(ctx, matchers) + profile, err := profileListener.NextMatchingProfile(ctx, matchers) if profile == nil || err == context.Canceled { http.Error(w, "No profile taken in the last 11 seconds that matches the requested label-matchers query. 
Profiles are taken every 10 seconds so either the profiler matching the label-set has stopped profiling, or the label-set was incorrect.", http.StatusNotFound) return @@ -322,7 +323,7 @@ func main() { ctx, cancel := context.WithCancel(ctx) g.Add(func() error { level.Debug(logger).Log("msg", "starting batch write client") - return bwc.Run(ctx) + return batchWriteClient.Run(ctx) }, func(error) { cancel() }) @@ -424,7 +425,7 @@ func grpcConn(reg prometheus.Registerer, flags flags) (*grpc.ClientConn, error) return nil, fmt.Errorf("failed to read bearer token from file: %w", err) } opts = append(opts, grpc.WithPerRPCCredentials(&perRequestBearerToken{ - token: string(b), + token: strings.TrimSpace(string(b)), insecure: flags.Insecure, })) } diff --git a/go.mod b/go.mod index 4f1e94108b..0079b17f91 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.17 require ( github.com/alecthomas/kong v0.4.1 github.com/aquasecurity/libbpfgo v0.2.4-libbpf-0.6.1 + github.com/cenkalti/backoff/v4 v4.1.2 github.com/cespare/xxhash/v2 v2.1.2 github.com/containerd/cgroups v1.0.3 github.com/containerd/containerd v1.6.0 @@ -46,7 +47,6 @@ require ( github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible // indirect github.com/baidubce/bce-sdk-go v0.9.81 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/containerd/console v1.0.3 // indirect github.com/containerd/go-runc v1.0.0 // indirect diff --git a/pkg/agent/write_client.go b/pkg/agent/write_client.go index 7216ab3874..db54e842cf 100644 --- a/pkg/agent/write_client.go +++ b/pkg/agent/write_client.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/go-kit/log" "github.com/go-kit/log/level" profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" @@ -25,8 +26,9 @@ import ( ) type Batcher struct { - logger log.Logger - writeClient 
profilestorepb.ProfileStoreServiceClient + logger log.Logger + writeClient profilestorepb.ProfileStoreServiceClient + writeInterval time.Duration mtx *sync.RWMutex series []*profilestorepb.RawProfileSeries @@ -35,10 +37,11 @@ type Batcher struct { lastBatchSendError error } -func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient) *Batcher { return &Batcher{ - logger: logger, - writeClient: wc, + logger: logger, + writeClient: wc, + writeInterval: writeInterval, series: []*profilestorepb.RawProfileSeries{}, mtx: &sync.RWMutex{}, @@ -54,10 +57,7 @@ func (b *Batcher) loopReport(lastBatchSentAt time.Time, lastBatchSendError error } func (b *Batcher) Run(ctx context.Context) error { - // TODO(Sylfrena): Make ticker duration configurable - const tickerDuration = 10 * time.Second - - ticker := time.NewTicker(tickerDuration) + ticker := time.NewTicker(b.writeInterval) defer ticker.Stop() for { @@ -77,15 +77,30 @@ func (b *Batcher) batchLoop(ctx context.Context) error { b.series = []*profilestorepb.RawProfileSeries{} b.mtx.Unlock() - if _, err := b.writeClient.WriteRaw( - ctx, - &profilestorepb.WriteRawRequest{Series: batch}, - ); err != nil { - level.Error(b.logger).Log("msg", "Write client failed to send profiles", "err", err) + expbackOff := backoff.NewExponentialBackOff() + expbackOff.MaxElapsedTime = b.writeInterval // TODO: Subtract ~10% of interval to account for overhead in loop + expbackOff.InitialInterval = 500 * time.Millisecond // Let's not retry too aggressively to start with. 
+ + err := backoff.Retry(func() error { + _, err := b.writeClient.WriteRaw(ctx, &profilestorepb.WriteRawRequest{Series: batch}) + // Only log error if retrying, otherwise it will be logged outside the retry + if err != nil && expbackOff.NextBackOff().Nanoseconds() > 0 { + level.Debug(b.logger).Log( + "msg", "batch write client failed to send profiles", + "retry", expbackOff.NextBackOff(), + "count", len(batch), + "err", err, + ) + } + return err + }, expbackOff) + if err != nil { + // TODO: Add metric and increase with every backoff iteration. + level.Error(b.logger).Log("msg", "batch write client failed to send profiles", "count", len(batch), "err", err) return err } - level.Debug(b.logger).Log("msg", "Write client has sent profiles", "count", len(batch)) + level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch)) return nil } diff --git a/pkg/agent/write_client_test.go b/pkg/agent/write_client_test.go index fac64628d5..f0c0245e00 100644 --- a/pkg/agent/write_client_test.go +++ b/pkg/agent/write_client_test.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "testing" + "time" "github.com/go-kit/log" profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" @@ -53,7 +54,7 @@ func compareProfileSeries(a, b []*profilestorepb.RawProfileSeries) bool { func TestWriteClient(t *testing.T) { wc := NewNoopProfileStoreClient() - batcher := NewBatchWriteClient(log.NewNopLogger(), wc) + batcher := NewBatchWriteClient(log.NewNopLogger(), wc, time.Second) labelset1 := profilestorepb.LabelSet{ Labels: []*profilestorepb.Label{{ @@ -70,7 +71,7 @@ func TestWriteClient(t *testing.T) { ctx := context.Background() - samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 0o4, 96}}} + samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 4, 96}}} samples2 := []*profilestorepb.RawSample{{RawProfile: []byte{15, 11, 95}}} t.Run("insertFirstProfile", func(t *testing.T) { diff --git a/pkg/debuginfo/debuginfo.go 
b/pkg/debuginfo/debuginfo.go index 20ac7c0d14..5d49506752 100644 --- a/pkg/debuginfo/debuginfo.go +++ b/pkg/debuginfo/debuginfo.go @@ -27,7 +27,9 @@ import ( "path/filepath" "strings" "sync" + "time" + "github.com/cenkalti/backoff/v4" "github.com/containerd/containerd/sys/reaper" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -359,7 +361,22 @@ func (di *Extractor) uploadDebugInfo(ctx context.Context, buildID, file string) return fmt.Errorf("failed to open temp file for debug information: %w", err) } - if _, err := di.Client.Upload(ctx, buildID, f); err != nil { + expBackOff := backoff.NewExponentialBackOff() + expBackOff.InitialInterval = time.Second + expBackOff.MaxElapsedTime = time.Minute + + err = backoff.Retry(func() error { + _, err := di.Client.Upload(ctx, buildID, f) + if err != nil { + di.logger.Log( + "msg", "failed to upload debug information", + "retry", time.Second, + "err", err, + ) + } + return err + }, expBackOff) + if err != nil { return fmt.Errorf("failed to upload debug information: %w", err) } diff --git a/pkg/profiler/profiler.go b/pkg/profiler/profiler.go index 3753930755..289940e278 100644 --- a/pkg/profiler/profiler.go +++ b/pkg/profiler/profiler.go @@ -33,13 +33,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/pprof/profile" - "github.com/parca-dev/parca-agent/pkg/agent" profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "golang.org/x/sys/unix" + "github.com/parca-dev/parca-agent/pkg/agent" "github.com/parca-dev/parca-agent/pkg/byteorder" "github.com/parca-dev/parca-agent/pkg/debuginfo" "github.com/parca-dev/parca-agent/pkg/ksym" diff --git a/pkg/target/manager.go b/pkg/target/manager.go index 89a2c615ea..fa09366bf9 100644 --- a/pkg/target/manager.go +++ b/pkg/target/manager.go @@ -49,8 +49,9 @@ func NewManager( 
writeClient profilestorepb.ProfileStoreServiceClient, debugInfoClient debuginfo.Client, profilingDuration time.Duration, - tmp string) *Manager { - m := &Manager{ + tmp string, +) *Manager { + return &Manager{ mtx: &sync.RWMutex{}, profilerPools: map[string]*ProfilerPool{}, logger: logger, @@ -62,8 +63,6 @@ func NewManager( profilingDuration: profilingDuration, tmp: tmp, } - - return m } func (m *Manager) Run(ctx context.Context, update <-chan map[string][]*Group) error { diff --git a/pkg/target/profiler_pool.go b/pkg/target/profiler_pool.go index cf04f57e3e..8b9ddd404c 100644 --- a/pkg/target/profiler_pool.go +++ b/pkg/target/profiler_pool.go @@ -66,8 +66,9 @@ func NewProfilerPool( writeClient profilestorepb.ProfileStoreServiceClient, debugInfoClient debuginfo.Client, profilingDuration time.Duration, - tmp string) *ProfilerPool { - pp := &ProfilerPool{ + tmp string, +) *ProfilerPool { + return &ProfilerPool{ ctx: ctx, mtx: &sync.RWMutex{}, activeTargets: map[uint64]*Target{}, @@ -81,8 +82,6 @@ func NewProfilerPool( profilingDuration: profilingDuration, tmp: tmp, } - - return pp } func (pp *ProfilerPool) Profilers() []Profiler {