Skip to content

Commit

Permalink
import into: switch tikv mode during import (pingcap#44067)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed May 26, 2023
1 parent b95806b commit 5cc1c3b
Show file tree
Hide file tree
Showing 19 changed files with 240 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ mock_s3iface:
mock_lightning:
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage > br/pkg/mock/mocklocal/local.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
22 changes: 14 additions & 8 deletions br/pkg/lightning/backend/local/tikv_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,38 @@ import (
)

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type TiKVModeSwitcher struct {
type TiKVModeSwitcher interface {
// ToImportMode switches all TiKV nodes to Import mode.
ToImportMode(ctx context.Context)
// ToNormalMode switches all TiKV nodes to Normal mode.
ToNormalMode(ctx context.Context)
}

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type switcher struct {
tls *common.TLS
pdAddr string
logger *zap.Logger
}

// NewTiKVModeSwitcher creates a new TiKVModeSwitcher.
func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) *TiKVModeSwitcher {
return &TiKVModeSwitcher{
func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) TiKVModeSwitcher {
return &switcher{
tls: tls,
pdAddr: pdAddr,
logger: logger,
}
}

// ToImportMode switches all TiKV nodes to Import mode.
func (rc *TiKVModeSwitcher) ToImportMode(ctx context.Context) {
func (rc *switcher) ToImportMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
}

// ToNormalMode switches all TiKV nodes to Normal mode.
func (rc *TiKVModeSwitcher) ToNormalMode(ctx context.Context) {
func (rc *switcher) ToNormalMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
}

func (rc *TiKVModeSwitcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
rc.logger.Info("switch tikv mode", zap.Stringer("mode", mode))

// It is fine if we miss some stores which did not switch to Import mode,
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ const (

defaultCSVDataCharacterSet = "binary"
defaultCSVDataInvalidCharReplace = utf8.RuneError

DefaultSwitchTiKVModeInterval = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -929,7 +931,7 @@ func NewConfig() *Config {
ChecksumTableConcurrency: defaultChecksumTableConcurrency,
},
Cron: Cron{
SwitchMode: Duration{Duration: 5 * time.Minute},
SwitchMode: Duration{Duration: DefaultSwitchTiKVModeInterval},
LogProgress: Duration{Duration: 5 * time.Minute},
CheckDiskQuota: Duration{Duration: 1 * time.Minute},
},
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type Controller struct {
preInfoGetter PreImportInfoGetter
precheckItemBuilder *PrecheckItemBuilder
encBuilder encode.EncodingBuilder
tikvModeSwitcher *local.TiKVModeSwitcher
tikvModeSwitcher local.TiKVModeSwitcher

keyspaceName string
}
Expand Down Expand Up @@ -1212,6 +1212,7 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s

case <-switchModeChan:
// periodically switch to import mode, as requested by TiKV 3.0
// TiKV will switch back to normal mode if we didn't call this again within 10 minutes
rc.tikvModeSwitcher.ToImportMode(ctx)

case <-logProgressChan:
Expand Down
50 changes: 49 additions & 1 deletion br/pkg/mock/mocklocal/local.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ddl/disttask_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func NewLitBackfillFlowHandle(d DDL) dispatcher.TaskFlowHandle {
}
}

func (*litBackfillFlowHandle) OnTicker(_ context.Context, _ *proto.Task) {
}

// ProcessNormalFlow processes the normal flow.
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
var globalTaskMeta BackfillGlobalMeta
Expand Down
4 changes: 3 additions & 1 deletion disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (d *dispatcher) DispatchTaskLoop() {
if d.isRunningGTask(gTask.ID) {
continue
}
// owner changed
if gTask.State == proto.TaskStateRunning || gTask.State == proto.TaskStateReverting || gTask.State == proto.TaskStateCancelling {
d.setRunningGTask(gTask)
cnt++
Expand Down Expand Up @@ -273,8 +274,9 @@ func (d *dispatcher) detectTask(gTask *proto.Task) {
case <-ticker.C:
// TODO: Consider actively obtaining information about task completion.
stepIsFinished, errStr := d.probeTask(gTask)
// The global task isn't finished and failed.
// The global task isn't finished and not failed.
if !stepIsFinished && len(errStr) == 0 {
GetTaskFlowHandle(gTask.Type).OnTicker(d.ctx, gTask)
logutil.BgLogger().Debug("detect task, this task keeps current state",
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State))
break
Expand Down
3 changes: 3 additions & 0 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ type NumberExampleHandle struct{}

var _ dispatcher.TaskFlowHandle = (*NumberExampleHandle)(nil)

func (NumberExampleHandle) OnTicker(_ context.Context, _ *proto.Task) {
}

func (n NumberExampleHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepInit
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/dispatcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
// OnTicker is used to handle the ticker event, if business impl need to do some periodical work, you can
// do it here, but don't do too much work here, because the ticker interval is small, and it will block
// the event is generated every checkTaskRunningInterval, and only when the task NOT FINISHED and NO ERROR.
OnTicker(ctx context.Context, gTask *proto.Task)
ProcessNormalFlow(ctx context.Context, h TaskHandle, gTask *proto.Task) (subtaskMetas [][]byte, err error)
ProcessErrFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, receiveErr [][]byte) (subtaskMeta []byte, err error)
// GetEligibleInstances is used to get the eligible instances for the global task.
Expand Down
3 changes: 3 additions & 0 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type testFlowHandle struct{}

var _ dispatcher.TaskFlowHandle = (*testFlowHandle)(nil)

func (*testFlowHandle) OnTicker(_ context.Context, _ *proto.Task) {
}

func (*testFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepOne
Expand Down
2 changes: 2 additions & 0 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ go_library(
"//util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand Down
66 changes: 54 additions & 12 deletions disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package loaddata
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand All @@ -29,16 +31,42 @@ import (
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// FlowHandle is the dispatcher for load data.
type FlowHandle struct{}
type flowHandle struct {
mu sync.RWMutex
// the last time we switch TiKV into IMPORT mode, this is a global operation, do it for one task makes
// no difference to do it for all tasks. So we do not need to record the switch time for each task.
lastSwitchTime atomic.Time
}

var _ dispatcher.TaskFlowHandle = (*flowHandle)(nil)

func (h *flowHandle) OnTicker(ctx context.Context, task *proto.Task) {
// only switch TiKV mode when task is running and reach the interval
if task.State != proto.TaskStateRunning || time.Since(h.lastSwitchTime.Load()) < config.DefaultSwitchTiKVModeInterval {
return
}

var _ dispatcher.TaskFlowHandle = (*FlowHandle)(nil)
h.mu.Lock()
defer h.mu.Unlock()
if time.Since(h.lastSwitchTime.Load()) < config.DefaultSwitchTiKVModeInterval {
return
}

// ProcessNormalFlow implements dispatcher.TaskFlowHandle interface.
func (*FlowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
logger := logutil.BgLogger().With(zap.Int64("task_id", task.ID))
switcher, err := importer.GetTiKVModeSwitcher(logger)
if err != nil {
logger.Warn("get tikv mode switcher failed", zap.Error(err))
return
}
switcher.ToImportMode(ctx)
h.lastSwitchTime.Store(time.Now())
}

func (h *flowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
logger := logutil.BgLogger().With(zap.String("component", "dispatcher"), zap.String("type", gTask.Type), zap.Int64("ID", gTask.ID))
taskMeta := &TaskMeta{}
err := json.Unmarshal(gTask.Meta, taskMeta)
Expand All @@ -49,6 +77,7 @@ func (*FlowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.Task

switch gTask.Step {
case Import:
h.switchTiKV2NormalMode(ctx, logutil.BgLogger())
if err := postProcess(ctx, handle, gTask, logger); err != nil {
return nil, err
}
Expand Down Expand Up @@ -78,16 +107,15 @@ func (*FlowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.Task
return metaBytes, nil
}

// ProcessErrFlow implements dispatcher.ProcessErrFlow interface.
func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, receiveErr [][]byte) ([]byte, error) {
func (h *flowHandle) ProcessErrFlow(ctx context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, receiveErr [][]byte) ([]byte, error) {
logger := logutil.BgLogger().With(zap.String("component", "dispatcher"), zap.String("type", gTask.Type), zap.Int64("ID", gTask.ID))
logger.Info("process error flow", zap.ByteStrings("error message", receiveErr))
h.switchTiKV2NormalMode(ctx, logger)
gTask.Error = receiveErr[0]
return nil, nil
}

// GetEligibleInstances implements dispatcher.TaskFlowHandle interface.
func (*FlowHandle) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, error) {
func (*flowHandle) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, error) {
taskMeta := &TaskMeta{}
err := json.Unmarshal(gTask.Meta, taskMeta)
if err != nil {
Expand All @@ -99,12 +127,26 @@ func (*FlowHandle) GetEligibleInstances(ctx context.Context, gTask *proto.Task)
return dispatcher.GenerateSchedulerNodes(ctx)
}

// IsRetryableErr implements dispatcher.IsRetryableErr interface.
func (*FlowHandle) IsRetryableErr(error) bool {
func (*flowHandle) IsRetryableErr(error) bool {
// TODO: check whether the error is retryable.
return false
}

func (h *flowHandle) switchTiKV2NormalMode(ctx context.Context, logger *zap.Logger) {
h.mu.Lock()
defer h.mu.Unlock()

switcher, err := importer.GetTiKVModeSwitcher(logger)
if err != nil {
logger.Warn("get tikv mode switcher failed", zap.Error(err))
return
}
switcher.ToNormalMode(ctx)

// clear it, so next task can switch TiKV mode again.
h.lastSwitchTime.Store(time.Time{})
}

// postProcess does the post processing for the task.
func postProcess(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task, logger *zap.Logger) error {
taskMeta := &TaskMeta{}
Expand Down Expand Up @@ -237,5 +279,5 @@ func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) (subtaskMetas
}

func init() {
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{})
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &flowHandle{})
}
2 changes: 1 addition & 1 deletion disttask/loaddata/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *loadDataSuite) TestFlowHandleGetEligibleInstances() {
}
mockedAllServerInfos := makeFailpointRes(serverInfoMap)

h := FlowHandle{}
h := flowHandle{}
gTask := &proto.Task{Meta: []byte("{}")}
s.enableFailPoint("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", mockedAllServerInfos)
eligibleInstances, err := h.GetEligibleInstances(context.Background(), gTask)
Expand Down
5 changes: 0 additions & 5 deletions disttask/loaddata/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package loaddata
import (
"encoding/json"
"fmt"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
Expand All @@ -31,10 +30,6 @@ import (
"go.uber.org/zap"
)

var (
checkTaskFinishInterval = 300 * time.Millisecond
)

// DistImporter is a JobImporter for distributed load data.
type DistImporter struct {
*importer.JobImportParam
Expand Down
1 change: 1 addition & 0 deletions disttask/loaddata/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// TaskStep of LoadData.
const (
// Import we sort source data and ingest it into TiKV in this step.
Import int64 = 1
)

Expand Down

0 comments on commit 5cc1c3b

Please sign in to comment.