Skip to content

Commit

Permalink
Optimise RW message pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Sharp authored and Alexey Sharp committed Mar 21, 2021
1 parent 00ee4fb commit 6936111
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Msg struct {
Code uint64
Size uint32 // Size of the raw payload
Payload io.Reader
consumed chan<- struct{}
ReceivedAt time.Time

meterCap Cap // Protocol name and version for egress metering
Expand Down Expand Up @@ -127,17 +128,12 @@ func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
type eofSignal struct {
wrapped io.Reader
count uint32 // number of bytes left
eof chan<- struct{}
}

// note: when using eofSignal to detect whether a message payload
// has been read, Read might not be called for zero sized messages.
func (r *eofSignal) Read(buf []byte) (int, error) {
if r.count == 0 {
if r.eof != nil {
r.eof <- struct{}{}
r.eof = nil
}
return 0, io.EOF
}

Expand All @@ -147,10 +143,6 @@ func (r *eofSignal) Read(buf []byte) (int, error) {
}
n, err := r.wrapped.Read(buf[:max])
r.count -= uint32(n)
if (err != nil || r.count == 0) && r.eof != nil {
r.eof <- struct{}{} // tell Peer that msg has been consumed
r.eof = nil
}
return n, err
}

Expand Down Expand Up @@ -185,7 +177,8 @@ type MsgPipeRW struct {
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
if atomic.LoadInt32(p.closed) == 0 {
consumed := make(chan struct{}, 1)
msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
msg.consumed = consumed
msg.Payload = &eofSignal{msg.Payload, msg.Size}
select {
case p.w <- msg:
if msg.Size > 0 {
Expand All @@ -207,6 +200,7 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) {
if atomic.LoadInt32(p.closed) == 0 {
select {
case msg := <-p.r:
msg.consumed <- struct{}{}
return msg, nil
case <-p.closing:
}
Expand Down

0 comments on commit 6936111

Please sign in to comment.