Skip to content

Commit

Permalink
Revert "sync to 1.2: remove the old fast ranges and update metrics (#… (
Browse files Browse the repository at this point in the history
#16401)

…16359)"

This reverts commit 918a356.

## What type of PR is this?

- [ ] API-change
- [ ] BUG
- [ ] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring

## Which issue(s) this PR fixes:

issue ##16379

## What this PR does / why we need it:
revert to avoid oom
  • Loading branch information
gouhongshen committed May 25, 2024
1 parent 6b2c006 commit 18f806e
Show file tree
Hide file tree
Showing 8 changed files with 1,409 additions and 159 deletions.
106 changes: 61 additions & 45 deletions pkg/util/metric/v2/dashboard/grafana_dashboard_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ func (c *DashboardCreator) initTxnDashboard() error {
c.initTxnLockWaitersRow(),
c.initTxnStatementDurationRow(),
c.initTxnStatementsCountRow(),
c.initTxnTableRangesRow(),
c.initTxnCheckPKDupRow(),
c.initTxnReaderDurationRow(),
c.initTxnMpoolRow(),
c.initTxnOnPrepareWALRow(),
c.initTxnBeforeCommitRow(),
c.initTxnDequeuePreparedRow(),
c.initTxnTableRangesRow(),
c.initTxnRangesSelectivityRow(),
c.initTxnRangesCountRow(),
c.initTxnRangesLoadedObjectMetaRow(),
c.initFastRangesRow(),
c.initRangesRow(),
c.initTxnShowAccountsRow(),
c.initCNCommittedObjectQuantityRow(),
)...)
Expand Down Expand Up @@ -74,62 +75,66 @@ func (c *DashboardCreator) initCNCommittedObjectQuantityRow() dashboard.Option {
)
}

