/
lib.rs
320 lines (288 loc) · 11.2 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
//! This module wraps XPCOM threading functions with Rust functions
//! to make it safer and more convenient to call the XPCOM functions.
//! It also provides the Task trait and TaskRunnable struct,
//! which make it easier to dispatch tasks to threads.
extern crate libc;
extern crate nserror;
extern crate nsstring;
extern crate xpcom;
use nserror::{nsresult, NS_OK};
use nsstring::{nsACString, nsCString};
use std::{
ffi::CStr,
marker::PhantomData,
mem, ptr,
sync::atomic::{AtomicBool, Ordering},
};
use xpcom::{
getter_addrefs,
interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget, nsISupports, nsIThread},
xpcom, xpcom_method, AtomicRefcnt, RefCounted, RefPtr, XpCom,
};
extern "C" {
fn NS_GetCurrentThreadEventTarget(result: *mut *const nsIThread) -> nsresult;
fn NS_GetMainThreadEventTarget(result: *mut *const nsIThread) -> nsresult;
fn NS_IsMainThread() -> bool;
fn NS_NewNamedThreadWithDefaultStackSize(
name: *const nsACString,
result: *mut *const nsIThread,
event: *const nsIRunnable,
) -> nsresult;
fn NS_IsCurrentThread(thread: *const nsIEventTarget) -> bool;
fn NS_ProxyReleaseISupports(
name: *const libc::c_char,
target: *const nsIEventTarget,
doomed: *const nsISupports,
always_proxy: bool,
);
fn NS_CreateBackgroundTaskQueue(
name: *const libc::c_char,
target: *mut *const nsISerialEventTarget,
) -> nsresult;
fn NS_DispatchBackgroundTask(event: *const nsIRunnable, flags: u32) -> nsresult;
}
/// Returns an owning reference to the calling thread's `nsIThread`, or an
/// error `nsresult` if the underlying XPCOM call fails.
pub fn get_current_thread() -> Result<RefPtr<nsIThread>, nsresult> {
    getter_addrefs(|p| unsafe { NS_GetCurrentThreadEventTarget(p) })
}
/// Returns an owning reference to the main thread's `nsIThread`, or an
/// error `nsresult` if the underlying XPCOM call fails.
pub fn get_main_thread() -> Result<RefPtr<nsIThread>, nsresult> {
    getter_addrefs(|p| unsafe { NS_GetMainThreadEventTarget(p) })
}
/// Returns `true` if called from the main thread.
pub fn is_main_thread() -> bool {
    unsafe { NS_IsMainThread() }
}
/// Creates a new thread with the given name and the default stack size,
/// returning an owning reference to it. No initial runnable is dispatched
/// (the `event` argument to the FFI call is null).
pub fn create_thread(name: &str) -> Result<RefPtr<nsIThread>, nsresult> {
    getter_addrefs(|p| unsafe {
        NS_NewNamedThreadWithDefaultStackSize(&*nsCString::from(name), p, ptr::null())
    })
}
/// Returns `true` if `thread` is the calling thread.
pub fn is_current_thread(thread: &nsIThread) -> bool {
    unsafe { NS_IsCurrentThread(thread.coerce()) }
}
/// Creates a queue that runs tasks on the background thread pool. The tasks
/// will run in the order they're dispatched, one after the other.
///
/// `name` must be a static C string; it's passed through to the C++ side
/// unchanged. Returns an error `nsresult` if queue creation fails.
pub fn create_background_task_queue(
    name: &'static CStr,
) -> Result<RefPtr<nsISerialEventTarget>, nsresult> {
    getter_addrefs(|p| unsafe { NS_CreateBackgroundTaskQueue(name.as_ptr(), p) })
}
/// Dispatches a one-shot task runnable to the background thread pool with the
/// default options.
///
/// Convenience wrapper around `dispatch_background_task_with_options`; see
/// that function for ordering and leak-on-failure caveats.
#[inline]
pub fn dispatch_background_task(runnable: RefPtr<nsIRunnable>) -> Result<(), nsresult> {
    dispatch_background_task_with_options(runnable, DispatchOptions::default())
}
/// Dispatches a one-shot task runnable to the background thread pool with the
/// given options. The task may run concurrently with other background tasks.
/// If you need tasks to run in a specific order, please create a background
/// task queue using `create_background_task_queue`, and dispatch tasks to it
/// instead.
///
/// ### Safety
///
/// This function leaks the runnable if dispatch fails. This avoids a race where
/// a runnable can be destroyed on either the original or target thread, which
/// is important if the runnable holds thread-unsafe members.
pub fn dispatch_background_task_with_options(
    runnable: RefPtr<nsIRunnable>,
    options: DispatchOptions,
) -> Result<(), nsresult> {
    // This eventually calls the non-`already_AddRefed<nsIRunnable>` overload of
    // `nsIEventTarget::Dispatch` (see xpcom/threads/nsIEventTarget.idl#20-25),
    // which adds an owning reference and leaks if dispatch fails. That's why
    // we don't need to (and must not) release `runnable` here ourselves.
    unsafe { NS_DispatchBackgroundTask(runnable.coerce(), options.flags()) }.to_result()
}
/// Options to control how task runnables are dispatched.
///
/// Wraps the raw `nsIEventTarget` dispatch flag bits; construct via
/// `DispatchOptions::new()` or `default()`, then chain builder methods.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct DispatchOptions(u32);
impl Default for DispatchOptions {
#[inline]
fn default() -> Self {
DispatchOptions(nsIEventTarget::DISPATCH_NORMAL as u32)
}
}
impl DispatchOptions {
    /// Creates a blank set of options. The runnable will be dispatched using
    /// the default mode.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Indicates whether or not the dispatched runnable may block its target
    /// thread by waiting on I/O. If `true`, the runnable may be dispatched to a
    /// dedicated thread pool, leaving the main pool free for CPU-bound tasks.
    #[inline]
    pub fn may_block(self, may_block: bool) -> DispatchOptions {
        const FLAG: u32 = nsIEventTarget::DISPATCH_EVENT_MAY_BLOCK as u32;
        // Set or clear the flag bit without disturbing any other flags.
        let bits = if may_block {
            self.flags() | FLAG
        } else {
            self.flags() & !FLAG
        };
        DispatchOptions(bits)
    }

