forked from absolute8511/ZanRedisDB
/
remote_sync_mgr.go
397 lines (365 loc) · 12.1 KB
/
remote_sync_mgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
package node
import (
"encoding/json"
"errors"
"sync"
"time"
"github.com/youzan/ZanRedisDB/metric"
)
// syncStateTimeout is how long a remote snapshot entry may sit without a
// status update before a new attempt is allowed to take over (see
// AddApplyingSnap and ApplyRemoteSnapshot).
const syncStateTimeout = 5 * time.Minute
// SyncedState is the raft term/index (plus a timestamp) of a remote cluster
// that this node has synced up to.
type SyncedState struct {
	SyncedTerm  uint64 `json:"synced_term,omitempty"`
	SyncedIndex uint64 `json:"synced_index,omitempty"`
	Timestamp   int64  `json:"timestamp,omitempty"`
	// this is used to disallow compare using the ==
	disableEqual []byte
}

// IsNewer reports whether ss is at or beyond other in BOTH term and index.
// Equal states count as newer; Timestamp is not compared.
func (ss *SyncedState) IsNewer(other *SyncedState) bool {
	return ss.IsNewer2(other.SyncedTerm, other.SyncedIndex)
}

// IsSame reports whether ss and other carry the same term and index.
// Timestamp is ignored.
func (ss *SyncedState) IsSame(other *SyncedState) bool {
	sameTerm := ss.SyncedTerm == other.SyncedTerm
	return sameTerm && ss.SyncedIndex == other.SyncedIndex
}

// IsNewer2 is IsNewer against a bare term/index pair.
func (ss *SyncedState) IsNewer2(term uint64, index uint64) bool {
	return ss.SyncedTerm >= term && ss.SyncedIndex >= index
}
// Status codes of a remote snapshot being applied, listed in the order a
// snapshot normally moves through them. ApplySnapFailed marks a failed
// transfer or apply.
const (
	ApplySnapUnknown int = iota
	ApplySnapBegin
	ApplySnapTransferring
	ApplySnapTransferred
	ApplySnapApplying
	ApplySnapDone
	ApplySnapFailed
)

