Skip to content

Commit

Permalink
kv(ticdc): reduce eventfeed rate limited log (pingcap#4072)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Dec 28, 2021
1 parent 83d64b3 commit e7bc9f5
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 63 deletions.
107 changes: 59 additions & 48 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ const (
// frequency of creating new goroutine.
defaultRegionChanSize = 128

// initial size for region rate limit queue
// initial size for region rate limit queue.
defaultRegionRateLimitQueueSize = 128
// Interval of check region retry rate limit queue.
defaultCheckRegionRateLimitInterval = 50 * time.Millisecond
// Duration of warning region retry rate limited too long.
defaultLogRegionRateLimitDuration = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -135,6 +139,33 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
type regionErrorInfo struct {
singleRegionInfo
err error

retryLimitTime *time.Time
logRateLimitDuration time.Duration
}

func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
return regionErrorInfo{
singleRegionInfo: info,
err: err,

logRateLimitDuration: defaultLogRegionRateLimitDuration,
}
}

func (r *regionErrorInfo) logRateLimitedHint() bool {
now := time.Now()
if r.retryLimitTime == nil {
// Caller should log on the first rate limited.
r.retryLimitTime = &now
return true
}
if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration {
// Caller should log if it lasts too long.
r.retryLimitTime = &now
return true
}
return false
}

type regionFeedState struct {
Expand Down Expand Up @@ -521,27 +552,35 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
tableID, tableName := util.TableIDFromCtx(ctx)
cfID := util.ChangefeedIDFromCtx(ctx)
g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
timer := time.NewTimer(defaultCheckRegionRateLimitInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
s.handleRateLimit(ctx)
timer.Reset(checkRateLimitInterval)
timer.Reset(defaultCheckRegionRateLimitInterval)
case errInfo := <-s.errCh:
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
if errInfo.logRateLimitedHint() {
zapFieldAddr := zap.Skip()
if errInfo.singleRegionInfo.rpcCtx != nil {
// rpcCtx may be nil if we fails to get region info
// from pd. It could cause by pd down or the region
// has been merged.
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zapFieldAddr)
}
// rate limit triggers, add the error info to the rate limit queue.
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr))
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
Expand Down Expand Up @@ -634,14 +673,13 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for
// error handling. This function is non blocking even if error channel is full.
// CAUTION: Note that this should only be called in a context that the region has locked it's range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error {
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) {
log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
if revokeToken {
s.regionRouter.Release(errorInfo.rpcCtx.Addr)
}
s.enqueueError(ctx, errorInfo)
return nil
}

// requestRegionToStore gets singleRegionInfo from regionRouter, which is a token
Expand Down Expand Up @@ -735,13 +773,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &connectToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
s.addStream(rpcCtx.Addr, stream, streamCancel)
Expand Down Expand Up @@ -795,15 +828,8 @@ func (s *eventFeedSession) requestRegionToStore(
continue
}

// Wait for a while and retry sending the request
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &sendRequestToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
} else {
s.regionRouter.Acquire(rpcCtx.Addr)
}
Expand Down Expand Up @@ -864,15 +890,8 @@ func (s *eventFeedSession) dispatchRequest(
log.Info("cannot get rpcCtx, retry span",
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &rpcCtxUnavailableErr{
verID: sri.verID,
},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
sri.rpcCtx = rpcCtx
Expand Down Expand Up @@ -1090,14 +1109,8 @@ func (s *eventFeedSession) receiveFromStream(

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
}, true /* revokeToken */)
if err != nil {
// The only possible is that the ctx is cancelled. Simply return.
return
}
errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
s.onRegionFail(ctx, errInfo, true /* revokeToken */)
}
}()

Expand All @@ -1109,9 +1122,7 @@ func (s *eventFeedSession) receiveFromStream(
// to call exactly once from outter code logic
worker := newRegionWorker(s, addr)

defer func() {
worker.evictAllRegions() //nolint:errcheck
}()
defer worker.evictAllRegions()

g.Go(func() error {
return worker.run(ctx)
Expand Down
17 changes: 17 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -3585,3 +3586,19 @@ func (s *clientSuite) TestHandleRateLimit(c *check.C) {
c.Assert(session.rateLimitQueue, check.HasLen, 0)
c.Assert(cap(session.rateLimitQueue), check.Equals, 128)
}

func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) {
t.Parallel()

errInfo := newRegionErrorInfo(singleRegionInfo{}, nil)
errInfo.logRateLimitDuration = time.Second

// True on the first rate limited.
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())

// True if it lasts too long.
time.Sleep(2 * errInfo.logRateLimitDuration)
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
}
21 changes: 6 additions & 15 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
revokeToken := !state.initialized
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
if err2 != nil {
return err2
}
errInfo := newRegionErrorInfo(state.sri, err)
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)

return retErr
}
Expand Down Expand Up @@ -771,8 +766,7 @@ func (w *regionWorker) handleResolvedTs(

// evictAllRegions is used when gRPC stream meets error and re-establish, notify
// all existing regions to re-establish
func (w *regionWorker) evictAllRegions() error {
var err error
func (w *regionWorker) evictAllRegions() {
for _, states := range w.statesManager.states {
states.Range(func(_, value interface{}) bool {
state := value.(*regionFeedState)
Expand All @@ -792,14 +786,11 @@ func (w *regionWorker) evictAllRegions() error {
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
errInfo := newRegionErrorInfo(state.sri, cerror.ErrEventFeedAborted.FastGenByArgs())
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)
return true
})
}
return err
}

func getWorkerPoolSize() (size int) {
Expand Down

0 comments on commit e7bc9f5

Please sign in to comment.