1
- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
1
+ use std:: sync:: atomic:: { AtomicUsize , AtomicI32 , Ordering } ;
2
2
use std:: sync:: mpsc:: channel;
3
- use std:: sync:: { Arc , Mutex } ;
3
+ use std:: sync:: { Arc , Barrier , Mutex } ;
4
4
use std:: thread;
5
5
use std:: thread:: sleep;
6
6
use std:: time:: Duration ;
7
7
8
8
use fast_threadpool:: ThreadPoolConfig ;
9
- use futures_lite:: * ;
10
9
use rayon;
11
10
use rusty_pool;
11
+ use tokio;
12
12
13
13
fn fib ( n : usize ) -> usize {
14
14
if n == 0 || n == 1 {
@@ -27,6 +27,31 @@ pub fn rayon_threadpool() {
27
27
println ! ( "{}" , n) ;
28
28
}
29
29
30
+ scoped_tls:: scoped_thread_local!( static POOL_DATA : Vec <i32 >) ;
31
+ pub fn rayon_threadpool2 ( ) {
32
+ let pool_data = vec ! [ 1 , 2 , 3 ] ;
33
+
34
+ // We haven't assigned any TLS data yet.
35
+ assert ! ( !POOL_DATA . is_set( ) ) ;
36
+
37
+ rayon:: ThreadPoolBuilder :: new ( )
38
+ . build_scoped (
39
+ // Borrow `pool_data` in TLS for each thread.
40
+ |thread| POOL_DATA . set ( & pool_data, || thread. run ( ) ) ,
41
+ // Do some work that needs the TLS data.
42
+ |pool| {
43
+ pool. install ( || {
44
+ assert ! ( POOL_DATA . is_set( ) ) ;
45
+ assert_eq ! ( POOL_DATA . with( |data| data. len( ) ) , 3 ) ;
46
+ } )
47
+ } ,
48
+ )
49
+ . unwrap ( ) ;
50
+
51
+ // Once we've returned, `pool_data` is no longer borrowed.
52
+ drop ( pool_data) ;
53
+ }
54
+
30
55
pub fn threadpool_example ( ) {
31
56
let n_workers = 4 ;
32
57
let n_jobs = 8 ;
@@ -44,6 +69,35 @@ pub fn threadpool_example() {
44
69
assert_eq ! ( rx. iter( ) . take( n_jobs) . fold( 0 , |a, b| a + b) , 8 ) ;
45
70
}
46
71
72
+ pub fn threadpool_example2 ( ) {
73
+ // create at least as many workers as jobs or you will deadlock yourself
74
+ let n_workers = 42 ;
75
+ let n_jobs = 23 ;
76
+ let pool = threadpool:: ThreadPool :: new ( n_workers) ;
77
+ let an_atomic = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
78
+
79
+ assert ! ( n_jobs <= n_workers, "too many jobs, will deadlock" ) ;
80
+
81
+ // create a barrier that waits for all jobs plus the starter thread
82
+ let barrier = Arc :: new ( Barrier :: new ( n_jobs + 1 ) ) ;
83
+ for _ in 0 ..n_jobs {
84
+ let barrier = barrier. clone ( ) ;
85
+ let an_atomic = an_atomic. clone ( ) ;
86
+
87
+ pool. execute ( move || {
88
+ // do the heavy work
89
+ an_atomic. fetch_add ( 1 , Ordering :: Relaxed ) ;
90
+
91
+ // then wait for the other threads
92
+ barrier. wait ( ) ;
93
+ } ) ;
94
+ }
95
+
96
+ // wait for the threads to finish the work
97
+ barrier. wait ( ) ;
98
+ assert_eq ! ( an_atomic. load( Ordering :: SeqCst ) , /* n_jobs = */ 23 ) ;
99
+ }
100
+
47
101
pub fn rusty_pool_example ( ) {
48
102
let pool = rusty_pool:: ThreadPool :: default ( ) ;
49
103
@@ -54,14 +108,81 @@ pub fn rusty_pool_example() {
54
108
}
55
109
56
110
pool. join ( ) ;
111
+
112
+ let handle = pool. evaluate ( || {
113
+ thread:: sleep ( Duration :: from_secs ( 5 ) ) ;
114
+ return 4 ;
115
+ } ) ;
116
+ let result = handle. await_complete ( ) ;
117
+ assert_eq ! ( result, 4 ) ;
118
+ }
119
+
120
+ async fn some_async_fn ( x : i32 , y : i32 ) -> i32 {
121
+ x + y
122
+ }
123
+
124
+ async fn other_async_fn ( x : i32 , y : i32 ) -> i32 {
125
+ x - y
126
+ }
127
+
128
+ pub fn rusty_pool_example2 ( ) {
129
+ let pool = rusty_pool:: ThreadPool :: default ( ) ;
130
+
131
+ let handle = pool. complete ( async {
132
+ let a = some_async_fn ( 4 , 6 ) . await ; // 10
133
+ let b = some_async_fn ( a, 3 ) . await ; // 13
134
+ let c = other_async_fn ( b, a) . await ; // 3
135
+ some_async_fn ( c, 5 ) . await // 8
136
+ } ) ;
137
+ assert_eq ! ( handle. await_complete( ) , 8 ) ;
138
+
139
+ let count = Arc :: new ( AtomicI32 :: new ( 0 ) ) ;
140
+ let clone = count. clone ( ) ;
141
+ pool. spawn ( async move {
142
+ let a = some_async_fn ( 3 , 6 ) . await ; // 9
143
+ let b = other_async_fn ( a, 4 ) . await ; // 5
144
+ let c = some_async_fn ( b, 7 ) . await ; // 12
145
+ clone. fetch_add ( c, Ordering :: SeqCst ) ;
146
+ } ) ;
147
+ pool. join ( ) ;
148
+ assert_eq ! ( count. load( Ordering :: SeqCst ) , 12 ) ;
57
149
}
58
150
151
+ pub fn rusty_pool_example3 ( ) {
152
+ let pool = rusty_pool:: ThreadPool :: default ( ) ;
153
+ for _ in 0 ..10 {
154
+ pool. execute ( || thread:: sleep ( Duration :: from_secs ( 10 ) ) )
155
+ }
156
+
157
+ // 等待所有线程变得空闲,即所有任务都完成,包括此线程调用join()后由其他线程添加的任务,或者等待超时
158
+ pool. join_timeout ( Duration :: from_secs ( 5 ) ) ;
159
+
160
+ let count = Arc :: new ( AtomicI32 :: new ( 0 ) ) ;
161
+ for _ in 0 ..15 {
162
+ let clone = count. clone ( ) ;
163
+ pool. execute ( move || {
164
+ thread:: sleep ( Duration :: from_secs ( 5 ) ) ;
165
+ clone. fetch_add ( 1 , Ordering :: SeqCst ) ;
166
+ } ) ;
167
+ }
168
+
169
+ // 关闭并删除此“ ThreadPool”的唯一实例(无克隆),导致通道被中断,从而导致所有worker在完成当前工作后退出
170
+ pool. shutdown_join ( ) ;
171
+ assert_eq ! ( count. load( Ordering :: SeqCst ) , 15 ) ;
172
+ }
59
173
pub fn fast_threadpool_example ( ) -> Result < ( ) , fast_threadpool:: ThreadPoolDisconnected > {
60
174
let threadpool =
61
175
fast_threadpool:: ThreadPool :: start ( ThreadPoolConfig :: default ( ) , ( ) ) . into_sync_handler ( ) ;
62
-
63
176
assert_eq ! ( 4 , threadpool. execute( |_| { 2 + 2 } ) ?) ;
64
177
178
+
179
+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
180
+ rt. block_on ( async {
181
+ let threadpool = fast_threadpool:: ThreadPool :: start ( ThreadPoolConfig :: default ( ) , ( ) ) . into_async_handler ( ) ;
182
+ assert_eq ! ( 4 , threadpool. execute( |_| { 2 + 2 } ) . await . unwrap( ) ) ;
183
+ } ) ;
184
+
185
+
65
186
Ok ( ( ) )
66
187
}
67
188
@@ -96,6 +217,13 @@ pub fn scheduled_thread_pool() {
96
217
97
218
let _ = handle;
98
219
receiver. recv ( ) . unwrap ( ) ;
220
+
221
+ let handle = pool. execute_at_fixed_rate ( Duration :: from_millis ( 1000 ) , Duration :: from_millis ( 1000 ) , || {
222
+ println ! ( "Hello from a scheduled thread!" ) ;
223
+ } ) ;
224
+
225
+ sleep ( Duration :: from_secs ( 5 ) ) ;
226
+ handle. cancel ( )
99
227
}
100
228
101
229
// workerpool-rs
@@ -165,7 +293,7 @@ pub fn executor_service_example() {
165
293
executor_service. execute ( move || {
166
294
thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
167
295
counter. fetch_add ( 1 , Ordering :: SeqCst ) ;
168
- } ) ;
296
+ } ) . unwrap ( ) ;
169
297
}
170
298
171
299
thread:: sleep ( Duration :: from_millis ( 1000 ) ) ;
@@ -206,13 +334,11 @@ pub fn threadpool_executor_example() {
206
334
} )
207
335
. unwrap ( ) ;
208
336
let mut exp = pool. execute ( || { } ) . unwrap ( ) ;
209
- exp. cancel ( ) ;
337
+ exp. cancel ( ) . unwrap ( ) ;
210
338
}
211
339
212
340
pub fn executors_example ( ) {
213
- use executors:: crossbeam_workstealing_pool;
214
341
use executors:: * ;
215
- use std:: sync:: mpsc:: channel;
216
342
217
343
let n_workers = 4 ;
218
344
let n_jobs = 8 ;
0 commit comments