Skip to content

Commit

Permalink
pushtx: introduce pushtx package with Broadcaster
Browse files Browse the repository at this point in the history
In this commit, we introduce a new package: pushtx. This package will be
responsible for reliably propagating transactions throughout the
network. This is done by continuously broadcasting unconfirmed
transactions to peers upon every new update at the tip of the chain.
We'll continue doing this until we receive an acknowledgment from _all_
of our peers that the transaction has confirmed on-chain. This implies
that our trust model should include one well-behaved peer to ensure
transactions actually propagate throughout the network.

A new set of errors have also been introduced in order to introduce more
granularity to external callers. This is done by analyzing the contents
of a peer's reject message for a transaction we're attempting to
broadcast. Currently, four different cases exist:

  1. A transaction is deemed by a peer as invalid.
  2. A transaction is deemed by a peer as having an insufficient fee.
  3. A transaction is deemed by a peer as already being in its mempool.
  4. A transaction is deemed by a peer as already being included in the
     chain.
  • Loading branch information
wpaulino authored and Roasbeef committed Mar 9, 2019
1 parent e3bf0a7 commit 294dd75
Show file tree
Hide file tree
Showing 5 changed files with 696 additions and 0 deletions.
207 changes: 207 additions & 0 deletions pushtx/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package pushtx

import (
"errors"
"fmt"
"sync"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightninglabs/neutrino/blockntfns"
)

var (
// ErrBroadcastStopped is an error returned when we attempt to process a
// request to broadcast a transaction but the Broadcaster has already
// been stopped.
ErrBroadcasterStopped = errors.New("broadcaster has been stopped")
)

// broadcastReq is an internal message the Broadcaster will use to process
// transaction broadcast requests.
type broadcastReq struct {
tx *wire.MsgTx
errChan chan error
}

// Config contains all of the external dependencies required for the Broadcaster
// to properly carry out its duties.
type Config struct {
// Broadcast broadcasts a transaction to the network. We expect certain
// BroadcastError's to be returned to handle special cases, namely
// errors with the codes Mempool and Confirmed.
Broadcast func(*wire.MsgTx) error

// SubscribeBlocks returns a block subscription that delivers block
// notifications in order. This will be used to rebroadcast all
// transactions once a new block arrives.
SubscribeBlocks func() (*blockntfns.Subscription, error)
}

// Broadcaster is a subsystem responsible for reliably broadcasting transactions
// to the network. Each transaction will be rebroadcast upon every new block
// being connected/disconnected to/from the chain.
type Broadcaster struct {
start sync.Once
stop sync.Once

cfg Config

// broadcastReqs is a channel through which new transaction broadcast
// requests from external callers will be streamed through.
broadcastReqs chan *broadcastReq

// transactions is the set of transactions we have broadcast so far.
transactions map[chainhash.Hash]*wire.MsgTx

quit chan struct{}
wg sync.WaitGroup
}

// NewBroadcaster creates a new Broadcaster backed by the given config.
func NewBroadcaster(cfg *Config) *Broadcaster {
b := &Broadcaster{
cfg: *cfg,
broadcastReqs: make(chan *broadcastReq),
transactions: make(map[chainhash.Hash]*wire.MsgTx),
quit: make(chan struct{}),
}

return b
}

// Start starts all of the necessary steps for the Broadcaster to begin properly
// carrying out its duties.
func (b *Broadcaster) Start() error {
var err error
b.start.Do(func() {
sub, err := b.cfg.SubscribeBlocks()
if err != nil {
err = fmt.Errorf("unable to subscribe for block "+
"notifications: %v", err)
return
}

b.wg.Add(1)
go b.broadcastHandler(sub)
})
return err
}

// Stop halts the Broadcaster from rebroadcasting pending transactions.
func (b *Broadcaster) Stop() {
b.stop.Do(func() {
close(b.quit)
b.wg.Wait()
})
}

