Skip to content

Commit

Permalink
fix: made consensus store certificates asynchronously from statekeeper (
Browse files Browse the repository at this point in the history
#1711)

So far we used a simpler interface where statekeeper was provided the
next miniblock only after it persisted the previous one. With the
current statekeeper implementation it severely limited the throughput.
I've updated consensus API to allow for storing certificates
asynchronously to improve the throughput.
  • Loading branch information
pompon0 committed Apr 17, 2024
1 parent aed23e1 commit d1032ab
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 188 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,16 @@ zk_evm_1_3_3 = { package = "zk_evm", git = "https://github.com/matter-labs/era-z
zk_evm_1_4_0 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0" }
zk_evm_1_4_1 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.1" }
zk_evm_1_5_0 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.5.0" }
zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_crypto = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_network = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_consensus_utils = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "c9935c0fa69cde357a3d6f5eca148962dd3313e1" }
zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_crypto = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_network = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_consensus_utils = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "1dcda70c1c25d0e4db6781ba8d2645d7e8966d49" }

# "Local" dependencies
multivm = { path = "core/lib/multivm" }
Expand Down
40 changes: 19 additions & 21 deletions core/lib/zksync_core/src/consensus/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::BlockStore;
use zksync_types::MiniblockNumber;
use zksync_web3_decl::client::BoxedL2Client;

Expand Down Expand Up @@ -41,14 +40,14 @@ impl Fetcher {
conn.try_update_genesis(ctx, &genesis)
.await
.wrap("set_genesis()")?;
let mut cursor = conn
.new_fetcher_cursor(ctx, actions)
let mut payload_queue = conn
.new_payload_queue(ctx, actions)
.await
.wrap("new_fetcher_cursor()")?;
.wrap("new_payload_queue()")?;
drop(conn);

// Fetch blocks before the genesis.
self.fetch_blocks(ctx, &mut cursor, Some(genesis.fork.first_block))
self.fetch_blocks(ctx, &mut payload_queue, Some(genesis.fork.first_block))
.await?;
// Monitor the genesis of the main node.
// If it changes, it means that a hard fork occurred and we need to reset the consensus state.
Expand All @@ -68,13 +67,12 @@ impl Fetcher {
});

// Run consensus component.
let mut block_store = self.store.clone().into_block_store();
block_store
.set_cursor(cursor)
.context("block_store.set_cursor()")?;
let (block_store, runner) = BlockStore::new(ctx, Box::new(block_store))
let (block_store, runner) = self
.store
.clone()
.into_block_store(ctx, Some(payload_queue))
.await
.wrap("BlockStore::new()")?;
.wrap("into_block_store()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
let executor = executor::Executor {
config: p2p.clone(),
Expand All @@ -100,15 +98,15 @@ impl Fetcher {
let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
// Update sync state in the background.
s.spawn_bg(self.fetch_state_loop(ctx));
let mut cursor = self
let mut payload_queue = self
.store
.access(ctx)
.await
.wrap("access()")?
.new_fetcher_cursor(ctx, actions)
.new_payload_queue(ctx, actions)
.await
.wrap("new_fetcher_cursor()")?;
self.fetch_blocks(ctx, &mut cursor, None).await
self.fetch_blocks(ctx, &mut payload_queue, None).await
})
.await;
match res {
Expand Down Expand Up @@ -168,12 +166,12 @@ impl Fetcher {
pub(super) async fn fetch_blocks(
&self,
ctx: &ctx::Ctx,
cursor: &mut storage::Cursor,
queue: &mut storage::PayloadQueue,
end: Option<validator::BlockNumber>,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
let first = cursor.next();
let mut next = cursor.next();
let first = queue.next();
let mut next = first;
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
Expand All @@ -186,17 +184,17 @@ impl Fetcher {
}
Ok(())
});
while end.map_or(true, |end| cursor.next() < end) {
while end.map_or(true, |end| queue.next() < end) {
let block = recv.recv(ctx).await?.join(ctx).await?;
cursor.advance(block).await?;
queue.send(block).await?;
}
Ok(())
})
.await?;
// If fetched anything, wait for the last block to be stored persistently.
if first < cursor.next() {
if first < queue.next() {
self.store
.wait_for_payload(ctx, cursor.next().prev().unwrap())
.wait_for_payload(ctx, queue.next().prev().unwrap())
.await?;
}
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions core/lib/zksync_core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::BlockStore;

pub use self::{fetcher::*, storage::Store};

Expand All @@ -31,14 +30,15 @@ impl MainNodeConfig {
/// Broadcasts the blocks with certificates to gossip network peers.
pub async fn run(self, ctx: &ctx::Ctx, store: Store) -> anyhow::Result<()> {
scope::run!(&ctx, |ctx, s| async {
let mut block_store = store.clone().into_block_store();
block_store
store
.try_init_genesis(ctx, &self.validator_key.public())
.await
.wrap("block_store.try_init_genesis()")?;
let (block_store, runner) = BlockStore::new(ctx, Box::new(block_store))
let (block_store, runner) = store
.clone()
.into_block_store(ctx, None)
.await
.wrap("BlockStore::new()")?;
.wrap("into_block_store()")?;
s.spawn_bg(runner.run(ctx));
let executor = executor::Executor {
config: self.executor,
Expand Down
Loading

0 comments on commit d1032ab

Please sign in to comment.