fix: jfr ingestion issue in 0.17.0 (#1112)
petethepig committed May 26, 2022 · 1 parent 3458551 · commit 0f93772
Showing 3 changed files with 45 additions and 73 deletions.
106 changes: 36 additions & 70 deletions pkg/convert/jfr/parser.go
@@ -14,8 +14,8 @@ import (
     "github.com/pyroscope-io/pyroscope/pkg/storage/tree"
 )

-func ParseJFR(ctx context.Context, r io.Reader, s storage.Putter, pi *storage.PutInput) (err error) {
-    chunks, err := parser.Parse(r)
+func ParseJFR(ctx context.Context, s storage.Putter, body io.Reader, pi *storage.PutInput) (err error) {
+    chunks, err := parser.Parse(body)
     if err != nil {
         return fmt.Errorf("unable to parse JFR format: %w", err)
     }
@@ -24,11 +24,10 @@ func ParseJFR(ctx context.Context, r io.Reader, s storage.Putter, pi *storage.Pu
             err = multierror.Append(err, pErr)
         }
     }
-    pi.Val = nil
     return err
 }

-func parse(ctx context.Context, c parser.Chunk, s storage.Putter, pi *storage.PutInput) (err error) {
+func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *storage.PutInput) (err error) {
     var event, alloc, lock string
     cpu := tree.New()
     wall := tree.New()
@@ -84,83 +83,50 @@ func parse(ctx context.Context, c parser.Chunk, s storage.Putter, pi *storage.Pu
             }
         }
     }
-    labels := pi.Key.Labels()
-    prefix := labels["__name__"]
+
+    labelsOriginal := piOriginal.Key.Labels()
+    prefix := labelsOriginal["__name__"]
+
+    cb := func(n string, t *tree.Tree, u metadata.Units) {
+        labels := map[string]string{}
+        for k, v := range labelsOriginal {
+            labels[k] = v
+        }
+        labels["__name__"] = prefix + "." + n
+        pi := &storage.PutInput{
+            StartTime:       piOriginal.StartTime,
+            EndTime:         piOriginal.EndTime,
+            Key:             segment.NewKey(labels),
+            Val:             t,
+            SpyName:         piOriginal.SpyName,
+            SampleRate:      piOriginal.SampleRate,
+            Units:           u,
+            AggregationType: metadata.SumAggregationType,
+        }
+        if putErr := s.Put(ctx, pi); putErr != nil {
+            err = multierror.Append(err, putErr)
+        }
+    }
+
     if event == "cpu" || event == "itimer" || event == "wall" {
         profile := event
         if event == "wall" {
             profile = "cpu"
         }
-        labels["__name__"] = prefix + "." + profile
-        pi.Key = segment.NewKey(labels)
-        pi.Val = cpu
-        pi.Units = metadata.SamplesUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
+        cb(profile, cpu, metadata.SamplesUnits)
     }
     if event == "wall" {
-        labels["__name__"] = prefix + "." + event
-        pi.Key = segment.NewKey(labels)
-        pi.Val = wall
-        pi.Units = metadata.SamplesUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
+        cb(event, wall, metadata.SamplesUnits)
     }
     if alloc != "" {
-        labels["__name__"] = prefix + ".alloc_in_new_tlab_objects"
-        pi.Key = segment.NewKey(labels)
-        pi.Val = inTLABObjects
-        pi.Units = metadata.ObjectsUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
-        labels["__name__"] = prefix + ".alloc_in_new_tlab_bytes"
-        pi.Key = segment.NewKey(labels)
-        pi.Val = inTLABBytes
-        pi.Units = metadata.BytesUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
-        labels["__name__"] = prefix + ".alloc_outside_tlab_objects"
-        pi.Key = segment.NewKey(labels)
-        pi.Val = outTLABObjects
-        pi.Units = metadata.ObjectsUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
-        labels["__name__"] = prefix + ".alloc_outside_tlab_bytes"
-        pi.Key = segment.NewKey(labels)
-        pi.Val = outTLABBytes
-        pi.Units = metadata.BytesUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
+        cb("alloc_in_new_tlab_objects", inTLABObjects, metadata.ObjectsUnits)
+        cb("alloc_in_new_tlab_bytes", inTLABBytes, metadata.BytesUnits)
+        cb("alloc_outside_tlab_objects", outTLABObjects, metadata.ObjectsUnits)
+        cb("alloc_outside_tlab_bytes", outTLABBytes, metadata.BytesUnits)
     }
     if lock != "" {
-        labels["__name__"] = prefix + ".lock_count"
-        pi.Key = segment.NewKey(labels)
-        pi.Val = lockSamples
-        pi.Units = metadata.LockSamplesUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
-        labels["__name__"] = prefix + ".lock_duration"
-        pi.Key = segment.NewKey(labels)
-        pi.Val = lockDuration
-        pi.Units = metadata.LockNanosecondsUnits
-        pi.AggregationType = metadata.SumAggregationType
-        if putErr := s.Put(ctx, pi); putErr != nil {
-            err = multierror.Append(err, putErr)
-        }
+        cb("lock_count", lockSamples, metadata.LockSamplesUnits)
+        cb("lock_duration", lockDuration, metadata.LockNanosecondsUnits)
     }
     return err
 }
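
The substantive change in this file is the new cb helper: rather than re-keying and re-pointing the caller's *storage.PutInput for every profile (with ParseJFR nil-ing its Val afterwards), the parser now builds a fresh PutInput, with its own label map and segment key, for each derived profile (cpu, wall, the alloc_* series, lock_count, lock_duration). The commit message does not state the root cause, but the updated test in pkg/server/ingest_test.go now routes Puts through storage.NewIngestionQueue, which suggests the 0.17.0 issue comes from handing a PutInput to an asynchronous Putter and then mutating it. The toy program below uses stand-in types, not Pyroscope's real ones, to sketch that hazard and the per-call pattern the new code adopts.

package main

import (
    "fmt"
    "sync"
)

// putInput and asyncPutter are stand-ins for storage.PutInput and a
// queue-backed Putter; they exist only to illustrate the pattern.
type putInput struct {
    name string
    val  string
}

type asyncPutter struct {
    wg   sync.WaitGroup
    mu   sync.Mutex
    seen map[string]string
}

// Put records the input on a separate goroutine, the way an ingestion
// queue defers the actual write past the Put call itself.
func (p *asyncPutter) Put(in *putInput) {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        p.mu.Lock()
        defer p.mu.Unlock()
        p.seen[in.name] = in.val // dereferences in after Put has returned
    }()
}

func main() {
    p := &asyncPutter{seen: map[string]string{}}

    // Old shape (simplified): one shared struct, mutated between Puts.
    // The goroutines spawned above may observe a later name/val pair,
    // and `go run -race` flags this loop.
    shared := &putInput{}
    for _, n := range []string{"cpu", "wall", "lock_count"} {
        shared.name = n
        shared.val = "tree for " + n
        p.Put(shared)
    }

    // New shape, mirroring cb: a fresh input per profile, so the Putter
    // sees a stable snapshot no matter when it gets around to it.
    for _, n := range []string{"cpu", "wall", "lock_count"} {
        p.Put(&putInput{name: n, val: "tree for " + n})
    }

    p.wg.Wait()
    fmt.Println("recorded", len(p.seen), "profiles")
}
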
2 changes: 1 addition & 1 deletion pkg/parser/parser.go
@@ -93,7 +93,7 @@ func (p *Parser) Put(ctx context.Context, in *PutInput) error {
         err = convert.ParseIndividualLines(in.Body, cb)
     // with some formats we write directly to storage, hence the early return
     case in.Format == "jfr":
-        return jfr.ParseJFR(ctx, in.Body, p.putter, pi)
+        return jfr.ParseJFR(ctx, p.putter, in.Body, pi)
     case in.Format == "pprof":
         return writePprofFromBody(ctx, p.putter, in)
     case strings.Contains(in.ContentType, "multipart/form-data"):
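
The call site is updated in lockstep with the reordered signature: the Putter now precedes the body. For reference, a standalone caller of the new ParseJFR would look roughly like the sketch below; the import paths, the single-method shape of storage.Putter, and the key string are assumptions extrapolated from what this diff shows rather than checked against the repository.

package main

import (
    "context"
    "log"
    "os"

    "github.com/pyroscope-io/pyroscope/pkg/convert/jfr"
    "github.com/pyroscope-io/pyroscope/pkg/storage"
    "github.com/pyroscope-io/pyroscope/pkg/storage/segment"
)

// logPutter is a throwaway Putter that only reports what would be stored.
// It assumes storage.Putter is satisfied by this single Put method.
type logPutter struct{}

func (logPutter) Put(_ context.Context, in *storage.PutInput) error {
    log.Printf("would store %v (units: %v)", in.Key, in.Units)
    return nil
}

func main() {
    f, err := os.Open("profile.jfr")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    key, err := segment.ParseKey("example.app{}")
    if err != nil {
        log.Fatal(err)
    }

    // Argument order after this commit: context, Putter, body, PutInput.
    pi := &storage.PutInput{Key: key}
    if err := jfr.ParseJFR(context.Background(), logPutter{}, f, pi); err != nil {
        log.Fatal(err)
    }
}
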
10 changes: 8 additions & 2 deletions pkg/server/ingest_test.go
@@ -91,13 +91,18 @@ var _ = Describe("server", func() {
         done := make(chan interface{})
         go func() {
             defer GinkgoRecover()
-            s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
+
+            reg := prometheus.NewRegistry()
+
+            s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), reg, new(health.Controller))
+            queue := storage.NewIngestionQueue(logrus.StandardLogger(), s, reg, 4, 4)
+
             Expect(err).ToNot(HaveOccurred())
             e, _ := exporter.NewExporter(nil, nil)
             c, _ := New(Config{
                 Configuration:     &(*cfg).Server,
                 Storage:           s,
-                Putter:            s,
+                Putter:            queue,
                 MetricsExporter:   e,
                 Logger:            logrus.New(),
                 MetricsRegisterer: prometheus.NewRegistry(),
@@ -145,6 +150,7 @@ var _ = Describe("server", func() {
             expectedKey = name
         }
         sk, _ := segment.ParseKey(expectedKey)
+        time.Sleep(10 * time.Millisecond)
         time.Sleep(sleepDur)
         gOut, err := s.Get(context.TODO(), &storage.GetInput{
             StartTime: st,
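
With the server's Putter now backed by storage.NewIngestionQueue, writes in this test land asynchronously, which is presumably why a short sleep sits between ingesting and the s.Get call. A fixed sleep is the simplest thing that works; a common alternative in tests like this is to poll for the data with a deadline. The sketch below is a dependency-free version of that idea (in a Ginkgo suite, Gomega's Eventually plays the same role); the names in it are illustrative, not taken from this test.

package main

import (
    "errors"
    "fmt"
    "time"
)

// waitFor polls cond until it returns true or the timeout elapses.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if cond() {
            return nil
        }
        time.Sleep(interval)
    }
    return errors.New("condition not met before deadline")
}

func main() {
    // Stand-in for "the ingestion queue has flushed": here it simply
    // becomes true a few milliseconds in the future.
    flushedAt := time.Now().Add(30 * time.Millisecond)

    err := waitFor(time.Second, 5*time.Millisecond, func() bool {
        // In the real test this would be "s.Get(...) returned data".
        return time.Now().After(flushedAt)
    })
    fmt.Println("data visible:", err == nil)
}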