// applyStatusMsgs maps each ApplySnap* status code to its display name.
var applyStatusMsgs = []string{
	ApplySnapUnknown:      "unknown",
	ApplySnapBegin:        "begin",
	ApplySnapTransferring: "transferring",
	ApplySnapTransferred:  "transferred",
	ApplySnapApplying:     "applying",
	ApplySnapDone:         "done",
	ApplySnapFailed:       "failed",
}
// SnapApplyStatus tracks the progress of one remote snapshot through the
// ApplySnap* states.
type SnapApplyStatus struct {
	// SS is the remote term/index identifying the snapshot.
	SS SyncedState
	// StatusCode is one of the ApplySnap* constants.
	StatusCode int
	// Status is the display name of StatusCode (from applyStatusMsgs).
	Status string
	// UpdatedTime is when StatusCode last changed; compared against
	// syncStateTimeout to detect stuck entries.
	UpdatedTime time.Time
}
// remoteSyncedStateMgr keeps, per remote cluster name, the last synced raft
// state and the status of the remote snapshot being applied, if any.
// The embedded RWMutex guards both maps.
type remoteSyncedStateMgr struct {
	sync.RWMutex
	remoteSyncedStates      map[string]SyncedState
	remoteSnapshotsApplying map[string]*SnapApplyStatus
}
// newRemoteSyncedStateMgr returns a manager whose maps are initialized and empty.
func newRemoteSyncedStateMgr() *remoteSyncedStateMgr {
	mgr := new(remoteSyncedStateMgr)
	mgr.remoteSyncedStates = make(map[string]SyncedState)
	mgr.remoteSnapshotsApplying = make(map[string]*SnapApplyStatus)
	return mgr
}
// RemoveApplyingSnap forgets the applying snapshot of cluster name, but only
// if the tracked entry still has the same term/index as state.
func (rss *remoteSyncedStateMgr) RemoveApplyingSnap(name string, state SyncedState) {
	rss.Lock()
	defer rss.Unlock()
	if cur, found := rss.remoteSnapshotsApplying[name]; found && cur.SS.IsSame(&state) {
		delete(rss.remoteSnapshotsApplying, name)
	}
}
// AddApplyingSnap tries to register state as the remote snapshot being applied
// for cluster name, in status ApplySnapBegin. An existing entry is replaced when:
//   - it is ApplySnapDone;
//   - it is ApplySnapBegin/Failed/Applying and has not been updated within
//     syncStateTimeout;
//   - otherwise, state is ahead of it in term or index and it has not been
//     updated within syncStateTimeout.
//
// It returns a copy of the entry now tracked (the new one when added, the
// existing one otherwise) and whether state was added. A copy is returned so
// callers can read it without holding the lock while
// UpdateApplyingSnapStatus mutates the stored entry.
func (rss *remoteSyncedStateMgr) AddApplyingSnap(name string, state SyncedState) (*SnapApplyStatus, bool) {
	rss.Lock()
	defer rss.Unlock()
	sas, ok := rss.remoteSnapshotsApplying[name]
	canAdd := false
	switch {
	case !ok:
		canAdd = true
	case sas.StatusCode == ApplySnapDone:
		canAdd = true
	case sas.StatusCode == ApplySnapBegin || sas.StatusCode == ApplySnapFailed || sas.StatusCode == ApplySnapApplying:
		// begin -> transferring, may lost if proposal dropped,
		// so we check the time and restart
		canAdd = time.Since(sas.UpdatedTime) > syncStateTimeout
	case !sas.SS.IsNewer(&state):
		if time.Since(sas.UpdatedTime) > syncStateTimeout {
			nodeLog.Infof("%v got newer snapshot %v, old is %v", name,
				state, sas)
			canAdd = true
		}
	}
	if canAdd {
		// overwriting the map entry replaces any stale one
		sas = &SnapApplyStatus{
			SS:          state,
			StatusCode:  ApplySnapBegin,
			Status:      applyStatusMsgs[ApplySnapBegin],
			UpdatedTime: time.Now(),
		}
		rss.remoteSnapshotsApplying[name] = sas
	}
	snapshot := *sas
	return &snapshot, canAdd
}
// UpdateApplyingSnapStatus sets the status of the applying snapshot of cluster
// name to status, provided an entry with the same term/index as ss exists.
// Status codes outside applyStatusMsgs (including negative ones) are ignored.
// UpdatedTime is refreshed only when the status actually changes.
func (rss *remoteSyncedStateMgr) UpdateApplyingSnapStatus(name string, ss SyncedState, status int) {
	// validate before locking: a negative status used to panic on the slice
	// index while the (non-deferred) lock was held.
	if status < 0 || status >= len(applyStatusMsgs) {
		return
	}
	rss.Lock()
	defer rss.Unlock()
	sas, ok := rss.remoteSnapshotsApplying[name]
	if !ok || !ss.IsSame(&sas.SS) || sas.StatusCode == status {
		return
	}
	sas.StatusCode = status
	sas.Status = applyStatusMsgs[status]
	sas.UpdatedTime = time.Now()
}
// GetApplyingSnap returns a copy of the applying-snapshot status tracked for
// cluster name, and whether one exists. The copy lets callers read fields
// such as StatusCode and UpdatedTime without racing with
// UpdateApplyingSnapStatus, which mutates the stored entry under the lock.
func (rss *remoteSyncedStateMgr) GetApplyingSnap(name string) (*SnapApplyStatus, bool) {
	rss.RLock()
	defer rss.RUnlock()
	sas, ok := rss.remoteSnapshotsApplying[name]
	if !ok {
		return nil, false
	}
	snapshot := *sas
	return &snapshot, true
}
// UpdateState records state as the latest synced state of remote cluster name.
func (rss *remoteSyncedStateMgr) UpdateState(name string, state SyncedState) {
	rss.Lock()
	defer rss.Unlock()
	rss.remoteSyncedStates[name] = state
}
// GetState returns the synced state recorded for remote cluster name and
// whether one exists.
func (rss *remoteSyncedStateMgr) GetState(name string) (SyncedState, bool) {
	rss.RLock()
	defer rss.RUnlock()
	s, found := rss.remoteSyncedStates[name]
	return s, found
}
// RestoreStates replaces all synced states with a copy of ss, so later
// changes to ss are not seen by the manager.
func (rss *remoteSyncedStateMgr) RestoreStates(ss map[string]SyncedState) {
	restored := make(map[string]SyncedState, len(ss))
	for name, state := range ss {
		restored[name] = state
	}
	rss.Lock()
	rss.remoteSyncedStates = restored
	rss.Unlock()
}
// Clone returns a copy of all synced states that the caller may modify freely.
func (rss *remoteSyncedStateMgr) Clone() map[string]SyncedState {
	rss.RLock()
	defer rss.RUnlock()
	out := make(map[string]SyncedState, len(rss.remoteSyncedStates))
	for name, state := range rss.remoteSyncedStates {
		out[name] = state
	}
	return out
}
// isContinueCommit reports whether reqList directly follows the last synced
// state of its origin cluster, i.e. OrigIndex is at most SyncedIndex+1.
// A cluster with no recorded state is accepted.
func (nd *KVNode) isContinueCommit(reqList BatchInternalRaftRequest) bool {
	oldState, found := nd.remoteSyncedStates.GetState(reqList.OrigCluster)
	if !found {
		// not found, we consider first init
		return true
	}
	if reqList.OrigIndex <= oldState.SyncedIndex+1 {
		return true
	}
	nd.rn.Infof("request %v is not continue while sync : %v",
		reqList.OrigIndex, oldState)
	return false
}
// isAlreadyApplied reports whether reqList is at or behind the synced state
// recorded for its origin cluster (older term, or index not beyond
// SyncedIndex), so a retried duplicate can be ignored.
func (nd *KVNode) isAlreadyApplied(reqList BatchInternalRaftRequest) bool {
	oldState, found := nd.remoteSyncedStates.GetState(reqList.OrigCluster)
	if !found {
		return false
	}
	// check if retrying duplicate req, we can just ignore old retry
	switch {
	case reqList.OrigTerm < oldState.SyncedTerm:
		nd.rn.Infof("request %v ignored since older than synced:%v",
			reqList.OrigTerm, oldState)
		return true
	case reqList.OrigIndex <= oldState.SyncedIndex:
		nd.rn.Infof("request %v ignored since older than synced index :%v",
			reqList.OrigIndex, oldState.SyncedIndex)
		return true
	}
	return false
}
// preprocessRemoteSnapApply scans the custom requests in reqList for a remote
// snapshot operation and returns (isRemoteSnapTransfer, isRemoteSnapApply).
//   - ProposeOp_TransferRemoteSnap: the snapshot identified by reqList's
//     OrigTerm/OrigIndex is registered (if allowed) and marked transferring;
//     returns (true, false).
//   - ProposeOp_ApplyRemoteSnap: returns (false, true).
//
// Only the first matching op is considered. A custom request whose payload
// cannot be decoded is logged and skipped.
func (nd *KVNode) preprocessRemoteSnapApply(reqList BatchInternalRaftRequest) (bool, bool) {
	ss := SyncedState{SyncedTerm: reqList.OrigTerm,
		SyncedIndex: reqList.OrigIndex, Timestamp: reqList.Timestamp}
	for _, req := range reqList.Reqs {
		if req.Header.DataType != int32(CustomReq) {
			continue
		}
		var cr customProposeData
		if err := json.Unmarshal(req.Data, &cr); err != nil {
			// do not act on a zero or partially decoded propose
			nd.rn.Infof("failed to unmarshal custom propose: %v, err:%v", req.String(), err)
			continue
		}
		if cr.ProposeOp == ProposeOp_TransferRemoteSnap {
			// for replica which is not leader, the applying status is not added,
			// so we add here is ok. leader will handle the duplicate.
			nd.remoteSyncedStates.AddApplyingSnap(reqList.OrigCluster, ss)
			nd.remoteSyncedStates.UpdateApplyingSnapStatus(reqList.OrigCluster, ss, ApplySnapTransferring)
			return true, false
		} else if cr.ProposeOp == ProposeOp_ApplyRemoteSnap {
			return false, true
		}
	}
	return false, false
}
// postprocessRemoteApply updates sync bookkeeping after reqList was applied.
// Requests with zero OrigTerm and OrigIndex are ignored. For a snapshot
// transfer, only the applying-snapshot status changes (transferred, or failed
// on errRemoteSnapTransferFailed). Otherwise, unless retErr is
// errIgnoredRemoteApply, the synced state is advanced, and a snapshot apply is
// marked done; an ignored snapshot apply is marked failed.
func (nd *KVNode) postprocessRemoteApply(reqList BatchInternalRaftRequest,
	isRemoteSnapTransfer bool, isRemoteSnapApply bool, retErr error) {
	if reqList.OrigTerm == 0 && reqList.OrigIndex == 0 {
		return
	}
	cluster := reqList.OrigCluster
	ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex, Timestamp: reqList.Timestamp}

	// for remote snapshot transfer, we need wait apply success before update sync state
	if isRemoteSnapTransfer {
		next := ApplySnapTransferred
		if retErr == errRemoteSnapTransferFailed {
			next = ApplySnapFailed
		}
		nd.remoteSyncedStates.UpdateApplyingSnapStatus(cluster, ss, next)
		return
	}

	if retErr == errIgnoredRemoteApply {
		if isRemoteSnapApply {
			nd.remoteSyncedStates.UpdateApplyingSnapStatus(cluster, ss, ApplySnapFailed)
		}
		return
	}

	nd.remoteSyncedStates.UpdateState(cluster, ss)
	if isRemoteSnapApply {
		nd.remoteSyncedStates.UpdateApplyingSnapStatus(cluster, ss, ApplySnapDone)
	}
}
// SetRemoteClusterSyncedRaft records term, index and ts as the synced raft
// state of remote cluster name.
func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64, ts int64) {
	state := SyncedState{
		SyncedTerm:  term,
		SyncedIndex: index,
		Timestamp:   ts,
	}
	nd.remoteSyncedStates.UpdateState(name, state)
}
// GetRemoteClusterSyncedRaft returns the synced term, index and timestamp of
// remote cluster name, or all zeros if nothing has been recorded.
func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64, int64) {
	if state, ok := nd.remoteSyncedStates.GetState(name); ok {
		return state.SyncedTerm, state.SyncedIndex, state.Timestamp
	}
	return 0, 0, 0
}
// GetLogSyncStatsInSyncLearner returns the two stats reported by the log
// syncer state machine's GetLogSyncStats (receive side, then sync side), or
// (nil, nil) when this node's state machine is not a *logSyncerSM.
func (nd *KVNode) GetLogSyncStatsInSyncLearner() (*metric.LogSyncStats, *metric.LogSyncStats) {
	syncer, isSyncer := nd.sm.(*logSyncerSM)
	if !isSyncer {
		return nil, nil
	}
	// named syncStats rather than "sync" to avoid shadowing the sync package
	recvStats, syncStats := syncer.GetLogSyncStats()
	return &recvStats, &syncStats
}
// ApplyRemoteSnapshot proposes applying the snapshot term-index of remote
// cluster name to the local state machine.
//
// Unless skip is set, the node must be in syncer-only mode and the tracked
// snapshot for name must match term-index and be ApplySnapTransferred (or be
// stuck in ApplySnapApplying longer than syncStateTimeout); it is then marked
// ApplySnapApplying. With skip set, these checks are bypassed and an
// ApplySkippedRemoteSnap op is proposed instead.
//
// Returns an error when a check fails or the proposal cannot be encoded.
// A failed proposal is logged with its cause and nil is returned: the caller
// is expected to retry later.
func (nd *KVNode) ApplyRemoteSnapshot(skip bool, name string, term uint64, index uint64) error {
	// restore the state machine from transferred snap data when transfer success.
	// we do not need restore other cluster member info here.
	nd.rn.Infof("begin recovery from remote cluster %v snapshot here: %v-%v", name, term, index)
	if skip {
		nd.rn.Infof("got skipped snapshot from remote cluster %v : %v-%v", name, term, index)
	} else {
		// we should disallow applying remote snap while we are running as master cluster
		if !IsSyncerOnly() {
			nd.rn.Infof("cluster %v snapshot is not allowed: %v-%v", name, term, index)
			return errors.New("apply remote snapshot is not allowed while not in syncer only mode")
		}
		oldS, ok := nd.remoteSyncedStates.GetApplyingSnap(name)
		if !ok {
			return errors.New("no remote snapshot waiting apply")
		}
		if oldS.SS.SyncedTerm != term || oldS.SS.SyncedIndex != index {
			nd.rn.Infof("remote cluster %v snapshot mismatch: %v-%v, old: %v", name, term, index, oldS)
			return errors.New("apply remote snapshot term-index mismatch")
		}
		if oldS.StatusCode != ApplySnapTransferred {
			nd.rn.Infof("remote cluster %v snapshot not ready for apply: %v", name, oldS)
			// it may be changed to applying but proposal failed, so we need proposal again
			if oldS.StatusCode == ApplySnapApplying && time.Since(oldS.UpdatedTime) > syncStateTimeout {
				nd.rn.Infof("remote cluster %v snapshot waiting applying too long: %v", name, oldS)
			} else {
				return errors.New("apply remote snapshot status invalid")
			}
		}
		// set the snap status to applying and the snap status will be updated if apply done or failed
		nd.remoteSyncedStates.UpdateApplyingSnapStatus(name, oldS.SS, ApplySnapApplying)
	}
	var reqList BatchInternalRaftRequest
	reqList.OrigCluster = name
	reqList.ReqNum = 1
	reqList.Timestamp = time.Now().UnixNano()
	p := &customProposeData{
		ProposeOp:   ProposeOp_ApplyRemoteSnap,
		NeedBackup:  true,
		RemoteTerm:  term,
		RemoteIndex: index,
	}
	if skip {
		p.ProposeOp = ProposeOp_ApplySkippedRemoteSnap
	}
	d, err := json.Marshal(p)
	if err != nil {
		// previously ignored, which would have proposed an empty payload
		nd.rn.Infof("cluster %v encoding apply snap %v-%v failed: %v", name, term, index, err)
		return err
	}
	h := RequestHeader{
		ID:       0,
		DataType: int32(CustomReq),
	}
	raftReq := InternalRaftRequest{
		Header: h,
		Data:   d,
	}
	reqList.Reqs = append(reqList.Reqs, raftReq)
	err = nd.ProposeRawAndWaitFromSyncer(&reqList, term, index, reqList.Timestamp)
	if err != nil {
		nd.rn.Infof("cluster %v applying snap %v-%v failed: %v", name, term, index, err)
		// just wait next retry
	} else {
		nd.rn.Infof("cluster %v applying snap %v-%v done", name, term, index)
	}
	return nil
}
// BeginTransferRemoteSnap registers snapshot term-index of remote cluster name
// as applying (status ApplySnapBegin) and proposes a TransferRemoteSnap op
// that carries syncAddr and syncPath.
//
// Returns an error when the node is not in syncer-only mode, when a different
// snapshot is already being applied for name, when the proposal cannot be
// encoded, or when the proposal itself fails. If the same snapshot is already
// tracked, the transfer is proposed again.
func (nd *KVNode) BeginTransferRemoteSnap(name string, term uint64, index uint64, syncAddr string, syncPath string) error {
	// we should disallow transfer remote snap while we are running as master cluster
	if !IsSyncerOnly() {
		nd.rn.Infof("cluster %v snapshot is not allowed: %v-%v", name, term, index)
		return errors.New("remote snapshot is not allowed while not in syncer only mode")
	}
	ss := SyncedState{SyncedTerm: term, SyncedIndex: index}
	// set the snap status to begin and the snap status will be updated if transfer begin
	// if transfer failed to propose, after some timeout it will be removed while adding
	old, added := nd.remoteSyncedStates.AddApplyingSnap(name, ss)
	if !added {
		nd.rn.Infof("cluster %v applying snap %v already running while apply %v", name, old, ss)
		if !old.SS.IsSame(&ss) {
			return errors.New("another snapshot applying")
		}
	}
	p := &customProposeData{
		ProposeOp:   ProposeOp_TransferRemoteSnap,
		NeedBackup:  false,
		SyncAddr:    syncAddr,
		SyncPath:    syncPath,
		RemoteTerm:  term,
		RemoteIndex: index,
	}
	d, err := json.Marshal(p)
	if err != nil {
		// previously ignored, which would have proposed an empty payload
		nd.rn.Infof("cluster %v encoding transfer snap %v failed: %v", name, ss, err)
		return err
	}
	var reqList BatchInternalRaftRequest
	reqList.OrigCluster = name
	reqList.ReqNum = 1
	reqList.Timestamp = time.Now().UnixNano()
	h := RequestHeader{
		ID:       0,
		DataType: int32(CustomReq),
	}
	raftReq := InternalRaftRequest{
		Header: h,
		Data:   d,
	}
	reqList.Reqs = append(reqList.Reqs, raftReq)
	err = nd.ProposeRawAndWaitFromSyncer(&reqList, term, index, reqList.Timestamp)
	if err != nil {
		nd.rn.Infof("cluster %v applying transfer snap %v failed: %v", name, ss, err)
	} else {
		nd.rn.Infof("cluster %v applying transfer snap %v done", name, ss)
	}
	return err
}
// GetApplyRemoteSnapStatus reports the applying-snapshot status tracked for
// remote cluster name and whether one is tracked.
func (nd *KVNode) GetApplyRemoteSnapStatus(name string) (*SnapApplyStatus, bool) {
	status, tracked := nd.remoteSyncedStates.GetApplyingSnap(name)
	return status, tracked
}