Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: batch transactions on broadcast #681

Merged
merged 2 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/core/mempool/mem_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ func (mp *Pool) Remove(hash util.Uint256) {
// drop part of the mempool that is now invalid after the block acceptance.
func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) {
mp.lock.Lock()
// We expect a lot of changes, so it's easier to allocate a new slice
// rather than move things in an old one.
newVerifiedTxes := make([]*item, 0, mp.capacity)
// We can reuse already allocated slice
// because items are iterated one-by-one in increasing order.
newVerifiedTxes := mp.verifiedTxes[:0]
newInputs := mp.inputs[:0]
newClaims := mp.claims[:0]
for _, itm := range mp.verifiedTxes {
Expand Down
70 changes: 68 additions & 2 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type (
unregister chan peerDrop
quit chan struct{}

transactions chan *transaction.Transaction

consensusStarted *atomic.Bool

log *zap.Logger
Expand Down Expand Up @@ -97,6 +99,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
peers: make(map[Peer]bool),
consensusStarted: atomic.NewBool(false),
log: log,
transactions: make(chan *transaction.Transaction, 64),
}
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
if s.consensusStarted.Load() {
Expand Down Expand Up @@ -174,6 +177,7 @@ func (s *Server) Start(errChan chan error) {

s.discovery.BackFill(s.Seeds...)

go s.broadcastTxLoop()
go s.bQueue.run()
go s.transport.Accept()
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
Expand Down Expand Up @@ -602,7 +606,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
// in the pool.
if s.verifyAndPoolTX(tx) == RelaySucceed {
s.consensus.OnTransaction(tx)
go s.broadcastTX(tx)
s.broadcastTX(tx)
}
return nil
}
Expand Down Expand Up @@ -828,11 +832,73 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {

// broadcastTX broadcasts an inventory message about new transaction.
func (s *Server) broadcastTX(t *transaction.Transaction) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()}))
select {
case s.transactions <- t:
case <-s.quit:
}
}

func (s *Server) broadcastTxHashes(hs []util.Uint256) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, hs))

// We need to filter out non-relaying nodes, so plain broadcast
// functions don't fit here.
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool {
return p.Handshaked() && p.Version().Relay
})
}

// broadcastTxLoop is a loop for batching and sending
// transactions hashes in an INV payload.
func (s *Server) broadcastTxLoop() {
const (
batchTime = time.Millisecond * 50
batchSize = 32
)

txs := make([]util.Uint256, 0, batchSize)
var timer *time.Timer

timerCh := func() <-chan time.Time {
if timer == nil {
return nil
}
return timer.C
}

broadcast := func() {
s.broadcastTxHashes(txs)
txs = txs[:0]
if timer != nil {
timer.Stop()
}
}

for {
select {
case <-s.quit:
loop:
for {
select {
case <-s.transactions:
default:
break loop
}
}
return
case <-timerCh():
if len(txs) > 0 {
broadcast()
}
fyrchik marked this conversation as resolved.
Show resolved Hide resolved
case tx := <-s.transactions:
if len(txs) == 0 {
timer = time.NewTimer(batchTime)
}

txs = append(txs, tx.Hash())
fyrchik marked this conversation as resolved.
Show resolved Hide resolved
if len(txs) == batchSize {
broadcast()
}
}
}
}