Skip to content

Commit

Permalink
mempool, main: cache tx proofs for CSNs
Browse files Browse the repository at this point in the history
For utreexo CSN to CSN proof serving, each node needs to keep track of
the proof for the given mempool tx. The change made here introduces a
pool of leaf datas that are stored in the mempool and when adding
transactions to the pool, the accumulator proof for that is cached in
the accumulator as well.

This code is mostly for the p2p to eventually support receiving and
serving partial proofs for utreexo peers.
  • Loading branch information
kcalvinalvin committed Nov 8, 2023
1 parent 6bf7025 commit 5604fb9
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 12 deletions.
62 changes: 59 additions & 3 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Config struct {
// activated.
VerifyUData func(ud *wire.UData, txIns []*wire.TxIn, remember bool) error

// PruneFromAccumulator uncaches the given hashes from the accumulator.
PruneFromAccumulator func(hashes []wire.LeafData) error

// SigCache defines a signature cache to use.
SigCache *txscript.SigCache

Expand Down Expand Up @@ -189,6 +192,7 @@ type TxPool struct {
mtx sync.RWMutex
cfg Config
pool map[chainhash.Hash]*TxDesc
poolLeaves map[chainhash.Hash][]wire.LeafData
orphans map[chainhash.Hash]*orphanTx
orphansByPrev map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx
outpoints map[wire.OutPoint]*btcutil.Tx
Expand Down Expand Up @@ -496,6 +500,25 @@ func (mp *TxPool) removeTransaction(tx *btcutil.Tx, removeRedeemers bool) {
mp.cfg.AddrIndex.RemoveUnconfirmedTx(txHash)
}

// If the utreexo view is active, then remove the cached hashes from the
// accumulator.
if mp.cfg.IsUtreexoViewActive != nil && mp.cfg.IsUtreexoViewActive() {
leaves, found := mp.poolLeaves[*txHash]
if !found {
log.Infof("missing the leaf hashes for tx %s from while "+
"removing it from the pool",
tx.MsgTx().TxHash().String())
} else {
delete(mp.poolLeaves, *txHash)

err := mp.cfg.PruneFromAccumulator(leaves)
if err != nil {
log.Infof("err while pruning proof for inputs of tx %s: ",
err, tx.MsgTx().TxHash().String())
}
}
}

// Mark the referenced outpoints as unspent by the pool.
for _, txIn := range txDesc.Tx.MsgTx().TxIn {
delete(mp.outpoints, txIn.PreviousOutPoint)
Expand Down Expand Up @@ -543,7 +566,21 @@ func (mp *TxPool) RemoveDoubleSpends(tx *btcutil.Tx) {
// helper for maybeAcceptTransaction.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil.Tx, height int32, fee int64) *TxDesc {
func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil.Tx, height int32, fee int64) (*TxDesc, error) {
if mp.cfg.IsUtreexoViewActive != nil && mp.cfg.IsUtreexoViewActive() {
// Ingest the proof. Shouldn't error out with the proof being invalid
// here since we've already verified it above.
err := mp.cfg.VerifyUData(tx.MsgTx().UData, tx.MsgTx().TxIn, true)
if err != nil {
return nil, fmt.Errorf("error while ingesting proof. %v", err)
}

mp.poolLeaves[*tx.Hash()] = tx.MsgTx().UData.LeafDatas
}

// Nil out uneeded udata for the mempool.
tx.MsgTx().UData = nil

// Add the transaction to the pool and mark the referenced outpoints
// as spent by the pool.
txD := &TxDesc{
Expand Down Expand Up @@ -574,7 +611,7 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil
mp.cfg.FeeEstimator.ObserveTransaction(txD)
}

return txD
return txD, nil
}

// checkPoolDoubleSpend checks whether or not the passed transaction is
Expand Down Expand Up @@ -880,6 +917,21 @@ func (mp *TxPool) FetchTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error)
return nil, fmt.Errorf("transaction is not in the pool")
}

// FetchLeafDatas returns the leafdatas for the given tx. Returns an error if
// the leaves for the given tx is not in the pool.
func (mp *TxPool) FetchLeafDatas(txHash *chainhash.Hash) ([]wire.LeafData, error) {
// Protect concurrent access.
mp.mtx.RLock()
leaves, exists := mp.poolLeaves[*txHash]
mp.mtx.RUnlock()

if exists {
return leaves, nil
}

return nil, fmt.Errorf("leafdata for the transaction is not in the pool")
}

// validateReplacement determines whether a transaction is deemed as a valid
// replacement of all of its conflicts according to the RBF policy. If it is
// valid, no error is returned. Otherwise, an error is returned indicating what
Expand Down Expand Up @@ -1305,7 +1357,10 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, rejec
// this call as they'll be removed eventually.
mp.removeTransaction(conflict, false)
}
txD := mp.addTransaction(utxoView, tx, bestHeight, txFee)
txD, err := mp.addTransaction(utxoView, tx, bestHeight, txFee)
if err != nil {
return nil, txD, err
}

log.Debugf("Accepted transaction %v (pool size: %v)", txHash,
len(mp.pool))
Expand Down Expand Up @@ -1626,6 +1681,7 @@ func New(cfg *Config) *TxPool {
return &TxPool{
cfg: *cfg,
pool: make(map[chainhash.Hash]*TxDesc),
poolLeaves: make(map[chainhash.Hash][]wire.LeafData),
orphans: make(map[chainhash.Hash]*orphanTx),
orphansByPrev: make(map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx),
nextExpireScan: time.Now().Add(orphanExpireScanInterval),
Expand Down
46 changes: 37 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,8 +1529,35 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<-
return err
}

// We already checked that at least one is active. Pick one and
// generate the UData.
// For compact state nodes.
if cfg.Utreexo {
// Fetch the necessary leafdatas to create the utreexo data.
leafDatas, err := s.txMemPool.FetchLeafDatas(tx.Hash())
if err != nil {
chanLog.Errorf(err.Error())
if doneChan != nil {
doneChan <- struct{}{}
}
return err
}

btcdLog.Debugf("fetched %v for tx %s", leafDatas, tx.Hash())

// This creates the accumulator proof and also puts the leaf datas
// in the utreexo data.
ud, err := s.chain.GenerateUData(leafDatas)
if err != nil {
chanLog.Errorf(err.Error())
if doneChan != nil {
doneChan <- struct{}{}
}
return err
}

tx.MsgTx().UData = ud
}

// For bridge nodes.
if s.utreexoProofIndex != nil {
leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain)
if err != nil {
Expand Down Expand Up @@ -3160,13 +3187,14 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) {
return s.chain.CalcSequenceLock(tx, view, true)
},
IsDeploymentActive: s.chain.IsDeploymentActive,
IsUtreexoViewActive: s.chain.IsUtreexoViewActive,
VerifyUData: s.chain.VerifyUData,
SigCache: s.sigCache,
HashCache: s.hashCache,
AddrIndex: s.addrIndex,
FeeEstimator: s.feeEstimator,
IsDeploymentActive: s.chain.IsDeploymentActive,
IsUtreexoViewActive: s.chain.IsUtreexoViewActive,
VerifyUData: s.chain.VerifyUData,
PruneFromAccumulator: s.chain.PruneFromAccumulator,
SigCache: s.sigCache,
HashCache: s.hashCache,
AddrIndex: s.addrIndex,
FeeEstimator: s.feeEstimator,
}
s.txMemPool = mempool.New(&txC)

Expand Down

0 comments on commit 5604fb9

Please sign in to comment.