Skip to content

Commit

Permalink
kvserver: use a threshold on MVCC stats discrepancy to fall back to a…
Browse files Browse the repository at this point in the history
…ccurate-stats split

This patch is a followup to cockroachdb#119499, and allows to add thresholds on the
number of keys and bytes of the difference between the pre-split MVCC
stats (retrieved in AdminSplit) and the stats when the split holds
latches (retrieved in splitTrigger). This difference corresponds to
writes concurrent with the split. If the difference is too large, the
split falls back to computing LHS and RHS stats accurately. The
difference is computed only for stats corresponding to user-data;
system stats are always kept accurate.

These thresholds are tunable by two new cluster settings,
MaxMVCCStatCountDiff and MaxMVCCStatBytesDiff, which denote the maximum
number of entities (e.g. keys, intents) and maximum number of bytes
(e.g. value bytes, range value bytes), respectively, that are
acceptable as the difference between the pre- and post-split values of
an individual stat. The former defaults to 5000, and the latter to
5.12MB (1% of the maximum range size).

Fixes: cockroachdb#119503

Release note: None
  • Loading branch information
miraradeva committed Mar 19, 2024
1 parent 5ce9052 commit 6b4d481
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 118 deletions.
53 changes: 46 additions & 7 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
Expand All @@ -40,6 +41,30 @@ import (
"github.com/cockroachdb/redact"
)

// MaxMVCCStatCountDiff defines the maximum number of units (e.g. keys or
// intents) that is acceptable for an individual MVCC stat to diverge from the
// real value when computed during splits. If this threshold is
// exceeded, the split will fall back to computing 100% accurate stats.
// It takes effect only if kv.split.estimated_mvcc_stats.enabled is true.
var MaxMVCCStatCountDiff = settings.RegisterIntSetting(
settings.SystemVisible,
"kv.split.max_mvcc_stat_count_diff",
"defines the max number of units that are acceptable for an individual "+
"MVCC stat to diverge; needs kv.split.estimated_mvcc_stats.enabled to be true",
5000)

// MaxMVCCStatBytesDiff defines the maximum number of bytes (e.g. keys bytes or
// intents bytes) that is acceptable for an individual MVCC stat to diverge
// from the real value when computed during splits. If this threshold is
// exceeded, the split will fall back to computing 100% accurate stats.
// It takes effect only if kv.split.estimated_mvcc_stats.enabled is true.
var MaxMVCCStatBytesDiff = settings.RegisterIntSetting(
settings.SystemVisible,
"kv.split.max_mvcc_stat_bytes_diff",
"defines the max number of bytes that are acceptable for an individual "+
"MVCC stat to diverge; needs kv.split.estimated_mvcc_stats.enabled to be true",
5120000) // 5.12 MB = 1% of the max range size

