From b6e6d1aa3c6f45b053401a72b4678a305000ef3d Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Thu, 9 Nov 2017 21:29:23 -0800 Subject: [PATCH 01/11] chainntnfs: Test that chain notifiers handle chain reorgs correctly. Tests are failing for both btcd and neutrino notifiers. --- chainntnfs/interface_test.go | 148 ++++++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 1 deletion(-) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 65955da4b62..6f9c8a3ab8f 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -19,6 +19,7 @@ import ( "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/integration/rpctest" + "github.com/roasbeef/btcd/rpcclient" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" @@ -84,7 +85,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, t.Fatalf("unable to get current height: %v", err) } - // Now that we have a txid, register a confirmation notiication with + // Now that we have a txid, register a confirmation notification with // the chainntfn source. numConfs := uint32(1) confIntent, err := notifier.RegisterConfirmationsNtfn(txid, numConfs, @@ -961,6 +962,147 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie } } +func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, + t *testing.T) { + + // Set up a new miner that we can use to cause a reorg. + miner2, err := rpctest.New(netParams, nil, nil) + if err != nil { + t.Fatalf("unable to create mining node: %v", err) + } + if err := miner2.SetUp(false, 0); err != nil { + t.Fatalf("unable to set up mining node: %v", err) + } + defer miner2.TearDown() + + // We start by connecting the new miner to our original miner, + // such that it will sync to our original chain. + if err := rpctest.ConnectNode(miner, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice := []*rpctest.Harness{miner, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // The two should be on the same blockheight. + _, nodeHeight1, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err := miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height", + nodeHeight1, nodeHeight2) + } + + // We disconnect the two nodes, such that we can start mining on them + // individually without the other one learning about the new blocks. + err = miner.Node.AddNode(miner2.P2PAddress(), rpcclient.ANRemove) + if err != nil { + t.Fatalf("unable to remove node: %v", err) + } + + txid, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + _, currentHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current height: %v", err) + } + + // Now that we have a txid, register a confirmation notification with + // the chainntfn source. + numConfs := uint32(2) + confIntent, err := notifier.RegisterConfirmationsNtfn(txid, numConfs, + uint32(currentHeight)) + if err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // Now generate a single block, the transaction should be included. + _, err = miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // Transaction only has one confirmation, and the notification is registered + // with 2 confirmations, so we should not be notified yet. + select { + case <-confIntent.Confirmed: + t.Fatal("tx was confirmed unexpectedly") + case <-time.After(1 * time.Second): + } + + // Reorganize transaction out of the chain by generating a longer fork + // from the other miner. The transaction is not included in this fork. + miner2.Node.Generate(2) + + // Reconnect nodes to reach consensus on the longest chain. miner2's chain + // should win and become active on miner1. + if err := rpctest.ConnectNode(miner, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice = []*rpctest.Harness{miner, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + _, nodeHeight1, err = miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err = miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height", + nodeHeight1, nodeHeight2) + } + + // Even though there is one block above the height of the block that the + // transaction was included in, it is not the active chain so the + // notification should not be sent. + select { + case <-confIntent.Confirmed: + t.Fatal("tx was confirmed unexpectedly") + case <-time.After(1 * time.Second): + } + + // Now confirm the transaction on the longest chain and verify that we + // receive the notification. + tx, err := miner.Node.GetRawTransaction(txid) + if err != nil { + t.Fatalf("unable to get raw tx: %v", err) + } + + _, err = miner2.Node.SendRawTransaction(tx.MsgTx(), false) + if err != nil { + t.Fatalf("unable to get send tx: %v", err) + } + + _, err = miner.Node.Generate(3) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + select { + case <-confIntent.Confirmed: + case <-time.After(20 * time.Second): + t.Fatalf("confirmation notification never received") + } +} + type testCase struct { name string @@ -1012,6 +1154,10 @@ var ntfnTests = []testCase{ name: "lazy ntfn consumer", test: testLazyNtfnConsumer, }, + { + name: "reorg conf", + test: testReorgConf, + }, } // TestInterfaces tests all registered interfaces with a unified set of tests From 63acefcc3018343a0533079202ac4914a4a97534 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Thu, 9 Nov 2017 15:30:38 -0800 Subject: [PATCH 02/11] chainntnfs/btcd: Handle block disconnects with chainUpdate. This does not implement full handling of block disconnections, but ensures that all chain updates are processed in order. --- chainntnfs/btcdnotify/btcd.go | 115 ++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 48 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 3b3f16b4ebc..249e861f12c 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -35,6 +35,10 @@ var ( type chainUpdate struct { blockHash *chainhash.Hash blockHeight int32 + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool } // txUpdate encapsulates a transaction related notification sent from btcd to @@ -70,8 +74,6 @@ type BtcdNotifier struct { blockEpochClients map[uint64]*blockEpochRegistration - disconnectedBlockHashes chan *blockNtfn - chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue @@ -97,8 +99,6 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), confHeap: newConfirmationHeap(), - disconnectedBlockHashes: make(chan *blockNtfn, 20), - chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), @@ -192,24 +192,28 @@ func (b *BtcdNotifier) Stop() error { return nil } -// blockNtfn packages a notification of a connected/disconnected block along -// with its height at the time. -type blockNtfn struct { - sha *chainhash.Hash - height int32 -} - // onBlockConnected implements on OnBlockConnected callback for rpcclient. // Ingesting a block updates the wallet's internal utxo state based on the // outputs created and destroyed within each block. func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain // updates. - b.chainUpdates.ChanIn() <- &chainUpdate{hash, height} + b.chainUpdates.ChanIn() <- &chainUpdate{ + blockHash: hash, + blockHeight: height, + connect: true, + } } // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { + // Append this new chain update to the end of the queue of new chain + // updates. + b.chainUpdates.ChanIn() <- &chainUpdate{ + blockHash: hash, + blockHeight: height, + connect: false, + } } // onRedeemingTx implements on OnRedeemingTx callback for rpcclient. @@ -289,47 +293,62 @@ out: b.blockEpochClients[msg.epochID] = msg } - case staleBlockHash := <-b.disconnectedBlockHashes: - // TODO(roasbeef): re-orgs - // * second channel to notify of confirmation decrementing - // re-org? - // * notify of negative confirmations - chainntnfs.Log.Warnf("Block disconnected from main "+ - "chain: %v", staleBlockHash) - case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) - currentHeight = update.blockHeight + if update.connect { + if update.blockHeight != currentHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + currentHeight, update.blockHeight) + } - newBlock, err := b.chainConn.GetBlock(update.blockHash) - if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) - continue - } + currentHeight = update.blockHeight - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) - - b.notifyBlockEpochs(update.blockHeight, - update.blockHash) - - newHeight := update.blockHeight - for i, tx := range newBlock.Transactions { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - txSha := tx.TxHash() - b.checkConfirmationTrigger(&txSha, update, i) - } + newBlock, err := b.chainConn.GetBlock(update.blockHash) + if err != nil { + chainntnfs.Log.Errorf("Unable to get block: %v", err) + continue + } - // A new block has been connected to the main - // chain. Send out any N confirmation notifications - // which may have been triggered by this new block. - b.notifyConfs(newHeight) + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + update.blockHeight, update.blockHash) + + b.notifyBlockEpochs(update.blockHeight, + update.blockHash) + + newHeight := update.blockHeight + for i, tx := range newBlock.Transactions { + // Check if the inclusion of this transaction + // within a block by itself triggers a block + // confirmation threshold, if so send a + // notification. Otherwise, place the + // notification on a heap to be triggered in + // the future once additional confirmations are + // attained. + txSha := tx.TxHash() + b.checkConfirmationTrigger(&txSha, update, i) + } + + // A new block has been connected to the main + // chain. Send out any N confirmation notifications + // which may have been triggered by this new block. + b.notifyConfs(newHeight) + } else { + if update.blockHeight != currentHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + currentHeight, update.blockHeight) + } + + currentHeight = update.blockHeight - 1 + + // TODO(roasbeef): re-orgs + // * second channel to notify of confirmation decrementing + // re-org? + // * notify of negative confirmations + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.blockHeight, update.blockHash) + } case item := <-b.txUpdates.ChanOut(): newSpend := item.(*txUpdate) From 7cddf08908f9d630542deffb7ca54dd74651991d Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 10 Nov 2017 11:01:36 -0800 Subject: [PATCH 03/11] chainntnfs/neutrino: Handle block connects and disconnects in order. --- chainntnfs/neutrinonotify/neutrino.go | 199 ++++++++++++++------------ 1 file changed, 108 insertions(+), 91 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 87d28679433..821252d11a2 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -65,8 +65,7 @@ type NeutrinoNotifier struct { rescanErr <-chan error - newBlocks *chainntnfs.ConcurrentQueue - staleBlocks *chainntnfs.ConcurrentQueue + chainUpdates *chainntnfs.ConcurrentQueue wg sync.WaitGroup quit chan struct{} @@ -96,9 +95,7 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { rescanErr: make(chan error), - newBlocks: chainntnfs.NewConcurrentQueue(10), - - staleBlocks: chainntnfs.NewConcurrentQueue(10), + chainUpdates: chainntnfs.NewConcurrentQueue(10), quit: make(chan struct{}), } @@ -150,8 +147,7 @@ func (n *NeutrinoNotifier) Start() error { n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.rescanErr = n.chainView.Start() - n.newBlocks.Start() - n.staleBlocks.Start() + n.chainUpdates.Start() n.wg.Add(1) go n.notificationDispatcher() @@ -169,8 +165,7 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() - n.newBlocks.Stop() - n.staleBlocks.Stop() + n.chainUpdates.Stop() // Notify all pending clients of our shutdown by closing the related // notification channels. @@ -200,6 +195,10 @@ type filteredBlock struct { hash chainhash.Hash height uint32 txns []*btcutil.Tx + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool } // onFilteredBlockConnected is a callback which is executed each a new block is @@ -209,10 +208,11 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. - n.newBlocks.ChanIn() <- &filteredBlock{ - hash: header.BlockHash(), - height: uint32(height), - txns: txns, + n.chainUpdates.ChanIn() <- &filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + txns: txns, + connect: true, } } @@ -223,9 +223,10 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. - n.staleBlocks.ChanIn() <- &filteredBlock{ - hash: header.BlockHash(), - height: uint32(height), + n.chainUpdates.ChanIn() <- &filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + connect: false, } } @@ -318,87 +319,40 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.blockEpochClients[msg.epochID] = msg } - case item := <-n.newBlocks.ChanOut(): - newBlock := item.(*filteredBlock) - - n.heightMtx.Lock() - n.bestHeight = newBlock.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - newBlock.height, newBlock.hash) - - // First we'll notify any subscribed clients of the - // block. - n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - - // Next, we'll scan over the list of relevant - // transactions and possibly dispatch notifications for - // confirmations and spends. - for _, tx := range newBlock.txns { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - mtx := tx.MsgTx() - txIndex := tx.Index() - txSha := mtx.TxHash() - n.checkConfirmationTrigger(&txSha, newBlock, txIndex) - - for i, txIn := range mtx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does - // spend an output which we have a - // registered notification for, then - // create a spend summary, finally - // sending off the details to the - // notification subscriber. - if clients, ok := n.spendNotifications[prevOut]; ok { - // TODO(roasbeef): many - // integration tests expect - // spend to be notified within - // the mempool. - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &txSha, - SpendingTx: mtx, - SpenderInputIndex: uint32(i), - SpendingHeight: int32(newBlock.height), - } - - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching "+ - "spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) - ntfn.spendChan <- spendDetails + case item := <-n.chainUpdates.ChanOut(): + update := item.(*filteredBlock) + if update.connect { + n.heightMtx.Lock() + if update.height != n.bestHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + n.bestHeight, update.height) + } - // Close spendChan to ensure that any calls to Cancel will not - // block. This is safe to do since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } + n.bestHeight = update.height + n.heightMtx.Unlock() - delete(n.spendNotifications, prevOut) - } + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + update.height, update.hash) + err := n.handleBlockConnected(update) + if err != nil { + chainntnfs.Log.Error(err) + } + } else { + n.heightMtx.Lock() + if update.height != n.bestHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + n.bestHeight, update.height) } - } - - // A new block has been connected to the main chain. - // Send out any N confirmation notifications which may - // have been triggered by this new block. - n.notifyConfs(int32(newBlock.height)) - - case item := <-n.staleBlocks.ChanOut(): - staleBlock := item.(*filteredBlock) - chainntnfs.Log.Warnf("Block disconnected from main "+ - "chain: %v", staleBlock.hash) + n.bestHeight = update.height - 1 + n.heightMtx.Unlock() + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.height, update.hash) + } case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -525,6 +479,69 @@ chainScan: return true } +// handleBlocksConnected applies a chain update for a new block. Any watched +// transactions included this block will processed to either send notifications +// now or after numConfirmations confs. +func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { + // First we'll notify any subscribed clients of the block. + n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Next, we'll scan over the list of relevant transactions and possibly + // dispatch notifications for confirmations and spends. + for _, tx := range newBlock.txns { + // Check if the inclusion of this transaction within a block by itself + // triggers a block confirmation threshold, if so send a notification. + // Otherwise, place the notification on a heap to be triggered in the + // future once additional confirmations are attained. + mtx := tx.MsgTx() + txIndex := tx.Index() + txSha := mtx.TxHash() + n.checkConfirmationTrigger(&txSha, newBlock, txIndex) + + for i, txIn := range mtx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an output which we have a + // registered notification for, then create a spend summary, finally + // sending off the details to the notification subscriber. + clients, ok := n.spendNotifications[prevOut] + if !ok { + continue + } + + // TODO(roasbeef): many integration tests expect spend to be + // notified within the mempool. + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &txSha, + SpendingTx: mtx, + SpenderInputIndex: uint32(i), + SpendingHeight: int32(newBlock.height), + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails + + // Close spendChan to ensure that any calls to Cancel will not + // block. This is safe to do since the channel is buffered, and + // the message can still be read by the receiver. + close(ntfn.spendChan) + } + + delete(n.spendNotifications, prevOut) + } + } + + // A new block has been connected to the main chain. + // Send out any N confirmation notifications which may + // have been triggered by this new block. + n.notifyConfs(int32(newBlock.height)) + + return nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { From f83d3f58437ebce21f33f4231e9af31715f1480b Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 11:55:22 -0800 Subject: [PATCH 04/11] chainntnfs: TxConfNotifier struct to implement shared notifer logic. All implementations of the ChainNotifier interface support registering notifications on transaction confirmations. This struct is intended to be used internally by ChainNotifier implementations to handle much of this logic. --- chainntnfs/txconfnotifier.go | 235 +++++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 chainntnfs/txconfnotifier.go diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go new file mode 100644 index 00000000000..22abada079e --- /dev/null +++ b/chainntnfs/txconfnotifier.go @@ -0,0 +1,235 @@ +package chainntnfs + +import ( + "fmt" + + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcutil" +) + +// ConfNtfn represents a notifier client's request to receive a notification +// once the target transaction gets sufficient confirmations. The client is +// asynchronously notified via the ConfirmationEvent channels. +type ConfNtfn struct { + // TxID is the hash of the transaction for which confirmatino notifications + // are requested. + TxID *chainhash.Hash + + // NumConfirmations is the number of confirmations after which the + // notification is to be sent. + NumConfirmations uint32 + + // Event contains references to the channels that the notifications are to + // be sent over. + Event *ConfirmationEvent + + // details describes the transaction's position is the blockchain. May be + // nil for unconfirmed transactions. + details *TxConfirmation + + // dispatched is false if the confirmed notification has not been sent yet. + dispatched bool +} + +// NewConfirmationEvent constructs a new ConfirmationEvent with newly opened +// channels. +func NewConfirmationEvent() *ConfirmationEvent { + return &ConfirmationEvent{ + Confirmed: make(chan *TxConfirmation, 1), + NegativeConf: make(chan int32, 1), + } +} + +// TxConfNotifier is used to register transaction confirmation notifications and +// dispatch them as the transactions confirm. A client can request to be +// notified when a particular transaction has sufficient on-chain confirmations +// (or be notified immediately if the tx already does), and the TxConfNotifier +// will watch changes to the blockchain in order to satisfy these requests. +type TxConfNotifier struct { + // currentHeight is the height of the tracked blockchain. It is used to + // determine the number of confirmations a tx has and ensure blocks are + // connected and disconnected in order. + currentHeight uint32 + + // reorgSafetyLimit is the chain depth beyond which it is assumed a block + // will not be reorganized out of the chain. This is used to determine when + // to prune old confirmation requests so that reorgs are handled correctly. + // The coinbase maturity period is a reasonable value to use. + reorgSafetyLimit uint32 + + // confNotifications is an index of notification requests by transaction + // hash. + confNotifications map[chainhash.Hash][]*ConfNtfn + + // confTxsByInitialHeight is an index of watched transactions by the height + // that they are included at in the blockchain. This is tracked so that + // incorrect notifications are not sent if a transaction is reorganized out + // of the chain and so that negative confirmations can be recognized. + confTxsByInitialHeight map[uint32][]*chainhash.Hash + + // ntfnsByConfirmHeight is an index of notification requests by the height + // at which the transaction will have sufficient confirmations. + ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} +} + +// NewTxConfNotifier creates a TxConfNotifier. The current height of the +// blockchain is accepted as a parameter. +func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { + return &TxConfNotifier{ + currentHeight: startHeight, + reorgSafetyLimit: reorgSafetyLimit, + confNotifications: make(map[chainhash.Hash][]*ConfNtfn), + confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), + ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + } +} + +// Register handles a new notification request. The client will be notified when +// the transaction gets a sufficient number of confirmations on the blockchain. +// If the transaction has already been included in a block on the chain, the +// confirmation details must be given as the txConf argument, otherwise it +// should be nil. If the transaction already has the sufficient number of +// confirmations, this dispatches the notification immediately. +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { + if txConf == nil || txConf.BlockHeight > tcn.currentHeight { + // Transaction is unconfirmed. + tcn.confNotifications[*ntfn.TxID] = + append(tcn.confNotifications[*ntfn.TxID], ntfn) + return + } + + // If the transaction already has the required confirmations, dispatch + // notification immediately, otherwise record along with the height at + // which to notify. + confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 + if confHeight <= tcn.currentHeight { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) + ntfn.Event.Confirmed <- txConf + ntfn.dispatched = true + } else { + ntfn.details = txConf + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet + } + ntfnSet[ntfn] = struct{}{} + } + + // Unless the transaction is finalized, include transaction information in + // confNotifications and confTxsByInitialHeight in case the tx gets + // reorganized out of the chain. + if txConf.BlockHeight > tcn.currentHeight-tcn.reorgSafetyLimit { + tcn.confNotifications[*ntfn.TxID] = + append(tcn.confNotifications[*ntfn.TxID], ntfn) + tcn.confTxsByInitialHeight[txConf.BlockHeight] = + append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) + } +} + +// ConnectTip handles a new block extending the current chain. This checks each +// transaction in the block to see if any watched transactions are included. +// Also, if any watched transactions now have the required number of +// confirmations as a result of this block being connected, this dispatches +// notifications. +func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, + blockHeight uint32, txns []*btcutil.Tx) error { + + if blockHeight != tcn.currentHeight+1 { + return fmt.Errorf("Received blocks out of order: "+ + "current height=%d, new height=%d", + tcn.currentHeight, blockHeight) + } + tcn.currentHeight++ + + // Record any newly confirmed transactions in ntfnsByConfirmHeight so that + // notifications get dispatched when the tx gets sufficient confirmations. + // Also record txs in confTxsByInitialHeight so reorgs can be handled + // correctly. + for _, tx := range txns { + txHash := tx.Hash() + for _, ntfn := range tcn.confNotifications[*txHash] { + ntfn.details = &TxConfirmation{ + BlockHash: blockHash, + BlockHeight: blockHeight, + TxIndex: uint32(tx.Index()), + } + + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet + } + ntfnSet[ntfn] = struct{}{} + + tcn.confTxsByInitialHeight[blockHeight] = + append(tcn.confTxsByInitialHeight[blockHeight], tx.Hash()) + } + } + + // Dispatch notifications for all transactions that are considered confirmed + // at this new block height. + for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) + ntfn.Event.Confirmed <- ntfn.details + ntfn.dispatched = true + } + delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) + + // Clear entries from confNotifications and confTxsByInitialHeight. We + // assume that reorgs deeper than the reorg safety limit do not happen, so + // we can clear out entries for the block that is now mature. + matureBlockHeight := tcn.currentHeight - tcn.reorgSafetyLimit + for _, txHash := range tcn.confTxsByInitialHeight[matureBlockHeight] { + delete(tcn.confNotifications, *txHash) + } + delete(tcn.confTxsByInitialHeight, matureBlockHeight) + + return nil +} + +// DisconnectTip handles the tip of the current chain being disconnected during +// a chain reorganization. If any watched transactions were included in this +// block, internal structures are updated to ensure a confirmation notification +// is not sent unless the transaction is included in the new chain. +func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { + if blockHeight != tcn.currentHeight { + return fmt.Errorf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + tcn.currentHeight, blockHeight) + } + tcn.currentHeight-- + + for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] { + for _, ntfn := range tcn.confNotifications[*txHash] { + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + continue + } + delete(ntfnSet, ntfn) + } + } + delete(tcn.confTxsByInitialHeight, blockHeight) + + return nil +} + +// TearDown is to be called when the owner of the TxConfNotifier is exiting. +// This closes the event channels of all registered notifications that have +// not been dispatched yet. +func (tcn *TxConfNotifier) TearDown() { + for _, ntfns := range tcn.confNotifications { + for _, ntfn := range ntfns { + if ntfn.dispatched { + continue + } + + close(ntfn.Event.Confirmed) + close(ntfn.Event.NegativeConf) + } + } +} From da17404ef15586dd2c2aa0572a380af2e37384d1 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 11:56:25 -0800 Subject: [PATCH 05/11] chainntnfs: Unit tests for TxConfNotifier. Also fix overflow issue with reorg handling. --- chainntnfs/txconfnotifier.go | 12 +- chainntnfs/txconfnotifier_test.go | 422 ++++++++++++++++++++++++++++++ 2 files changed, 429 insertions(+), 5 deletions(-) create mode 100644 chainntnfs/txconfnotifier_test.go diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 22abada079e..4eeea84dacd 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -120,7 +120,7 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { // Unless the transaction is finalized, include transaction information in // confNotifications and confTxsByInitialHeight in case the tx gets // reorganized out of the chain. - if txConf.BlockHeight > tcn.currentHeight-tcn.reorgSafetyLimit { + if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { tcn.confNotifications[*ntfn.TxID] = append(tcn.confNotifications[*ntfn.TxID], ntfn) tcn.confTxsByInitialHeight[txConf.BlockHeight] = @@ -182,11 +182,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // Clear entries from confNotifications and confTxsByInitialHeight. We // assume that reorgs deeper than the reorg safety limit do not happen, so // we can clear out entries for the block that is now mature. - matureBlockHeight := tcn.currentHeight - tcn.reorgSafetyLimit - for _, txHash := range tcn.confTxsByInitialHeight[matureBlockHeight] { - delete(tcn.confNotifications, *txHash) + if tcn.currentHeight >= tcn.reorgSafetyLimit { + matureBlockHeight := tcn.currentHeight - tcn.reorgSafetyLimit + for _, txHash := range tcn.confTxsByInitialHeight[matureBlockHeight] { + delete(tcn.confNotifications, *txHash) + } + delete(tcn.confTxsByInitialHeight, matureBlockHeight) } - delete(tcn.confTxsByInitialHeight, matureBlockHeight) return nil } diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go new file mode 100644 index 00000000000..3b2ec5a6095 --- /dev/null +++ b/chainntnfs/txconfnotifier_test.go @@ -0,0 +1,422 @@ +package chainntnfs_test + +import ( + "testing" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +var zeroHash chainhash.Hash + +// TestTxConfFutureDispatch tests that the TxConfNotifier dispatches +// registered notifications when the transaction confirms after registration. +func TestTxConfFutureDispatch(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + tx3 = wire.MsgTx{Version: 3} + ) + + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn1, nil) + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, nil) + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, + }) + + err := txConfNotifier.ConnectTip(block1.Hash(), 11, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block1.Hash(), + BlockHeight: 11, + TxIndex: 0, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx3}, + }) + + err = txConfNotifier.ConnectTip(block2.Hash(), 12, block2.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block1.Hash(), + BlockHeight: 11, + TxIndex: 1, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx2") + } +} + +// TestTxConfHistoricalDispatch tests that the TxConfNotifier dispatches +// registered notifications when the transaction is confirmed before +// registration. +func TestTxConfHistoricalDispatch(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + tx3 = wire.MsgTx{Version: 3} + ) + + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConf1 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 1, + } + txConfNotifier.Register(&ntfn1, &txConf1) + + tx2Hash := tx2.TxHash() + txConf2 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 2, + } + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 3, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, &txConf2) + + select { + case txConf := <-ntfn1.Event.Confirmed: + assertEqualTxConf(t, txConf, &txConf1) + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + block := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx3}, + }) + + err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + assertEqualTxConf(t, txConf, &txConf2) + default: + t.Fatalf("Expected confirmation for tx2") + } +} + +// TestTxConfChainReorg tests that TxConfNotifier dispatches Confirmed and +// NegativeConf notifications appropriately when there is a chain +// reorganization. +func TestTxConfChainReorg(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(8, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + tx3 = wire.MsgTx{Version: 3} + ) + + // Tx 1 will be confirmed in block 9 and requires 2 confs. + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn1, nil) + + // Tx 2 will be confirmed in block 10 and requires 1 conf. + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, nil) + + // Tx 3 will be confirmed in block 10 and requires 2 confs. + tx3Hash := tx3.TxHash() + ntfn3 := chainntnfs.ConfNtfn{ + TxID: &tx3Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn3, nil) + + // Sync chain to block 10. Txs 1 & 2 should be confirmed. + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + err := txConfNotifier.ConnectTip(nil, 9, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2, &tx3}, + }) + err = txConfNotifier.ConnectTip(nil, 10, block2.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case <-ntfn1.Event.Confirmed: + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case <-ntfn2.Event.Confirmed: + default: + t.Fatalf("Expected confirmation for tx2") + } + + select { + case txConf := <-ntfn3.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) + default: + } + + // Block that tx2 and tx3 were included in is disconnected and two next + // blocks without them are connected. + err = txConfNotifier.DisconnectTip(10) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + err = txConfNotifier.ConnectTip(nil, 10, nil) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + err = txConfNotifier.ConnectTip(nil, 11, nil) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + select { + case txConf := <-ntfn3.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) + default: + } + + // Now transactions 2 & 3 are re-included in a new block. + block3 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2, &tx3}, + }) + block4 := btcutil.NewBlock(&wire.MsgBlock{}) + + err = txConfNotifier.ConnectTip(block3.Hash(), 12, block3.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + err = txConfNotifier.ConnectTip(block4.Hash(), 13, block4.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // Both transactions should be newly confirmed. + select { + case txConf := <-ntfn2.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block3.Hash(), + BlockHeight: 12, + TxIndex: 0, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx2") + } + + select { + case txConf := <-ntfn3.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block3.Hash(), + BlockHeight: 12, + TxIndex: 1, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx3") + } +} + +func TestTxConfTearDown(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + ) + + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn1, nil) + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, nil) + + block := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1, &tx2}, + }) + + err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case <-ntfn1.Event.Confirmed: + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + // Confirmed channels should be closed for notifications that have not been + // dispatched yet. + txConfNotifier.TearDown() + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case _, more := <-ntfn2.Event.Confirmed: + if more { + t.Fatalf("Expected channel close for tx2") + } + default: + t.Fatalf("Expected channel close for tx2") + } +} + +func assertEqualTxConf(t *testing.T, + actualConf, expectedConf *chainntnfs.TxConfirmation) { + + if actualConf.BlockHeight != expectedConf.BlockHeight { + t.Fatalf("Incorrect block height in confirmation details: "+ + "expected %d, got %d", + expectedConf.BlockHeight, actualConf.BlockHeight) + } + if !actualConf.BlockHash.IsEqual(expectedConf.BlockHash) { + t.Fatalf("Incorrect block hash in confirmation details: "+ + "expected %d, got %d", expectedConf.BlockHash, actualConf.BlockHash) + } + if actualConf.TxIndex != expectedConf.TxIndex { + t.Fatalf("Incorrect tx index in confirmation details: "+ + "expected %d, got %d", expectedConf.TxIndex, actualConf.TxIndex) + } +} From 6b7c06a1338f9100d49f465bef3aedc3b1a9963f Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 12:42:50 -0800 Subject: [PATCH 06/11] chainntnfs/btcd: Refactor BtcdNotifier to use TxConfNotifier. --- chainntnfs/btcdnotify/btcd.go | 267 ++++++++---------------------- chainntnfs/btcdnotify/confheap.go | 58 ------- 2 files changed, 67 insertions(+), 258 deletions(-) delete mode 100644 chainntnfs/btcdnotify/confheap.go diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 249e861f12c..bc9004358c9 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -1,8 +1,8 @@ package btcdnotify import ( - "container/heap" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -20,6 +20,11 @@ const ( // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "btcd" + + // reorgSafetyLimit is assumed maximum depth of a chain reorganization. + // After this many confirmation, transaction confirmation info will be + // pruned. + reorgSafetyLimit = 100 ) var ( @@ -69,8 +74,7 @@ type BtcdNotifier struct { spendNotifications map[wire.OutPoint]map[uint64]*spendNotification - confNotifications map[chainhash.Hash][]*confirmationsNotification - confHeap *confirmationHeap + txConfNotifier *chainntnfs.TxConfNotifier blockEpochClients map[uint64]*blockEpochRegistration @@ -96,9 +100,6 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), - chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), @@ -146,6 +147,9 @@ func (b *BtcdNotifier) Start() error { return err } + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(currentHeight), reorgSafetyLimit) + b.chainUpdates.Start() b.txUpdates.Start() @@ -179,15 +183,10 @@ func (b *BtcdNotifier) Stop() error { close(spendClient.spendChan) } } - for _, confClients := range b.confNotifications { - for _, confClient := range confClients { - close(confClient.finConf) - close(confClient.negativeConf) - } - } for _, epochClient := range b.blockEpochClients { close(epochClient.epochChan) } + b.txConfNotifier.TearDown() return nil } @@ -277,17 +276,15 @@ out: case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", - *msg.txid, msg.numConfirmations) + msg.TxID, msg.NumConfirmations) - // If the notification can be partially or - // fully dispatched, then we can skip the first - // phase for ntfns. - if b.attemptHistoricalDispatch(msg) { - continue + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := b.historicalConfDetails(msg.TxID) + if err != nil { + chainntnfs.Log.Error(err) } - - txid := *msg.txid - b.confNotifications[txid] = append(b.confNotifications[txid], msg) + b.txConfNotifier.Register(&msg.ConfNtfn, txConf) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg @@ -304,7 +301,7 @@ out: currentHeight = update.blockHeight - newBlock, err := b.chainConn.GetBlock(update.blockHash) + rawBlock, err := b.chainConn.GetBlock(update.blockHash) if err != nil { chainntnfs.Log.Errorf("Unable to get block: %v", err) continue @@ -313,26 +310,14 @@ out: chainntnfs.Log.Infof("New block: height=%v, sha=%v", update.blockHeight, update.blockHash) - b.notifyBlockEpochs(update.blockHeight, - update.blockHash) - - newHeight := update.blockHeight - for i, tx := range newBlock.Transactions { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - txSha := tx.TxHash() - b.checkConfirmationTrigger(&txSha, update, i) - } + b.notifyBlockEpochs(update.blockHeight, update.blockHash) - // A new block has been connected to the main - // chain. Send out any N confirmation notifications - // which may have been triggered by this new block. - b.notifyConfs(newHeight) + txns := btcutil.NewBlock(rawBlock).Transactions() + err = b.txConfNotifier.ConnectTip(update.blockHash, + uint32(update.blockHeight), txns) + if err != nil { + chainntnfs.Log.Error(err) + } } else { if update.blockHeight != currentHeight { chainntnfs.Log.Warnf("Received blocks out of order: "+ @@ -342,12 +327,13 @@ out: currentHeight = update.blockHeight - 1 - // TODO(roasbeef): re-orgs - // * second channel to notify of confirmation decrementing - // re-org? - // * notify of negative confirmations chainntnfs.Log.Infof("Block disconnected from main chain: "+ "height=%v, sha=%v", update.blockHeight, update.blockHash) + + err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) + if err != nil { + chainntnfs.Log.Error(err) + } } case item := <-b.txUpdates.ChanOut(): @@ -403,29 +389,25 @@ out: b.wg.Done() } -// attemptHistoricalDispatch tries to use historical information to decide if a -// notification ca be dispatched immediately, or is partially confirmed so it -// can skip straight to the confirmations heap. -// -// Returns true if the transaction was either partially or completely confirmed -func (b *BtcdNotifier) attemptHistoricalDispatch( - msg *confirmationsNotification) bool { - - chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+ - "historical chain", msg.txid) +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, +) (*chainntnfs.TxConfirmation, error) { // If the transaction already has some or all of the confirmations, // then we may be able to dispatch it immediately. - tx, err := b.chainConn.GetRawTransactionVerbose(msg.txid) + tx, err := b.chainConn.GetRawTransactionVerbose(txid) if err != nil || tx == nil || tx.BlockHash == "" { - jsonErr, ok := err.(*btcjson.RPCError) - switch { - case ok && jsonErr.Code == -5: - default: - chainntnfs.Log.Warnf("unable to query for txid(%v): %v", - msg.txid, err) + if err == nil { + return nil, nil } - return false + // Do not return an error if the transaction was not found. + if jsonErr, ok := err.(*btcjson.RPCError); ok { + if jsonErr.Code == btcjson.ErrRPCNoTxInfo { + return nil, nil + } + } + return nil, fmt.Errorf("unable to query for txid(%v): %v", txid, err) } // As we need to fully populate the returned TxConfirmation struct, @@ -433,55 +415,36 @@ func (b *BtcdNotifier) attemptHistoricalDispatch( // locate its exact index within the block. blockHash, err := chainhash.NewHashFromStr(tx.BlockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block hash %v for "+ - "historical dispatch: %v", tx.BlockHash, err) - return false + return nil, fmt.Errorf("unable to get block hash %v for historical "+ + "dispatch: %v", tx.BlockHash, err) } block, err := b.chainConn.GetBlockVerbose(blockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block hash: %v", err) - return false + return nil, fmt.Errorf("unable to get block hash: %v", err) } // If the block obtained, locate the transaction's index within the // block so we can give the subscriber full confirmation details. - var txIndex uint32 - targetTxidStr := msg.txid.String() + txIndex := -1 + targetTxidStr := txid.String() for i, txHash := range block.Tx { if txHash == targetTxidStr { - txIndex = uint32(i) + txIndex = i break } } - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: blockHash, - BlockHeight: uint32(block.Height), - TxIndex: txIndex, - } - - // If the transaction has more that enough confirmations, then we can - // dispatch it immediately after obtaining for information w.r.t - // exactly *when* if got all its confirmations. - if uint32(tx.Confirmations) >= msg.numConfirmations { - chainntnfs.Log.Infof("Dispatching %v conf notification", - msg.numConfirmations) - msg.finConf <- confDetails - return true + if txIndex == -1 { + return nil, fmt.Errorf("unable to locate tx %v in block %v", + txid, blockHash) } - // Otherwise, the transaction has only been *partially* confirmed, so - // we need to insert it into the confirmation heap. - // Find the block height at which this transaction will be confirmed - confHeight := uint32(block.Height) + msg.numConfirmations - 1 - heapEntry := &confEntry{ - msg, - confDetails, - confHeight, + txConf := chainntnfs.TxConfirmation{ + BlockHash: blockHash, + BlockHeight: uint32(block.Height), + TxIndex: uint32(txIndex), } - heap.Push(b.confHeap, heapEntry) - - return true + return &txConf, nil } // notifyBlockEpochs notifies all registered block epoch clients of the newly @@ -517,92 +480,6 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash } } -// notifyConfs examines the current confirmation heap, sending off any -// notifications which have been triggered by the connection of a new block at -// newBlockHeight. -func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { - // If the heap is empty, we have nothing to do. - if b.confHeap.Len() == 0 { - return - } - - // Traverse our confirmation heap. The heap is a - // min-heap, so the confirmation notification which requires - // the smallest block-height will always be at the top - // of the heap. If a confirmation notification is eligible - // for triggering, then fire it off, and check if another - // is eligible until there are no more eligible entries. - nextConf := heap.Pop(b.confHeap).(*confEntry) - for nextConf.triggerHeight <= uint32(newBlockHeight) { - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", nextConf.numConfirmations, newBlockHeight) - nextConf.finConf <- nextConf.initialConfDetails - - if b.confHeap.Len() == 0 { - return - } - - nextConf = heap.Pop(b.confHeap).(*confEntry) - } - - heap.Push(b.confHeap, nextConf) -} - -// checkConfirmationTrigger determines if the passed txSha included at blockHeight -// triggers any single confirmation notifications. In the event that the txid -// matches, yet needs additional confirmations, it is added to the confirmation -// heap to be triggered at a later time. -// TODO(roasbeef): perhaps lookup, then track by inputs instead? -func (b *BtcdNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, - newTip *chainUpdate, txIndex int) { - - // If a confirmation notification has been registered - // for this txid, then either trigger a notification - // event if only a single confirmation notification was - // requested, or place the notification on the - // confirmation heap for future usage. - if confClients, ok := b.confNotifications[*txSha]; ok { - // Either all of the registered confirmations will be - // dispatched due to a single confirmation, or added to the - // conf head. Therefore we unconditionally delete the registered - // confirmations from the staging zone. - defer func() { - delete(b.confNotifications, *txSha) - }() - - for _, confClient := range confClients { - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: newTip.blockHash, - BlockHeight: uint32(newTip.blockHeight), - TxIndex: uint32(txIndex), - } - - if confClient.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - newTip.blockHeight) - confClient.finConf <- confDetails - continue - } - - // The registered notification requires more - // than one confirmation before triggering. So - // we create a heapConf entry for this notification. - // The heapConf allows us to easily keep track of - // which notification(s) we should fire off with - // each incoming block. - confClient.initialConfirmHeight = uint32(newTip.blockHeight) - finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 - heapEntry := &confEntry{ - confClient, - confDetails, - finalConfHeight, - } - heap.Push(b.confHeap, heapEntry) - } - } -} - // spendNotification couples a target outpoint along with the channel used for // notifications once a spend of the outpoint has been detected. type spendNotification struct { @@ -659,9 +536,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, transaction, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) if err != nil { jsonErr, ok := err.(*btcjson.RPCError) - switch { - case ok && jsonErr.Code == -5: - default: + if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { return nil, err } } @@ -713,13 +588,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { - txid *chainhash.Hash - - initialConfirmHeight uint32 - numConfirmations uint32 - - finConf chan *chainntnfs.TxConfirmation - negativeConf chan int32 // TODO(roasbeef): re-org funny business + chainntnfs.ConfNtfn } // RegisterConfirmationsNtfn registers a notification with BtcdNotifier @@ -729,20 +598,18 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, _ uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ - txid: txid, - numConfirmations: numConfs, - finConf: make(chan *chainntnfs.TxConfirmation, 1), - negativeConf: make(chan int32, 1), + chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, } select { case <-b.quit: return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: - return &chainntnfs.ConfirmationEvent{ - Confirmed: ntfn.finConf, - NegativeConf: ntfn.negativeConf, - }, nil + return ntfn.Event, nil } } diff --git a/chainntnfs/btcdnotify/confheap.go b/chainntnfs/btcdnotify/confheap.go deleted file mode 100644 index 84379c29cc6..00000000000 --- a/chainntnfs/btcdnotify/confheap.go +++ /dev/null @@ -1,58 +0,0 @@ -package btcdnotify - -import "github.com/lightningnetwork/lnd/chainntnfs" - -// confEntry represents an entry in the min-confirmation heap. -type confEntry struct { - *confirmationsNotification - - initialConfDetails *chainntnfs.TxConfirmation - - triggerHeight uint32 -} - -// confirmationHeap is a list of confEntries sorted according to nearest -// "confirmation" height.Each entry within the min-confirmation heap is sorted -// according to the smallest delta from the current blockheight to the -// triggerHeight of the next entry confirmationHeap -type confirmationHeap struct { - items []*confEntry -} - -// newConfirmationHeap returns a new confirmationHeap with zero items. -func newConfirmationHeap() *confirmationHeap { - var confItems []*confEntry - return &confirmationHeap{confItems} -} - -// Len returns the number of items in the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Len() int { return len(c.items) } - -// Less returns whether the item in the priority queue with index i should sort -// before the item with index j. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Less(i, j int) bool { - return c.items[i].triggerHeight < c.items[j].triggerHeight -} - -// Swap swaps the items at the passed indices in the priority queue. It is -// part of the heap.Interface implementation. -func (c *confirmationHeap) Swap(i, j int) { - c.items[i], c.items[j] = c.items[j], c.items[i] -} - -// Push pushes the passed item onto the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Push(x interface{}) { - c.items = append(c.items, x.(*confEntry)) -} - -// Pop removes the highest priority item (according to Less) from the priority -// queue and returns it. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Pop() interface{} { - n := len(c.items) - x := c.items[n-1] - c.items[n-1] = nil - c.items = c.items[0 : n-1] - return x -} From 15c29250b42745bca88997280117fc1cc137663b Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 15:49:58 -0800 Subject: [PATCH 07/11] chainntnfs/neutrino: Refactor NeutrinoNotifier to use TxConfNotifier. --- chainntnfs/interface_test.go | 5 + chainntnfs/neutrinonotify/confheap.go | 58 ------ chainntnfs/neutrinonotify/neutrino.go | 259 ++++++-------------------- 3 files changed, 66 insertions(+), 256 deletions(-) delete mode 100644 chainntnfs/neutrinonotify/confheap.go diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 6f9c8a3ab8f..f59582fe17f 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -103,6 +103,11 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, select { case confInfo := <-confIntent.Confirmed: + if !confInfo.BlockHash.IsEqual(blockHash[0]) { + t.Fatalf("mismatched block hashes: expected %v, got %v", + blockHash[0], confInfo.BlockHash) + } + // Finally, we'll verify that the tx index returned is the exact same // as the tx index of the transaction within the block itself. msgBlock, err := miner.Node.GetBlock(blockHash[0]) diff --git a/chainntnfs/neutrinonotify/confheap.go b/chainntnfs/neutrinonotify/confheap.go deleted file mode 100644 index 4ac76390a30..00000000000 --- a/chainntnfs/neutrinonotify/confheap.go +++ /dev/null @@ -1,58 +0,0 @@ -package neutrinonotify - -import "github.com/lightningnetwork/lnd/chainntnfs" - -// confEntry represents an entry in the min-confirmation heap. . -type confEntry struct { - *confirmationsNotification - - initialConfDetails *chainntnfs.TxConfirmation - - triggerHeight uint32 -} - -// confirmationHeap is a list of confEntries sorted according to nearest -// "confirmation" height.Each entry within the min-confirmation heap is sorted -// according to the smallest dleta from the current blockheight to the -// triggerHeight of the next entry confirmationHeap -type confirmationHeap struct { - items []*confEntry -} - -// newConfirmationHeap returns a new confirmationHeap with zero items. -func newConfirmationHeap() *confirmationHeap { - var confItems []*confEntry - return &confirmationHeap{confItems} -} - -// Len returns the number of items in the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Len() int { return len(c.items) } - -// Less returns whether the item in the priority queue with index i should sort -// before the item with index j. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Less(i, j int) bool { - return c.items[i].triggerHeight < c.items[j].triggerHeight -} - -// Swap swaps the items at the passed indices in the priority queue. It is -// part of the heap.Interface implementation. -func (c *confirmationHeap) Swap(i, j int) { - c.items[i], c.items[j] = c.items[j], c.items[i] -} - -// Push pushes the passed item onto the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Push(x interface{}) { - c.items = append(c.items, x.(*confEntry)) -} - -// Pop removes the highest priority item (according to Less) from the priority -// queue and returns it. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Pop() interface{} { - n := len(c.items) - x := c.items[n-1] - c.items[n-1] = nil - c.items = c.items[0 : n-1] - return x -} diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 821252d11a2..518ec9a9256 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -1,8 +1,8 @@ package neutrinonotify import ( - "container/heap" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -22,6 +22,12 @@ const ( // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "neutrino" + + // reorgSafetyLimit is the chain depth beyond which it is assumed a block + // will not be reorganized out of the chain. This is used to determine when + // to prune old confirmation requests so that reorgs are handled correctly. + // The coinbase maturity period is a reasonable value to use. + reorgSafetyLimit = 100 ) var ( @@ -58,8 +64,7 @@ type NeutrinoNotifier struct { spendNotifications map[wire.OutPoint]map[uint64]*spendNotification - confNotifications map[chainhash.Hash][]*confirmationsNotification - confHeap *confirmationHeap + txConfNotifier *chainntnfs.TxConfNotifier blockEpochClients map[uint64]*blockEpochRegistration @@ -88,9 +93,6 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), - p2pNode: node, rescanErr: make(chan error), @@ -142,6 +144,9 @@ func (n *NeutrinoNotifier) Start() error { neutrino.WatchTxIDs(zeroHash), } + n.txConfNotifier = chainntnfs.NewTxConfNotifier( + bestHeight, reorgSafetyLimit) + // Finally, we'll create our rescan struct, start it, and launch all // the goroutines we need to operate this ChainNotifier instance. n.chainView = n.p2pNode.NewRescan(rescanOptions...) @@ -174,15 +179,10 @@ func (n *NeutrinoNotifier) Stop() error { close(spendClient.spendChan) } } - for _, confClients := range n.confNotifications { - for _, confClient := range confClients { - close(confClient.finConf) - close(confClient.negativeConf) - } - } for _, epochClient := range n.blockEpochClients { close(epochClient.epochChan) } + n.txConfNotifier.TearDown() return nil } @@ -284,35 +284,39 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.spendNotifications[op][msg.spendID] = msg case *confirmationsNotification: - chainntnfs.Log.Infof("New confirmations "+ - "subscription: txid=%v, numconfs=%v, "+ - "height_hint=%v", *msg.txid, - msg.numConfirmations, msg.heightHint) + chainntnfs.Log.Infof("New confirmations subscription: "+ + "txid=%v, numconfs=%v, height_hint=%v", + msg.TxID, msg.NumConfirmations, msg.heightHint) // If the notification can be partially or // fully dispatched, then we can skip the first // phase for ntfns. n.heightMtx.RLock() currentHeight := n.bestHeight - if n.attemptHistoricalDispatch(msg, currentHeight, msg.heightHint) { - n.heightMtx.RUnlock() - continue - } n.heightMtx.RUnlock() - // If we can't fully dispatch confirmation, - // then we'll update our filter so we can be - // notified of its future initial confirmation. - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddTxIDs(*msg.txid), - neutrino.Rewind(currentHeight), + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, + msg.heightHint) + if err != nil { + chainntnfs.Log.Error(err) } - if err := n.chainView.Update(rescanUpdate...); err != nil { - chainntnfs.Log.Errorf("unable to update rescan: %v", err) + + if txConf == nil { + // If we can't fully dispatch confirmation, + // then we'll update our filter so we can be + // notified of its future initial confirmation. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddTxIDs(*msg.TxID), + neutrino.Rewind(currentHeight), + } + if err := n.chainView.Update(rescanUpdate...); err != nil { + chainntnfs.Log.Errorf("unable to update rescan: %v", err) + } } - txid := *msg.txid - n.confNotifications[txid] = append(n.confNotifications[txid], msg) + n.txConfNotifier.Register(&msg.ConfNtfn, txConf) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") @@ -352,6 +356,11 @@ func (n *NeutrinoNotifier) notificationDispatcher() { chainntnfs.Log.Infof("Block disconnected from main chain: "+ "height=%v, sha=%v", update.height, update.hash) + + err := n.txConfNotifier.DisconnectTip(update.height) + if err != nil { + chainntnfs.Log.Error(err) + } } case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -363,32 +372,20 @@ func (n *NeutrinoNotifier) notificationDispatcher() { } } -// attemptHistoricalDispatch attempts to consult the historical chain data to -// see if a transaction has already reached full confirmation status at the -// time a notification for it was registered. If it has, then we do an -// immediate dispatch. Otherwise, we'll add the partially confirmed transaction -// to the confirmation heap. -func (n *NeutrinoNotifier) attemptHistoricalDispatch(msg *confirmationsNotification, - currentHeight, heightHint uint32) bool { - - targetHash := msg.txid - - var confDetails *chainntnfs.TxConfirmation - - chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+ - "historical chain", msg.txid) +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, + currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) { // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. -chainScan: for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { // First, we'll fetch the block header for this height so we // can compute the current block hash. header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) if err != nil { - chainntnfs.Log.Errorf("unable to get header for "+ - "height=%v: %v", scanHeight, err) - return false + return nil, fmt.Errorf("unable to get header for height=%v: %v", + scanHeight, err) } blockHash := header.BlockHash() @@ -397,9 +394,8 @@ chainScan: regFilter, err := n.p2pNode.GetCFilter(blockHash, wire.GCSFilterRegular) if err != nil { - chainntnfs.Log.Errorf("unable to retrieve regular "+ - "filter for height=%v: %v", scanHeight, err) - return false + return nil, fmt.Errorf("unable to retrieve regular filter for "+ + "height=%v: %v", scanHeight, err) } // If the block has no transactions other than the coinbase @@ -414,8 +410,7 @@ chainScan: key := builder.DeriveKey(&blockHash) match, err := regFilter.Match(key, targetHash[:]) if err != nil { - chainntnfs.Log.Errorf("unable to query filter: %v", err) - return false + return nil, fmt.Errorf("unable to query filter: %v", err) } // If there's no match, then we can continue forward to the @@ -429,54 +424,22 @@ chainScan: // to send the proper response. block, err := n.p2pNode.GetBlockFromNetwork(blockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block from "+ - "network: %v", err) - return false + return nil, fmt.Errorf("unable to get block from network: %v", err) } for j, tx := range block.Transactions() { txHash := tx.Hash() if txHash.IsEqual(targetHash) { - confDetails = &chainntnfs.TxConfirmation{ + confDetails := chainntnfs.TxConfirmation{ BlockHash: &blockHash, BlockHeight: scanHeight, TxIndex: uint32(j), } - break chainScan + return &confDetails, nil } } } - // If it hasn't yet been confirmed, then we can exit early. - if confDetails == nil { - return false - } - - // Otherwise, we'll calculate the number of confirmations that the - // transaction has so we can decide if it has reached the desired - // number of confirmations or not. - txConfs := currentHeight - confDetails.BlockHeight + 1 - - // If the transaction has more that enough confirmations, then we can - // dispatch it immediately after obtaining for information w.r.t - // exactly *when* if got all its confirmations. - if uint32(txConfs) >= msg.numConfirmations { - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", msg.numConfirmations, currentHeight) - msg.finConf <- confDetails - return true - } - - // Otherwise, the transaction has only been *partially* confirmed, so - // we need to insert it into the confirmation heap. - confHeight := confDetails.BlockHeight + msg.numConfirmations - 1 - heapEntry := &confEntry{ - msg, - confDetails, - confHeight, - } - heap.Push(n.confHeap, heapEntry) - - return true + return nil, nil } // handleBlocksConnected applies a chain update for a new block. Any watched @@ -489,14 +452,8 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // Next, we'll scan over the list of relevant transactions and possibly // dispatch notifications for confirmations and spends. for _, tx := range newBlock.txns { - // Check if the inclusion of this transaction within a block by itself - // triggers a block confirmation threshold, if so send a notification. - // Otherwise, place the notification on a heap to be triggered in the - // future once additional confirmations are attained. mtx := tx.MsgTx() - txIndex := tx.Index() txSha := mtx.TxHash() - n.checkConfirmationTrigger(&txSha, newBlock, txIndex) for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint @@ -537,7 +494,7 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // A new block has been connected to the main chain. // Send out any N confirmation notifications which may // have been triggered by this new block. - n.notifyConfs(int32(newBlock.height)) + n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns) return nil } @@ -572,91 +529,6 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. } } -// notifyConfs examines the current confirmation heap, sending off any -// notifications which have been triggered by the connection of a new block at -// newBlockHeight. -func (n *NeutrinoNotifier) notifyConfs(newBlockHeight int32) { - // If the heap is empty, we have nothing to do. - if n.confHeap.Len() == 0 { - return - } - - // Traverse our confirmation heap. The heap is a min-heap, so the - // confirmation notification which requires the smallest block-height - // will always be at the top of the heap. If a confirmation - // notification is eligible for triggering, then fire it off, and check - // if another is eligible until there are no more eligible entries. - nextConf := heap.Pop(n.confHeap).(*confEntry) - for nextConf.triggerHeight <= uint32(newBlockHeight) { - - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", nextConf.numConfirmations, newBlockHeight) - - nextConf.finConf <- nextConf.initialConfDetails - - if n.confHeap.Len() == 0 { - return - } - - nextConf = heap.Pop(n.confHeap).(*confEntry) - } - - heap.Push(n.confHeap, nextConf) -} - -// checkConfirmationTrigger determines if the passed txSha included at -// blockHeight triggers any single confirmation notifications. In the event -// that the txid matches, yet needs additional confirmations, it is added to -// the confirmation heap to be triggered at a later time. -func (n *NeutrinoNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, - newTip *filteredBlock, txIndex int) { - - // If a confirmation notification has been registered for this txid, - // then either trigger a notification event if only a single - // confirmation notification was requested, or place the notification - // on the confirmation heap for future usage. - if confClients, ok := n.confNotifications[*txSha]; ok { - // Either all of the registered confirmations will be - // dispatched due to a single confirmation, or added to the - // conf head. Therefor we unconditionally delete the registered - // confirmations from the staging zone. - defer func() { - delete(n.confNotifications, *txSha) - }() - - for _, confClient := range confClients { - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: &newTip.hash, - BlockHeight: uint32(newTip.height), - TxIndex: uint32(txIndex), - } - - if confClient.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - newTip.height) - confClient.finConf <- confDetails - continue - } - - // The registered notification requires more than one - // confirmation before triggering. So we create a - // heapConf entry for this notification. The heapConf - // allows us to easily keep track of which - // notification(s) we should fire off with each - // incoming block. - confClient.initialConfirmHeight = uint32(newTip.height) - finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 - heapEntry := &confEntry{ - confClient, - confDetails, - finalConfHeight, - } - heap.Push(n.confHeap, heapEntry) - } - } -} - // spendNotification couples a target outpoint along with the channel used for // notifications once a spend of the outpoint has been detected. type spendNotification struct { @@ -799,15 +671,8 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { - txid *chainhash.Hash - + chainntnfs.ConfNtfn heightHint uint32 - - initialConfirmHeight uint32 - numConfirmations uint32 - - finConf chan *chainntnfs.TxConfirmation - negativeConf chan int32 // TODO(roasbeef): re-org funny business } // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier @@ -817,21 +682,19 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ - txid: txid, - heightHint: heightHint, - numConfirmations: numConfs, - finConf: make(chan *chainntnfs.TxConfirmation, 1), - negativeConf: make(chan int32, 1), + ConfNtfn: chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, + heightHint: heightHint, } select { case <-n.quit: return nil, ErrChainNotifierShuttingDown case n.notificationRegistry <- ntfn: - return &chainntnfs.ConfirmationEvent{ - Confirmed: ntfn.finConf, - NegativeConf: ntfn.negativeConf, - }, nil + return ntfn.Event, nil } } From 7efd7f54af759d962b01cf74fc4370d3e2fcf00d Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 17:19:43 -0800 Subject: [PATCH 08/11] chainntnfs: Test that neutrino rescan plays nice with txConfNotifier. --- chainntnfs/interface_test.go | 123 ++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index f59582fe17f..dc859926577 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -509,36 +509,54 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, // spending from a coinbase output here, so we use the dedicated // function. - txid, err := getTestTxId(miner) + txid3, err := getTestTxId(miner) if err != nil { t.Fatalf("unable to create test tx: %v", err) } - // Now generate one block. The notifier must check older blocks when - // the confirmation event is registered below to ensure that the TXID - // hasn't already been included in the chain, otherwise the + // Generate another block containing tx 3, but we won't register conf + // notifications for this tx until much later. The notifier must check + // older blocks when the confirmation event is registered below to ensure + // that the TXID hasn't already been included in the chain, otherwise the // notification will never be sent. - blockHash, err := miner.Node.Generate(1) + _, err = miner.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) } + txid1, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + txid2, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) } - // Now that we have a txid, register a confirmation notification with - // the chainntfn source. - numConfs := uint32(1) - confIntent, err := notifier.RegisterConfirmationsNtfn(txid, numConfs, + // Now generate another block containing txs 1 & 2. + blockHash, err := miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate block: %v", err) + } + + // Register a confirmation notification with the chainntfn source for tx2, + // which is included in the last block. The height hint is the height before + // the block is included. This notification should fire immediately since + // only 1 confirmation is required. + ntfn1, err := notifier.RegisterConfirmationsNtfn(txid1, 1, uint32(currentHeight)) if err != nil { t.Fatalf("unable to register ntfn: %v", err) } select { - case confInfo := <-confIntent.Confirmed: + case confInfo := <-ntfn1.Confirmed: // Finally, we'll verify that the tx index returned is the exact same // as the tx index of the transaction within the block itself. msgBlock, err := miner.Node.GetBlock(blockHash[0]) @@ -550,14 +568,14 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, if err != nil { t.Fatalf("unable to index into block: %v", err) } - if !specifiedTxHash.IsEqual(txid) { + if !specifiedTxHash.IsEqual(txid1) { t.Fatalf("mismatched tx indexes: expected %v, got %v", - txid, specifiedTxHash) + txid1, specifiedTxHash) } // We'll also ensure that the block height has been set // properly. - if confInfo.BlockHeight != uint32(currentHeight) { + if confInfo.BlockHeight != uint32(currentHeight+1) { t.Fatalf("incorrect block height: expected %v, got %v", confInfo.BlockHeight, currentHeight) } @@ -566,75 +584,60 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("confirmation notification never received") } - // Next, we want to test fully dispatching the notification for a - // transaction that has been *partially* confirmed. So we'll create - // another test txid. - txid, err = getTestTxId(miner) + // Register a confirmation notification for tx2, requiring 3 confirmations. + // This transaction is only partially confirmed, so the notification should + // not fire yet. + ntfn2, err := notifier.RegisterConfirmationsNtfn(txid2, 3, + uint32(currentHeight)) if err != nil { - t.Fatalf("unable to create test tx: %v", err) + t.Fatalf("unable to register ntfn: %v", err) } - _, currentHeight, err = miner.Node.GetBestBlock() + // Fully confirm tx3. + _, err = miner.Node.Generate(2) if err != nil { - t.Fatalf("unable to get current height: %v", err) + t.Fatalf("unable to generate block: %v", err) } - // We'll request 6 confirmations for the above generated txid, but we - // will generate the confirmations in chunks. - numConfs = 6 - - time.Sleep(time.Second * 2) - - // First, generate 2 confirmations. - if _, err := miner.Node.Generate(2); err != nil { - t.Fatalf("unable to generate blocks: %v", err) + select { + case <-ntfn2.Confirmed: + case <-time.After(10 * time.Second): + t.Fatalf("confirmation notification never received") } - time.Sleep(time.Second * 2) + select { + case <-ntfn1.Confirmed: + t.Fatalf("received multiple confirmations for tx") + case <-time.After(1 * time.Second): + } - // Next, register for the notification *after* the transition has - // already been partially confirmed. - confIntent, err = notifier.RegisterConfirmationsNtfn(txid, numConfs, - uint32(currentHeight)) + // Finally register a confirmation notification for tx3, requiring 1 + // confirmation. Ensure that conf notifications do not refire on txs + // 1 or 2. + ntfn3, err := notifier.RegisterConfirmationsNtfn(txid3, 1, + uint32(currentHeight-1)) if err != nil { t.Fatalf("unable to register ntfn: %v", err) } - // We shouldn't receive a notification at this point, as the - // transaction hasn't yet been fully confirmed. select { - case <-confIntent.Confirmed: - t.Fatalf("received confirmation notification but shouldn't " + - "have") - default: - // Expected case + case <-ntfn3.Confirmed: + case <-time.After(10 * time.Second): + t.Fatalf("confirmation notification never received") } - // With the notification registered, generate another 3 blocks, this - // shouldn't yet dispatch the notification. - if _, err := miner.Node.Generate(3); err != nil { - t.Fatalf("unable to generate blocks: %v", err) - } + time.Sleep(1 * time.Second) select { - case <-confIntent.Confirmed: - t.Fatalf("received confirmation notification but shouldn't " + - "have") + case <-ntfn1.Confirmed: + t.Fatalf("received multiple confirmations for tx") default: - // Expected case - } - - // Finally, we'll mine the final block which should dispatch the - // notification. - if _, err := miner.Node.Generate(1); err != nil { - t.Fatalf("unable to generate blocks: %v", err) } select { - case <-confIntent.Confirmed: - break - case <-time.After(30 * time.Second): - t.Fatalf("confirmation notification never received") + case <-ntfn2.Confirmed: + t.Fatalf("received multiple confirmations for tx") + default: } } From 984741d026fbda7d434b855e00fccf9e8d846d0b Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 17:32:11 -0800 Subject: [PATCH 09/11] chainntnfs: Send negative confirmation notifications. --- chainntnfs/txconfnotifier.go | 23 +++++++++++++++++++++++ chainntnfs/txconfnotifier_test.go | 10 ++++++++++ 2 files changed, 33 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 4eeea84dacd..17471957b0d 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -57,6 +57,11 @@ type TxConfNotifier struct { // The coinbase maturity period is a reasonable value to use. reorgSafetyLimit uint32 + // reorgDepth is the depth of a chain organization that this system is being + // informed of. This is incremented as long as a sequence of blocks are + // disconnected without being interrupted by a new block. + reorgDepth uint32 + // confNotifications is an index of notification requests by transaction // hash. confNotifications map[chainhash.Hash][]*ConfNtfn @@ -142,6 +147,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, tcn.currentHeight, blockHeight) } tcn.currentHeight++ + tcn.reorgDepth = 0 // Record any newly confirmed transactions in ntfnsByConfirmHeight so that // notifications get dispatched when the tx gets sufficient confirmations. @@ -204,9 +210,26 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { tcn.currentHeight, blockHeight) } tcn.currentHeight-- + tcn.reorgDepth++ for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] { for _, ntfn := range tcn.confNotifications[*txHash] { + // If notification has been dispatched with sufficient + // confirmations, notify of the reversal. + if ntfn.dispatched { + select { + case <-ntfn.Event.Confirmed: + // Drain confirmation notification instead of sending + // negative conf if the receiver has not processed it yet. + // This ensures sends to the Confirmed channel are always + // non-blocking. + default: + ntfn.Event.NegativeConf <- int32(tcn.reorgDepth) + } + ntfn.dispatched = false + continue + } + confHeight := blockHeight + ntfn.NumConfirmations - 1 ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 3b2ec5a6095..d60b5227058 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -276,6 +276,16 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + select { + case reorgDepth := <-ntfn2.Event.NegativeConf: + if reorgDepth != 1 { + t.Fatalf("Incorrect value for negative conf notification: "+ + "expected %d, got %d", 1, reorgDepth) + } + default: + t.Fatalf("Expected negative conf notification for tx1") + } + select { case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) From fa8c0195e603880982a74bd9ad8f4d5ea687355e Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 4 Dec 2017 13:30:33 -0800 Subject: [PATCH 10/11] chainntnfs: Implement quit signal in TxConfNotifier. --- chainntnfs/btcdnotify/btcd.go | 5 ++- chainntnfs/neutrinonotify/neutrino.go | 5 ++- chainntnfs/txconfnotifier.go | 52 ++++++++++++++++++++++----- 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index bc9004358c9..5ee9a61962d 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -284,7 +284,10 @@ out: if err != nil { chainntnfs.Log.Error(err) } - b.txConfNotifier.Register(&msg.ConfNtfn, txConf) + err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) + } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 518ec9a9256..4bb2992087a 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -316,7 +316,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() { } } - n.txConfNotifier.Register(&msg.ConfNtfn, txConf) + err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) + } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 17471957b0d..414b6b06082 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -75,6 +75,10 @@ type TxConfNotifier struct { // ntfnsByConfirmHeight is an index of notification requests by the height // at which the transaction will have sufficient confirmations. ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} + + // quit is closed in order to signal that the notifier is gracefully + // exiting. + quit chan struct{} } // NewTxConfNotifier creates a TxConfNotifier. The current height of the @@ -86,6 +90,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif confNotifications: make(map[chainhash.Hash][]*ConfNtfn), confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + quit: make(chan struct{}), } } @@ -95,12 +100,18 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif // confirmation details must be given as the txConf argument, otherwise it // should be nil. If the transaction already has the sufficient number of // confirmations, this dispatches the notification immediately. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + if txConf == nil || txConf.BlockHeight > tcn.currentHeight { // Transaction is unconfirmed. tcn.confNotifications[*ntfn.TxID] = append(tcn.confNotifications[*ntfn.TxID], ntfn) - return + return nil } // If the transaction already has the required confirmations, dispatch @@ -110,8 +121,12 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) - ntfn.Event.Confirmed <- txConf - ntfn.dispatched = true + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + case ntfn.Event.Confirmed <- txConf: + ntfn.dispatched = true + } } else { ntfn.details = txConf ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] @@ -131,6 +146,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { tcn.confTxsByInitialHeight[txConf.BlockHeight] = append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) } + + return nil } // ConnectTip handles a new block extending the current chain. This checks each @@ -141,6 +158,12 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, txns []*btcutil.Tx) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + if blockHeight != tcn.currentHeight+1 { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, new height=%d", @@ -180,8 +203,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) - ntfn.Event.Confirmed <- ntfn.details - ntfn.dispatched = true + select { + case ntfn.Event.Confirmed <- ntfn.details: + ntfn.dispatched = true + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + } } delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) @@ -204,6 +231,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // block, internal structures are updated to ensure a confirmation notification // is not sent unless the transaction is included in the new chain. func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + if blockHeight != tcn.currentHeight { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", @@ -223,8 +256,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // negative conf if the receiver has not processed it yet. // This ensures sends to the Confirmed channel are always // non-blocking. - default: - ntfn.Event.NegativeConf <- int32(tcn.reorgDepth) + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") } ntfn.dispatched = false continue @@ -247,6 +281,8 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // This closes the event channels of all registered notifications that have // not been dispatched yet. func (tcn *TxConfNotifier) TearDown() { + close(tcn.quit) + for _, ntfns := range tcn.confNotifications { for _, ntfn := range ntfns { if ntfn.dispatched { From fa458c7c331cf50576045c08f8f414f12fbfeec2 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Sun, 10 Dec 2017 10:34:49 -0800 Subject: [PATCH 11/11] chainntnfs: Fix stylistic issues. --- chainntnfs/btcdnotify/btcd.go | 29 +++++++++++++---------- chainntnfs/neutrinonotify/neutrino.go | 34 ++++++++++++++++----------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 5ee9a61962d..cdc2f816e06 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -300,6 +300,7 @@ out: chainntnfs.Log.Warnf("Received blocks out of order: "+ "current height=%d, new height=%d", currentHeight, update.blockHeight) + continue } currentHeight = update.blockHeight @@ -321,22 +322,24 @@ out: if err != nil { chainntnfs.Log.Error(err) } - } else { - if update.blockHeight != currentHeight { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, disconnected height=%d", - currentHeight, update.blockHeight) - } + continue + } - currentHeight = update.blockHeight - 1 + if update.blockHeight != currentHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + currentHeight, update.blockHeight) + continue + } - chainntnfs.Log.Infof("Block disconnected from main chain: "+ - "height=%v, sha=%v", update.blockHeight, update.blockHash) + currentHeight = update.blockHeight - 1 - err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) - if err != nil { - chainntnfs.Log.Error(err) - } + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.blockHeight, update.blockHash) + + err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) + if err != nil { + chainntnfs.Log.Error(err) } case item := <-b.txUpdates.ChanOut(): diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 4bb2992087a..28d738e76a3 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -334,6 +334,8 @@ func (n *NeutrinoNotifier) notificationDispatcher() { chainntnfs.Log.Warnf("Received blocks out of order: "+ "current height=%d, new height=%d", n.bestHeight, update.height) + n.heightMtx.Unlock() + continue } n.bestHeight = update.height @@ -346,25 +348,29 @@ func (n *NeutrinoNotifier) notificationDispatcher() { if err != nil { chainntnfs.Log.Error(err) } - } else { - n.heightMtx.Lock() - if update.height != n.bestHeight { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, disconnected height=%d", - n.bestHeight, update.height) - } + continue + } - n.bestHeight = update.height - 1 + n.heightMtx.Lock() + if update.height != n.bestHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + n.bestHeight, update.height) n.heightMtx.Unlock() + continue + } - chainntnfs.Log.Infof("Block disconnected from main chain: "+ - "height=%v, sha=%v", update.height, update.hash) + n.bestHeight = update.height - 1 + n.heightMtx.Unlock() - err := n.txConfNotifier.DisconnectTip(update.height) - if err != nil { - chainntnfs.Log.Error(err) - } + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.height, update.hash) + + err := n.txConfNotifier.DisconnectTip(update.height) + if err != nil { + chainntnfs.Log.Error(err) } + case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err)