Skip to content

Commit

Permalink
Merge branch 'master' into shard-rand
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Dec 7, 2022
2 parents 49f23ec + 2ea253e commit f493bd4
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 10 deletions.
1 change: 1 addition & 0 deletions br/pkg/streamhelper/BUILD.bazel
Expand Up @@ -71,6 +71,7 @@ go_test(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/advancer.go
Expand Up @@ -382,15 +382,17 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()

threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(ctx); err != nil {
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(ctx, threshold)
err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
Expand Down
36 changes: 36 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Expand Up @@ -9,8 +9,11 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -219,3 +222,36 @@ func TestTaskRangesWithSplit(t *testing.T) {
shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) })
require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}

func TestBlocked(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
c := createFakeCluster(t, 4, true)
ctx := context.Background()
req := require.New(t)
c.splitAndScatter("0012", "0034", "0048")
marked := false
for _, s := range c.stores {
s.clientMu.Lock()
s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
// blocking the thread.
// this may happen when TiKV goes down or too busy.
<-(chan struct{})(nil)
return nil
}
s.clientMu.Unlock()
marked = true
}
req.True(marked, "failed to mark the cluster: ")
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.UpdateConfigWith(func(c *config.Config) {
// ... So the tick timeout would be 100ms
c.TickDuration = 10 * time.Millisecond
})
var err error
shouldFinishInTime(t, time.Second, "ticking", func() {
err = adv.OnTick(ctx)
})
req.ErrorIs(errors.Cause(err), context.DeadlineExceeded)
}
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/basic_lib_for_test.go
Expand Up @@ -77,10 +77,11 @@ type fakeStore struct {
id uint64
regions map[uint64]*region

clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error
}

type fakeCluster struct {
Expand Down Expand Up @@ -184,6 +185,12 @@ func (f *fakeStore) SetSupportFlushSub(b bool) {
}

func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
if f.onGetRegionCheckpoint != nil {
err := f.onGetRegionCheckpoint(in)
if err != nil {
return nil, err
}
}
resp := &logbackup.GetLastFlushTSOfRegionResponse{
Checkpoints: []*logbackup.RegionCheckpoint{},
}
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/streamhelper/config/advancer_conf.go
Expand Up @@ -78,3 +78,9 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
}
2 changes: 1 addition & 1 deletion br/tests/lightning_fail_fast/run.sh
Expand Up @@ -25,7 +25,7 @@ for CFG in chunk engine; do
! run_lightning --backend tidb --enable-checkpoint=0 --log-file "$TEST_DIR/lightning-tidb.log" --config "tests/$TEST_NAME/$CFG.toml"
[ $? -eq 0 ]

tail -n 10 $TEST_DIR/lightning-tidb.log | grep "ERROR" | tail -n 1 | grep -Fq "Error 1062: Duplicate entry '1-1' for key 'tb.uq'"
tail -n 10 $TEST_DIR/lightning-tidb.log | grep "ERROR" | tail -n 1 | grep -Fq "Error 1062 (23000): Duplicate entry '1-1' for key 'tb.uq'"

! grep -Fq "restore file completed" $TEST_DIR/lightning-tidb.log
[ $? -eq 0 ]
Expand Down
6 changes: 6 additions & 0 deletions ddl/backfilling.go
Expand Up @@ -467,6 +467,12 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
zap.String("task failed error", err.Error()),
zap.String("take time", elapsedTime.String()),
zap.NamedError("updateHandleError", err1))
failpoint.Inject("MockGetIndexRecordErr", func() {
// Make sure this job didn't failed because by the "Write conflict" error.
if dbterror.ErrNotOwner.Equal(err) {
time.Sleep(50 * time.Millisecond)
}
})
return errors.Trace(err)
}

Expand Down
3 changes: 2 additions & 1 deletion ddl/column.go
Expand Up @@ -834,7 +834,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
}
if kv.IsTxnRetryableError(err) {
if kv.IsTxnRetryableError(err) || dbterror.ErrNotOwner.Equal(err) {
return false, ver, errors.Trace(err)
}
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
Expand Down Expand Up @@ -1124,6 +1124,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
// Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle].
if i == startElementOffsetToResetHandle+1 {
reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle
w.getReorgCtx(reorgInfo.Job).setNextKey(reorgInfo.StartKey)
}

// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Expand Up @@ -1256,7 +1256,7 @@ func (w *baseIndexWorker) getIndexRecord(idxInfo *model.IndexInfo, handle kv.Han
failpoint.Return(nil, errors.Trace(dbterror.ErrCantDecodeRecord.GenWithStackByArgs("index",
errors.New("mock can't decode record error"))))
case "modifyColumnNotOwnerErr":
if idxInfo.Name.O == "_Idx$_idx" && handle.IntValue() == 7168 && atomic.CompareAndSwapUint32(&mockNotOwnerErrOnce, 0, 1) {
if idxInfo.Name.O == "_Idx$_idx_0" && handle.IntValue() == 7168 && atomic.CompareAndSwapUint32(&mockNotOwnerErrOnce, 0, 1) {
failpoint.Return(nil, errors.Trace(dbterror.ErrNotOwner))
}
case "addIdxNotOwnerErr":
Expand Down
5 changes: 5 additions & 0 deletions ddl/modify_column_test.go
Expand Up @@ -50,6 +50,11 @@ func batchInsert(tk *testkit.TestKit, tbl string, start, end int) {
func TestModifyColumnReorgInfo(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

originalTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 10 * time.Millisecond
defer func() {
ddl.ReorgWaitTimeout = originalTimeout
}()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
Expand Down

0 comments on commit f493bd4

Please sign in to comment.