-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
127 lines (111 loc) · 4.05 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package peer
import (
"fmt"
"github.com/turingchain2020/turingchain/queue"
"github.com/turingchain2020/turingchain/system/p2p/dht/extension"
"github.com/turingchain2020/turingchain/types"
"github.com/libp2p/go-libp2p-core/peer"
)
// handleEventSubTopic handles a topic subscription request sent by another module.
//
// The message payload must be a *types.SubTopic; any other payload is answered
// with a failed reply instead of panicking. If the topic has not been joined
// yet, it is joined with subCallBack as the receive callback. The requesting
// module is then recorded in p.topicModule so that subCallBack forwards data
// received on the topic to it, and only afterwards is the success reply sent.
func (p *Protocol) handleEventSubTopic(msg *queue.Message) {
	subtopic, ok := msg.GetData().(*types.SubTopic)
	if !ok {
		msg.Reply(p.QueueClient.NewMessage("", types.EventSubTopic, &types.Reply{IsOk: false, Msg: []byte("need *types.SubTopic")}))
		return
	}
	topic := subtopic.GetTopic()
	// moduleName is the module that data received on the topic is forwarded to.
	moduleName := subtopic.GetModule()

	// Join and subscribe only when the topic is not known yet.
	if !p.Pubsub.HasTopic(topic) {
		if err := p.Pubsub.JoinAndSubTopic(topic, p.subCallBack); err != nil {
			log.Error("peerPubSub", "err", err)
			msg.Reply(p.QueueClient.NewMessage("", types.EventSubTopic, &types.Reply{IsOk: false, Msg: []byte(err.Error())}))
			return
		}
	}

	// Record the module before replying, so the caller is never told the
	// subscription succeeded while subCallBack would still skip it.
	func() {
		p.topicMutex.Lock()
		defer p.topicMutex.Unlock()

		var modules map[string]bool
		if v, loaded := p.topicModule.Load(topic); loaded {
			modules = v.(map[string]bool)
		} else {
			modules = make(map[string]bool)
		}
		modules[moduleName] = true
		p.topicModule.Store(topic, modules)
	}()

	var reply types.SubTopicReply
	reply.Status = true
	reply.Msg = fmt.Sprintf("subtopic %v success", topic)
	msg.Reply(p.QueueClient.NewMessage("", types.EventSubTopic, &types.Reply{IsOk: true, Msg: types.Encode(&reply)}))
}
// subCallBack handles data received on a subscribed topic.
//
// It looks up the modules recorded for the topic in p.topicModule and sends
// each of them an EventReceiveSubData message carrying the topic, the sender's
// peer ID string and the raw data. Topics with no recorded modules are ignored.
func (p *Protocol) subCallBack(topic string, msg extension.SubMsg) {
	p.topicMutex.RLock()
	defer p.topicMutex.RUnlock()

	entry, found := p.topicModule.Load(topic)
	if !found {
		return
	}

	subscribers := entry.(map[string]bool)
	for name := range subscribers {
		payload := &types.TopicData{
			Topic: topic,
			From:  peer.ID(msg.From).String(),
			Data:  msg.Data,
		}
		// The send result is intentionally ignored: forwarding is best effort.
		_ = p.QueueClient.Send(p.QueueClient.NewMessage(name, types.EventReceiveSubData, payload), false)
	}
}
// handleEventGetTopics replies with the list of topics returned by
// p.Pubsub.GetTopics, encoded as a types.TopicList.
//
// The message payload must be a *types.FetchTopicList; otherwise a failed
// reply is sent.
func (p *Protocol) handleEventGetTopics(msg *queue.Message) {
	if _, valid := msg.GetData().(*types.FetchTopicList); !valid {
		failure := &types.Reply{IsOk: false, Msg: []byte("need *types.FetchTopicList")}
		msg.Reply(p.QueueClient.NewMessage("", types.EventFetchTopics, failure))
		return
	}

	list := types.TopicList{Topics: p.Pubsub.GetTopics()}
	success := &types.Reply{IsOk: true, Msg: types.Encode(&list)}
	msg.Reply(p.QueueClient.NewMessage("", types.EventFetchTopics, success))
}
// handleEventRemoveTopic removes a module's subscription to a topic.
//
// The message payload must be a *types.RemoveTopic. The module named in the
// request is removed from the topic's module set. When no module remains, the
// topic itself is removed from p.Pubsub and its entry is dropped from
// p.topicModule so that empty module sets do not accumulate. A failed reply is
// sent when the payload has the wrong type or the topic has no recorded
// modules.
func (p *Protocol) handleEventRemoveTopic(msg *queue.Message) {
	// Validate the payload before taking the write lock.
	v, ok := msg.GetData().(*types.RemoveTopic)
	if !ok {
		msg.Reply(p.QueueClient.NewMessage("", types.EventRemoveTopic, &types.Reply{IsOk: false, Msg: []byte("need *types.RemoveTopic")}))
		return
	}
	topic := v.GetTopic()

	p.topicMutex.Lock()
	defer p.topicMutex.Unlock()

	vmodules, ok := p.topicModule.Load(topic)
	if !ok || len(vmodules.(map[string]bool)) == 0 {
		msg.Reply(p.QueueClient.NewMessage("", types.EventRemoveTopic, &types.Reply{IsOk: false, Msg: []byte("this module no sub this topic")}))
		return
	}

	modules := vmodules.(map[string]bool)
	delete(modules, v.GetModule()) // stop forwarding topic data to this module

	if len(modules) == 0 {
		// Last module gone: leave the topic and drop the stale empty entry.
		p.Pubsub.RemoveTopic(topic)
		p.topicModule.Delete(topic)
	}

	var reply types.RemoveTopicReply
	reply.Topic = topic
	reply.Status = true
	msg.Reply(p.QueueClient.NewMessage("", types.EventRemoveTopic, &types.Reply{IsOk: true, Msg: types.Encode(&reply)}))
}
// handleEventPubMsg publishes a message to a topic.
//
// The message payload must be a *types.PublishTopicMsg; otherwise a failed
// reply is sent. The reply carries "push success" when publishing succeeds,
// or the publish error text with IsOk set to false when it fails.
func (p *Protocol) handleEventPubMsg(msg *queue.Message) {
	req, valid := msg.GetData().(*types.PublishTopicMsg)
	if !valid {
		msg.Reply(p.QueueClient.NewMessage("", types.EventPubTopicMsg, &types.Reply{IsOk: false, Msg: []byte("need *types.PublishTopicMsg")}))
		return
	}

	// Start from the success outcome and downgrade it if publishing fails.
	result := &types.Reply{IsOk: true, Msg: []byte("push success")}
	if err := p.Pubsub.Publish(req.GetTopic(), req.GetMsg()); err != nil {
		result.IsOk = false
		result.Msg = []byte(err.Error())
	}
	msg.Reply(p.QueueClient.NewMessage("", types.EventPubTopicMsg, result))
}