Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
Txpool 4844 upgrades Part 2 (#1125)
Browse files Browse the repository at this point in the history
Fix some peer-review comments from the last related PR, and add some
enhancements

#### Change summary
- Addition of a flag for BlobSlots - for max allowed blobs per account
in txpool
- Use BlobFee from the block to validate txs in the pool
- Let go of unwound blob txn's onNewBlock, if not present in the cache;
we don't request them again from the network

Related: ledgerwatch/erigon#8213
ledgerwatch/interfaces#195
  • Loading branch information
somnathb1 committed Sep 20, 2023
1 parent 2fca6e8 commit 93d9c9d
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 272 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/erigontech/mdbx-go v0.27.14
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43 h1:AXQ1vPkmuBPtVRpAehMAXzmsRmdqUpNvl93wWE6gjCU=
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af h1:gGWTa4p8npycnK9gVBbZxMSOBvUgM80lsDU9rnFqyHU=
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
397 changes: 204 additions & 193 deletions gointerfaces/remote/kv.pb.go

Large diffs are not rendered by default.

43 changes: 15 additions & 28 deletions txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,19 +465,25 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
if change.Direction == remote.Direction_UNWIND {
unwindTxs.Resize(uint(len(change.Txs)))
for i := range change.Txs {
unwindTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
if unwindTxs.Txs[i].Type == types2.BlobTxType {
knownBlobTxn, err := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:])
if err != nil {
return err
utx := &types2.TxSlot{}
sender := make([]byte, 20)
_, err2 := parseContext.ParseTransaction(change.Txs[i], 0, utx, sender, false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
if err2 != nil {
return err2
}
if utx.Type == types2.BlobTxType {
knownBlobTxn, err2 := f.pool.GetKnownBlobTxn(tx, utx.IDHash[:])
if err2 != nil {
return err2
}
// Get the blob tx from cache; ignore altogether if it isn't there
if knownBlobTxn != nil {
unwindTxs.Txs[i] = knownBlobTxn.Tx
unwindTxs.Append(knownBlobTxn.Tx, sender, false)
}
} else {
unwindTxs.Append(utx, sender, false)
}
return err
}); err != nil && !errors.Is(err, context.Canceled) {
Expand All @@ -487,8 +493,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}
// TODO(eip-4844): If there are blob txs that need to be unwound, these will not replay properly since we only have the
// unwrapped version here (we would need to re-wrap the tx with its blobs & kzg commitments).

if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
}); err != nil && !errors.Is(err, context.Canceled) {
Expand All @@ -499,21 +504,3 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}

// func (f *Fetch) requestUnknownTxs(unknownHashes types2.Hashes, sentryClient sentry.SentryClient, PeerId *types.H512) error{
// if len(unknownHashes) > 0 {
// var encodedRequest []byte
// var err error
// var messageID sentry.MessageId
// if encodedRequest, err = types2.EncodeGetPooledTransactions66(unknownHashes, uint64(1), nil); err != nil {
// return err
// }
// messageID = sentry.MessageId_GET_POOLED_TRANSACTIONS_66
// if _, err := sentryClient.SendMessageById(f.ctx, &sentry.SendMessageByIdRequest{
// Data: &sentry.OutboundMessageData{Id: messageID, Data: encodedRequest},
// PeerId: PeerId,
// }, &grpc.EmptyCallOption{}); err != nil {
// return err
// }
// }
// }
83 changes: 57 additions & 26 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type TxPool struct {
lastFinalizedBlock atomic.Uint64
started atomic.Bool
pendingBaseFee atomic.Uint64
pendingBlobFee atomic.Uint64 // For gas accounting for blobs, which has its own dimension
blockGasLimit atomic.Uint64
shanghaiTime *uint64
isPostShanghai atomic.Bool
Expand Down Expand Up @@ -340,6 +341,9 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.queued.worst.pendingBaseFee = pendingBaseFee
}

pendingBlobFee := stateChanges.PendingBlobFeePerGas
p.setBlobFee(pendingBlobFee)

p.blockGasLimit.Store(stateChanges.BlockGasLimit)
if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
return err
Expand All @@ -362,11 +366,10 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
}
}

if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
return err
}

if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
return err
}

Expand All @@ -381,7 +384,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, p.discardLocked, &announcements, p.logger)
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, pendingBlobFee, p.discardLocked, &announcements, p.logger)
p.pending.EnforceBestInvariants()
p.promoted.Reset()
p.promoted.AppendOther(announcements)
Expand Down Expand Up @@ -438,7 +441,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
}

announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -581,7 +584,11 @@ func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
if !has {
return nil, nil
}
txn, _ := tx.GetOne(kv.PoolTransaction, hash)

txn, err := tx.GetOne(kv.PoolTransaction, hash)
if err != nil {
return nil, err
}
parseCtx := types.NewTxParseContext(p.chainID)
parseCtx.WithSender(false)
txSlot := &types.TxSlot{}
Expand Down Expand Up @@ -1039,7 +1046,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}

announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
if err == nil {
for i, reason := range addReasons {
if reason != txpoolcfg.NotSet {
Expand Down Expand Up @@ -1076,7 +1083,7 @@ func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
return p._chainDB, p._stateCache
}
func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64,
newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), collect bool,
logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) {
Expand Down Expand Up @@ -1130,7 +1137,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard, logger)
}

promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements, logger)
promote(pending, baseFee, queued, pendingBaseFee, pendingBlobFee, discard, &announcements, logger)
pending.EnforceBestInvariants()

