Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv(ticdc): reduce eventfeed rate limited log #4072

Merged
merged 10 commits into from
Dec 28, 2021
87 changes: 57 additions & 30 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ const (
// frequency of creating new goroutine.
defaultRegionChanSize = 128

// initial size for region rate limit queue
// initial size for region rate limit queue.
defaultRegionRateLimitQueueSize = 128
// Interval of check region retry rate limit queue.
defaultCheckRegionRateLimitInterval = 50 * time.Millisecond
// Duration of warning region retry rate limited too long.
defaultLogRegionRateLimitDuration = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -135,6 +139,33 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
type regionErrorInfo struct {
singleRegionInfo
err error

retryLimitTime *time.Time
logRateLimitDuration time.Duration
}

func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
return regionErrorInfo{
singleRegionInfo: info,
err: err,

logRateLimitDuration: defaultLogRegionRateLimitDuration,
}
}

func (r *regionErrorInfo) logRateLimitedHint() bool {
now := time.Now()
if r.retryLimitTime == nil {
// Caller should log on the first rate limited.
r.retryLimitTime = &now
return true
}
if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration {
// Caller should log if it lasts too long.
r.retryLimitTime = &now
return true
}
return false
}

type regionFeedState struct {
Expand Down Expand Up @@ -508,27 +539,35 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
tableID, tableName := util.TableIDFromCtx(ctx)
cfID := util.ChangefeedIDFromCtx(ctx)
g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
timer := time.NewTimer(defaultCheckRegionRateLimitInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
s.handleRateLimit(ctx)
timer.Reset(checkRateLimitInterval)
timer.Reset(defaultCheckRegionRateLimitInterval)
case errInfo := <-s.errCh:
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
if errInfo.logRateLimitedHint() {
zapFieldAddr := zap.Skip()
if errInfo.singleRegionInfo.rpcCtx != nil {
// rpcCtx may be nil if we fails to get region info
// from pd. It could cause by pd down or the region
// has been merged.
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zapFieldAddr)
}
// rate limit triggers, add the error info to the rate limit queue.
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr))
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
Expand Down Expand Up @@ -722,10 +761,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &connectToStoreErr{},
}, false /* revokeToken */)
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
err = s.onRegionFail(ctx, errInfo, false /* revokeToken */)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -782,12 +819,8 @@ func (s *eventFeedSession) requestRegionToStore(
continue
}

// Wait for a while and retry sending the request
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not need sleep in the new change ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be rate-limited in onRegionFail.

err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &sendRequestToStoreErr{},
}, false /* revokeToken */)
errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{})
err = s.onRegionFail(ctx, errInfo, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -851,12 +884,8 @@ func (s *eventFeedSession) dispatchRequest(
log.Info("cannot get rpcCtx, retry span",
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &rpcCtxUnavailableErr{
verID: sri.verID,
},
}, false /* revokeToken */)
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
err = s.onRegionFail(ctx, errInfo, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1077,10 +1106,8 @@ func (s *eventFeedSession) receiveFromStream(

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
}, true /* revokeToken */)
errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
err := s.onRegionFail(ctx, errInfo, true /* revokeToken */)
if err != nil {
// The only possible is that the ctx is cancelled. Simply return.
return
Expand Down
17 changes: 17 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -3610,3 +3611,19 @@ func (s *clientSuite) TestHandleRateLimit(c *check.C) {
c.Assert(session.rateLimitQueue, check.HasLen, 0)
c.Assert(cap(session.rateLimitQueue), check.Equals, 128)
}

func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) {
t.Parallel()

errInfo := newRegionErrorInfo(singleRegionInfo{}, nil)
errInfo.logRateLimitDuration = 200 * time.Millisecond

// True on the first rate limited.
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
Comment on lines +3622 to +3623
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it happen L3623 executes 200ms after L3622 executes, and logRateLimitedHint returns true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, although I think it's very rare.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.Parallel() makes it possible


// True if it lasts too long.
time.Sleep(2 * errInfo.logRateLimitDuration)
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
}
12 changes: 4 additions & 8 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
revokeToken := !state.initialized
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
errInfo := newRegionErrorInfo(state.sri, err)
err2 := w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)
if err2 != nil {
return err2
}
Expand Down Expand Up @@ -792,10 +790,8 @@ func (w *regionWorker) evictAllRegions() error {
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
errInfo := newRegionErrorInfo(state.sri, cerror.ErrEventFeedAborted.FastGenByArgs())
err = w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)
return err == nil
})
}
Expand Down