diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go
index 098f0cf543a9..2f472b55424f 100644
--- a/pkg/kv/kvserver/client_split_test.go
+++ b/pkg/kv/kvserver/client_split_test.go
@@ -17,6 +17,7 @@ import (
 	"math"
 	"math/rand"
 	"reflect"
+	"regexp"
 	"sort"
 	"strconv"
 	"sync/atomic"
@@ -66,10 +67,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
@@ -1062,6 +1065,122 @@ func TestStoreRangeSplitWithConcurrentWrites(t *testing.T) {
 	})
 }
 
+// TestStoreRangeSplitWithTracing tests that the split queue logs traces for
+// slow splits.
+func TestStoreRangeSplitWithTracing(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	l := log.ScopeWithoutShowLogs(t)
+	_ = log.SetVModule("split_queue=1")
+	defer l.Close(t)
+
+	splitKey := roachpb.Key("b")
+	var targetRange atomic.Int32
+	manualClock := hlc.NewHybridManualClock()
+	filter := func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
+		if req, ok := request.GetArg(kvpb.EndTxn); ok {
+			et := req.(*kvpb.EndTxnRequest)
+			if tr := et.InternalCommitTrigger.GetSplitTrigger(); tr != nil {
+				if tr.RightDesc.StartKey.Equal(splitKey) {
+					// Manually increment the replica's clock to simulate a slow split.
+					manualClock.Increment(kvserver.SlowSplitTracingThreshold.Default().Nanoseconds())
+				}
+			}
+		}
+		return nil
+	}
+
+	// Override the load-based split key function to force the range to be
+	// processed by the split queue.
+	overrideLBSplitFn := func(rangeID roachpb.RangeID) (roachpb.Key, bool) {
+		if rangeID == roachpb.RangeID(targetRange.Load()) {
+			return splitKey, true
+		}
+		return nil, false
+	}
+
+	ctx := context.Background()
+	s := serverutils.StartServerOnly(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Server: &server.TestingKnobs{
+				WallClock: manualClock,
+			},
+			Store: &kvserver.StoreTestingKnobs{
+				DisableMergeQueue:             true,
+				DisableSplitQueue:             true,
+				TestingRequestFilter:          filter,
+				LoadBasedSplittingOverrideKey: overrideLBSplitFn,
+			},
+		},
+	})
+
+	defer s.Stopper().Stop(ctx)
+	store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
+	require.NoError(t, err)
+
+	// Write some data on both sides of the split key.
+	_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("a"), []byte("foo")))
+	require.NoError(t, pErr.GoError())
+	_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs(splitKey, []byte("bar")))
+	require.NoError(t, pErr.GoError())
+	_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("c"), []byte("foo")))
+	require.NoError(t, pErr.GoError())
+
+	splitKeyAddr, err := keys.Addr(splitKey)
+	require.NoError(t, err)
+	repl := store.LookupReplica(splitKeyAddr)
+	targetRange.Store(int32(repl.RangeID))
+
+	recording, processErr, enqueueErr := store.Enqueue(
+		ctx, "split", repl, true /* skipShouldQueue */, false, /* async */
+	)
+	require.NoError(t, enqueueErr)
+	require.NoError(t, processErr)
+
+	// Flush logs and get log messages from split_queue.go.
+	log.FlushFiles()
+	entries, err := log.FetchEntriesFromFiles(math.MinInt64, math.MaxInt64, 100,
+		regexp.MustCompile(`split_queue\.go`), log.WithMarkedSensitiveData)
+	require.NoError(t, err)
+
+	opName := "split"
+	traceRegexp, err := regexp.Compile(`trace:.*`)
+	require.NoError(t, err)
+	opRegexp, err := regexp.Compile(fmt.Sprintf(`operation:%s`, opName))
+	require.NoError(t, err)
+
+	// Find the log entry to validate the trace output.
+	foundEntry := false
+	var entry logpb.Entry
+	for _, entry = range entries {
+		if opRegexp.MatchString(entry.Message) {
+			foundEntry = true
+			break
+		}
+	}
+	require.True(t, foundEntry)
+
+	// Validate that the trace is included in the log message.
+	require.Regexp(t, traceRegexp, entry.Message)
+
+	// Validate that the returned tracing span includes the operation, but also
+	// that the stringified trace was not logged to the span or its parent.
+	processRecSpan, foundSpan := recording.FindSpan(opName)
+	require.True(t, foundSpan)
+
+	foundParent := false
+	var parentRecSpan tracingpb.RecordedSpan
+	for _, parentRecSpan = range recording {
+		if parentRecSpan.SpanID == processRecSpan.ParentSpanID {
+			foundParent = true
+			break
+		}
+	}
+	require.True(t, foundParent)
+	spans := tracingpb.Recording{parentRecSpan, processRecSpan}
+	stringifiedSpans := spans.String()
+	require.NotRegexp(t, traceRegexp, stringifiedSpans)
+}
+
 // RaftMessageHandlerInterceptor wraps a storage.IncomingRaftMessageHandler. It
 // delegates all methods to the underlying storage.IncomingRaftMessageHandler,
 // except that HandleSnapshot calls receiveSnapshotFilter with the snapshot
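Note that the test never actually delays the split: the request filter waits for the EndTxn carrying the split trigger for the right-hand range at splitKey, then bumps the hybrid manual clock by the threshold default. Since the split queue measures processing time off the replica clock, the split appears slow with no real sleep. Below is a minimal, self-contained sketch of that clock-injection pattern; all names are hypothetical stand-ins, not CockroachDB APIs.

package main

import (
	"fmt"
	"time"
)

// clock abstracts time measurement so a test can substitute a fake.
type clock interface{ Now() time.Time }

// manualClock is a test clock that only moves when advanced explicitly.
type manualClock struct{ now time.Time }

func (c *manualClock) Now() time.Time          { return c.now }
func (c *manualClock) Advance(d time.Duration) { c.now = c.now.Add(d) }

// opExceedsThreshold runs op and reports whether its measured duration,
// taken from the injected clock, crossed the slow-operation threshold.
func opExceedsThreshold(c clock, threshold time.Duration, op func()) bool {
	start := c.Now()
	op()
	return c.Now().Sub(start) > threshold
}

func main() {
	mc := &manualClock{now: time.Unix(0, 0)}
	threshold := 2 * time.Second // mirrors slowSplitThresholdDefault
	// The operation itself advances the clock past the threshold, just as
	// the test's filter bumps the manual clock inside the split's EndTxn.
	slow := opExceedsThreshold(mc, threshold, func() {
		mc.Advance(threshold + time.Millisecond)
	})
	fmt.Println(slow) // true
}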
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index eff6f507fef2..4c6153b5facd 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -19,12 +19,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
 )
@@ -44,6 +47,16 @@ const (
 	// RocksDB scans over part of the splitting range to recompute stats. We
 	// allow a limitted number of splits to be processed at once.
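 	splitQueueConcurrency = 4
+	// slowSplitThresholdDefault is the split processing time after which we will
+	// output a verbose trace.
+	slowSplitThresholdDefault = 2 * time.Second
+)
+
+var SlowSplitTracingThreshold = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"kv.split.slow_split_tracing_threshold",
+	"the duration after which a trace of the split is logged",
+	slowSplitThresholdDefault,
 )
 
 var (
@@ -92,6 +105,8 @@ type splitQueue struct {
 	// loadBasedCount counts the load-based splits performed by the queue.
 	loadBasedCount telemetry.Counter
 	metrics        SplitQueueMetrics
+	// logTracesThreshold is the threshold for logging a trace of a slow split.
+	logTracesThreshold time.Duration
 }
 
 var _ queueImpl = &splitQueue{}
@@ -107,10 +122,11 @@ func newSplitQueue(store *Store, db *kv.DB) *splitQueue {
 	}
 
 	sq := &splitQueue{
-		db:             db,
-		purgChan:       purgChan,
-		loadBasedCount: telemetry.GetCounter("kv.split.load"),
-		metrics:        makeSplitQueueMetrics(),
+		db:                 db,
+		purgChan:           purgChan,
+		loadBasedCount:     telemetry.GetCounter("kv.split.load"),
+		metrics:            makeSplitQueueMetrics(),
+		logTracesThreshold: SlowSplitTracingThreshold.Get(&store.ClusterSettings().SV),
 	}
 	store.metrics.registry.AddMetricStruct(&sq.metrics)
 	sq.baseQueue = newBaseQueue(

One detail worth noting in the constructor hunk above: logTracesThreshold is read from SlowSplitTracingThreshold once, in newSplitQueue, rather than on every process call, so a later change to kv.split.slow_split_tracing_threshold is not picked up by an already-constructed splitQueue. A small stand-alone sketch of that capture-at-construction behavior follows; the types are simplified stand-ins, not the settings package.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// durationSetting is a tiny stand-in for a cluster setting: an atomically
// updatable duration.
type durationSetting struct{ v atomic.Int64 }

func (s *durationSetting) Set(d time.Duration) { s.v.Store(int64(d)) }
func (s *durationSetting) Get() time.Duration  { return time.Duration(s.v.Load()) }

type queue struct {
	// Captured once at construction, mirroring how newSplitQueue reads
	// SlowSplitTracingThreshold into logTracesThreshold.
	logTracesThreshold time.Duration
}

func newQueue(s *durationSetting) *queue {
	return &queue{logTracesThreshold: s.Get()}
}

func main() {
	setting := &durationSetting{}
	setting.Set(2 * time.Second)
	q := newQueue(setting)
	// Updating the setting after construction does not affect the queue.
	setting.Set(500 * time.Millisecond)
	fmt.Println(q.logTracesThreshold, setting.Get()) // 2s 500ms
}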
@@ -210,7 +226,7 @@ var _ PurgatoryError = unsplittableRangeError{}
 func (sq *splitQueue) process(
 	ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
 ) (processed bool, err error) {
-	processed, err = sq.processAttempt(ctx, r, confReader)
+	processed, err = sq.processAttemptWithTracing(ctx, r, confReader)
 	if errors.HasType(err, (*kvpb.ConditionFailedError)(nil)) {
 		// ConditionFailedErrors are an expected outcome for range split
 		// attempts because splits can race with other descriptor modifications.
@@ -224,6 +240,45 @@ func (sq *splitQueue) process(
 	return processed, err
 }
 
+// processAttemptWithTracing executes processAttempt within a tracing span,
+// logging the resulting traces in the case of errors or when the configured log
+// traces threshold is exceeded.
+func (sq *splitQueue) processAttemptWithTracing(
+	ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
+) (processed bool, _ error) {
+	processStart := r.Clock().PhysicalTime()
+	ctx, sp := tracing.EnsureChildSpan(ctx, sq.Tracer, "split",
+		tracing.WithRecording(tracingpb.RecordingVerbose))
+	defer sp.Finish()
+
+	processed, err := sq.processAttempt(ctx, r, confReader)
+
+	// Utilize a new background context (properly annotated) to avoid writing
+	// traces from a child context into its parent.
+	{
+		ctx := r.AnnotateCtx(sq.AnnotateCtx(context.Background()))
+		processDuration := r.Clock().PhysicalTime().Sub(processStart)
+		exceededDuration := sq.logTracesThreshold > time.Duration(0) && processDuration > sq.logTracesThreshold
+
+		var traceOutput redact.RedactableString
+		traceLoggingNeeded := (err != nil || exceededDuration) && log.ExpensiveLogEnabled(ctx, 1)
+		if traceLoggingNeeded {
+			// Add any trace filtering here if the output is too verbose.
+			rec := sp.GetConfiguredRecording()
+			traceOutput = redact.Sprintf("\ntrace:\n%s", rec)
+		}
+
+		if err != nil {
+			log.Infof(ctx, "error during range split: %v%s", err, traceOutput)
+		} else if exceededDuration {
+			log.Infof(ctx, "range split took %s, exceeding threshold of %s%s",
+				processDuration, sq.logTracesThreshold, traceOutput)
+		}
+	}
+
+	return processed, err
+}
+
 func (sq *splitQueue) processAttempt(
 	ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
 ) (processed bool, err error) {
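The context switch inside processAttemptWithTracing is deliberate: sp is recording verbosely, so a log.Infof issued under a ctx still carrying sp would be captured back into the very recording being printed. Logging under a fresh, annotated background context keeps the stringified trace out of the span, which is exactly what the test's require.NotRegexp assertion verifies. Below is a hedged sketch of the pattern in isolation; the function name and package placement are hypothetical, though the tracing, log, and timeutil calls mirror those used in the diff above.

package kvserver // hypothetical placement, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

// runTracedWithSlowLog runs op under a verbosely-recording child span and,
// if op exceeds threshold, logs the recording under a separate context so
// the trace text is not itself written into the span.
func runTracedWithSlowLog(
	ctx context.Context, tracer *tracing.Tracer, threshold time.Duration, op func(context.Context),
) {
	start := timeutil.Now()
	ctx, sp := tracing.EnsureChildSpan(ctx, tracer, "op",
		tracing.WithRecording(tracingpb.RecordingVerbose))
	defer sp.Finish()

	op(ctx)

	if elapsed := timeutil.Since(start); elapsed > threshold {
		// Deliberately not ctx: logging under the traced context would feed
		// the stringified recording back into sp's own recording.
		log.Infof(context.Background(), "op took %s, exceeding %s\ntrace:\n%s",
			elapsed, threshold, sp.GetConfiguredRecording())
	}
}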