Skip to content

Commit

Permalink
Add exponential backoff to profile writes and debuginfo upload
Browse files Browse the repository at this point in the history
Most of the other things are gofumpt formattings and a couple of improvements
  • Loading branch information
metalmatze committed Feb 23, 2022
1 parent 5128f50 commit 77c25dd
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 43 deletions.
9 changes: 5 additions & 4 deletions cmd/debug-info/main.go
Expand Up @@ -69,8 +69,8 @@ func main() {
logger := logger.NewLogger(flags.LogLevel, logger.LogFormatLogfmt, "")

var (
g run.Group
dc debuginfo.Client = debuginfo.NewNoopClient()
g run.Group
debugInfoClient = debuginfo.NewNoopClient()
)

if len(flags.Upload.StoreAddress) > 0 {
Expand All @@ -82,10 +82,10 @@ func main() {
}
defer conn.Close()

dc = parcadebuginfo.NewDebugInfoClient(conn)
debugInfoClient = parcadebuginfo.NewDebugInfoClient(conn)
}

die := debuginfo.NewExtractor(logger, dc, flags.TempDir)
die := debuginfo.NewExtractor(logger, debugInfoClient, flags.TempDir)

ctx, cancel := context.WithCancel(context.Background())
switch kongCtx.Command() {
Expand Down Expand Up @@ -171,6 +171,7 @@ func main() {
g.Add(run.SignalHandler(ctx, os.Interrupt, os.Kill))
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
level.Info(logger).Log("msg", "done!")
}
Expand Down
23 changes: 12 additions & 11 deletions cmd/parca-agent/main.go
Expand Up @@ -119,8 +119,8 @@ func main() {
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

var wc profilestorepb.ProfileStoreServiceClient = agent.NewNoopProfileStoreClient()
var dc debuginfo.Client = debuginfo.NewNoopClient()
profileStoreClient := agent.NewNoopProfileStoreClient()
debugInfoClient := debuginfo.NewNoopClient()

if len(flags.StoreAddress) > 0 {
conn, err := grpcConn(reg, flags)
Expand All @@ -130,16 +130,17 @@ func main() {
}

// Initialize actual clients with the connection.
wc = profilestorepb.NewProfileStoreServiceClient(conn)
dc = parcadebuginfo.NewDebugInfoClient(conn)
profileStoreClient = profilestorepb.NewProfileStoreServiceClient(conn)
debugInfoClient = parcadebuginfo.NewDebugInfoClient(conn)
}

ksymCache := ksym.NewKsymCache(logger)

var (
configs discovery.Configs
bwc = agent.NewBatchWriteClient(logger, wc)
listener = agent.NewProfileListener(logger, bwc)
configs discovery.Configs
// TODO(Sylfrena): Make ticker duration configurable
batchWriteClient = agent.NewBatchWriteClient(logger, profileStoreClient, 10*time.Second)
profileListener = agent.NewProfileListener(logger, batchWriteClient)
)

if flags.Kubernetes {
Expand All @@ -158,7 +159,7 @@ func main() {
}

externalLabels := getExternalLabels(flags.ExternalLabel, flags.Node)
tm := target.NewManager(logger, reg, externalLabels, ksymCache, listener, dc, flags.ProfilingDuration, flags.TempDir)
tm := target.NewManager(logger, reg, externalLabels, ksymCache, profileListener, debugInfoClient, flags.ProfilingDuration, flags.TempDir)

mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down Expand Up @@ -251,7 +252,7 @@ func main() {
ctx, cancel := context.WithTimeout(ctx, time.Second*11)
defer cancel()

profile, err := listener.NextMatchingProfile(ctx, matchers)
profile, err := profileListener.NextMatchingProfile(ctx, matchers)
if profile == nil || err == context.Canceled {
http.Error(w, "No profile taken in the last 11 seconds that matches the requested label-matchers query. Profiles are taken every 10 seconds so either the profiler matching the label-set has stopped profiling, or the label-set was incorrect.", http.StatusNotFound)
return
Expand Down Expand Up @@ -322,7 +323,7 @@ func main() {
ctx, cancel := context.WithCancel(ctx)
g.Add(func() error {
level.Debug(logger).Log("msg", "starting batch write client")
return bwc.Run(ctx)
return batchWriteClient.Run(ctx)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -424,7 +425,7 @@ func grpcConn(reg prometheus.Registerer, flags flags) (*grpc.ClientConn, error)
return nil, fmt.Errorf("failed to read bearer token from file: %w", err)
}
opts = append(opts, grpc.WithPerRPCCredentials(&perRequestBearerToken{
token: string(b),
token: strings.TrimSpace(string(b)),
insecure: flags.Insecure,
}))
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/alecthomas/kong v0.4.1
github.com/aquasecurity/libbpfgo v0.2.4-libbpf-0.6.1
github.com/cenkalti/backoff/v4 v4.1.2
github.com/cespare/xxhash/v2 v2.1.2
github.com/containerd/cgroups v1.0.3
github.com/containerd/containerd v1.6.0
Expand Down Expand Up @@ -46,7 +47,6 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible // indirect
github.com/baidubce/bce-sdk-go v0.9.81 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/containerd/go-runc v1.0.0 // indirect
Expand Down
45 changes: 30 additions & 15 deletions pkg/agent/write_client.go
Expand Up @@ -18,15 +18,17 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"google.golang.org/grpc"
)

type Batcher struct {
logger log.Logger
writeClient profilestorepb.ProfileStoreServiceClient
logger log.Logger
writeClient profilestorepb.ProfileStoreServiceClient
writeInterval time.Duration

mtx *sync.RWMutex
series []*profilestorepb.RawProfileSeries
Expand All @@ -35,10 +37,11 @@ type Batcher struct {
lastBatchSendError error
}

func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient) *Batcher {
func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient, writeInterval time.Duration) *Batcher {
return &Batcher{
logger: logger,
writeClient: wc,
logger: logger,
writeClient: wc,
writeInterval: writeInterval,

series: []*profilestorepb.RawProfileSeries{},
mtx: &sync.RWMutex{},
Expand All @@ -54,10 +57,7 @@ func (b *Batcher) loopReport(lastBatchSentAt time.Time, lastBatchSendError error
}

func (b *Batcher) Run(ctx context.Context) error {
// TODO(Sylfrena): Make ticker duration configurable
const tickerDuration = 10 * time.Second

ticker := time.NewTicker(tickerDuration)
ticker := time.NewTicker(b.writeInterval)
defer ticker.Stop()

for {
Expand All @@ -77,15 +77,30 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
b.series = []*profilestorepb.RawProfileSeries{}
b.mtx.Unlock()

if _, err := b.writeClient.WriteRaw(
ctx,
&profilestorepb.WriteRawRequest{Series: batch},
); err != nil {
level.Error(b.logger).Log("msg", "Write client failed to send profiles", "err", err)
expbackOff := backoff.NewExponentialBackOff()
expbackOff.MaxElapsedTime = b.writeInterval // TODO: Subtract ~10% of interval to account for overhead in loop
expbackOff.InitialInterval = 500 * time.Millisecond // Let's not retry to aggressively to start with.

err := backoff.Retry(func() error {
_, err := b.writeClient.WriteRaw(ctx, &profilestorepb.WriteRawRequest{Series: batch})
// Only log error if retrying, otherwise it will be logged outside the retry
if err != nil && expbackOff.NextBackOff().Nanoseconds() > 0 {
level.Debug(b.logger).Log(
"msg", "batch write client failed to send profiles",
"retry", expbackOff.NextBackOff(),
"count", len(batch),
"err", err,
)
}
return err
}, expbackOff)
if err != nil {
// TODO: Add metric and increase with every backoff iteration.
level.Error(b.logger).Log("msg", "batch write client failed to send profiles", "count", len(batch), "err", err)
return err
}

level.Debug(b.logger).Log("msg", "Write client has sent profiles", "count", len(batch))
level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/write_client_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"testing"
"time"

"github.com/go-kit/log"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
Expand Down Expand Up @@ -53,7 +54,7 @@ func compareProfileSeries(a, b []*profilestorepb.RawProfileSeries) bool {

func TestWriteClient(t *testing.T) {
wc := NewNoopProfileStoreClient()
batcher := NewBatchWriteClient(log.NewNopLogger(), wc)
batcher := NewBatchWriteClient(log.NewNopLogger(), wc, time.Second)

labelset1 := profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{{
Expand All @@ -70,7 +71,7 @@ func TestWriteClient(t *testing.T) {

ctx := context.Background()

samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 0o4, 96}}}
samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 4, 96}}}
samples2 := []*profilestorepb.RawSample{{RawProfile: []byte{15, 11, 95}}}

t.Run("insertFirstProfile", func(t *testing.T) {
Expand Down
19 changes: 18 additions & 1 deletion pkg/debuginfo/debuginfo.go
Expand Up @@ -27,7 +27,9 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/containerd/containerd/sys/reaper"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -359,7 +361,22 @@ func (di *Extractor) uploadDebugInfo(ctx context.Context, buildID, file string)
return fmt.Errorf("failed to open temp file for debug information: %w", err)
}

if _, err := di.Client.Upload(ctx, buildID, f); err != nil {
expBackOff := backoff.NewExponentialBackOff()
expBackOff.InitialInterval = time.Second
expBackOff.MaxElapsedTime = time.Minute

err = backoff.Retry(func() error {
_, err := di.Client.Upload(ctx, buildID, f)
if err != nil {
di.logger.Log(
"msg", "failed to upload debug information",
"retry", time.Second,
"err", err,
)
}
return err
}, expBackOff)
if err != nil {
return fmt.Errorf("failed to upload debug information: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/profiler/profiler.go
Expand Up @@ -33,13 +33,13 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/pprof/profile"
"github.com/parca-dev/parca-agent/pkg/agent"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"golang.org/x/sys/unix"

"github.com/parca-dev/parca-agent/pkg/agent"
"github.com/parca-dev/parca-agent/pkg/byteorder"
"github.com/parca-dev/parca-agent/pkg/debuginfo"
"github.com/parca-dev/parca-agent/pkg/ksym"
Expand Down
7 changes: 3 additions & 4 deletions pkg/target/manager.go
Expand Up @@ -49,8 +49,9 @@ func NewManager(
writeClient profilestorepb.ProfileStoreServiceClient,
debugInfoClient debuginfo.Client,
profilingDuration time.Duration,
tmp string) *Manager {
m := &Manager{
tmp string,
) *Manager {
return &Manager{
mtx: &sync.RWMutex{},
profilerPools: map[string]*ProfilerPool{},
logger: logger,
Expand All @@ -62,8 +63,6 @@ func NewManager(
profilingDuration: profilingDuration,
tmp: tmp,
}

return m
}

func (m *Manager) Run(ctx context.Context, update <-chan map[string][]*Group) error {
Expand Down
7 changes: 3 additions & 4 deletions pkg/target/profiler_pool.go
Expand Up @@ -66,8 +66,9 @@ func NewProfilerPool(
writeClient profilestorepb.ProfileStoreServiceClient,
debugInfoClient debuginfo.Client,
profilingDuration time.Duration,
tmp string) *ProfilerPool {
pp := &ProfilerPool{
tmp string,
) *ProfilerPool {
return &ProfilerPool{
ctx: ctx,
mtx: &sync.RWMutex{},
activeTargets: map[uint64]*Target{},
Expand All @@ -81,8 +82,6 @@ func NewProfilerPool(
profilingDuration: profilingDuration,
tmp: tmp,
}

return pp
}

func (pp *ProfilerPool) Profilers() []Profiler {
Expand Down

0 comments on commit 77c25dd

Please sign in to comment.