/
jarvismsgtask.go
145 lines (115 loc) · 3.67 KB
/
jarvismsgtask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package jarviscore
import (
"time"
jarvisbase "github.com/zhs007/jarviscore/base"
pb "github.com/zhs007/jarviscore/proto"
"go.uber.org/zap"
)
// Result-type codes stored in JarvisMsgInfo.JarvisResultType.
// The values are fixed integers; note that 4 is not assigned in this block.
const (
// JarvisResultTypeSend - the entry describes a send.
JarvisResultTypeSend = 1
// JarvisResultTypeReply - the entry describes a reply.
JarvisResultTypeReply = 2
// JarvisResultTypeLocalError - the entry describes a local error.
JarvisResultTypeLocalError = 3
// JarvisResultTypeRemoved - the entry describes a removal.
// NOTE(review): what exactly is "removed" is not visible here - confirm against the code that emits it.
JarvisResultTypeRemoved = 5
// JarvisResultTypeLocalErrorEnd - a local error that also ends the exchange;
// this is the value JarvisMsgInfo.IsErrorEnd tests for.
JarvisResultTypeLocalErrorEnd = 6
)
// NormalMsgTaskInfo - payload of a task that carries a single JarvisMsg.
type NormalMsgTaskInfo struct {
// Msg is the message attached to this task.
Msg *pb.JarvisMsg
// ReplyStream is the reply stream attached to this task.
// NOTE(review): presumably replies to Msg are written through it - confirm against the task consumer.
ReplyStream *JarvisMsgReplyStream
// OnResult is the result callback attached to this task
// (FuncOnProcMsgResult is declared elsewhere in the package).
OnResult FuncOnProcMsgResult
}
// JarvisMsgInfo - one result entry: a result-type code, an optional message
// and an optional error.
type JarvisMsgInfo struct {
// JarvisResultType holds a result-type code; IsErrorEnd compares it with
// JarvisResultTypeLocalErrorEnd.
JarvisResultType int `json:"jarvisresulttype"`
// Msg is the message of this entry. It may be nil: IsEnd and IsEndOrIGI
// both check for nil before reading it.
Msg *pb.JarvisMsg `json:"msg"`
// Err is the error of this entry.
// NOTE(review): presumably set for the local-error result types - confirm against the producers.
Err error `json:"err"`
}
// IsEnd - reports whether this entry carries a message of type REPLY2 whose
// reply type is END. An entry without a message is never an end entry.
func (jmi *JarvisMsgInfo) IsEnd() bool {
	msg := jmi.Msg
	if msg == nil {
		return false
	}

	return msg.MsgType == pb.MSGTYPE_REPLY2 && msg.ReplyType == pb.REPLYTYPE_END
}
// IsErrorEnd - reports whether the result type of this entry is
// JarvisResultTypeLocalErrorEnd. Only the result-type code is inspected.
func (jmi *JarvisMsgInfo) IsErrorEnd() bool {
	if jmi.JarvisResultType != JarvisResultTypeLocalErrorEnd {
		return false
	}

	return true
}
// IsEndOrIGI - reports whether this entry carries a message of type REPLY2
// whose reply type is either END or IGOTIT. An entry without a message
// yields false.
func (jmi *JarvisMsgInfo) IsEndOrIGI() bool {
	msg := jmi.Msg
	if msg == nil {
		return false
	}

	if msg.MsgType != pb.MSGTYPE_REPLY2 {
		return false
	}

	replyType := msg.ReplyType

	return replyType == pb.REPLYTYPE_END || replyType == pb.REPLYTYPE_IGOTIT
}
// StreamMsgTaskInfo - payload of a task that carries a sequence of
// JarvisMsgInfo entries instead of a single message.
type StreamMsgTaskInfo struct {
// Msgs is the ordered list of entries attached to this task.
Msgs []JarvisMsgInfo
// ReplyStream is the reply stream attached to this task.
// NOTE(review): presumably replies are written through it - confirm against the task consumer.
ReplyStream *JarvisMsgReplyStream
// OnResult is the result callback attached to this task
// (FuncOnProcMsgResult is declared elsewhere in the package).
OnResult FuncOnProcMsgResult
}
// JarvisMsgTask - a message task, holding either form of task payload.
// NOTE(review): presumably exactly one of Normal and Stream is non-nil - confirm against the code that builds tasks.
type JarvisMsgTask struct {
// Normal is the single-message payload.
Normal *NormalMsgTaskInfo
// Stream is the multi-message payload.
Stream *StreamMsgTaskInfo
}
// JarvisMsgReplyStream - collects or forwards the replies written via ReplyMsg.
// With a non-nil procMsg every reply is signed and sent through it right away;
// with a nil procMsg replies are buffered in msgs until an END reply arrives.
type JarvisMsgReplyStream struct {
// msgs buffers the replies accepted by ReplyMsg while procMsg is nil.
msgs []*pb.JarvisMsg
// procMsg is the server stream passed to NewJarvisMsgReplyStream; may be nil.
procMsg pb.JarvisCoreServ_ProcMsgServer
// isSent is set once the buffered replies have been handed to
// JarvisNode.SendStreamMsg; later buffered ReplyMsg calls are rejected
// with ErrJarvisMsgReplyStreamSent.
isSent bool
}
// NewJarvisMsgReplyStream - builds a reply stream around procMsg.
// The returned stream starts with an empty buffer and isSent == false;
// procMsg is stored as given and may be nil.
func NewJarvisMsgReplyStream(procMsg pb.JarvisCoreServ_ProcMsgServer) *JarvisMsgReplyStream {
	// The zero value already provides a nil msgs slice and isSent == false.
	stream := new(JarvisMsgReplyStream)
	stream.procMsg = procMsg

	return stream
}
// ReplyMsg - writes sendmsg to the reply stream.
//
// A nil sendmsg is rejected with ErrInvalidJarvisMsgReplyStreamSendMsg.
//
// When the stream has a procMsg, sendmsg.CurTime is set to the current Unix
// time, the message is signed with the private key from jn's core DB and sent
// through procMsg immediately; a signing or sending error is logged and returned.
//
// When the stream has no procMsg, the message is buffered:
//   - ErrJarvisMsgReplyStreamSent is returned if the buffer was already sent;
//   - ErrInvalidJarvisMsgReplyStreamDestAddr / ErrInvalidJarvisMsgReplyStreamReplyMsgID
//     is returned if DestAddr / ReplyMsgID differs from the first buffered message;
//   - once a REPLY2 message with reply type END is buffered, the whole buffer is
//     passed to jn.SendStreamMsg and the stream is marked as sent.
func (stream *JarvisMsgReplyStream) ReplyMsg(jn JarvisNode, sendmsg *pb.JarvisMsg) error {
	if sendmsg == nil {
		jarvisbase.Warn("JarvisMsgReplyStream.ReplyMsg",
			zap.Error(ErrInvalidJarvisMsgReplyStreamSendMsg))

		return ErrInvalidJarvisMsgReplyStreamSendMsg
	}

	// Direct mode: a server stream is attached, so sign and push right away.
	if stream.procMsg != nil {
		sendmsg.CurTime = time.Now().Unix()

		if err := SignJarvisMsg(jn.GetCoreDB().GetPrivateKey(), sendmsg); err != nil {
			jarvisbase.Warn("JarvisMsgReplyStream.ReplyMsg:SignJarvisMsg", zap.Error(err))

			return err
		}

		if err := stream.procMsg.Send(sendmsg); err != nil {
			jarvisbase.Warn("JarvisMsgReplyStream.ReplyMsg:procMsg.sendmsg", zap.Error(err))

			return err
		}

		return nil
	}

	// Buffered mode: nothing may be added after the batch has gone out.
	if stream.isSent {
		jarvisbase.Warn("JarvisMsgReplyStream.ReplyMsg",
			zap.Error(ErrJarvisMsgReplyStreamSent))

		return ErrJarvisMsgReplyStreamSent
	}

	// Every buffered message must agree with the first one on destination
	// and on the message it replies to.
	if len(stream.msgs) > 0 {
		first := stream.msgs[0]

		switch {
		case first.DestAddr != sendmsg.DestAddr:
			jarvisbase.Warn("JarvisMsgReplyStream.ReplyMsg",
				zap.Error(ErrInvalidJarvisMsgReplyStreamDestAddr))

			return ErrInvalidJarvisMsgReplyStreamDestAddr

		case first.ReplyMsgID != sendmsg.ReplyMsgID:
			jarvisbase.Warn("JarvisMsgReplyStream.ReplyMsg",
				zap.Error(ErrInvalidJarvisMsgReplyStreamReplyMsgID))

			return ErrInvalidJarvisMsgReplyStreamReplyMsgID
		}
	}

	stream.msgs = append(stream.msgs, sendmsg)

	// An END reply closes the batch: hand everything to the node in one call.
	// Any result of SendStreamMsg is not inspected here.
	closesBatch := sendmsg.MsgType == pb.MSGTYPE_REPLY2 && sendmsg.ReplyType == pb.REPLYTYPE_END
	if closesBatch {
		jn.SendStreamMsg(sendmsg.DestAddr, stream.msgs, nil)
		stream.isSent = true
	}

	return nil
}