    /// Returns the set of bitflags to pass to `DispatchFromScript`.
    #[inline]
    fn flags(self) -> u32 {
        self.0
    }
}
/// A task represents an operation that asynchronously executes on a target
/// thread, and returns its result to the original thread.
pub trait Task {
    /// Executes the operation. `TaskRunnable` calls this once, on the target
    /// thread, before `done` (see `TaskRunnable::run`'s `has_run` handshake).
    fn run(&self);
    /// Reports the operation's result. `TaskRunnable` calls this on the
    /// original (dispatching) thread, after `run` has completed.
    fn done(&self) -> Result<(), nsresult>;
}
/// The struct responsible for dispatching a Task by calling its run() method
/// on the target thread and returning its result by calling its done() method
/// on the original thread.
///
/// The struct uses its has_run field to determine whether it should call
/// run() or done(). It could instead check if task.result is Some or None,
/// but if run() failed to set task.result, then it would loop infinitely.
#[derive(xpcom)]
#[xpimplements(nsIRunnable, nsINamed)]
#[refcnt = "atomic"]
pub struct InitTaskRunnable {
    // Reported via `nsINamed::GetName` for diagnostics.
    name: &'static str,
    // The thread that created this runnable; `done()` runs here.
    original_thread: RefPtr<nsIThread>,
    // The wrapped task; must be Send + Sync because it crosses threads.
    task: Box<dyn Task + Send + Sync>,
    // False until `run()` has executed on the target thread; distinguishes
    // the first (`run`) invocation from the second (`done`) invocation.
    has_run: AtomicBool,
}
impl TaskRunnable {
    /// Creates a runnable for `task`, capturing the calling thread so that
    /// `task.done()` can later be invoked there. Returns an error if the
    /// current thread can't be retrieved.
    pub fn new(
        name: &'static str,
        task: Box<dyn Task + Send + Sync>,
    ) -> Result<RefPtr<TaskRunnable>, nsresult> {
        Ok(TaskRunnable::allocate(InitTaskRunnable {
            name,
            original_thread: get_current_thread()?,
            task,
            has_run: AtomicBool::new(false),
        }))
    }

    /// Dispatches this task runnable to an event target with the default
    /// options.
    #[inline]
    pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
        Self::dispatch_with_options(this, target, DispatchOptions::default())
    }

    /// Dispatches this task runnable to an event target, like a thread or a
    /// task queue, with the given options.
    ///
    /// Note that this is an associated function, not a method, because it takes
    /// an owned reference to the runnable, and must be called like
    /// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
    /// `runnable.dispatch_with_options(options)`.
    ///
    /// ### Safety
    ///
    /// This function leaks the runnable if dispatch fails.
    pub fn dispatch_with_options(
        this: RefPtr<Self>,
        target: &nsIEventTarget,
        options: DispatchOptions,
    ) -> Result<(), nsresult> {
        unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
    }

    xpcom_method!(run => Run());
    /// `nsIRunnable::Run` implementation. Invoked twice: first on the target
    /// thread (phase one: `task.run()`), then on the original thread (phase
    /// two: `task.done()`). The `has_run` compare-exchange tells the two
    /// phases apart — it succeeds exactly once, on the first invocation.
    fn run(&self) -> Result<(), nsresult> {
        match self
            .has_run
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => {
                // Phase one: we're on the target thread. Run the task, then
                // re-dispatch ourselves back to the original thread so that
                // `done()` runs there.
                assert!(!is_current_thread(&self.original_thread));
                self.task.run();
                Self::dispatch(RefPtr::new(self), &self.original_thread)
            }
            Err(_) => {
                // Phase two: back on the original thread; report the result.
                assert!(is_current_thread(&self.original_thread));
                self.task.done()
            }
        }
    }

    xpcom_method!(get_name => GetName() -> nsACString);
    /// `nsINamed::GetName` implementation; exposes the name passed to `new`.
    fn get_name(&self) -> Result<nsCString, nsresult> {
        Ok(nsCString::from(self.name))
    }
}
/// An owning reference to a `ThreadPtrHolder`.
pub type ThreadPtrHandle<T> = RefPtr<ThreadPtrHolder<T>>;

