-
Notifications
You must be signed in to change notification settings - Fork 0
/
protocol.go
113 lines (102 loc) · 3.3 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2019 The go-vnt Authors
// This file is part of the go-vnt library.
//
// The go-vnt library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-vnt library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-vnt library. If not, see <http://www.gnu.org/licenses/>.
package vntp2p
import (
"encoding/binary"
"encoding/json"
"io"
"time"
inet "github.com/libp2p/go-libp2p-net"
libp2p "github.com/libp2p/go-libp2p-peer"
"github.com/vntchain/go-vnt/log"
"github.com/vntchain/go-vnt/rlp"
)
// Protocol describes a sub-protocol that runs on top of the p2p layer.
//
// For now the original sub-protocol structure is kept as it was, so that the
// upper layers need as few changes as possible.
type Protocol struct {
	// Name identifies the sub-protocol.
	Name string

	// Version is the version number of the sub-protocol.
	Version uint

	// Length is a size associated with the protocol.
	// NOTE(review): presumably the number of message codes the protocol
	// uses, as in the original structure this was taken from — confirm.
	Length uint64

	// Run is the protocol's entry point. It receives a peer and a
	// MsgReadWriter and reports failure through the returned error.
	// NOTE(review): presumably called once per connected peer — verify
	// against the caller.
	Run func(peer *Peer, rw MsgReadWriter) error

	// NodeInfo returns protocol-specific data with no peer argument.
	// NOTE(review): presumably metadata about the local node — confirm.
	NodeInfo func() interface{}

	// PeerInfo returns protocol-specific data for the peer with the given
	// libp2p ID.
	PeerInfo func(id libp2p.ID) interface{}
}
// HandleStream reads length-prefixed messages from stream s and hands each one
// to the messager registered on the peer for the message's protocol ID.
// Both actively and passively established connections go through this flow.
//
// The peer is looked up once via GetPeerByRemoteID. If no peer is found the
// stream is reset and the method returns immediately. Otherwise messages are
// processed in a loop that only ends when reading or decoding fails; the error
// is then passed to every messager of the peer through notifyError and
// peer.Reset is called on the way out.
//
// A decoded message whose protocol ID has no registered messager is logged and
// skipped. A message that the messager does not accept within two seconds is
// logged at trace level and dropped; the loop then continues with the next
// message.
func (server *Server) HandleStream(s inet.Stream) {
	// deliverTimeout bounds how long a decoded message may wait for the
	// upper-layer protocol to pick it up; past that the upper layer is
	// considered to be in trouble and the message is dropped.
	const deliverTimeout = 2 * time.Second

	log.Debug("Stream data coming...")

	// The peer information only needs to be fetched once per stream.
	peer := server.GetPeerByRemoteID(s)
	if peer == nil {
		// Logger context is passed as key/value pairs elsewhere in this
		// function, so the explanation gets its own key instead of
		// dangling as an unpaired argument.
		log.Debug("HandleStream", "remotePeerID", s.Conn().RemotePeer(), "reason", "this remote peer is nil, don't handle it")
		// Best effort: there is nothing useful to do if the reset fails.
		_ = s.Reset()
		return
	}

	// The loop below only exits on error, so this runs on every failure path.
	defer func() {
		peer.log.Debug("HandleStream reset stream before exit")
		peer.Reset()
	}()

	// The header buffer is fully overwritten by every successful ReadFull and
	// copied into the message before it is handed on, so one allocation can
	// serve the whole lifetime of the stream.
	headerBuf := make([]byte, MessageHeaderLength)

	// As long as the stream is open the connection is healthy and messages
	// can keep being read.
	for {
		if _, err := io.ReadFull(s, headerBuf); err != nil {
			peer.log.Error("HandleStream", "read msg header error", err)
			notifyError(peer.msgers, err)
			return
		}

		// The first four header bytes carry the body length (little endian).
		// NOTE(review): bodySize is supplied by the remote peer and is used
		// unchecked, so a single 4-byte header can request an allocation of
		// up to 4 GiB. A protocol-level maximum should be enforced here; the
		// right limit is not visible from this file.
		bodySize := binary.LittleEndian.Uint32(headerBuf)
		bodyBuf := make([]byte, bodySize)
		if _, err := io.ReadFull(s, bodyBuf); err != nil {
			peer.log.Error("HandleStream", "read msg Body error", err)
			notifyError(peer.msgers, err)
			return
		}

		msgBody := &MsgBody{Payload: &rlp.EncReader{}}
		if err := json.Unmarshal(bodyBuf, msgBody); err != nil {
			peer.log.Error("HandleStream", "unmarshal msg Body error", err)
			notifyError(peer.msgers, err)
			return
		}
		msgBody.ReceivedAt = time.Now()

		// Assemble the message that is passed on to the messager.
		var msgHeader MsgHeader
		copy(msgHeader[:], headerBuf)
		msg := Msg{
			Header: msgHeader,
			Body:   *msgBody,
		}

		msger, ok := peer.msgers[msgBody.ProtocolID]
		if !ok {
			// This node has no messager for the protocol ID.
			peer.log.Warn("HandleStream", "receive unknown message", msg)
			continue
		}

		// Hand the message to the upper-layer protocol without blocking
		// forever: wait at most deliverTimeout for it to be read.
		timer := time.NewTimer(deliverTimeout)
		select {
		case msger.in <- msg:
			peer.log.Trace("HandleStream send message to messager success")
		case <-timer.C:
			peer.log.Trace("HandleStream send message to messager timeout")
		}
		// Release the timer right away; otherwise every delivered message
		// would leave a live timer behind until it fires.
		timer.Stop()
	}
}
// notifyError delivers err to every messager in msgers by sending it on that
// messager's err channel.
//
// Each send is a plain channel send, so it blocks until the channel accepts
// the value. Messagers are visited in map iteration order, which Go leaves
// unspecified.
func notifyError(msgers map[string]*VNTMsger, err error) {
	for _, msger := range msgers {
		msger.err <- err
	}
}