importinto: executor part of import from select (#50341)
ref #49883
D3Hunter committed Jan 17, 2024
1 parent bbbada0 commit 12d273e
Showing 25 changed files with 1,195 additions and 242 deletions.
34 changes: 34 additions & 0 deletions br/pkg/lightning/backend/local/engine_mgr.go
@@ -74,6 +74,10 @@ func newEngineManager(config BackendConfig, storeHelper StoreHelper, logger log.
}
}()

if err = prepareSortDir(config); err != nil {
return nil, err
}

keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if config.DupeDetectEnabled {
duplicateDB, err = openDuplicateDB(config.LocalStoreDir)
@@ -525,6 +529,15 @@ func (em *engineManager) close() {
}
em.duplicateDB = nil
}

// if checkpoint is disabled, or we finish loading all data successfully, then files in this
// dir will be useless, so we clean up this dir and all files in it.
if !em.CheckpointEnabled || common.IsEmptyDir(em.LocalStoreDir) {
err := os.RemoveAll(em.LocalStoreDir)
if err != nil {
em.logger.Warn("remove local db file failed", zap.Error(err))
}
}
}

func (em *engineManager) getExternalEngine(uuid uuid.UUID) (common.Engine, bool) {
@@ -566,3 +579,24 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
}
return pebble.Open(dbPath, opts)
}

func prepareSortDir(config BackendConfig) error {
shouldCreate := true
if config.CheckpointEnabled {
if info, err := os.Stat(config.LocalStoreDir); err != nil {
if !os.IsNotExist(err) {
return err
}
} else if info.IsDir() {
shouldCreate = false
}
}

if shouldCreate {
err := os.Mkdir(config.LocalStoreDir, 0o700)
if err != nil {
return common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir)
}
}
return nil
}
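The two hunks above move the sort-directory lifecycle into the engine manager: prepareSortDir creates the directory (or, with checkpoints enabled, reuses one left by a previous run), and close() removes it once it is no longer needed. A minimal standalone sketch of the same create-or-reuse pattern, with illustrative names that are not part of the patch:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// prepareDir mirrors prepareSortDir above: with checkpoints enabled, an
// existing directory is reused because a previous run may have left sorted
// KV data behind; otherwise the directory must be freshly created.
func prepareDir(dir string, checkpointEnabled bool) error {
	shouldCreate := true
	if checkpointEnabled {
		if info, err := os.Stat(dir); err != nil {
			if !os.IsNotExist(err) {
				return err // stat failed for some reason other than "missing"
			}
		} else if info.IsDir() {
			shouldCreate = false // reuse the leftover directory
		}
	}
	if shouldCreate {
		if err := os.Mkdir(dir, 0o700); err != nil {
			return fmt.Errorf("invalid sorted-kv dir %q: %w", dir, err)
		}
	}
	return nil
}

func main() {
	dir := filepath.Join(os.TempDir(), "sorted-kv-demo")
	defer os.RemoveAll(dir)
	fmt.Println(prepareDir(dir, false)) // <nil>: created fresh
	fmt.Println(prepareDir(dir, true))  // <nil>: existing dir reused
	fmt.Println(prepareDir(dir, false)) // error: os.Mkdir fails, dir exists
}

Note that os.Mkdir (unlike os.MkdirAll) fails when the directory already exists, which is why the tests below now point LocalStoreDir at a not-yet-existing "sorted-kv" subdirectory of t.TempDir() rather than at the temp dir itself.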
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/engine_mgr_test.go
@@ -18,6 +18,7 @@ import (
"context"
"io"
"os"
"path"
"sync"
"testing"

@@ -36,7 +37,7 @@ func getBackendConfig(t *testing.T) BackendConfig {
MemTableSize: config.DefaultEngineMemCacheSize,
MaxOpenFiles: 1000,
DisableAutomaticCompactions: true,
LocalStoreDir: t.TempDir(),
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
DupeDetectEnabled: false,
DuplicateDetectOpt: common.DupDetectOpt{},
WorkerConcurrency: 8,
54 changes: 27 additions & 27 deletions br/pkg/lightning/backend/local/local.go
@@ -22,7 +22,6 @@ import (
"io"
"math"
"net"
"os"
"path/filepath"
"strings"
"sync"
@@ -517,24 +516,6 @@ func NewBackend(
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}

shouldCreate := true
if config.CheckpointEnabled {
if info, err := os.Stat(config.LocalStoreDir); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else if info.IsDir() {
shouldCreate = false
}
}

if shouldCreate {
err = os.Mkdir(config.LocalStoreDir, 0o700)
if err != nil {
return nil, common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir)
}
}

// The following copies tikv.NewTxnClient without creating yet another pdClient.
spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
if err != nil {
@@ -597,6 +578,27 @@ func NewBackend(
return local, nil
}

// NewBackendForTest creates a new Backend for test.
func NewBackendForTest(ctx context.Context, config BackendConfig, storeHelper StoreHelper) (*Backend, error) {
config.adjust()

logger := log.FromContext(ctx)
engineMgr, err := newEngineManager(config, storeHelper, logger)
if err != nil {
return nil, err
}
local := &Backend{
BackendConfig: config,
logger: logger,
engineMgr: engineMgr,
}
if m, ok := metric.GetCommonMetric(ctx); ok {
local.metrics = m
}

return local, nil
}

// TotalMemoryConsume returns the total memory usage of the local backend.
func (local *Backend) TotalMemoryConsume() int64 {
return local.engineMgr.totalMemoryConsume()
@@ -672,14 +674,6 @@ func (local *Backend) Close() {
local.engineMgr.close()
local.importClientFactory.Close()

// if checkpoint is disabled, or we finish load all data successfully, then files in this
// dir will be useless, so we clean up this dir and all files in it.
if !local.CheckpointEnabled || common.IsEmptyDir(local.LocalStoreDir) {
err := os.RemoveAll(local.LocalStoreDir)
if err != nil {
local.logger.Warn("remove local db file failed", zap.Error(err))
}
}
_ = local.tikvCli.Close()
local.pdHTTPCli.Close()
local.pdCli.Close()
@@ -1592,6 +1586,12 @@ func (local *Backend) GetTiKVCodec() tikvclient.Codec {
return local.tikvCodec
}

// CloseEngineMgr closes the engine manager.
// This function is used for tests.
func (local *Backend) CloseEngineMgr() {
local.engineMgr.close()
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
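Taken together, these local.go hunks transfer ownership of the sort directory from Backend to engineManager: NewBackend no longer touches the filesystem, Close no longer removes the directory, and the new NewBackendForTest can build a Backend without PD/TiKV wiring because the engine manager handles the directory end to end. A sketch of that ownership move, under invented names (engineMgr and backend here are illustrative, not the patch's types):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// engineMgr owns the sorted-kv directory end to end: it creates the dir on
// construction and removes it on close, so the enclosing backend never has
// to touch the filesystem itself.
type engineMgr struct{ dir string }

func newEngineMgr(dir string) (*engineMgr, error) {
	if err := os.Mkdir(dir, 0o700); err != nil {
		return nil, err
	}
	return &engineMgr{dir: dir}, nil
}

func (m *engineMgr) close() {
	if err := os.RemoveAll(m.dir); err != nil {
		fmt.Println("remove local db file failed:", err)
	}
}

// backend only wires components together, so a test constructor can build it
// from an engineMgr alone, mirroring NewBackendForTest above.
type backend struct{ mgr *engineMgr }

func (b *backend) Close() { b.mgr.close() }

func main() {
	mgr, err := newEngineMgr(filepath.Join(os.TempDir(), "sorted-kv-demo2"))
	if err != nil {
		panic(err)
	}
	b := &backend{mgr: mgr}
	defer b.Close()
	fmt.Println("backend built without any PD/TiKV clients")
}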
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
@@ -22,6 +22,7 @@ import (
"io"
"math"
"math/rand"
"path"
"path/filepath"
"sort"
"strings"
@@ -1258,6 +1259,7 @@ func TestCheckPeersBusy(t *testing.T) {
supportMultiIngest: true,
BackendConfig: BackendConfig{
ShouldCheckWriteStall: true,
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
tikvCodec: keyspace.CodecV1,
}
@@ -1380,6 +1382,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
supportMultiIngest: true,
BackendConfig: BackendConfig{
ShouldCheckWriteStall: true,
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
tikvCodec: keyspace.CodecV1,
}
@@ -1477,6 +1480,9 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) {
writeLimiter: noopStoreWriteLimiter{},
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
BackendConfig: BackendConfig{
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
}
var err error
local.engineMgr, err = newEngineManager(local.BackendConfig, local, local.logger)
@@ -1570,6 +1576,9 @@ func TestPartialWriteIngestBusy(t *testing.T) {
writeLimiter: noopStoreWriteLimiter{},
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
BackendConfig: BackendConfig{
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
}
var err error
local.engineMgr, err = newEngineManager(local.BackendConfig, local, local.logger)
@@ -2322,6 +2331,7 @@ func TestExternalEngine(t *testing.T) {
local := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 2,
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
splitCli: initTestSplitClient([][]byte{
keys[0], keys[50], endKey,
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
@@ -240,6 +240,7 @@ go_library(
"@com_github_burntsushi_toml//:toml",
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_opentracing_basictracer_go//:basictracer-go",
"@com_github_opentracing_opentracing_go//:opentracing-go",
106 changes: 104 additions & 2 deletions pkg/executor/import_into.go
@@ -16,8 +16,10 @@ package executor

import (
"context"
"fmt"
"sync/atomic"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
@@ -105,6 +107,11 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
e.importPlan = importPlan
e.controller = controller

if e.selectExec != nil {
// `import from select` doesn't return rows, so no need to set dataFilled.
return e.importFromSelect(ctx)
}

if err2 := e.controller.InitDataFiles(ctx); err2 != nil {
return err2
}
@@ -237,12 +244,107 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di
// use background, since ctx is canceled already.
return cancelAndWaitImportJob(context.Background(), taskManager, distImporter.JobID())
}
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, distImporter.Result(ctx)); err2 != nil {
importResult := distImporter.Result(ctx)
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, &importResult); err2 != nil {
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
}
return err
}

func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
e.dataFilled = true
// must use a new session to pre-check, otherwise the stmt shown in show processlist would be changed.
newSCtx, err2 := CreateSession(e.userSctx)
if err2 != nil {
return err2
}
defer CloseSession(newSCtx)

sqlExec := newSCtx.(sqlexec.SQLExecutor)
if err2 = e.controller.CheckRequirements(ctx, sqlExec); err2 != nil {
return err2
}
if err := e.importPlan.InitTiKVConfigs(ctx, newSCtx); err != nil {
return err
}

// TODO: we don't use this `group` here, but have to init GroupCtx; refactor this later.
group, groupCtx := errgroup.WithContext(ctx)
param := &importer.JobImportParam{
Job: &importer.Job{},
Group: group,
GroupCtx: groupCtx,
Done: make(chan struct{}),
Progress: importer.NewProgress(),
}
importID := uuid.New().String()
logutil.Logger(ctx).Info("importing data from select statement",
zap.String("import-id", importID), zap.Int("concurrency", e.controller.ThreadCnt),
zap.String("target-table", e.controller.FullTableName()),
zap.Int64("target-table-id", e.controller.TableInfo.ID))
ti, err2 := importer.NewTableImporter(param, e.controller, importID)
if err2 != nil {
return err2
}
defer func() {
if err := ti.Close(); err != nil {
logutil.Logger(ctx).Error("close importer failed", zap.Error(err))
}
}()
selectedRowCh := make(chan importer.QueryRow)
ti.SetSelectedRowCh(selectedRowCh)

var importResult *importer.JobImportResult
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
var err error
importResult, err = ti.ImportSelectedRows(egCtx, newSCtx)
return err
})
eg.Go(func() error {
defer close(selectedRowCh)
fields := exec.RetTypes(e.selectExec)
var idAllocator int64
for {
// rows will be consumed concurrently, so we cannot use the chunk pool in the session ctx.
chk := exec.NewFirstChunk(e.selectExec)
iter := chunk.NewIterator4Chunk(chk)
err := exec.Next(egCtx, e.selectExec, chk)
if err != nil {
return err
}
if chk.NumRows() == 0 {
break
}
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() {
idAllocator++
select {
case selectedRowCh <- importer.QueryRow{
ID: idAllocator,
Data: innerChunkRow.GetDatumRow(fields),
}:
case <-egCtx.Done():
return egCtx.Err()
}
}
}
return nil
})
if err := eg.Wait(); err != nil {
return err
}

if err2 = flushStats(ctx, e.userSctx, e.importPlan.TableInfo.ID, importResult); err2 != nil {
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
}

stmtCtx := e.userSctx.GetSessionVars().StmtCtx
stmtCtx.SetAffectedRows(importResult.Affected)
// TODO: change it after spec is ready.
stmtCtx.SetMessage(fmt.Sprintf("Records: %d, ID: %s", importResult.Affected, importID))
return nil
}

// ImportIntoActionExec represents an import into action executor.
type ImportIntoActionExec struct {
exec.BaseExecutor
Expand Down Expand Up @@ -297,7 +399,7 @@ func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, mana
}

// flushStats flushes the stats of the table.
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result importer.JobImportResult) error {
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result *importer.JobImportResult) error {
if err := sessiontxn.NewTxn(ctx, se); err != nil {
return err
}
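Two details of the import_into.go hunks are worth spelling out. First, doImport now binds distImporter.Result(ctx) to a local variable before calling flushStats: function results are not addressable in Go, so &distImporter.Result(ctx) would not compile once flushStats takes a pointer. Second, importFromSelect is essentially a two-goroutine pipeline over an errgroup: one goroutine drains the select executor and produces rows into a channel, the other consumes them in the table importer. A self-contained sketch of that shape (row, pipe, and the string source are illustrative stand-ins, not the patch's API):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// row stands in for importer.QueryRow: an allocated ID plus the row payload.
type row struct {
	id   int64
	data string
}

// pipe mirrors the shape of importFromSelect above: the consumer goroutine
// plays the role of ti.ImportSelectedRows, the producer drains a source and
// sends rows, and closing the channel signals end-of-data. A failure on
// either side cancels the shared errgroup context and unblocks the other.
func pipe(ctx context.Context, source []string) (int64, error) {
	rowCh := make(chan row)
	var imported int64

	eg, egCtx := errgroup.WithContext(ctx)
	eg.Go(func() error { // consumer: stands in for the table importer
		for r := range rowCh {
			_ = r // a real importer would encode and write the row here
			imported++
		}
		return nil
	})
	eg.Go(func() error { // producer: stands in for draining e.selectExec
		defer close(rowCh) // the only end-of-data signal the consumer needs
		var idAllocator int64
		for _, d := range source {
			idAllocator++
			select {
			case rowCh <- row{id: idAllocator, data: d}:
			case <-egCtx.Done(): // consumer failed or caller canceled
				return egCtx.Err()
			}
		}
		return nil
	})
	if err := eg.Wait(); err != nil {
		return 0, err
	}
	return imported, nil
}

func main() {
	n, err := pipe(context.Background(), []string{"a", "b", "c"})
	fmt.Println(n, err) // 3 <nil>
}

As in the real code, the select on egCtx.Done() keeps the producer from blocking forever on a channel send once the consumer has given up.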