/// A Rust analog to `nsMainThreadPtrHolder` that wraps an `nsISupports` object
/// with thread-safe refcounting. The holder keeps one reference to the wrapped
/// object that's released when the holder's refcount reaches zero.
pub struct ThreadPtrHolder<T: XpCom + 'static> {
    // Raw pointer to the wrapped object; the holder owns one reference to it
    // (taken in `new` via `mem::forget`) until the holder itself is dropped.
    ptr: *const T,
    // Marks logical ownership of a `T` for the compiler.
    marker: PhantomData<T>,
    // Label passed to `NS_ProxyReleaseISupports` for diagnostics.
    name: &'static CStr,
    // The thread on which `new` was called; the only thread allowed to touch
    // the wrapped object directly (see `get`).
    owning_thread: RefPtr<nsIThread>,
    refcnt: AtomicRefcnt,
}

// SAFETY: the holder may be handed between threads because its own refcount
// is atomic, and the wrapped pointer is only ever dereferenced on the owning
// thread (`get` checks `is_current_thread`; `release` proxies the final
// release back to the owning thread otherwise).
unsafe impl<T: XpCom + 'static> Send for ThreadPtrHolder<T> {}
unsafe impl<T: XpCom + 'static> Sync for ThreadPtrHolder<T> {}
// Manual refcounting for the holder itself, analogous to what the `xpcom`
// derive would generate: atomic inc/dec, with cleanup when the count hits 0.
unsafe impl<T: XpCom + 'static> RefCounted for ThreadPtrHolder<T> {
    unsafe fn addref(&self) {
        self.refcnt.inc();
    }
    unsafe fn release(&self) {
        let rc = self.refcnt.dec();
        if rc == 0 {
            // Once the holder's count reaches zero, release the wrapped
            // object...
            if !self.ptr.is_null() {
                // The holder can be released on any thread. If we're on the
                // owning thread, we can release the object directly. Otherwise,
                // we need to post a proxy release event to release the object
                // on the owning thread.
                if is_current_thread(&self.owning_thread) {
                    (*self.ptr).release()
                } else {
                    NS_ProxyReleaseISupports(
                        self.name.as_ptr(),
                        self.owning_thread.coerce(),
                        self.ptr as *const T as *const nsISupports,
                        false,
                    );
                }
            }
            // ...And deallocate the holder. Reconstituting the `Box` created
            // in `new` and letting it drop frees the allocation.
            Box::from_raw(self as *const Self as *mut Self);
        }
    }
}
impl<T: XpCom + 'static> ThreadPtrHolder<T> {
    /// Creates a new owning thread pointer holder. Returns an error if the
    /// thread manager has shut down. Panics if `name` isn't a valid C string.
    pub fn new(name: &'static CStr, ptr: RefPtr<T>) -> Result<RefPtr<Self>, nsresult> {
        let owning_thread = get_current_thread()?;
        // Take ownership of the `RefPtr`. This does _not_ decrement its
        // refcount, which is what we want. Once we've released all references
        // to the holder, we'll release the wrapped `RefPtr`.
        let raw: *const T = &*ptr;
        mem::forget(ptr);
        unsafe {
            let boxed = Box::new(ThreadPtrHolder {
                name,
                ptr: raw,
                marker: PhantomData,
                owning_thread,
                refcnt: AtomicRefcnt::new(),
            });
            // `Box::into_raw` never returns null, so `from_raw` always yields
            // `Some` here and the `unwrap` can't fail. The matching
            // deallocation happens in `RefCounted::release`.
            Ok(RefPtr::from_raw(Box::into_raw(boxed)).unwrap())
        }
    }

    /// Returns the wrapped object's owning thread.
    pub fn owning_thread(&self) -> &nsIThread {
        &self.owning_thread
    }

    /// Returns the wrapped object if called from the owning thread, or
    /// `None` if called from any other thread.
    pub fn get(&self) -> Option<&T> {
        if is_current_thread(&self.owning_thread) && !self.ptr.is_null() {
            // Safe: we're on the owning thread and the pointer is non-null,
            // and the holder keeps the object alive while `self` is alive.
            unsafe { Some(&*self.ptr) }
        } else {
            None
        }
    }
}