Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Txpool 4844 upgrades Part 2 #1125

Merged
merged 31 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
101b29d
Fix build
somnathb1 Sep 10, 2023
109504c
Remove go.work.sum
somnathb1 Sep 10, 2023
95c1313
Remove redundant check
somnathb1 Sep 10, 2023
969e427
Add err check
somnathb1 Sep 12, 2023
54a5d23
Comment
somnathb1 Sep 12, 2023
2b62b55
Merge remote-tracking branch 'ledgerwatch/main' into txpool-4844-upgr…
somnathb1 Sep 12, 2023
0101e2c
ignore go.work - merge
somnathb1 Sep 12, 2023
73b24db
Add PendingBlobFeePerGas
somnathb1 Sep 12, 2023
a310392
Merge remote-tracking branch 'ledgerwatch/main' into txpool-4844-upgr…
somnathb1 Sep 13, 2023
60e80d1
Merge main
somnathb1 Sep 13, 2023
a4f8130
Increase blob slots
somnathb1 Sep 15, 2023
8c3610a
Discard unknown blob txs
somnathb1 Sep 15, 2023
dfac277
Order add before delete cache
somnathb1 Sep 15, 2023
9c91ded
Merge remote-tracking branch 'ledgerwatch/main' into txpool-4844-upgr…
somnathb1 Sep 15, 2023
38fc5b4
fmt
somnathb1 Sep 15, 2023
056b5a8
Fine tune
somnathb1 Sep 15, 2023
e5deeda
Chg blobslots
somnathb1 Sep 15, 2023
55ae513
Change blobslots
somnathb1 Sep 16, 2023
4b4492d
Comment
somnathb1 Sep 16, 2023
0b073c2
Fix test
somnathb1 Sep 16, 2023
fb3b618
Fix review comments
somnathb1 Sep 17, 2023
c01aa0d
fmt
somnathb1 Sep 17, 2023
485e52c
Fix parenthesis
somnathb1 Sep 17, 2023
778be3a
Merge remote-tracking branch 'ledgerwatch/main' into txpool-4844-upgr…
somnathb1 Sep 18, 2023
c71bfc3
Send back errors
somnathb1 Sep 18, 2023
a726ee9
Handle err and don't overflow
somnathb1 Sep 18, 2023
c54dc75
Forgot this guy
somnathb1 Sep 18, 2023
5bf4f1d
Tidy
somnathb1 Sep 18, 2023
87c22fb
Add more fee bump scenarios
somnathb1 Sep 18, 2023
ac899d3
Comment
somnathb1 Sep 18, 2023
e606c62
Add to unwindTxs if not type 3
somnathb1 Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/erigontech/mdbx-go v0.27.14
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43 h1:AXQ1vPkmuBPtVRpAehMAXzmsRmdqUpNvl93wWE6gjCU=
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af h1:gGWTa4p8npycnK9gVBbZxMSOBvUgM80lsDU9rnFqyHU=
somnathb1 marked this conversation as resolved.
Show resolved Hide resolved
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
397 changes: 204 additions & 193 deletions gointerfaces/remote/kv.pb.go

Large diffs are not rendered by default.

43 changes: 15 additions & 28 deletions txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,19 +465,25 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
if change.Direction == remote.Direction_UNWIND {
unwindTxs.Resize(uint(len(change.Txs)))
for i := range change.Txs {
unwindTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
if unwindTxs.Txs[i].Type == types2.BlobTxType {
knownBlobTxn, err := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:])
if err != nil {
return err
utx := &types2.TxSlot{}
sender := make([]byte, 20)
_, err2 := parseContext.ParseTransaction(change.Txs[i], 0, utx, sender, false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
if err2 != nil {
return err2
}
if utx.Type == types2.BlobTxType {
knownBlobTxn, err2 := f.pool.GetKnownBlobTxn(tx, utx.IDHash[:])
if err2 != nil {
return err2
}
// Get the blob tx from cache; ignore altogether if it isn't there
if knownBlobTxn != nil {
unwindTxs.Txs[i] = knownBlobTxn.Tx
unwindTxs.Append(knownBlobTxn.Tx, sender, false)
}
} else {
unwindTxs.Append(utx, sender, false)
}
somnathb1 marked this conversation as resolved.
Show resolved Hide resolved
return err
}); err != nil && !errors.Is(err, context.Canceled) {
Expand All @@ -487,8 +493,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}
// TODO(eip-4844): If there are blob txs that need to be unwound, these will not replay properly since we only have the
// unwrapped version here (we would need to re-wrap the tx with its blobs & kzg commitments).

if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
}); err != nil && !errors.Is(err, context.Canceled) {
Expand All @@ -499,21 +504,3 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}

