forked from lemunozm/message-io
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread.rs
113 lines (102 loc) · 3.5 KB
/
thread.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::thread::{self, JoinHandle};
/// Panic message used when a joined thread turns out to have panicked.
///
/// It tells the reader that the panic being reported is only a consequence of a
/// failure that happened in another thread.
pub const OTHER_THREAD_ERR: &str = concat!(
    "Avoid this 'panicked_at' error. ",
    "This error is shown because other thread has panicked ",
    "You can safety skip this error.",
);
/// A thread handle, similar to the std one, but carrying a name that can be nested.
pub struct NamespacedThread<T>
where
    T: Send + 'static,
{
    /// Full (possibly nested) name given to the thread and shown in the log traces.
    namespace: String,
    /// Handle of the spawned thread; `None` once the thread has been joined.
    join_handle: Option<JoinHandle<T>>,
}
impl<T: Send + 'static> NamespacedThread<T> {
/// Similar to [`thread::spawn()`] but with a name.
pub fn spawn<F>(name: &str, f: F) -> Self
where
F: FnOnce() -> T,
F: Send + 'static,
{
let namespace = format!("{}/{}", thread::current().name().unwrap_or(""), name);
Self {
namespace: namespace.clone(),
join_handle: Some(
thread::Builder::new()
.name(namespace.clone())
.spawn(move || {
log::trace!("Thread [{}] spawned", namespace);
f()
})
.unwrap(),
),
}
}
/// Wait the thread to finish.
pub fn join(&mut self) -> T {
log::trace!("Join thread [{}] ...", self.namespace);
let content = self.join_handle.take().unwrap().join().expect(OTHER_THREAD_ERR);
log::trace!("Joined thread [{}]", self.namespace);
content
}
/// Wait the thread to finish.
/// Returns the inner `T` value if never was joined, `None` otherwise
pub fn try_join(&mut self) -> Option<T> {
if self.join_handle.is_some() {
return Some(self.join())
}
None
}
}
impl<T: Send + 'static> Drop for NamespacedThread<T> {
fn drop(&mut self) {
self.try_join();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Time each spawned thread sleeps before raising its flag.
    const WORK_TIME: Duration = Duration::from_millis(500);

    /// The thread runs in the background: the flag is raised only after its work time.
    #[test]
    fn basic_usage() {
        let flag = Arc::new(AtomicBool::new(false));
        let thread_flag = Arc::clone(&flag);
        let mut worker = NamespacedThread::spawn("test", move || {
            std::thread::sleep(WORK_TIME);
            thread_flag.store(true, Ordering::Relaxed);
        });

        // The thread is still sleeping at this point.
        std::thread::sleep(Duration::from_millis(250));
        assert!(!flag.load(Ordering::Relaxed));

        // Now the thread had enough time to finish its work.
        std::thread::sleep(Duration::from_millis(500));
        assert!(flag.load(Ordering::Relaxed));

        worker.join();
    }

    /// `join()` waits for the thread and hands back the value returned by the closure.
    #[test]
    fn join_result() {
        let flag = Arc::new(AtomicBool::new(false));
        let thread_flag = Arc::clone(&flag);
        let mut worker = NamespacedThread::spawn("test", move || {
            std::thread::sleep(WORK_TIME);
            thread_flag.store(true, Ordering::Relaxed);
            "result"
        });

        assert_eq!("result", worker.join());
        assert!(flag.load(Ordering::Relaxed));
    }

    /// Dropping the thread waits for it, so the flag is already raised afterwards.
    #[test]
    fn drop_implies_join() {
        let flag = Arc::new(AtomicBool::new(false));
        let thread_flag = Arc::clone(&flag);
        let worker = NamespacedThread::spawn("test", move || {
            std::thread::sleep(WORK_TIME);
            thread_flag.store(true, Ordering::Relaxed);
        });

        drop(worker);
        assert!(flag.load(Ordering::Relaxed));
    }
}