// broadcastHandler is the main event handler of the Broadcaster responsible for
// handling new broadcast requests, rebroadcasting transactions upon every new
// block, etc.
//
// NOTE: This must be run as a goroutine.
func (b *Broadcaster) broadcastHandler(sub *blockntfns.Subscription) {
defer b.wg.Done()
defer sub.Cancel()

for {
select {
// A new broadcast request was submitted by an external caller.
case req := <-b.broadcastReqs:
req.errChan <- b.handleBroadcastReq(req)

// A new block notification has arrived, so we'll rebroadcast
// all of our pending transactions.
case _, ok := <-sub.Notifications:
if !ok {
log.Warn("Unable to rebroadcast transactions: " +
"block subscription was canceled")
continue
}

b.rebroadcast()

case <-b.quit:
return
}
}
}

// handleBroadcastReq handles a new external request to reliably broadcast a
// transaction to the network.
func (b *Broadcaster) handleBroadcastReq(req *broadcastReq) error {
err := b.cfg.Broadcast(req.tx)
if err != nil && !IsBroadcastError(err, Mempool) {
return err
}

b.transactions[req.tx.TxHash()] = req.tx

return nil
}

// rebroadcast rebroadcasts all of the currently pending transactions. Care has
// been taken to ensure that the transactions are sorted in their dependency
// order to prevent peers from deeming our transactions as invalid due to
// broadcasting them before their pending dependencies.
func (b *Broadcaster) rebroadcast() {
if len(b.transactions) == 0 {
return
}

sortedTxs := wtxmgr.DependencySort(b.transactions)
for _, tx := range sortedTxs {
err := b.cfg.Broadcast(tx)
switch {
// If the transaction has already confirmed on-chain, we can
// stop broadcasting it further.
//
// TODO(wilmer); This should ideally be implemented by checking
// the chain ourselves rather than trusting our peers.
case IsBroadcastError(err, Confirmed):
delete(b.transactions, tx.TxHash())
continue

// If the transaction already exists within our peers' mempool,
// we'll continue to rebroadcast it to ensure it actually
// propagates throughout the network.
//
// TODO(wilmer): Rate limit peers that have already accepted our
// transaction into their mempool to prevent resending to them
// every time.
case IsBroadcastError(err, Mempool):
continue

case err != nil:
log.Errorf("Unable to rebroadcast transaction %v: %v",
tx.TxHash(), err)
continue
}
}
}

// Broadcast submits a request to the Broadcaster to reliably broadcast the
// given transaction. An error won't be returned if the transaction already
// exists within the mempool. Any transaction broadcast through this method will
// be rebroadcast upon every change of the tip of the chain.
func (b *Broadcaster) Broadcast(tx *wire.MsgTx) error {
errChan := make(chan error, 1)

select {
case b.broadcastReqs <- &broadcastReq{
tx: tx,
errChan: errChan,
}:
case <-b.quit:
return ErrBroadcasterStopped
}

select {
case err := <-errChan:
return err
case <-b.quit:
return ErrBroadcasterStopped
}
}
180 changes: 180 additions & 0 deletions pushtx/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package pushtx

import (
"math/rand"
"testing"

"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/blockntfns"
)

// createTx is a helper method to create random transactions that spend
// particular inputs.
func createTx(t *testing.T, numOutputs int, inputs ...wire.OutPoint) *wire.MsgTx {
t.Helper()

tx := wire.NewMsgTx(1)
if len(inputs) == 0 {
tx.AddTxIn(&wire.TxIn{})
} else {
for _, input := range inputs {
tx.AddTxIn(&wire.TxIn{PreviousOutPoint: input})
}
}
for i := 0; i < numOutputs; i++ {
var pkScript [32]byte
if _, err := rand.Read(pkScript[:]); err != nil {
t.Fatal(err)
}

tx.AddTxOut(&wire.TxOut{
Value: rand.Int63(),
PkScript: pkScript[:],
})
}

return tx
}

