/
txs_loop.go
42 lines (35 loc) · 998 Bytes
/
txs_loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Copyright (c) 2020 The Meter.io developers
// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>
package comm
import (
"github.com/meterio/meter-pov/comm/proto"
"github.com/meterio/meter-pov/txpool"
)
// txsLoop subscribes to the transaction pool's event feed and, for every
// event flagged as executable, notifies each peer that does not already
// report the transaction as known. It runs until c.ctx is done and drops
// the subscription on the way out.
func (c *Communicator) txsLoop() {
	events := make(chan *txpool.TxEvent, 10)
	subscription := c.txPool.SubscribeTxEvent(events)
	defer subscription.Unsubscribe()

	for {
		select {
		case <-c.ctx.Done():
			return

		case ev := <-events:
			// Skip events whose Executable flag is unset or false.
			if ev.Executable == nil || !*ev.Executable {
				continue
			}

			pending := ev.Tx
			unaware := c.peerSet.Slice().Filter(func(candidate *Peer) bool {
				return !candidate.IsTransactionKnown(pending.ID())
			})

			for _, p := range unaware {
				// Fresh variable per iteration so each closure below
				// captures its own peer (matters before Go 1.22).
				target := p

				// The peer is marked before the send is attempted; a send
				// failure is only logged at debug level.
				target.MarkTransaction(pending.ID())
				c.goes.Go(func() {
					err := proto.NotifyNewTx(c.ctx, target, pending)
					if err != nil {
						target.logger.Debug("failed to broadcast tx", "err", err)
					}
				})
			}
		}
	}
}