// func (f *Fetch) requestUnknownTxs(unknownHashes types2.Hashes, sentryClient sentry.SentryClient, PeerId *types.H512) error{
// if len(unknownHashes) > 0 {
// var encodedRequest []byte
// var err error
// var messageID sentry.MessageId
// if encodedRequest, err = types2.EncodeGetPooledTransactions66(unknownHashes, uint64(1), nil); err != nil {
// return err
// }
// messageID = sentry.MessageId_GET_POOLED_TRANSACTIONS_66
// if _, err := sentryClient.SendMessageById(f.ctx, &sentry.SendMessageByIdRequest{
// Data: &sentry.OutboundMessageData{Id: messageID, Data: encodedRequest},
// PeerId: PeerId,
// }, &grpc.EmptyCallOption{}); err != nil {
// return err
// }
// }
// }
83 changes: 57 additions & 26 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type TxPool struct {
lastFinalizedBlock atomic.Uint64
started atomic.Bool
pendingBaseFee atomic.Uint64
pendingBlobFee atomic.Uint64 // For gas accounting for blobs, which has its own dimension
blockGasLimit atomic.Uint64
shanghaiTime *uint64
isPostShanghai atomic.Bool
Expand Down Expand Up @@ -340,6 +341,9 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.queued.worst.pendingBaseFee = pendingBaseFee
}

pendingBlobFee := stateChanges.PendingBlobFeePerGas
p.setBlobFee(pendingBlobFee)

p.blockGasLimit.Store(stateChanges.BlockGasLimit)
if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
return err
Expand All @@ -362,11 +366,10 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
}
}

if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
return err
}

if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
return err
}

Expand All @@ -381,7 +384,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, p.discardLocked, &announcements, p.logger)
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, pendingBlobFee, p.discardLocked, &announcements, p.logger)
p.pending.EnforceBestInvariants()
p.promoted.Reset()
p.promoted.AppendOther(announcements)
Expand Down Expand Up @@ -438,7 +441,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
}

announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -581,7 +584,11 @@ func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
if !has {
return nil, nil
}
txn, _ := tx.GetOne(kv.PoolTransaction, hash)

txn, err := tx.GetOne(kv.PoolTransaction, hash)
if err != nil {
return nil, err
}
parseCtx := types.NewTxParseContext(p.chainID)
parseCtx.WithSender(false)
txSlot := &types.TxSlot{}
Expand Down Expand Up @@ -1039,7 +1046,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}

announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
if err == nil {
for i, reason := range addReasons {
if reason != txpoolcfg.NotSet {
Expand Down Expand Up @@ -1076,7 +1083,7 @@ func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
return p._chainDB, p._stateCache
}
func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64,
newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), collect bool,
logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) {
Expand Down Expand Up @@ -1130,7 +1137,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard, logger)
}

promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements, logger)
promote(pending, baseFee, queued, pendingBaseFee, pendingBlobFee, discard, &announcements, logger)
pending.EnforceBestInvariants()

return announcements, discardReasons, nil
Expand Down Expand Up @@ -1211,6 +1218,12 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, bool) {
return p.pendingBaseFee.Load(), changed
}

func (p *TxPool) setBlobFee(blobFee uint64) {
if blobFee > 0 {
p.pendingBaseFee.Store(blobFee)
}
}

func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoolcfg.DiscardReason {
// Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip
found := p.all.get(mt.Tx.SenderID, mt.Tx.Nonce)
Expand All @@ -1220,7 +1233,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
}
priceBump := p.cfg.PriceBump

