Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): c3: PPROF extensions #541

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 277 additions & 5 deletions internal/pproflogging/pproflogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
"fmt"
"io"
"os"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"

"github.com/kopia/kopia/repo/logging"
)
Expand All @@ -23,6 +27,8 @@ type ProfileName string

const (
pair = 2
// PPROFDumpTimeout Set a time upper bound for dumps.
PPROFDumpTimeout = 15 * time.Second
)

const (
Expand All @@ -40,8 +46,21 @@ const (

// flags used to configure profiling in EnvVarKopiaDebugPprof.
const (
// KopiaDebugFlagForceGc force garbage collection before dumping heap data.
KopiaDebugFlagForceGc = "forcegc"
// KopiaDebugFlagDebug value of the profiles `debug` parameter.
KopiaDebugFlagDebug = "debug"
// KopiaDebugFlagRate rate setting for the named profile (if available). always an integer.
KopiaDebugFlagRate = "rate"
)

const (
// ProfileNameBlock block profile key.
ProfileNameBlock ProfileName = "block"
// ProfileNameMutex mutex profile key.
ProfileNameMutex = "mutex"
// ProfileNameCPU cpu profile key.
ProfileNameCPU = "cpu"
)

var (
Expand All @@ -67,6 +86,23 @@ type ProfileConfigs struct {
pcm map[ProfileName]*ProfileConfig
}

type pprofSetRate struct {
setter func(int)
defaultValue int
}

//nolint:gochecknoglobals
var pprofProfileRates = map[ProfileName]pprofSetRate{
ProfileNameBlock: {
setter: func(x int) { runtime.SetBlockProfileRate(x) },
defaultValue: DefaultDebugProfileRate,
},
ProfileNameMutex: {
setter: func(x int) { runtime.SetMutexProfileFraction(x) },
defaultValue: DefaultDebugProfileRate,
},
}

// HasProfileBuffersEnabled return true if pprof profiling is enabled.
func HasProfileBuffersEnabled() bool {
pprofConfigs.mu.Lock()
Expand All @@ -75,6 +111,43 @@ func HasProfileBuffersEnabled() bool {
return len(pprofConfigs.pcm) != 0
}

// MaybeStartProfileBuffers start profile buffers for this process.
func MaybeStartProfileBuffers(ctx context.Context) {
config := os.Getenv(EnvVarKopiaDebugPprof)

pprofConfigs.mu.Lock()
defer pprofConfigs.mu.Unlock()

if !loadAndSetProfileBuffersLocked(ctx, config) {
log(ctx).Debug("no profile buffer configuration to start")
return
}

pprofConfigs.StartProfileBuffersLocked(ctx)
}

// MaybeStopProfileBuffers stop and dump the contents of the buffers to the log as PEMs. Buffers
// supplied here are from MaybeStartProfileBuffers.
func MaybeStopProfileBuffers(ctx context.Context) {
pprofConfigs.mu.Lock()
defer pprofConfigs.mu.Unlock()

pprofConfigs.StopProfileBuffersLocked(ctx)
}

func loadAndSetProfileBuffersLocked(ctx context.Context, config string) bool {
pcm, err := LoadProfileConfig(ctx, config)
if err != nil {
log(ctx).With("error", err).Debug("cannot start configured profile buffers")
return false
}

// allow profile dumps to be cleared
pprofConfigs.pcm = pcm

return len(pprofConfigs.pcm) != 0
}

func newProfileConfigs(wrt Writer) *ProfileConfigs {
q := &ProfileConfigs{
wrt: wrt,
Expand Down Expand Up @@ -121,6 +194,120 @@ func LoadProfileConfig(ctx context.Context, ppconfigss string) (map[ProfileName]
return parseProfileConfigs(bufSizeB, ppconfigss)
}

// StartProfileBuffersLocked start profile buffers for enabled profiles/trace. Buffers
// are returned in a slice of buffers: CPU, Heap and trace respectively. class
// is used to distinguish profiles external to kopia.
func (p *ProfileConfigs) StartProfileBuffersLocked(ctx context.Context) {
// profiling rates need to be set before starting profiling
setupProfileFractions(ctx, p.pcm)

// cpu has special initialization
v, ok := p.pcm[ProfileNameCPU]
if !ok {
return
}

err := pprof.StartCPUProfile(v.buf)
if err != nil {
delete(p.pcm, ProfileNameCPU)
log(ctx).With("cause", err).Warn("cannot start cpu PPROF")
}
}

// StopProfileBuffersLocked stop and dump the contents of the buffers to the log as PEMs. Buffers
// supplied here are from MaybeStartProfileBuffers.
func (p *ProfileConfigs) StopProfileBuffersLocked(ctx context.Context) {
defer func() {
// clear the profile rates and fractions to effectively stop profiling
clearProfileFractions(p.pcm)
p.pcm = map[ProfileName]*ProfileConfig{}
}()

log(ctx).Debugf("saving %d PEM buffers for output", len(p.pcm))

// cpu and heap profiles requires special handling
for nm, v := range p.pcm {
if v == nil {
// silently ignore empty profiles
continue
}

_, ok := v.GetValue(KopiaDebugFlagForceGc)
if ok {
log(ctx).Debug("performing GC before PPROF dump ...")
runtime.GC()
}

// stop CPU profile after GC
if nm == ProfileNameCPU {
log(ctx).Debug("stopping CPU profile")

pprof.StopCPUProfile()

continue
}

// look up the profile. must not be nil
pent := pprof.Lookup(string(nm))
if pent == nil {
log(ctx).Warnf("no system PPROF entry for %q", nm)
delete(p.pcm, nm)

continue
}

// parse the debug number if supplied
debug, err := parseDebugNumber(v)
if err != nil {
log(ctx).With("cause", err).Warnf("%q: invalid PPROF configuration debug number", nm)
continue
}

// write the profile data to the buffer associated with the profile
err = pent.WriteTo(v.buf, debug)
if err != nil {
log(ctx).With("cause", err).Warnf("%q: error writing PPROF buffer", nm)
continue
}

// check context for break
if ctx.Err() != nil {
log(ctx).With("cause", err).Warnf("%q: error writing PPROF buffer", nm)
break
}
}

ctx, canfn := context.WithTimeout(ctx, PPROFDumpTimeout)
defer canfn()

// dump the profiles out into their respective PEMs
for k, v := range p.pcm {
if v == nil {
// no profile config, loop
continue
}

// PEM headings always in upper case
unm := strings.ToUpper(string(k))

log(ctx).Debugf("dumping PEM for %q", unm)

err := DumpPem(ctx, v.buf.Bytes(), unm, p.wrt)
if err != nil {
log(ctx).With("cause", err).Errorf("%q: cannot write PEM", unm)
return
}

// process context
err = ctx.Err()
if err != nil {
// ctx context may be bad, so use context.Background for safety
log(ctx).With("cause", err).Warnf("%q: cannot write PEM", unm)
return
}
}
}

// ProfileConfig configuration flags for a profile.
type ProfileConfig struct {
flags []string
Expand Down Expand Up @@ -192,6 +379,58 @@ func newProfileConfig(bufSizeB int, ppconfig string) *ProfileConfig {
return q
}

// setupProfileFractions somewhat complex setup for profile buffers. The intent
// is to implement a generic method for setting up _any_ pprofule. This is done
// in anticipation of using different or custom profiles.
func setupProfileFractions(ctx context.Context, profileBuffers map[ProfileName]*ProfileConfig) {
for k, pprofset := range pprofProfileRates {
v, ok := profileBuffers[k]
if !ok {
// profile not configured - leave it alone
continue
}

if v == nil {
// profile configured, but no rate - set to default
pprofset.setter(pprofset.defaultValue)
continue
}

s, _ := v.GetValue(KopiaDebugFlagRate)
if s == "" {
// flag without an argument - set to default
pprofset.setter(pprofset.defaultValue)
continue
}

n1, err := strconv.Atoi(s)
if err != nil {
log(ctx).With("cause", err).Warnf("invalid PPROF rate, %q, for %s: %v", s, k)
continue
}

log(ctx).Debugf("setting PPROF rate, %d, for %s", n1, k)
pprofset.setter(n1)
}
}

// clearProfileFractions set the profile fractions to their zero values.
func clearProfileFractions(profileBuffers map[ProfileName]*ProfileConfig) {
for k, pprofset := range pprofProfileRates {
v := profileBuffers[k]
if v == nil { // fold missing values and empty values
continue
}

_, ok := v.GetValue(KopiaDebugFlagRate)
if !ok { // only care if a value might have been set before
continue
}

pprofset.setter(0)
}
}

// DumpPem dump a PEM version of the byte slice, bs, into writer, wrt.
// DumpPem performs the PEM dump in two goroutines: the first (the main
// execution thread) performs the output to the console (see notes below).
Expand All @@ -202,6 +441,10 @@ func newProfileConfig(bufSizeB int, ppconfig string) *ProfileConfig {
// The implementation could be single threaded (unrolling the io.Pipe into
// a single thread), but the utilization of the bufio buffer then becomes
// convoluted (or a custom temporary buffer needs to be allocated).
//
// (linter includes comments ... so this function has too many comments :/ )
//
//nolint:funlen
func DumpPem(ctx context.Context, bs []byte, types string, wrt Writer) error {
blk := &pem.Block{
Type: types,
Expand All @@ -212,15 +455,16 @@ func DumpPem(ctx context.Context, bs []byte, types string, wrt Writer) error {
// stalls in the output path.
pr, pw := io.Pipe()

// ensure that the reader is closed on exit
// ensure that the pipe reader is closed on exit
defer func() {
err := pr.Close()
if err != nil {
log(ctx).Errorf("error closing PEM buffer: %w", err)
}
}()

// start a writer in the background. This writes encoded PEMs while there's data.
// start a pipe writer in the background. This writes encoded PEMs to the pipe
// writer while there's data..
go func() {
// do the encoding
err0 := pem.Encode(pw, blk)
Expand Down Expand Up @@ -258,21 +502,31 @@ func DumpPem(ctx context.Context, bs []byte, types string, wrt Writer) error {
//
for err1 == nil && err2 == nil && err3 == nil {
var ln []byte
ln, err1 = rdr.ReadBytes('\n')
// ReadBytes may hang and ignore context timout
// err1 can return ln and non-nil err1, so always call write
ln, err1 = rdr.ReadBytes('\n')
// Write may hang and ignore context timout
// write line to output
_, err2 = wrt.Write(ln)
// check context timeout between the two. If a stall occurs on
// any of the IO operations above, it will rely on the OS/golang
// library timeout to return
// (golang really needs IO functions with better support for
// timeouts)
err3 = ctx.Err()
}

// be nice, tell everyone of any problems.

// got a context error. this has precedent
if err3 != nil {
// got a context error. this has precedent
// err2 and err1 may be non-nil but err3 is the "cause"
// so report that.
return fmt.Errorf("could not write PEM: %w", err3)
}

// got a write error.
if err2 != nil {
// got a write error.
return fmt.Errorf("could not write PEM: %w", err2)
}

Expand All @@ -294,3 +548,21 @@ func DumpPem(ctx context.Context, bs []byte, types string, wrt Writer) error {

return fmt.Errorf("error reading bytes: %w", err1)
}

// parseDebugNumber debug numbers are supported by Go PPROF logging
// the interpretation of debug numbers is left up to the profile
// implementation. This library is permissive on the debug number
// values, so it makes no attempt to determine what's valid.
func parseDebugNumber(v *ProfileConfig) (int, error) {
debugs, ok := v.GetValue(KopiaDebugFlagDebug)
if !ok {
return 0, nil
}

debug, err := strconv.Atoi(debugs)
if err != nil {
return 0, fmt.Errorf("could not parse number %q: %w", debugs, err)
}

return debug, nil
}
Loading