Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

WIP: Migrate to OTEL tracing #830

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove unused code
  • Loading branch information
simonswine committed Jul 10, 2023
commit 208bdc0b05100b6b0c0ee6f506dc500d963ef8cb
112 changes: 25 additions & 87 deletions pkg/phlaredb/query/iters.go
Original file line number Diff line number Diff line change
@@ -6,11 +6,12 @@ import (
"fmt"
"io"
"math"
"strings"
"sync"
"sync/atomic"

"github.com/grafana/dskit/multierror"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/segmentio/parquet-go"

"github.com/grafana/phlare/pkg/iter"
@@ -279,22 +280,6 @@ func (t *RowNumber) Next(repetitionLevel, definitionLevel int) {
}
}

// nextSlow is the original implementation of next. it is kept to test against
// the unrolled version above
func (t *RowNumber) nextSlow(repetitionLevel, definitionLevel int) {
t[repetitionLevel]++

// New children up through the definition level
for i := repetitionLevel + 1; i <= definitionLevel; i++ {
t[i] = 0
}

// // Children past the definition level are undefined
for i := definitionLevel + 1; i < len(t); i++ {
t[i] = -1
}
}

// Skip rows at the root-level.
func (t *RowNumber) Skip(numRows int64) {
t[0] += numRows
@@ -391,34 +376,7 @@ func NewErrIterator(err error) Iterator {
return iter.NewErrSeekIterator[*IteratorResult, RowNumberWithDefinitionLevel](err)
}

var columnIteratorPool = sync.Pool{
New: func() interface{} {
return &columnIteratorBuffer{}
},
}

func columnIteratorPoolGet(capacity, len int) *columnIteratorBuffer {
res := columnIteratorPool.Get().(*columnIteratorBuffer)
if cap(res.rowNumbers) < capacity {
res.rowNumbers = make([]RowNumber, capacity)
}
if cap(res.values) < capacity {
res.values = make([]parquet.Value, capacity)
}
res.rowNumbers = res.rowNumbers[:len]
res.values = res.values[:len]
return res
}

func columnIteratorPoolPut(b *columnIteratorBuffer) {
b.values = b.values[:cap(b.values)]
for i := range b.values {
b.values[i] = parquet.Value{}
}
columnIteratorPool.Put(b)
}

var columnIteratorResultPool = sync.Pool{
var iteratorResultPool = sync.Pool{
New: func() interface{} {
return &IteratorResult{Entries: make([]struct {
k string
@@ -428,48 +386,18 @@ var columnIteratorResultPool = sync.Pool{
},
}

func columnIteratorResultPoolGet() *IteratorResult {
res := columnIteratorResultPool.Get().(*IteratorResult)
func iteratorResultPoolGet() *IteratorResult {
res := iteratorResultPool.Get().(*IteratorResult)
return res
}

func columnIteratorResultPoolPut(r *IteratorResult) {
func iteratorResultPoolPut(r *IteratorResult) {
if r != nil {
r.Reset()
columnIteratorResultPool.Put(r)
iteratorResultPool.Put(r)
}
}

// ColumnIterator asynchronously iterates through the given row groups and column. Applies
// the optional predicate to each chunk, page, and value. Results are read by calling
// Next() until it returns nil.
type ColumnIterator struct {
rgs []parquet.RowGroup
col int
colName string
filter *InstrumentedPredicate

selectAs string
seekTo atomic.Value

metrics *Metrics
table string
quit chan struct{}
ch chan *columnIteratorBuffer

curr *columnIteratorBuffer
currN int

result *IteratorResult
err error
}

type columnIteratorBuffer struct {
rowNumbers []RowNumber
values []parquet.Value
err error
}

type BinaryJoinIterator struct {
left Iterator
right Iterator
@@ -506,7 +434,7 @@ func (bj *BinaryJoinIterator) Next() bool {

if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
// we have a found an element
bj.res = columnIteratorResultPoolGet()
bj.res = iteratorResultPoolGet()
bj.res.RowNumber = resLeft.RowNumber
bj.res.Append(resLeft)
bj.res.Append(resRight)
@@ -644,7 +572,7 @@ func (j *JoinIterator) seekAll(to RowNumberWithDefinitionLevel) {
to.RowNumber = TruncateRowNumber(to)
for iterNum, iter := range j.iters {
if j.peeks[iterNum] == nil || CompareRowNumbers(to.DefinitionLevel, j.peeks[iterNum].RowNumber, to.RowNumber) == -1 {
columnIteratorResultPoolPut(j.peeks[iterNum])
iteratorResultPoolPut(j.peeks[iterNum])
if iter.Seek(to) {
j.peeks[iterNum] = iter.At()
} else {
@@ -667,15 +595,15 @@ func (j *JoinIterator) peek(iterNum int) *IteratorResult {
// the next row (according to the configured definition level)
// or are exhausted.
func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult {
result := columnIteratorResultPoolGet()
result := iteratorResultPoolGet()
result.RowNumber = rowNumber

for i := range j.iters {
for j.peeks[i] != nil && CompareRowNumbers(j.definitionLevel, j.peeks[i].RowNumber, rowNumber) == 0 {

result.Append(j.peeks[i])

columnIteratorResultPoolPut(j.peeks[i])
iteratorResultPoolPut(j.peeks[i])

if j.iters[i].Next() {
j.peeks[i] = j.iters[i].At()
@@ -809,15 +737,15 @@ func (u *UnionIterator) peek(iterNum int) *IteratorResult {
// the next row (according to the configured definition level)
// or are exhausted.
func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorResult {
result := columnIteratorResultPoolGet()
result := iteratorResultPoolGet()
result.RowNumber = rowNumber

for _, iterNum := range iterNums {
for u.peeks[iterNum] != nil && CompareRowNumbers(u.definitionLevel, u.peeks[iterNum].RowNumber, rowNumber) == 0 {

result.Append(u.peeks[iterNum])

columnIteratorResultPoolPut(u.peeks[iterNum])
iteratorResultPoolPut(u.peeks[iterNum])

if u.iters[iterNum].Next() {
u.peeks[iterNum] = u.iters[iterNum].At()
@@ -928,7 +856,7 @@ func (r *RowNumberIterator[T]) Next() bool {
if !r.Iterator.Next() {
return false
}
r.current = columnIteratorResultPoolGet()
r.current = iteratorResultPoolGet()
r.current.Reset()
rowGetter, ok := any(r.Iterator.At()).(RowGetter)
if !ok {
@@ -975,6 +903,7 @@ type SyncIterator struct {
// Config
column int
columnName string
table string
rgs []parquet.RowGroup
rgsMin []RowNumber
rgsMax []RowNumber // Exclusive, row number of next one past the row group
@@ -984,6 +913,7 @@ type SyncIterator struct {

// Status
span opentracing.Span
metrics *Metrics
curr RowNumber
currRowGroup parquet.RowGroup
currRowGroupMin RowNumber
@@ -1045,6 +975,8 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co
})

return &SyncIterator{
table: strings.ToLower(rgs[0].Schema().Name()) + "s",
metrics: getMetricsFromContext(ctx),
span: span,
column: column,
columnName: columnName,
@@ -1209,6 +1141,12 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo
c.closeCurrRowGroup()
return true, err
}
c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
c.span.LogFields(
log.String("msg", "reading page"),
log.Int64("page_num_values", pg.NumValues()),
log.Int64("page_size", pg.Size()),
)

// Skip based on row number?
newRN := c.curr
@@ -1365,7 +1303,7 @@ func (c *SyncIterator) closeCurrRowGroup() {
}

func (c *SyncIterator) makeResult(t RowNumber, v *parquet.Value) *IteratorResult {
r := columnIteratorResultPoolGet()
r := iteratorResultPoolGet()
r.RowNumber = t
if c.selectAs != "" {
r.AppendValue(c.selectAs, v.Clone())