func init() {
RegisterReadWriteCommand(kvpb.EndTxn, declareKeysEndTxn, EndTxn)
}
Expand Down Expand Up @@ -1070,12 +1095,15 @@ func splitTrigger(
DeltaBatchEstimated: bothDeltaMS,
DeltaRangeKey: rangeKeyDeltaMS,
PreSplitLeftUser: split.PreSplitLeftUserStats,
PreSplitStats: split.PreSplitStats,
PostSplitScanLeftFn: makeScanStatsFn(ctx, batch, ts, &split.LeftDesc, "left hand side", false /* excludeUserSpans */),
PostSplitScanRightFn: makeScanStatsFn(ctx, batch, ts, &split.RightDesc, "right hand side", false /* excludeUserSpans */),
PostSplitScanLocalLeftFn: makeScanStatsFn(ctx, batch, ts, &split.LeftDesc, "local left hand side", true /* excludeUserSpans */),
ScanRightFirst: splitScansRightForStatsFirst || emptyRHS,
LeftUserIsEmpty: emptyLHS,
RightUserIsEmpty: emptyRHS,
MaxCountDiff: MaxMVCCStatCountDiff.Get(&rec.ClusterSettings().SV),
MaxBytesDiff: MaxMVCCStatBytesDiff.Get(&rec.ClusterSettings().SV),
}
return splitTriggerHelper(ctx, rec, batch, h, split, ts)
}
Expand Down Expand Up @@ -1149,17 +1177,28 @@ func splitTriggerHelper(
// modifications to the right hand side are accounted for by updating the
// helper's AbsPostSplitRight() reference.
var h splitStatsHelper
// If the leaseholder node is running an older version, it would not include
// the PreSplitLeftUser proto field, which is needed in
// makeEstimatedSplitStatsHelper, so we fall back to accurate stats computation.
// There are three conditions under which we want to fall back to accurate
// stats computation:
// 1. There are no pre-computed stats for the LHS. This can happen if
// kv.split.estimated_mvcc_stats.enabled is disabled, or if the leaseholder
// node is running an older version. Pre-computed stats are necessary for
// makeEstimatedSplitStatsHelper to estimate the stats.
// Note that PreSplitLeftUserStats can also be equal to enginepb.MVCCStats{}
// when the user LHS stats are all zero, but in that case it's ok to fall back
// to accurate stats computation because scanning the empty LHS is not
// expensive.
if split.PreSplitLeftUserStats == (enginepb.MVCCStats{}) ||
// If either side contains no user data, fall back to accurate stats
// computation because scanning the empty ranges is cheap.
statsInput.LeftUserIsEmpty || statsInput.RightUserIsEmpty {
noPreComputedStats := split.PreSplitLeftUserStats == enginepb.MVCCStats{}
// 2. If either side contains no user data; scanning the empty ranges is
// cheap.
emptyLeftOrRight := statsInput.LeftUserIsEmpty || statsInput.RightUserIsEmpty
// 3. If the user pre-split stats differ significantly from the current stats
// stored on disk. Note that the current stats on disk were corrected in
// AdminSplit, so any differences we see here are due to writes concurrent
// with this split (not compounded estimates from previous splits).
preComputedStatsDiff := !statsInput.AbsPreSplitBothStored.HasUserDataCloseTo(
statsInput.PreSplitStats, statsInput.MaxCountDiff, statsInput.MaxBytesDiff)

if noPreComputedStats || emptyLeftOrRight || preComputedStatsDiff {
h, err = makeSplitStatsHelper(statsInput)
} else {
h, err = makeEstimatedSplitStatsHelper(statsInput)
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/batcheval/split_stats_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ type splitStatsHelperInput struct {
// side. Those are expensive to compute, so we compute them in AdminSplit,
// before holding latches, and pass them to splitTrigger.
PreSplitLeftUser enginepb.MVCCStats
// PreSplitStats are the total on-disk stats before the split (in AdminSplit).
PreSplitStats enginepb.MVCCStats
// PostSplitScanLeftFn returns the stats for the left hand side of the
// split computed by scanning relevant part of the range.
PostSplitScanLeftFn splitStatsScanFn
Expand Down Expand Up @@ -160,6 +162,16 @@ type splitStatsHelperInput struct {
// entire right hand side will be scanned to compute stats accurately. This is
// cheap because the range is empty.
RightUserIsEmpty bool
// Max number of entities (keys, values, etc.) corresponding to a single MVCC
// stat (e.g. KeyCount) that is acceptable as the absolute difference between
// PreSplitStats and AbsPreSplitBothStored.
// Tuned by kv.split.max_mvcc_stat_count_diff.
MaxCountDiff int64
// Max number of bytes corresponding to a single MVCC stat (e.g. KeyBytes)
// that is acceptable as the absolute difference between PreSplitStats and
// AbsPreSplitBothStored.
// Tuned by kv.split.max_mvcc_stat_bytes_diff.
MaxBytesDiff int64
}

// makeSplitStatsHelper initializes a splitStatsHelper. The values in the input
Expand Down
237 changes: 127 additions & 110 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
Expand Down Expand Up @@ -924,123 +925,139 @@ func TestStoreRangeSplitWithConcurrentWrites(t *testing.T) {

testutils.RunTrueAndFalse(t, "estimates", func(t *testing.T, estimates bool) {
testutils.RunTrueAndFalse(t, "recompute", func(t *testing.T, recompute bool) {
settings := cluster.MakeTestingClusterSettings()
kvserver.EnableEstimatedMVCCStatsInSplit.Override(context.Background(), &settings.SV, estimates)
kvserver.EnableMVCCStatsRecomputationInSplit.Override(context.Background(), &settings.SV, recompute)

ctx := context.Background()
s := serverutils.StartServerOnly(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: filter,
},
},
Settings: settings,
})

defer s.Stopper().Stop(ctx)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// Write some initial data to the future LHS.
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("a"), []byte("foo")))
require.NoError(t, pErr.GoError())
// Write some initial data to the future RHS.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("c"), []byte("bar")))
require.NoError(t, pErr.GoError())

splitKeyAddr, err := keys.Addr(splitKey)
require.NoError(t, err)
lhsRepl := store.LookupReplica(splitKeyAddr)

// Split the range.
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKey))
return pErr.GoError()
})

