This repository has been archived by the owner on May 1, 2020. It is now read-only.
/
handleMessage.go
130 lines (117 loc) · 2.96 KB
/
handleMessage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package webwire
// handleMessage handles incoming messages
func (srv *server) handleMessage(clt *Client, msg *Message) error {
// Decide whether to process the message
failMsg := false
srv.opsLock.Lock()
// Reject incoming requests during server shutdown
// or the shutdown of the client agent
// return a special shutdown error
if srv.shutdown || !clt.isActive() {
failMsg = true
} else {
srv.currentOps++
}
srv.opsLock.Unlock()
if failMsg {
// Don't process the message
if msg.RequiresResponse() {
srv.failMsgShutdown(clt, msg)
}
return nil
}
// Process the message
clt.registerTask()
defer func() {
// Mark operation as done and shutdown the server
// if scheduled and no operations are left
srv.opsLock.Lock()
srv.currentOps--
if srv.shutdown && srv.currentOps < 1 {
close(srv.shutdownRdy)
}
srv.opsLock.Unlock()
clt.deregisterTask()
}()
switch msg.msgType {
case MsgSignalBinary:
fallthrough
case MsgSignalUtf8:
fallthrough
case MsgSignalUtf16:
srv.handleSignal(clt, msg)
case MsgRequestBinary:
fallthrough
case MsgRequestUtf8:
fallthrough
case MsgRequestUtf16:
srv.handleRequest(clt, msg)
case MsgRestoreSession:
return srv.handleSessionRestore(clt, msg)
case MsgCloseSession:
return srv.handleSessionClosure(clt, msg)
}
return nil
}
// fulfillMsg fulfills the message sending the reply
func (srv *server) fulfillMsg(clt *Client, msg *Message, reply Payload) {
// Send reply
if err := clt.conn.Write(
NewReplyMessage(msg.id, reply),
); err != nil {
srv.errorLog.Println("Writing failed:", err)
}
}
// failMsg fails the message returning an error reply
func (srv *server) failMsg(clt *Client, msg *Message, reqErr error) {
// Don't send any failure reply if the type of the message
// doesn't expect any response
if !msg.RequiresReply() {
return
}
var replyMsg []byte
switch err := reqErr.(type) {
case ReqErr:
replyMsg = NewErrorReplyMessage(msg.id, err.Code, err.Message)
case *ReqErr:
replyMsg = NewErrorReplyMessage(msg.id, err.Code, err.Message)
case MaxSessConnsReachedErr:
replyMsg = NewSpecialRequestReplyMessage(
MsgMaxSessConnsReached,
msg.id,
)
case SessNotFoundErr:
replyMsg = NewSpecialRequestReplyMessage(
MsgSessionNotFound,
msg.id,
)
case SessionsDisabledErr:
replyMsg = NewSpecialRequestReplyMessage(
MsgSessionsDisabled,
msg.id,
)
case ProtocolErr:
replyMsg = NewSpecialRequestReplyMessage(
MsgReplyProtocolError,
msg.id,
)
default:
replyMsg = NewSpecialRequestReplyMessage(
MsgInternalError,
msg.id,
)
}
// Send request failure notification
if err := clt.conn.Write(replyMsg); err != nil {
srv.errorLog.Println("Writing failed:", err)
}
}
// failMsgShutdown sends request failure reply due to current server shutdown
func (srv *server) failMsgShutdown(clt *Client, msg *Message) {
if err := clt.conn.Write(NewSpecialRequestReplyMessage(
MsgReplyShutdown,
msg.id,
)); err != nil {
srv.errorLog.Println("Writing failed:", err)
}
}