diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index d71be238ef2..d9727618b7b 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -46,6 +46,8 @@ type worker struct { flushInterval time.Duration hasPending bool postTxnExecutedCallbacks []func() + + lastSlowConflictDetectLog map[model.TableID]time.Time } func newWorker(ctx context.Context, changefeedID model.ChangeFeedID, @@ -70,6 +72,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID, flushInterval: backend.MaxFlushInterval(), hasPending: false, postTxnExecutedCallbacks: make([]func(), 0, 1024), + + lastSlowConflictDetectLog: make(map[model.TableID]time.Time), } } @@ -87,6 +91,9 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) + cleanSlowLogHistory := time.NewTicker(time.Hour) + defer cleanSlowLogHistory.Stop() + start := time.Now() for { select { @@ -95,6 +102,15 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) return nil + case <-cleanSlowLogHistory.C: + lastSlowConflictDetectLog := w.lastSlowConflictDetectLog + w.lastSlowConflictDetectLog = make(map[model.TableID]time.Time) + now := time.Now() + for tableID, lastLog := range lastSlowConflictDetectLog { + if now.Sub(lastLog) <= time.Minute { + w.lastSlowConflictDetectLog[tableID] = lastLog + } + } case txn := <-txnCh: // we get the data from txnCh until no more data here or reach the state that can be flushed. // If no more data in txnCh, and also not reach the state that can be flushed, @@ -151,8 +167,24 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool { return false } - w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds()) + conflictDetectTime := txn.conflictResolved.Sub(txn.start).Seconds() + w.metricConflictDetectDuration.Observe(conflictDetectTime) w.metricQueueDuration.Observe(time.Since(txn.start).Seconds()) + + // Log tables which conflict detect time larger than 1 minute. + if conflictDetectTime > float64(60) { + now := time.Now() + // Log slow conflict detect tables every minute. + if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.PhysicalTableID]; !ok || now.Sub(lastLog) > time.Minute { + log.Warn("Transaction dmlSink finds a slow transaction in conflict detector", + zap.String("changefeedID", w.changefeed), + zap.Int("workerID", w.ID), + zap.Int64("TableID", txn.Event.PhysicalTableID), + zap.Float64("seconds", conflictDetectTime)) + w.lastSlowConflictDetectLog[txn.Event.PhysicalTableID] = now + } + } + w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows))) w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted) return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)