Skip to content

Commit

Permalink
feat: allow non-ccc pipelines (#781)
Browse files Browse the repository at this point in the history
  • Loading branch information
omerfirmak committed May 30, 2024
1 parent aae829c commit a43879b
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 32 deletions.
18 changes: 15 additions & 3 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,11 @@ func (w *worker) startNewPipeline(timestamp int64) {
}

w.currentPipelineStart = time.Now()
w.currentPipeline = pipeline.NewPipeline(w.chain, w.chain.GetVMConfig(), parentState, header, nextL1MsgIndex, w.getCCC()).WithBeforeTxHook(w.beforeTxHook)
pipelineCCC := w.getCCC()
if !w.isRunning() {
pipelineCCC = nil
}
w.currentPipeline = pipeline.NewPipeline(w.chain, w.chain.GetVMConfig(), parentState, header, nextL1MsgIndex, pipelineCCC).WithBeforeTxHook(w.beforeTxHook)

deadline := time.Unix(int64(header.Time), 0)
if w.chainConfig.Clique != nil && w.chainConfig.Clique.RelaxedPeriod {
Expand Down Expand Up @@ -579,11 +583,19 @@ func (w *worker) startNewPipeline(timestamp int64) {
return
}
}

// pipelineCCC was nil, so the block was built for RPC purposes only. Stop the pipeline immediately
// and update the pending block.
if pipelineCCC == nil {
w.currentPipeline.Stop()
}
}

func (w *worker) handlePipelineResult(res *pipeline.Result) error {
if !w.isRunning() {
if res != nil && res.FinalBlock != nil {
// Rows being nil without an OverflowingTx means that block didn't go thru CCC,
// which means that we are not the sequencer. Do not attempt to commit.
if res != nil && res.Rows == nil && res.OverflowingTx == nil {
if res.FinalBlock != nil {
w.updateSnapshot(res.FinalBlock)
}
w.currentPipeline.Release()
Expand Down
44 changes: 44 additions & 0 deletions miner/scroll_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,3 +1045,47 @@ func TestSealBlockAfterCliquePeriod(t *testing.T) {
t.Fatalf("timeout")
}
}

func TestPending(t *testing.T) {
var (
engine consensus.Engine
chainConfig *params.ChainConfig
db = rawdb.NewMemoryDatabase()
)
chainConfig = params.AllCliqueProtocolChanges
chainConfig.Clique = &params.CliqueConfig{Period: 1, Epoch: 30000}
chainConfig.Scroll.FeeVaultAddress = &common.Address{}
engine = clique.New(chainConfig.Clique, db)
w, b := newTestWorker(t, chainConfig, engine, db, 0)
defer w.close()

// This test chain imports the mined blocks.
b.genesis.MustCommit(db)
chain, _ := core.NewBlockChain(db, nil, b.chain.Config(), engine, vm.Config{
Debug: true,
Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil)
defer chain.Stop()

// Define 3 transactions:
// A --> B (nonce: 0, gas: 20)
tx0, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(100000000000000000), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
// A --> B (nonce: 1, gas: 5)
tx1, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(0), params.TxGas, big.NewInt(5*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
// B --> A (nonce: 0, gas: 20)
tx2, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testUserAddress), testBankAddress, big.NewInt(0), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testUserKey)
// B --> A (nonce: 1, gas: 20)
tx3, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testUserAddress)+1, testBankAddress, big.NewInt(0), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testUserKey)
// B --> A (nonce: 2, gas: 20)
tx4, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testUserAddress)+2, testBankAddress, big.NewInt(0), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testUserKey)
// A --> B (nonce: 2, gas: 5)
tx5, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress)+2, testUserAddress, big.NewInt(0), params.TxGas, big.NewInt(5*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)

b.txPool.AddRemotesSync([]*types.Transaction{tx0, tx1, tx2, tx3, tx4, tx5})
// start building pending block
w.startCh <- struct{}{}

time.Sleep(time.Second)
pending := w.pendingBlock()
assert.NotNil(t, pending)
assert.NotEmpty(t, pending.Transactions())
}
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 3 // Minor version component of the current release
VersionPatch = 30 // Patch version component of the current release
VersionPatch = 31 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down
66 changes: 38 additions & 28 deletions rollup/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (p *Pipeline) Start(deadline time.Time) error {
return nil
}

// Stop forces pipeline to stop its operation and return whatever progress it has so far
func (p *Pipeline) Stop() {
if p.txnQueue != nil {
close(p.txnQueue)
p.txnQueue = nil
}
}

