-
Notifications
You must be signed in to change notification settings - Fork 269
/
unit.go
186 lines (169 loc) · 6.59 KB
/
unit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package unit
import (
"context"
"strings"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/terror"
)
const (
// DefaultInitTimeout is the default timeout (one minute) used when initializing a process unit.
DefaultInitTimeout = time.Minute
)
// Unit defines the interface for subtask process units, such as syncer, loader, relay, etc.
// A Unit is not responsible for maintaining its own status like "pausing"/"paused". The caller should maintain
// the status, for example by knowing the Unit is "paused" and avoiding calling Pause again.
// All methods of the Unit interface can expect no concurrent invocation; the caller should guarantee this.
type Unit interface {
// Init initializes the dm process unit.
// Every unit does its base initialization in `Init`, and this must pass before the subtask starts running.
// Other setup can be done at the beginning of `Process`, but this should be treated carefully to keep it
// compatible with Pause / Resume.
// If initialization succeeds, the outer caller should call `Close` when the unit (or the task) is finished,
// stopped or canceled (for example because another unit's Init failed).
// If initialization fails, Init itself should release the resources it acquired before (rolling itself back).
Init(ctx context.Context) error
// Process runs the main logic; when it returns it must send a result to the pr channel.
// When ctx is done, it stops processing and returns; otherwise the DM-worker will be blocked forever.
// When the unit is not processing, call Process to continue or resume the process.
Process(ctx context.Context, pr chan pb.ProcessResult)
// Close shuts down the process and closes the unit; after that, Process can not be called to resume.
// The implementation should not block for a long time.
Close()
// Kill shuts down the process and closes the unit without a graceful shutdown.
Kill()
// Pause does some cleanups so that the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
// Resume resumes the paused process; when it returns it must send a result to the pr channel.
Resume(ctx context.Context, pr chan pb.ProcessResult)
// Update updates the configuration.
Update(ctx context.Context, cfg *config.SubTaskConfig) error
// Status returns the unit's current status. The result may need calculation with the source status, like the
// estimated time to catch up. If sourceStatus is nil, that calculation should be skipped.
Status(sourceStatus *binlog.SourceStatus) interface{}
// Type returns the unit's type.
Type() pb.UnitType
// IsFreshTask returns whether this is a fresh task (one that has not been processed before).
// It is used to decide whether the task should become restoring.
IsFreshTask(ctx context.Context) (bool, error)
}
// NewProcessError converts err into a *pb.ProcessError.
//
// When err is a *terror.Error, its code, class, scope, level, message, raw
// cause and workaround are copied into the result. For every other error the
// code, class, scope, level, raw cause and workaround come from
// terror.ErrNotSet, and only Message is derived from err itself.
// Further fields can be added here later if needed.
func NewProcessError(err error) *pb.ProcessError {
	typed, isTerror := err.(*terror.Error)
	if !isTerror {
		// Not a terror: report the "not set" metadata but keep err's own message.
		return &pb.ProcessError{
			ErrCode:    int32(terror.ErrNotSet.Code()),
			ErrClass:   terror.ErrNotSet.Class().String(),
			ErrScope:   terror.ErrNotSet.Scope().String(),
			ErrLevel:   terror.ErrNotSet.Level().String(),
			Message:    terror.Message(err),
			RawCause:   terror.Message(terror.ErrNotSet.Cause()),
			Workaround: terror.ErrNotSet.Workaround(),
		}
	}

	return &pb.ProcessError{
		ErrCode:    int32(typed.Code()),
		ErrClass:   typed.Class().String(),
		ErrScope:   typed.Scope().String(),
		ErrLevel:   typed.Level().String(),
		Message:    terror.Message(typed),
		RawCause:   terror.Message(typed.Cause()),
		Workaround: typed.Workaround(),
	}
}
// IsCtxCanceledProcessErr reports whether err was caused by a canceled context.
// The check is textual: it looks for "context canceled" in err.Message.
// A nil err is not a cancellation, so it yields false instead of panicking on
// the field access.
func IsCtxCanceledProcessErr(err *pb.ProcessError) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Message, "context canceled")
}
// JoinProcessErrors renders every pb.ProcessError with its String method and
// returns the results joined by ", ". An empty input yields an empty string.
func JoinProcessErrors(errors []*pb.ProcessError) string {
	rendered := make([]string, len(errors))
	for i := range errors {
		rendered[i] = errors[i].String()
	}
	return strings.Join(rendered, ", ")
}
// IsResumableError inspects a process error and reports whether the task unit
// should be resumed and retried.
//
// A nil err is treated as resumable. The error is considered NOT resumable when:
//   - its RawCause contains (case-insensitively) any message listed in
//     retry.UnsupportedDDLMsgs, retry.UnsupportedDMLMsgs or retry.ReplicationErrMsgs;
//   - its ErrCode is terror.ErrParserParseRelayLog and its Message contains
//     (case-insensitively) any message listed in retry.ParseRelayLogErrMsgs;
//   - its ErrCode is a key of retry.UnresumableErrCodes.
//
// Every other error is resumable.
func IsResumableError(err *pb.ProcessError) bool {
	if err == nil {
		return true
	}

	// not elegant code, because TiDB doesn't expose some error
	// Lower-case the raw cause once instead of once per candidate message.
	rawCause := strings.ToLower(err.RawCause)
	for _, msg := range retry.UnsupportedDDLMsgs {
		if strings.Contains(rawCause, strings.ToLower(msg)) {
			return false
		}
	}
	for _, msg := range retry.UnsupportedDMLMsgs {
		if strings.Contains(rawCause, strings.ToLower(msg)) {
			return false
		}
	}
	for _, msg := range retry.ReplicationErrMsgs {
		if strings.Contains(rawCause, strings.ToLower(msg)) {
			return false
		}
	}

	// Relay-log parse errors are matched against Message rather than RawCause.
	if err.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
		message := strings.ToLower(err.Message)
		for _, msg := range retry.ParseRelayLogErrMsgs {
			if strings.Contains(message, strings.ToLower(msg)) {
				return false
			}
		}
	}

	_, unresumable := retry.UnresumableErrCodes[err.ErrCode]
	return !unresumable
}
// IsResumableDBError reports whether a DB error can be resumed.
// It is a simplified version of IsResumableError that works as a deny list:
// a nil error is resumable; an error whose root cause is context.Canceled, or
// whose root-cause text contains (case-insensitively) a message from
// retry.UnsupportedDDLMsgs or retry.UnsupportedDMLMsgs, is not resumable;
// all other errors are resumable.
func IsResumableDBError(err error) bool {
	if err == nil {
		return true
	}

	root := errors.Cause(err)
	if root == context.Canceled {
		return false
	}

	// not elegant code, because TiDB doesn't expose some error
	lowered := strings.ToLower(root.Error())
	mentions := func(pattern string) bool {
		return strings.Contains(lowered, strings.ToLower(pattern))
	}

	for _, ddlMsg := range retry.UnsupportedDDLMsgs {
		if mentions(ddlMsg) {
			return false
		}
	}
	for _, dmlMsg := range retry.UnsupportedDMLMsgs {
		if mentions(dmlMsg) {
			return false
		}
	}
	return true
}
// IsResumableRelayError reports whether relay should be resumed after err.
// It lives here because relay implements the unit interface too.
// An error is not resumable when its ErrCode is a key of
// retry.UnresumableRelayErrCodes. A nil err is treated as resumable, matching
// IsResumableError, instead of panicking on the field access.
func IsResumableRelayError(err *pb.ProcessError) bool {
	if err == nil {
		return true
	}
	_, unresumable := retry.UnresumableRelayErrCodes[err.ErrCode]
	return !unresumable
}