//Blob txn threshold checks
//Blob txn threshold checks for replace txn
if mt.Tx.Type == types.BlobTxType {
priceBump = p.cfg.BlobPriceBump
blobFeeThreshold, overflow := (&uint256.Int{}).MulDivOverflow(
Expand Down Expand Up @@ -1269,13 +1282,12 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
p.discardLocked(found, txpoolcfg.ReplacedByHigherTip)
}

// Remove from mined cache in case this is coming from unwind txs
// and to ensure not double adding into the memory
hashStr := string(mt.Tx.IDHash[:])
if _, ok := p.minedBlobTxsByHash[hashStr]; ok {
p.deleteMinedBlobTxn(hashStr)
// Don't add blob tx to queued if it's less than current pending blob base fee
if mt.Tx.Type == types.BlobTxType && mt.Tx.BlobFeeCap.LtUint64(p.pendingBlobFee.Load()) {
return txpoolcfg.FeeTooLow
}

hashStr := string(mt.Tx.IDHash[:])
p.byHash[hashStr] = mt

if replaced := p.all.replaceOrInsert(mt); replaced != nil {
Expand All @@ -1289,6 +1301,8 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
}
// All transactions are first added to the queued pool and then immediately promoted from there if required
p.queued.Add(mt, p.logger)
// Remove from mined cache as we are now "resurrecting" it to a sub-pool
p.deleteMinedBlobTxn(hashStr)
return txpoolcfg.NotSet
}

Expand All @@ -1304,6 +1318,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) {

// Cache recently mined blobs in anticipation of reorg, delete finalized ones
func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSlot, finalizedBlock uint64) error {
p.lastFinalizedBlock.Store(finalizedBlock)
// Remove blobs in the finalized block and older, loop through all entries
for l := len(p.minedBlobTxsByBlock); l > 0 && finalizedBlock > 0; l-- {
// delete individual hashes
Expand All @@ -1327,8 +1342,6 @@ func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSl
p.minedBlobTxsByHash[string(txn.IDHash[:])] = mt
}
}

p.lastFinalizedBlock.Store(finalizedBlock)
return nil
}

Expand Down Expand Up @@ -1530,10 +1543,10 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint

// promote reasserts invariants of the subpool and returns the list of transactions that ended up
// being promoted to the pending or basefee pool, for re-broadcasting
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, pendingBlobFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
logger log.Logger) {
// Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0); worst = pending.Worst() {
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = pending.Worst() {
if worst.subPool >= BaseFeePoolBits {
tx := pending.PopWorst()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
Expand All @@ -1546,7 +1559,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}

// Promote best transactions from base fee pool to pending pool while they qualify
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() {
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = baseFee.Best() {
tx := baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx, logger)
Expand Down Expand Up @@ -1846,6 +1859,10 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
return err
}
binary.BigEndian.PutUint64(encID, p.pendingBlobFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBlobFeeKey, encID); err != nil {
return err
}
if err := PutLastSeenBlock(tx, p.lastSeenBlock.Load(), encID); err != nil {
return err
}
Expand Down Expand Up @@ -1938,16 +1955,27 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
pendingBaseFee = binary.BigEndian.Uint64(v)
}
}
var pendingBlobFee uint64 = 1 // MIN_BLOB_GAS_PRICE A/EIP-4844
{
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
if err != nil {
return err
}
if len(v) > 0 {
pendingBlobFee = binary.BigEndian.Uint64(v)
}
}

err = p.senders.registerNewSenders(&txs, p.logger)
if err != nil {
return err
}
if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)

p.pendingBlobFee.Store(pendingBlobFee)
return nil
}
func LastSeenBlock(tx kv.Getter) (uint64, error) {
Expand Down Expand Up @@ -2068,6 +2096,7 @@ func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender
var PoolChainConfigKey = []byte("chain_config")
var PoolLastSeenBlockKey = []byte("last_seen_block")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")
var PoolPendingBlobFeeKey = []byte("pending_blob_fee")

// recentlyConnectedPeers does buffer IDs of recently connected good peers
// then sync of pooled Transaction can happen to all of then at once
Expand Down Expand Up @@ -2461,10 +2490,12 @@ type BestQueue struct {
pendingBastFee uint64
}

// Returns true if the txn is better than the parameter txn
// it first compares the subpool markers of the two meta txns, then it compares
// depending on the pool (p, b, q) it compares the effective tip (p), nonceDistance (p,q)
// minFeeCap (b), and cumulative balance distance (p, q) for pending pool
// Returns true if the txn "mt" is better than the parameter txn "than"
// it first compares the subpool markers of the two meta txns, then,
// (since they have the same subpool marker, and thus same pool)
// depending on the pool - pending (P), basefee (B), queued (Q) -
// it compares the effective tip (for P), nonceDistance (for both P,Q)
// minFeeCap (for B), and cumulative balance distance (for P, Q)
func (mt *metaTx) better(than *metaTx, pendingBaseFee uint256.Int) bool {
subPool := mt.subPool
thanSubPool := than.subPool
Expand Down
Loading
Loading