func (p *Pipeline) TryPushTxns(txs types.OrderedTransactionSet, onFailingTxn func(txnIndex int, tx *types.Transaction, err error) bool) *Result {
for {
tx := txs.Peek()
Expand All @@ -127,8 +135,7 @@ func (p *Pipeline) TryPushTxns(txs types.OrderedTransactionSet, onFailingTxn fun
txs.Shift()
default:
if errors.Is(err, ErrApplyStageDone) || onFailingTxn(p.txs.Len(), tx, err) {
close(p.txnQueue)
p.txnQueue = nil
p.Stop()
return nil
}

Expand Down Expand Up @@ -167,10 +174,7 @@ func (p *Pipeline) TryPushTxn(tx *types.Transaction) (*Result, error) {

// Release releases all resources related to the pipeline
func (p *Pipeline) Release() {
if p.txnQueue != nil {
close(p.txnQueue)
p.txnQueue = nil
}
p.Stop()

select {
case <-p.applyStageRespCh:
Expand Down Expand Up @@ -303,7 +307,9 @@ type Result struct {
}

func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Time) <-chan *Result {
p.ccc.Reset()
if p.ccc != nil {
p.ccc.Reset()
}
resultCh := make(chan *Result)
var lastCandidate *BlockCandidate
var lastAccRows *types.RowConsumption
Expand Down Expand Up @@ -335,7 +341,7 @@ func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Tim
cccStart := time.Now()
var accRows *types.RowConsumption
var err error
if candidate != nil {
if candidate != nil && p.ccc != nil {
accRows, err = p.ccc.ApplyTransaction(candidate.LastTrace)
lastTxn := candidate.Txs[candidate.Txs.Len()-1]
cccTimer.UpdateSince(cccStart)
Expand All @@ -352,6 +358,8 @@ func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Tim

lastCandidate = candidate
lastAccRows = accRows
} else if candidate != nil && p.ccc == nil {
lastCandidate = candidate
}

// immediately close the block if deadline reached or apply stage is done
Expand Down Expand Up @@ -381,29 +389,31 @@ func (p *Pipeline) traceAndApply(tx *types.Transaction) (*types.Receipt, *types.
return nil, nil, core.ErrGasLimitReached
}

// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
snap := p.state.Snapshot()

// 1. we have to check circuit capacity before `core.ApplyTransaction`,
// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
// the `refund` value will still be correct, because:
// 2.1 when starting handling the first tx, `state.refund` is 0 by default,
// 2.2 after tracing, the state is either committed in `core.ApplyTransaction`, or reverted, so the `state.refund` can be cleared,
// 2.3 when starting handling the following txs, `state.refund` comes as 0
trace, err = tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(p.chain.Config(), p.chain, p.chain.Engine(), p.chain.Database(),
p.state, p.parent, types.NewBlockWithHeader(&p.Header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
// `w.current.traceEnv.State` & `w.current.state` share a same pointer to the state, so only need to revert `w.current.state`
// revert to snapshot for calling `core.ApplyMessage` again, (both `traceEnv.GetBlockTrace` & `core.ApplyTransaction` will call `core.ApplyMessage`)
p.state.RevertToSnapshot(snap)
if err != nil {
return nil, nil, err
if p.ccc != nil {
// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
snap := p.state.Snapshot()

// 1. we have to check circuit capacity before `core.ApplyTransaction`,
// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
// the `refund` value will still be correct, because:
// 2.1 when starting handling the first tx, `state.refund` is 0 by default,
// 2.2 after tracing, the state is either committed in `core.ApplyTransaction`, or reverted, so the `state.refund` can be cleared,
// 2.3 when starting handling the following txs, `state.refund` comes as 0
trace, err = tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(p.chain.Config(), p.chain, p.chain.Engine(), p.chain.Database(),
p.state, p.parent, types.NewBlockWithHeader(&p.Header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
// `w.current.traceEnv.State` & `w.current.state` share a same pointer to the state, so only need to revert `w.current.state`
// revert to snapshot for calling `core.ApplyMessage` again, (both `traceEnv.GetBlockTrace` & `core.ApplyTransaction` will call `core.ApplyMessage`)
p.state.RevertToSnapshot(snap)
if err != nil {
return nil, nil, err
}
}

// create new snapshot for `core.ApplyTransaction`
snap = p.state.Snapshot()
snap := p.state.Snapshot()

var receipt *types.Receipt
receipt, err = core.ApplyTransaction(p.chain.Config(), p.chain, nil /* coinbase will default to chainConfig.Scroll.FeeVaultAddress */, p.gasPool,
Expand Down

0 comments on commit a43879b

Please sign in to comment.