Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
EIP-4844: Handle data gas in txpool (#1029)
Browse files Browse the repository at this point in the history
Reworked bits of PR #850. Only stubs for transaction parsing.
  • Loading branch information
yperbasis authored and AskAlexSharov committed Sep 6, 2023
1 parent c75dc7a commit 70767a5
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 37 deletions.
24 changes: 24 additions & 0 deletions chain/protocol_param.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2023 The Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package chain

const (
// EIP-4844: Shard Blob Transactions
DataGasPerBlob uint64 = 0x20000
TargetDataGasPerBlock uint64 = 0x60000
MaxDataGasPerBlock uint64 = 0xC0000
)
1 change: 1 addition & 0 deletions rlp/encodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func AnnouncementsLen(types []byte, sizes []uint32, hashes []byte) int {
return ListPrefixLen(totalLen) + totalLen
}

// EIP-5793: eth/68 - Add tx type to tx announcement
func EncodeAnnouncements(types []byte, sizes []uint32, hashes []byte, encodeBuf []byte) int {
if len(types) == 0 {
encodeBuf[0] = 0xc3
Expand Down
6 changes: 4 additions & 2 deletions txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
for i := range change.Txs {
minedTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.Txs[i], minedTxs.Senders.At(i), false /* hasEnvelope */, nil)
_, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.Txs[i], minedTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
return err
}); err != nil {
f.logger.Warn("stream.Recv", "err", err)
Expand All @@ -484,7 +484,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
for i := range change.Txs {
unwindTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, nil)
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
return err
}); err != nil {
f.logger.Warn("stream.Recv", "err", err)
Expand All @@ -493,6 +493,8 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}
// TODO(eip-4844): If there are blob txs that need to be unwound, these will not replay properly since we only have the
// unwrapped version here (we would need to re-wrap the tx with its blobs & kzg commitments).
if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
}); err != nil {
Expand Down
35 changes: 22 additions & 13 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func (p *TxPool) IsLocal(idHash []byte) bool {
func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) Started() bool { return p.started.Load() }

func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()

Expand Down Expand Up @@ -546,6 +546,12 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
continue
}

// Skip transactions that require more data gas than is available
if mt.Tx.BlobCount*chain.DataGasPerBlob > availableDataGas {
continue
}
availableDataGas -= mt.Tx.BlobCount * chain.DataGasPerBlob

// make sure we have enough gas in the caller to add this transaction.
// not an exact science using intrinsic gas but as close as we could hope for at
// this stage
Expand All @@ -554,10 +560,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
// we might find another TX with a low enough intrinsic gas to include so carry on
continue
}

if intrinsicGas <= availableGas { // check for potential underflow
availableGas -= intrinsicGas
}
availableGas -= intrinsicGas

txs.Txs[count] = rlpTx
copy(txs.Senders.At(count), sender.Bytes())
Expand All @@ -584,13 +587,13 @@ func (p *TxPool) ResetYieldedStatus() {
}
}

func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
return p.best(n, txs, tx, onTopOf, availableGas, toSkip)
func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
return p.best(n, txs, tx, onTopOf, availableGas, availableDataGas, toSkip)
}

func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64) (bool, error) {
func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64) (bool, error) {
set := mapset.NewThreadUnsafeSet[[32]byte]()
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, set)
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableDataGas, set)
return onTime, err
}

Expand Down Expand Up @@ -1390,18 +1393,22 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
continue
}

// Empty rlp can happen if a transaction we want to broadcase has just been mined, for example
// Empty rlp can happen if a transaction we want to broadcast has just been mined, for example
slotsRlp = append(slotsRlp, slotRlp)
if p.IsLocal(hash) {
localTxTypes = append(localTxTypes, t)
localTxSizes = append(localTxSizes, size)
localTxHashes = append(localTxHashes, hash...)
localTxRlps = append(localTxRlps, slotRlp)
if t != types.BlobTxType { // "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844
localTxRlps = append(localTxRlps, slotRlp)
}
} else {
remoteTxTypes = append(remoteTxTypes, t)
remoteTxSizes = append(remoteTxSizes, size)
remoteTxHashes = append(remoteTxHashes, hash...)
remoteTxRlps = append(remoteTxRlps, slotRlp)
if t != types.BlobTxType { // "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844
remoteTxRlps = append(remoteTxRlps, slotRlp)
}
}
}
return nil
Expand All @@ -1410,6 +1417,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
return
}
if newSlotsStreams != nil {
// TODO(eip-4844) What is this for? Is it OK to broadcast blob transactions?
newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp}, p.logger)
}