// TestBroadcaster ensures that we can broadcast transactions while it is
// active.
func TestBroadcaster(t *testing.T) {
t.Parallel()

cfg := &Config{
Broadcast: func(*wire.MsgTx) error {
return nil
},
SubscribeBlocks: func() (*blockntfns.Subscription, error) {
return &blockntfns.Subscription{
Notifications: make(chan blockntfns.BlockNtfn),
Cancel: func() {},
}, nil
},
}

broadcaster := NewBroadcaster(cfg)

if err := broadcaster.Start(); err != nil {
t.Fatalf("unable to start broadcaster: %v", err)
}

tx := &wire.MsgTx{}
if err := broadcaster.Broadcast(tx); err != nil {
t.Fatalf("unable to broadcast transaction: %v", err)
}

broadcaster.Stop()

if err := broadcaster.Broadcast(tx); err != ErrBroadcasterStopped {
t.Fatalf("expected ErrBroadcasterStopped, got %v", err)
}
}

// TestRebroadcast ensures that we properly rebroadcast transactions upon every
// new block. Transactions that have confirmed should no longer be broadcast.
func TestRebroadcast(t *testing.T) {
t.Parallel()

const numTxs = 3

// We'll start by setting up the broadcaster with channels to mock the
// behavior of its external dependencies.
broadcastChan := make(chan *wire.MsgTx, numTxs)
ntfnChan := make(chan blockntfns.BlockNtfn)

cfg := &Config{
Broadcast: func(tx *wire.MsgTx) error {
broadcastChan <- tx
return nil
},
SubscribeBlocks: func() (*blockntfns.Subscription, error) {
return &blockntfns.Subscription{
Notifications: ntfnChan,
Cancel: func() {},
}, nil
},
}

broadcaster := NewBroadcaster(cfg)

if err := broadcaster.Start(); err != nil {
t.Fatalf("unable to start broadcaster: %v", err)
}
defer broadcaster.Stop()

// We'll then create some test transactions such that they all depend on
// the previous one, creating a dependency chain. We'll do this to
// ensure transactions are rebroadcast in the order of their
// dependencies.
txs := make([]*wire.MsgTx, 0, numTxs)
for i := 0; i < numTxs; i++ {
var tx *wire.MsgTx
if i == 0 {
tx = createTx(t, 1)
} else {
prevOut := wire.OutPoint{
Hash: txs[i-1].TxHash(),
Index: 0,
}
tx = createTx(t, 1, prevOut)
}
txs = append(txs, tx)
}

// assertBroadcastOrder is a helper closure to ensure that the
// transactions rebroadcast match the expected order.
assertBroadcastOrder := func(expectedOrder []*wire.MsgTx) {
t.Helper()

for i := 0; i < len(expectedOrder); i++ {
tx := <-broadcastChan
if tx != expectedOrder[i] {
t.Fatalf("expected transaction %v, got %v",
expectedOrder[i].TxHash(), tx.TxHash())
}
}
}

// Broadcast the transactions. We'll be broadcasting them in order so
// assertBroadcastOrder is more of a sanity check to ensure that all of
// the transactions were actually broadcast.
for _, tx := range txs {
if err := broadcaster.Broadcast(tx); err != nil {
t.Fatalf("unable to broadcast transaction %v: %v",
tx.TxHash(), err)
}
}

assertBroadcastOrder(txs)

// Now, we'll modify the Broadcast method to mark the first transaction
// as confirmed, and the second as it being accepted into the mempool.
broadcaster.cfg.Broadcast = func(tx *wire.MsgTx) error {
broadcastChan <- tx
if tx == txs[0] {
return &BroadcastError{Code: Confirmed}
}
if tx == txs[1] {
return &BroadcastError{Code: Mempool}
}
return nil
}

// Trigger a new block notification to rebroadcast the transactions.
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 100)

// They should all be broadcast in their expected dependency order.
assertBroadcastOrder(txs)

// Trigger another block notification simulating a reorg in the chain.
// The transactions should be rebroadcast again to ensure they properly
// propagate throughout the network.
ntfnChan <- blockntfns.NewBlockDisconnected(
wire.BlockHeader{}, 100, wire.BlockHeader{},
)

// This time however, only the last two transactions will be rebroadcast
// since the first one confirmed in the previous rebroadcast attempt.
assertBroadcastOrder(txs[1:])
}
Loading

0 comments on commit 294dd75

Please sign in to comment.