return announcements, discardReasons, nil
Expand Down Expand Up @@ -1211,6 +1218,12 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, bool) {
return p.pendingBaseFee.Load(), changed
}

func (p *TxPool) setBlobFee(blobFee uint64) {
if blobFee > 0 {
p.pendingBaseFee.Store(blobFee)
}
}

func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoolcfg.DiscardReason {
// Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip
found := p.all.get(mt.Tx.SenderID, mt.Tx.Nonce)
Expand All @@ -1220,7 +1233,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
}
priceBump := p.cfg.PriceBump

//Blob txn threshold checks
//Blob txn threshold checks for replace txn
if mt.Tx.Type == types.BlobTxType {
priceBump = p.cfg.BlobPriceBump
blobFeeThreshold, overflow := (&uint256.Int{}).MulDivOverflow(
Expand Down Expand Up @@ -1269,13 +1282,12 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
p.discardLocked(found, txpoolcfg.ReplacedByHigherTip)
}

// Remove from mined cache in case this is coming from unwind txs
// and to ensure not double adding into the memory
hashStr := string(mt.Tx.IDHash[:])
if _, ok := p.minedBlobTxsByHash[hashStr]; ok {
p.deleteMinedBlobTxn(hashStr)
// Don't add blob tx to queued if it's less than current pending blob base fee
if mt.Tx.Type == types.BlobTxType && mt.Tx.BlobFeeCap.LtUint64(p.pendingBlobFee.Load()) {
return txpoolcfg.FeeTooLow
}

hashStr := string(mt.Tx.IDHash[:])
p.byHash[hashStr] = mt

if replaced := p.all.replaceOrInsert(mt); replaced != nil {
Expand All @@ -1289,6 +1301,8 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
}
// All transactions are first added to the queued pool and then immediately promoted from there if required
p.queued.Add(mt, p.logger)
// Remove from mined cache as we are now "resurrecting" it to a sub-pool
p.deleteMinedBlobTxn(hashStr)
return txpoolcfg.NotSet
}

Expand All @@ -1304,6 +1318,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) {

// Cache recently mined blobs in anticipation of reorg, delete finalized ones
func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSlot, finalizedBlock uint64) error {
p.lastFinalizedBlock.Store(finalizedBlock)
// Remove blobs in the finalized block and older, loop through all entries
for l := len(p.minedBlobTxsByBlock); l > 0 && finalizedBlock > 0; l-- {
// delete individual hashes
Expand All @@ -1327,8 +1342,6 @@ func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSl
p.minedBlobTxsByHash[string(txn.IDHash[:])] = mt
}
}

p.lastFinalizedBlock.Store(finalizedBlock)
return nil
}

Expand Down Expand Up @@ -1530,10 +1543,10 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint

// promote reasserts invariants of the subpool and returns the list of transactions that ended up
// being promoted to the pending or basefee pool, for re-broadcasting
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, pendingBlobFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
logger log.Logger) {
// Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0); worst = pending.Worst() {
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = pending.Worst() {
if worst.subPool >= BaseFeePoolBits {
tx := pending.PopWorst()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
Expand All @@ -1546,7 +1559,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}

// Promote best transactions from base fee pool to pending pool while they qualify
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() {
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = baseFee.Best() {
tx := baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx, logger)
Expand Down Expand Up @@ -1846,6 +1859,10 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
return err
}
binary.BigEndian.PutUint64(encID, p.pendingBlobFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBlobFeeKey, encID); err != nil {
return err
}
if err := PutLastSeenBlock(tx, p.lastSeenBlock.Load(), encID); err != nil {
return err
}
Expand Down Expand Up @@ -1938,16 +1955,27 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
pendingBaseFee = binary.BigEndian.Uint64(v)
}
}
var pendingBlobFee uint64 = 1 // MIN_BLOB_GAS_PRICE A/EIP-4844
{
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
if err != nil {
return err
}
if len(v) > 0 {
pendingBlobFee = binary.BigEndian.Uint64(v)
}
}

err = p.senders.registerNewSenders(&txs, p.logger)
if err != nil {
return err
}
if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)

p.pendingBlobFee.Store(pendingBlobFee)
return nil
}
func LastSeenBlock(tx kv.Getter) (uint64, error) {
Expand Down Expand Up @@ -2068,6 +2096,7 @@ func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender
var PoolChainConfigKey = []byte("chain_config")
var PoolLastSeenBlockKey = []byte("last_seen_block")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")
var PoolPendingBlobFeeKey = []byte("pending_blob_fee")

// recentlyConnectedPeers does buffer IDs of recently connected good peers
// then sync of pooled Transaction can happen to all of then at once
Expand Down Expand Up @@ -2461,10 +2490,12 @@ type BestQueue struct {
pendingBastFee uint64
}

// Returns true if the txn is better than the parameter txn
// it first compares the subpool markers of the two meta txns, then it compares
// depending on the pool (p, b, q) it compares the effective tip (p), nonceDistance (p,q)
// minFeeCap (b), and cumulative balance distance (p, q) for pending pool
// Returns true if the txn "mt" is better than the parameter txn "than"
// it first compares the subpool markers of the two meta txns, then,
// (since they have the same subpool marker, and thus same pool)
// depending on the pool - pending (P), basefee (B), queued (Q) -
// it compares the effective tip (for P), nonceDistance (for both P,Q)
// minFeeCap (for B), and cumulative balance distance (for P, Q)
func (mt *metaTx) better(than *metaTx, pendingBaseFee uint256.Int) bool {
subPool := mt.subPool
thanSubPool := than.subPool
Expand Down
Loading

0 comments on commit 93d9c9d

Please sign in to comment.