/
msg_parser.go
210 lines (182 loc) · 6.25 KB
/
msg_parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/**
* 触发消息/事件处理
* 1. wxapi.ParseMessageBody() --- 获取消息/事件的(消息体,时间戳,nonce,error),主要用于调试
* 2. wxapi.GetReply([]byte) --- 根据消息体触发消息处理函数得到返回结果。如果使用default_rest.go的实现不需要关心它
*/
package wxmsg
import (
"github.com/beevik/etree"
"fmt"
"net/http"
"io/ioutil"
"net/url"
"github.com/rosbit/go-wx-api/log"
"github.com/rosbit/go-wx-api/conf"
)
// SUCCESS_TEXT is the literal body "success". getReply returns it when no
// reply message was produced and alongside every error it reports.
var SUCCESS_TEXT = []byte("success")
// MustSignatureArgs lists the query-string parameter names that
// parseMessageBody reads for an "aes" encrypted request.
var MustSignatureArgs = []string{"signature", "timestamp", "nonce"}
// Positions of the individual names inside MustSignatureArgs (and inside any
// slice filled in the same order).
const (
SIGNATURE = iota // position of "signature"
TIMESTAMP // position of "timestamp"
NONCE // position of "nonce"
)
// newMessages maps a message type (the MsgType text read by getReply) to a
// constructor returning an empty ReceivedMsg of the matching concrete type.
// MT_VIDEO and MT_SHORTVIDEO both use VideoMsg.
var newMessages = map[string]func()ReceivedMsg {
MT_TEXT: func()ReceivedMsg { return &TextMsg{} },
MT_IMAGE: func()ReceivedMsg { return &ImageMsg{} },
MT_VOICE: func()ReceivedMsg { return &VoiceMsg{} },
MT_VIDEO: func()ReceivedMsg { return &VideoMsg{} },
MT_SHORTVIDEO: func()ReceivedMsg { return &VideoMsg{} },
MT_LOCATION: func()ReceivedMsg { return &LocationMsg{} },
MT_LINK: func()ReceivedMsg { return &LinkMsg{} },
}
// newEvents maps an event type (the Event text read by getReply when MsgType
// is MT_EVENT) to a constructor returning an empty ReceivedMsg of the matching
// concrete type. Several event types share one struct: subscribe, unsubscribe
// and scan use SubscribeEvent; the three pic_* events use PhotoEvent; the two
// scancode_* events use ScancodeEvent.
var newEvents = map[string]func()ReceivedMsg {
ET_VIEW: func()ReceivedMsg { return &ViewEvent{} },
ET_CLICK: func()ReceivedMsg { return &ClickEvent{} },
ET_SUBSCRIBE: func()ReceivedMsg { return &SubscribeEvent{} },
ET_UNSUBSCRIBE:func()ReceivedMsg { return &SubscribeEvent{} },
ET_SCAN: func()ReceivedMsg { return &SubscribeEvent{} },
ET_WHERE: func()ReceivedMsg { return &WhereEvent{} },
ET_LOCATION: func()ReceivedMsg { return &LocationEvent{} },
ET_PIC_SYSPHOTO: func()ReceivedMsg { return &PhotoEvent{} },
ET_PIC_PHOTO_OR_ALBUM: func()ReceivedMsg { return &PhotoEvent{} },
ET_PIC_WEIXIN: func()ReceivedMsg { return &PhotoEvent{} },
ET_SCANCODE_WAITMSG: func()ReceivedMsg { return &ScancodeEvent{} },
ET_SCANCODE_PUSH: func()ReceivedMsg { return &ScancodeEvent{} },
ET_MASSSENDJOBFINISH: func()ReceivedMsg { return &MassSentEvent{} },
ET_TEMPLATESENDJOBFINISH: func()ReceivedMsg { return &TemplateSentEvent{} },
}
// getReply is the main message/event pipeline: it parses the XML message
// body, builds the ReceivedMsg registered for its type, hands it to the
// matching handler and returns the serialized reply.
//
// msgBody is the plain (already decrypted) XML of one message or event.
//
// It returns the reply XML, or SUCCESS_TEXT when the handler produced no
// reply. On any error (unparsable XML, missing <xml> root, unknown message or
// event type) it returns SUCCESS_TEXT together with the error.
func (p *WxAppIdMsgParser) getReply(msgBody []byte) ([]byte, error) {
	msg := etree.NewDocument()
	if err := msg.ReadFromBytes(msgBody); err != nil {
		return SUCCESS_TEXT, err
	}

	// SelectElement returns nil when the document has no <xml> root; stop here
	// instead of passing a nil element on to _getText and parse.
	root := msg.SelectElement("xml")
	if root == nil {
		return SUCCESS_TEXT, fmt.Errorf("no xml root element found in message body")
	}

	// The lookup error is deliberately ignored: a missing MsgType leaves
	// msgType empty, which is reported below as an unimplemented type.
	msgType, _ := _getText(root, "MsgType")

	var replyMsg ReplyMsg
	if msgType != MT_EVENT {
		newMessage, ok := newMessages[msgType]
		if !ok {
			return SUCCESS_TEXT, fmt.Errorf("under implementation for msg type: %s", msgType)
		}
		receivedMsg := newMessage()
		receivedMsg.parse(root)
		replyMsg = p.handleReceivedMessage(receivedMsg, msgType)
	} else {
		// Same reasoning as for MsgType: an absent Event becomes "" and is
		// rejected by the map lookup.
		eventType, _ := _getText(root, "Event")
		newEvent, ok := newEvents[eventType]
		if !ok {
			return SUCCESS_TEXT, fmt.Errorf("under implementation for event type: %s", eventType)
		}
		receivedMsg := newEvent()
		receivedMsg.parse(root)
		replyMsg = p.handleReceivedEvent(receivedMsg, eventType)
	}

	if replyMsg == nil {
		return SUCCESS_TEXT, nil
	}
	return replyMsg.ToXML(), nil
}
// _replyMsg carries the two results of getReply from a msgParser worker back
// to the GetReply call that queued the request.
type _replyMsg struct {
reply []byte // reply bytes returned by getReply
err error // error returned by getReply
}
// _reqMsg is one unit of work queued on WxAppIdMsgParser.msgChan by GetReply.
type _reqMsg struct {
msgBody []byte // message body passed to getReply
replyChan chan *_replyMsg // channel on which the worker sends the result
}
// WxAppIdMsgParser bundles the parameters, the request queue and the handler
// tables used to process messages. Create it with StartWxMsgParsers, which
// also starts the worker goroutines reading from msgChan.
type WxAppIdMsgParser struct {
wxParams *wxconf.WxParamsT // parameters handed to parseMessageBody by GetMessageBody
msgChan chan *_reqMsg // request queue filled by GetReply and drained by msgParser
messageHandlers map[string]FnMessageHandler // NOTE(review): presumably keyed by message type; populated outside this view
eventHandlers map[string]FnMessageHandler // NOTE(review): presumably keyed by event type; populated outside this view
}
// msgParser is the worker loop run in its own goroutine. It takes requests
// queued by GetReply from p.msgChan, processes each body with getReply and
// sends the outcome back on the request's reply channel. The loop never exits.
func (p *WxAppIdMsgParser) msgParser() {
	for {
		req := <-p.msgChan
		out, procErr := p.getReply(req.msgBody)
		req.replyChan <- &_replyMsg{reply: out, err: procErr}
	}
}
// StartWxMsgParsers creates a WxAppIdMsgParser and starts its message-parsing
// worker goroutines. Call it once while initializing the application.
//
// params are the parameters the parser will use; when nil, the package-level
// default wxconf.WxParams is used.
//
// workNum is the number of worker goroutines and also the capacity of the
// request queue. Values below 1 are raised to 1: zero workers would leave
// every GetReply call blocked forever, and a negative queue size makes make()
// panic.
//
// The returned parser has MsgHandler registered as its default handler.
func StartWxMsgParsers(params *wxconf.WxParamsT, workNum int) *WxAppIdMsgParser {
	if workNum < 1 {
		workNum = 1
	}

	p := &WxAppIdMsgParser{}
	if params == nil {
		p.wxParams = &wxconf.WxParams
	} else {
		p.wxParams = params
	}

	p.RegisterWxMsgHandler(MsgHandler) // set default msg handler.

	p.msgChan = make(chan *_reqMsg, workNum)
	for i := 0; i < workNum; i++ {
		go p.msgParser()
	}
	return p
}
// GetReply produces the reply for one message body. It queues the body for a
// msgParser worker, waits for that worker's result and returns the reply
// bytes and error the worker obtained from getReply.
func (p *WxAppIdMsgParser) GetReply(msgBody []byte) ([]byte, error) {
	done := make(chan *_replyMsg)
	// Closed only after the single result has been received below.
	defer close(done)

	p.msgChan <- &_reqMsg{msgBody: msgBody, replyChan: done}
	result := <-done
	return result.reply, result.err
}
// getEncryptedMsg extracts the text of the <Encrypt> element from an
// encrypted XML request body.
//
// It returns an error when body is not parsable XML, when the document has no
// <xml> root element, or when _getText fails to read "Encrypt".
func getEncryptedMsg(body []byte) (string, error) {
	msg := etree.NewDocument()
	if err := msg.ReadFromBytes(body); err != nil {
		return "", err
	}

	// body comes straight from the HTTP request, so a document with a
	// different root is possible; SelectElement then returns nil and must not
	// be passed on to _getText.
	root := msg.SelectElement("xml")
	if root == nil {
		return "", fmt.Errorf("no xml root element found in encrypted message body")
	}
	return _getText(root, "Encrypt")
}
// parseMessageBody was split out of GetMessageBody so it can be called in
// other ways, which is convenient for debugging.
//
// It inspects the "encrypt_type" query parameter of u:
//   - empty: body is returned unchanged with empty timestamp and nonce;
//   - "aes": the <Encrypt> payload of body is decrypted with decryptMsg and
//     returned together with the "timestamp" and "nonce" query values;
//   - anything else: an error is returned.
//
// The result is (message body, timestamp, nonce, error).
func parseMessageBody(wxParams *wxconf.WxParamsT, u *url.URL, body []byte) ([]byte, string, string, error) {
	query := u.Query()

	switch query.Get("encrypt_type") {
	case "":
		return body, "", "", nil

	case "aes":
		cipherText, err := getEncryptedMsg(body)
		if err != nil {
			return nil, "", "", err
		}

		// The signature arguments are validated by the signature checker, so
		// they are only fetched here.
		timestamp := query.Get(MustSignatureArgs[TIMESTAMP])
		nonce := query.Get(MustSignatureArgs[NONCE])

		plain, err := decryptMsg(wxParams, cipherText, query.Get("msg_signature"), timestamp, nonce)
		if err != nil {
			return nil, "", "", err
		}
		wxlog.Logf("plain msg: %s\n", string(plain))
		return plain, timestamp, nonce, nil

	default:
		return nil, "", "", fmt.Errorf("unsupported encrypted method")
	}
}
// ParseMessageBody parses a message body using the default configuration
// (wxconf.WxParams); it can be used for debugging. It forwards u and body to
// parseMessageBody and returns its (message body, timestamp, nonce, error)
// result unchanged.
func ParseMessageBody(u *url.URL, body []byte) ([]byte, string, string, error) {
return parseMessageBody(&wxconf.WxParams, u, body)
}
// GetMessageBody reads the message a service account received from an HTTP
// request and returns (message body, timestamp, nonce, error).
//
// The whole request body is read, logged, and then passed with the request
// URL and the parser's parameters to parseMessageBody, whose result is
// returned. An error is returned when the request has no body or the body
// cannot be read.
func (p *WxAppIdMsgParser) GetMessageBody(r *http.Request) ([]byte, string, string, error) {
	if r.Body == nil {
		return nil, "", "", fmt.Errorf("body expected")
	}
	// Deferred so the body is also closed when ReadAll fails.
	defer r.Body.Close()

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, "", "", err
	}

	wxlog.Logf("body: %s\n", string(body))
	return parseMessageBody(p.wxParams, r.URL, body)
}