Skip to content

Commit

Permalink
Merge pull request #372 from parca-dev/release-0.2
Browse files Browse the repository at this point in the history
Merge `release-0.2` back into `main`
  • Loading branch information
brancz committed Oct 26, 2021
2 parents 2a302a4 + 37f8225 commit fca447a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 13 deletions.
3 changes: 3 additions & 0 deletions pkg/query/query.go
Expand Up @@ -111,6 +111,9 @@ func (q *Query) QueryRange(ctx context.Context, req *pb.QueryRangeRequest) (*pb.
it := series.Iterator()
for it.Next() {
p := it.At()
if p.ProfileMeta().Timestamp == 0 {
return nil, status.Error(codes.Internal, "profile's timestamp is 0")
}
metricsSeries.Samples = append(metricsSeries.Samples, &pb.MetricsSample{
Timestamp: timestamppb.New(timestamp.Time(p.ProfileMeta().Timestamp)),
Value: p.ProfileTree().RootCumulativeValue(),
Expand Down
23 changes: 17 additions & 6 deletions pkg/storage/flamegraph.go
Expand Up @@ -167,11 +167,15 @@ func GenerateFlamegraph(
outerMost, innerMost := locationToTreeNodes(l, 0, 0)

flamegraphStack.Peek().node.Children = append(flamegraphStack.Peek().node.Children, outerMost)
flamegraphStack.Push(&TreeStackEntry{
node: innerMost,
})
if int32(len(flamegraphStack)) > flamegraph.Height {
flamegraph.Height = int32(len(flamegraphStack))

steppedInto := it.StepInto()
if steppedInto {
flamegraphStack.Push(&TreeStackEntry{
node: innerMost,
})
if int32(len(flamegraphStack)) > flamegraph.Height {
flamegraph.Height = int32(len(flamegraphStack))
}
}

for _, n := range child.FlatValues() {
Expand All @@ -191,7 +195,6 @@ func GenerateFlamegraph(
}
}

it.StepInto()
continue
}

Expand Down Expand Up @@ -301,6 +304,8 @@ func mergeChildren(node *pb.FlamegraphNode, compare, equals func(a, b *pb.Flameg
return compare(node.Children[i], node.Children[j])
})

var cumulative int64

i, j := 0, 1
for i < len(node.Children)-1 {
current, next := node.Children[i], node.Children[j]
Expand All @@ -311,6 +316,7 @@ func mergeChildren(node *pb.FlamegraphNode, compare, equals func(a, b *pb.Flameg
current.Meta.Mapping = &pb.Mapping{}
}

cumulative += next.Cumulative
current.Cumulative += next.Cumulative
current.Diff += next.Diff
current.Children = append(current.Children, next.Children...)
Expand All @@ -320,6 +326,11 @@ func mergeChildren(node *pb.FlamegraphNode, compare, equals func(a, b *pb.Flameg
}
i, j = i+1, j+1
}

// TODO: This is just a safeguard and should be properly fixed before this function.
if node.Cumulative < cumulative {
node.Cumulative = cumulative
}
}

func compareByName(a, b *pb.FlamegraphNode) bool {
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/series_iterator.go
Expand Up @@ -289,17 +289,21 @@ func (n *MemSeriesIteratorTreeNode) FlatValues() []*ProfileTreeValueNode {
return res
}

func getIndexRange(it MemSeriesValuesIterator, numSamples uint16, mint, maxt int64) (uint64, uint64, error) {
func getIndexRange(it MemSeriesValuesIterator, numSamples uint64, mint, maxt int64) (uint64, uint64, error) {
// figure out the index of the first sample > mint and the last sample < maxt
start := uint64(0)
end := uint64(0)
i := uint16(0)
i := uint64(0)
for it.Next() {
if i == numSamples {
end++
break
}
t := it.At()
// MultiChunkIterator might return sparse values - shouldn't usually happen though.
if t == 0 {
break
}
if t < mint {
start++
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/series_iterator_merge.go
Expand Up @@ -44,15 +44,18 @@ func (ms *MemMergeSeries) Iterator() ProfileSeriesIterator {
maxt = ms.s.maxTime
}

var numSamples uint64

chunkStart, chunkEnd := ms.s.timestamps.indexRange(mint, maxt)
timestamps := make([]chunkenc.Chunk, 0, chunkEnd-chunkStart)
for _, t := range ms.s.timestamps[chunkStart:chunkEnd] {
numSamples += uint64(t.chunk.NumSamples())
timestamps = append(timestamps, t.chunk)
}

sl := &SliceProfileSeriesIterator{i: -1}

start, end, err := getIndexRange(NewMultiChunkIterator(timestamps), ms.s.numSamples, mint, maxt)
start, end, err := getIndexRange(NewMultiChunkIterator(timestamps), numSamples, mint, maxt)
if err != nil {
sl.err = err
return sl
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/series_iterator_range.go
Expand Up @@ -37,14 +37,17 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
rs.s.mu.RLock()
defer rs.s.mu.RUnlock()

var numSamples uint64

chunkStart, chunkEnd := rs.s.timestamps.indexRange(rs.mint, rs.maxt)
timestamps := make([]chunkenc.Chunk, 0, chunkEnd-chunkStart)
for _, t := range rs.s.timestamps[chunkStart:chunkEnd] {
numSamples += uint64(t.chunk.NumSamples())
timestamps = append(timestamps, t.chunk)
}

it := NewMultiChunkIterator(timestamps)
start, end, err := getIndexRange(it, rs.s.numSamples, rs.mint, rs.maxt)
start, end, err := getIndexRange(it, numSamples, rs.mint, rs.maxt)
if err != nil {
return &MemRangeSeriesIterator{err: err}
}
Expand Down Expand Up @@ -111,7 +114,6 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
periodsIterator.Seek(start)
}

numSamples := uint64(rs.s.numSamples)
if end-start < numSamples {
numSamples = end - start - 1
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/series_iterator_root.go
Expand Up @@ -35,14 +35,17 @@ func (rs *MemRootSeries) Iterator() ProfileSeriesIterator {
rs.s.mu.RLock()
defer rs.s.mu.RUnlock()

var numSamples uint64

chunkStart, chunkEnd := rs.s.timestamps.indexRange(rs.mint, rs.maxt)
timestamps := make([]chunkenc.Chunk, 0, chunkEnd-chunkStart)
for _, t := range rs.s.timestamps[chunkStart:chunkEnd] {
numSamples += uint64(t.chunk.NumSamples())
timestamps = append(timestamps, t.chunk)
}

it := NewMultiChunkIterator(timestamps)
start, end, err := getIndexRange(it, rs.s.numSamples, rs.mint, rs.maxt)
start, end, err := getIndexRange(it, numSamples, rs.mint, rs.maxt)
if start == end {
return &MemRootSeriesIterator{err: fmt.Errorf("no samples within the time range")}
}
Expand All @@ -58,7 +61,7 @@ func (rs *MemRootSeries) Iterator() ProfileSeriesIterator {
rootIterator.Seek(start)
}

numSamples := uint64(rs.s.numSamples)
// Set numSamples correctly if only subset selected.
if end-start < numSamples {
numSamples = end - start - 1
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/series_iterator_test.go
Expand Up @@ -269,6 +269,11 @@ func TestGetIndexRange(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(2), start)
require.Equal(t, uint64(4), end)

start, end, err = getIndexRange(NewMultiChunkIterator([]chunkenc.Chunk{c}), 123, 1, 12)
require.NoError(t, err)
require.Equal(t, uint64(0), start)
require.Equal(t, uint64(5), end)
}

func TestIteratorRangeSum(t *testing.T) {
Expand Down

0 comments on commit fca447a

Please sign in to comment.