Skip to content

Commit

Permalink
feat: support importing adhoc profiles in pprof or collapsed formats (#…
Browse files Browse the repository at this point in the history
…649)

* chore: move sample type config from server to tree.

In the tree package it can be more easily reused wherever it's needed.

* feat: support importing adhoc profiles in pprof or collapsed formats

* apply linter suggestions.

* Avoid an extra copy encoding the profile directly to the response.
  • Loading branch information
abeaumont committed Jan 3, 2022
1 parent 2fb0fff commit 14ee845
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 75 deletions.
81 changes: 81 additions & 0 deletions pkg/adhoc/server/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package server

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

"github.com/pyroscope-io/pyroscope/pkg/agent/spy"
"github.com/pyroscope-io/pyroscope/pkg/convert"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
"github.com/pyroscope-io/pyroscope/pkg/structs/flamebearer"
)

func jsonToProfile(b []byte, _ string, _ int) (*flamebearer.FlamebearerProfile, error) {
var profile flamebearer.FlamebearerProfile
if err := json.Unmarshal(b, &profile); err != nil {
return nil, fmt.Errorf("unable to unmarshall JSON: %w", err)
}
return &profile, nil
}

func pprofToProfile(b []byte, name string, maxNodes int) (*flamebearer.FlamebearerProfile, error) {
p, err := convert.ParsePprof(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("parsing pprof: %w", err)
}
// TODO(abeaumont): Support multiple sample types
for _, stype := range p.SampleTypes() {
sampleRate := uint32(100)
units := "samples"
if c, ok := tree.DefaultSampleTypeMapping[stype]; ok {
units = c.Units
if c.Sampled && p.Period > 0 {
sampleRate = uint32(time.Second / time.Duration(p.Period))
}
}
t := tree.New()
p.Get(stype, func(_labels *spy.Labels, name []byte, val int) {
t.Insert(name, uint64(val))
})

out := &storage.GetOutput{
Tree: t,
Units: units,
SpyName: name,
SampleRate: sampleRate,
}
profile := flamebearer.NewProfile(out, maxNodes)
return &profile, nil
}
return nil, errors.New("no supported sample type found")
}

