From f225ac819a66d575d9a2da31c768066ef4aef8ba Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Thu, 18 May 2023 21:52:45 +0800 Subject: [PATCH] feat: refine innerWatcher --- .../java/io/optimism/l1/ChainWatcher.java | 25 +- .../java/io/optimism/l1/InnerWatcher.java | 411 ++++++++---------- .../java/io/optimism/l1/InnerWatcherTest.java | 15 +- 3 files changed, 214 insertions(+), 237 deletions(-) diff --git a/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java b/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java index e01dc4e3..f0c347be 100644 --- a/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java +++ b/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java @@ -31,8 +31,9 @@ @SuppressWarnings({"UnusedVariable", "preview"}) public class ChainWatcher { - private MessagePassingQueue blockUpdateQueue; - private final InnerWatcher innerWatcher; + private volatile MessagePassingQueue blockUpdateQueue; + private volatile InnerWatcher innerWatcher; + private final Config config; /** * Gets block update queue. @@ -51,10 +52,11 @@ public MessagePassingQueue getBlockUpdateQueue() { * @param config the global config */ public ChainWatcher(BigInteger l1StartBlock, BigInteger l2StartBlock, Config config) { + this.config = config; this.blockUpdateQueue = new MpscGrowableArrayQueue<>(1024 * 4, 1024 * 64); this.innerWatcher = new InnerWatcher( - config, + this.config, this.blockUpdateQueue, l1StartBlock, l2StartBlock, @@ -75,8 +77,19 @@ public void stop() { /** * Restart. * - * @param number the number - * @param number1 the number 1 + * @param l1StartBlock new l1 start block number + * @param l2StartBlock new l2 start block number */ - public void restart(BigInteger number, BigInteger number1) {} + public void restart(BigInteger l1StartBlock, BigInteger l2StartBlock) { + this.stop(); + this.blockUpdateQueue = new MpscGrowableArrayQueue<>(1024 * 4, 1024 * 64); + this.innerWatcher = + new InnerWatcher( + this.config, + this.blockUpdateQueue, + l1StartBlock, + l2StartBlock, + Executors.newVirtualThreadPerTaskExecutor()); + this.start(); + } } diff --git a/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java b/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java index 5cdd3a59..b2ba06b0 100644 --- a/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java +++ b/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java @@ -29,11 +29,9 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.function.Function; import java.util.stream.Collectors; import okhttp3.OkHttpClient; import org.jctools.queues.MessagePassingQueue; @@ -147,26 +145,7 @@ public InnerWatcher( if (l2StartBlock.equals(config.chainConfig().l2Genesis().number())) { this.systemConfig = config.chainConfig().systemConfig(); } else { - Web3j l2Client = createClient(config.l2RpcUrl()); - CompletableFuture l2Future = - this.getBlock(l2Client, l2StartBlock.subtract(BigInteger.ONE)); - EthBlock block; - try { - block = l2Future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - EthBlock.TransactionObject tx = - (EthBlock.TransactionObject) block.getBlock().getTransactions().get(0).get(); - final byte[] input = Numeric.hexStringToByteArray(tx.getInput()); - - final String batchSender = Numeric.toHexString(Arrays.copyOfRange(input, 176, 196)); - var l1FeeOverhead = new BigInteger(Arrays.copyOfRange(input, 196, 228)); - var l1FeeScalar = new BigInteger(Arrays.copyOfRange(input, 228, 260)); - var gasLimit = block.getBlock().getGasLimit(); - this.systemConfig = - new Config.SystemConfig(batchSender, gasLimit, l1FeeOverhead, l1FeeScalar); - l2Client.shutdown(); + this.getMetadataFromL2(l2StartBlock); } this.config = config; @@ -179,152 +158,127 @@ public InnerWatcher( this.systemConfigUpdate = new Tuple2<>(l1StartBlock, null); } + private void getMetadataFromL2(BigInteger l2StartBlock) { + Web3j l2Client = createClient(config.l2RpcUrl()); + EthBlock block; + try { + block = this.getBlock(l2Client, l2StartBlock.subtract(BigInteger.ONE)); + } catch (InterruptedException | ExecutionException e) { + l2Client.shutdown(); + throw new RuntimeException(e); + } + EthBlock.TransactionObject tx = + (EthBlock.TransactionObject) block.getBlock().getTransactions().get(0).get(); + final byte[] input = Numeric.hexStringToByteArray(tx.getInput()); + + final String batchSender = Numeric.toHexString(Arrays.copyOfRange(input, 176, 196)); + var l1FeeOverhead = new BigInteger(Arrays.copyOfRange(input, 196, 228)); + var l1FeeScalar = new BigInteger(Arrays.copyOfRange(input, 228, 260)); + var gasLimit = block.getBlock().getGasLimit(); + this.systemConfig = new Config.SystemConfig(batchSender, gasLimit, l1FeeOverhead, l1FeeScalar); + l2Client.shutdown(); + } + /** * try ingest block. * - * @return the completable future + * @throws ExecutionException thrown if failed to get data from web3j client + * @throws InterruptedException thrown if executor has been shutdown */ - public CompletableFuture tryIngestBlock() { - CompletableFuture res = CompletableFuture.completedFuture(null); + public void tryIngestBlock() throws ExecutionException, InterruptedException { if (this.currentBlock.compareTo(this.finalizedBlock) > 0) { - res = - res.thenCompose( - (Function>) - unused -> - InnerWatcher.this - .getFinalized() - .thenAccept( - finalizedBlock -> { - InnerWatcher.this.finalizedBlock = finalizedBlock; - - while (true) { - boolean isOffered = - InnerWatcher.this.blockUpdateQueue.relaxedOffer( - new FinalityUpdate(finalizedBlock)); - if (isOffered) { - break; - } - } - - InnerWatcher.this.unfinalizedBlocks = - InnerWatcher.this.unfinalizedBlocks.stream() - .filter( - blockInfo -> - blockInfo - .number() - .compareTo(InnerWatcher.this.finalizedBlock) - > 0) - .toList(); - })); + this.finalizedBlock = this.getFinalized(); + this.putBlockUpdate(new FinalityUpdate(finalizedBlock)); + this.unfinalizedBlocks = + this.unfinalizedBlocks.stream() + .filter( + blockInfo -> blockInfo.number().compareTo(InnerWatcher.this.finalizedBlock) > 0) + .toList(); } + if (this.currentBlock.compareTo(this.headBlock) > 0) { - res = - res.thenCompose( - (Function>) - unused -> - InnerWatcher.this - .getHead() - .thenAccept(head -> InnerWatcher.this.headBlock = head)); + this.headBlock = this.getHead(); } if (this.currentBlock.compareTo(this.headBlock) <= 0) { - return res.thenCompose( - (Function>) unused -> updateSystemConfigWithNewestLog()); + updateSystemConfigWithNewestLog(); } else { - return res.thenAccept( - unused -> { - try { - Thread.sleep(Duration.ofMillis(250L)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); + try { + Thread.sleep(Duration.ofMillis(250L)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } - private CompletableFuture updateSystemConfigWithNewestLog() { - CompletableFuture updateSysConfFuture = this.updateSystemConfig(); - CompletableFuture blockRespFuture = this.getBlock(this.provider, this.currentBlock); - CompletableFuture> userDepositsFuture = - this.getDeposits(this.currentBlock); - - return CompletableFuture.allOf(updateSysConfFuture, blockRespFuture, userDepositsFuture) - .thenAccept( - unused -> { - updateSysConfFuture.join(); - EthBlock blockResp = blockRespFuture.join(); - List userDeposits = userDepositsFuture.join(); - boolean finalized = this.currentBlock.compareTo(this.finalizedBlock) >= 0; - L1Info l1Info = - L1Info.create( - blockResp.getBlock(), - userDeposits, - config.chainConfig().batchInbox(), - finalized, - this.systemConfig); - - if (l1Info.blockInfo().number().compareTo(this.finalizedBlock) >= 0) { - BlockInfo blockInfo = - new BlockInfo( - l1Info.blockInfo().hash(), - l1Info.blockInfo().number(), - blockResp.getBlock().getParentHash(), - l1Info.blockInfo().timestamp()); - this.unfinalizedBlocks.add(blockInfo); - } + private void updateSystemConfigWithNewestLog() throws ExecutionException, InterruptedException { + this.updateSystemConfig(); + EthBlock blockResp = this.getBlock(this.provider, this.currentBlock); + List userDeposits = this.getDeposits(this.currentBlock); + + boolean finalized = this.currentBlock.compareTo(this.finalizedBlock) >= 0; + L1Info l1Info = + L1Info.create( + blockResp.getBlock(), + userDeposits, + config.chainConfig().batchInbox(), + finalized, + this.systemConfig); + + if (l1Info.blockInfo().number().compareTo(this.finalizedBlock) >= 0) { + BlockInfo blockInfo = + new BlockInfo( + l1Info.blockInfo().hash(), + l1Info.blockInfo().number(), + blockResp.getBlock().getParentHash(), + l1Info.blockInfo().timestamp()); + this.unfinalizedBlocks.add(blockInfo); + } - BlockUpdate update = - this.checkReorg() ? new BlockUpdate.Reorg() : new BlockUpdate.NewBlock(l1Info); - while (true) { - boolean isOffered = this.blockUpdateQueue.relaxedOffer(update); - if (isOffered) { - break; - } - } - this.currentBlock = this.currentBlock.add(BigInteger.ONE); - }); + BlockUpdate update = + this.checkReorg() ? new BlockUpdate.Reorg() : new BlockUpdate.NewBlock(l1Info); + this.putBlockUpdate(update); + this.currentBlock = this.currentBlock.add(BigInteger.ONE); } - private CompletableFuture updateSystemConfig() { - CompletableFuture res = CompletableFuture.completedFuture(null); + private void putBlockUpdate(final BlockUpdate update) { + while (true) { + boolean isOffered = this.blockUpdateQueue.relaxedOffer(update); + if (isOffered) { + break; + } + } + } + + private void updateSystemConfig() throws ExecutionException, InterruptedException { BigInteger preLastUpdateBlock = this.systemConfigUpdate.component1(); if (preLastUpdateBlock.compareTo(this.currentBlock) < 0) { BigInteger toBlock = preLastUpdateBlock.add(BigInteger.valueOf(1000L)); - res = - res.thenCompose( - (Function>) - unused -> - InnerWatcher.this.getLog( - preLastUpdateBlock.add(BigInteger.ONE), - toBlock, - InnerWatcher.this.config.chainConfig().systemConfigContract(), - CONFIG_UPDATE_TOPIC)) - .thenAccept( - updates -> { - LogResult update = updates.getLogs().iterator().next(); - BigInteger updateBlock = ((LogObject) update).getBlockNumber(); - SystemConfigUpdate configUpdate = - SystemConfigUpdate.tryFrom((LogObject) update); - if (updateBlock == null) { - InnerWatcher.this.systemConfigUpdate = new Tuple2<>(toBlock, null); - } else { - SystemConfig updateSystemConfig = parseSystemConfigUpdate(configUpdate); - InnerWatcher.this.systemConfigUpdate = - new Tuple2<>(updateBlock, updateSystemConfig); - } - }); + EthLog updates = + this.getLog( + preLastUpdateBlock.add(BigInteger.ONE), + toBlock, + InnerWatcher.this.config.chainConfig().systemConfigContract(), + CONFIG_UPDATE_TOPIC); + + LogResult update = updates.getLogs().iterator().next(); + BigInteger updateBlock = ((LogObject) update).getBlockNumber(); + SystemConfigUpdate configUpdate = SystemConfigUpdate.tryFrom((LogObject) update); + if (updateBlock == null) { + InnerWatcher.this.systemConfigUpdate = new Tuple2<>(toBlock, null); + } else { + SystemConfig updateSystemConfig = parseSystemConfigUpdate(configUpdate); + this.systemConfigUpdate = new Tuple2<>(updateBlock, updateSystemConfig); + } + } + BigInteger lastUpdateBlock = this.systemConfigUpdate.component1(); + SystemConfig nextConfig = this.systemConfigUpdate.component2(); + if (lastUpdateBlock.compareTo(currentBlock) == 0 && nextConfig != null) { + LOGGER.info("system config updated"); + LOGGER.debug("{}", nextConfig); + this.systemConfig = nextConfig; } - return res.thenAccept( - unused -> { - BigInteger lastUpdateBlock = InnerWatcher.this.systemConfigUpdate.component1(); - SystemConfig nextConfig = InnerWatcher.this.systemConfigUpdate.component2(); - if (lastUpdateBlock.compareTo(currentBlock) == 0 && nextConfig != null) { - LOGGER.info("system config updated"); - LOGGER.debug("{}", nextConfig); - InnerWatcher.this.systemConfig = nextConfig; - } - }); } private Config.SystemConfig parseSystemConfigUpdate(SystemConfigUpdate configUpdate) { @@ -364,50 +318,58 @@ private boolean checkReorg() { return false; } - private CompletableFuture getFinalized() { - return CompletableFuture.supplyAsync( - () -> { - try { - EthBlock block = - this.provider - .ethGetBlockByNumber(DefaultBlockParameterName.FINALIZED, false) - .send(); - return block.getBlock().getNumber(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - this.executor); + private BigInteger getFinalized() throws ExecutionException, InterruptedException { + return this.executor + .submit( + () -> { + try { + EthBlock block = + this.provider + .ethGetBlockByNumber(DefaultBlockParameterName.FINALIZED, false) + .send(); + return block.getBlock().getNumber(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .get(); } - private CompletableFuture getHead() { - return CompletableFuture.supplyAsync( - () -> { - try { - EthBlock block = - this.provider.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false).send(); - return block.getBlock().getNumber(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - this.executor); + private BigInteger getHead() throws ExecutionException, InterruptedException { + return this.executor + .submit( + () -> { + try { + EthBlock block = + this.provider + .ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false) + .send(); + return block.getBlock().getNumber(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .get(); } - private CompletableFuture getBlock(final Web3j client, final BigInteger blockNum) { - return CompletableFuture.supplyAsync( - () -> { - try { - return client.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true).send(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - this.executor); + private EthBlock getBlock(final Web3j client, final BigInteger blockNum) + throws ExecutionException, InterruptedException { + return this.executor + .submit( + () -> { + try { + return client + .ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true) + .send(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .get(); } - private CompletableFuture getLog( - BigInteger fromBlock, BigInteger toBlock, String contract, String topic) { + private EthLog getLog(BigInteger fromBlock, BigInteger toBlock, String contract, String topic) + throws ExecutionException, InterruptedException { final EthFilter ethFilter = new EthFilter( DefaultBlockParameter.valueOf(fromBlock), @@ -415,56 +377,55 @@ private CompletableFuture getLog( contract) .addSingleTopic(topic); - return CompletableFuture.supplyAsync( - () -> { - try { - return this.provider.ethGetLogs(ethFilter).send(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - this.executor); + return this.executor + .submit( + () -> { + try { + return this.provider.ethGetLogs(ethFilter).send(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .get(); } - private CompletableFuture> getDeposits(BigInteger blockNum) { + private List getDeposits(BigInteger blockNum) + throws ExecutionException, InterruptedException { final List removed = this.deposits.remove(blockNum); if (removed != null) { - return CompletableFuture.completedFuture(removed); + return removed; } final BigInteger endBlock = this.headBlock.min(blockNum.add(BigInteger.valueOf(1000L))); - final CompletableFuture logFuture = + EthLog result = this.getLog( blockNum, endBlock, this.config.chainConfig().depositContract(), TRANSACTION_DEPOSITED_TOPIC); - return logFuture.thenApply( - result -> { - List depositLogs = - result.getLogs().stream() - .map( - log -> { - if (log instanceof LogObject) { - return UserDeposited.fromLog((LogObject) log); - } else { - throw new IllegalStateException( - "Unexpected result type: " + log.get() + " required LogObject"); - } - }) - .toList(); - - for (BigInteger i = blockNum; i.compareTo(endBlock) < 0; i = i.add(BigInteger.ONE)) { - final BigInteger num = i; - final List collect = - depositLogs.stream() - .filter(log -> log.l1BlockNum().compareTo(num) == 0) - .collect(Collectors.toList()); - InnerWatcher.this.deposits.put(num, collect); - } - return InnerWatcher.this.deposits.remove(blockNum); - }); + List depositLogs = + result.getLogs().stream() + .map( + log -> { + if (log instanceof LogObject) { + return UserDeposited.fromLog((LogObject) log); + } else { + throw new IllegalStateException( + "Unexpected result type: " + log.get() + " required LogObject"); + } + }) + .toList(); + + for (BigInteger i = blockNum; i.compareTo(endBlock) < 0; i = i.add(BigInteger.ONE)) { + final BigInteger num = i; + final List collect = + depositLogs.stream() + .filter(log -> log.l1BlockNum().compareTo(num) == 0) + .collect(Collectors.toList()); + InnerWatcher.this.deposits.put(num, collect); + } + return InnerWatcher.this.deposits.remove(blockNum); } private Web3j createClient(String url) { @@ -477,9 +438,8 @@ private Web3j createClient(String url) { protected void run() { while (isRunning()) { LOGGER.debug("fetching L1 data for block {}", currentBlock); - CompletableFuture res = tryIngestBlock(); try { - res.get(); + this.tryIngestBlock(); } catch (ExecutionException e) { LOGGER.error("error while fetching L1 data for block {}", currentBlock); } catch (InterruptedException e) { @@ -493,4 +453,9 @@ protected void run() { protected Executor executor() { return this.executor; } + + @Override + protected void triggerShutdown() { + var unused = this.executor.shutdownNow(); + } } diff --git a/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java b/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java index f94ac6ec..0698f876 100644 --- a/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java +++ b/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java @@ -16,17 +16,16 @@ package io.optimism.l1; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.optimism.config.Config; -import java.io.IOException; import java.math.BigInteger; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscBlockingConsumerArrayQueue; +import org.jctools.queues.MpscGrowableArrayQueue; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -89,7 +88,7 @@ void testCreateInnerWatcher() { if (!isConfiguredApiKeyEnv) { return; } - var queue = new MpscBlockingConsumerArrayQueue(1000); + var queue = new MpscGrowableArrayQueue(1024 * 4, 1024 * 64); var unused = this.createWatcher(null, queue, executor); unused = this.createWatcher( @@ -97,14 +96,14 @@ void testCreateInnerWatcher() { } @Test - void testTryIngestBlock() throws IOException, ExecutionException, InterruptedException { + void testTryIngestBlock() throws ExecutionException, InterruptedException { if (!isConfiguredApiKeyEnv) { return; } ExecutorService executor = Executors.newSingleThreadExecutor(); - var queue = new MpscBlockingConsumerArrayQueue(1000); + var queue = new MpscGrowableArrayQueue(1024 * 4, 1024 * 64); var watcher = this.createWatcher(null, queue, executor); - watcher.tryIngestBlock().get(); - assertTrue(queue.size() != 0); + watcher.tryIngestBlock(); + assertEquals(2, queue.size()); } }