Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement "opinionated cancellation" #265

Merged
merged 11 commits into from
May 27, 2021
18 changes: 11 additions & 7 deletions components/salsa-macros/src/query_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,14 +447,18 @@ pub(crate) fn query_group(args: TokenStream, input: TokenStream) -> TokenStream
/// deadlock.
///
/// Before blocking, the thread that is attempting to `set` will
/// also set a cancellation flag. In the threads operating on
/// snapshots, you can use the [`is_current_revision_canceled`]
/// method to check for this flag and bring those operations to a
/// close, thus allowing the `set` to succeed. Ignoring this flag
/// may lead to "starvation", meaning that the thread attempting
/// to `set` has to wait a long, long time. =)
/// also set a cancellation flag. This will cause any query
/// invocations in other threads to unwind with a `Canceled`
/// sentinel value and eventually let the `set` succeed once all
/// threads have unwound past the salsa invocation.
///
/// [`is_current_revision_canceled`]: struct.Runtime.html#method.is_current_revision_canceled
/// If your query implementations are performing expensive
/// operations without invoking another query, you can also use
/// the `Runtime::unwind_if_cancelled` method to check for an
/// ongoing cancelation and bring those operations to a close,
/// thus allowing the `set` to succeed. Otherwise, long-running
/// computations may lead to "starvation", meaning that the
/// thread attempting to `set` has to wait a long, long time. =)
#trait_vis fn in_db_mut(self, db: &mut #dyn_db) -> salsa::QueryTableMut<'_, Self>
{
salsa::plumbing::get_query_table_mut::<#qt>(db)
Expand Down
2 changes: 2 additions & 0 deletions src/derived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ where
db: &<Q as QueryDb<'_>>::DynDb,
key: &Q::Key,
) -> Result<Q::Value, CycleError<DatabaseKeyIndex>> {
db.salsa_runtime().unwind_if_canceled();

let slot = self.slot(key);
let StampedValue {
value,
Expand Down
2 changes: 2 additions & 0 deletions src/derived/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ where
let runtime = db.salsa_runtime();
let revision_now = runtime.current_revision();

runtime.unwind_if_canceled();

debug!(
"maybe_changed_since({:?}) called with revision={:?}, revision_now={:?}",
self, revision, revision_now,
Expand Down
2 changes: 2 additions & 0 deletions src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ where
db: &<Q as QueryDb<'_>>::DynDb,
key: &Q::Key,
) -> Result<Q::Value, CycleError<DatabaseKeyIndex>> {
db.salsa_runtime().unwind_if_canceled();

let slot = self
.slot(key)
.unwrap_or_else(|| panic!("no value set for {:?}({:?})", Q::default(), key));
Expand Down
2 changes: 2 additions & 0 deletions src/interned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ where
db: &<Q as QueryDb<'_>>::DynDb,
key: &Q::Key,
) -> Result<Q::Value, CycleError<DatabaseKeyIndex>> {
db.salsa_runtime().unwind_if_canceled();

let slot = self.intern_index(db, key);
let changed_at = slot.interned_at;
let index = slot.index;
Expand Down
45 changes: 40 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ use crate::plumbing::LruQueryStorageOps;
use crate::plumbing::QueryStorageMassOps;
use crate::plumbing::QueryStorageOps;
pub use crate::revision::Revision;
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::sync::Arc;
use std::{
fmt::{self, Debug},
panic::{self, UnwindSafe},
};

pub use crate::durability::Durability;
pub use crate::intern_id::InternId;
Expand Down Expand Up @@ -74,7 +77,7 @@ pub trait Database: plumbing::DatabaseOps {
/// This function is invoked when a dependent query is being computed by the
/// other thread, and that thread panics.
fn on_propagated_panic(&self) -> ! {
panic!("concurrent salsa query panicked")
Canceled::throw()
}
jonas-schievink marked this conversation as resolved.
Show resolved Hide resolved

/// Gives access to the underlying salsa runtime.
Expand Down Expand Up @@ -277,11 +280,9 @@ pub trait ParallelDatabase: Database + Send {
/// series of queries in parallel and arranging the results. Using
/// this method for that purpose ensures that those queries will
/// see a consistent view of the database (it is also advisable
/// for those queries to use the [`is_current_revision_canceled`]
/// for those queries to use the [`Runtime::unwind_if_cancelled`]
/// method to check for cancellation).
///
/// [`is_current_revision_canceled`]: struct.Runtime.html#method.is_current_revision_canceled
///
/// # Panics
///
/// It is not permitted to create a snapshot from inside of a
Expand Down Expand Up @@ -644,6 +645,40 @@ where
}
}

/// A panic payload indicating that a salsa revision was canceled.
#[derive(Debug)]
pub struct Canceled {
jonas-schievink marked this conversation as resolved.
Show resolved Hide resolved
_private: (),
}
jonas-schievink marked this conversation as resolved.
Show resolved Hide resolved

impl Canceled {
fn throw() -> ! {
jonas-schievink marked this conversation as resolved.
Show resolved Hide resolved
std::panic::resume_unwind(Box::new(Self { _private: () }));
}
}

impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("canceled")
}
}

impl std::error::Error for Canceled {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have this, but this was not obvious to me, so let me write down justification.

I would expect Cancellation to not be treated as an error. Cancellation is a serendipitous success. It would be wrong to propagate Cancelled as Error. However, in salsa itself Cancelled is not used throughout the API, as we use unwinding, its used only at the boundady. For boundaries, I think treating it as an Error is probably fine


/// Runs `f`, and catches any salsa cancelation.
pub fn catch_cancellation<F, T>(f: F) -> Result<T, Canceled>
jonas-schievink marked this conversation as resolved.
Show resolved Hide resolved
where
F: FnOnce() -> T + UnwindSafe,
{
match panic::catch_unwind(f) {
Ok(t) => Ok(t),
Err(payload) => match payload.downcast() {
Ok(canceled) => Err(*canceled),
Err(payload) => panic::resume_unwind(payload),
},
}
}

// Re-export the procedural macros.
#[allow(unused_imports)]
#[macro_use]
Expand Down
106 changes: 16 additions & 90 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::durability::Durability;
use crate::plumbing::CycleDetected;
use crate::revision::{AtomicRevision, Revision};
use crate::{durability::Durability, Canceled};
use crate::{CycleError, Database, DatabaseKeyIndex, Event, EventKind};
use log::debug;
use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
Expand Down Expand Up @@ -156,91 +156,33 @@ impl Runtime {
self.shared_state.pending_revision.load()
}

/// Check if the current revision is canceled. If this method ever
/// returns true, the currently executing query is also marked as
/// having an *untracked read* -- this means that, in the next
/// revision, we will always recompute its value "as if" some
/// input had changed. This means that, if your revision is
/// canceled (which indicates that current query results will be
/// ignored) your query is free to shortcircuit and return
/// whatever it likes.
/// Starts unwinding the stack if the current revision is canceled.
///
/// This method is useful for implementing cancellation of queries.
/// You can do it in one of two ways, via `Result`s or via unwinding.
/// This method can be called by query implementations that perform
/// potentially expensive computations, in order to speed up propagation of
/// cancelation.
///
/// The `Result` approach looks like this:
///
/// * Some queries invoke `is_current_revision_canceled` and
/// return a special value, like `Err(Canceled)`, if it returns
/// `true`.
/// * Other queries propagate the special value using `?` operator.
/// * API around top-level queries checks if the result is `Ok` or
/// `Err(Canceled)`.
///
/// The `panic` approach works in a similar way:
///
/// * Some queries invoke `is_current_revision_canceled` and
/// panic with a special value, like `Canceled`, if it returns
/// true.
/// * The implementation of `Database` trait overrides
/// `on_propagated_panic` to throw this special value as well.
/// This way, panic gets propagated naturally through dependant
/// queries, even across the threads.
/// * API around top-level queries converts a `panic` into `Result` by
/// catching the panic (using either `std::panic::catch_unwind` or
/// threads) and downcasting the payload to `Canceled` (re-raising
/// panic if downcast fails).
///
/// Note that salsa is explicitly designed to be panic-safe, so cancellation
/// via unwinding is 100% valid approach to cancellation.
/// Cancelation will automatically be triggered by salsa on any query
/// invocation.
#[inline]
pub fn is_current_revision_canceled(&self) -> bool {
pub fn unwind_if_canceled(&self) {
jonas-schievink marked this conversation as resolved.
Show resolved Hide resolved
let current_revision = self.current_revision();
let pending_revision = self.pending_revision();
debug!(
"is_current_revision_canceled: current_revision={:?}, pending_revision={:?}",
"unwind_if_cancelled: current_revision={:?}, pending_revision={:?}",
current_revision, pending_revision
);
if pending_revision > current_revision {
self.report_untracked_read();
true
} else {
// Subtle: If the current revision is not canceled, we
// still report an **anonymous** read, which will bump up
// the revision number to be at least the last
// non-canceled revision. This is needed to ensure
// deterministic reads and avoid salsa-rs/salsa#66. The
// specific scenario we are trying to avoid is tested by
// `no_back_dating_in_cancellation`; it works like
// this. Imagine we have 3 queries, where Query3 invokes
// Query2 which invokes Query1. Then:
//
// - In Revision R1:
// - Query1: Observes cancelation and returns sentinel S.
// - Recorded inputs: Untracked, because we observed cancelation.
// - Query2: Reads Query1 and propagates sentinel S.
// - Recorded inputs: Query1, changed-at=R1
// - Query3: Reads Query2 and propagates sentinel S. (Inputs = Query2, ChangedAt R1)
// - Recorded inputs: Query2, changed-at=R1
// - In Revision R2:
// - Query1: Observes no cancelation. All of its inputs last changed in R0,
// so it returns a valid value with "changed at" of R0.
// - Recorded inputs: ..., changed-at=R0
// - Query2: Recomputes its value and returns correct result.
// - Recorded inputs: Query1, changed-at=R0 <-- key problem!
// - Query3: sees that Query2's result last changed in R0, so it thinks it
// can re-use its value from R1 (which is the sentinel value).
//
// The anonymous read here prevents that scenario: Query1
// winds up with a changed-at setting of R2, which is the
// "pending revision", and hence Query2 and Query3
// are recomputed.
assert_eq!(pending_revision, current_revision);
self.report_anon_read(pending_revision);
false
self.unwind_canceled();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This simplification is quite nice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worried that there could be a subtle bug if we allowed users to override cancelation here.

}
}

#[cold]
fn unwind_canceled(&self) {
self.report_untracked_read();
Canceled::throw();
}

/// Acquires the **global query write lock** (ensuring that no queries are
/// executing) and then increments the current revision counter; invokes
/// `op` with the global query write lock still held.
Expand Down Expand Up @@ -381,18 +323,6 @@ impl Runtime {
self.local_state.report_synthetic_read(durability);
}

/// An "anonymous" read is a read that doesn't come from executing
/// a query, but from some other internal operation. It just
/// modifies the "changed at" to be at least the given revision.
/// (It also does not disqualify a query from being considered
/// constant, since it is used for queries that don't give back
/// actual *data*.)
///
/// This is used when queries check if they have been canceled.
fn report_anon_read(&self, revision: Revision) {
self.local_state.report_anon_read(revision)
}

/// Obviously, this should be user configurable at some point.
pub(crate) fn report_unexpected_cycle(
&self,
Expand Down Expand Up @@ -639,10 +569,6 @@ impl ActiveQuery {
fn add_synthetic_read(&mut self, durability: Durability) {
self.durability = self.durability.min(durability);
}

fn add_anon_read(&mut self, changed_at: Revision) {
self.changed_at = self.changed_at.max(changed_at);
}
}

/// A unique identifier for a particular runtime. Each time you create
Expand Down
6 changes: 0 additions & 6 deletions src/runtime/local_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ impl LocalState {
top_query.add_synthetic_read(durability);
}
}

pub(super) fn report_anon_read(&self, revision: Revision) {
if let Some(top_query) = self.query_stack.borrow_mut().last_mut() {
top_query.add_anon_read(revision);
}
}
}

impl std::panic::RefUnwindSafe for LocalState {}
Expand Down
Loading