func collapsedToProfile(b []byte, name string, maxNodes int) (*flamebearer.FlamebearerProfile, error) {
t := tree.New()
for _, line := range bytes.Split(b, []byte("\n")) {
if len(line) == 0 {
continue
}
i := bytes.LastIndexByte(line, ' ')
if i < 0 {
return nil, errors.New("unable to find stacktrace and value separator")
}
value, err := strconv.ParseUint(string(line[i+1:]), 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse sample value: %w", err)
}
t.Insert(line[:i], value)
}
out := &storage.GetOutput{
Tree: t,
SpyName: name,
SampleRate: 100, // We don't have this information, use the default
}
profile := flamebearer.NewProfile(out, maxNodes)
return &profile, nil
}
71 changes: 44 additions & 27 deletions pkg/adhoc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type profile struct {

type server struct {
log logrus.FieldLogger
maxNodes int
enabled bool
profiles map[string]profile
mtx sync.RWMutex
Expand All @@ -36,8 +37,13 @@ type Server interface {
AddRoutes(router *mux.Router) http.HandlerFunc
}

func New(log logrus.FieldLogger, enabled bool) Server {
return &server{log: log, enabled: enabled, profiles: make(map[string]profile)}
func New(log logrus.FieldLogger, maxNodes int, enabled bool) Server {
return &server{
log: log,
maxNodes: maxNodes,
enabled: enabled,
profiles: make(map[string]profile),
}
}

func (s *server) AddRoutes(r *mux.Router) http.HandlerFunc {
Expand Down Expand Up @@ -96,7 +102,6 @@ func (s *server) Profiles(w http.ResponseWriter, _ *http.Request) {
}

// Profile retrieves a local file identified by its ID.
// TODO(abeaumont): Support other formats, only native JSON is supported for now.
func (s *server) Profile(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
s.mtx.RLock()
Expand All @@ -107,35 +112,15 @@ func (s *server) Profile(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
dataDir, err := util.EnsureDataDirectory()
if err != nil {
s.log.WithError(err).Errorf("Unable to create data directory")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
f, err := os.Open(filepath.Join(dataDir, p.Name))
fb, err := s.convert(p)
if err != nil {
s.log.WithError(err).Error("Unable to open profile")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer f.Close()
// Validate the format
b, err := io.ReadAll(f)
if err != nil {
s.log.WithError(err).Error("Unable to read profile")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var fb flamebearer.FlamebearerProfile
if err := json.Unmarshal(b, &fb); err != nil {
s.log.WithError(err).Error("Invalid file format")
s.log.WithError(err).Error("Unable to process profile")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(b); err != nil {
s.log.WithError(err).Error("Error sending profile")
if err := json.NewEncoder(w).Encode(*fb); err != nil {
s.log.WithError(err).Error("Unable to marshal profile")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -144,3 +129,35 @@ func (s *server) Profile(w http.ResponseWriter, r *http.Request) {
// TODO(abeaumont)
func (*server) Diff(_ http.ResponseWriter, _ *http.Request) {
}

type converterFn func(b []byte, name string, maxNodes int) (*flamebearer.FlamebearerProfile, error)

func (s *server) convert(p profile) (*flamebearer.FlamebearerProfile, error) {
dataDir, err := util.EnsureDataDirectory()
if err != nil {
return nil, fmt.Errorf("unable to create data directory: %w", err)
}
fname := filepath.Join(dataDir, p.Name)
ext := filepath.Ext(fname)
var converter converterFn
switch ext {
case ".json":
converter = jsonToProfile
case ".pprof":
converter = pprofToProfile
case ".txt":
converter = collapsedToProfile
default:
return nil, fmt.Errorf("unsupported file extension %s", ext)
}
f, err := os.Open(fname)
if err != nil {
return nil, fmt.Errorf("unable to open profile: %w", err)
}
defer f.Close()
b, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("unable to read profile: %w", err)
}
return converter(b, p.Name, s.maxNodes)
}
14 changes: 9 additions & 5 deletions pkg/cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,15 @@ func newServerService(c *config.Server) (*serverService, error) {
}

svc.controller, err = server.New(server.Config{
Configuration: svc.config,
Storage: svc.storage,
MetricsExporter: metricsExporter,
Notifier: svc.healthController,
Adhoc: adhocserver.New(svc.logger, svc.config.EnableExperimentalAdhocUI),
Configuration: svc.config,
Storage: svc.storage,
MetricsExporter: metricsExporter,
Notifier: svc.healthController,
Adhoc: adhocserver.New(
svc.logger,
svc.config.MaxNodesRender,
svc.config.EnableExperimentalAdhocUI,
),
Logger: svc.logger,
MetricsRegisterer: prometheus.DefaultRegisterer,
ExportedMetricsRegistry: exportedMetricsRegistry,
Expand Down
46 changes: 3 additions & 43 deletions pkg/server/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,6 @@ import (
"github.com/pyroscope-io/pyroscope/pkg/util/attime"
)

// TODO: dedup this with the version in scrape package
type SampleTypeConfig struct {
Units string `json:"units,omitempty"`
DisplayName string `json:"display-name,omitempty"`

// TODO(kolesnikovae): Introduce Kind?
// In Go, we have at least the following combinations:

// instant: Aggregation:avg && !Cumulative && !Sampled
// cumulative: Aggregation:sum && Cumulative && !Sampled
// delta: Aggregation:sum && !Cumulative && Sampled
Aggregation string `json:"aggregation,omitempty"`
Cumulative bool `json:"cumulative,omitempty"`
Sampled bool `json:"sampled,omitempty"`
}

var DefaultSampleTypeMapping = map[string]*SampleTypeConfig{
"samples": {
DisplayName: "cpu",
Units: "samples",
Sampled: true,
},
"inuse_objects": {
Units: "objects",
Aggregation: "avg",
},
"alloc_objects": {
Units: "objects",
Cumulative: true,
},
"inuse_space": {
Units: "bytes",
Aggregation: "avg",
},
"alloc_space": {
Units: "bytes",
Cumulative: true,
},
}

type ingestHandler struct {
log *logrus.Logger
storage *storage.Storage
Expand Down Expand Up @@ -117,9 +77,9 @@ func (h ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: add error handling for all of these

for _, sampleTypeStr := range profile.SampleTypes() {
var t *SampleTypeConfig
var t *tree.SampleTypeConfig
var ok bool
if t, ok = DefaultSampleTypeMapping[sampleTypeStr]; !ok {
if t, ok = tree.DefaultSampleTypeMapping[sampleTypeStr]; !ok {
continue
}
var tries map[string]*transporttrie.Trie
Expand Down Expand Up @@ -266,7 +226,7 @@ func (h ingestHandler) ingestParamsFromRequest(r *http.Request) (*storage.PutInp
func pprofToTries(originalAppName, sampleTypeStr string, pprof *tree.Profile) map[string]*transporttrie.Trie {
tries := map[string]*transporttrie.Trie{}

sampleTypeConfig := DefaultSampleTypeMapping[sampleTypeStr]
sampleTypeConfig := tree.DefaultSampleTypeMapping[sampleTypeStr]
if sampleTypeConfig == nil {
return tries
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/storage/tree/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,46 @@ import (
"time"
)

// TODO: dedup this with the version in scrape package
type SampleTypeConfig struct {
Units string `json:"units,omitempty"`
DisplayName string `json:"display-name,omitempty"`

// TODO(kolesnikovae): Introduce Kind?
// In Go, we have at least the following combinations:

// instant: Aggregation:avg && !Cumulative && !Sampled
// cumulative: Aggregation:sum && Cumulative && !Sampled
// delta: Aggregation:sum && !Cumulative && Sampled
Aggregation string `json:"aggregation,omitempty"`
Cumulative bool `json:"cumulative,omitempty"`
Sampled bool `json:"sampled,omitempty"`
}

var DefaultSampleTypeMapping = map[string]*SampleTypeConfig{
"samples": {
DisplayName: "cpu",
Units: "samples",
Sampled: true,
},
"inuse_objects": {
Units: "objects",
Aggregation: "avg",
},
"alloc_objects": {
Units: "objects",
Cumulative: true,
},
"inuse_space": {
Units: "bytes",
Aggregation: "avg",
},
"alloc_space": {
Units: "bytes",
Cumulative: true,
},
}

type pprof struct {
locations map[string]uint64
functions map[string]uint64
Expand Down

0 comments on commit 14ee845

Please sign in to comment.