feat(storage): Make the storage caches task cancellation aware (#1430)
## What ❔

Makes the support task for the Postgres storage caches aware of the cancellation token (stop signal).
Additionally, replaces the closure-based updater with a named type, `PostgresStorageCachesTask`.
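
For context, the cancellation-aware shape used here is the usual Tokio pattern: the task `select!`s between a `watch`-based stop signal and its command channel. Below is a minimal, self-contained sketch of that pattern, not the actual `PostgresStorageCachesTask` code; the `u32` payload, the update body, and the crate setup (`tokio` with the `macros`, `rt-multi-thread`, and `sync` features, plus `anyhow`) are illustrative assumptions.

```rust
use tokio::sync::{mpsc, watch};

/// Minimal sketch of a cancellation-aware update loop. The command payload
/// (`u32`) and the update body are placeholders, not the actual cache code.
async fn run_updates(
    mut commands: mpsc::UnboundedReceiver<u32>,
    mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
    let mut current = 0_u32;
    loop {
        tokio::select! {
            // Explicit stop signal: the idiomatic shutdown path.
            _ = stop_receiver.changed() => break,
            maybe_cmd = commands.recv() => {
                match maybe_cmd {
                    Some(to) if to > current => {
                        // ... run the actual cache update here ...
                        current = to;
                    }
                    // Stale command produced concurrently: skip it.
                    Some(_) => continue,
                    // All senders dropped: nothing left to process.
                    None => break,
                }
            }
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (command_sender, command_receiver) = mpsc::unbounded_channel();
    let (stop_sender, stop_receiver) = watch::channel(false);
    let task = tokio::spawn(run_updates(command_receiver, stop_receiver));

    command_sender.send(1)?;
    stop_sender.send(true)?; // request shutdown explicitly
    task.await??;
    Ok(())
}
```

In this sketch, dropping the command sender still ends the loop (the `None` arm), but the explicit stop signal is the primary shutdown path, which is what the change is about.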

## Why ❔

Currently, the task is canceled implicitly (by dropping the command sender), which is not idiomatic with respect to the new node framework.
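
For comparison, the implicit mechanism being replaced relies on a blocking receive loop that terminates only once every command sender has been dropped. A simplified sketch of that shape (placeholder payload and update body, run on a dedicated blocking thread; not the removed code verbatim):

```rust
use tokio::sync::mpsc;

fn run_updates_blocking(mut commands: mpsc::UnboundedReceiver<u32>) {
    // The loop ends only when every sender paired with `commands` is dropped.
    while let Some(to_miniblock) = commands.blocking_recv() {
        // ... perform the cache update for `to_miniblock` ...
        let _ = to_miniblock;
    }
    // No explicit stop signal: reaching this point means all senders are gone.
}

fn main() {
    let (command_sender, command_receiver) = mpsc::unbounded_channel();
    let updater = std::thread::spawn(move || run_updates_blocking(command_receiver));
    command_sender.send(1).unwrap();
    drop(command_sender); // dropping the only sender is what "cancels" the loop
    updater.join().unwrap();
}
```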

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.
popzxc committed Mar 19, 2024
1 parent a3b5a84 commit ab532bb
Showing 7 changed files with 108 additions and 66 deletions.
13 changes: 8 additions & 5 deletions core/bin/external_node/src/main.rs
@@ -331,11 +331,14 @@ async fn init_tasks(
);
let latest_values_cache_size = config.optional.latest_values_cache_size() as u64;
let cache_update_handle = (latest_values_cache_size > 0).then(|| {
task::spawn_blocking(storage_caches.configure_storage_values_cache(
latest_values_cache_size,
connection_pool.clone(),
tokio::runtime::Handle::current(),
))
task::spawn(
storage_caches
.configure_storage_values_cache(
latest_values_cache_size,
connection_pool.clone(),
)
.run(stop_receiver.clone()),
)
});

let tx_sender = tx_sender_builder

2 changes: 1 addition & 1 deletion core/lib/state/src/lib.rs
@@ -29,7 +29,7 @@ mod witness;

pub use self::{
in_memory::{InMemoryStorage, IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID},
postgres::{PostgresStorage, PostgresStorageCaches},
postgres::{PostgresStorage, PostgresStorageCaches, PostgresStorageCachesTask},
rocksdb::{RocksbStorageBuilder, RocksdbStorage},
shadow_storage::ShadowStorage,
storage_view::{StorageView, StorageViewMetrics},

95 changes: 65 additions & 30 deletions core/lib/state/src/postgres/mod.rs
@@ -4,7 +4,13 @@ use std::{
};

use anyhow::Context as _;
use tokio::{runtime::Handle, sync::mpsc};
use tokio::{
runtime::Handle,
sync::{
mpsc::{self, UnboundedReceiver},
watch,
},
};
use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_types::{L1BatchNumber, MiniblockNumber, StorageKey, StorageValue, H256};

@@ -140,11 +146,10 @@ impl ValuesCache {
}
}

fn update(
async fn update(
&self,
from_miniblock: MiniblockNumber,
to_miniblock: MiniblockNumber,
rt_handle: &Handle,
connection: &mut StorageProcessor<'_>,
) -> anyhow::Result<()> {
const MAX_MINIBLOCKS_LAG: u32 = 5;
@@ -177,12 +182,10 @@ impl ValuesCache {
} else {
let update_latency = CACHE_METRICS.values_update[&ValuesUpdateStage::LoadKeys].start();
let miniblocks = (from_miniblock + 1)..=to_miniblock;
let modified_keys = rt_handle
.block_on(
connection
.storage_logs_dal()
.modified_keys_in_miniblocks(miniblocks.clone()),
)
let modified_keys = connection
.storage_logs_dal()
.modified_keys_in_miniblocks(miniblocks.clone())
.await
.with_context(|| {
format!("failed loading modified keys for miniblocks {miniblocks:?}")
})?;
@@ -293,42 +296,28 @@ impl PostgresStorageCaches {
&mut self,
capacity: u64,
connection_pool: ConnectionPool,
rt_handle: Handle,
) -> impl FnOnce() -> anyhow::Result<()> + Send {
) -> PostgresStorageCachesTask {
assert!(
capacity > 0,
"Storage values cache capacity must be positive"
);
tracing::debug!("Initializing VM storage values cache with {capacity}B capacity");

let (command_sender, mut command_receiver) = mpsc::unbounded_channel();
let (command_sender, command_receiver) = mpsc::unbounded_channel();
let values_cache = ValuesCache::new(capacity);
self.values = Some(ValuesCacheAndUpdater {
cache: values_cache.clone(),
command_sender,
});

// We want to run updates on a single thread in order to not block VM execution on update
// We want to run updates in a separate task in order to not block VM execution on update
// and keep contention over the `ValuesCache` lock as low as possible. As a downside,
// `Self::schedule_values_update()` will produce some no-op update commands from concurrently
// executing VM instances. Due to built-in filtering, this seems manageable.
move || {
let mut current_miniblock = values_cache.valid_for();
while let Some(to_miniblock) = command_receiver.blocking_recv() {
if to_miniblock <= current_miniblock {
continue;
}
let mut connection = rt_handle
.block_on(connection_pool.access_storage_tagged("values_cache_updater"))?;
values_cache.update(
current_miniblock,
to_miniblock,
&rt_handle,
&mut connection,
)?;
current_miniblock = to_miniblock;
}
Ok(())
PostgresStorageCachesTask {
connection_pool,
values_cache,
command_receiver,
}
}

@@ -352,6 +341,52 @@ impl PostgresStorageCaches {
}
}

/// An asynchronous task that updates the VM storage values cache.
#[derive(Debug)]
pub struct PostgresStorageCachesTask {
connection_pool: ConnectionPool,
values_cache: ValuesCache,
command_receiver: UnboundedReceiver<MiniblockNumber>,
}

impl PostgresStorageCachesTask {
/// Runs the task.
///
/// ## Errors
///
/// - Propagates Postgres errors.
/// - Propagates errors from the cache update task.
pub async fn run(mut self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let mut current_miniblock = self.values_cache.valid_for();
loop {
tokio::select! {
_ = stop_receiver.changed() => {
break;
}
Some(to_miniblock) = self.command_receiver.recv() => {
if to_miniblock <= current_miniblock {
continue;
}
let mut connection = self
.connection_pool
.access_storage_tagged("values_cache_updater")
.await?;
self.values_cache
.update(current_miniblock, to_miniblock, &mut connection)
.await?;
current_miniblock = to_miniblock;
}
else => {
// The command sender has been dropped, which means that we must receive the stop signal soon.
stop_receiver.changed().await?;
break;
}
}
}
Ok(())
}
}

/// [`ReadStorage`] implementation backed by the Postgres database.
#[derive(Debug)]
pub struct PostgresStorage<'a> {

19 changes: 9 additions & 10 deletions core/lib/state/src/postgres/tests.rs
@@ -386,7 +386,7 @@ impl ValuesCache {

fn test_values_cache(pool: &ConnectionPool, rt_handle: Handle) {
let mut caches = PostgresStorageCaches::new(1_024, 1_024);
let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone(), rt_handle.clone());
let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
// We cannot use an update task since it requires having concurrent DB connections
// that don't work in tests. We'll update values cache manually instead.
let values_cache = caches.values.as_ref().unwrap().cache.clone();
@@ -451,13 +451,13 @@ fn test_values_cache(pool: &ConnectionPool, rt_handle: Handle) {
(non_existing_key, Some(H256::zero())),
]);

values_cache
.update(
storage
.rt_handle
.block_on(values_cache.update(
MiniblockNumber(0),
MiniblockNumber(1),
&storage.rt_handle,
&mut storage.connection,
)
))
.unwrap();
assert_eq!(values_cache.0.read().unwrap().valid_for, MiniblockNumber(1));

@@ -510,7 +510,7 @@ async fn using_values_cache() {
/// on randomly generated `read_value()` queries.
fn mini_fuzz_values_cache_inner(rng: &mut impl Rng, pool: &ConnectionPool, mut rt_handle: Handle) {
let mut caches = PostgresStorageCaches::new(1_024, 1_024);
let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone(), rt_handle.clone());
let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
let values_cache = caches.values.as_ref().unwrap().cache.clone();

let mut connection = rt_handle.block_on(pool.access_storage()).unwrap();
@@ -536,13 +536,12 @@ fn mini_fuzz_values_cache_inner(rng: &mut impl Rng, pool: &ConnectionPool, mut r
let cache_valid_for = values_cache.valid_for();
assert!(cache_valid_for < MiniblockNumber(latest_block_number));

values_cache
.update(
rt_handle
.block_on(values_cache.update(
cache_valid_for,
MiniblockNumber(latest_block_number),
&rt_handle,
&mut connection,
)
))
.unwrap();
cache_updated = true;
}

2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/web3/mod.rs
@@ -267,7 +267,7 @@ impl ApiBuilder {
ApiTransport::Http(_) => "http_api",
ApiTransport::WebSocket(_) => "ws_api",
};
let (_health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);
let (_, health_updater) = ReactiveHealthCheck::new(health_check_name);

Ok(ApiServer {
pool: self.pool,

28 changes: 18 additions & 10 deletions core/lib/zksync_core/src/lib.rs
@@ -435,8 +435,13 @@ pub async fn initialize_components(

if components.contains(&Component::HttpApi) {
storage_caches = Some(
build_storage_caches(configs, &replica_connection_pool, &mut task_futures)
.context("build_storage_caches()")?,
build_storage_caches(
configs,
&replica_connection_pool,
&mut task_futures,
stop_receiver.clone(),
)
.context("build_storage_caches()")?,
);

let started_at = Instant::now();
@@ -478,8 +483,13 @@ pub async fn initialize_components(
if components.contains(&Component::WsApi) {
let storage_caches = match storage_caches {
Some(storage_caches) => storage_caches,
None => build_storage_caches(configs, &replica_connection_pool, &mut task_futures)
.context("build_storage_caches()")?,
None => build_storage_caches(
configs,
&replica_connection_pool,
&mut task_futures,
stop_receiver.clone(),
)
.context("build_storage_caches()")?,
};

let started_at = Instant::now();
@@ -1115,6 +1125,7 @@ fn build_storage_caches(
configs: &TempConfigStore,
replica_connection_pool: &ConnectionPool,
task_futures: &mut Vec<JoinHandle<anyhow::Result<()>>>,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<PostgresStorageCaches> {
let rpc_config = configs
.web3_json_rpc_config
@@ -1127,12 +1138,9 @@
PostgresStorageCaches::new(factory_deps_capacity, initial_writes_capacity);

if values_capacity > 0 {
let values_cache_task = storage_caches.configure_storage_values_cache(
values_capacity,
replica_connection_pool.clone(),
tokio::runtime::Handle::current(),
);
task_futures.push(tokio::task::spawn_blocking(values_cache_task));
let values_cache_task = storage_caches
.configure_storage_values_cache(values_capacity, replica_connection_pool.clone());
task_futures.push(tokio::task::spawn(values_cache_task.run(stop_receiver)));
}
Ok(storage_caches)
}

@@ -77,13 +77,10 @@ impl WiringLayer for TxSenderLayer {
PostgresStorageCaches::new(factory_deps_capacity, initial_writes_capacity);

if values_capacity > 0 {
let values_cache_task = storage_caches.configure_storage_values_cache(
values_capacity,
replica_pool.clone(),
context.runtime_handle().clone(),
);
let values_cache_task = storage_caches
.configure_storage_values_cache(values_capacity, replica_pool.clone());
context.add_task(Box::new(PostgresStorageCachesTask {
task: Box::new(values_cache_task),
task: values_cache_task,
}));
}

@@ -114,7 +111,7 @@ impl WiringLayer for TxSenderLayer {
}

struct PostgresStorageCachesTask {
task: Box<dyn FnOnce() -> anyhow::Result<()> + Send>,
task: zksync_state::PostgresStorageCachesTask,
}

impl fmt::Debug for PostgresStorageCachesTask {
@@ -130,8 +127,8 @@ impl Task for PostgresStorageCachesTask {
"postgres_storage_caches"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
tokio::task::spawn_blocking(self.task).await?
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.task.run(stop_receiver.0).await
}
}
