Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv(ticdc): reduce eventfeed rate limited log #4072

Merged
merged 10 commits into from
Dec 28, 2021
107 changes: 59 additions & 48 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ const (
// frequency of creating new goroutine.
defaultRegionChanSize = 128

// initial size for region rate limit queue
// initial size for region rate limit queue.
defaultRegionRateLimitQueueSize = 128
// Interval of check region retry rate limit queue.
defaultCheckRegionRateLimitInterval = 50 * time.Millisecond
// Duration of warning region retry rate limited too long.
defaultLogRegionRateLimitDuration = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -135,6 +139,33 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
type regionErrorInfo struct {
singleRegionInfo
err error

retryLimitTime *time.Time
logRateLimitDuration time.Duration
}

func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
return regionErrorInfo{
singleRegionInfo: info,
err: err,

logRateLimitDuration: defaultLogRegionRateLimitDuration,
}
}

func (r *regionErrorInfo) logRateLimitedHint() bool {
now := time.Now()
if r.retryLimitTime == nil {
// Caller should log on the first rate limited.
r.retryLimitTime = &now
return true
}
if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration {
// Caller should log if it lasts too long.
r.retryLimitTime = &now
return true
}
return false
}

type regionFeedState struct {
Expand Down Expand Up @@ -508,27 +539,35 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
tableID, tableName := util.TableIDFromCtx(ctx)
cfID := util.ChangefeedIDFromCtx(ctx)
g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
timer := time.NewTimer(defaultCheckRegionRateLimitInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
s.handleRateLimit(ctx)
timer.Reset(checkRateLimitInterval)
timer.Reset(defaultCheckRegionRateLimitInterval)
case errInfo := <-s.errCh:
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
if errInfo.logRateLimitedHint() {
zapFieldAddr := zap.Skip()
if errInfo.singleRegionInfo.rpcCtx != nil {
// rpcCtx may be nil if we fails to get region info
// from pd. It could cause by pd down or the region
// has been merged.
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zapFieldAddr)
}
// rate limit triggers, add the error info to the rate limit queue.
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr))
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
Expand Down Expand Up @@ -621,14 +660,13 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for
// error handling. This function is non blocking even if error channel is full.
// CAUTION: Note that this should only be called in a context that the region has locked it's range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error {
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) {
log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
if revokeToken {
s.regionRouter.Release(errorInfo.rpcCtx.Addr)
}
s.enqueueError(ctx, errorInfo)
return nil
}

// requestRegionToStore gets singleRegionInfo from regionRouter, which is a token
Expand Down Expand Up @@ -722,13 +760,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &connectToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
s.addStream(rpcCtx.Addr, stream, streamCancel)
Expand Down Expand Up @@ -782,15 +815,8 @@ func (s *eventFeedSession) requestRegionToStore(
continue
}

// Wait for a while and retry sending the request
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not need sleep in the new change ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be rate-limited in onRegionFail.

err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &sendRequestToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
} else {
s.regionRouter.Acquire(rpcCtx.Addr)
}
Expand Down Expand Up @@ -851,15 +877,8 @@ func (s *eventFeedSession) dispatchRequest(
log.Info("cannot get rpcCtx, retry span",
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &rpcCtxUnavailableErr{
verID: sri.verID,
},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
sri.rpcCtx = rpcCtx
Expand Down Expand Up @@ -1077,14 +1096,8 @@ func (s *eventFeedSession) receiveFromStream(

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
}, true /* revokeToken */)
if err != nil {
// The only possible is that the ctx is cancelled. Simply return.
return
}
errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
s.onRegionFail(ctx, errInfo, true /* revokeToken */)
}
}()

Expand All @@ -1096,9 +1109,7 @@ func (s *eventFeedSession) receiveFromStream(
// to call exactly once from outter code logic
worker := newRegionWorker(s, addr)

defer func() {
worker.evictAllRegions() //nolint:errcheck
}()
defer worker.evictAllRegions()

g.Go(func() error {
return worker.run(ctx)
Expand Down
17 changes: 17 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -3610,3 +3611,19 @@ func (s *clientSuite) TestHandleRateLimit(c *check.C) {
c.Assert(session.rateLimitQueue, check.HasLen, 0)
c.Assert(cap(session.rateLimitQueue), check.Equals, 128)
}

func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) {
t.Parallel()

errInfo := newRegionErrorInfo(singleRegionInfo{}, nil)
errInfo.logRateLimitDuration = time.Second

// True on the first rate limited.
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
Comment on lines +3622 to +3623
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it happen L3623 executes 200ms after L3622 executes, and logRateLimitedHint returns true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, although I think it's very rare.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.Parallel() makes it possible


// True if it lasts too long.
time.Sleep(2 * errInfo.logRateLimitDuration)
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
}
21 changes: 6 additions & 15 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
revokeToken := !state.initialized
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
if err2 != nil {
return err2
}
errInfo := newRegionErrorInfo(state.sri, err)
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)

return retErr
}
Expand Down Expand Up @@ -771,8 +766,7 @@ func (w *regionWorker) handleResolvedTs(

// evictAllRegions is used when gRPC stream meets error and re-establish, notify
// all existing regions to re-establish
func (w *regionWorker) evictAllRegions() error {
var err error
func (w *regionWorker) evictAllRegions() {
for _, states := range w.statesManager.states {
states.Range(func(_, value interface{}) bool {
state := value.(*regionFeedState)
Expand All @@ -792,14 +786,11 @@ func (w *regionWorker) evictAllRegions() error {
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
errInfo := newRegionErrorInfo(state.sri, cerror.ErrEventFeedAborted.FastGenByArgs())
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)
return true
})
}
return err
}

func getWorkerPoolSize() (size int) {
Expand Down