diff --git a/migrate-to-otel.gopatch b/migrate-to-otel.gopatch
new file mode 100644
index 000000000..81ae324c5
--- /dev/null
+++ b/migrate-to-otel.gopatch
@@ -0,0 +1,40 @@
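+# Migrate opentracing spans and log fields to their OpenTelemetry equivalents.
+# Usage sketch (assumes gopatch is installed and that '#' comments are supported):
+#   gopatch -p migrate-to-otel.gopatch ./...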
+@@
+var a expression
+var b expression
+var s identifier
+var t identifier
+@@
+-s, t := opentracing.StartSpanFromContext(a,b)
+-...
+- defer s.Finish()
++import "go.opentelemetry.io/otel"
++t, s := otel.Tracer("github.com/grafana/pyroscope").Start(a,b)
++defer s.End()
+
+@@
+var foo, x identifier
+@@
+
+-import foo "github.com/opentracing/opentracing-go/log"
++import foo "go.opentelemetry.io/otel/attribute"
+foo.x
+
+@@
+@@
+- otlog
++ attribute
+
+@@
+var span identifier
+var x expression
+@@
+- span.LogFields(...)
++import "go.opentelemetry.io/otel/trace"
++ span.AddEvent("TODO", trace.WithAttributes(...))
+
+
+@@
+@@
+-opentracing.Span
++import "go.opentelemetry.io/otel/trace"
++trace.Span
diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go
index 989e95a1a..fd690d60f 100644
--- a/pkg/phlaredb/block_querier.go
+++ b/pkg/phlaredb/block_querier.go
@@ -21,14 +21,15 @@ import (
 	"github.com/grafana/dskit/runutil"
 	"github.com/oklog/ulid"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/samber/lo"
 	"github.com/segmentio/parquet-go"
 	"github.com/thanos-io/objstore"
-	"golang.org/x/exp/constraints"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc/codes"
 
@@ -462,58 +463,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) {
 	return b.meta.MinTime, b.meta.MaxTime
 }
 
-type mapPredicate[K constraints.Integer, V any] struct {
-	min K
-	max K
-	m   map[K]V
-}
-
-func newMapPredicate[K constraints.Integer, V any](m map[K]V) query.Predicate {
-	p := &mapPredicate[K, V]{
-		m: m,
-	}
-
-	first := true
-	for k := range m {
-		if first || p.max < k {
-			p.max = k
-		}
-		if first || p.min > k {
-			p.min = k
-		}
-		first = false
-	}
-
-	return p
-}
-
-func (m *mapPredicate[K, V]) KeepColumnChunk(c parquet.ColumnChunk) bool {
-	if ci := c.ColumnIndex(); ci != nil {
-		for i := 0; i < ci.NumPages(); i++ {
-			min := K(ci.MinValue(i).Int64())
-			max := K(ci.MaxValue(i).Int64())
-			if m.max >= min && m.min <= max {
-				return true
-			}
-		}
-		return false
-	}
-
-	return true
-}
-
-func (m *mapPredicate[K, V]) KeepPage(page parquet.Page) bool {
-	if min, max, ok := page.Bounds(); ok {
-		return m.max >= K(min.Int64()) && m.min <= K(max.Int64())
-	}
-	return true
-}
-
-func (m *mapPredicate[K, V]) KeepValue(v parquet.Value) bool {
-	_, exists := m.m[K(v.Int64())]
-	return exists
-}
-
 type labelsInfo struct {
 	fp  model.Fingerprint
 	lbs phlaremodel.Labels
@@ -610,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
 }
 
 func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesStacktraces")
+	defer sp.End()
 
 	r, err := stream.Receive()
 	if err != nil {
@@ -625,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
 		return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
 	}
 	request := r.Request
-	sp.LogFields(
-		otlog.String("start", model.Time(request.Start).Time().String()),
-		otlog.String("end", model.Time(request.End).Time().String()),
-		otlog.String("selector", request.LabelSelector),
-		otlog.String("profile_id", request.Type.ID),
-	)
+	sp.AddEvent("TODO", trace.WithAttributes(
+		attribute.String("start", model.Time(request.Start).Time().String()),
+		attribute.String("end", model.Time(request.End).Time().String()),
+		attribute.String("selector", request.LabelSelector),
+		attribute.String("profile_id", request.Type.ID)))
 
 	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
 	if err != nil {
@@ -674,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
 
 	// Signals the end of the profile streaming by sending an empty response.
 	// This allows the client to not block other streaming ingesters.
-	sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "signaling the end of the profile streaming")))
 	if err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{}); err != nil {
 		return err
 	}
@@ -684,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
 	}
 
 	// sends the final result to the client.
-	sp.LogFields(otlog.String("msg", "sending the final result to the client"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending the final result to the client")))
 	err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{
 		Result: &ingestv1.MergeProfilesStacktracesResult{
 			Format:    ingestv1.StacktracesMergeFormat_MERGE_FORMAT_TREE,
@@ -702,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
 }
 
 func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesLabels")
+	defer sp.End()
 
 	r, err := stream.Receive()
 	if err != nil {
@@ -719,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
 	request := r.Request
 	by := r.By
 	sort.Strings(by)
-	sp.LogFields(
-		otlog.String("start", model.Time(request.Start).Time().String()),
-		otlog.String("end", model.Time(request.End).Time().String()),
-		otlog.String("selector", request.LabelSelector),
-		otlog.String("profile_id", request.Type.ID),
-		otlog.String("by", strings.Join(by, ",")),
-	)
+	sp.AddEvent("TODO", trace.WithAttributes(
+		attribute.String("start", model.Time(request.Start).Time().String()),
+		attribute.String("end", model.Time(request.End).Time().String()),
+		attribute.String("selector", request.LabelSelector),
+		attribute.String("profile_id", request.Type.ID),
+		attribute.String("by", strings.Join(by, ","))))
 
 	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
 	if err != nil {
@@ -796,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
 }
 
 func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesPprof")
+	defer sp.End()
 
 	r, err := stream.Receive()
 	if err != nil {
@@ -811,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
 		return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
 	}
 	request := r.Request
-	sp.LogFields(
-		otlog.String("start", model.Time(request.Start).Time().String()),
-		otlog.String("end", model.Time(request.End).Time().String()),
-		otlog.String("selector", request.LabelSelector),
-		otlog.String("profile_id", request.Type.ID),
-	)
+	sp.AddEvent("TODO", trace.WithAttributes(
+		attribute.String("start", model.Time(request.Start).Time().String()),
+		attribute.String("end", model.Time(request.End).Time().String()),
+		attribute.String("selector", request.LabelSelector),
+		attribute.String("profile_id", request.Type.ID)))
 
 	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
 	if err != nil {
@@ -942,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
 }
 
 func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - Block")
+	defer sp.End()
+
 	if err := b.Open(ctx); err != nil {
 		return nil, err
 	}
@@ -988,26 +935,26 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
 	}
 
 	var (
-		buf       [][]parquet.Value
-		joinIters []query.Iterator
+		buf [][]parquet.Value
+	)
+
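+	// Join the series and time-range columns at the row level (definition
+	// level 0); for v2 blocks the stacktrace partition is joined in below.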
+	pIt := query.NewBinaryJoinIterator(
+		0,
+		b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"),
+		b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
 	)
 
 	if b.meta.Version >= 2 {
-		joinIters = []query.Iterator{
-			b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
-			b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
+		pIt = query.NewBinaryJoinIterator(
+			0,
+			pIt,
 			b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
-		}
+		)
 		buf = make([][]parquet.Value, 3)
 	} else {
-		joinIters = []query.Iterator{
-			b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
-			b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
-		}
 		buf = make([][]parquet.Value, 2)
 	}
 
-	pIt := query.NewJoinIterator(0, joinIters, nil)
 	iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
 	defer pIt.Close()
 
@@ -1098,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open")
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "BlockQuerier - open")
 	defer func() {
 		q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds())
-		sp.LogFields(
-			otlog.String("block_ulid", q.meta.ULID.String()),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("block_ulid", q.meta.ULID.String())))
+
-		sp.Finish()
+		sp.End()
 	}()
 	g, ctx := errgroup.WithContext(ctx)
@@ -1206,7 +1153,7 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string,
 		return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
 	}
 	ctx = query.AddMetricsToContext(ctx, r.metrics.query)
-	return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
+	return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
 }
 
 func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go
index 828a8411e..495107d00 100644
--- a/pkg/phlaredb/block_querier_test.go
+++ b/pkg/phlaredb/block_querier_test.go
@@ -190,5 +190,4 @@ func TestBlockCompatability(t *testing.T) {
 		})
 
 	}
-
 }
diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go
index 3c131920d..b54a787e9 100644
--- a/pkg/phlaredb/head.go
+++ b/pkg/phlaredb/head.go
@@ -17,13 +17,15 @@ import (
 	"github.com/google/pprof/profile"
 	"github.com/google/uuid"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/samber/lo"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/atomic"
 	"google.golang.org/grpc/codes"
 
@@ -533,8 +535,8 @@ func (h *Head) Queriers() Queriers {
 
 // add the location IDs to the stacktraces
 func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stacktracesByMapping) *ingestv1.MergeProfilesStacktracesResult {
-	sp, _ := opentracing.StartSpanFromContext(ctx, "resolveStacktraces - Head")
-	defer sp.Finish()
+	_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolveStacktraces - Head")
+	defer sp.End()
 
 	names := []string{}
 	functions := map[int64]int{}
@@ -548,7 +550,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
 		h.strings.lock.RUnlock()
 	}()
 
-	sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building MergeProfilesStacktracesResult")))
 	_ = stacktracesByMapping.ForEach(
 		func(mapping uint64, stacktraceSamples stacktraceSampleMap) error {
 			mp, ok := h.symbolDB.MappingReader(mapping)
@@ -595,8 +597,8 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
 }
 
 func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSampleByMapping) *profile.Profile {
-	sp, _ := opentracing.StartSpanFromContext(ctx, "resolvePprof - Head")
-	defer sp.Finish()
+	_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolvePprof - Head")
+	defer sp.End()
 
 	locations := map[int32]*profile.Location{}
 	functions := map[uint64]*profile.Function{}
diff --git a/pkg/phlaredb/head_queriers.go b/pkg/phlaredb/head_queriers.go
index bf15d922e..fb132e30f 100644
--- a/pkg/phlaredb/head_queriers.go
+++ b/pkg/phlaredb/head_queriers.go
@@ -10,6 +10,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 	"github.com/segmentio/parquet-go"
+	"go.opentelemetry.io/otel"
 
 	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
 	typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
@@ -34,8 +35,8 @@ func (q *headOnDiskQuerier) Open(_ context.Context) error {
 }
 
 func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadOnDisk")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadOnDisk")
+	defer sp.End()
 
 	// query the index for rows
 	rowIter, labelsPerFP, err := q.head.profiles.index.selectMatchingRowRanges(ctx, params, q.rowGroupIdx)
@@ -48,14 +49,13 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
 		start = model.Time(params.Start)
 		end   = model.Time(params.End)
 	)
-	pIt := query.NewJoinIterator(
-		0,
-		[]query.Iterator{
+	pIt := query.NewBinaryJoinIterator(0,
+		query.NewBinaryJoinIterator(
+			0,
 			rowIter,
 			q.rowGroup().columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(start.UnixNano(), end.UnixNano()), "TimeNanos"),
-			q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
-		},
-		nil,
+		),
+		q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
 	)
 	defer pIt.Close()
 
@@ -107,8 +107,8 @@ func (q *headOnDiskQuerier) Bounds() (model.Time, model.Time) {
 }
 
 func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadOnDisk")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadOnDisk")
+	defer sp.End()
 
 	stacktraceSamples := stacktracesByMapping{}
 
@@ -121,8 +121,8 @@ func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.It
 }
 
 func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByPprof - HeadOnDisk")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByPprof - HeadOnDisk")
+	defer sp.End()
 
 	stacktraceSamples := profileSampleByMapping{}
 
@@ -134,8 +134,8 @@ func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[P
 }
 
 func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadOnDisk")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadOnDisk")
+	defer sp.End()
 
 	seriesByLabels := make(seriesByLabels)
 
@@ -169,8 +169,8 @@ func (q *headInMemoryQuerier) Open(_ context.Context) error {
 }
 
 func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadInMemory")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadInMemory")
+	defer sp.End()
 
 	index := q.head.profiles.index
 
@@ -216,8 +216,8 @@ func (q *headInMemoryQuerier) Bounds() (model.Time, model.Time) {
 }
 
 func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
-	sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadInMemory")
-	defer sp.Finish()
+	_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadInMemory")
+	defer sp.End()
 
 	stacktraceSamples := stacktracesByMapping{}
 
@@ -244,8 +244,8 @@ func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.
 }
 
 func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
-	sp, _ := opentracing.StartSpanFromContext(ctx, "MergePprof - HeadInMemory")
-	defer sp.Finish()
+	_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergePprof - HeadInMemory")
+	defer sp.End()
 
 	stacktraceSamples := profileSampleByMapping{}
 
@@ -268,8 +268,8 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator
 }
 
 func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
-	sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadInMemory")
-	defer sp.Finish()
+	_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadInMemory")
+	defer sp.End()
 
 	labelsByFingerprint := map[model.Fingerprint]string{}
 	seriesByLabels := make(seriesByLabels)
diff --git a/pkg/phlaredb/phlaredb.go b/pkg/phlaredb/phlaredb.go
index 640410353..391f0755d 100644
--- a/pkg/phlaredb/phlaredb.go
+++ b/pkg/phlaredb/phlaredb.go
@@ -18,9 +18,11 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/oklog/ulid"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
 	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -380,12 +382,8 @@ func filterProfiles[B BidiServerMerge[Res, Req],
 		Profile: maxBlockProfile,
 		Index:   0,
 	}, true, its...), batchProfileSize, func(ctx context.Context, batch []ProfileWithIndex) error {
-		sp, _ := opentracing.StartSpanFromContext(ctx, "filterProfiles - Filtering batch")
-		sp.LogFields(
-			otlog.Int("batch_len", len(batch)),
-			otlog.Int("batch_requested_size", batchProfileSize),
-		)
-		defer sp.Finish()
+		_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "filterProfiles - Filtering batch")
+		defer sp.End()
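+		// Carry over the fields that were previously logged via opentracing.
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.Int("batch_len", len(batch)),
+			attribute.Int("batch_requested_size", batchProfileSize)))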
 
 		seriesByFP := map[model.Fingerprint]labelWithIndex{}
 		selectProfileResult.LabelsSets = selectProfileResult.LabelsSets[:0]
@@ -409,7 +407,7 @@ func filterProfiles[B BidiServerMerge[Res, Req],
 			})
 
 		}
-		sp.LogFields(otlog.String("msg", "sending batch to client"))
+		sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending batch to client")))
 		var err error
 		switch s := BidiServerMerge[Res, Req](stream).(type) {
 		case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]:
@@ -433,9 +431,9 @@ func filterProfiles[B BidiServerMerge[Res, Req],
 			}
 			return err
 		}
-		sp.LogFields(otlog.String("msg", "batch sent to client"))
+		sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "batch sent to client")))
 
-		sp.LogFields(otlog.String("msg", "reading selection from client"))
+		sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "reading selection from client")))
 
 		// handle response for the batch.
 		var selected []bool
@@ -462,7 +460,7 @@ func filterProfiles[B BidiServerMerge[Res, Req],
 			}
 			return err
 		}
-		sp.LogFields(otlog.String("msg", "selection received"))
+		sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "selection received")))
 		for i, k := range selected {
 			if k {
 				selection[batch[i].Index] = append(selection[batch[i].Index], batch[i].Profile)
diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go
index 5d21f7590..79e44467a 100644
--- a/pkg/phlaredb/profile_store.go
+++ b/pkg/phlaredb/profile_store.go
@@ -498,7 +498,7 @@ func (r *rowGroupOnDisk) columnIter(ctx context.Context, columnName string, pred
 	if !found {
 		return query.NewErrIterator(fmt.Errorf("column '%s' not found in head row group segment '%s'", columnName, r.file.Name()))
 	}
-	return query.NewColumnIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias)
+	return query.NewSyncIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias)
 }
 
 type seriesIDRowsRewriter struct {
diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go
index 5d28d4484..8e236b1aa 100644
--- a/pkg/phlaredb/profiles.go
+++ b/pkg/phlaredb/profiles.go
@@ -15,6 +15,7 @@ import (
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/samber/lo"
+	"go.opentelemetry.io/otel"
 	"go.uber.org/atomic"
 	"google.golang.org/grpc/codes"
 
@@ -195,8 +196,9 @@ func (pi *profilesIndex) Add(ps *schemav1.InMemoryProfile, lbs phlaremodel.Label
 }
 
 func (pi *profilesIndex) selectMatchingFPs(ctx context.Context, params *ingestv1.SelectProfilesRequest) ([]model.Fingerprint, error) {
-	sp, _ := opentracing.StartSpanFromContext(ctx, "selectMatchingFPs - Index")
-	defer sp.Finish()
+	_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingFPs - Index")
+	defer sp.End()
+
 	selectors, err := parser.ParseMetricSelector(params.LabelSelector)
 	if err != nil {
 		return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
@@ -246,8 +248,8 @@ func (pi *profilesIndex) selectMatchingRowRanges(ctx context.Context, params *in
 	map[model.Fingerprint]phlaremodel.Labels,
 	error,
 ) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "selectMatchingRowRanges - Index")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingRowRanges - Index")
+	defer sp.End()
 
 	ids, err := pi.selectMatchingFPs(ctx, params)
 	if err != nil {
diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go
index 28b2264cc..ac2013fba 100644
--- a/pkg/phlaredb/query/iters.go
+++ b/pkg/phlaredb/query/iters.go
@@ -8,12 +8,12 @@ import (
 	"math"
 	"strings"
 	"sync"
-	"sync/atomic"
 
 	"github.com/grafana/dskit/multierror"
-	"github.com/opentracing/opentracing-go"
-	"github.com/opentracing/opentracing-go/log"
 	"github.com/segmentio/parquet-go"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	"github.com/grafana/phlare/pkg/iter"
 )
@@ -40,6 +40,10 @@ type RowNumberWithDefinitionLevel struct {
 	DefinitionLevel int
 }
 
+func (r *RowNumberWithDefinitionLevel) String() string {
+	return fmt.Sprintf("%v:%v", r.RowNumber, r.DefinitionLevel)
+}
+
 // EmptyRowNumber creates an empty invalid row number.
 func EmptyRowNumber() RowNumber {
 	return RowNumber{-1, -1, -1, -1, -1, -1}
@@ -76,7 +80,7 @@ func TruncateRowNumber(t RowNumberWithDefinitionLevel) RowNumber {
 	return n
 }
 
-func (t RowNumber) Valid() bool {
+func (t *RowNumber) Valid() bool {
 	return t[0] >= 0
 }
 
@@ -96,17 +100,188 @@ func (t RowNumber) Valid() bool {
 // gb     | 1 | 3 | {  0,  2,  0,  0 }
 // null   | 0 | 1 | {  1,  0, -1, -1 }
 func (t *RowNumber) Next(repetitionLevel, definitionLevel int) {
-	// Next row at this level
 	t[repetitionLevel]++
 
-	// New children up through the definition level
-	for i := repetitionLevel + 1; i <= definitionLevel; i++ {
-		t[i] = 0
-	}
-
-	// Children past the definition level are undefined
-	for i := definitionLevel + 1; i < len(t); i++ {
-		t[i] = -1
+	// the following is nextSlow() unrolled
+	switch repetitionLevel {
+	case 0:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[1] = 0
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[1] = 0
+			t[2] = 0
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[1] = 0
+			t[2] = 0
+			t[3] = 0
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[1] = 0
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[1] = 0
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = 0
+		}
+	case 1:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[2] = 0
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[2] = 0
+			t[3] = 0
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[2] = 0
+			t[3] = 0
+			t[4] = 0
+			t[5] = 0
+		}
+	case 2:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[3] = 0
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[3] = 0
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[3] = 0
+			t[4] = 0
+			t[5] = 0
+		}
+	case 3:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[4] = 0
+			t[5] = -1
+		case 5:
+			t[4] = 0
+			t[5] = 0
+		}
+	case 4:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[5] = -1
+		case 5:
+			t[5] = 0
+		}
+	case 5:
+		switch definitionLevel {
+		case 0:
+			t[1] = -1
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 1:
+			t[2] = -1
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 2:
+			t[3] = -1
+			t[4] = -1
+			t[5] = -1
+		case 3:
+			t[4] = -1
+			t[5] = -1
+		case 4:
+			t[5] = -1
+		}
 	}
 }
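+
+// nextSlow is the original, readable implementation that Next unrolls; it is
+// restored here (verbatim from the pre-unrolled loop) as documentation and so
+// the cross-check in the commented-out TestNext can be revived.
+func (t *RowNumber) nextSlow(repetitionLevel, definitionLevel int) {
+	// Next row at this level
+	t[repetitionLevel]++
+
+	// New children up through the definition level
+	for i := repetitionLevel + 1; i <= definitionLevel; i++ {
+		t[i] = 0
+	}
+
+	// Children past the definition level are undefined
+	for i := definitionLevel + 1; i < len(t); i++ {
+		t[i] = -1
+	}
+}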
 
@@ -118,6 +293,26 @@ func (t *RowNumber) Skip(numRows int64) {
 	}
 }
 
+// Preceding returns the largest representable row number that is immediately prior to this
+// one. Think of it like math.NextAfter but for segmented row numbers. Examples:
+//
+//	RowNumber 1000.0.0 (defined at 3 levels) is preceded by 999.max.max
+//	RowNumber 1000.-1.-1 (defined at 1 level) is preceded by 999.-1.-1
+func (t RowNumber) Preceding() RowNumber {
+	for i := len(t) - 1; i >= 0; i-- {
+		switch t[i] {
+		case -1:
+			continue
+		case 0:
+			t[i] = math.MaxInt64
+		default:
+			t[i]--
+			return t
+		}
+	}
+	return t
+}
+
 // IteratorResult is a row of data with a row number and named columns of data.
 // Internally it has an unstructured list for efficient collection. The ToMap()
 // function can be used to make inspection easier.
@@ -179,6 +374,14 @@ func (r *IteratorResult) Columns(buffer [][]parquet.Value, names ...string) [][]
 	return buffer
 }
 
+func (r *IteratorResult) String() string {
+	if r == nil {
+		return "nil"
+	}
+	return fmt.Sprintf("rowNum=%d entries=%+#v", r.RowNumber[0], r.ToMap())
+}
+
 // iterator - Every iterator follows this interface and can be composed.
 type Iterator = iter.SeekIterator[*IteratorResult, RowNumberWithDefinitionLevel]
 
@@ -186,34 +389,7 @@ func NewErrIterator(err error) Iterator {
 	return iter.NewErrSeekIterator[*IteratorResult, RowNumberWithDefinitionLevel](err)
 }
 
-var columnIteratorPool = sync.Pool{
-	New: func() interface{} {
-		return &columnIteratorBuffer{}
-	},
-}
-
-func columnIteratorPoolGet(capacity, len int) *columnIteratorBuffer {
-	res := columnIteratorPool.Get().(*columnIteratorBuffer)
-	if cap(res.rowNumbers) < capacity {
-		res.rowNumbers = make([]RowNumber, capacity)
-	}
-	if cap(res.values) < capacity {
-		res.values = make([]parquet.Value, capacity)
-	}
-	res.rowNumbers = res.rowNumbers[:len]
-	res.values = res.values[:len]
-	return res
-}
-
-func columnIteratorPoolPut(b *columnIteratorBuffer) {
-	b.values = b.values[:cap(b.values)]
-	for i := range b.values {
-		b.values[i] = parquet.Value{}
-	}
-	columnIteratorPool.Put(b)
-}
-
-var columnIteratorResultPool = sync.Pool{
+var iteratorResultPool = sync.Pool{
 	New: func() interface{} {
 		return &IteratorResult{Entries: make([]struct {
 			k        string
@@ -223,459 +399,112 @@ var columnIteratorResultPool = sync.Pool{
 	},
 }
 
-func columnIteratorResultPoolGet() *IteratorResult {
-	res := columnIteratorResultPool.Get().(*IteratorResult)
+func iteratorResultPoolGet() *IteratorResult {
+	res := iteratorResultPool.Get().(*IteratorResult)
 	return res
 }
 
-func columnIteratorResultPoolPut(r *IteratorResult) {
+func iteratorResultPoolPut(r *IteratorResult) {
 	if r != nil {
 		r.Reset()
-		columnIteratorResultPool.Put(r)
+		iteratorResultPool.Put(r)
 	}
 }
 
-// ColumnIterator asynchronously iterates through the given row groups and column. Applies
-// the optional predicate to each chunk, page, and value.  Results are read by calling
-// Next() until it returns nil.
-type ColumnIterator struct {
-	rgs     []parquet.RowGroup
-	col     int
-	colName string
-	filter  *InstrumentedPredicate
-
-	selectAs string
-	seekTo   atomic.Value
-
-	metrics *Metrics
-	table   string
-	quit    chan struct{}
-	ch      chan *columnIteratorBuffer
-
-	curr  *columnIteratorBuffer
-	currN int
+type BinaryJoinIterator struct {
+	left            Iterator
+	right           Iterator
+	definitionLevel int
 
-	result *IteratorResult
-	err    error
+	err error
+	res *IteratorResult
 }
 
-var _ Iterator = (*ColumnIterator)(nil)
+var _ Iterator = (*BinaryJoinIterator)(nil)
 
-type columnIteratorBuffer struct {
-	rowNumbers []RowNumber
-	values     []parquet.Value
-	err        error
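+// NewBinaryJoinIterator joins the left and right iterators at the given
+// definition level. Joins over more than two columns are built by nesting,
+// as in block_querier.go (a sketch; the iterator names are illustrative):
+//
+//	pIt := NewBinaryJoinIterator(0,
+//		NewBinaryJoinIterator(0, seriesIt, timeIt),
+//		partitionIt,
+//	)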
+func NewBinaryJoinIterator(definitionLevel int, left, right Iterator) *BinaryJoinIterator {
+	return &BinaryJoinIterator{
+		left:            left,
+		right:           right,
+		definitionLevel: definitionLevel,
+	}
 }
 
-func NewColumnIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *ColumnIterator {
-	c := &ColumnIterator{
-		metrics:  getMetricsFromContext(ctx),
-		table:    strings.ToLower(rgs[0].Schema().Name()) + "s",
-		rgs:      rgs,
-		col:      column,
-		colName:  columnName,
-		filter:   &InstrumentedPredicate{pred: filter},
-		selectAs: selectAs,
-		quit:     make(chan struct{}),
-		ch:       make(chan *columnIteratorBuffer, 1),
-		currN:    -1,
+// nextOrSeek advances the iterator with Next when it is exactly one row away
+// from to; otherwise it seeks.
+func (bj *BinaryJoinIterator) nextOrSeek(to RowNumberWithDefinitionLevel, it Iterator) bool {
+	// Seek when the definition level is higher than 0, when there is no
+	// previous result, or when the distance between the current position and
+	// to is not exactly one row.
+	if to.DefinitionLevel != 0 || it.At() == nil || to.RowNumber.Preceding() != it.At().RowNumber {
+		return it.Seek(to)
 	}
-
-	go c.iterate(ctx, readSize)
-	return c
+	return it.Next()
 }
 
-func (c *ColumnIterator) iterate(ctx context.Context, readSize int) {
-	defer close(c.ch)
-
-	span, _ := opentracing.StartSpanFromContext(ctx, "columnIterator.iterate", opentracing.Tags{
-		"columnIndex": c.col,
-		"column":      c.colName,
-	})
-	defer func() {
-		span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load())
-		span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
-		span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
-		span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
-		span.SetTag("keptPages", c.filter.KeptPages.Load())
-		span.SetTag("keptValues", c.filter.KeptValues.Load())
-		span.Finish()
-	}()
-
-	rn := EmptyRowNumber()
-	buffer := make([]parquet.Value, readSize)
-
-	checkSkip := func(numRows int64) bool {
-		seekTo := c.seekTo.Load()
-		if seekTo == nil {
+func (bj *BinaryJoinIterator) Next() bool {
+	for {
+		if !bj.left.Next() {
+			bj.err = bj.left.Err()
 			return false
 		}
+		resLeft := bj.left.At()
 
-		seekToRN := seekTo.(RowNumber)
-
-		rnNext := rn
-		rnNext.Skip(numRows)
-
-		return CompareRowNumbers(0, rnNext, seekToRN) == -1
-	}
-
-	for _, rg := range c.rgs {
-		col := rg.ColumnChunks()[c.col]
-
-		if checkSkip(rg.NumRows()) {
-			// Skip column chunk
-			rn.Skip(rg.NumRows())
-			continue
-		}
-
-		if c.filter != nil {
-			if !c.filter.KeepColumnChunk(col) {
-				// Skip column chunk
-				rn.Skip(rg.NumRows())
-				continue
-			}
-		}
-
-		func(col parquet.ColumnChunk) {
-			pgs := col.Pages()
-			defer func() {
-				if err := pgs.Close(); err != nil {
-					span.LogKV("closing error", err)
-				}
-			}()
-			for {
-				pg, err := pgs.ReadPage()
-				if pg == nil || err == io.EOF {
-					break
-				}
-				c.metrics.pageReadsTotal.WithLabelValues(c.table, c.colName).Add(1)
-				span.LogFields(
-					log.String("msg", "reading page"),
-					log.Int64("page_num_values", pg.NumValues()),
-					log.Int64("page_size", pg.Size()),
-				)
-				if err != nil {
-					return
-				}
-
-				if checkSkip(pg.NumRows()) {
-					// Skip page
-					rn.Skip(pg.NumRows())
-					continue
-				}
-
-				if c.filter != nil {
-					if !c.filter.KeepPage(pg) {
-						// Skip page
-						rn.Skip(pg.NumRows())
-						continue
-					}
-				}
-
-				vr := pg.Values()
-				for {
-					count, err := vr.ReadValues(buffer)
-					if count > 0 {
-
-						// Assign row numbers, filter values, and collect the results.
-						newBuffer := columnIteratorPoolGet(readSize, 0)
-
-						for i := 0; i < count; i++ {
-
-							v := buffer[i]
-
-							// We have to do this for all values (even if the
-							// value is excluded by the predicate)
-							rn.Next(v.RepetitionLevel(), v.DefinitionLevel())
-
-							if c.filter != nil {
-								if !c.filter.KeepValue(v) {
-									continue
-								}
-							}
-
-							newBuffer.rowNumbers = append(newBuffer.rowNumbers, rn)
-							newBuffer.values = append(newBuffer.values, v)
-						}
-
-						if len(newBuffer.rowNumbers) > 0 {
-							select {
-							case c.ch <- newBuffer:
-							case <-c.quit:
-								return
-							case <-ctx.Done():
-								return
-							}
-						} else {
-							// All values excluded, we go ahead and immediately
-							// return the buffer to the pool.
-							columnIteratorPoolPut(newBuffer)
-						}
-					}
-
-					// Error checks MUST occur after processing any returned data
-					// following io.Reader behavior.
-					if err == io.EOF {
-						break
-					}
-					if err != nil {
-						c.ch <- &columnIteratorBuffer{err: err}
-						return
-					}
-				}
-
-			}
-		}(col)
-	}
-}
-
-// At returns the current value from the iterator.
-func (c *ColumnIterator) At() *IteratorResult {
-	return c.result
-}
-
-// Next returns the next matching value from the iterator.
-// Returns nil when finished.
-func (c *ColumnIterator) Next() bool {
-	t, v := c.next()
-	if t.Valid() {
-		c.result = c.makeResult(t, v)
-		return true
-	}
-
-	c.result = nil
-	return false
-}
-
-func (c *ColumnIterator) next() (RowNumber, parquet.Value) {
-	// Consume current buffer until exhausted
-	// then read another one from the channel.
-	if c.curr != nil {
-		for c.currN++; c.currN < len(c.curr.rowNumbers); {
-			t := c.curr.rowNumbers[c.currN]
-			if t.Valid() {
-				return t, c.curr.values[c.currN]
-			}
+		// now seek the right iterator to the left position
+		if !bj.nextOrSeek(RowNumberWithDefinitionLevel{resLeft.RowNumber, bj.definitionLevel}, bj.right) {
+			bj.err = bj.right.Err()
+			return false
 		}
-
-		// Done with this buffer
-		columnIteratorPoolPut(c.curr)
-		c.curr = nil
-	}
-
-	if v, ok := <-c.ch; ok {
-		if v.err != nil {
-			c.err = v.err
-			return EmptyRowNumber(), parquet.Value{}
+		resRight := bj.right.At()
+
+		makeResult := func() {
+			bj.res = iteratorResultPoolGet()
+			bj.res.RowNumber = resLeft.RowNumber
+			bj.res.Append(resLeft)
+			bj.res.Append(resRight)
+			iteratorResultPoolPut(resLeft)
+			iteratorResultPoolPut(resRight)
 		}
-		// Got next buffer, guaranteed to have at least 1 element
-		c.curr = v
-		c.currN = 0
-		return c.curr.rowNumbers[0], c.curr.values[0]
-	}
-
-	// Failed to read from the channel, means iterator is exhausted.
-	return EmptyRowNumber(), parquet.Value{}
-}
-
-// SeekTo moves this iterator to the next result that is greater than
-// or equal to the given row number (and based on the given definition level)
-func (c *ColumnIterator) Seek(to RowNumberWithDefinitionLevel) bool {
-	var at RowNumber
-	var v parquet.Value
-
-	// Because iteration happens in the background, we signal the row
-	// to skip to, and then read until we are at the right spot. The
-	// seek is best-effort and may have no effect if the iteration
-	// already further ahead, and there may already be older data
-	// in the buffer.
-	c.seekTo.Store(to.RowNumber)
-	for at, v = c.next(); at.Valid() && CompareRowNumbers(to.DefinitionLevel, at, to.RowNumber) < 0; {
-		at, v = c.next()
-	}
-
-	if at.Valid() {
-		c.result = c.makeResult(at, v)
-		return true
-	}
-
-	c.result = nil
-	return false
-}
-
-func (c *ColumnIterator) makeResult(t RowNumber, v parquet.Value) *IteratorResult {
-	r := columnIteratorResultPoolGet()
-	r.RowNumber = t
-	if c.selectAs != "" {
-		r.AppendValue(c.selectAs, v)
-	}
-	return r
-}
-
-func (c *ColumnIterator) Close() error {
-	close(c.quit)
-	return nil
-}
 
-func (c *ColumnIterator) Err() error {
-	return c.err
-}
-
-// JoinIterator joins two or more iterators for matches at the given definition level.
-// I.e. joining at definitionLevel=0 means that each iterator must produce a result
-// within the same root node.
-type JoinIterator struct {
-	definitionLevel int
-	iters           []Iterator
-	peeks           []*IteratorResult
-	pred            GroupPredicate
-
-	result *IteratorResult
-}
-
-var _ Iterator = (*JoinIterator)(nil)
-
-func NewJoinIterator(definitionLevel int, iters []Iterator, pred GroupPredicate) *JoinIterator {
-	j := JoinIterator{
-		definitionLevel: definitionLevel,
-		iters:           iters,
-		peeks:           make([]*IteratorResult, len(iters)),
-		pred:            pred,
-	}
-	return &j
-}
-
-func (j *JoinIterator) At() *IteratorResult {
-	return j.result
-}
-
-func (j *JoinIterator) Next() bool {
-	// Here is the algorithm for joins:  On each pass of the iterators
-	// we remember which ones are pointing at the earliest rows. If all
-	// are the lowest (and therefore pointing at the same thing) then
-	// there is a successful join and return the result.
-	// Else we progress the iterators and try again.
-	// There is an optimization here in that we can seek to the highest
-	// row seen. It's impossible to have joins before that row.
-	for {
-		lowestRowNumber := MaxRowNumber()
-		highestRowNumber := EmptyRowNumber()
-		lowestIters := make([]int, 0, len(j.iters))
-
-		for iterNum := range j.iters {
-			res := j.peek(iterNum)
-
-			if res == nil {
-				// Iterator exhausted, no more joins possible
-				j.result = nil
+		if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
+			// we have found an element
+			makeResult()
+			return true
+		} else if cmp < 0 {
+			if !bj.nextOrSeek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}, bj.left) {
+				bj.err = bj.left.Err()
 				return false
 			}
+			resLeft = bj.left.At()
 
-			c := CompareRowNumbers(j.definitionLevel, res.RowNumber, lowestRowNumber)
-			switch c {
-			case -1:
-				// New lowest, reset
-				lowestIters = lowestIters[:0]
-				lowestRowNumber = res.RowNumber
-				fallthrough
-
-			case 0:
-				// Same, append
-				lowestIters = append(lowestIters, iterNum)
-			}
-
-			if CompareRowNumbers(j.definitionLevel, res.RowNumber, highestRowNumber) == 1 {
-				// New high water mark
-				highestRowNumber = res.RowNumber
-			}
-		}
-
-		// All iterators pointing at same row?
-		if len(lowestIters) == len(j.iters) {
-			// Get the data
-			result := j.collect(lowestRowNumber)
-
-			// Keep group?
-			if j.pred == nil || j.pred.KeepGroup(result) {
-				// Yes
-				j.result = result
+			if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
+				makeResult()
 				return true
 			}
-		}
-
-		// Skip all iterators to the highest row seen, it's impossible
-		// to find matches before that.
-		j.seekAll(RowNumberWithDefinitionLevel{RowNumber: highestRowNumber, DefinitionLevel: j.definitionLevel})
-	}
-}
-
-func (j *JoinIterator) Seek(to RowNumberWithDefinitionLevel) bool {
-	j.seekAll(to)
-	return j.Next()
-}
 
-func (j *JoinIterator) seekAll(to RowNumberWithDefinitionLevel) {
-	to.RowNumber = TruncateRowNumber(to)
-	for iterNum, iter := range j.iters {
-		if j.peeks[iterNum] == nil || CompareRowNumbers(to.DefinitionLevel, j.peeks[iterNum].RowNumber, to.RowNumber) == -1 {
-			columnIteratorResultPoolPut(j.peeks[iterNum])
-			if iter.Seek(to) {
-				j.peeks[iterNum] = iter.At()
-			} else {
-				j.peeks[iterNum] = nil
-			}
+		} else {
+			// the right value can't be smaller than the left one, because the
+			// right iterator was already seeked at least to the left position
+			panic("not expected to happen")
 		}
 	}
 }
 
-func (j *JoinIterator) peek(iterNum int) *IteratorResult {
-	if j.peeks[iterNum] == nil {
-		if j.iters[iterNum].Next() {
-			j.peeks[iterNum] = j.iters[iterNum].At()
-		}
-	}
-	return j.peeks[iterNum]
+func (bj *BinaryJoinIterator) At() *IteratorResult {
+	return bj.res
 }
 
-// Collect data from the given iterators until they point at
-// the next row (according to the configured definition level)
-// or are exhausted.
-func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult {
-	result := columnIteratorResultPoolGet()
-	result.RowNumber = rowNumber
-
-	for i := range j.iters {
-		for j.peeks[i] != nil && CompareRowNumbers(j.definitionLevel, j.peeks[i].RowNumber, rowNumber) == 0 {
-
-			result.Append(j.peeks[i])
-
-			columnIteratorResultPoolPut(j.peeks[i])
-
-			if j.iters[i].Next() {
-				j.peeks[i] = j.iters[i].At()
-			} else {
-				j.peeks[i] = nil
-			}
-		}
-	}
-	return result
+func (bj *BinaryJoinIterator) Seek(to RowNumberWithDefinitionLevel) bool {
+	bj.left.Seek(to)
+	bj.right.Seek(to)
+	return bj.Next()
 }
 
-func (j *JoinIterator) Close() error {
+func (bj *BinaryJoinIterator) Close() error {
 	var merr multierror.MultiError
-	for _, i := range j.iters {
-		merr.Add(i.Close())
-	}
+	merr.Add(bj.left.Close())
+	merr.Add(bj.right.Close())
 	return merr.Err()
 }
 
-func (j *JoinIterator) Err() error {
-	for _, i := range j.iters {
-		if err := i.Err(); err != nil {
-			return err
-		}
-	}
-	return nil
+func (bj *BinaryJoinIterator) Err() error {
+	return bj.err
 }
 
 // UnionIterator produces all results for all given iterators.  When iterators
@@ -783,7 +612,7 @@ func (u *UnionIterator) peek(iterNum int) *IteratorResult {
 // the next row (according to the configured definition level)
 // or are exhausted.
 func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorResult {
-	result := columnIteratorResultPoolGet()
+	result := iteratorResultPoolGet()
 	result.RowNumber = rowNumber
 
 	for _, iterNum := range iterNums {
@@ -791,7 +620,7 @@ func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorRe
 
 			result.Append(u.peeks[iterNum])
 
-			columnIteratorResultPoolPut(u.peeks[iterNum])
+			iteratorResultPoolPut(u.peeks[iterNum])
 
 			if u.iters[iterNum].Next() {
 				u.peeks[iterNum] = u.iters[iterNum].At()
@@ -902,7 +731,7 @@ func (r *RowNumberIterator[T]) Next() bool {
 	if !r.Iterator.Next() {
 		return false
 	}
-	r.current = columnIteratorResultPoolGet()
+	r.current = iteratorResultPoolGet()
 	r.current.Reset()
 	rowGetter, ok := any(r.Iterator.At()).(RowGetter)
 	if !ok {
@@ -941,3 +770,463 @@ func (r *RowNumberIterator[T]) Seek(to RowNumberWithDefinitionLevel) bool {
 	}
 	return true
 }
+
+// SyncIterator is a synchronous column iterator. It scans through the given row
+// groups and column, and applies the optional predicate to each chunk, page, and value.
+// Results are read by calling Next() until it returns false; the current
+// match is retrieved with At().
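+//
+// A minimal usage sketch (names are illustrative; pf is a *parquet.File):
+//
+//	it := NewSyncIterator(ctx, pf.RowGroups(), 0, "TimeNanos", 1000, nil, "TimeNanos")
+//	defer it.Close()
+//	for it.Next() {
+//		res := it.At() // *IteratorResult
+//		_ = res
+//	}
+//	if err := it.Err(); err != nil {
+//		// handle error
+//	}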
+type SyncIterator struct {
+	// Config
+	column     int
+	columnName string
+	table      string
+	rgs        []parquet.RowGroup
+	rgsMin     []RowNumber
+	rgsMax     []RowNumber // Exclusive, row number of next one past the row group
+	readSize   int
+	selectAs   string
+	filter     *InstrumentedPredicate
+
+	// Status
+	ctx             context.Context
+	cancel          func()
+	span            trace.Span
+	metrics         *Metrics
+	curr            RowNumber
+	currRowGroup    parquet.RowGroup
+	currRowGroupMin RowNumber
+	currRowGroupMax RowNumber
+	currChunk       parquet.ColumnChunk
+	currPages       parquet.Pages
+	currPage        parquet.Page
+	currPageMax     RowNumber
+	currValues      parquet.ValueReader
+	currBuf         []parquet.Value
+	currBufN        int
+
+	err error
+	res *IteratorResult
+}
+
+var _ Iterator = (*SyncIterator)(nil)
+
+var syncIteratorPool = sync.Pool{
+	New: func() interface{} {
+		return []parquet.Value{}
+	},
+}
+
+func syncIteratorPoolGet(capacity, len int) []parquet.Value {
+	res := syncIteratorPool.Get().([]parquet.Value)
+	if cap(res) < capacity {
+		res = make([]parquet.Value, capacity)
+	}
+	res = res[:len]
+	return res
+}
+
+func syncIteratorPoolPut(b []parquet.Value) {
+	for i := range b {
+		b[i] = parquet.Value{}
+	}
+	syncIteratorPool.Put(b) // nolint: staticcheck
+}
+
+func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator {
+
+	// Assign row group bounds.
+	// Lower bound is inclusive
+	// Upper bound is exclusive, points at the first row of the next group
+	rn := EmptyRowNumber()
+	rgsMin := make([]RowNumber, len(rgs))
+	rgsMax := make([]RowNumber, len(rgs))
+	for i, rg := range rgs {
+		rgsMin[i] = rn
+		rgsMax[i] = rn
+		rgsMax[i].Skip(rg.NumRows() + 1)
+		rn.Skip(rg.NumRows())
+	}
+
+	tr := otel.Tracer("query")
+
+	ctx, span := tr.Start(ctx, "syncIterator", trace.WithAttributes(
+		attribute.String("column", columnName),
+		attribute.Int("columnIndex", column),
+	))
+
+	ctx, cancel := context.WithCancel(ctx)
+
+	return &SyncIterator{
+		table:      strings.ToLower(rgs[0].Schema().Name()) + "s",
+		ctx:        ctx,
+		cancel:     cancel,
+		metrics:    getMetricsFromContext(ctx),
+		span:       span,
+		column:     column,
+		columnName: columnName,
+		rgs:        rgs,
+		readSize:   readSize,
+		selectAs:   selectAs,
+		rgsMin:     rgsMin,
+		rgsMax:     rgsMax,
+		filter:     &InstrumentedPredicate{pred: filter},
+		curr:       EmptyRowNumber(),
+	}
+}
+
+func (c *SyncIterator) At() *IteratorResult {
+	return c.res
+}
+
+func (c *SyncIterator) Next() bool {
+	rn, v, err := c.next()
+	if err != nil {
+		c.res = nil
+		c.err = err
+		return false
+	}
+	if !rn.Valid() {
+		c.res = nil
+		c.err = nil
+		return false
+	}
+	c.res = c.makeResult(rn, v)
+	return true
+}
+
+// Seek moves this iterator to the next result that is greater than
+// or equal to the given row number (and based on the given definition level).
+func (c *SyncIterator) Seek(to RowNumberWithDefinitionLevel) bool {
+
+	if c.seekRowGroup(to.RowNumber, to.DefinitionLevel) {
+		c.res = nil
+		c.err = nil
+		return false
+	}
+
+	done, err := c.seekPages(to.RowNumber, to.DefinitionLevel)
+	if err != nil {
+		c.res = nil
+		c.err = err
+		return false
+	}
+	if done {
+		c.res = nil
+		c.err = nil
+		return false
+	}
+
+	// The row group and page have been selected to where this value is possibly
+	// located. Now scan through the page and look for it.
+	for {
+		rn, v, err := c.next()
+		if err != nil {
+			c.res = nil
+			c.err = err
+			return false
+		}
+		if !rn.Valid() {
+			c.res = nil
+			c.err = nil
+			return false
+		}
+
+		if CompareRowNumbers(to.DefinitionLevel, rn, to.RowNumber) >= 0 {
+			c.res = c.makeResult(rn, v)
+			c.err = nil
+			return true
+		}
+	}
+}
+
+func (c *SyncIterator) popRowGroup() (parquet.RowGroup, RowNumber, RowNumber) {
+	if len(c.rgs) == 0 {
+		return nil, EmptyRowNumber(), EmptyRowNumber()
+	}
+
+	rg := c.rgs[0]
+	min := c.rgsMin[0]
+	max := c.rgsMax[0]
+
+	c.rgs = c.rgs[1:]
+	c.rgsMin = c.rgsMin[1:]
+	c.rgsMax = c.rgsMax[1:]
+
+	return rg, min, max
+}
+
+// seekRowGroup skips ahead to the row group that could contain the value at the
+// desired row number. Does nothing if the current row group is already the correct one.
+func (c *SyncIterator) seekRowGroup(seekTo RowNumber, definitionLevel int) (done bool) {
+	if c.currRowGroup != nil && CompareRowNumbers(definitionLevel, seekTo, c.currRowGroupMax) >= 0 {
+		// Done with this row group
+		c.closeCurrRowGroup()
+	}
+
+	for c.currRowGroup == nil {
+
+		rg, min, max := c.popRowGroup()
+		if rg == nil {
+			return true
+		}
+
+		if CompareRowNumbers(definitionLevel, seekTo, max) != -1 {
+			continue
+		}
+
+		cc := rg.ColumnChunks()[c.column]
+		if c.filter != nil && !c.filter.KeepColumnChunk(cc) {
+			continue
+		}
+
+		// This row group matches both row number and filter.
+		c.setRowGroup(rg, min, max)
+	}
+
+	return c.currRowGroup == nil
+}
+
+// seekPages skips ahead in the current row group to the page that could contain the value at
+// the desired row number. Does nothing if the current page is already the correct one.
+func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bool, err error) {
+	if c.currPage != nil && CompareRowNumbers(definitionLevel, seekTo, c.currPageMax) >= 0 {
+		// Value not in this page
+		c.setPage(nil)
+	}
+
+	if c.currPage == nil {
+
+		// TODO (mdisibio)   :((((((((
+		//    pages.SeekToRow is more costly than expected.  It doesn't reuse existing i/o
+		// so it can't be called naively every time we swap pages. We need to figure out
+		// a way to determine when it is worth calling here.
+		/*
+			// Seek into the pages. This is relative to the start of the row group
+			if seekTo[0] > 0 {
+				// Determine row delta. We subtract 1 because curr points at the previous row
+				skip := seekTo[0] - c.currRowGroupMin[0] - 1
+				if skip > 0 {
+					if err := c.currPages.SeekToRow(skip); err != nil {
+						return true, err
+					}
+					c.curr.Skip(skip)
+				}
+			}*/
+
+		for c.currPage == nil {
+			pg, err := c.currPages.ReadPage()
+			if pg == nil || err != nil {
+				// No more pages in this column chunk,
+				// cleanup and exit.
+				if err == io.EOF {
+					err = nil
+				}
+				parquet.Release(pg)
+				c.closeCurrRowGroup()
+				return true, err
+			}
+			c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
+			c.span.AddEvent(
+				"read page (seekPages)",
+				trace.WithAttributes(
+					attribute.Int64("page_num_values", pg.NumValues()),
+					attribute.Int64("page_size", pg.Size()),
+				))
+
+			// Skip based on row number?
+			newRN := c.curr
+			newRN.Skip(pg.NumRows() + 1)
+			if CompareRowNumbers(definitionLevel, seekTo, newRN) >= 0 {
+				c.curr.Skip(pg.NumRows())
+				parquet.Release(pg)
+				continue
+			}
+
+			// Skip based on filter?
+			if c.filter != nil && !c.filter.KeepPage(pg) {
+				c.curr.Skip(pg.NumRows())
+				parquet.Release(pg)
+				continue
+			}
+
+			c.setPage(pg)
+		}
+	}
+
+	return false, nil
+}
+
+// next is the core functionality of this iterator and returns the next matching result. This
+// may involve inspecting multiple row groups, pages, and values until a match is found. When
+// we run out of things to inspect, it returns nil. The reason this method is distinct from
+// Next() is because it doesn't wrap the results in an IteratorResult, which is more efficient
+// when being called multiple times and throwing away the results like in SeekTo().
+func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) {
+	for {
+
+		// return if context is cancelled
+		select {
+		case <-c.ctx.Done():
+			return EmptyRowNumber(), nil, c.ctx.Err()
+		default:
+		}
+
+		if c.currRowGroup == nil {
+			rg, min, max := c.popRowGroup()
+			if rg == nil {
+				return EmptyRowNumber(), nil, nil
+			}
+
+			cc := rg.ColumnChunks()[c.column]
+			if c.filter != nil && !c.filter.KeepColumnChunk(cc) {
+				continue
+			}
+
+			c.setRowGroup(rg, min, max)
+		}
+
+		if c.currPage == nil {
+			pg, err := c.currPages.ReadPage()
+			if pg == nil || err == io.EOF {
+				// This row group is exhausted
+				c.closeCurrRowGroup()
+				continue
+			}
+			if err != nil {
+				return EmptyRowNumber(), nil, err
+			}
+			c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
+			c.span.AddEvent(
+				"read page (next)",
+				trace.WithAttributes(
+					attribute.Int64("page_num_values", pg.NumValues()),
+					attribute.Int64("page_size", pg.Size()),
+				))
+
+			if c.filter != nil && !c.filter.KeepPage(pg) {
+				// This page filtered out
+				c.curr.Skip(pg.NumRows())
+				parquet.Release(pg)
+				continue
+			}
+			c.setPage(pg)
+		}
+
+		// Read next batch of values if needed
+		if c.currBuf == nil {
+			c.currBuf = syncIteratorPoolGet(c.readSize, 0)
+		}
+		if c.currBufN >= len(c.currBuf) || len(c.currBuf) == 0 {
+			c.currBuf = c.currBuf[:cap(c.currBuf)]
+			n, err := c.currValues.ReadValues(c.currBuf)
+			if err != nil && err != io.EOF {
+				return EmptyRowNumber(), nil, err
+			}
+			c.currBuf = c.currBuf[:n]
+			c.currBufN = 0
+			if n == 0 {
+				// This value reader and page are exhausted.
+				c.setPage(nil)
+				continue
+			}
+		}
+
+		// Consume current buffer until empty
+		for c.currBufN < len(c.currBuf) {
+			v := &c.currBuf[c.currBufN]
+
+			// Inspect all values to track the current row number,
+			// even if the value is filtered out next.
+			c.curr.Next(v.RepetitionLevel(), v.DefinitionLevel())
+			c.currBufN++
+
+			if c.filter != nil && !c.filter.KeepValue(*v) {
+				continue
+			}
+
+			return c.curr, v, nil
+		}
+	}
+}
+
+func (c *SyncIterator) setRowGroup(rg parquet.RowGroup, min, max RowNumber) {
+	c.closeCurrRowGroup()
+	c.curr = min
+	c.currRowGroup = rg
+	c.currRowGroupMin = min
+	c.currRowGroupMax = max
+	c.currChunk = rg.ColumnChunks()[c.column]
+	c.currPages = c.currChunk.Pages()
+}
+
+func (c *SyncIterator) setPage(pg parquet.Page) {
+
+	// Handle an outgoing page
+	if c.currPage != nil {
+		c.curr = c.currPageMax.Preceding() // Reposition current row number to end of this page.
+		parquet.Release(c.currPage)
+		c.currPage = nil
+	}
+
+	// Reset value buffers
+	c.currValues = nil
+	c.currPageMax = EmptyRowNumber()
+	c.currBufN = 0
+
+	// If we don't immediately have a new incoming page
+	// then return the buffer to the pool.
+	if pg == nil && c.currBuf != nil {
+		syncIteratorPoolPut(c.currBuf)
+		c.currBuf = nil
+	}
+
+	// Handle an incoming page
+	if pg != nil {
+		rn := c.curr
+		rn.Skip(pg.NumRows() + 1) // Exclusive upper bound, points at the first row number of the next page
+		c.currPage = pg
+		c.currPageMax = rn
+		c.currValues = pg.Values()
+	}
+}
+
+func (c *SyncIterator) closeCurrRowGroup() {
+	if c.currPages != nil {
+		c.currPages.Close()
+	}
+
+	c.currRowGroup = nil
+	c.currRowGroupMin = EmptyRowNumber()
+	c.currRowGroupMax = EmptyRowNumber()
+	c.currChunk = nil
+	c.currPages = nil
+	c.setPage(nil)
+}
+
+func (c *SyncIterator) makeResult(t RowNumber, v *parquet.Value) *IteratorResult {
+	r := iteratorResultPoolGet()
+	r.RowNumber = t
+	if c.selectAs != "" {
+		r.AppendValue(c.selectAs, v.Clone())
+	}
+	return r
+}
+
+func (c *SyncIterator) Err() error {
+	return c.err
+}
+
+func (c *SyncIterator) Close() error {
+	c.cancel()
+	c.closeCurrRowGroup()
+
+	c.span.SetAttributes(attribute.Int64("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()))
+	/*
+		c.span.SetAttributes(attribute.Int64("inspectedPages", c.filter.InspectedPages.Load()))
+		c.span.SetAttributes(attribute.Int64("inspectedValues", c.filter.InspectedValues.Load()))
+		c.span.SetAttributes(attribute.Int64("keptColumnChunks", c.filter.KeptColumnChunks.Load()))
+		c.span.SetAttributes(attribute.Int64("keptPages", c.filter.KeptPages.Load()))
+		c.span.SetAttributes(attribute.Int64("keptValues", c.filter.KeptValues.Load()))
+	*/
+	c.span.End()
+	return nil
+}
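
For orientation, here is a minimal sketch of how a caller drives the iterator above; it is not part of the patch. `NewSyncIterator` and `GetColumnIndexByPath` follow the usage visible in the tests below, and the example assumes it lives alongside them in the query package.

```go
import (
	"context"

	"github.com/segmentio/parquet-go"
)

// Sketch: read every value of column "A" with no predicate pushdown.
func exampleReadColumn(ctx context.Context, pf *parquet.File) error {
	idx, _ := GetColumnIndexByPath(pf, "A")
	it := NewSyncIterator(ctx, pf.RowGroups(), idx, "A", 1000, nil, "A")
	defer it.Close()

	for it.Next() {
		// At() carries the row number plus the values selected under "A".
		_ = it.At().ToMap()["A"][0].Int64()
	}
	// Err() surfaces context cancellation or any read error from next().
	return it.Err()
}
```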
diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go
index ba7828d85..1b06b344f 100644
--- a/pkg/phlaredb/query/iters_test.go
+++ b/pkg/phlaredb/query/iters_test.go
@@ -1,175 +1,586 @@
 package query
 
 import (
+	"bytes"
 	"context"
-	"errors"
+	"fmt"
+	"log"
+	"math"
+	"os"
 	"testing"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/segmentio/parquet-go"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/jaeger"
+	"go.opentelemetry.io/otel/sdk/resource"
+	tracesdk "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
+	"go.opentelemetry.io/otel/trace"
 )
 
-type testData struct {
-	ID   int64  `parquet:"id"`
-	Name string `parquet:"name"`
+const MaxDefinitionLevel = 5
+
+type makeTestIterFn func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator
+
+var iterTestCases = []struct {
+	name     string
+	makeIter makeTestIterFn
+}{
+	{"sync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator {
+		return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs)
+	}},
 }
 
-func newTestBuffer[A any](rows []A) parquet.RowGroup {
-	buffer := parquet.NewBuffer()
-	for i := range rows {
-		err := buffer.Write(rows[i])
-		if err != nil {
-			panic(err.Error())
+/*
+// TestNext compares the unrolled Next() with the original nextSlow() to
+// prevent drift
+func TestNext(t *testing.T) {
+	rn1 := RowNumber{0, 0, 0, 0, 0, 0}
+	rn2 := RowNumber{0, 0, 0, 0, 0, 0}
+
+	for i := 0; i < 1000; i++ {
+		r := rand.Intn(6)
+		d := rand.Intn(6)
+
+		rn1.Next(r, d)
+		rn2.nextSlow(r, d)
+
+		require.Equal(t, rn1, rn2)
+	}
+}
+*/
+
+func TestRowNumber(t *testing.T) {
+	tr := EmptyRowNumber()
+	require.Equal(t, RowNumber{-1, -1, -1, -1, -1, -1}, tr)
+
+	steps := []struct {
+		repetitionLevel int
+		definitionLevel int
+		expected        RowNumber
+	}{
+		// Name.Language.Country examples from the Dremel whitepaper
+		{0, 3, RowNumber{0, 0, 0, 0, -1, -1}},
+		{2, 2, RowNumber{0, 0, 1, -1, -1, -1}},
+		{1, 1, RowNumber{0, 1, -1, -1, -1, -1}},
+		{1, 3, RowNumber{0, 2, 0, 0, -1, -1}},
+		{0, 1, RowNumber{1, 0, -1, -1, -1, -1}},
+	}
+
+	for _, step := range steps {
+		tr.Next(step.repetitionLevel, step.definitionLevel)
+		require.Equal(t, step.expected, tr)
+	}
+}
+
+func TestCompareRowNumbers(t *testing.T) {
+	testCases := []struct {
+		a, b     RowNumber
+		expected int
+	}{
+		{RowNumber{-1}, RowNumber{0}, -1},
+		{RowNumber{0}, RowNumber{0}, 0},
+		{RowNumber{1}, RowNumber{0}, 1},
+
+		{RowNumber{0, 1}, RowNumber{0, 2}, -1},
+		{RowNumber{0, 2}, RowNumber{0, 1}, 1},
+	}
+
+	for _, tc := range testCases {
+		require.Equal(t, tc.expected, CompareRowNumbers(MaxDefinitionLevel, tc.a, tc.b))
+	}
+}
+
+func TestRowNumberPreceding(t *testing.T) {
+	testCases := []struct {
+		start, preceding RowNumber
+	}{
+		{RowNumber{1000, -1, -1, -1, -1, -1}, RowNumber{999, -1, -1, -1, -1, -1}},
+		{RowNumber{1000, 0, 0, 0, 0, 0}, RowNumber{999, math.MaxInt64, math.MaxInt64, math.MaxInt64, math.MaxInt64, math.MaxInt64}},
+	}
+
+	for _, tc := range testCases {
+		require.Equal(t, tc.preceding, tc.start.Preceding())
+	}
+}
+
+func TestColumnIterator(t *testing.T) {
+	for _, tc := range iterTestCases {
+		t.Run(tc.name, func(t *testing.T) {
+			testColumnIterator(t, tc.makeIter)
+		})
+	}
+}
+
+func testColumnIterator(t *testing.T, makeIter makeTestIterFn) {
+	count := 100_000
+	pf := createTestFile(t, count)
+
+	idx, _ := GetColumnIndexByPath(pf, "A")
+	iter := makeIter(pf, idx, nil, "A")
+	defer iter.Close()
+
+	for i := 0; i < count; i++ {
+		require.True(t, iter.Next())
+		res := iter.At()
+		require.NotNil(t, res, "i=%d", i)
+		require.Equal(t, RowNumber{int64(i), -1, -1, -1, -1, -1}, res.RowNumber)
+		require.Equal(t, int64(i), res.ToMap()["A"][0].Int64())
+	}
+
+	require.False(t, iter.Next())
+	require.NoError(t, iter.Err())
+}
+
+func TestColumnIteratorSeek(t *testing.T) {
+	for _, tc := range iterTestCases {
+		t.Run(tc.name, func(t *testing.T) {
+			testColumnIteratorSeek(t, tc.makeIter)
+		})
+	}
+}
+
+func testColumnIteratorSeek(t *testing.T, makeIter makeTestIterFn) {
+	count := 10_000
+	pf := createTestFile(t, count)
+
+	idx, _ := GetColumnIndexByPath(pf, "A")
+	iter := makeIter(pf, idx, nil, "A")
+	defer iter.Close()
+
+	seekTos := []int64{
+		100,
+		1234,
+		4567,
+		5000,
+		7890,
+	}
+
+	for _, seekTo := range seekTos {
+		rn := EmptyRowNumber()
+		rn[0] = seekTo
+		require.True(t, iter.Seek(RowNumberWithDefinitionLevel{rn, 0}))
+		res := iter.At()
+		require.NotNil(t, res, "seekTo=%v", seekTo)
+		require.Equal(t, RowNumber{seekTo, -1, -1, -1, -1, -1}, res.RowNumber)
+		require.Equal(t, seekTo, res.ToMap()["A"][0].Int64())
+	}
+}
+
+func TestColumnIteratorPredicate(t *testing.T) {
+	for _, tc := range iterTestCases {
+		t.Run(tc.name, func(t *testing.T) {
+			testColumnIteratorPredicate(t, tc.makeIter)
+		})
+	}
+}
+
+func testColumnIteratorPredicate(t *testing.T, makeIter makeTestIterFn) {
+	count := 10_000
+	pf := createTestFile(t, count)
+
+	pred := NewIntBetweenPredicate(7001, 7003)
+
+	idx, _ := GetColumnIndexByPath(pf, "A")
+	iter := makeIter(pf, idx, pred, "A")
+	defer iter.Close()
+
+	expectedResults := []int64{
+		7001,
+		7002,
+		7003,
+	}
+
+	for _, expectedResult := range expectedResults {
+		require.True(t, iter.Next())
+		res := iter.At()
+		require.NotNil(t, res)
+		require.Equal(t, RowNumber{expectedResult, -1, -1, -1, -1, -1}, res.RowNumber)
+		require.Equal(t, expectedResult, res.ToMap()["A"][0].Int64())
+	}
+}
+
+func TestColumnIteratorExitEarly(t *testing.T) {
+	type T struct{ A int }
+
+	rows := []T{}
+	count := 10_000
+	for i := 0; i < count; i++ {
+		rows = append(rows, T{i})
+	}
+
+	pf := createFileWith(t, rows, 2)
+	idx, _ := GetColumnIndexByPath(pf, "A")
+	readSize := 1000
+
+	readIter := func(iter Iterator) (int, error) {
+		received := 0
+		for iter.Next() {
+			received++
 		}
+		return received, iter.Err()
 	}
-	return buffer
+
+	t.Run("cancelledEarly", func(t *testing.T) {
+		// Cancel before iterating
+		ctx, cancel := context.WithCancel(context.TODO())
+		cancel()
+		iter := NewSyncIterator(ctx, pf.RowGroups(), idx, "", readSize, nil, "A")
+		count, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Equal(t, 0, count)
+	})
+
+	t.Run("cancelledPartial", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.TODO())
+		iter := NewSyncIterator(ctx, pf.RowGroups(), idx, "", readSize, nil, "A")
+
+		// Read some results
+		require.True(t, iter.Next())
+
+		// Then cancel
+		cancel()
+
+		// Read again = context cancelled
+		_, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+	})
+
+	t.Run("closedEarly", func(t *testing.T) {
+		// Close before iterating
+		iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
+		iter.Close()
+		count, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Equal(t, 0, count)
+	})
+
+	t.Run("closedPartial", func(t *testing.T) {
+		iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
+
+		// Read some results
+		require.True(t, iter.Next())
+
+		// Then close
+		iter.Close()
+
+		// Read again = should close early
+		res2, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Less(t, readSize+res2, count)
+	})
 }
 
-type errRowGroup struct {
-	parquet.RowGroup
+func BenchmarkColumnIterator(b *testing.B) {
+	for _, tc := range iterTestCases {
+		b.Run(tc.name, func(b *testing.B) {
+			benchmarkColumnIterator(b, tc.makeIter)
+		})
+	}
 }
 
-func (e *errRowGroup) ColumnChunks() []parquet.ColumnChunk {
-	chunks := e.RowGroup.ColumnChunks()
-	for pos := range chunks {
-		chunks[pos] = &errColumnChunk{chunks[pos]}
+func benchmarkColumnIterator(b *testing.B, makeIter makeTestIterFn) {
+	count := 100_000
+	pf := createTestFile(b, count)
+
+	idx, _ := GetColumnIndexByPath(pf, "A")
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		iter := makeIter(pf, idx, nil, "A")
+		actualCount := 0
+		for iter.Next() {
+			actualCount++
+		}
+		iter.Close()
+		require.Equal(b, count, actualCount)
+		//fmt.Println(actualCount)
+	}
+}
+
+func createTestFile(t testing.TB, count int) *parquet.File {
+	type T struct{ A int }
+
+	rows := []T{}
+	for i := 0; i < count; i++ {
+		rows = append(rows, T{i})
+	}
+
+	pf := createFileWith(t, rows, 2)
+	return pf
+}
+
+func createProfileLikeFile(t testing.TB, count int) *parquet.File {
+	type T struct {
+		SeriesID  uint32
+		TimeNanos int64
+	}
+
+	// Every row group is ordered by SeriesID and then TimeNanos;
+	// time always increases from one row group to the next.
+
+	rowGroups := 10
+	series := 8
+
+	rows := make([]T, count)
+	for i := range rows {
+
+		rowsPerRowGroup := count / rowGroups
+		seriesPerRowGroup := rowsPerRowGroup / series
+		rowGroupNum := i / rowsPerRowGroup
+
+		seriesID := uint32((i % rowsPerRowGroup) / seriesPerRowGroup)
+		rows[i] = T{
+			SeriesID:  seriesID,
+			TimeNanos: int64(i%seriesPerRowGroup+rowGroupNum*seriesPerRowGroup) * 1000,
+		}
+
 	}
-	return chunks
+
+	return createFileWith[T](t, rows, rowGroups)
+
 }
 
-type errColumnChunk struct {
-	parquet.ColumnChunk
+func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File {
+	f, err := os.CreateTemp(t.TempDir(), "data.parquet")
+	require.NoError(t, err)
+	t.Logf("Created temp file %s", f.Name())
+
+	perRG := len(rows) / rowGroups
+
+	w := parquet.NewGenericWriter[T](f)
+	for i := 0; i < (rowGroups - 1); i++ {
+		_, err = w.Write(rows[0:perRG])
+		require.NoError(t, err)
+		require.NoError(t, w.Flush())
+		rows = rows[perRG:]
+	}
+
+	_, err = w.Write(rows)
+	require.NoError(t, err)
+	require.NoError(t, w.Flush())
+
+	require.NoError(t, w.Close())
+
+	stat, err := f.Stat()
+	require.NoError(t, err)
+
+	pf, err := parquet.OpenFile(f, stat.Size())
+	require.NoError(t, err)
+
+	return pf
 }
 
-func (e *errColumnChunk) Pages() parquet.Pages {
-	return &errPages{e.ColumnChunk.Pages()}
+type iteratorTracer struct {
+	it        Iterator
+	span      trace.Span
+	name      string
+	nextCount uint32
+	seekCount uint32
 }
 
-type errPages struct {
-	parquet.Pages
+func (i *iteratorTracer) Next() bool {
+	i.nextCount++
+	posBefore := i.it.At()
+	result := i.it.Next()
+	posAfter := i.it.At()
+	i.span.AddEvent("next", trace.WithAttributes(
+		attribute.String("column", i.name),
+		attribute.Bool("result", result),
+		attribute.Stringer("posBefore", posBefore),
+		attribute.Stringer("posAfter", posAfter),
+	))
+	return result
 }
 
-func (e *errPages) ReadPage() (parquet.Page, error) {
-	p, err := e.Pages.ReadPage()
-	return &errPage{p}, err
+func (i *iteratorTracer) At() *IteratorResult {
+	return i.it.At()
 }
 
-type errPage struct {
-	parquet.Page
+func (i *iteratorTracer) Err() error {
+	return i.it.Err()
 }
 
-func (e *errPage) Values() parquet.ValueReader {
-	return &errValueReader{e.Page.Values()}
+func (i *iteratorTracer) Close() error {
+	return i.it.Close()
 }
 
-type errValueReader struct {
-	parquet.ValueReader
+func (i *iteratorTracer) Seek(pos RowNumberWithDefinitionLevel) bool {
+	i.seekCount++
+	posBefore := i.it.At()
+	result := i.it.Seek(pos)
+	posAfter := i.it.At()
+	i.span.AddEvent("seek", trace.WithAttributes(
+		attribute.String("column", i.name),
+		attribute.Bool("result", result),
+		attribute.Stringer("seekTo", &pos),
+		attribute.Stringer("posBefore", posBefore),
+		attribute.Stringer("posAfter", posAfter),
+	))
+	return result
 }
 
-func (e *errValueReader) ReadValues(vals []parquet.Value) (int, error) {
-	_, _ = e.ValueReader.ReadValues(vals)
-	return 0, errors.New("read error")
+func newIteratorTracer(span trace.Span, name string, it Iterator) Iterator {
+	return &iteratorTracer{
+		span: span,
+		name: name,
+		it:   it,
+	}
 }
 
-func withReadValueError(rg []parquet.RowGroup) []parquet.RowGroup {
-	for pos := range rg {
-		rg[pos] = &errRowGroup{rg[pos]}
+// tracerProvider returns an OpenTelemetry TracerProvider configured to use
+// the Jaeger exporter, sending spans to the provided collector URL. The
+// returned TracerProvider also carries a Resource with identifying
+// information about this application.
+func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
+	// Create the Jaeger exporter
+	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
+	if err != nil {
+		return nil, err
 	}
-	return rg
+	tp := tracesdk.NewTracerProvider(
+		// Always be sure to batch in production.
+		tracesdk.WithBatcher(exp),
+		// Record information about this application in a Resource.
+		tracesdk.WithResource(resource.NewWithAttributes(
+			semconv.SchemaURL,
+			semconv.ServiceName("phlare-go-test"),
+		)),
+	)
+	return tp, nil
 }
 
-func newTestSet() []parquet.RowGroup {
-	return []parquet.RowGroup{
-		newTestBuffer(
-			[]testData{
-				{1, "one"},
-				{2, "two"},
-			}),
-		newTestBuffer(
-			[]testData{
-				{3, "three"},
-				{5, "five"},
-			}),
+func TestMain(m *testing.M) {
+	tp, err := tracerProvider("http://localhost:14268/api/traces")
+	if err != nil {
+		log.Fatal(err)
 	}
+
+	// Register our TracerProvider as the global so any imported
+	// instrumentation in the future will default to using it.
+	otel.SetTracerProvider(tp)
+
+	result := m.Run()
+
+	fmt.Println("shutting down tracer provider")
+	_ = tp.Shutdown(context.Background())
+
+	os.Exit(result)
 }
 
-func TestColumnIterator(t *testing.T) {
+func TestBinaryJoinIterator(t *testing.T) {
+	tr := otel.Tracer("query")
+
+	_, span := tr.Start(context.Background(), "TestBinaryJoinIterator")
+	defer span.End()
+
+	rowCount := 1600
+	pf := createProfileLikeFile(t, rowCount)
+
 	for _, tc := range []struct {
-		name      string
-		result    []parquet.Value
-		rowGroups []parquet.RowGroup
-		err       error
+		name                string
+		seriesPredicate     Predicate
+		seriesPageReads     int
+		timePredicate       Predicate
+		timePageReads       int
+		expectedResultCount int
 	}{
 		{
-			name:      "read-int-column",
-			rowGroups: newTestSet(),
-			result: []parquet.Value{
-				parquet.ValueOf(1),
-				parquet.ValueOf(2),
-				parquet.ValueOf(3),
-				parquet.ValueOf(5),
-			},
+			name:                "no predicate",
+			expectedResultCount: rowCount, // expect everything
+			seriesPageReads:     10,
+			timePageReads:       10,
+		},
+		{
+			name:                "one series ID",
+			expectedResultCount: rowCount / 8, // expect an eight of the rows
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}}),
+			seriesPageReads:     10,
+			timePageReads:       10,
+		},
+		{
+			name:                "two series IDs",
+			expectedResultCount: rowCount / 8 * 2, // expect two eights of the rows
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+			seriesPageReads:     10,
+			timePageReads:       10,
+		},
+		{
+			name:                "missing series",
+			expectedResultCount: 0,
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{10: {}}),
+		},
+		{
+			name:                "first two time stamps each",
+			expectedResultCount: 2 * 8, // expect two profiles for each series
+			timePredicate:       NewIntBetweenPredicate(0, 1000),
+			seriesPageReads:     1,
+			timePageReads:       1,
 		},
 		{
-			name:      "err-read-values",
-			rowGroups: withReadValueError(newTestSet()),
-			err:       errors.New("read error"),
+			name:                "time before results",
+			expectedResultCount: 0,
+			timePredicate:       NewIntBetweenPredicate(-10, -1),
+			seriesPageReads:     1,
+			timePageReads:       0,
+		},
+		{
+			name:                "time after results",
+			expectedResultCount: 0,
+			timePredicate:       NewIntBetweenPredicate(200000, 20001000),
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+			seriesPageReads:     1,
+			timePageReads:       0,
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			var (
-				buffer [][]parquet.Value
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
 
-				ctx = context.Background()
-				i   = NewColumnIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id")
-			)
-			for i.Next() {
-				require.Nil(t, i.Err())
-				buffer = i.At().Columns(buffer, "id")
-			}
+			reg := prometheus.NewRegistry()
+			metrics := NewMetrics(reg)
+			metrics.pageReadsTotal.WithLabelValues("ts", "SeriesId").Add(0)
+			metrics.pageReadsTotal.WithLabelValues("ts", "TimeNanos").Add(0)
+			ctx = AddMetricsToContext(ctx, metrics)
 
-			require.Equal(t, tc.err, i.Err())
-		})
-	}
-}
+			seriesIt := newIteratorTracer(span, "SeriesID", NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId"))
+			timeIt := newIteratorTracer(span, "TimeNanos", NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos"))
 
-func TestRowNumber(t *testing.T) {
-	tr := EmptyRowNumber()
-	require.Equal(t, RowNumber{-1, -1, -1, -1, -1, -1}, tr)
+			ctx, span := tr.Start(ctx, t.Name())
+			defer span.End()
 
-	steps := []struct {
-		repetitionLevel int
-		definitionLevel int
-		expected        RowNumber
-	}{
-		// Name.Language.Country examples from the Dremel whitepaper
-		{0, 3, RowNumber{0, 0, 0, 0, -1, -1}},
-		{2, 2, RowNumber{0, 0, 1, -1, -1, -1}},
-		{1, 1, RowNumber{0, 1, -1, -1, -1, -1}},
-		{1, 3, RowNumber{0, 2, 0, 0, -1, -1}},
-		{0, 1, RowNumber{1, 0, -1, -1, -1, -1}},
-	}
+			it := NewBinaryJoinIterator(
+				0,
+				seriesIt,
+				timeIt,
+			)
 
-	for _, step := range steps {
-		tr.Next(step.repetitionLevel, step.definitionLevel)
-		require.Equal(t, step.expected, tr)
-	}
-}
+			results := 0
+			for it.Next() {
+				span.AddEvent("match", trace.WithAttributes(
+					attribute.Stringer("element", it.At()),
+				))
+				results++
+			}
+			require.NoError(t, it.Err())
 
-func TestCompareRowNumbers(t *testing.T) {
-	testCases := []struct {
-		a, b     RowNumber
-		expected int
-	}{
-		{RowNumber{-1}, RowNumber{0}, -1},
-		{RowNumber{0}, RowNumber{0}, 0},
-		{RowNumber{1}, RowNumber{0}, 1},
+			require.NoError(t, it.Close())
 
-		{RowNumber{0, 1}, RowNumber{0, 2}, -1},
-		{RowNumber{0, 2}, RowNumber{0, 1}, 1},
-	}
+			require.Equal(t, tc.expectedResultCount, results)
 
-	for _, tc := range testCases {
-		require.Equal(t, tc.expected, CompareRowNumbers(5, tc.a, tc.b))
+			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewReader([]byte(fmt.Sprintf(
+				`
+        # HELP pyroscopedb_page_reads_total Total number of pages read while querying
+        # TYPE pyroscopedb_page_reads_total counter
+        pyroscopedb_page_reads_total{column="SeriesId",table="ts"} %d
+        pyroscopedb_page_reads_total{column="TimeNanos",table="ts"} %d
+        `, tc.seriesPageReads, tc.timePageReads))), "pyroscopedb_page_reads_total"))
+
+		})
 	}
 }
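
TestMain above unconditionally points the batcher at a Jaeger collector on localhost; when no collector is running, the spans are simply dropped at shutdown. Where that is undesirable (e.g. CI), an exporter-less provider keeps the instrumented tests running with real but unexported spans. A minimal sketch, using only the SDK packages already imported here:

```go
// Sketch: no exporter is registered, so spans are created and ended
// normally but never leave the process.
tp := tracesdk.NewTracerProvider()
otel.SetTracerProvider(tp)
defer func() { _ = tp.Shutdown(context.Background()) }()
```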
diff --git a/pkg/phlaredb/query/predicate_test.go b/pkg/phlaredb/query/predicate_test.go
index 798d84a96..734041f87 100644
--- a/pkg/phlaredb/query/predicate_test.go
+++ b/pkg/phlaredb/query/predicate_test.go
@@ -75,7 +75,7 @@ func testPredicate[T any](t *testing.T, tc predicateTestCase[T]) {
 
 	p := InstrumentedPredicate{pred: tc.predicate}
 
-	i := NewColumnIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "")
+	i := NewSyncIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "")
 	for i.Next() {
 	}
 
diff --git a/pkg/phlaredb/query/predicates.go b/pkg/phlaredb/query/predicates.go
index bccdd95cc..5df379154 100644
--- a/pkg/phlaredb/query/predicates.go
+++ b/pkg/phlaredb/query/predicates.go
@@ -6,6 +6,7 @@ import (
 
 	pq "github.com/segmentio/parquet-go"
 	"go.uber.org/atomic"
+	"golang.org/x/exp/constraints"
 )
 
 // Predicate is a pushdown predicate that can be applied at
@@ -254,3 +255,42 @@ func (p *InstrumentedPredicate) KeepValue(v pq.Value) bool {
 
 	return false
 }
+
+type mapPredicate[K constraints.Integer, V any] struct {
+	inbetweenPred Predicate
+	m             map[K]V
+}
+
+func NewMapPredicate[K constraints.Integer, V any](m map[K]V) Predicate {
+
+	var min, max int64
+
+	first := true
+	for k := range m {
+		if first || max < int64(k) {
+			max = int64(k)
+		}
+		if first || min > int64(k) {
+			min = int64(k)
+		}
+		first = false
+	}
+
+	return &mapPredicate[K, V]{
+		inbetweenPred: NewIntBetweenPredicate(min, max),
+		m:             m,
+	}
+}
+
+func (m *mapPredicate[K, V]) KeepColumnChunk(c pq.ColumnChunk) bool {
+	return m.inbetweenPred.KeepColumnChunk(c)
+}
+
+func (m *mapPredicate[K, V]) KeepPage(page pq.Page) bool {
+	return m.inbetweenPred.KeepPage(page)
+}
+
+func (m *mapPredicate[K, V]) KeepValue(v pq.Value) bool {
+	_, exists := m.m[K(v.Int64())]
+	return exists
+}
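
NewMapPredicate above prunes column chunks and pages with the [min, max] envelope of the map keys and only consults the map per value, so sparse key sets still skip whole pages. A short usage sketch, mirroring the tests; `ctx` and `pf` are assumed to be an existing context and an open parquet file:

```go
// Sketch: keep only rows whose first column holds SeriesID 0 or 1.
// Chunk- and page-level pruning uses the [0, 1] envelope; per-value
// filtering falls through to the map lookup in KeepValue.
pred := NewMapPredicate(map[int64]struct{}{0: {}, 1: {}})
it := NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, pred, "SeriesId")
defer it.Close()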
diff --git a/pkg/phlaredb/query/repeated.go b/pkg/phlaredb/query/repeated.go
index d2264321c..7164d1a1c 100644
--- a/pkg/phlaredb/query/repeated.go
+++ b/pkg/phlaredb/query/repeated.go
@@ -7,9 +7,10 @@ import (
 
 	"github.com/grafana/dskit/multierror"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/samber/lo"
 	"github.com/segmentio/parquet-go"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	"github.com/grafana/phlare/pkg/iter"
 )
@@ -24,7 +25,7 @@ type repeatedPageIterator[T any] struct {
 	column   int
 	readSize int
 	ctx      context.Context
-	span     opentracing.Span
+	span     trace.Span
 
 	rgs                 []parquet.RowGroup
 	startRowGroupRowNum int64
@@ -134,10 +135,10 @@ Outer:
 				return false
 			}
-			it.span.LogFields(
-				otlog.String("msg", "Page read"),
-				otlog.Int64("startRowGroupRowNum", it.startRowGroupRowNum),
-				otlog.Int64("startPageRowNum", it.startPageRowNum),
-				otlog.Int64("pageRowNum", it.currentPage.NumRows()),
-			)
+			it.span.AddEvent("TODO", trace.WithAttributes(
+				attribute.String("msg", "Page read"),
+				attribute.Int64("startRowGroupRowNum", it.startRowGroupRowNum),
+				attribute.Int64("startPageRowNum", it.startPageRowNum),
+				attribute.Int64("pageRowNum", it.currentPage.NumRows()),
+			))
 			it.valueReader = it.currentPage.Values()
 		}
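
The same LogFields-to-AddEvent rewrite recurs throughout this patch, with the old "msg" field doubling as the event payload. If the placeholder "TODO" event names are cleaned up later, a small helper along these lines (hypothetical, not part of this patch) would keep the convention in one place:

```go
// logEvent is a hypothetical shim for migrated span.LogFields call sites:
// the leading message becomes the OTel event name and the remaining
// fields become event attributes.
func logEvent(span trace.Span, msg string, attrs ...attribute.KeyValue) {
	span.AddEvent(msg, trace.WithAttributes(attrs...))
}
```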
diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go
index 9b5681530..bde145544 100644
--- a/pkg/phlaredb/sample_merge.go
+++ b/pkg/phlaredb/sample_merge.go
@@ -6,10 +6,12 @@ import (
 
 	"github.com/google/pprof/profile"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/prometheus/common/model"
 	"github.com/samber/lo"
 	"github.com/segmentio/parquet-go"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
 	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -20,8 +22,8 @@ import (
 )
 
 func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block")
+	defer sp.End()
 
 	stacktraceAggrValues := make(stacktracesByMapping)
 	if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil {
@@ -33,8 +35,8 @@ func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.I
 }
 
 func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block")
+	defer sp.End()
 
 	stacktraceAggrValues := make(profileSampleByMapping)
 	if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil {
@@ -85,18 +87,18 @@ func (b *singleBlockQuerier) resolveLocations(ctx context.Context, mapping uint6
 }
 
 func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSampleByMapping profileSampleByMapping) (*profile.Profile, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolvePprofSymbols - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolvePprofSymbols - Block")
+	defer sp.End()
 
 	locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(profileSampleByMapping) * 1024)
 
 	// gather stacktraces
 	if err := profileSampleByMapping.ForEach(func(mapping uint64, samples profileSampleMap) error {
 		stacktraceIDs := samples.Ids()
-		sp.LogFields(
-			otlog.Int("stacktraces", len(stacktraceIDs)),
-			otlog.Uint64("mapping", mapping),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.Int("stacktraces", len(stacktraceIDs)),
+			attribute.Uint64("mapping", mapping)))
+
 		return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs)
 	}); err != nil {
 		return nil, err
@@ -245,26 +247,27 @@ func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSam
 }
 
 func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMapping stacktracesByMapping) (*ingestv1.MergeProfilesStacktracesResult, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolveSymbols - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolveSymbols - Block")
+	defer sp.End()
+
 	locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(stacktracesByMapping) * 1024)
 
 	// gather stacktraces
 	if err := stacktracesByMapping.ForEach(func(mapping uint64, samples stacktraceSampleMap) error {
 		stacktraceIDs := samples.Ids()
-		sp.LogFields(
-			otlog.Int("stacktraces", len(stacktraceIDs)),
-			otlog.Uint64("mapping", mapping),
-		)
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.Int("stacktraces", len(stacktraceIDs)),
+			attribute.Uint64("mapping", mapping)))
+
 		return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs)
 	}); err != nil {
 		return nil, err
 	}
 
-	sp.LogFields(otlog.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds())))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds()))))
 
 	// gather locations
-	sp.LogFields(otlog.String("msg", "gather locations"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather locations")))
 	var (
 		locationIDsByFunctionID = newUniqueIDs[[]int64]()
 		locations               = b.locations.retrieveRows(ctx, locationsIdsByStacktraceID.locationIds().iterator())
@@ -279,10 +282,10 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
 	if err := locations.Err(); err != nil {
 		return nil, err
 	}
-	sp.LogFields(otlog.Int("functions", len(locationIDsByFunctionID)))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("functions", len(locationIDsByFunctionID))))
 
 	// gather functions
-	sp.LogFields(otlog.String("msg", "gather functions"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather functions")))
 	var (
 		functionIDsByStringID = newUniqueIDs[[]int64]()
 		functions             = b.functions.retrieveRows(ctx, locationIDsByFunctionID.iterator())
@@ -297,7 +300,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
 	}
 
 	// gather strings
-	sp.LogFields(otlog.String("msg", "gather strings"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather strings")))
 	var (
 		names   = make([]string, len(functionIDsByStringID))
 		idSlice = make([][]int64, len(functionIDsByStringID))
@@ -314,7 +317,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
 		return nil, err
 	}
 
-	sp.LogFields(otlog.String("msg", "build MergeProfilesStacktracesResult"))
+	sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "build MergeProfilesStacktracesResult")))
 	// idSlice contains stringIDs and gets rewritten into functionIDs
 	for nameID := range idSlice {
 		var functionIDs []int64
@@ -361,8 +364,8 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
 }
 
 func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - Block")
+	defer sp.End()
 
 	m := make(seriesByLabels)
 	columnName := "TotalValue"
@@ -469,8 +472,9 @@ type mapAdder interface {
 }
 
 func mergeByStacktraces(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], m mapAdder) error {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "mergeByStacktraces")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "mergeByStacktraces")
+	defer sp.End()
+
 	// clone the rows to be able to iterate over them twice
 	multiRows, err := iter.CloneN(rows, 2)
 	if err != nil {
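
Each migrated call site re-resolves the tracer via otel.Tracer("github.com/grafana/pyroscope"). The call is cheap, but a package-level variable (a sketch, not part of this patch) would keep the instrumentation-scope name in one place:

```go
// Sketch: resolve the package tracer once.
var tracer = otel.Tracer("github.com/grafana/pyroscope")

// Call sites then shrink to:
//   ctx, sp := tracer.Start(ctx, "MergeByStacktraces - Block")
//   defer sp.End()
```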
diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go
index e3361dc67..ab5c18e3f 100644
--- a/pkg/querier/ingester_querier.go
+++ b/pkg/querier/ingester_querier.go
@@ -8,6 +8,7 @@ import (
 	ring_client "github.com/grafana/dskit/ring/client"
 	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/prometheus/promql/parser"
+	"go.opentelemetry.io/otel"
 	"golang.org/x/sync/errgroup"
 
 	ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -58,8 +59,9 @@ func forAllIngesters[T any](ctx context.Context, ingesterQuerier *IngesterQuerie
 }
 
 func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree Ingesters")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree Ingesters")
+	defer sp.End()
+
 	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, err)
@@ -103,8 +105,9 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
 }
 
 func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries Ingesters")
+	defer sp.End()
+
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) {
 		return ic.MergeProfilesLabels(ctx), nil
 	})
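
The converted spans end cleanly but never record failures. OpenTelemetry spans carry error state explicitly; a hedged sketch of the usual pattern, assuming the go.opentelemetry.io/otel/codes package:

```go
import "go.opentelemetry.io/otel/codes"

// Sketch: mark the active span as failed before returning the error.
if err != nil {
	sp.RecordError(err)
	sp.SetStatus(codes.Error, err.Error())
	return nil, err
}
```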
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 4d9ce48fd..edde0b6aa 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -15,13 +15,15 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/samber/lo"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/sync/errgroup"
 
 	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
@@ -112,8 +114,8 @@ func (q *Querier) stopping(_ error) error {
 }
 
 func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ProfileTypes")
+	defer sp.End()
 
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.ProfileType, error) {
 		res, err := ic.ProfileTypes(childCtx, connect.NewRequest(&ingestv1.ProfileTypesRequest{}))
@@ -148,9 +150,9 @@ func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querier
 func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelValues")
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "LabelValues")
 	defer func() {
-		sp.LogFields(
-			otlog.String("name", req.Msg.Name),
-		)
-		sp.Finish()
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("name", req.Msg.Name)))
+
+		sp.End()
 	}()
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) {
@@ -173,8 +175,9 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.
 }
 
 func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelNames")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "LabelNames")
+	defer sp.End()
+
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) {
 		res, err := ic.LabelNames(childCtx, connect.NewRequest(&typesv1.LabelNamesRequest{
 			Matchers: req.Msg.Matchers,
@@ -196,9 +199,9 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L
 func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "Series")
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "Series")
 	defer func() {
-		sp.LogFields(
-			otlog.String("matchers", strings.Join(req.Msg.Matchers, ",")),
-		)
-		sp.Finish()
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("matchers", strings.Join(req.Msg.Matchers, ","))))
+
+		sp.End()
 	}()
 	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.Labels, error) {
@@ -227,13 +230,13 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser
 func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffRequest]) (*connect.Response[querierv1.DiffResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "Diff")
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "Diff")
 	defer func() {
-		sp.LogFields(
-			otlog.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()),
-			otlog.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()),
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()),
+			attribute.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()),
 			// Assume are the same
-			otlog.String("selector", req.Msg.Left.LabelSelector),
-			otlog.String("profile_id", req.Msg.Left.ProfileTypeID),
-		)
-		sp.Finish()
+			attribute.String("selector", req.Msg.Left.LabelSelector),
+			attribute.String("profile_id", req.Msg.Left.ProfileTypeID)))
+
+		sp.End()
 	}()
 
@@ -409,12 +412,12 @@ func splitQueryToStores(start, end model.Time, now model.Time, queryStoreAfter t
 func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[googlev1.Profile], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMergeProfile")
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMergeProfile")
 	defer func() {
-		sp.LogFields(
-			otlog.String("start", model.Time(req.Msg.Start).Time().String()),
-			otlog.String("end", model.Time(req.Msg.End).Time().String()),
-			otlog.String("selector", req.Msg.LabelSelector),
-			otlog.String("profile_id", req.Msg.ProfileTypeID),
-		)
-		sp.Finish()
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("start", model.Time(req.Msg.Start).Time().String()),
+			attribute.String("end", model.Time(req.Msg.End).Time().String()),
+			attribute.String("selector", req.Msg.LabelSelector),
+			attribute.String("profile_id", req.Msg.ProfileTypeID)))
+
+		sp.End()
 	}()
 
@@ -467,14 +470,14 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
 func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries")
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries")
 	defer func() {
-		sp.LogFields(
-			otlog.String("start", model.Time(req.Msg.Start).Time().String()),
-			otlog.String("end", model.Time(req.Msg.End).Time().String()),
-			otlog.String("selector", req.Msg.LabelSelector),
-			otlog.String("profile_id", req.Msg.ProfileTypeID),
-			otlog.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
-			otlog.Float64("step", req.Msg.Step),
-		)
-		sp.Finish()
+		sp.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("start", model.Time(req.Msg.Start).Time().String()),
+			attribute.String("end", model.Time(req.Msg.End).Time().String()),
+			attribute.String("selector", req.Msg.LabelSelector),
+			attribute.String("profile_id", req.Msg.ProfileTypeID),
+			attribute.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
+			attribute.Float64("step", req.Msg.Step)))
+
+		sp.End()
 	}()
 
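
The deferred AddEvent("TODO", ...) calls above replay the old LogFields behaviour: request metadata lands as a single event just before the span ends. Recorded as span attributes instead, the same metadata becomes filterable in most backends and can be attached as soon as the span starts. A sketch of that variant for SelectSeries:

```go
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries")
defer sp.End()
sp.SetAttributes(
	attribute.String("selector", req.Msg.LabelSelector),
	attribute.String("profile_id", req.Msg.ProfileTypeID),
	attribute.Float64("step", req.Msg.Step),
)
```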
diff --git a/pkg/querier/select_merge.go b/pkg/querier/select_merge.go
index 5f12d94e7..6a912cad4 100644
--- a/pkg/querier/select_merge.go
+++ b/pkg/querier/select_merge.go
@@ -8,13 +8,6 @@ import (
 
 	"github.com/google/pprof/profile"
 	"github.com/grafana/dskit/multierror"
-	"github.com/opentracing/opentracing-go"
-	"github.com/prometheus/common/model"
-	"github.com/samber/lo"
-	"golang.org/x/sync/errgroup"
-
-	otlog "github.com/opentracing/opentracing-go/log"
-
 	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
 	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
 	typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
@@ -24,6 +17,13 @@ import (
 	"github.com/grafana/phlare/pkg/pprof"
 	"github.com/grafana/phlare/pkg/util"
 	"github.com/grafana/phlare/pkg/util/loser"
+	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/common/model"
+	"github.com/samber/lo"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/sync/errgroup"
 )
 
 type ProfileWithLabels struct {
@@ -220,8 +220,9 @@ func (s *mergeIterator[R, Req, Res]) Close() error {
 
 // skipDuplicates iterates through the iterator and skip duplicates.
 func skipDuplicates(ctx context.Context, its []MergeIterator) error {
-	span, _ := opentracing.StartSpanFromContext(ctx, "skipDuplicates")
-	defer span.Finish()
+	_, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "skipDuplicates")
+	defer span.End()
+
 	var errors multierror.MultiError
 	tree := loser.New(its,
 		&ProfileWithLabels{
@@ -259,8 +260,8 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error {
 		}
 		duplicates++
 	}
-	span.LogFields(otlog.Int("duplicates", duplicates))
-	span.LogFields(otlog.Int("total", total))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.Int("duplicates", duplicates)))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.Int("total", total)))
 
 	return errors.Err()
 }
@@ -268,8 +269,8 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error {
 // selectMergeTree selects the  profile from each ingester by deduping them and
 // returns merge of stacktrace samples represented as a tree.
 func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesStacktraces]) (*phlaremodel.Tree, error) {
-	span, ctx := opentracing.StartSpanFromContext(ctx, "selectMergeTree")
-	defer span.Finish()
+	ctx, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMergeTree")
+	defer span.End()
 
 	mergeResults := make([]MergeResult[*ingestv1.MergeProfilesStacktracesResult], len(responses))
 	iters := make([]MergeIterator, len(responses))
@@ -294,7 +295,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client
 	}
 
 	// Collects the results in parallel.
-	span.LogFields(otlog.String("msg", "collecting merge results"))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "collecting merge results")))
 	g, _ := errgroup.WithContext(ctx)
 	m := phlaremodel.NewTreeMerger()
 	sm := phlaremodel.NewStackTraceMerger()
@@ -327,7 +328,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client
 		}
 	}
 
-	span.LogFields(otlog.String("msg", "building tree"))
+	span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building tree")))
 	return m.Tree(), nil
 }
 
diff --git a/pkg/querier/store_gateway_querier.go b/pkg/querier/store_gateway_querier.go
index 2f333d95b..787616673 100644
--- a/pkg/querier/store_gateway_querier.go
+++ b/pkg/querier/store_gateway_querier.go
@@ -15,6 +15,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/promql/parser"
+	"go.opentelemetry.io/otel"
 	"golang.org/x/sync/errgroup"
 
 	ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -151,8 +152,9 @@ func GetShuffleShardingSubring(ring ring.ReadRing, userID string, limits StoreGa
 }
 
 func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree StoreGateway")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree StoreGateway")
+	defer sp.End()
+
 	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, err)
@@ -200,8 +202,9 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1
 }
 
 func (q *Querier) selectSeriesFromStoreGateway(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries StoreGateway")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries StoreGateway")
+	defer sp.End()
+
 	tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, err)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index f4d8d31aa..99a1f8a1f 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -27,6 +27,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/weaveworks/common/middleware"
 	"github.com/weaveworks/common/user"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
 
 	"github.com/grafana/phlare/pkg/frontend/frontendpb"
@@ -200,7 +201,7 @@ type schedulerRequest struct {
 
 	ctx       context.Context
 	ctxCancel context.CancelFunc
-	queueSpan opentracing.Span
+	queueSpan trace.Span
 
 	// This is only used for testing.
 	parentSpanContext opentracing.SpanContext
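
scheduler.go now holds an OTel trace.Span next to an opentracing.SpanContext, so both APIs stay live while the migration is in flight. One way to keep the remaining opentracing call sites feeding the OTel pipeline is the official bridge; a sketch, assuming go.opentelemetry.io/otel/bridge/opentracing:

```go
import (
	"github.com/opentracing/opentracing-go"
	"go.opentelemetry.io/otel"
	otelbridge "go.opentelemetry.io/otel/bridge/opentracing"
)

// Sketch: legacy opentracing spans are recorded by the OTel SDK, and new
// tracers must come from the wrapped provider so parentage lines up.
func installBridge() {
	bridgeTracer, wrappedProvider := otelbridge.NewTracerPair(
		otel.Tracer("github.com/grafana/pyroscope"))
	opentracing.SetGlobalTracer(bridgeTracer)
	otel.SetTracerProvider(wrappedProvider)
}
```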