-
Notifications
You must be signed in to change notification settings - Fork 2
/
cnsyncupworker.go
53 lines (45 loc) · 1.24 KB
/
cnsyncupworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package monitor
import (
"context"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/virtual-disk-array/vda/pkg/lib"
"github.com/virtual-disk-array/vda/pkg/logger"
)
// cnSyncupWorker is the monitor worker that handles CN error keys: it maps
// hash ranges to key ranges and triggers a CN syncup for each backlog key.
type cnSyncupWorker struct {
	// name identifies this worker in log messages.
	name string
	// kf builds and decodes the CN error keys this worker operates on.
	kf *lib.KeyFmt
	// sm performs the actual CN syncup.
	sm *lib.SyncupManager
	// cnTimeout is the per-syncup timeout, in seconds.
	cnTimeout int
}
// getName reports the worker's name as stored in the name field.
func (csw *cnSyncupWorker) getName() string {
	return csw.name
}
// getRange converts the hash bounds begin and end into the corresponding pair
// of CN error keys, each produced by KeyFmt.CnErrWithHash.
//
// Both bounds are converted to uint32 before use, so values outside the
// uint32 range wrap around.
func (csw *cnSyncupWorker) getRange(begin, end int) (string, string) {
	// Calls are evaluated left to right: the begin key first, then the end key.
	return csw.kf.CnErrWithHash(uint32(begin)), csw.kf.CnErrWithHash(uint32(end))
}
// processBacklog handles a single backlog key by triggering a syncup of the
// CN that the key refers to.
//
// The key is decoded with KeyFmt.DecodeCnErrKey to obtain the CN's socket
// address. If decoding fails, the error is logged and the key is skipped.
// Otherwise SyncupManager.SyncupCn is invoked with a context that expires
// after cnTimeout seconds.
//
// NOTE(review): the timeout context is rooted at context.Background(), so
// cancelling ctx does not interrupt an in-flight syncup — confirm this is
// intentional.
func (csw *cnSyncupWorker) processBacklog(ctx context.Context, key string) {
	logger.Info("process key: %s %s", csw.name, key)

	_, sockAddr, err := csw.kf.DecodeCnErrKey(key)
	if err != nil {
		logger.Error("Decode key err: %s %v", csw.name, err)
		return
	}

	timeout := time.Duration(csw.cnTimeout) * time.Second
	cnCtx, cancel := context.WithTimeout(context.Background(), timeout)
	// Deferred so the context's resources are released even if SyncupCn
	// panics or gains additional return paths later.
	defer cancel()

	csw.sm.SyncupCn(sockAddr, cnCtx)
}
// newCnSyncupWorker builds a cnSyncupWorker named "CnSyncupWorker".
//
// etcdCli is wrapped via lib.NewStmWrapper and, together with kf and gc, used
// to create the SyncupManager the worker delegates to. cnTimeout is stored
// as-is and later interpreted as a number of seconds.
func newCnSyncupWorker(etcdCli *clientv3.Client, kf *lib.KeyFmt,
	gc *lib.GrpcCache, cnTimeout int) *cnSyncupWorker {
	stm := lib.NewStmWrapper(etcdCli)
	return &cnSyncupWorker{
		name:      "CnSyncupWorker",
		kf:        kf,
		sm:        lib.NewSyncupManager(kf, stm, gc),
		cnTimeout: cnTimeout,
	}
}