diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 3b3f16b4ebc..cdc2f816e06 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -1,8 +1,8 @@ package btcdnotify import ( - "container/heap" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -20,6 +20,11 @@ const ( // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "btcd" + + // reorgSafetyLimit is assumed maximum depth of a chain reorganization. + // After this many confirmation, transaction confirmation info will be + // pruned. + reorgSafetyLimit = 100 ) var ( @@ -35,6 +40,10 @@ var ( type chainUpdate struct { blockHash *chainhash.Hash blockHeight int32 + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool } // txUpdate encapsulates a transaction related notification sent from btcd to @@ -65,13 +74,10 @@ type BtcdNotifier struct { spendNotifications map[wire.OutPoint]map[uint64]*spendNotification - confNotifications map[chainhash.Hash][]*confirmationsNotification - confHeap *confirmationHeap + txConfNotifier *chainntnfs.TxConfNotifier blockEpochClients map[uint64]*blockEpochRegistration - disconnectedBlockHashes chan *blockNtfn - chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue @@ -94,11 +100,6 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), - - disconnectedBlockHashes: make(chan *blockNtfn, 20), - chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), @@ -146,6 +147,9 @@ func (b *BtcdNotifier) Start() error { return err } + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(currentHeight), reorgSafetyLimit) + b.chainUpdates.Start() b.txUpdates.Start() @@ -179,37 +183,36 @@ func (b *BtcdNotifier) Stop() error { close(spendClient.spendChan) } } - for _, confClients := range b.confNotifications { - for _, confClient := range confClients { - close(confClient.finConf) - close(confClient.negativeConf) - } - } for _, epochClient := range b.blockEpochClients { close(epochClient.epochChan) } + b.txConfNotifier.TearDown() return nil } -// blockNtfn packages a notification of a connected/disconnected block along -// with its height at the time. -type blockNtfn struct { - sha *chainhash.Hash - height int32 -} - // onBlockConnected implements on OnBlockConnected callback for rpcclient. // Ingesting a block updates the wallet's internal utxo state based on the // outputs created and destroyed within each block. func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain // updates. - b.chainUpdates.ChanIn() <- &chainUpdate{hash, height} + b.chainUpdates.ChanIn() <- &chainUpdate{ + blockHash: hash, + blockHeight: height, + connect: true, + } } // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { + // Append this new chain update to the end of the queue of new chain + // updates. + b.chainUpdates.ChanIn() <- &chainUpdate{ + blockHash: hash, + blockHeight: height, + connect: false, + } } // onRedeemingTx implements on OnRedeemingTx callback for rpcclient. @@ -273,63 +276,71 @@ out: case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", - *msg.txid, msg.numConfirmations) + msg.TxID, msg.NumConfirmations) - // If the notification can be partially or - // fully dispatched, then we can skip the first - // phase for ntfns. - if b.attemptHistoricalDispatch(msg) { - continue + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := b.historicalConfDetails(msg.TxID) + if err != nil { + chainntnfs.Log.Error(err) + } + err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) } - - txid := *msg.txid - b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg } - case staleBlockHash := <-b.disconnectedBlockHashes: - // TODO(roasbeef): re-orgs - // * second channel to notify of confirmation decrementing - // re-org? - // * notify of negative confirmations - chainntnfs.Log.Warnf("Block disconnected from main "+ - "chain: %v", staleBlockHash) - case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) - currentHeight = update.blockHeight + if update.connect { + if update.blockHeight != currentHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + currentHeight, update.blockHeight) + continue + } - newBlock, err := b.chainConn.GetBlock(update.blockHash) - if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) + currentHeight = update.blockHeight + + rawBlock, err := b.chainConn.GetBlock(update.blockHash) + if err != nil { + chainntnfs.Log.Errorf("Unable to get block: %v", err) + continue + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + update.blockHeight, update.blockHash) + + b.notifyBlockEpochs(update.blockHeight, update.blockHash) + + txns := btcutil.NewBlock(rawBlock).Transactions() + err = b.txConfNotifier.ConnectTip(update.blockHash, + uint32(update.blockHeight), txns) + if err != nil { + chainntnfs.Log.Error(err) + } continue } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) - - b.notifyBlockEpochs(update.blockHeight, - update.blockHash) - - newHeight := update.blockHeight - for i, tx := range newBlock.Transactions { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - txSha := tx.TxHash() - b.checkConfirmationTrigger(&txSha, update, i) + if update.blockHeight != currentHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + currentHeight, update.blockHeight) + continue } - // A new block has been connected to the main - // chain. Send out any N confirmation notifications - // which may have been triggered by this new block. - b.notifyConfs(newHeight) + currentHeight = update.blockHeight - 1 + + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.blockHeight, update.blockHash) + + err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) + if err != nil { + chainntnfs.Log.Error(err) + } case item := <-b.txUpdates.ChanOut(): newSpend := item.(*txUpdate) @@ -384,29 +395,25 @@ out: b.wg.Done() } -// attemptHistoricalDispatch tries to use historical information to decide if a -// notification ca be dispatched immediately, or is partially confirmed so it -// can skip straight to the confirmations heap. -// -// Returns true if the transaction was either partially or completely confirmed -func (b *BtcdNotifier) attemptHistoricalDispatch( - msg *confirmationsNotification) bool { - - chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+ - "historical chain", msg.txid) +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, +) (*chainntnfs.TxConfirmation, error) { // If the transaction already has some or all of the confirmations, // then we may be able to dispatch it immediately. - tx, err := b.chainConn.GetRawTransactionVerbose(msg.txid) + tx, err := b.chainConn.GetRawTransactionVerbose(txid) if err != nil || tx == nil || tx.BlockHash == "" { - jsonErr, ok := err.(*btcjson.RPCError) - switch { - case ok && jsonErr.Code == -5: - default: - chainntnfs.Log.Warnf("unable to query for txid(%v): %v", - msg.txid, err) + if err == nil { + return nil, nil + } + // Do not return an error if the transaction was not found. + if jsonErr, ok := err.(*btcjson.RPCError); ok { + if jsonErr.Code == btcjson.ErrRPCNoTxInfo { + return nil, nil + } } - return false + return nil, fmt.Errorf("unable to query for txid(%v): %v", txid, err) } // As we need to fully populate the returned TxConfirmation struct, @@ -414,55 +421,36 @@ func (b *BtcdNotifier) attemptHistoricalDispatch( // locate its exact index within the block. blockHash, err := chainhash.NewHashFromStr(tx.BlockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block hash %v for "+ - "historical dispatch: %v", tx.BlockHash, err) - return false + return nil, fmt.Errorf("unable to get block hash %v for historical "+ + "dispatch: %v", tx.BlockHash, err) } block, err := b.chainConn.GetBlockVerbose(blockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block hash: %v", err) - return false + return nil, fmt.Errorf("unable to get block hash: %v", err) } // If the block obtained, locate the transaction's index within the // block so we can give the subscriber full confirmation details. - var txIndex uint32 - targetTxidStr := msg.txid.String() + txIndex := -1 + targetTxidStr := txid.String() for i, txHash := range block.Tx { if txHash == targetTxidStr { - txIndex = uint32(i) + txIndex = i break } } - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: blockHash, - BlockHeight: uint32(block.Height), - TxIndex: txIndex, - } - - // If the transaction has more that enough confirmations, then we can - // dispatch it immediately after obtaining for information w.r.t - // exactly *when* if got all its confirmations. - if uint32(tx.Confirmations) >= msg.numConfirmations { - chainntnfs.Log.Infof("Dispatching %v conf notification", - msg.numConfirmations) - msg.finConf <- confDetails - return true + if txIndex == -1 { + return nil, fmt.Errorf("unable to locate tx %v in block %v", + txid, blockHash) } - // Otherwise, the transaction has only been *partially* confirmed, so - // we need to insert it into the confirmation heap. - // Find the block height at which this transaction will be confirmed - confHeight := uint32(block.Height) + msg.numConfirmations - 1 - heapEntry := &confEntry{ - msg, - confDetails, - confHeight, + txConf := chainntnfs.TxConfirmation{ + BlockHash: blockHash, + BlockHeight: uint32(block.Height), + TxIndex: uint32(txIndex), } - heap.Push(b.confHeap, heapEntry) - - return true + return &txConf, nil } // notifyBlockEpochs notifies all registered block epoch clients of the newly @@ -498,92 +486,6 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash } } -// notifyConfs examines the current confirmation heap, sending off any -// notifications which have been triggered by the connection of a new block at -// newBlockHeight. -func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { - // If the heap is empty, we have nothing to do. - if b.confHeap.Len() == 0 { - return - } - - // Traverse our confirmation heap. The heap is a - // min-heap, so the confirmation notification which requires - // the smallest block-height will always be at the top - // of the heap. If a confirmation notification is eligible - // for triggering, then fire it off, and check if another - // is eligible until there are no more eligible entries. - nextConf := heap.Pop(b.confHeap).(*confEntry) - for nextConf.triggerHeight <= uint32(newBlockHeight) { - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", nextConf.numConfirmations, newBlockHeight) - nextConf.finConf <- nextConf.initialConfDetails - - if b.confHeap.Len() == 0 { - return - } - - nextConf = heap.Pop(b.confHeap).(*confEntry) - } - - heap.Push(b.confHeap, nextConf) -} - -// checkConfirmationTrigger determines if the passed txSha included at blockHeight -// triggers any single confirmation notifications. In the event that the txid -// matches, yet needs additional confirmations, it is added to the confirmation -// heap to be triggered at a later time. -// TODO(roasbeef): perhaps lookup, then track by inputs instead? -func (b *BtcdNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, - newTip *chainUpdate, txIndex int) { - - // If a confirmation notification has been registered - // for this txid, then either trigger a notification - // event if only a single confirmation notification was - // requested, or place the notification on the - // confirmation heap for future usage. - if confClients, ok := b.confNotifications[*txSha]; ok { - // Either all of the registered confirmations will be - // dispatched due to a single confirmation, or added to the - // conf head. Therefore we unconditionally delete the registered - // confirmations from the staging zone. - defer func() { - delete(b.confNotifications, *txSha) - }() - - for _, confClient := range confClients { - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: newTip.blockHash, - BlockHeight: uint32(newTip.blockHeight), - TxIndex: uint32(txIndex), - } - - if confClient.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - newTip.blockHeight) - confClient.finConf <- confDetails - continue - } - - // The registered notification requires more - // than one confirmation before triggering. So - // we create a heapConf entry for this notification. - // The heapConf allows us to easily keep track of - // which notification(s) we should fire off with - // each incoming block. - confClient.initialConfirmHeight = uint32(newTip.blockHeight) - finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 - heapEntry := &confEntry{ - confClient, - confDetails, - finalConfHeight, - } - heap.Push(b.confHeap, heapEntry) - } - } -} - // spendNotification couples a target outpoint along with the channel used for // notifications once a spend of the outpoint has been detected. type spendNotification struct { @@ -640,9 +542,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, transaction, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) if err != nil { jsonErr, ok := err.(*btcjson.RPCError) - switch { - case ok && jsonErr.Code == -5: - default: + if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { return nil, err } } @@ -694,13 +594,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { - txid *chainhash.Hash - - initialConfirmHeight uint32 - numConfirmations uint32 - - finConf chan *chainntnfs.TxConfirmation - negativeConf chan int32 // TODO(roasbeef): re-org funny business + chainntnfs.ConfNtfn } // RegisterConfirmationsNtfn registers a notification with BtcdNotifier @@ -710,20 +604,18 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, _ uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ - txid: txid, - numConfirmations: numConfs, - finConf: make(chan *chainntnfs.TxConfirmation, 1), - negativeConf: make(chan int32, 1), + chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, } select { case <-b.quit: return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: - return &chainntnfs.ConfirmationEvent{ - Confirmed: ntfn.finConf, - NegativeConf: ntfn.negativeConf, - }, nil + return ntfn.Event, nil } } diff --git a/chainntnfs/btcdnotify/confheap.go b/chainntnfs/btcdnotify/confheap.go deleted file mode 100644 index 84379c29cc6..00000000000 --- a/chainntnfs/btcdnotify/confheap.go +++ /dev/null @@ -1,58 +0,0 @@ -package btcdnotify - -import "github.com/lightningnetwork/lnd/chainntnfs" - -// confEntry represents an entry in the min-confirmation heap. -type confEntry struct { - *confirmationsNotification - - initialConfDetails *chainntnfs.TxConfirmation - - triggerHeight uint32 -} - -// confirmationHeap is a list of confEntries sorted according to nearest -// "confirmation" height.Each entry within the min-confirmation heap is sorted -// according to the smallest delta from the current blockheight to the -// triggerHeight of the next entry confirmationHeap -type confirmationHeap struct { - items []*confEntry -} - -// newConfirmationHeap returns a new confirmationHeap with zero items. -func newConfirmationHeap() *confirmationHeap { - var confItems []*confEntry - return &confirmationHeap{confItems} -} - -// Len returns the number of items in the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Len() int { return len(c.items) } - -// Less returns whether the item in the priority queue with index i should sort -// before the item with index j. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Less(i, j int) bool { - return c.items[i].triggerHeight < c.items[j].triggerHeight -} - -// Swap swaps the items at the passed indices in the priority queue. It is -// part of the heap.Interface implementation. -func (c *confirmationHeap) Swap(i, j int) { - c.items[i], c.items[j] = c.items[j], c.items[i] -} - -// Push pushes the passed item onto the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Push(x interface{}) { - c.items = append(c.items, x.(*confEntry)) -} - -// Pop removes the highest priority item (according to Less) from the priority -// queue and returns it. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Pop() interface{} { - n := len(c.items) - x := c.items[n-1] - c.items[n-1] = nil - c.items = c.items[0 : n-1] - return x -} diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 65955da4b62..dc859926577 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -19,6 +19,7 @@ import ( "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/integration/rpctest" + "github.com/roasbeef/btcd/rpcclient" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" @@ -84,7 +85,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, t.Fatalf("unable to get current height: %v", err) } - // Now that we have a txid, register a confirmation notiication with + // Now that we have a txid, register a confirmation notification with // the chainntfn source. numConfs := uint32(1) confIntent, err := notifier.RegisterConfirmationsNtfn(txid, numConfs, @@ -102,6 +103,11 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, select { case confInfo := <-confIntent.Confirmed: + if !confInfo.BlockHash.IsEqual(blockHash[0]) { + t.Fatalf("mismatched block hashes: expected %v, got %v", + blockHash[0], confInfo.BlockHash) + } + // Finally, we'll verify that the tx index returned is the exact same // as the tx index of the transaction within the block itself. msgBlock, err := miner.Node.GetBlock(blockHash[0]) @@ -503,36 +509,54 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, // spending from a coinbase output here, so we use the dedicated // function. - txid, err := getTestTxId(miner) + txid3, err := getTestTxId(miner) if err != nil { t.Fatalf("unable to create test tx: %v", err) } - // Now generate one block. The notifier must check older blocks when - // the confirmation event is registered below to ensure that the TXID - // hasn't already been included in the chain, otherwise the + // Generate another block containing tx 3, but we won't register conf + // notifications for this tx until much later. The notifier must check + // older blocks when the confirmation event is registered below to ensure + // that the TXID hasn't already been included in the chain, otherwise the // notification will never be sent. - blockHash, err := miner.Node.Generate(1) + _, err = miner.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) } + txid1, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + txid2, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) } - // Now that we have a txid, register a confirmation notification with - // the chainntfn source. - numConfs := uint32(1) - confIntent, err := notifier.RegisterConfirmationsNtfn(txid, numConfs, + // Now generate another block containing txs 1 & 2. + blockHash, err := miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate block: %v", err) + } + + // Register a confirmation notification with the chainntfn source for tx2, + // which is included in the last block. The height hint is the height before + // the block is included. This notification should fire immediately since + // only 1 confirmation is required. + ntfn1, err := notifier.RegisterConfirmationsNtfn(txid1, 1, uint32(currentHeight)) if err != nil { t.Fatalf("unable to register ntfn: %v", err) } select { - case confInfo := <-confIntent.Confirmed: + case confInfo := <-ntfn1.Confirmed: // Finally, we'll verify that the tx index returned is the exact same // as the tx index of the transaction within the block itself. msgBlock, err := miner.Node.GetBlock(blockHash[0]) @@ -544,14 +568,14 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, if err != nil { t.Fatalf("unable to index into block: %v", err) } - if !specifiedTxHash.IsEqual(txid) { + if !specifiedTxHash.IsEqual(txid1) { t.Fatalf("mismatched tx indexes: expected %v, got %v", - txid, specifiedTxHash) + txid1, specifiedTxHash) } // We'll also ensure that the block height has been set // properly. - if confInfo.BlockHeight != uint32(currentHeight) { + if confInfo.BlockHeight != uint32(currentHeight+1) { t.Fatalf("incorrect block height: expected %v, got %v", confInfo.BlockHeight, currentHeight) } @@ -560,75 +584,60 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("confirmation notification never received") } - // Next, we want to test fully dispatching the notification for a - // transaction that has been *partially* confirmed. So we'll create - // another test txid. - txid, err = getTestTxId(miner) + // Register a confirmation notification for tx2, requiring 3 confirmations. + // This transaction is only partially confirmed, so the notification should + // not fire yet. + ntfn2, err := notifier.RegisterConfirmationsNtfn(txid2, 3, + uint32(currentHeight)) if err != nil { - t.Fatalf("unable to create test tx: %v", err) + t.Fatalf("unable to register ntfn: %v", err) } - _, currentHeight, err = miner.Node.GetBestBlock() + // Fully confirm tx3. + _, err = miner.Node.Generate(2) if err != nil { - t.Fatalf("unable to get current height: %v", err) + t.Fatalf("unable to generate block: %v", err) } - // We'll request 6 confirmations for the above generated txid, but we - // will generate the confirmations in chunks. - numConfs = 6 - - time.Sleep(time.Second * 2) - - // First, generate 2 confirmations. - if _, err := miner.Node.Generate(2); err != nil { - t.Fatalf("unable to generate blocks: %v", err) + select { + case <-ntfn2.Confirmed: + case <-time.After(10 * time.Second): + t.Fatalf("confirmation notification never received") } - time.Sleep(time.Second * 2) + select { + case <-ntfn1.Confirmed: + t.Fatalf("received multiple confirmations for tx") + case <-time.After(1 * time.Second): + } - // Next, register for the notification *after* the transition has - // already been partially confirmed. - confIntent, err = notifier.RegisterConfirmationsNtfn(txid, numConfs, - uint32(currentHeight)) + // Finally register a confirmation notification for tx3, requiring 1 + // confirmation. Ensure that conf notifications do not refire on txs + // 1 or 2. + ntfn3, err := notifier.RegisterConfirmationsNtfn(txid3, 1, + uint32(currentHeight-1)) if err != nil { t.Fatalf("unable to register ntfn: %v", err) } - // We shouldn't receive a notification at this point, as the - // transaction hasn't yet been fully confirmed. select { - case <-confIntent.Confirmed: - t.Fatalf("received confirmation notification but shouldn't " + - "have") - default: - // Expected case + case <-ntfn3.Confirmed: + case <-time.After(10 * time.Second): + t.Fatalf("confirmation notification never received") } - // With the notification registered, generate another 3 blocks, this - // shouldn't yet dispatch the notification. - if _, err := miner.Node.Generate(3); err != nil { - t.Fatalf("unable to generate blocks: %v", err) - } + time.Sleep(1 * time.Second) select { - case <-confIntent.Confirmed: - t.Fatalf("received confirmation notification but shouldn't " + - "have") + case <-ntfn1.Confirmed: + t.Fatalf("received multiple confirmations for tx") default: - // Expected case - } - - // Finally, we'll mine the final block which should dispatch the - // notification. - if _, err := miner.Node.Generate(1); err != nil { - t.Fatalf("unable to generate blocks: %v", err) } select { - case <-confIntent.Confirmed: - break - case <-time.After(30 * time.Second): - t.Fatalf("confirmation notification never received") + case <-ntfn2.Confirmed: + t.Fatalf("received multiple confirmations for tx") + default: } } @@ -961,6 +970,147 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie } } +func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, + t *testing.T) { + + // Set up a new miner that we can use to cause a reorg. + miner2, err := rpctest.New(netParams, nil, nil) + if err != nil { + t.Fatalf("unable to create mining node: %v", err) + } + if err := miner2.SetUp(false, 0); err != nil { + t.Fatalf("unable to set up mining node: %v", err) + } + defer miner2.TearDown() + + // We start by connecting the new miner to our original miner, + // such that it will sync to our original chain. + if err := rpctest.ConnectNode(miner, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice := []*rpctest.Harness{miner, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // The two should be on the same blockheight. + _, nodeHeight1, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err := miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height", + nodeHeight1, nodeHeight2) + } + + // We disconnect the two nodes, such that we can start mining on them + // individually without the other one learning about the new blocks. + err = miner.Node.AddNode(miner2.P2PAddress(), rpcclient.ANRemove) + if err != nil { + t.Fatalf("unable to remove node: %v", err) + } + + txid, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + _, currentHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current height: %v", err) + } + + // Now that we have a txid, register a confirmation notification with + // the chainntfn source. + numConfs := uint32(2) + confIntent, err := notifier.RegisterConfirmationsNtfn(txid, numConfs, + uint32(currentHeight)) + if err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // Now generate a single block, the transaction should be included. + _, err = miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // Transaction only has one confirmation, and the notification is registered + // with 2 confirmations, so we should not be notified yet. + select { + case <-confIntent.Confirmed: + t.Fatal("tx was confirmed unexpectedly") + case <-time.After(1 * time.Second): + } + + // Reorganize transaction out of the chain by generating a longer fork + // from the other miner. The transaction is not included in this fork. + miner2.Node.Generate(2) + + // Reconnect nodes to reach consensus on the longest chain. miner2's chain + // should win and become active on miner1. + if err := rpctest.ConnectNode(miner, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice = []*rpctest.Harness{miner, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + _, nodeHeight1, err = miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err = miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height", + nodeHeight1, nodeHeight2) + } + + // Even though there is one block above the height of the block that the + // transaction was included in, it is not the active chain so the + // notification should not be sent. + select { + case <-confIntent.Confirmed: + t.Fatal("tx was confirmed unexpectedly") + case <-time.After(1 * time.Second): + } + + // Now confirm the transaction on the longest chain and verify that we + // receive the notification. + tx, err := miner.Node.GetRawTransaction(txid) + if err != nil { + t.Fatalf("unable to get raw tx: %v", err) + } + + _, err = miner2.Node.SendRawTransaction(tx.MsgTx(), false) + if err != nil { + t.Fatalf("unable to get send tx: %v", err) + } + + _, err = miner.Node.Generate(3) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + select { + case <-confIntent.Confirmed: + case <-time.After(20 * time.Second): + t.Fatalf("confirmation notification never received") + } +} + type testCase struct { name string @@ -1012,6 +1162,10 @@ var ntfnTests = []testCase{ name: "lazy ntfn consumer", test: testLazyNtfnConsumer, }, + { + name: "reorg conf", + test: testReorgConf, + }, } // TestInterfaces tests all registered interfaces with a unified set of tests diff --git a/chainntnfs/neutrinonotify/confheap.go b/chainntnfs/neutrinonotify/confheap.go deleted file mode 100644 index 4ac76390a30..00000000000 --- a/chainntnfs/neutrinonotify/confheap.go +++ /dev/null @@ -1,58 +0,0 @@ -package neutrinonotify - -import "github.com/lightningnetwork/lnd/chainntnfs" - -// confEntry represents an entry in the min-confirmation heap. . -type confEntry struct { - *confirmationsNotification - - initialConfDetails *chainntnfs.TxConfirmation - - triggerHeight uint32 -} - -// confirmationHeap is a list of confEntries sorted according to nearest -// "confirmation" height.Each entry within the min-confirmation heap is sorted -// according to the smallest dleta from the current blockheight to the -// triggerHeight of the next entry confirmationHeap -type confirmationHeap struct { - items []*confEntry -} - -// newConfirmationHeap returns a new confirmationHeap with zero items. -func newConfirmationHeap() *confirmationHeap { - var confItems []*confEntry - return &confirmationHeap{confItems} -} - -// Len returns the number of items in the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Len() int { return len(c.items) } - -// Less returns whether the item in the priority queue with index i should sort -// before the item with index j. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Less(i, j int) bool { - return c.items[i].triggerHeight < c.items[j].triggerHeight -} - -// Swap swaps the items at the passed indices in the priority queue. It is -// part of the heap.Interface implementation. -func (c *confirmationHeap) Swap(i, j int) { - c.items[i], c.items[j] = c.items[j], c.items[i] -} - -// Push pushes the passed item onto the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Push(x interface{}) { - c.items = append(c.items, x.(*confEntry)) -} - -// Pop removes the highest priority item (according to Less) from the priority -// queue and returns it. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Pop() interface{} { - n := len(c.items) - x := c.items[n-1] - c.items[n-1] = nil - c.items = c.items[0 : n-1] - return x -} diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 87d28679433..28d738e76a3 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -1,8 +1,8 @@ package neutrinonotify import ( - "container/heap" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -22,6 +22,12 @@ const ( // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "neutrino" + + // reorgSafetyLimit is the chain depth beyond which it is assumed a block + // will not be reorganized out of the chain. This is used to determine when + // to prune old confirmation requests so that reorgs are handled correctly. + // The coinbase maturity period is a reasonable value to use. + reorgSafetyLimit = 100 ) var ( @@ -58,15 +64,13 @@ type NeutrinoNotifier struct { spendNotifications map[wire.OutPoint]map[uint64]*spendNotification - confNotifications map[chainhash.Hash][]*confirmationsNotification - confHeap *confirmationHeap + txConfNotifier *chainntnfs.TxConfNotifier blockEpochClients map[uint64]*blockEpochRegistration rescanErr <-chan error - newBlocks *chainntnfs.ConcurrentQueue - staleBlocks *chainntnfs.ConcurrentQueue + chainUpdates *chainntnfs.ConcurrentQueue wg sync.WaitGroup quit chan struct{} @@ -89,16 +93,11 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), - p2pNode: node, rescanErr: make(chan error), - newBlocks: chainntnfs.NewConcurrentQueue(10), - - staleBlocks: chainntnfs.NewConcurrentQueue(10), + chainUpdates: chainntnfs.NewConcurrentQueue(10), quit: make(chan struct{}), } @@ -145,13 +144,15 @@ func (n *NeutrinoNotifier) Start() error { neutrino.WatchTxIDs(zeroHash), } + n.txConfNotifier = chainntnfs.NewTxConfNotifier( + bestHeight, reorgSafetyLimit) + // Finally, we'll create our rescan struct, start it, and launch all // the goroutines we need to operate this ChainNotifier instance. n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.rescanErr = n.chainView.Start() - n.newBlocks.Start() - n.staleBlocks.Start() + n.chainUpdates.Start() n.wg.Add(1) go n.notificationDispatcher() @@ -169,8 +170,7 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() - n.newBlocks.Stop() - n.staleBlocks.Stop() + n.chainUpdates.Stop() // Notify all pending clients of our shutdown by closing the related // notification channels. @@ -179,15 +179,10 @@ func (n *NeutrinoNotifier) Stop() error { close(spendClient.spendChan) } } - for _, confClients := range n.confNotifications { - for _, confClient := range confClients { - close(confClient.finConf) - close(confClient.negativeConf) - } - } for _, epochClient := range n.blockEpochClients { close(epochClient.epochChan) } + n.txConfNotifier.TearDown() return nil } @@ -200,6 +195,10 @@ type filteredBlock struct { hash chainhash.Hash height uint32 txns []*btcutil.Tx + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool } // onFilteredBlockConnected is a callback which is executed each a new block is @@ -209,10 +208,11 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. - n.newBlocks.ChanIn() <- &filteredBlock{ - hash: header.BlockHash(), - height: uint32(height), - txns: txns, + n.chainUpdates.ChanIn() <- &filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + txns: txns, + connect: true, } } @@ -223,9 +223,10 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. - n.staleBlocks.ChanIn() <- &filteredBlock{ - hash: header.BlockHash(), - height: uint32(height), + n.chainUpdates.ChanIn() <- &filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + connect: false, } } @@ -283,121 +284,92 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.spendNotifications[op][msg.spendID] = msg case *confirmationsNotification: - chainntnfs.Log.Infof("New confirmations "+ - "subscription: txid=%v, numconfs=%v, "+ - "height_hint=%v", *msg.txid, - msg.numConfirmations, msg.heightHint) + chainntnfs.Log.Infof("New confirmations subscription: "+ + "txid=%v, numconfs=%v, height_hint=%v", + msg.TxID, msg.NumConfirmations, msg.heightHint) // If the notification can be partially or // fully dispatched, then we can skip the first // phase for ntfns. n.heightMtx.RLock() currentHeight := n.bestHeight - if n.attemptHistoricalDispatch(msg, currentHeight, msg.heightHint) { - n.heightMtx.RUnlock() - continue - } n.heightMtx.RUnlock() - // If we can't fully dispatch confirmation, - // then we'll update our filter so we can be - // notified of its future initial confirmation. - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddTxIDs(*msg.txid), - neutrino.Rewind(currentHeight), + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, + msg.heightHint) + if err != nil { + chainntnfs.Log.Error(err) } - if err := n.chainView.Update(rescanUpdate...); err != nil { - chainntnfs.Log.Errorf("unable to update rescan: %v", err) + + if txConf == nil { + // If we can't fully dispatch confirmation, + // then we'll update our filter so we can be + // notified of its future initial confirmation. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddTxIDs(*msg.TxID), + neutrino.Rewind(currentHeight), + } + if err := n.chainView.Update(rescanUpdate...); err != nil { + chainntnfs.Log.Errorf("unable to update rescan: %v", err) + } } - txid := *msg.txid - n.confNotifications[txid] = append(n.confNotifications[txid], msg) + err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) + } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") n.blockEpochClients[msg.epochID] = msg } - case item := <-n.newBlocks.ChanOut(): - newBlock := item.(*filteredBlock) - - n.heightMtx.Lock() - n.bestHeight = newBlock.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - newBlock.height, newBlock.hash) - - // First we'll notify any subscribed clients of the - // block. - n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - - // Next, we'll scan over the list of relevant - // transactions and possibly dispatch notifications for - // confirmations and spends. - for _, tx := range newBlock.txns { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - mtx := tx.MsgTx() - txIndex := tx.Index() - txSha := mtx.TxHash() - n.checkConfirmationTrigger(&txSha, newBlock, txIndex) - - for i, txIn := range mtx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does - // spend an output which we have a - // registered notification for, then - // create a spend summary, finally - // sending off the details to the - // notification subscriber. - if clients, ok := n.spendNotifications[prevOut]; ok { - // TODO(roasbeef): many - // integration tests expect - // spend to be notified within - // the mempool. - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &txSha, - SpendingTx: mtx, - SpenderInputIndex: uint32(i), - SpendingHeight: int32(newBlock.height), - } - - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching "+ - "spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) - ntfn.spendChan <- spendDetails + case item := <-n.chainUpdates.ChanOut(): + update := item.(*filteredBlock) + if update.connect { + n.heightMtx.Lock() + if update.height != n.bestHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + n.bestHeight, update.height) + n.heightMtx.Unlock() + continue + } - // Close spendChan to ensure that any calls to Cancel will not - // block. This is safe to do since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } + n.bestHeight = update.height + n.heightMtx.Unlock() - delete(n.spendNotifications, prevOut) - } + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + update.height, update.hash) + err := n.handleBlockConnected(update) + if err != nil { + chainntnfs.Log.Error(err) } + continue + } + n.heightMtx.Lock() + if update.height != n.bestHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + n.bestHeight, update.height) + n.heightMtx.Unlock() + continue } - // A new block has been connected to the main chain. - // Send out any N confirmation notifications which may - // have been triggered by this new block. - n.notifyConfs(int32(newBlock.height)) + n.bestHeight = update.height - 1 + n.heightMtx.Unlock() - case item := <-n.staleBlocks.ChanOut(): - staleBlock := item.(*filteredBlock) - chainntnfs.Log.Warnf("Block disconnected from main "+ - "chain: %v", staleBlock.hash) + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.height, update.hash) + + err := n.txConfNotifier.DisconnectTip(update.height) + if err != nil { + chainntnfs.Log.Error(err) + } case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -409,32 +381,20 @@ func (n *NeutrinoNotifier) notificationDispatcher() { } } -// attemptHistoricalDispatch attempts to consult the historical chain data to -// see if a transaction has already reached full confirmation status at the -// time a notification for it was registered. If it has, then we do an -// immediate dispatch. Otherwise, we'll add the partially confirmed transaction -// to the confirmation heap. -func (n *NeutrinoNotifier) attemptHistoricalDispatch(msg *confirmationsNotification, - currentHeight, heightHint uint32) bool { - - targetHash := msg.txid - - var confDetails *chainntnfs.TxConfirmation - - chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+ - "historical chain", msg.txid) +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, + currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) { // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. -chainScan: for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { // First, we'll fetch the block header for this height so we // can compute the current block hash. header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) if err != nil { - chainntnfs.Log.Errorf("unable to get header for "+ - "height=%v: %v", scanHeight, err) - return false + return nil, fmt.Errorf("unable to get header for height=%v: %v", + scanHeight, err) } blockHash := header.BlockHash() @@ -443,9 +403,8 @@ chainScan: regFilter, err := n.p2pNode.GetCFilter(blockHash, wire.GCSFilterRegular) if err != nil { - chainntnfs.Log.Errorf("unable to retrieve regular "+ - "filter for height=%v: %v", scanHeight, err) - return false + return nil, fmt.Errorf("unable to retrieve regular filter for "+ + "height=%v: %v", scanHeight, err) } // If the block has no transactions other than the coinbase @@ -460,8 +419,7 @@ chainScan: key := builder.DeriveKey(&blockHash) match, err := regFilter.Match(key, targetHash[:]) if err != nil { - chainntnfs.Log.Errorf("unable to query filter: %v", err) - return false + return nil, fmt.Errorf("unable to query filter: %v", err) } // If there's no match, then we can continue forward to the @@ -475,54 +433,79 @@ chainScan: // to send the proper response. block, err := n.p2pNode.GetBlockFromNetwork(blockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block from "+ - "network: %v", err) - return false + return nil, fmt.Errorf("unable to get block from network: %v", err) } for j, tx := range block.Transactions() { txHash := tx.Hash() if txHash.IsEqual(targetHash) { - confDetails = &chainntnfs.TxConfirmation{ + confDetails := chainntnfs.TxConfirmation{ BlockHash: &blockHash, BlockHeight: scanHeight, TxIndex: uint32(j), } - break chainScan + return &confDetails, nil } } } - // If it hasn't yet been confirmed, then we can exit early. - if confDetails == nil { - return false - } + return nil, nil +} - // Otherwise, we'll calculate the number of confirmations that the - // transaction has so we can decide if it has reached the desired - // number of confirmations or not. - txConfs := currentHeight - confDetails.BlockHeight + 1 - - // If the transaction has more that enough confirmations, then we can - // dispatch it immediately after obtaining for information w.r.t - // exactly *when* if got all its confirmations. - if uint32(txConfs) >= msg.numConfirmations { - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", msg.numConfirmations, currentHeight) - msg.finConf <- confDetails - return true - } +// handleBlocksConnected applies a chain update for a new block. Any watched +// transactions included this block will processed to either send notifications +// now or after numConfirmations confs. +func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { + // First we'll notify any subscribed clients of the block. + n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Next, we'll scan over the list of relevant transactions and possibly + // dispatch notifications for confirmations and spends. + for _, tx := range newBlock.txns { + mtx := tx.MsgTx() + txSha := mtx.TxHash() + + for i, txIn := range mtx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an output which we have a + // registered notification for, then create a spend summary, finally + // sending off the details to the notification subscriber. + clients, ok := n.spendNotifications[prevOut] + if !ok { + continue + } + + // TODO(roasbeef): many integration tests expect spend to be + // notified within the mempool. + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &txSha, + SpendingTx: mtx, + SpenderInputIndex: uint32(i), + SpendingHeight: int32(newBlock.height), + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails - // Otherwise, the transaction has only been *partially* confirmed, so - // we need to insert it into the confirmation heap. - confHeight := confDetails.BlockHeight + msg.numConfirmations - 1 - heapEntry := &confEntry{ - msg, - confDetails, - confHeight, + // Close spendChan to ensure that any calls to Cancel will not + // block. This is safe to do since the channel is buffered, and + // the message can still be read by the receiver. + close(ntfn.spendChan) + } + + delete(n.spendNotifications, prevOut) + } } - heap.Push(n.confHeap, heapEntry) - return true + // A new block has been connected to the main chain. + // Send out any N confirmation notifications which may + // have been triggered by this new block. + n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns) + + return nil } // notifyBlockEpochs notifies all registered block epoch clients of the newly @@ -555,91 +538,6 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. } } -// notifyConfs examines the current confirmation heap, sending off any -// notifications which have been triggered by the connection of a new block at -// newBlockHeight. -func (n *NeutrinoNotifier) notifyConfs(newBlockHeight int32) { - // If the heap is empty, we have nothing to do. - if n.confHeap.Len() == 0 { - return - } - - // Traverse our confirmation heap. The heap is a min-heap, so the - // confirmation notification which requires the smallest block-height - // will always be at the top of the heap. If a confirmation - // notification is eligible for triggering, then fire it off, and check - // if another is eligible until there are no more eligible entries. - nextConf := heap.Pop(n.confHeap).(*confEntry) - for nextConf.triggerHeight <= uint32(newBlockHeight) { - - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", nextConf.numConfirmations, newBlockHeight) - - nextConf.finConf <- nextConf.initialConfDetails - - if n.confHeap.Len() == 0 { - return - } - - nextConf = heap.Pop(n.confHeap).(*confEntry) - } - - heap.Push(n.confHeap, nextConf) -} - -// checkConfirmationTrigger determines if the passed txSha included at -// blockHeight triggers any single confirmation notifications. In the event -// that the txid matches, yet needs additional confirmations, it is added to -// the confirmation heap to be triggered at a later time. -func (n *NeutrinoNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, - newTip *filteredBlock, txIndex int) { - - // If a confirmation notification has been registered for this txid, - // then either trigger a notification event if only a single - // confirmation notification was requested, or place the notification - // on the confirmation heap for future usage. - if confClients, ok := n.confNotifications[*txSha]; ok { - // Either all of the registered confirmations will be - // dispatched due to a single confirmation, or added to the - // conf head. Therefor we unconditionally delete the registered - // confirmations from the staging zone. - defer func() { - delete(n.confNotifications, *txSha) - }() - - for _, confClient := range confClients { - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: &newTip.hash, - BlockHeight: uint32(newTip.height), - TxIndex: uint32(txIndex), - } - - if confClient.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - newTip.height) - confClient.finConf <- confDetails - continue - } - - // The registered notification requires more than one - // confirmation before triggering. So we create a - // heapConf entry for this notification. The heapConf - // allows us to easily keep track of which - // notification(s) we should fire off with each - // incoming block. - confClient.initialConfirmHeight = uint32(newTip.height) - finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 - heapEntry := &confEntry{ - confClient, - confDetails, - finalConfHeight, - } - heap.Push(n.confHeap, heapEntry) - } - } -} - // spendNotification couples a target outpoint along with the channel used for // notifications once a spend of the outpoint has been detected. type spendNotification struct { @@ -782,15 +680,8 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { - txid *chainhash.Hash - + chainntnfs.ConfNtfn heightHint uint32 - - initialConfirmHeight uint32 - numConfirmations uint32 - - finConf chan *chainntnfs.TxConfirmation - negativeConf chan int32 // TODO(roasbeef): re-org funny business } // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier @@ -800,21 +691,19 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ - txid: txid, - heightHint: heightHint, - numConfirmations: numConfs, - finConf: make(chan *chainntnfs.TxConfirmation, 1), - negativeConf: make(chan int32, 1), + ConfNtfn: chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, + heightHint: heightHint, } select { case <-n.quit: return nil, ErrChainNotifierShuttingDown case n.notificationRegistry <- ntfn: - return &chainntnfs.ConfirmationEvent{ - Confirmed: ntfn.finConf, - NegativeConf: ntfn.negativeConf, - }, nil + return ntfn.Event, nil } } diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go new file mode 100644 index 00000000000..414b6b06082 --- /dev/null +++ b/chainntnfs/txconfnotifier.go @@ -0,0 +1,296 @@ +package chainntnfs + +import ( + "fmt" + + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcutil" +) + +// ConfNtfn represents a notifier client's request to receive a notification +// once the target transaction gets sufficient confirmations. The client is +// asynchronously notified via the ConfirmationEvent channels. +type ConfNtfn struct { + // TxID is the hash of the transaction for which confirmatino notifications + // are requested. + TxID *chainhash.Hash + + // NumConfirmations is the number of confirmations after which the + // notification is to be sent. + NumConfirmations uint32 + + // Event contains references to the channels that the notifications are to + // be sent over. + Event *ConfirmationEvent + + // details describes the transaction's position is the blockchain. May be + // nil for unconfirmed transactions. + details *TxConfirmation + + // dispatched is false if the confirmed notification has not been sent yet. + dispatched bool +} + +// NewConfirmationEvent constructs a new ConfirmationEvent with newly opened +// channels. +func NewConfirmationEvent() *ConfirmationEvent { + return &ConfirmationEvent{ + Confirmed: make(chan *TxConfirmation, 1), + NegativeConf: make(chan int32, 1), + } +} + +// TxConfNotifier is used to register transaction confirmation notifications and +// dispatch them as the transactions confirm. A client can request to be +// notified when a particular transaction has sufficient on-chain confirmations +// (or be notified immediately if the tx already does), and the TxConfNotifier +// will watch changes to the blockchain in order to satisfy these requests. +type TxConfNotifier struct { + // currentHeight is the height of the tracked blockchain. It is used to + // determine the number of confirmations a tx has and ensure blocks are + // connected and disconnected in order. + currentHeight uint32 + + // reorgSafetyLimit is the chain depth beyond which it is assumed a block + // will not be reorganized out of the chain. This is used to determine when + // to prune old confirmation requests so that reorgs are handled correctly. + // The coinbase maturity period is a reasonable value to use. + reorgSafetyLimit uint32 + + // reorgDepth is the depth of a chain organization that this system is being + // informed of. This is incremented as long as a sequence of blocks are + // disconnected without being interrupted by a new block. + reorgDepth uint32 + + // confNotifications is an index of notification requests by transaction + // hash. + confNotifications map[chainhash.Hash][]*ConfNtfn + + // confTxsByInitialHeight is an index of watched transactions by the height + // that they are included at in the blockchain. This is tracked so that + // incorrect notifications are not sent if a transaction is reorganized out + // of the chain and so that negative confirmations can be recognized. + confTxsByInitialHeight map[uint32][]*chainhash.Hash + + // ntfnsByConfirmHeight is an index of notification requests by the height + // at which the transaction will have sufficient confirmations. + ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} + + // quit is closed in order to signal that the notifier is gracefully + // exiting. + quit chan struct{} +} + +// NewTxConfNotifier creates a TxConfNotifier. The current height of the +// blockchain is accepted as a parameter. +func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { + return &TxConfNotifier{ + currentHeight: startHeight, + reorgSafetyLimit: reorgSafetyLimit, + confNotifications: make(map[chainhash.Hash][]*ConfNtfn), + confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), + ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + quit: make(chan struct{}), + } +} + +// Register handles a new notification request. The client will be notified when +// the transaction gets a sufficient number of confirmations on the blockchain. +// If the transaction has already been included in a block on the chain, the +// confirmation details must be given as the txConf argument, otherwise it +// should be nil. If the transaction already has the sufficient number of +// confirmations, this dispatches the notification immediately. +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + + if txConf == nil || txConf.BlockHeight > tcn.currentHeight { + // Transaction is unconfirmed. + tcn.confNotifications[*ntfn.TxID] = + append(tcn.confNotifications[*ntfn.TxID], ntfn) + return nil + } + + // If the transaction already has the required confirmations, dispatch + // notification immediately, otherwise record along with the height at + // which to notify. + confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 + if confHeight <= tcn.currentHeight { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + case ntfn.Event.Confirmed <- txConf: + ntfn.dispatched = true + } + } else { + ntfn.details = txConf + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet + } + ntfnSet[ntfn] = struct{}{} + } + + // Unless the transaction is finalized, include transaction information in + // confNotifications and confTxsByInitialHeight in case the tx gets + // reorganized out of the chain. + if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { + tcn.confNotifications[*ntfn.TxID] = + append(tcn.confNotifications[*ntfn.TxID], ntfn) + tcn.confTxsByInitialHeight[txConf.BlockHeight] = + append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) + } + + return nil +} + +// ConnectTip handles a new block extending the current chain. This checks each +// transaction in the block to see if any watched transactions are included. +// Also, if any watched transactions now have the required number of +// confirmations as a result of this block being connected, this dispatches +// notifications. +func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, + blockHeight uint32, txns []*btcutil.Tx) error { + + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + + if blockHeight != tcn.currentHeight+1 { + return fmt.Errorf("Received blocks out of order: "+ + "current height=%d, new height=%d", + tcn.currentHeight, blockHeight) + } + tcn.currentHeight++ + tcn.reorgDepth = 0 + + // Record any newly confirmed transactions in ntfnsByConfirmHeight so that + // notifications get dispatched when the tx gets sufficient confirmations. + // Also record txs in confTxsByInitialHeight so reorgs can be handled + // correctly. + for _, tx := range txns { + txHash := tx.Hash() + for _, ntfn := range tcn.confNotifications[*txHash] { + ntfn.details = &TxConfirmation{ + BlockHash: blockHash, + BlockHeight: blockHeight, + TxIndex: uint32(tx.Index()), + } + + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet + } + ntfnSet[ntfn] = struct{}{} + + tcn.confTxsByInitialHeight[blockHeight] = + append(tcn.confTxsByInitialHeight[blockHeight], tx.Hash()) + } + } + + // Dispatch notifications for all transactions that are considered confirmed + // at this new block height. + for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) + select { + case ntfn.Event.Confirmed <- ntfn.details: + ntfn.dispatched = true + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + } + } + delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) + + // Clear entries from confNotifications and confTxsByInitialHeight. We + // assume that reorgs deeper than the reorg safety limit do not happen, so + // we can clear out entries for the block that is now mature. + if tcn.currentHeight >= tcn.reorgSafetyLimit { + matureBlockHeight := tcn.currentHeight - tcn.reorgSafetyLimit + for _, txHash := range tcn.confTxsByInitialHeight[matureBlockHeight] { + delete(tcn.confNotifications, *txHash) + } + delete(tcn.confTxsByInitialHeight, matureBlockHeight) + } + + return nil +} + +// DisconnectTip handles the tip of the current chain being disconnected during +// a chain reorganization. If any watched transactions were included in this +// block, internal structures are updated to ensure a confirmation notification +// is not sent unless the transaction is included in the new chain. +func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + + if blockHeight != tcn.currentHeight { + return fmt.Errorf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + tcn.currentHeight, blockHeight) + } + tcn.currentHeight-- + tcn.reorgDepth++ + + for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] { + for _, ntfn := range tcn.confNotifications[*txHash] { + // If notification has been dispatched with sufficient + // confirmations, notify of the reversal. + if ntfn.dispatched { + select { + case <-ntfn.Event.Confirmed: + // Drain confirmation notification instead of sending + // negative conf if the receiver has not processed it yet. + // This ensures sends to the Confirmed channel are always + // non-blocking. + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + } + ntfn.dispatched = false + continue + } + + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + continue + } + delete(ntfnSet, ntfn) + } + } + delete(tcn.confTxsByInitialHeight, blockHeight) + + return nil +} + +// TearDown is to be called when the owner of the TxConfNotifier is exiting. +// This closes the event channels of all registered notifications that have +// not been dispatched yet. +func (tcn *TxConfNotifier) TearDown() { + close(tcn.quit) + + for _, ntfns := range tcn.confNotifications { + for _, ntfn := range ntfns { + if ntfn.dispatched { + continue + } + + close(ntfn.Event.Confirmed) + close(ntfn.Event.NegativeConf) + } + } +} diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go new file mode 100644 index 00000000000..d60b5227058 --- /dev/null +++ b/chainntnfs/txconfnotifier_test.go @@ -0,0 +1,432 @@ +package chainntnfs_test + +import ( + "testing" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +var zeroHash chainhash.Hash + +// TestTxConfFutureDispatch tests that the TxConfNotifier dispatches +// registered notifications when the transaction confirms after registration. +func TestTxConfFutureDispatch(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + tx3 = wire.MsgTx{Version: 3} + ) + + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn1, nil) + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, nil) + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, + }) + + err := txConfNotifier.ConnectTip(block1.Hash(), 11, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block1.Hash(), + BlockHeight: 11, + TxIndex: 0, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx3}, + }) + + err = txConfNotifier.ConnectTip(block2.Hash(), 12, block2.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block1.Hash(), + BlockHeight: 11, + TxIndex: 1, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx2") + } +} + +// TestTxConfHistoricalDispatch tests that the TxConfNotifier dispatches +// registered notifications when the transaction is confirmed before +// registration. +func TestTxConfHistoricalDispatch(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + tx3 = wire.MsgTx{Version: 3} + ) + + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConf1 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 1, + } + txConfNotifier.Register(&ntfn1, &txConf1) + + tx2Hash := tx2.TxHash() + txConf2 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 2, + } + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 3, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, &txConf2) + + select { + case txConf := <-ntfn1.Event.Confirmed: + assertEqualTxConf(t, txConf, &txConf1) + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + block := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx3}, + }) + + err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + assertEqualTxConf(t, txConf, &txConf2) + default: + t.Fatalf("Expected confirmation for tx2") + } +} + +// TestTxConfChainReorg tests that TxConfNotifier dispatches Confirmed and +// NegativeConf notifications appropriately when there is a chain +// reorganization. +func TestTxConfChainReorg(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(8, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + tx3 = wire.MsgTx{Version: 3} + ) + + // Tx 1 will be confirmed in block 9 and requires 2 confs. + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn1, nil) + + // Tx 2 will be confirmed in block 10 and requires 1 conf. + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, nil) + + // Tx 3 will be confirmed in block 10 and requires 2 confs. + tx3Hash := tx3.TxHash() + ntfn3 := chainntnfs.ConfNtfn{ + TxID: &tx3Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn3, nil) + + // Sync chain to block 10. Txs 1 & 2 should be confirmed. + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + err := txConfNotifier.ConnectTip(nil, 9, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2, &tx3}, + }) + err = txConfNotifier.ConnectTip(nil, 10, block2.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case <-ntfn1.Event.Confirmed: + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case <-ntfn2.Event.Confirmed: + default: + t.Fatalf("Expected confirmation for tx2") + } + + select { + case txConf := <-ntfn3.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) + default: + } + + // Block that tx2 and tx3 were included in is disconnected and two next + // blocks without them are connected. + err = txConfNotifier.DisconnectTip(10) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + err = txConfNotifier.ConnectTip(nil, 10, nil) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + err = txConfNotifier.ConnectTip(nil, 11, nil) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case reorgDepth := <-ntfn2.Event.NegativeConf: + if reorgDepth != 1 { + t.Fatalf("Incorrect value for negative conf notification: "+ + "expected %d, got %d", 1, reorgDepth) + } + default: + t.Fatalf("Expected negative conf notification for tx1") + } + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + select { + case txConf := <-ntfn3.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) + default: + } + + // Now transactions 2 & 3 are re-included in a new block. + block3 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2, &tx3}, + }) + block4 := btcutil.NewBlock(&wire.MsgBlock{}) + + err = txConfNotifier.ConnectTip(block3.Hash(), 12, block3.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + err = txConfNotifier.ConnectTip(block4.Hash(), 13, block4.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // Both transactions should be newly confirmed. + select { + case txConf := <-ntfn2.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block3.Hash(), + BlockHeight: 12, + TxIndex: 0, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx2") + } + + select { + case txConf := <-ntfn3.Event.Confirmed: + expectedConf := chainntnfs.TxConfirmation{ + BlockHash: block3.Hash(), + BlockHeight: 12, + TxIndex: 1, + } + assertEqualTxConf(t, txConf, &expectedConf) + default: + t.Fatalf("Expected confirmation for tx3") + } +} + +func TestTxConfTearDown(t *testing.T) { + t.Parallel() + + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + var ( + tx1 = wire.MsgTx{Version: 1} + tx2 = wire.MsgTx{Version: 2} + ) + + tx1Hash := tx1.TxHash() + ntfn1 := chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn1, nil) + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(), + } + txConfNotifier.Register(&ntfn2, nil) + + block := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1, &tx2}, + }) + + err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + select { + case <-ntfn1.Event.Confirmed: + default: + t.Fatalf("Expected confirmation for tx1") + } + + select { + case txConf := <-ntfn2.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) + default: + } + + // Confirmed channels should be closed for notifications that have not been + // dispatched yet. + txConfNotifier.TearDown() + + select { + case txConf := <-ntfn1.Event.Confirmed: + t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) + default: + } + + select { + case _, more := <-ntfn2.Event.Confirmed: + if more { + t.Fatalf("Expected channel close for tx2") + } + default: + t.Fatalf("Expected channel close for tx2") + } +} + +func assertEqualTxConf(t *testing.T, + actualConf, expectedConf *chainntnfs.TxConfirmation) { + + if actualConf.BlockHeight != expectedConf.BlockHeight { + t.Fatalf("Incorrect block height in confirmation details: "+ + "expected %d, got %d", + expectedConf.BlockHeight, actualConf.BlockHeight) + } + if !actualConf.BlockHash.IsEqual(expectedConf.BlockHash) { + t.Fatalf("Incorrect block hash in confirmation details: "+ + "expected %d, got %d", expectedConf.BlockHash, actualConf.BlockHash) + } + if actualConf.TxIndex != expectedConf.TxIndex { + t.Fatalf("Incorrect tx index in confirmation details: "+ + "expected %d, got %d", expectedConf.TxIndex, actualConf.TxIndex) + } +}