kvserver: log a trace for slow splits
This change enables logging a trace of a split request if its processing exceeds a
certain duration. The duration is controlled by a new cluster setting,
`kv.split.slow_split_tracing_threshold`, which defaults to 2s.

Fixes: cockroachdb#81152

Release note: None
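
To make the mechanism concrete, here is a minimal, self-contained Go sketch of the pattern this commit adds to the split queue: run the operation while collecting a trace, time it, and log the trace only when the operation fails or runs longer than the threshold. The names here (traceBuf, runWithSlowTrace) and the plain string "trace" are illustrative stand-ins, not CockroachDB APIs; the actual change below uses verbose span recordings from the tracing package.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"log"
	"time"
)

// traceBuf stands in for a verbose tracing span recording.
type traceBuf struct{ buf bytes.Buffer }

// Eventf records a single trace event into the buffer.
func (t *traceBuf) Eventf(format string, args ...any) {
	fmt.Fprintf(&t.buf, format+"\n", args...)
}

// runWithSlowTrace mirrors the shape of processAttemptWithTracing: the trace
// is always collected, but it is only logged on error or when the configured
// threshold is exceeded.
func runWithSlowTrace(threshold time.Duration, op func(*traceBuf) error) error {
	tr := &traceBuf{}
	start := time.Now()
	err := op(tr)
	elapsed := time.Since(start)

	exceeded := threshold > 0 && elapsed > threshold
	if err != nil {
		log.Printf("error during operation: %v\ntrace:\n%s", err, tr.buf.String())
	} else if exceeded {
		log.Printf("operation took %s, exceeding threshold of %s\ntrace:\n%s",
			elapsed, threshold, tr.buf.String())
	}
	return err
}

func main() {
	// Slow but successful: the threshold is exceeded, so the trace is logged.
	_ = runWithSlowTrace(10*time.Millisecond, func(tr *traceBuf) error {
		tr.Eventf("starting split at key %q", "b")
		time.Sleep(20 * time.Millisecond) // simulate a slow split
		tr.Eventf("split finished")
		return nil
	})
	// Failed: errors always log the trace, regardless of duration.
	_ = runWithSlowTrace(time.Second, func(tr *traceBuf) error {
		tr.Eventf("starting split")
		return errors.New("descriptor changed")
	})
}

As in the real change, the trace is only rendered when it will actually be logged; in split_queue.go the recording is stringified only if the attempt errored or crossed the threshold, keeping the common fast path cheap.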
miraradeva authored and msbutler committed Mar 29, 2024
1 parent af328f4 commit 505c288
Showing 2 changed files with 180 additions and 5 deletions.
120 changes: 120 additions & 0 deletions pkg/kv/kvserver/client_split_test.go
@@ -17,6 +17,7 @@ import (
"math"
"math/rand"
"reflect"
"regexp"
"sort"
"strconv"
"sync/atomic"
@@ -66,10 +67,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
@@ -1062,6 +1065,123 @@ func TestStoreRangeSplitWithConcurrentWrites(t *testing.T) {
})
}

// TestStoreRangeSplitWithTracing tests that the split queue logs traces for
// slow splits.
func TestStoreRangeSplitWithTracing(t *testing.T) {
defer leaktest.AfterTest(t)()
l := log.ScopeWithoutShowLogs(t)
_ = log.SetVModule("split_queue=1")
defer l.Close(t)

splitKey := roachpb.Key("b")
var targetRange atomic.Int32
manualClock := hlc.NewHybridManualClock()
filter := func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
if req, ok := request.GetArg(kvpb.EndTxn); ok {
et := req.(*kvpb.EndTxnRequest)
if tr := et.InternalCommitTrigger.GetSplitTrigger(); tr != nil {
if tr.RightDesc.StartKey.Equal(splitKey) {
// Manually increment the replica's clock to simulate a slow split.
manualClock.Increment(kvserver.SlowSplitTracingThreshold.Default().Nanoseconds())
}
}
}
return nil
}

// Override the load-based split key function to force the range to be
// processed by the split queue.
overrideLBSplitFn := func(rangeID roachpb.RangeID) (roachpb.Key, bool) {
if rangeID == roachpb.RangeID(targetRange.Load()) {
return splitKey, true
}
return nil, false
}

ctx := context.Background()
s := serverutils.StartServerOnly(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: filter,
LoadBasedSplittingOverrideKey: overrideLBSplitFn,
},
},
})

defer s.Stopper().Stop(ctx)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// Write some data on both sides of the split key.
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("a"), []byte("foo")))
require.NoError(t, pErr.GoError())
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs(splitKey, []byte("bar")))
require.NoError(t, pErr.GoError())
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("c"), []byte("foo")))
require.NoError(t, pErr.GoError())

splitKeyAddr, err := keys.Addr(splitKey)
require.NoError(t, err)
repl := store.LookupReplica(splitKeyAddr)
targetRange.Store(int32(repl.RangeID))

recording, processErr, enqueueErr := store.Enqueue(
ctx, "split", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.NoError(t, processErr)

// Flush logs and get log messages from split_queue.go
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(math.MinInt64, math.MaxInt64, 100,
regexp.MustCompile(`split_queue\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)

opName := "split"
traceRegexp, err := regexp.Compile(`trace:.*`)
require.NoError(t, err)
opRegexp, err := regexp.Compile(fmt.Sprintf(`operation:%s`, opName))
require.NoError(t, err)

// Find the log entry to validate the trace output.
foundEntry := false
var entry logpb.Entry
for _, entry = range entries {
if opRegexp.MatchString(entry.Message) {
foundEntry = true
break
}
}
require.True(t, foundEntry)

// Validate that the trace is included in the log message.
require.Regexp(t, traceRegexp, entry.Message)

// Validate that the returned tracing span includes the operation, but also
// that the stringified trace was not logged to the span or its parent.
processRecSpan, foundSpan := recording.FindSpan(opName)
require.True(t, foundSpan)

foundParent := false
var parentRecSpan tracingpb.RecordedSpan
for _, parentRecSpan = range recording {
if parentRecSpan.SpanID == processRecSpan.ParentSpanID {
foundParent = true
break
}
}
require.True(t, foundParent)
spans := tracingpb.Recording{parentRecSpan, processRecSpan}
stringifiedSpans := spans.String()
require.NotRegexp(t, traceRegexp, stringifiedSpans)
}

// RaftMessageHandlerInterceptor wraps a storage.IncomingRaftMessageHandler. It
// delegates all methods to the underlying storage.IncomingRaftMessageHandler,
// except that HandleSnapshot calls receiveSnapshotFilter with the snapshot
65 changes: 60 additions & 5 deletions pkg/kv/kvserver/split_queue.go
@@ -19,12 +19,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
@@ -44,6 +47,16 @@ const (
// RocksDB scans over part of the splitting range to recompute stats. We
// allow a limited number of splits to be processed at once.
splitQueueConcurrency = 4
// slowSplitThresholdDefault is the split processing time after which we will
// output a verbose trace.
slowSplitThresholdDefault = 2 * time.Second
)

var SlowSplitTracingThreshold = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.split.slow_split_tracing_threshold",
"the duration after which a trace of the split is logged",
slowSplitThresholdDefault,
)

var (
@@ -92,6 +105,8 @@ type splitQueue struct {
// loadBasedCount counts the load-based splits performed by the queue.
loadBasedCount telemetry.Counter
metrics SplitQueueMetrics
// logTracesThreshold is the threshold for logging a trace of a slow split.
logTracesThreshold time.Duration
}

var _ queueImpl = &splitQueue{}
@@ -107,10 +122,11 @@ func newSplitQueue(store *Store, db *kv.DB) *splitQueue {
}

sq := &splitQueue{
db: db,
purgChan: purgChan,
loadBasedCount: telemetry.GetCounter("kv.split.load"),
metrics: makeSplitQueueMetrics(),
db: db,
purgChan: purgChan,
loadBasedCount: telemetry.GetCounter("kv.split.load"),
metrics: makeSplitQueueMetrics(),
logTracesThreshold: SlowSplitTracingThreshold.Get(&store.ClusterSettings().SV),
}
store.metrics.registry.AddMetricStruct(&sq.metrics)
sq.baseQueue = newBaseQueue(
@@ -210,7 +226,7 @@ var _ PurgatoryError = unsplittableRangeError{}
func (sq *splitQueue) process(
ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
processed, err = sq.processAttempt(ctx, r, confReader)
processed, err = sq.processAttemptWithTracing(ctx, r, confReader)
if errors.HasType(err, (*kvpb.ConditionFailedError)(nil)) {
// ConditionFailedErrors are an expected outcome for range split
// attempts because splits can race with other descriptor modifications.
@@ -224,6 +240,45 @@ func (sq *splitQueue) process(
return processed, err
}

// processAttemptWithTracing executes processAttempt within a tracing span,
// logging the resulting traces in the case of errors or when the configured log
// traces threshold is exceeded.
func (sq *splitQueue) processAttemptWithTracing(
ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
) (processed bool, _ error) {
processStart := r.Clock().PhysicalTime()
ctx, sp := tracing.EnsureChildSpan(ctx, sq.Tracer, "split",
tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

processed, err := sq.processAttempt(ctx, r, confReader)

// Utilize a new background context (properly annotated) to avoid writing
// traces from a child context into its parent.
{
ctx := r.AnnotateCtx(sq.AnnotateCtx(context.Background()))
processDuration := r.Clock().PhysicalTime().Sub(processStart)
exceededDuration := sq.logTracesThreshold > time.Duration(0) && processDuration > sq.logTracesThreshold

var traceOutput redact.RedactableString
traceLoggingNeeded := (err != nil || exceededDuration) && log.ExpensiveLogEnabled(ctx, 1)
if traceLoggingNeeded {
// Add any trace filtering here if the output is too verbose.
rec := sp.GetConfiguredRecording()
traceOutput = redact.Sprintf("\ntrace:\n%s", rec)
}

if err != nil {
log.Infof(ctx, "error during range split: %v%s", err, traceOutput)
} else if exceededDuration {
log.Infof(ctx, "range split took %s, exceeding threshold of %s%s",
processDuration, sq.logTracesThreshold, traceOutput)
}
}

return processed, err
}

func (sq *splitQueue) processAttempt(
ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
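A usage note on the wiring above: newSplitQueue reads SlowSplitTracingThreshold once, when the queue is constructed, so the threshold is fixed for the lifetime of the queue on a given store. A test that wants essentially every split to log its trace could therefore lower the setting before the server starts, along these lines (a hedged sketch, assuming the usual cluster-settings Override helper and test-server Settings plumbing; not part of this commit):

// Illustrative only: assumes DurationSetting.Override and
// TestServerArgs.Settings; see the note above.
st := cluster.MakeTestingClusterSettings()
kvserver.SlowSplitTracingThreshold.Override(ctx, &st.SV, time.Millisecond)
s := serverutils.StartServerOnly(t, base.TestServerArgs{Settings: st})
defer s.Stopper().Stop(ctx)

The test added in this commit takes the opposite approach: it keeps the default 2s threshold and advances a manual clock by exactly that amount inside the split's commit trigger, which makes the split appear slow without actually waiting.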