// Wait until split is underway.
<-splitBlocked
testutils.RunTrueAndFalse(t, "maxCount", func(t *testing.T, maxCount bool) {
testutils.RunTrueAndFalse(t, "maxBytes", func(t *testing.T, maxBytes bool) {
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
kvserver.EnableEstimatedMVCCStatsInSplit.Override(ctx, &settings.SV, estimates)
kvserver.EnableMVCCStatsRecomputationInSplit.Override(ctx, &settings.SV, recompute)
if maxCount {
// If there is even a single write concurrent with the split, fall
// back to accurate stats computation.
batcheval.MaxMVCCStatCountDiff.Override(ctx, &settings.SV, 0)
}
if maxBytes {
// If there are more than 10 bytes of writes concurrent with the split, fall
// back to accurate stats computation.
batcheval.MaxMVCCStatBytesDiff.Override(ctx, &settings.SV, 10)
}

// Write some more data to both sides.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("aa"), []byte("foo")))
require.NoError(t, pErr.GoError())
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("cc"), []byte("bar")))
require.NoError(t, pErr.GoError())
s := serverutils.StartServerOnly(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: filter,
},
},
Settings: settings,
})

// Unblock the split.
splitBlocked <- struct{}{}
defer s.Stopper().Stop(ctx)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// Write some initial data to the future LHS.
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("a"), []byte("foo")))
require.NoError(t, pErr.GoError())
// Write some initial data to the future RHS.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("c"), []byte("bar")))
require.NoError(t, pErr.GoError())

splitKeyAddr, err := keys.Addr(splitKey)
require.NoError(t, err)
lhsRepl := store.LookupReplica(splitKeyAddr)

// Split the range.
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKey))
return pErr.GoError()
})

// Wait for the split to complete.
require.Nil(t, g.Wait())
// Wait until split is underway.
<-splitBlocked

snap := store.TODOEngine().NewSnapshot()
defer snap.Close()
lhsStats, err := stateloader.Make(lhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhsRepl := store.LookupReplica(splitKeyAddr)
rhsStats, err := stateloader.Make(rhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
// If the split is producing estimates, expect non-zero ContainsEstimates.
if estimates {
require.Greater(t, lhsStats.ContainsEstimates, int64(0))
require.Greater(t, rhsStats.ContainsEstimates, int64(0))
} else {
// Otherwise, the stats should agree with re-computation.
assertRecomputedStats(t, "LHS after split", snap, lhsRepl.Desc(), lhsStats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS after split", snap, rhsRepl.Desc(), rhsStats, s.Clock().PhysicalNow())
}
// Write some more data to both sides.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("aa"), []byte("foo")))
require.NoError(t, pErr.GoError())
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("cc"), []byte("bar")))
require.NoError(t, pErr.GoError())

// If we used estimated stats while splitting the range, the stats on disk
// will not match the stats recomputed from the range. We expect both of the
// concurrent writes to be attributed to the RHS (instead of 1 to the LHS and
// 1 th the RHS). But if we split these ranges one more time (no concurrent
// writes this time), we expect the stats to be corrected and not drift
// (when recompute = true).
splitKeyLeft := roachpb.Key("aa")
splitKeyLeftAddr, err := keys.Addr(splitKeyLeft)
require.NoError(t, err)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKeyLeft))
require.NoError(t, pErr.GoError())
splitKeyRight := roachpb.Key("bb")
splitKeyRightAddr, err := keys.Addr(splitKeyRight)
require.NoError(t, err)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKeyRight))
require.NoError(t, pErr.GoError())
// Unblock the split.
splitBlocked <- struct{}{}

