/
metaserver_watcher.go
52 lines (48 loc) · 1.55 KB
/
metaserver_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package metaserver
import (
"time"
"github.com/rfeverything/rfs/internal/logger"
"go.uber.org/zap"
)
func (ms *MetaServer) watchVolumeState() {
sts, err := ms.Store.GetVolumesStatus()
if err != nil {
logger.Global().Error("failed to get volumes status", zap.Error(err))
return
}
for _, st := range sts {
for cnt := 0; cnt < 3; cnt++ {
logger.Global().Debug("watchVolumeState", zap.String("volumeId", st.VolumeId), zap.Int64("free", int64(st.Free)))
vc, err := NewVolumeClient(st.Address)
if err != nil {
logger.Global().Error("watchVolumeState", zap.Error(err), zap.String("volumeId", st.VolumeId), zap.Int64("free", int64(st.Free)), zap.String("address", st.Address), zap.Int("cnt", cnt))
time.Sleep(time.Second)
continue
}
ms.VolumeClients[st.VolumeId] = vc
break
}
}
for event := range ms.Store.GetVolumesStatusChan() {
logger.Global().Info("watchVolumeState", zap.Any("event", event))
if event.VolumeId == "" {
continue
}
switch event.Type {
case VolumeEventTypePut:
for cnt := 0; cnt < 3; cnt++ {
logger.Global().Info("watchVolumeState", zap.String("volumeId", event.VolumeId))
vc, err := NewVolumeClient(event.Status.Address)
if err != nil {
logger.Global().Error("watchVolumeState", zap.Error(err), zap.String("volumeId", event.VolumeId), zap.String("address", event.Status.Address), zap.Int("cnt", cnt))
time.Sleep(time.Second)
continue
}
ms.VolumeClients[event.VolumeId] = vc
break
}
case VolumeEventTypeDelete:
delete(ms.VolumeClients, event.VolumeId)
}
}
}