Skip to content

Commit

Permalink
Bug 1639018 - Change TaskRunnable::dispatch to take owned runnables…
Browse files Browse the repository at this point in the history
…. r=froydnj

This matches how the `Dispatch(already_AddRefed<nsIRunnable>)`
overloads work in C++: `Dispatch` takes ownership of the runnable, and
leaks it if dispatch fails—because the thread manager is shutting down,
for instance. This avoids a race where a runnable can be released on
either the owning or target thread.

Rust doesn't allow arbitrary `Self` types yet (see
rust-lang/rust#44874), so we need to change `dispatch` and
`dispatch_with_options` to be associated methods.

Differential Revision: https://phabricator.services.mozilla.com/D75858
  • Loading branch information
linabutler committed May 20, 2020
1 parent 28cc647 commit faf2fd1
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 41 deletions.
12 changes: 6 additions & 6 deletions security/manager/ssl/cert_storage/src/lib.rs
Expand Up @@ -1164,7 +1164,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("HasPriorData", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Expand Down Expand Up @@ -1226,7 +1226,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("SetRevocations", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Expand Down Expand Up @@ -1305,7 +1305,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("SetCRLiteState", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Expand Down Expand Up @@ -1351,7 +1351,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("SetFullCRLiteFilter", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Expand Down Expand Up @@ -1417,7 +1417,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("AddCerts", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Expand Down Expand Up @@ -1445,7 +1445,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("RemoveCertsByHashes", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Expand Down
Expand Up @@ -372,7 +372,11 @@ impl PuntTask {
let runnable = TaskRunnable::new(self.name, Box::new(self))?;
// `may_block` schedules the task on the I/O thread pool, since we
// expect most operations to wait on I/O.
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
target,
DispatchOptions::default().may_block(true),
)?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion services/sync/golden_gate/src/log.rs
Expand Up @@ -118,7 +118,7 @@ impl Log for LogSink {
};
let _ =
TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
.and_then(|r| r.dispatch(logger.owning_thread()));
.and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
}
Err(_) => {}
}
Expand Down
12 changes: 10 additions & 2 deletions services/sync/golden_gate/src/task.rs
Expand Up @@ -196,7 +196,11 @@ where
let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?;
// `may_block` schedules the task on the I/O thread pool, since we
// expect most operations to wait on I/O.
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
target,
DispatchOptions::default().may_block(true),
)?;
Ok(())
}
}
Expand Down Expand Up @@ -339,7 +343,11 @@ where
/// Dispatches the task to the given thread `target`.
pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
let runnable = TaskRunnable::new(Self::name(), Box::new(self))?;
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
target,
DispatchOptions::default().may_block(true),
)?;
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion toolkit/components/bitsdownload/src/bits_interface/mod.rs
Expand Up @@ -130,7 +130,7 @@ impl BitsService {
let runnable = TaskRunnable::new(task_runnable_name, task).map_err(|rv| {
BitsTaskError::from_nsresult(FailedToConstructTaskRunnable, action, Pretask, rv)
})?;
runnable.dispatch(&command_thread).map_err(|rv| {
TaskRunnable::dispatch(runnable, &command_thread).map_err(|rv| {
BitsTaskError::from_nsresult(FailedToDispatchRunnable, action, Pretask, rv)
})
}
Expand Down
Expand Up @@ -77,8 +77,11 @@ impl StorageSyncArea {
let task = PuntTask::new(Arc::downgrade(&*self.store()?), punt, callback)?;
let runnable = TaskRunnable::new(name, Box::new(task))?;
// `may_block` schedules the runnable on a dedicated I/O pool.
runnable
.dispatch_with_options(self.queue.coerce(), DispatchOptions::new().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
self.queue.coerce(),
DispatchOptions::new().may_block(true),
)?;
Ok(())
}

Expand Down Expand Up @@ -240,15 +243,8 @@ impl StorageSyncArea {
Some(store) => {
// Interrupt any currently-running statements.
store.interrupt();
// If dispatching the runnable fails, we'll drop the store and
// close its database connection on the main thread. This is a
// last resort, and can also happen if the last `RefPtr` to this
// storage area is released without calling `teardown`. In that
// case, the destructor for `self.store` will run, which
// automatically closes its database connection. mozStorage's
// `Connection::Release` also falls back to closing the
// connection on the main thread if it can't dispatch to the
// background thread.
// If dispatching the runnable fails, we'll leak the store
// without closing its database connection.
teardown(&self.queue, store, callback)?;
}
None => return Err(Error::AlreadyTornDown),
Expand All @@ -264,7 +260,11 @@ fn teardown(
) -> Result<()> {
let task = TeardownTask::new(store, callback)?;
let runnable = TaskRunnable::new(TeardownTask::name(), Box::new(task))?;
runnable.dispatch_with_options(queue.coerce(), DispatchOptions::new().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
queue.coerce(),
DispatchOptions::new().may_block(true),
)?;
Ok(())
}

Expand Down
16 changes: 8 additions & 8 deletions toolkit/components/kvstore/src/lib.rs
Expand Up @@ -134,7 +134,7 @@ impl KeyValueService {
nsCString::from(name),
));

TaskRunnable::new("KVService::GetOrCreate", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVService::GetOrCreate", task)?, thread)
}
}

Expand Down Expand Up @@ -182,7 +182,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::Put", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, thread)
}