snap = store.TODOEngine().NewSnapshot()
defer snap.Close()
lhs1Stats, err := stateloader.Make(lhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
lhs2Repl := store.LookupReplica(splitKeyLeftAddr)
lhs2Stats, err := stateloader.Make(lhs2Repl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhs1Stats, err := stateloader.Make(rhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhs2Repl := store.LookupReplica(splitKeyRightAddr)
rhs2Stats, err := stateloader.Make(rhs2Repl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
// Wait for the split to complete.
require.Nil(t, g.Wait())

snap := store.TODOEngine().NewSnapshot()
defer snap.Close()
lhsStats, err := stateloader.Make(lhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhsRepl := store.LookupReplica(splitKeyAddr)
rhsStats, err := stateloader.Make(rhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
// If the split is producing estimates and neither of the tight count
// and bytes thresholds is set, expect non-zero ContainsEstimates.
expectContainsEstimates := estimates && !(maxCount || maxBytes)
if expectContainsEstimates {
require.Greater(t, lhsStats.ContainsEstimates, int64(0))
require.Greater(t, rhsStats.ContainsEstimates, int64(0))
} else {
// Otherwise, the stats should agree with re-computation.
assertRecomputedStats(t, "LHS after split", snap, lhsRepl.Desc(), lhsStats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS after split", snap, rhsRepl.Desc(), rhsStats, s.Clock().PhysicalNow())
}

// Stats should agree with re-computation unless we're producing
// estimates and not re-computing stats at the beginning of splits.
expectIncorrectStats := estimates && !recompute
if !expectIncorrectStats {
assertRecomputedStats(t, "LHS1 after second split", snap, lhsRepl.Desc(), lhs1Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "LHS2 after second split", snap, lhs2Repl.Desc(), lhs2Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS1 after second split", snap, rhsRepl.Desc(), rhs1Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS2 after second split", snap, rhs2Repl.Desc(), rhs2Stats, s.Clock().PhysicalNow())
}
if estimates {
require.Greater(t, lhs1Stats.ContainsEstimates, int64(0))
require.Greater(t, lhs2Stats.ContainsEstimates, int64(0))
// The range corresponding to rhs1Stats is empty, so the split of the
// original RHS fell back to accurate-stats computation.
require.Equal(t, int64(0), rhs1Stats.ContainsEstimates)
require.Equal(t, int64(0), rhs2Stats.ContainsEstimates)
}
// If we used estimated stats while splitting the range, the stats on disk
// will not match the stats recomputed from the range. We expect both of the
// concurrent writes to be attributed to the RHS (instead of 1 to the LHS and
// 1 th the RHS). But if we split these ranges one more time (no concurrent
// writes this time), we expect the stats to be corrected and not
// drift (when recompute = true).
splitKeyLeft := roachpb.Key("aa")
splitKeyLeftAddr, err := keys.Addr(splitKeyLeft)
require.NoError(t, err)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKeyLeft))
require.NoError(t, pErr.GoError())
splitKeyRight := roachpb.Key("bb")
splitKeyRightAddr, err := keys.Addr(splitKeyRight)
require.NoError(t, err)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKeyRight))
require.NoError(t, pErr.GoError())

snap = store.TODOEngine().NewSnapshot()
defer snap.Close()
lhs1Stats, err := stateloader.Make(lhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
lhs2Repl := store.LookupReplica(splitKeyLeftAddr)
lhs2Stats, err := stateloader.Make(lhs2Repl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhs1Stats, err := stateloader.Make(rhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhs2Repl := store.LookupReplica(splitKeyRightAddr)
rhs2Stats, err := stateloader.Make(rhs2Repl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)

// Stats should agree with re-computation unless we're producing
// estimates and not re-computing stats at the beginning of splits.
expectIncorrectStats := expectContainsEstimates && !recompute
if !expectIncorrectStats {
assertRecomputedStats(t, "LHS1 after second split", snap, lhsRepl.Desc(), lhs1Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "LHS2 after second split", snap, lhs2Repl.Desc(), lhs2Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS1 after second split", snap, rhsRepl.Desc(), rhs1Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS2 after second split", snap, rhs2Repl.Desc(), rhs2Stats, s.Clock().PhysicalNow())
}
if expectContainsEstimates {
require.Greater(t, lhs1Stats.ContainsEstimates, int64(0))
require.Greater(t, lhs2Stats.ContainsEstimates, int64(0))
// The range corresponding to rhs1Stats is empty, so the split of the
// original RHS fell back to accurate-stats computation.
require.Equal(t, int64(0), rhs1Stats.ContainsEstimates)
require.Equal(t, int64(0), rhs2Stats.ContainsEstimates)
}
})
})
})
})
}
Expand Down
Loading

0 comments on commit 6b4d481

Please sign in to comment.