Expand Down Expand Up @@ -1580,7 +1588,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
addr, txRlp := *(*[20]byte)(v[:20]), v[20:]
txn := &types.TxSlot{}

_, err = parseCtx.ParseTransaction(txRlp, 0, txn, nil, false /* hasEnvelope */, nil)
// TODO(eip-4844) ensure wrappedWithBlobs when transactions are saved to the DB
_, err = parseCtx.ParseTransaction(txRlp, 0, txn, nil, false /* hasEnvelope */, true /*wrappedWithBlobs*/, nil)
if err != nil {
err = fmt.Errorf("err: %w, rlp: %x", err, txRlp)
p.logger.Warn("[txpool] fromDB: parseTransaction", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion txpool/pool_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b
FeeCap: *uint256.NewInt(feeCap[i%len(feeCap)]),
}
txRlp := fakeRlpTx(txs.Txs[i], senders.At(i%senders.Len()))
_, err := parseCtx.ParseTransaction(txRlp, 0, txs.Txs[i], nil, false, nil)
_, err := parseCtx.ParseTransaction(txRlp, 0, txs.Txs[i], nil, false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions txpool/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (f *Send) AnnouncePooledTxs(types []byte, sizes []uint32, hashes types2.Has
prevI := 0
prevJ := 0
for prevI < len(hashes) || prevJ < len(types) {
// Prepare two versions of the annoucement message, one for pre-eth/68 peers, another for post-eth/68 peers
// Prepare two versions of the announcement message, one for pre-eth/68 peers, another for post-eth/68 peers
i := prevI
for i < len(hashes) && rlp.HashesLen(hashes[prevI:i+32]) < p2pTxPacketLimit {
i += 32
Expand All @@ -137,7 +137,7 @@ func (f *Send) AnnouncePooledTxs(types []byte, sizes []uint32, hashes types2.Has
panic(fmt.Sprintf("Serialised hashes encoding len mismatch, expected %d, got %d", iSize, s))
}
if s := rlp.EncodeAnnouncements(types[prevJ:j], sizes[prevJ:j], hashes[32*prevJ:32*j], jData); s != jSize {
panic(fmt.Sprintf("Serialised annoucements encoding len mismatch, expected %d, got %d", jSize, s))
panic(fmt.Sprintf("Serialised announcements encoding len mismatch, expected %d, got %d", jSize, s))
}
for _, sentryClient := range f.sentryClients {
if !sentryClient.Ready() {
Expand Down
8 changes: 4 additions & 4 deletions txpool/txpool_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0}
type txPool interface {
ValidateSerializedTxn(serializedTxn []byte) error

PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64) (bool, error)
PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64) (bool, error)
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error)
deprecatedForEach(_ context.Context, f func(rlp []byte, sender common.Address, t SubPoolType), tx kv.Tx)
Expand Down Expand Up @@ -155,7 +155,7 @@ func (s *GrpcServer) Pending(ctx context.Context, _ *emptypb.Empty) (*txpool_pro
reply := &txpool_proto.PendingReply{}
reply.Txs = make([]*txpool_proto.PendingReply_Tx, 0, 32)
txSlots := types.TxsRlp{}
if _, err := s.txPool.PeekBest(math.MaxInt16, &txSlots, tx, 0 /* onTopOf */, math.MaxUint64 /* available gas */); err != nil {
if _, err := s.txPool.PeekBest(math.MaxInt16, &txSlots, tx, 0 /* onTopOf */, math.MaxUint64 /* availableGas */, math.MaxUint64 /* availableDataGas */); err != nil {
return nil, err
}
var senderArr [20]byte
Expand Down Expand Up @@ -188,11 +188,11 @@ func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txp
reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))}

j := 0
for i := 0; i < len(in.RlpTxs); i++ { // some incoming txs may be rejected, so - need secnod index
for i := 0; i < len(in.RlpTxs); i++ { // some incoming txs may be rejected, so - need second index
slots.Resize(uint(j + 1))
slots.Txs[j] = &types.TxSlot{}
slots.IsLocal[j] = true
if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.Txs[j], slots.Senders.At(j), false /* hasEnvelope */, func(hash []byte) error {
if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.Txs[j], slots.Senders.At(j), false /* hasEnvelope */, true /* wrappedWithBlobs */, func(hash []byte) error {
if known, _ := s.txPool.IdHashKnown(tx, hash); known {
return types.ErrAlreadyKnown
}
Expand Down
17 changes: 12 additions & 5 deletions types/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type TxSlot struct {
DataNonZeroLen int
AlAddrCount int // Number of addresses in the access list
AlStorCount int // Number of storage keys in the access list
BlobCount uint64 // Number of blobs contained by the transaction
Gas uint64 // Gas limit of the transaction
IDHash [32]byte // Transaction hash for the purposes of using it as a transaction Id
Traced bool // Whether transaction needs to be traced throughout transaction pool code and generate debug printing
Expand All @@ -103,8 +104,9 @@ type TxSlot struct {

const (
LegacyTxType byte = 0
AccessListTxType byte = 1
DynamicFeeTxType byte = 2
AccessListTxType byte = 1 // EIP-2930
DynamicFeeTxType byte = 2 // EIP-1559
BlobTxType byte = 3 // EIP-4844
)

var ErrParseTxn = fmt.Errorf("%w transaction", rlp.ErrParse)
Expand All @@ -121,9 +123,14 @@ func (ctx *TxParseContext) ChainIDRequired() *TxParseContext {
return ctx
}

// ParseTransaction extracts all the information from the transactions's payload (RLP) necessary to build TxSlot
// it also performs syntactic validation of the transactions
func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int, slot *TxSlot, sender []byte, hasEnvelope bool, validateHash func([]byte) error) (p int, err error) {
// ParseTransaction extracts all the information from the transactions's payload (RLP) necessary to build TxSlot.
// It also performs syntactic validation of the transactions.
// wrappedWithBlobs means that for blob (type 3) transactions the full version with blobs/commitments/proofs is expected
// (see https://eips.ethereum.org/EIPS/eip-4844#networking).
func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int, slot *TxSlot, sender []byte, hasEnvelope, wrappedWithBlobs bool, validateHash func([]byte) error) (p int, err error) {
// TODO(eip-4844) implement blob txn parsing with and w/o wrappedWithBlobs
// Ensure that TxSlot.BlobCount is properly populated

if len(payload) == 0 {
return 0, fmt.Errorf("%w: empty rlp", ErrParseTxn)
}
Expand Down
7 changes: 5 additions & 2 deletions types/txn_packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func ParseGetPooledTransactions66(payload []byte, pos int, hashbuf []byte) (requ

// == Pooled transactions ==

// TODO(eip-4844) wrappedWithBlobs = true?
func EncodePooledTransactions66(txsRlp [][]byte, requestID uint64, encodeBuf []byte) []byte {
pos := 0
txsRlpLen := 0
Expand Down Expand Up @@ -139,6 +140,8 @@ func EncodePooledTransactions66(txsRlp [][]byte, requestID uint64, encodeBuf []b
_ = pos
return encodeBuf
}

// TODO(eip-4844) wrappedWithBlobs = false?
func EncodeTransactions(txsRlp [][]byte, encodeBuf []byte) []byte {
pos := 0
dataLen := 0
Expand Down Expand Up @@ -176,7 +179,7 @@ func ParseTransactions(payload []byte, pos int, ctx *TxParseContext, txSlots *Tx
for i := 0; pos < len(payload); i++ {
txSlots.Resize(uint(i + 1))
txSlots.Txs[i] = &TxSlot{}
pos, err = ctx.ParseTransaction(payload, pos, txSlots.Txs[i], txSlots.Senders.At(i), true /* hasEnvelope */, validateHash)
pos, err = ctx.ParseTransaction(payload, pos, txSlots.Txs[i], txSlots.Senders.At(i), true /* hasEnvelope */, true /* wrappedWithBlobs */, validateHash)
if err != nil {
if errors.Is(err, ErrRejected) {
txSlots.Resize(uint(i))
Expand Down Expand Up @@ -206,7 +209,7 @@ func ParsePooledTransactions66(payload []byte, pos int, ctx *TxParseContext, txS
for i := 0; p < len(payload); i++ {
txSlots.Resize(uint(i + 1))
txSlots.Txs[i] = &TxSlot{}
p, err = ctx.ParseTransaction(payload, p, txSlots.Txs[i], txSlots.Senders.At(i), true /* hasEnvelope */, validateHash)
p, err = ctx.ParseTransaction(payload, p, txSlots.Txs[i], txSlots.Senders.At(i), true /* hasEnvelope */, true /* wrappedWithBlobs */, validateHash)
if err != nil {
if errors.Is(err, ErrRejected) {
txSlots.Resize(uint(i))
Expand Down
14 changes: 7 additions & 7 deletions types/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestParseTransactionRLP(t *testing.T) {
tt := tt
t.Run(strconv.Itoa(i), func(t *testing.T) {
payload := hexutility.MustDecodeHex(tt.PayloadStr)
parseEnd, err := ctx.ParseTransaction(payload, 0, tx, txSender[:], false /* hasEnvelope */, nil)
parseEnd, err := ctx.ParseTransaction(payload, 0, tx, txSender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
require.NoError(err)
require.Equal(len(payload), parseEnd)
if tt.SignHashStr != "" {
Expand Down Expand Up @@ -74,19 +74,19 @@ func TestTransactionSignatureValidity1(t *testing.T) {

tx, txSender := &TxSlot{}, [20]byte{}
validTxn := hexutility.MustDecodeHex("f83f800182520894095e7baea6a6c7c4c2dfeb977efac326af552d870b801ba048b55bfa915ac795c431978d8a6a992b628d557da5ff759b307d495a3664935301")
_, err := ctx.ParseTransaction(validTxn, 0, tx, txSender[:], false /* hasEnvelope */, nil)
_, err := ctx.ParseTransaction(validTxn, 0, tx, txSender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
assert.NoError(t, err)

preEip2Txn := hexutility.MustDecodeHex("f85f800182520894095e7baea6a6c7c4c2dfeb977efac326af552d870b801ba048b55bfa915ac795c431978d8a6a992b628d557da5ff759b307d495a36649353a07fffffffffffffffffffffffffffffff5d576e7357a4501ddfe92f46681b20a1")
_, err = ctx.ParseTransaction(preEip2Txn, 0, tx, txSender[:], false /* hasEnvelope */, nil)
_, err = ctx.ParseTransaction(preEip2Txn, 0, tx, txSender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
assert.NoError(t, err)

// Now enforce EIP-2
ctx.WithAllowPreEip2s(false)
_, err = ctx.ParseTransaction(validTxn, 0, tx, txSender[:], false /* hasEnvelope */, nil)
_, err = ctx.ParseTransaction(validTxn, 0, tx, txSender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
assert.NoError(t, err)

_, err = ctx.ParseTransaction(preEip2Txn, 0, tx, txSender[:], false /* hasEnvelope */, nil)
_, err = ctx.ParseTransaction(preEip2Txn, 0, tx, txSender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
assert.Error(t, err)
}

Expand All @@ -96,12 +96,12 @@ func TestTransactionSignatureValidity2(t *testing.T) {
ctx := NewTxParseContext(*chainId)
slot, sender := &TxSlot{}, [20]byte{}
rlp := hexutility.MustDecodeHex("02f8720513844190ab00848321560082520894cab441d2f45a3fee83d15c6b6b6c36a139f55b6288054607fc96a6000080c001a0dffe4cb5651e663d0eac8c4d002de734dd24db0f1109b062d17da290a133cc02a0913fb9f53f7a792bcd9e4d7cced1b8545d1ab82c77432b0bc2e9384ba6c250c5")
_, err := ctx.ParseTransaction(rlp, 0, slot, sender[:], false /* hasEnvelope */, nil)
_, err := ctx.ParseTransaction(rlp, 0, slot, sender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
assert.Error(t, err)

// Only legacy transactions can happen before EIP-2
ctx.WithAllowPreEip2s(true)
_, err = ctx.ParseTransaction(rlp, 0, slot, sender[:], false /* hasEnvelope */, nil)
_, err = ctx.ParseTransaction(rlp, 0, slot, sender[:], false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
assert.Error(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion types/txn_types_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ func FuzzParseTx(f *testing.F) {
ctx := NewTxParseContext(*u256.N1)
txn := &TxSlot{}
sender := make([]byte, 20)
_, _ = ctx.ParseTransaction(in, pos, txn, sender, false, nil)
_, _ = ctx.ParseTransaction(in, pos, txn, sender, false /* hasEnvelope */, true /* wrappedWithBlobs */, nil)
})
}

0 comments on commit 70767a5

Please sign in to comment.