forked from name5566/leaf
/
msg.go
154 lines (130 loc) · 3.26 KB
/
msg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package cluster
import (
"encoding/gob"
"fmt"
"sync"
"github.com/lovelly/leaf/chanrpc"
"github.com/lovelly/leaf/log"
//"github.com/lovelly/leaf/network/json"
"github.com/lovelly/leaf/gameError"
lgob "github.com/lovelly/leaf/network/gob"
)
const (
NsqMsgTypeRsp = iota //回应消息
NsqMsgTypeBroadcast //广播消息
NsqMsgTypeNotForResult // 不用回请求的消息
NsqMsgTypeForResult //要回请求的消息
)
var (
routeMap = map[string]*chanrpc.Server{}
Processor = lgob.NewProcessor()
RequestInfoLock sync.Mutex
requestID int64
requestMap = make(map[int64]*RequestInfo)
encoder = lgob.NewEncoder()
decoder = lgob.NewDecoder()
)
type RequestInfo struct {
serverName string
cb interface{}
chanRet chan *chanrpc.RetInfo
}
func init() {
gob.Register(&S2S_NsqMsg{})
gob.Register(&chanrpc.RetInfo{})
Processor.Register(&S2S_NsqMsg{})
Processor.Register(&chanrpc.RetInfo{})
}
func SetRouter(msgID string, server *chanrpc.Server) {
_, ok := routeMap[msgID]
if ok {
panic(fmt.Sprintf("function id %v: already set route", msgID))
}
routeMap[msgID] = server
}
type S2S_NsqMsg struct {
RequestID int64
MsgID string
MsgType uint8
SrcServerName string
DstServerName string
Args []interface{}
Err string
PushCnt int
}
func handleRequestMsg(recvMsg *S2S_NsqMsg) {
sendMsg := &S2S_NsqMsg{MsgType: NsqMsgTypeRsp, MsgID: "Return :" + recvMsg.MsgID, DstServerName: recvMsg.SrcServerName, RequestID: recvMsg.RequestID}
if isClose() && recvMsg.MsgType == NsqMsgTypeForResult {
sendMsg.Err = fmt.Sprintf("%v server is closing", SelfName)
Publish(sendMsg)
return
}
msgID := recvMsg.MsgID
client, ok := routeMap[msgID]
if !ok {
err := fmt.Sprintf("%v msg is not set route", msgID)
log.Error(err)
if recvMsg.MsgType == NsqMsgTypeForResult {
sendMsg.Err = err
Publish(sendMsg)
}
return
}
args := recvMsg.Args
if recvMsg.MsgType == NsqMsgTypeForResult {
sendMsgFunc := func(ret interface{}, err error) {
if ret != nil {
sendMsg.Args = []interface{}{ret}
}
if err != nil {
sendMsg.Err = err.Error()
}
Publish(sendMsg)
}
args = append(args, sendMsgFunc)
client.AsynCall(nil, msgID, args...)
} else {
client.Go(msgID, args...)
}
}
func handleResponseMsg(msg *S2S_NsqMsg) {
request := popRequest(msg.RequestID)
if request == nil {
log.Error("%v: request id %v is not exist", msg.SrcServerName, msg.RequestID)
return
}
ret := &chanrpc.RetInfo{Cb: request.cb}
if len(msg.Args) > 0 {
ret.Ret = msg.Args[0]
}
if msg.Err != "" {
ret.Err = gameError.RenderServerError(msg.Err)
}
request.chanRet <- ret
}
func registerRequest(request *RequestInfo) int64 {
RequestInfoLock.Lock()
defer RequestInfoLock.Unlock()
reqID := requestID
requestMap[reqID] = request
requestID += 1
return reqID
}
func ForEachRequest(f func(id int64, request *RequestInfo)) {
RequestInfoLock.Lock()
defer RequestInfoLock.Unlock()
for id, v := range requestMap {
f(id, v)
}
}
func popRequest(requestID int64) *RequestInfo {
RequestInfoLock.Lock()
defer RequestInfoLock.Unlock()
request, ok := requestMap[requestID]
if ok {
delete(requestMap, requestID)
return request
} else {
return nil
}
}