Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): Fix "values cache update task failed" panics #1561

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/lib/state/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ impl PostgresStorageCaches {
};
if values.cache.valid_for() < to_miniblock {
// Filter out no-op updates right away in order to not store lots of them in RAM.
values
.command_sender
.send(to_miniblock)
.expect("values cache update task failed");
// Since the task updating the values cache (`PostgresStorageCachesTask`) is cancel-aware,
// it can stop before some of `schedule_values_update()` calls; in this case, it's OK
// to ignore the updates.
values.command_sender.send(to_miniblock).ok();
}
}
}
Expand Down
44 changes: 32 additions & 12 deletions core/lib/state/src/postgres/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Tests for `PostgresStorage`.

use std::{collections::HashMap, mem};
use std::{collections::HashMap, mem, time::Duration};

use rand::{
rngs::StdRng,
Expand Down Expand Up @@ -384,11 +384,27 @@ impl ValuesCache {
}
}

async fn wait_for_cache_update(values_cache: &ValuesCache, target_miniblock: MiniblockNumber) {
tokio::time::timeout(Duration::from_secs(5), async {
loop {
let valid_for = values_cache.0.read().unwrap().valid_for;
assert!(valid_for <= target_miniblock, "{valid_for:?}");
if valid_for == target_miniblock {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
.expect("timed out waiting for cache update");
}

fn test_values_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
let mut caches = PostgresStorageCaches::new(1_024, 1_024);
let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
// We cannot use an update task since it requires having concurrent DB connections
// that don't work in tests. We'll update values cache manually instead.
let task = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
let (stop_sender, stop_receiver) = watch::channel(false);
let update_task_handle = tokio::task::spawn(task.run(stop_receiver));

let values_cache = caches.values.as_ref().unwrap().cache.clone();
let old_miniblock_assertions = values_cache.assertions(MiniblockNumber(0));
let new_miniblock_assertions = values_cache.assertions(MiniblockNumber(1));
Expand Down Expand Up @@ -451,15 +467,10 @@ fn test_values_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
(non_existing_key, Some(H256::zero())),
]);

caches.schedule_values_update(MiniblockNumber(1));
storage
.rt_handle
.block_on(values_cache.update(
MiniblockNumber(0),
MiniblockNumber(1),
&mut storage.connection,
))
.unwrap();
assert_eq!(values_cache.0.read().unwrap().valid_for, MiniblockNumber(1));
.block_on(wait_for_cache_update(&values_cache, MiniblockNumber(1)));

assert_eq!(storage.read_value(&existing_key), H256::repeat_byte(1));
assert_eq!(storage.read_value(&non_existing_key), H256::repeat_byte(2));
Expand Down Expand Up @@ -487,14 +498,23 @@ fn test_values_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
MiniblockNumber(0),
true,
)
.with_caches(caches);
.with_caches(caches.clone());

assert_eq!(storage.read_value(&existing_key), initial_value);
assert_eq!(storage.read_value(&non_existing_key), StorageValue::zero());
assert_eq!(storage.read_value(&unmodified_key), unmodified_value);

// None of the cache entries should be modified.
assert_final_cache();

stop_sender.send_replace(true);
storage
.rt_handle
.block_on(update_task_handle)
.expect("update task panicked")
.unwrap();
// Check that `schedule_values_update()` doesn't panic after the update task is finished.
caches.schedule_values_update(MiniblockNumber(2));
}

#[tokio::test]
Expand Down
Loading