-
Notifications
You must be signed in to change notification settings - Fork 1
/
distservice_running.go
148 lines (119 loc) · 3.21 KB
/
distservice_running.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package dserv
import (
"context"
"errors"
"fmt"
"git.golaxy.org/framework/plugins/broker"
"git.golaxy.org/framework/plugins/discovery"
"git.golaxy.org/framework/plugins/log"
"time"
)
func (d *_DistService) mainLoop(serviceNode *discovery.Service, subs []broker.ISubscriber) {
defer d.wg.Done()
log.Infof(d.servCtx, "service %q node %q started", d.servCtx.GetName(), d.servCtx.GetId())
if d.options.RefreshTTL {
ticker := time.NewTicker(d.options.TTL / 2)
defer ticker.Stop()
loop:
for {
select {
case <-ticker.C:
// 刷新服务节点
if err := d.registry.Register(d.ctx, serviceNode, d.options.TTL); err != nil {
log.Errorf(d.servCtx, "refresh service %q node %q failed, %s", d.servCtx.GetName(), d.servCtx.GetId(), err)
continue
}
log.Debugf(d.servCtx, "refresh service %q node %q success", d.servCtx.GetName(), d.servCtx.GetId())
case <-d.ctx.Done():
break loop
}
}
} else {
<-d.ctx.Done()
}
// 取消注册服务节点
if err := d.registry.Deregister(context.Background(), serviceNode); err != nil {
log.Errorf(d.servCtx, "deregister service %q node %q failed, %s", d.servCtx.GetName(), d.servCtx.GetId(), err)
}
// 取消订阅topic
for _, sub := range subs {
<-sub.Unsubscribe()
}
d.broker.Flush(context.Background())
log.Infof(d.servCtx, "service %q node %q stopped", d.servCtx.GetName(), d.servCtx.GetId())
}
func (d *_DistService) watchingService() {
defer d.wg.Done()
log.Debug(d.servCtx, "watching service changes started")
retry:
var watcher discovery.IWatcher
var err error
retryInterval := 3 * time.Second
select {
case <-d.ctx.Done():
goto end
default:
}
// 监控服务节点变化
watcher, err = d.registry.Watch(d.ctx, "")
if err != nil {
log.Errorf(d.servCtx, "watching service changes failed, %s, retry it", err)
time.Sleep(retryInterval)
goto retry
}
for {
e, err := watcher.Next()
if err != nil {
if errors.Is(err, discovery.ErrTerminated) {
time.Sleep(retryInterval)
goto retry
}
log.Errorf(d.servCtx, "watching service changes failed, %s, retry it", err)
<-watcher.Terminate()
time.Sleep(retryInterval)
goto retry
}
switch e.Type {
case discovery.Delete:
for _, node := range e.Service.Nodes {
d.deduplication.Remove(node.Address)
}
}
}
end:
if watcher != nil {
<-watcher.Terminate()
}
log.Debug(d.servCtx, "watching service changes stopped")
}
func (d *_DistService) handleEvent(e broker.IEvent) error {
mp, err := d.decoder.DecodeBytes(e.Message())
if err != nil {
return err
}
// 最少一次交付模式,需要消息去重
if d.broker.GetDeliveryReliability() == broker.AtLeastOnce {
if !d.deduplication.Validate(mp.Head.Src, mp.Head.Seq) {
return fmt.Errorf("gap: discard duplicate msg-packet, head:%+v", mp.Head)
}
}
var errs []error
interrupt := func(err, _ error) bool {
if err != nil {
errs = append(errs, err)
}
return false
}
// 回调监控器
d.msgWatchers.AutoRLock(func(watchers *[]*_MsgWatcher) {
for i := range *watchers {
(*watchers)[i].handler.Exec(interrupt, e.Topic(), mp)
}
})
// 回调处理器
d.options.RecvMsgHandler.Exec(interrupt, e.Topic(), mp)
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}