-
Notifications
You must be signed in to change notification settings - Fork 5.7k
/
backfilling_import_cloud.go
121 lines (108 loc) · 3.48 KB
/
backfilling_import_cloud.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
)
// cloudImportExecutor is a step executor that ingests index KV data, which
// was previously sorted into cloud storage (cloudStoreURI), into TiKV via
// the ingest backend's local backend.
type cloudImportExecutor struct {
	taskexecutor.EmptyStepExecutor
	// job is the DDL job this import belongs to; its ID keys the backend
	// context registered in ingest.LitBackCtxMgr (see Cleanup).
	job *model.Job
	// index is the index being backfilled.
	index *model.IndexInfo
	// ptbl is the physical table the index belongs to.
	ptbl table.PhysicalTable
	// bc is the ingest backend context obtained at construction time.
	bc ingest.BackendCtx
	// cloudStoreURI locates the externally sorted KV files to import.
	cloudStoreURI string
}
// newCloudImportExecutor creates a cloudImportExecutor. The backend context
// is resolved eagerly through bcGetter so that a failure to set it up is
// reported before any subtask runs.
func newCloudImportExecutor(
	job *model.Job,
	index *model.IndexInfo,
	ptbl table.PhysicalTable,
	bcGetter func() (ingest.BackendCtx, error),
	cloudStoreURI string,
) (*cloudImportExecutor, error) {
	backendCtx, err := bcGetter()
	if err != nil {
		return nil, err
	}
	executor := &cloudImportExecutor{
		job:           job,
		index:         index,
		ptbl:          ptbl,
		bc:            backendCtx,
		cloudStoreURI: cloudStoreURI,
	}
	return executor, nil
}
// Init implements the taskexecutor.StepExecutor interface. It only logs the
// environment setup; all real initialization happened in the constructor.
func (*cloudImportExecutor) Init(ctx context.Context) error {
	logutil.Logger(ctx).Info("cloud import executor init subtask exec env")
	return nil
}
// RunSubtask implements the taskexecutor.StepExecutor interface. It closes an
// external engine described by the subtask meta (files already sorted in cloud
// storage) and imports that engine into TiKV through the local backend.
// Duplicate-key errors from the import are converted into a user-facing
// "key exists" error.
func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
	logutil.Logger(ctx).Info("cloud import executor run subtask")
	subtaskMeta, err := decodeBackfillSubTaskMeta(subtask.Meta)
	if err != nil {
		return err
	}
	localBackend := m.bc.GetLocalBackend()
	if localBackend == nil {
		return errors.Errorf("local backend not found")
	}
	_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, m.index.ID)

	// Merge the per-group sorted KV metadata into a single overall key range
	// covering everything this subtask must import.
	var merged external.SortedKVMeta
	for _, group := range subtaskMeta.MetaGroups {
		merged.Merge(group)
	}

	engineCfg := &backend.EngineConfig{
		External: &backend.ExternalEngineConfig{
			StorageURI:    m.cloudStoreURI,
			DataFiles:     subtaskMeta.DataFiles,
			StatFiles:     subtaskMeta.StatFiles,
			StartKey:      merged.StartKey,
			EndKey:        merged.EndKey,
			SplitKeys:     subtaskMeta.RangeSplitKeys,
			TotalFileSize: int64(merged.TotalKVSize),
			TotalKVCount:  0,
			CheckHotspot:  true,
		},
		TS: subtaskMeta.TS,
	}
	if err = localBackend.CloseEngine(ctx, engineCfg, engineUUID); err != nil {
		return err
	}
	localBackend.WorkerConcurrency = subtask.Concurrency * 2
	err = localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
	if common.ErrFoundDuplicateKeys.Equal(err) {
		err = convertToKeyExistsErr(err, m.index, m.ptbl.Meta())
	}
	return err
}
// Cleanup implements the taskexecutor.StepExecutor interface. It releases
// the ingest backend context registered under this job's ID.
func (m *cloudImportExecutor) Cleanup(ctx context.Context) error {
	logutil.Logger(ctx).Info("cloud import executor clean up subtask env")
	// cleanup backend context
	ingest.LitBackCtxMgr.Unregister(m.job.ID)
	return nil
}
// OnFinished implements the taskexecutor.StepExecutor interface. Nothing
// needs to be persisted per subtask here, so it only logs completion.
func (*cloudImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error {
	logutil.Logger(ctx).Info("cloud import executor finish subtask")
	return nil
}