/
etcdrouter.go
87 lines (76 loc) · 1.87 KB
/
etcdrouter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package broker
import (
"encoding/json"
"fmt"
genUUID "github.com/go-basic/uuid"
"github.com/sestack/smq/global"
"github.com/sestack/smq/model"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
type etcdRouter struct {
forwardDir string
forwardWatchDir string
broker *Broker
}
func (r *etcdRouter) Forward(nodeID string, data *model.Forward) error {
key := fmt.Sprintf("%s/%s/%s", r.forwardDir, nodeID, genUUID.New())
value, err := json.Marshal(data)
if err != nil {
return err
}
return r.broker.store.Put(key, string(value))
}
func (r *etcdRouter) Receive() {
etcdClient, _ := r.broker.store.Raw().(*clientv3.Client)
watcher := clientv3.NewWatcher(etcdClient)
defer watcher.Close()
ctx := etcdClient.Ctx()
for {
rch := watcher.Watch(ctx, r.forwardWatchDir, clientv3.WithPrefix())
for wresp := range rch {
if wresp.Canceled {
return
}
for _, ev := range wresp.Events {
if ev.Type == mvccpb.PUT {
value := &model.Forward{}
if len(ev.Kv.Value) > 0 {
if err := json.Unmarshal(ev.Kv.Value, value); err != nil {
global.LOGGER.Error("unmarshal forward data failed", zap.String("value", string(ev.Kv.Value)), zap.Error(err))
continue
}
}
if value != nil {
cli, exist := r.broker.clients.Load(value.ClientID)
if !exist {
continue
}
c, ok := cli.(*client)
if !ok {
continue
}
if c.typ != LOCAL {
continue
}
msg := &Message{
client: c,
packet: value.Packet,
forward: true,
}
r.broker.SubmitWork(value.ClientID, msg)
}
global.LOGGER.Debug("delete forward key", zap.String("key", string(ev.Kv.Key)))
r.broker.store.Del(string(ev.Kv.Key))
}
}
}
select {
case <-ctx.Done():
// server closed, return
return
default:
}
}
}