Skip to content

Commit

Permalink
refactor: removed polling from external io (#1690)
Browse files Browse the repository at this point in the history
ExternalIO contained unnecessary sleeps, which slowed down block
processing.
  • Loading branch information
pompon0 committed Apr 16, 2024
1 parent ce10c2f commit 02f7cd5
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 77 deletions.
130 changes: 57 additions & 73 deletions core/lib/zksync_core/src/sync_layer/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ use super::{
};
use crate::state_keeper::{
io::{
common::{load_pending_batch, poll_iters, IoCursor},
common::{load_pending_batch, IoCursor},
L1BatchParams, MiniblockParams, PendingBatchData, StateKeeperIO,
},
metrics::KEEPER_METRICS,
seal_criteria::IoSealCriteria,
updates::UpdatesManager,
};

/// The interval between the action queue polling attempts for the new actions.
const POLL_INTERVAL: Duration = Duration::from_millis(100);

/// ExternalIO is the IO abstraction for the state keeper that is used in the external node.
/// It receives a sequence of actions from the fetcher via the action queue and propagates it
/// into the state keeper.
Expand Down Expand Up @@ -220,34 +217,31 @@ impl StateKeeperIO for ExternalIO {
max_wait: Duration,
) -> anyhow::Result<Option<L1BatchParams>> {
tracing::debug!("Waiting for the new batch params");
for _ in 0..poll_iters(POLL_INTERVAL, max_wait) {
match self.actions.pop_action() {
Some(SyncAction::OpenBatch {
params,
number,
first_miniblock_number,
}) => {
anyhow::ensure!(
number == cursor.l1_batch,
"Batch number mismatch: expected {}, got {number}",
cursor.l1_batch
);
anyhow::ensure!(
first_miniblock_number == cursor.next_miniblock,
"Miniblock number mismatch: expected {}, got {first_miniblock_number}",
cursor.next_miniblock
);
return Ok(Some(params));
}
Some(other) => {
anyhow::bail!("unexpected action in the action queue: {other:?}");
}
None => {
tokio::time::sleep(POLL_INTERVAL).await;
}
let Some(action) = self.actions.recv_action(max_wait).await else {
return Ok(None);
};
match action {
SyncAction::OpenBatch {
params,
number,
first_miniblock_number,
} => {
anyhow::ensure!(
number == cursor.l1_batch,
"Batch number mismatch: expected {}, got {number}",
cursor.l1_batch
);
anyhow::ensure!(
first_miniblock_number == cursor.next_miniblock,
"Miniblock number mismatch: expected {}, got {first_miniblock_number}",
cursor.next_miniblock
);
return Ok(Some(params));
}
other => {
anyhow::bail!("unexpected action in the action queue: {other:?}");
}
}
Ok(None)
}

async fn wait_for_new_miniblock_params(
Expand All @@ -256,62 +250,52 @@ impl StateKeeperIO for ExternalIO {
max_wait: Duration,
) -> anyhow::Result<Option<MiniblockParams>> {
// Wait for the next miniblock to appear in the queue.
let actions = &mut self.actions;
for _ in 0..poll_iters(POLL_INTERVAL, max_wait) {
match actions.pop_action() {
Some(SyncAction::Miniblock { params, number }) => {
anyhow::ensure!(
number == cursor.next_miniblock,
"Miniblock number mismatch: expected {}, got {number}",
cursor.next_miniblock
);
return Ok(Some(params));
}
Some(other) => {
anyhow::bail!(
"Unexpected action in the queue while waiting for the next miniblock: {other:?}"
);
}
None => {
tokio::time::sleep(POLL_INTERVAL).await;
}
let Some(action) = self.actions.recv_action(max_wait).await else {
return Ok(None);
};
match action {
SyncAction::Miniblock { params, number } => {
anyhow::ensure!(
number == cursor.next_miniblock,
"Miniblock number mismatch: expected {}, got {number}",
cursor.next_miniblock
);
return Ok(Some(params));
}
other => {
anyhow::bail!(
"Unexpected action in the queue while waiting for the next miniblock: {other:?}"
);
}
}
Ok(None)
}

async fn wait_for_next_tx(
&mut self,
max_wait: Duration,
) -> anyhow::Result<Option<Transaction>> {
let actions = &mut self.actions;
tracing::debug!(
"Waiting for the new tx, next action is {:?}",
actions.peek_action()
self.actions.peek_action()
);
for _ in 0..poll_iters(POLL_INTERVAL, max_wait) {
match actions.peek_action() {
Some(SyncAction::Tx(_)) => {
let SyncAction::Tx(tx) = actions.pop_action().unwrap() else {
unreachable!()
};
return Ok(Some(Transaction::from(*tx)));
}
Some(SyncAction::SealMiniblock | SyncAction::SealBatch) => {
// No more transactions in the current miniblock; the state keeper should seal it.
return Ok(None);
}
Some(other) => {
anyhow::bail!(
"Unexpected action in the queue while waiting for the next transaction: {other:?}"
);
}
_ => {
tokio::time::sleep(POLL_INTERVAL).await;
}
let Some(action) = self.actions.peek_action_async(max_wait).await else {
return Ok(None);
};
match action {
SyncAction::Tx(tx) => {
self.actions.pop_action().unwrap();
return Ok(Some(Transaction::from(*tx)));
}
SyncAction::SealMiniblock | SyncAction::SealBatch => {
// No more transactions in the current miniblock; the state keeper should seal it.
return Ok(None);
}
other => {
anyhow::bail!(
"Unexpected action in the queue while waiting for the next transaction: {other:?}"
);
}
}
Ok(None)
}

async fn rollback(&mut self, tx: Transaction) -> anyhow::Result<()> {
Expand Down
35 changes: 31 additions & 4 deletions core/lib/zksync_core/src/sync_layer/sync_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,24 @@ impl ActionQueue {
QUEUE_METRICS.action_queue_size.dec_by(1);
return Some(peeked);
}
let action = self.receiver.try_recv().ok();
if action.is_some() {
QUEUE_METRICS.action_queue_size.dec_by(1);
let action = self.receiver.try_recv().ok()?;
QUEUE_METRICS.action_queue_size.dec_by(1);
Some(action)
}

/// Removes the first action from the queue.
pub(super) async fn recv_action(
&mut self,
max_wait: tokio::time::Duration,
) -> Option<SyncAction> {
if let Some(action) = self.pop_action() {
return Some(action);
}
action
let action = tokio::time::timeout(max_wait, self.receiver.recv())
.await
.ok()??;
QUEUE_METRICS.action_queue_size.dec_by(1);
Some(action)
}

/// Returns the first action from the queue without removing it.
Expand All @@ -105,6 +118,20 @@ impl ActionQueue {
self.peeked = self.receiver.try_recv().ok();
self.peeked.clone()
}

/// Returns the first action from the queue without removing it.
pub(super) async fn peek_action_async(
&mut self,
max_wait: tokio::time::Duration,
) -> Option<SyncAction> {
if let Some(action) = &self.peeked {
return Some(action.clone());
}
self.peeked = tokio::time::timeout(max_wait, self.receiver.recv())
.await
.ok()?;
self.peeked.clone()
}
}

/// An instruction for the ExternalIO to request a certain action from the state keeper.
Expand Down

0 comments on commit 02f7cd5

Please sign in to comment.