From 8bad79fad529749214db667737f4f2e8ef132d7a Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 16 Oct 2023 22:34:58 +0800 Subject: [PATCH] importinto: revert "use one writer for each kv group for all concurrent encoder" (#47666) ref pingcap/tidb#46704 --- br/pkg/lightning/backend/external/writer.go | 7 - .../importinto/encode_and_sort_operator.go | 131 +++++++----------- .../encode_and_sort_operator_test.go | 11 +- pkg/disttask/importinto/scheduler.go | 2 +- pkg/executor/importer/BUILD.bazel | 5 +- pkg/executor/importer/chunk_process.go | 54 +------- pkg/executor/importer/chunk_process_test.go | 54 -------- 7 files changed, 65 insertions(+), 199 deletions(-) delete mode 100644 pkg/executor/importer/chunk_process_test.go diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index b3adefe53508b..fc324bb9ee482 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -488,9 +488,6 @@ func (w *Writer) createStorageWriter(ctx context.Context) ( // EngineWriter implements backend.EngineWriter interface. type EngineWriter struct { - // Only 1 writer is used for some kv group(data or some index), no matter - // how many routines are encoding data, so need to sync write to it. - sync.Mutex w *Writer } @@ -501,8 +498,6 @@ func NewEngineWriter(w *Writer) *EngineWriter { // AppendRows implements backend.EngineWriter interface. func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { - e.Lock() - defer e.Unlock() kvs := kv.Rows2KvPairs(rows) if len(kvs) == 0 { return nil @@ -524,7 +519,5 @@ func (e *EngineWriter) IsSynced() bool { // Close implements backend.EngineWriter interface. func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { - e.Lock() - defer e.Unlock() return nil, e.w.Close(ctx) } diff --git a/pkg/disttask/importinto/encode_and_sort_operator.go b/pkg/disttask/importinto/encode_and_sort_operator.go index 61af716c657e5..8d5b36e93d613 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator.go +++ b/pkg/disttask/importinto/encode_and_sort_operator.go @@ -20,7 +20,6 @@ import ( "strconv" "time" - "github.com/docker/go-units" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" @@ -64,9 +63,6 @@ type encodeAndSortOperator struct { sharedVars *SharedVars logger *zap.Logger errCh chan error - - dataWriter *external.EngineWriter - indexWriter *importer.IndexRouteWriter } var _ operator.Operator = (*encodeAndSortOperator)(nil) @@ -86,64 +82,18 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, logger: executor.logger, errCh: make(chan error), } - - if op.tableImporter.IsGlobalSort() { - op.initWriters(executor, indexMemorySizeLimit) - } - pool := workerpool.NewWorkerPool( "encodeAndSortOperator", util.ImportInto, int(executor.taskMeta.Plan.ThreadCnt), func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { - return newChunkWorker(subCtx, op) + return newChunkWorker(ctx, op, indexMemorySizeLimit) }, ) op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool) return op } -// with current design of global sort writer, we only create one writer for -// each kv group, and all chunks shares the same writers. -// the writer itself will sort and upload data concurrently. -func (op *encodeAndSortOperator) initWriters(executor *importStepExecutor, indexMemorySizeLimit uint64) { - totalDataKVMemSizeLimit := external.DefaultMemSizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt) - totalMemSizeLimitPerIndexWriter := indexMemorySizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt) - op.logger.Info("init global sort writer with mem limit", - zap.String("data-limit", units.BytesSize(float64(totalDataKVMemSizeLimit))), - zap.String("per-index-limit", units.BytesSize(float64(totalMemSizeLimitPerIndexWriter)))) - - // in case on network partition, 2 nodes might run the same subtask. - // so use uuid to make sure the path is unique. - workerUUID := uuid.New().String() - // sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID} - indexWriterFn := func(indexID int64) *external.Writer { - builder := external.NewWriterBuilder(). - SetOnCloseFunc(func(summary *external.WriterSummary) { - op.sharedVars.mergeIndexSummary(indexID, summary) - }).SetMemorySizeLimit(totalMemSizeLimitPerIndexWriter). - SetMutex(&op.sharedVars.ShareMu) - prefix := subtaskPrefix(op.taskID, op.subtaskID) - // writer id for index: index/{indexID}/{workerID} - writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) - writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) - return writer - } - - // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} - builder := external.NewWriterBuilder(). - SetOnCloseFunc(op.sharedVars.mergeDataSummary). - SetMemorySizeLimit(totalDataKVMemSizeLimit). - SetMutex(&op.sharedVars.ShareMu) - prefix := subtaskPrefix(op.taskID, op.subtaskID) - // writer id for data: data/{workerID} - writerID := path.Join("data", workerUUID) - writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) - - op.dataWriter = external.NewEngineWriter(writer) - op.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn) -} - func (op *encodeAndSortOperator) Open() error { op.wg.Run(func() { for err := range op.errCh { @@ -164,33 +114,9 @@ func (op *encodeAndSortOperator) Close() error { // right now AsyncOperator.Close always returns nil, ok to ignore it. // nolint:errcheck op.AsyncOperator.Close() - - closeCtx := op.ctx - if closeCtx.Err() != nil { - op.logger.Info("context canceled when closing, create a new one with timeout", - zap.Duration("timeout", maxWaitDuration)) - // in case of context canceled, we need to create a new context to close writers. - newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration) - closeCtx = newCtx - defer cancel() - } - if op.dataWriter != nil { - // Note: we cannot ignore close error as we're writing to S3 or GCS. - // ignore error might cause data loss. below too. - if _, err := op.dataWriter.Close(closeCtx); err != nil { - op.onError(errors.Trace(err)) - } - } - if op.indexWriter != nil { - if _, err := op.indexWriter.Close(closeCtx); err != nil { - op.onError(errors.Trace(err)) - } - } - op.cancel() close(op.errCh) op.wg.Wait() - // see comments on interface definition, this Close is actually WaitAndClose. return op.firstErr.Load() } @@ -214,13 +140,43 @@ func (op *encodeAndSortOperator) Done() <-chan struct{} { type chunkWorker struct { ctx context.Context op *encodeAndSortOperator + + dataWriter *external.EngineWriter + indexWriter *importer.IndexRouteWriter } -func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker { +func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemorySizeLimit uint64) *chunkWorker { w := &chunkWorker{ ctx: ctx, op: op, } + if op.tableImporter.IsGlobalSort() { + // in case on network partition, 2 nodes might run the same subtask. + workerUUID := uuid.New().String() + // sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID} + indexWriterFn := func(indexID int64) *external.Writer { + builder := external.NewWriterBuilder(). + SetOnCloseFunc(func(summary *external.WriterSummary) { + op.sharedVars.mergeIndexSummary(indexID, summary) + }).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu) + prefix := subtaskPrefix(op.taskID, op.subtaskID) + // writer id for index: index/{indexID}/{workerID} + writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) + writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) + return writer + } + + // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} + builder := external.NewWriterBuilder(). + SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu) + prefix := subtaskPrefix(op.taskID, op.subtaskID) + // writer id for data: data/{workerID} + writerID := path.Join("data", workerUUID) + writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) + w.dataWriter = external.NewEngineWriter(writer) + + w.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn) + } return w } @@ -231,12 +187,31 @@ func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool. // we don't use the input send function, it makes workflow more complex // we send result to errCh and handle it here. executor := newImportMinimalTaskExecutor(task) - if err := executor.Run(w.ctx, w.op.dataWriter, w.op.indexWriter); err != nil { + if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter); err != nil { w.op.onError(err) } } -func (*chunkWorker) Close() { +func (w *chunkWorker) Close() { + closeCtx := w.ctx + if closeCtx.Err() != nil { + // in case of context canceled, we need to create a new context to close writers. + newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration) + closeCtx = newCtx + defer cancel() + } + if w.dataWriter != nil { + // Note: we cannot ignore close error as we're writing to S3 or GCS. + // ignore error might cause data loss. below too. + if _, err := w.dataWriter.Close(closeCtx); err != nil { + w.op.onError(errors.Trace(err)) + } + } + if w.indexWriter != nil { + if _, err := w.indexWriter.Close(closeCtx); err != nil { + w.op.onError(errors.Trace(err)) + } + } } func subtaskPrefix(taskID, subtaskID int64) string { diff --git a/pkg/disttask/importinto/encode_and_sort_operator_test.go b/pkg/disttask/importinto/encode_and_sort_operator_test.go index 2ed7967dc06e7..89b30de5ab48e 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator_test.go +++ b/pkg/disttask/importinto/encode_and_sort_operator_test.go @@ -72,20 +72,15 @@ func TestEncodeAndSortOperator(t *testing.T) { tableImporter: &importer.TableImporter{ LoadDataController: &importer.LoadDataController{ Plan: &importer.Plan{ - CloudStorageURI: "s3://test-bucket/test-path", + CloudStorageURI: "", }, }, }, logger: logger, } - sharedVars := &SharedVars{ - SortedDataMeta: &external.SortedKVMeta{}, - SortedIndexMetas: map[int64]*external.SortedKVMeta{}, - } - source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 3, 0) + op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 0) op.SetSource(source) require.NoError(t, op.Open()) require.Greater(t, len(op.String()), 0) @@ -105,7 +100,7 @@ func TestEncodeAndSortOperator(t *testing.T) { // cancel on error and log other errors mockErr2 := errors.New("mock err 2") source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op = newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 2, 0) + op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 0) op.SetSource(source) executor1 := mock.NewMockMiniTaskExecutor(ctrl) executor2 := mock.NewMockMiniTaskExecutor(ctrl) diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index ce4e6d41f1785..dd219c6f09764 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -107,7 +107,7 @@ func (s *importStepExecutor) Init(ctx context.Context) error { }() } s.indexMemorySizeLimit = getWriterMemorySizeLimit(s.tableImporter.Plan) - s.logger.Info("memory size limit per index writer per concurrency", + s.logger.Info("index writer memory size limit", zap.String("limit", units.BytesSize(float64(s.indexMemorySizeLimit)))) return nil } diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 470c2bedb051a..3b691ad85cbcb 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -79,7 +79,6 @@ go_test( name = "importer_test", timeout = "short", srcs = [ - "chunk_process_test.go", "chunk_process_testkit_test.go", "import_test.go", "job_test.go", @@ -89,11 +88,10 @@ go_test( embed = [":importer"], flaky = True, race = "on", - shard_count = 18, + shard_count = 17, deps = [ "//br/pkg/errors", "//br/pkg/lightning/backend/encode", - "//br/pkg/lightning/backend/external", "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/config", "//br/pkg/lightning/log", @@ -113,7 +111,6 @@ go_test( "//pkg/sessionctx/variable", "//pkg/testkit", "//pkg/types", - "//pkg/util", "//pkg/util/dbterror/exeerrors", "//pkg/util/etcd", "//pkg/util/logutil", diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go index cdb355fac0d5c..aa559475a4462 100644 --- a/pkg/executor/importer/chunk_process.go +++ b/pkg/executor/importer/chunk_process.go @@ -17,7 +17,6 @@ package importer import ( "context" "io" - "sync" "time" "github.com/docker/go-units" @@ -32,7 +31,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" verify "github.com/pingcap/tidb/br/pkg/lightning/verification" - tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/prometheus/client_golang/prometheus" @@ -429,60 +427,20 @@ func (p *localSortChunkProcessor) deliverLoop(ctx context.Context) error { // writer will take 256MiB buffer on default. // this will take a lot of memory, or even OOM. type IndexRouteWriter struct { - // this writer and all wrappedWriters are shared by all deliver routines, - // so we need to synchronize them. - sync.RWMutex - writers map[int64]*wrappedWriter + writers map[int64]*external.Writer logger *zap.Logger writerFactory func(int64) *external.Writer } -type wrappedWriter struct { - sync.Mutex - *external.Writer -} - -func (w *wrappedWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error { - w.Lock() - defer w.Unlock() - return w.Writer.WriteRow(ctx, idxKey, idxVal, handle) -} - -func (w *wrappedWriter) Close(ctx context.Context) error { - w.Lock() - defer w.Unlock() - return w.Writer.Close(ctx) -} - // NewIndexRouteWriter creates a new IndexRouteWriter. func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter { return &IndexRouteWriter{ - writers: make(map[int64]*wrappedWriter), + writers: make(map[int64]*external.Writer), logger: logger, writerFactory: writerFactory, } } -func (w *IndexRouteWriter) getWriter(indexID int64) *wrappedWriter { - w.RLock() - writer, ok := w.writers[indexID] - w.RUnlock() - if ok { - return writer - } - - w.Lock() - defer w.Unlock() - writer, ok = w.writers[indexID] - if !ok { - writer = &wrappedWriter{ - Writer: w.writerFactory(indexID), - } - w.writers[indexID] = writer - } - return writer -} - // AppendRows implements backend.EngineWriter interface. func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { kvs := kv.Rows2KvPairs(rows) @@ -494,7 +452,11 @@ func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows enco if err != nil { return errors.Trace(err) } - writer := w.getWriter(indexID) + writer, ok := w.writers[indexID] + if !ok { + writer = w.writerFactory(indexID) + w.writers[indexID] = writer + } if err = writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil { return errors.Trace(err) } @@ -510,8 +472,6 @@ func (*IndexRouteWriter) IsSynced() bool { // Close implements backend.EngineWriter interface. func (w *IndexRouteWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { var firstErr error - w.Lock() - defer w.Unlock() for _, writer := range w.writers { if err := writer.Close(ctx); err != nil { if firstErr == nil { diff --git a/pkg/executor/importer/chunk_process_test.go b/pkg/executor/importer/chunk_process_test.go deleted file mode 100644 index 9d7e132195ff8..0000000000000 --- a/pkg/executor/importer/chunk_process_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importer - -import ( - "math/rand" - "testing" - "time" - - "github.com/pingcap/tidb/br/pkg/lightning/backend/external" - "github.com/pingcap/tidb/pkg/util" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -func TestIndexRouteWriter(t *testing.T) { - logger := zap.NewExample() - routeWriter := NewIndexRouteWriter(logger, func(i int64) *external.Writer { - return external.NewWriterBuilder().Build(nil, "", "") - }) - wg := util.WaitGroupWrapper{} - for i := 0; i < 10; i++ { - idx := i - wg.Run(func() { - seed := time.Now().Unix() - logger.Info("seed", zap.Int("idx", idx), zap.Int64("seed", seed)) - r := rand.New(rand.NewSource(seed)) - gotWriters := make(map[int64]*wrappedWriter) - for i := 0; i < 3000; i++ { - indexID := int64(r.Int()) % 100 - writer := routeWriter.getWriter(indexID) - require.NotNil(t, writer) - if got, ok := gotWriters[indexID]; ok { - require.Equal(t, got, writer) - } else { - gotWriters[indexID] = writer - } - } - }) - } - wg.Wait() -}