xpcom_method!(
Expand Down Expand Up @@ -220,7 +220,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::WriteMany", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::WriteMany", task)?, thread)
}

xpcom_method!(
Expand All @@ -247,7 +247,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::Get", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, thread)
}

xpcom_method!(
Expand All @@ -264,7 +264,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::Has", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, thread)
}

xpcom_method!(
Expand All @@ -281,7 +281,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::Delete", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, thread)
}

xpcom_method!(
Expand All @@ -297,7 +297,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::Clear", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, thread)
}

xpcom_method!(
Expand All @@ -324,7 +324,7 @@ impl KeyValueDatabase {

let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;

TaskRunnable::new("KVDatabase::Enumerate", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Enumerate", task)?, thread)
}
}

Expand Down
4 changes: 2 additions & 2 deletions toolkit/components/places/bookmark_sync/src/driver.rs
Expand Up @@ -100,7 +100,7 @@ impl dogear::Driver for Driver {
"bookmark_sync::Driver::record_telemetry_event",
Box::new(task),
)
.and_then(|r| r.dispatch(progress.owning_thread()));
.and_then(|r| TaskRunnable::dispatch(r, progress.owning_thread()));
}
}
}
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Log for Logger {
message,
};
let _ = TaskRunnable::new("bookmark_sync::Logger::log", Box::new(task))
.and_then(|r| r.dispatch(logger.owning_thread()));
.and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
}
Err(_) => {}
}
Expand Down
2 changes: 1 addition & 1 deletion toolkit/components/places/bookmark_sync/src/merger.rs
Expand Up @@ -108,7 +108,7 @@ impl SyncedBookmarksMerger {
"bookmark_sync::SyncedBookmarksMerger::merge",
Box::new(task),
)?;
runnable.dispatch(&async_thread)?;
TaskRunnable::dispatch(runnable, &async_thread)?;
let op = MergeOp::new(controller);
Ok(RefPtr::new(op.coerce()))
}
Expand Down
2 changes: 1 addition & 1 deletion toolkit/components/xulstore/src/persist.rs
Expand Up @@ -176,7 +176,7 @@ pub(crate) fn persist(key: String, value: Option<String>) -> XULStoreResult<()>
.ok_or(XULStoreError::Unavailable)?
.get_ref()
.ok_or(XULStoreError::Unavailable)?;
TaskRunnable::new("XULStore::Persist", task)?.dispatch(thread)?;
TaskRunnable::dispatch(TaskRunnable::new("XULStore::Persist", task)?, thread)?;
}

// Now insert the key/value pair into the map. The unwrap() call here
Expand Down
25 changes: 19 additions & 6 deletions xpcom/rust/moz_task/src/lib.rs
Expand Up @@ -183,17 +183,30 @@ impl TaskRunnable {
}))
}

/// Dispatches this task runnable to an event target with the default
/// options.
#[inline]
pub fn dispatch(&self, target_thread: &nsIEventTarget) -> Result<(), nsresult> {
self.dispatch_with_options(target_thread, DispatchOptions::default())
pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
Self::dispatch_with_options(this, target, DispatchOptions::default())
}

/// Dispatches this task runnable to an event target, like a thread or a
/// task queue, with the given options.
///
/// Note that this is an associated function, not a method, because it takes
/// an owned reference to the runnable, and must be called like
/// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
/// `runnable.dispatch_with_options(options)`.
///
/// ### Safety
///
/// This function leaks the runnable if dispatch fails.
pub fn dispatch_with_options(
&self,
target_thread: &nsIEventTarget,
this: RefPtr<Self>,
target: &nsIEventTarget,
options: DispatchOptions,
) -> Result<(), nsresult> {
unsafe { target_thread.DispatchFromScript(self.coerce(), options.flags()) }.to_result()
unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
}

xpcom_method!(run => Run());
Expand All @@ -205,7 +218,7 @@ impl TaskRunnable {
Ok(_) => {
assert!(!is_current_thread(&self.original_thread));
self.task.run();
self.dispatch(&self.original_thread)
Self::dispatch(RefPtr::new(self), &self.original_thread)
}
Err(_) => {
assert!(is_current_thread(&self.original_thread));
Expand Down

0 comments on commit faf2fd1

Please sign in to comment.