Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

implement extensions for eip-4844 #850

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
810b325
protoc generated updates for eip-4844
roberto-bayardo Dec 20, 2022
c2acde3
add updated ethbackend protos
roberto-bayardo Dec 20, 2022
d33b39c
start adding blob transaction parsing to txpool (#2)
roberto-bayardo Jan 10, 2023
7eb2b9b
BlobTxNetworkWrapper parsing for txpool (#3)
roberto-bayardo Jan 11, 2023
b94c831
start integrating blob tx parser into txpool (#4)
roberto-bayardo Jan 13, 2023
354e2b8
add blob verification function, hash validation for blob tx parsing, …
roberto-bayardo Jan 15, 2023
810c29d
add Sharding fork time for eip-4844
roberto-bayardo Jan 17, 2023
2546af6
Re-implement the blob tx parser in a lower-level style that avoids un…
roberto-bayardo Jan 18, 2023
c648a3a
Track & enforce data gas usage in transaction pool YieldBest (#8)
roberto-bayardo Jan 21, 2023
d30adc0
don't broadcast blob txs
roberto-bayardo Jan 23, 2023
4b10b4a
latest interfaces from upstream
roberto-bayardo Jan 25, 2023
894d708
update with makefile
roberto-bayardo Jan 25, 2023
754e468
revert update to kv.pb.go
roberto-bayardo Jan 25, 2023
28cc202
fix parsing txs with empty access list
roberto-bayardo Jan 26, 2023
afdb004
fix logging related panic when a blob tx isn't broadcast
roberto-bayardo Jan 26, 2023
02918a3
add todo for replaying blob txs
roberto-bayardo Jan 27, 2023
680b056
make ParseTransaction capable of handling wrapped & unwrapped blob txs
roberto-bayardo Jan 31, 2023
2b1ed1f
rebase fixes
roberto-bayardo Feb 10, 2023
2b48fa6
merge with upstream
roberto-bayardo Apr 15, 2023
2f886fc
replace EngineGetBlobsBundleV1 with EngineGetPayloadWithBlobs
roberto-bayardo Apr 21, 2023
0aa5ed7
txpool parser updates for 4844 spec changes
roberto-bayardo Apr 24, 2023
dba69a0
fix blob tx parsing tests from spec updates & remove print statement
roberto-bayardo Apr 24, 2023
09f3c57
Merge remote-tracking branch 'upstream/main' into eip-4844
roberto-bayardo May 3, 2023
834cfd5
merge with upstream, update interfaces dep
roberto-bayardo May 3, 2023
003d7da
verify signature is of proper length in blob tx parser
roberto-bayardo May 4, 2023
39f3956
more ssz format checking
roberto-bayardo May 4, 2023
28c5ed7
Merge remote-tracking branch 'upstream/main' into eip-4844
roberto-bayardo May 10, 2023
4c0eab0
update interfaces
roberto-bayardo May 26, 2023
2c58b0a
update ledgerwatch/interfaces
roberto-bayardo May 26, 2023
b0e3786
Merge branch 'main' into eip-4844
roberto-bayardo Jun 12, 2023
1340b60
Merge branch 'main' into eip-4844
roberto-bayardo Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
for i := range change.Txs {
minedTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.Txs[i], minedTxs.Senders.At(i), false /* hasEnvelope */, nil)
_, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.Txs[i], minedTxs.Senders.At(i), false /* hasEnvelope */, false /* networkVersion */, nil)
return err
}); err != nil {
f.logger.Warn("stream.Recv", "err", err)
Expand All @@ -484,7 +484,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
for i := range change.Txs {
unwindTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, nil)
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* networkVersion */, nil)
return err
}); err != nil {
f.logger.Warn("stream.Recv", "err", err)
Expand All @@ -493,6 +493,8 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}
// TODO(eip-4844): If there are blob txs that need to be unwound, these will not replay properly since we only have the
// unwrapped version here (we would need to re-wrap the tx with its blobs & kzg commitments).
if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
}); err != nil {
Expand Down
39 changes: 28 additions & 11 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const (

BaseFeePoolBits = EnoughFeeCapProtocol + NoNonceGaps + EnoughBalance + NotTooMuchGas
QueuedPoolBits = EnoughFeeCapProtocol

DataGasPerBlob = 1 << 17
)

type DiscardReason uint8
Expand Down Expand Up @@ -587,10 +589,9 @@ func (p *TxPool) IsLocal(idHash []byte) bool {
func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) Started() bool { return p.started.Load() }

func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64, availableDataGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()

// First wait for the corresponding block to arrive
if p.lastSeenBlock.Load() < onTopOf {
return false, 0, nil // Too early
Expand All @@ -615,6 +616,13 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
continue
}

// Skip transactions that require more datagas than is available
if uint64(mt.Tx.Blobs*DataGasPerBlob) > availableDataGas {
continue
} else {
availableDataGas -= uint64(mt.Tx.Blobs) * DataGasPerBlob
}

if mt.Tx.Gas >= p.blockGasLimit.Load() {
// Skip transactions with very large gas limit
continue
Expand Down Expand Up @@ -666,13 +674,13 @@ func (p *TxPool) ResetYieldedStatus() {
}
}

func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
return p.best(n, txs, tx, onTopOf, availableGas, toSkip)
func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
return p.best(n, txs, tx, onTopOf, availableGas, availableDataGas, toSkip)
}

