Skip to content

Commit

Permalink
chore(dot/network): use sync.Pool for network message buffers (ChainS…
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored and timwu20 committed Dec 6, 2021
1 parent e99e4b4 commit 49a5196
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 12 deletions.
2 changes: 2 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type Config struct {

// telemetryInterval how often to send telemetry metrics
telemetryInterval time.Duration

noPreAllocate bool // internal option
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
12 changes: 7 additions & 5 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

var errCannotValidateHandshake = errors.New("failed to validate handshake")

var maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint
const maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint

// Handshake is the interface all handshakes for notifications protocols must implement
type Handshake interface {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

hs, err := readHandshake(stream, decodeBlockAnnounceHandshake)
hs, err := s.readHandshake(stream, decodeBlockAnnounceHandshake)
if err != nil {
logger.Trace("failed to read handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Close()
Expand Down Expand Up @@ -294,9 +294,11 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
}
}

func readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) {
msgBytes := make([]byte, maxHandshakeSize)
tot, err := readStream(stream, msgBytes)
func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) {
msgBytes := s.bufPool.get()
defer s.bufPool.put(&msgBytes)

tot, err := readStream(stream, msgBytes[:])
if err != nil {
return nil, err
}
Expand Down
57 changes: 57 additions & 0 deletions dot/network/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2019 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.

package network

// sizedBufferPool is a pool of buffers used for reading from streams
type sizedBufferPool struct {
c chan *[maxMessageSize]byte
}

func newSizedBufferPool(min, max int) (bp *sizedBufferPool) {
bufferCh := make(chan *[maxMessageSize]byte, max)

for i := 0; i < min; i++ {
buf := [maxMessageSize]byte{}
bufferCh <- &buf
}

return &sizedBufferPool{
c: bufferCh,
}
}

// get gets a buffer from the sizedBufferPool, or creates a new one if none are
// available in the pool. Buffers have a pre-allocated capacity.
func (bp *sizedBufferPool) get() [maxMessageSize]byte {
var buff *[maxMessageSize]byte
select {
case buff = <-bp.c:
// reuse existing buffer
default:
// create new buffer
buff = &[maxMessageSize]byte{}
}
return *buff
}

// put returns the given buffer to the sizedBufferPool.
func (bp *sizedBufferPool) put(b *[maxMessageSize]byte) {
select {
case bp.c <- b:
default: // Discard the buffer if the pool is full.
}
}
26 changes: 20 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"

maxMessageSize = 1024 * 1024 // 1mb for now
)

var (
Expand All @@ -70,6 +72,7 @@ type Service struct {
mdns *mdns
gossip *gossip
syncQueue *syncQueue
bufPool *sizedBufferPool

notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info
notificationsMu sync.RWMutex
Expand Down Expand Up @@ -130,6 +133,18 @@ func NewService(cfg *Config) (*Service, error) {
return nil, err
}

// pre-allocate pool of buffers used to read from streams.
// initially allocate as many buffers as liekly necessary which is the number inbound streams we will have,
// which should equal average number of peers times the number of notifications protocols, which is currently 3.
var bufPool *sizedBufferPool
if cfg.noPreAllocate {
bufPool = &sizedBufferPool{
c: make(chan *[maxMessageSize]byte, cfg.MaxPeers*3),
}
} else {
bufPool = newSizedBufferPool((cfg.MaxPeers-cfg.MinPeers)*3/2, (cfg.MaxPeers+1)*3)
}

network := &Service{
ctx: ctx,
cancel: cancel,
Expand All @@ -146,6 +161,7 @@ func NewService(cfg *Config) (*Service, error) {
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
bufPool: bufPool,
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -509,14 +525,12 @@ func isInbound(stream libp2pnetwork.Stream) bool {
}

func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) {
var (
maxMessageSize uint64 = maxBlockResponseSize // TODO: determine actual max message size
msgBytes = make([]byte, maxMessageSize)
peer = stream.Conn().RemotePeer()
)
peer := stream.Conn().RemotePeer()
msgBytes := s.bufPool.get()
defer s.bufPool.put(&msgBytes)

for {
tot, err := readStream(stream, msgBytes)
tot, err := readStream(stream, msgBytes[:])
if err == io.EOF {
continue
} else if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
cfg.Syncer = newMockSyncer()
}

cfg.noPreAllocate = true

srvc, err := NewService(cfg)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er
return nil
}

var (
const (
blockRequestSize uint32 = 128
blockRequestBufferSize int = 6
blockResponseBufferSize int = 6
Expand Down

0 comments on commit 49a5196

Please sign in to comment.