func (c *DashboardCreator) initTxnTableRangesRow() dashboard.Option {
func (c *DashboardCreator) initRangesRow() dashboard.Option {
return dashboard.Row(
"Txn Table Ranges Duration",
"Txn Ranges Selectivity",
c.getHistogram(
"Txn table ranges duration",
c.getMetricWithFilter(`mo_txn_ranges_duration_seconds_bucket`, ``),
"ranges block selectivity",
c.getMetricWithFilter("mo_txn_ranges_selectivity_percentage_bucket", `type="block_selectivity"`),
[]float64{0.50, 0.8, 0.90, 0.99},
12,
axis.Unit("s"),
6,
axis.Unit(""),
axis.Min(0)),

c.getHistogram(
"ranges result len",
c.getMetricWithFilter("mo_txn_ranges_duration_size_bucket", `type="ranges_len"`),
[]float64{0.50, 0.8, 0.90, 0.99},
6,
axis.Unit(""),
axis.Min(0)),
)
}

func (c *DashboardCreator) initTxnRangesSelectivityRow() dashboard.Option {
func (c *DashboardCreator) initFastRangesRow() dashboard.Option {
return dashboard.Row(
"Ranges Selectivity",
c.getMultiHistogram(
[]string{
c.getMetricWithFilter(`mo_txn_ranges_selectivity_percentage_bucket`, `type="slow_path_block_selectivity"`),
c.getMetricWithFilter(`mo_txn_ranges_selectivity_percentage_bucket`, `type="fast_path_block_selectivity"`),
c.getMetricWithFilter(`mo_txn_ranges_selectivity_percentage_bucket`, `type="fast_path_obj_sort_key_zm_selectivity"`),
c.getMetricWithFilter(`mo_txn_ranges_selectivity_percentage_bucket`, `type="fast_path_obj_column_zm_selectivity"`),
c.getMetricWithFilter(`mo_txn_ranges_selectivity_percentage_bucket`, `type="fast_path_blk_column_zm_selectivity"`),
},
[]string{
"slow_path_block_selectivity",
"fast_path_block_selectivity",
"fast_path_obj_sort_key_zm_selectivity",
"fast_path_obj_column_zm_selectivity",
"fast_path_blk_column_zm_selectivity",
},
"Txn Fast Ranges Selectivity",
c.getHistogram(
"fast ranges block selectivity",
c.getMetricWithFilter("mo_txn_ranges_selectivity_percentage_bucket", `type="fast_block_selectivity"`),
[]float64{0.50, 0.8, 0.90, 0.99},
[]float32{3, 3, 3, 3},
axis.Min(0))...,
4,
axis.Unit(""),
axis.Min(0)),

c.getHistogram(
"fast ranges zone map selectivity",
c.getMetricWithFilter("mo_txn_ranges_selectivity_percentage_bucket", `type="fast_zm_selectivity"`),
[]float64{0.50, 0.8, 0.90, 0.99},
4,
axis.Unit(""),
axis.Min(0)),

c.getHistogram(
"fast ranges result len",
c.getMetricWithFilter("mo_txn_ranges_duration_size_bucket", `type="fast_ranges_len"`),
[]float64{0.50, 0.8, 0.90, 0.99},
4,
axis.Unit(""),
axis.Min(0)),
)
}

func (c *DashboardCreator) initTxnRangesCountRow() dashboard.Option {
func (c *DashboardCreator) initTxnTableRangesRow() dashboard.Option {
return dashboard.Row(
"Ranges Count",
c.getMultiHistogram(
[]string{
c.getMetricWithFilter(`mo_txn_ranges_selected_block_cnt_total_bucket`, `type="slow_path_selected_block_cnt"`),
c.getMetricWithFilter(`mo_txn_ranges_selected_block_cnt_total_bucket`, `type="fast_path_selected_block_cnt"`),
c.getMetricWithFilter(`mo_txn_ranges_selected_block_cnt_total_bucket`, `type="fast_path_load_obj_cnt"`),
c.getMetricWithFilter(`mo_txn_ranges_selected_block_cnt_total_bucket`, `type="slow_path_load_obj_cnt"`),
},
[]string{
"slow_path_selected_block_cnt",
"fast_path_selected_block_cnt",
"fast_path_load_obj_cnt",
"slow_path_load_obj_cnt",
},
"Txn table ranges",
c.getHistogram(
"Txn table ranges duration",
c.getMetricWithFilter(`mo_txn_ranges_duration_seconds_bucket`, ``),
[]float64{0.50, 0.8, 0.90, 0.99},
[]float32{3, 3, 3, 3},
axis.Min(0))...,
12,
axis.Unit("s"),
axis.Min(0)),
)
}

Expand Down Expand Up @@ -398,6 +403,17 @@ func (c *DashboardCreator) initTxnStatementsCountRow() dashboard.Option {
)
}

func (c *DashboardCreator) initTxnRangesLoadedObjectMetaRow() dashboard.Option {
return dashboard.Row(
"Txn Ranges Loaded Object Meta",
c.withGraph(
"Txn Ranges Loaded Object Meta",
12,
`sum(increase(`+c.getMetricWithFilter("mo_txn_ranges_loaded_object_meta_total", "")+`[$interval])) by (`+c.by+`, type)`,
"{{ "+c.by+"-type }}"),
)
}

func (c *DashboardCreator) initTxnShowAccountsRow() dashboard.Option {
return dashboard.Row(
"Show Accounts Duration",
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/metric/v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,12 @@ func initTxnMetrics() {
registry.MustRegister(TxnTableRangeDurationHistogram)
registry.MustRegister(TxnCheckPKDupDurationHistogram)
registry.MustRegister(TxnLockWaitersTotalHistogram)
registry.MustRegister(txnTableRangeTotalHistogram)
registry.MustRegister(txnTableRangeSizeHistogram)
registry.MustRegister(txnMpoolDurationHistogram)
registry.MustRegister(TxnUnlockTableTotalHistogram)
registry.MustRegister(txnReaderDurationHistogram)

registry.MustRegister(TxnRangesLoadedObjectMetaTotalCounter)
registry.MustRegister(txnCNCommittedLocationQuantityGauge)

registry.MustRegister(txnRangesSelectivityHistogram)
Expand Down
32 changes: 18 additions & 14 deletions pkg/util/metric/v2/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ var (
TxnLockTotalCounter = txnLockCounter.WithLabelValues("total")
TxnLocalLockTotalCounter = txnLockCounter.WithLabelValues("local")
TxnRemoteLockTotalCounter = txnLockCounter.WithLabelValues("remote")

TxnRangesLoadedObjectMetaTotalCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "mo",
Subsystem: "txn",
Name: "ranges_loaded_object_meta_total",
Help: "Total number of ranges loaded object meta.",
})
)

var (
Expand Down Expand Up @@ -239,19 +247,17 @@ var (
Buckets: getDurationBuckets(),
})

txnTableRangeTotalHistogram = prometheus.NewHistogramVec(
txnTableRangeSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "mo",
Subsystem: "txn",
Name: "ranges_selected_block_cnt_total",
Help: "Bucketed histogram of txn table ranges selected block cnt.",
Buckets: prometheus.ExponentialBuckets(1, 2.0, 15),
Name: "ranges_duration_size",
Help: "Bucketed histogram of txn table ranges size.",
Buckets: prometheus.ExponentialBuckets(1, 2.0, 20),
}, []string{"type"})

TxnRangesSlowPathSelectedBlockCntHistogram = txnTableRangeTotalHistogram.WithLabelValues("slow_path_selected_block_cnt")
TxnRangesFastPathSelectedBlockCntHistogram = txnTableRangeTotalHistogram.WithLabelValues("fast_path_selected_block_cnt")
TxnRangesFastPathLoadObjCntHistogram = txnTableRangeTotalHistogram.WithLabelValues("fast_path_load_obj_cnt")
TxnRangesSlowPathLoadObjCntHistogram = txnTableRangeTotalHistogram.WithLabelValues("slow_path_load_obj_cnt")
TxnRangeSizeHistogram = txnTableRangeSizeHistogram.WithLabelValues("ranges_len")
TxnFastRangeSizeHistogram = txnTableRangeSizeHistogram.WithLabelValues("fast_ranges_len")

txnTNSideDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -301,11 +307,9 @@ var (
Subsystem: "txn",
Name: "ranges_selectivity_percentage",
Help: "Bucketed histogram of fast ranges selectivity percentage.",
Buckets: prometheus.ExponentialBucketsRange(0.001, 1, 21),
Buckets: prometheus.LinearBuckets(0, 0.05, 21),
}, []string{"type"})
TxnRangesSlowPathBlockSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("slow_path_block_selectivity")
TxnRangesFastPathBlkTotalSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("fast_path_block_selectivity")
TxnRangesFastPathObjSortKeyZMapSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("fast_path_obj_sort_key_zm_selectivity")
TxnRangesFastPathObjColumnZMapSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("fast_path_obj_column_zm_selectivity")
TxnRangesFastPathBlkColumnZMapSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("fast_path_blk_column_zm_selectivity")
TxnRangesBlockSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("block_selectivity")
TxnFastRangesBlockSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("fast_block_selectivity")
TxnFastRangesZMapSelectivityHistogram = txnRangesSelectivityHistogram.WithLabelValues("fast_zm_selectivity")
)
34 changes: 0 additions & 34 deletions pkg/vm/engine/disttae/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/util"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
Expand Down Expand Up @@ -1022,42 +1021,15 @@ func ExecuteBlockFilter(
fs fileservice.FileService,
proc *process.Process,
) (err error) {
var (
totalBlocks float64
loadHit float64
objFilterTotal, objFilterHit float64
blkFilterTotal, blkFilterHit float64
fastFilterTotal, fastFilterHit float64
)

defer func() {
v2.TxnRangesFastPathLoadObjCntHistogram.Observe(loadHit)
v2.TxnRangesFastPathSelectedBlockCntHistogram.Observe(float64(outBlocks.Len() - 1))
if fastFilterTotal > 0 {
v2.TxnRangesFastPathObjSortKeyZMapSelectivityHistogram.Observe(fastFilterHit / fastFilterTotal)
}
if objFilterTotal > 0 {
v2.TxnRangesFastPathObjColumnZMapSelectivityHistogram.Observe(objFilterHit / objFilterTotal)
}
if blkFilterTotal > 0 {
v2.TxnRangesFastPathBlkColumnZMapSelectivityHistogram.Observe(blkFilterHit / blkFilterTotal)
}
if totalBlocks > 0 {
v2.TxnRangesFastPathBlkTotalSelectivityHistogram.Observe(float64(outBlocks.Len()-1) / totalBlocks)
}
}()

hasDeletes := len(dirtyBlocks) > 0
err = ForeachSnapshotObjects(
snapshotTS,
func(obj logtailreplay.ObjectInfo, isCommitted bool) (err2 error) {
var ok bool
objStats := obj.ObjectStats
totalBlocks += float64(objStats.BlkCnt())
if fastFilterOp != nil {
fastFilterTotal++
if ok, err2 = fastFilterOp(objStats); err2 != nil || !ok {
fastFilterHit++
return
}
}
Expand All @@ -1066,17 +1038,14 @@ func ExecuteBlockFilter(
bf objectio.BloomFilter
)
if loadOp != nil {
loadHit++
if meta, bf, err2 = loadOp(
proc.Ctx, objStats, meta, bf,
); err2 != nil {
return
}
}
if objectFilterOp != nil {
objFilterTotal++
if ok, err2 = objectFilterOp(meta, bf); err2 != nil || !ok {
objFilterHit++
return
}
}
Expand Down Expand Up @@ -1109,7 +1078,6 @@ func ExecuteBlockFilter(
for ; pos < blockCnt; pos++ {
var blkMeta objectio.BlockObject
if dataMeta != nil && blockFilterOp != nil {
blkFilterTotal++
var (
quickBreak, ok2 bool
)
Expand All @@ -1120,12 +1088,10 @@ func ExecuteBlockFilter(
}
// skip the following block checks
if quickBreak {
blkFilterHit++
break
}
// skip this block
if !ok2 {
blkFilterHit++
continue
}
}
Expand Down
72 changes: 67 additions & 5 deletions pkg/vm/engine/disttae/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,79 @@ func (mixin *withFilterMixin) getReadFilter(proc *process.Process, blkCnt int) (
return
}
pk := mixin.tableDef.Pkey
if mixin.filterState.expr == nil || pk == nil {
if pk == nil {
mixin.filterState.evaluated = true
mixin.filterState.filter = nil
return
}
return mixin.getNonCompositPKFilter(proc, blkCnt)
if pk.CompPkeyCol == nil {
return mixin.getNonCompositPKFilter(proc, blkCnt)
}
return mixin.getCompositPKFilter(proc, blkCnt)
}

func (mixin *withFilterMixin) getCompositPKFilter(proc *process.Process, blkCnt int) (
filter blockio.ReadFilter,
) {
// if no primary key is included in the columns or no filter expr is given,
// no filter is needed
if len(mixin.columns.compPKPositions) == 0 || mixin.filterState.expr == nil {
mixin.filterState.evaluated = true
mixin.filterState.filter = nil
return
}

// evaluate
pkNames := mixin.tableDef.Pkey.Names
pkVals := make([]*plan.Literal, len(pkNames))
ok, hasNull := getCompositPKVals(mixin.filterState.expr, pkNames, pkVals, proc)

if !ok || pkVals[0] == nil {
mixin.filterState.evaluated = true
mixin.filterState.filter = nil
mixin.filterState.hasNull = hasNull
return
}
cnt := getValidCompositePKCnt(pkVals)
pkVals = pkVals[:cnt]

filterFuncs := make([]func(*vector.Vector, []int32, *[]int32), len(pkVals))
for i := range filterFuncs {
filterFuncs[i] = getCompositeFilterFuncByExpr(pkVals[i], i == 0)
}

filter = func(vecs []*vector.Vector) []int32 {
var (
inputSels []int32
)
for i := range filterFuncs {
vec := vecs[i]
mixin.sels = mixin.sels[:0]
filterFuncs[i](vec, inputSels, &mixin.sels)
if len(mixin.sels) == 0 {
break
}
inputSels = mixin.sels
}
// logutil.Debugf("%s: %d/%d", mixin.tableDef.Name, len(res), vecs[0].Length())

return mixin.sels
}

mixin.filterState.evaluated = true
mixin.filterState.filter = filter
mixin.filterState.seqnums = make([]uint16, 0, len(mixin.columns.compPKPositions))
mixin.filterState.colTypes = make([]types.Type, 0, len(mixin.columns.compPKPositions))
for _, pos := range mixin.columns.compPKPositions {
mixin.filterState.seqnums = append(mixin.filterState.seqnums, mixin.columns.seqnums[pos])
mixin.filterState.colTypes = append(mixin.filterState.colTypes, mixin.columns.colTypes[pos])
}
// records how many blks one reader needs to read when having filter
objectio.BlkReadStats.BlksByReaderStats.Record(1, blkCnt)
return
}

func (mixin *withFilterMixin) getNonCompositPKFilter(
proc *process.Process, blkCnt int,
) blockio.ReadFilter {
func (mixin *withFilterMixin) getNonCompositPKFilter(proc *process.Process, blkCnt int) blockio.ReadFilter {
// if no primary key is included in the columns or no filter expr is given,
// no filter is needed
if mixin.columns.pkPos == -1 || mixin.filterState.expr == nil {
Expand Down
Loading

0 comments on commit 18f806e

Please sign in to comment.