func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64) (bool, error) {
func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64) (bool, error) {
set := mapset.NewThreadUnsafeSet[[32]byte]()
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, set)
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableDataGas, set)
return onTime, err
}

Expand Down Expand Up @@ -1382,6 +1390,11 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}
}

// Returns whether the given binary encoded transaction is a blob tx
func isBlobTx(tx []byte) bool {
return len(tx) > 0 && tx[0] == byte(types.BlobTxType)
}

// MainLoop - does:
// send pending byHash to p2p:
// - new byHash
Expand Down Expand Up @@ -1478,12 +1491,16 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
localTxTypes = append(localTxTypes, t)
localTxSizes = append(localTxSizes, size)
localTxHashes = append(localTxHashes, hash...)
localTxRlps = append(localTxRlps, slotRlp)
if !isBlobTx(slotRlp) { // don't broadcast blob txs
localTxRlps = append(localTxRlps, slotRlp)
}
} else {
remoteTxTypes = append(remoteTxTypes, t)
remoteTxSizes = append(remoteTxSizes, size)
remoteTxHashes = append(remoteTxHashes, hash...)
remoteTxRlps = append(remoteTxRlps, slotRlp)
if !isBlobTx(slotRlp) {
remoteTxRlps = append(remoteTxRlps, slotRlp)
}
}
}
return nil
Expand All @@ -1496,11 +1513,11 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}

// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
txSentTo := send.BroadcastPooledTxs(localTxRlps)
_ = send.BroadcastPooledTxs(localTxRlps)
hashSentTo := send.AnnouncePooledTxs(localTxTypes, localTxSizes, localTxHashes)
for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i)
p.logger.Info("local tx propagated", "tx_hash", hex.EncodeToString(hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load())
p.logger.Info("local tx propagated", "tx_hash", hex.EncodeToString(hash), "announced to peers", hashSentTo[i], "baseFee", p.pendingBaseFee.Load())
}
send.BroadcastPooledTxs(remoteTxRlps)
send.AnnouncePooledTxs(remoteTxTypes, remoteTxSizes, remoteTxHashes)
Expand Down Expand Up @@ -1662,7 +1679,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
addr, txRlp := *(*[20]byte)(v[:20]), v[20:]
txn := &types.TxSlot{}

_, err = parseCtx.ParseTransaction(txRlp, 0, txn, nil, false /* hasEnvelope */, nil)
_, err = parseCtx.ParseTransaction(txRlp, 0, txn, nil, false /* hasEnvelope */, true /* networkVersion */, nil)
if err != nil {
err = fmt.Errorf("err: %w, rlp: %x", err, txRlp)
p.logger.Warn("[txpool] fromDB: parseTransaction", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion txpool/pool_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b
FeeCap: *uint256.NewInt(feeCap[i%len(feeCap)]),
}
txRlp := fakeRlpTx(txs.Txs[i], senders.At(i%senders.Len()))
_, err := parseCtx.ParseTransaction(txRlp, 0, txs.Txs[i], nil, false, nil)
_, err := parseCtx.ParseTransaction(txRlp, 0, txs.Txs[i], nil, false, true, nil)
if err != nil {
panic(err)
}
Expand Down
6 changes: 3 additions & 3 deletions txpool/txpool_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0}
type txPool interface {
ValidateSerializedTxn(serializedTxn []byte) error

PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64) (bool, error)
PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableDataGas uint64) (bool, error)
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]DiscardReason, error)
deprecatedForEach(_ context.Context, f func(rlp []byte, sender common.Address, t SubPoolType), tx kv.Tx)
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *GrpcServer) Pending(ctx context.Context, _ *emptypb.Empty) (*txpool_pro
reply := &txpool_proto.PendingReply{}
reply.Txs = make([]*txpool_proto.PendingReply_Tx, 0, 32)
txSlots := types.TxsRlp{}
if _, err := s.txPool.PeekBest(math.MaxInt16, &txSlots, tx, 0 /* onTopOf */, math.MaxUint64 /* available gas */); err != nil {
if _, err := s.txPool.PeekBest(math.MaxInt16, &txSlots, tx, 0 /* onTopOf */, math.MaxUint64 /* available gas */, math.MaxUint64 /* available data gas*/); err != nil {
return nil, err
}
var senderArr [20]byte
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txp
slots.Resize(uint(j + 1))
slots.Txs[j] = &types.TxSlot{}
slots.IsLocal[j] = true
if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.Txs[j], slots.Senders.At(j), false /* hasEnvelope */, func(hash []byte) error {
if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.Txs[j], slots.Senders.At(j), false /* hasEnvelope */, true /* networkVersion */, func(hash []byte) error {
if known, _ := s.txPool.IdHashKnown(tx, hash); known {
return types.ErrAlreadyKnown
}
Expand Down
Loading
Loading