-
Notifications
You must be signed in to change notification settings - Fork 9
/
channel.go
161 lines (145 loc) · 4.01 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package connection
import (
"io"
"sync/atomic"
"time"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
)
// ChannelDescriptor holds the settings a channel is created with.
// Capacities left at zero are replaced with the package defaults by
// FillDefaults.
type ChannelDescriptor struct {
	// ID identifies the channel; newChannel copies it into the channel and
	// it is stamped on every outgoing packet.
	ID byte
	// Priority must be a positive integer; newChannel rejects anything else.
	Priority int
	// SendQueueCapacity is the buffer size of the channel's send queue.
	SendQueueCapacity int
	// RecvBufferCapacity is the initial capacity of the receive buffer.
	RecvBufferCapacity int
	// RecvMessageCapacity is the maximum number of bytes a reassembled
	// incoming message may reach before recvMsgPacket reports an overflow.
	RecvMessageCapacity int
}
// FillDefaults replaces every capacity that is still zero with the
// corresponding package default. Fields that already hold a non-zero value
// are neither changed nor written to.
func (chDesc *ChannelDescriptor) FillDefaults() {
	defaults := [...]struct {
		field    *int
		fallback int
	}{
		{&chDesc.SendQueueCapacity, defaultSendQueueCapacity},
		{&chDesc.RecvBufferCapacity, defaultRecvBufferCapacity},
		{&chDesc.RecvMessageCapacity, defaultRecvMessageCapacity},
	}
	for _, d := range defaults {
		// Only touch unset fields so an already-filled descriptor is
		// read-only from this method's point of view.
		if *d.field == 0 {
			*d.field = d.fallback
		}
	}
}
// channel holds the send and receive state of one logical channel
// multiplexed over an MConnection.
type channel struct {
	conn *MConnection // connection this channel belongs to
	desc *ChannelDescriptor // settings the channel was created with
	id byte // copy of desc.ID
	sendQueue chan []byte // whole messages waiting to be split into packets
	sendQueueSize int32 // atomic. Incremented on enqueue, decremented when a message's final packet is built.
	recving []byte // bytes of the incoming message accumulated so far
	sending []byte // unsent remainder of the message currently being packetized
	priority int // copy of desc.Priority
	recentlySent int64 // exponential moving average: grows by bytes written, decays in updateStats
}
// newChannel builds the send/receive state for one channel of conn.
//
// desc is completed in place via FillDefaults, so the caller's descriptor
// may be modified. A non-positive desc.Priority is reported through
// cmn.PanicSanity. The send queue is buffered to desc.SendQueueCapacity and
// the receive buffer starts empty with capacity desc.RecvBufferCapacity.
func newChannel(conn *MConnection, desc *ChannelDescriptor) *channel {
	desc.FillDefaults()
	if desc.Priority <= 0 {
		cmn.PanicSanity("Channel default priority must be a positive integer")
	}
	return &channel{
		conn:      conn,
		desc:      desc,
		id:        desc.ID,
		sendQueue: make(chan []byte, desc.SendQueueCapacity),
		recving:   make([]byte, 0, desc.RecvBufferCapacity),
		priority:  desc.Priority,
	}
}
// canSend reports whether the number of queued messages is below the
// package-wide default send queue capacity.
// Goroutine-safe (the counter is read atomically).
// Use only as a heuristic.
//
// NOTE(review): the limit is defaultSendQueueCapacity, not the capacity this
// channel's queue was actually created with — confirm that is intended for
// channels configured with a non-default SendQueueCapacity.
func (ch *channel) canSend() bool {
	queued := ch.loadSendQueueSize()
	return queued < defaultSendQueueCapacity
}
// isSendPending reports whether there is data to turn into a msgPacket.
// If no message is currently in progress, the next one is pulled off the
// send queue and becomes the in-progress message.
// Call before calling nextMsgPacket().
//
// NOTE(review): this reads and assigns ch.sending without synchronization,
// so it presumably must run on the same goroutine as nextMsgPacket — confirm
// against callers before relying on it being goroutine-safe.
func (ch *channel) isSendPending() bool {
	// A message is already partially sent.
	if len(ch.sending) > 0 {
		return true
	}
	// Nothing in progress and nothing queued.
	if len(ch.sendQueue) == 0 {
		return false
	}
	ch.sending = <-ch.sendQueue
	return true
}
// loadSendQueueSize atomically reads the queued-message counter and returns
// it as an int.
// Goroutine-safe
func (ch *channel) loadSendQueueSize() (size int) {
	size = int(atomic.LoadInt32(&ch.sendQueueSize))
	return size
}
// nextMsgPacket carves the next packet off the in-progress message.
//
// At most maxMsgPacketPayloadSize bytes are taken. When the remainder fits in
// one packet it is flagged EOF (0x01), the in-progress message is cleared and
// the queued-message counter is decremented; otherwise EOF is 0x00 and the
// unsent tail stays in ch.sending. Callers are expected to have called
// isSendPending first.
// Not goroutine-safe
func (ch *channel) nextMsgPacket() msgPacket {
	payload := ch.sending
	isLast := len(payload) <= maxMsgPacketPayloadSize
	if !isLast {
		payload = payload[:maxMsgPacketPayloadSize]
	}

	packet := msgPacket{
		ChannelID: ch.id,
		Bytes:     payload,
	}
	if isLast {
		packet.EOF = byte(0x01)
		ch.sending = nil
		atomic.AddInt32(&ch.sendQueueSize, -1) // one fewer message outstanding
	} else {
		packet.EOF = byte(0x00)
		ch.sending = ch.sending[maxMsgPacketPayloadSize:]
	}
	return packet
}
// recvMsgPacket appends an incoming packet's payload to the message being
// reassembled and returns the complete message once a packet flagged EOF
// (0x01) arrives.
//
// Returns:
//   - (nil, wire.ErrBinaryReadOverflow) if accepting the payload would push
//     the message past desc.RecvMessageCapacity; the payload is not appended.
//   - (nil, nil) while the message is still incomplete.
//   - (msg, nil) when the message is complete.
//
// The returned slice is owned by the caller: the channel switches to a fresh
// receive buffer, so later packets can never overwrite a message that the
// caller still holds.
// Not goroutine-safe
func (ch *channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
	if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
		return nil, wire.ErrBinaryReadOverflow
	}
	ch.recving = append(ch.recving, packet.Bytes...)
	if packet.EOF != byte(0x01) {
		return nil, nil
	}
	msgBytes := ch.recving
	// Do not reuse the backing array (ch.recving[:0]): the returned message
	// aliases it and would be clobbered by the next append.
	ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
	return msgBytes, nil
}
// sendBytes queues a message to send on this channel, waiting for room in
// the send queue if necessary.
// Returns true once the message is queued, or false if no room became
// available within defaultSendTimeout.
// Goroutine-safe
func (ch *channel) sendBytes(bytes []byte) bool {
	// Use an explicit timer so it can be released as soon as the send
	// succeeds; time.After would keep it alive until the timeout elapses.
	timeout := time.NewTimer(defaultSendTimeout)
	defer timeout.Stop()

	select {
	case ch.sendQueue <- bytes:
		atomic.AddInt32(&ch.sendQueueSize, 1)
		return true
	case <-timeout.C:
		return false
	}
}
// trySendBytes attempts to queue a message on this channel without waiting.
// Returns true if the message was queued, false if the send queue had no
// room at the time of the call.
// Goroutine-safe
func (ch *channel) trySendBytes(bytes []byte) bool {
	queued := false
	select {
	case ch.sendQueue <- bytes:
		queued = true
		atomic.AddInt32(&ch.sendQueueSize, 1)
	default:
		// Queue full: give up immediately.
	}
	return queued
}
// writeMsgPacketTo builds the next msgPacket and writes it to w: first the
// packetTypeMsg marker byte, then the encoded packet. n and err are handed to
// the wire helpers by pointer and returned as they leave them. When no error
// was reported, n is added to the channel's recently-sent counter.
// Not goroutine-safe
func (ch *channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
	pkt := ch.nextMsgPacket()
	wire.WriteByte(packetTypeMsg, w, &n, &err)
	wire.WriteBinary(pkt, w, &n, &err)
	if err != nil {
		return n, err
	}
	ch.recentlySent += int64(n)
	return n, nil
}
// updateStats decays the recently-sent counter to 80% of its current value
// (truncated to an integer). Call this periodically to update stats for
// throttling purposes.
// Not goroutine-safe
func (ch *channel) updateStats() {
	const decay = 0.8
	ch.recentlySent = int64(float64(ch.recentlySent) * decay)
}