Skip to content

Commit 01fa768

Browse files
committed
add more examples
1 parent aba30a0 commit 01fa768

File tree

6 files changed

+174
-23
lines changed

6 files changed

+174
-23
lines changed

pool/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
executor-service = "0.2.2"
10+
executors = "0.9.0"
911
fast-threadpool = "0.3.0"
1012
futures-lite = "1.12.0"
13+
poolite = "0.7.1"
1114
rayon = "1.5.3"
1215
rusty_pool = "0.7.0"
1316
scheduled-thread-pool = "0.2.6"
1417
scoped_threadpool = "0.1.9"
1518
smol = "1.2.5"
1619
threadpool = "1.8.1"
20+
threadpool-executor = "0.3.2"
21+
workerpool-rs = "0.2.1"

pool/src/lib.rs

Lines changed: 139 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
use std::sync::atomic::{AtomicUsize, Ordering};
12
use std::sync::mpsc::channel;
3+
use std::sync::{Arc, Mutex};
4+
use std::thread;
5+
use std::thread::sleep;
26
use std::time::Duration;
37

8+
use fast_threadpool::ThreadPoolConfig;
49
use futures_lite::*;
510
use rayon;
6-
use threadpool::ThreadPool;
7-
use fast_threadpool::ThreadPoolConfig;
11+
use rusty_pool;
812

913
fn fib(n: usize) -> usize {
1014
if n == 0 || n == 1 {
@@ -26,7 +30,7 @@ pub fn rayon_threadpool() {
2630
pub fn threadpool_example() {
2731
let n_workers = 4;
2832
let n_jobs = 8;
29-
let pool = ThreadPool::new(n_workers);
33+
let pool = threadpool::ThreadPool::new(n_workers);
3034

3135
let (tx, rx) = channel();
3236
for _ in 0..n_jobs {
@@ -41,19 +45,20 @@ pub fn threadpool_example() {
4145
}
4246

4347
pub fn rusty_pool_example() {
44-
let pool = ThreadPool::new(4);
48+
let pool = rusty_pool::ThreadPool::default();
4549

4650
for _ in 1..10 {
4751
pool.execute(|| {
4852
println!("Hello from a rusty_pool!");
4953
});
5054
}
5155

52-
pool.join();
56+
pool.join();
5357
}
5458

55-
pub fn fast_threadpool_example() -> Result<(), fast_threadpool::ThreadPoolDisconnected>{
56-
let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler();
59+
pub fn fast_threadpool_example() -> Result<(), fast_threadpool::ThreadPoolDisconnected> {
60+
let threadpool =
61+
fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler();
5762

5863
assert_eq!(4, threadpool.execute(|_| { 2 + 2 })?);
5964

@@ -84,31 +89,145 @@ pub fn scheduled_thread_pool() {
8489
let (sender, receiver) = channel();
8590

8691
let pool = scheduled_thread_pool::ScheduledThreadPool::new(4);
87-
let handle = pool.execute_after(Duration::from_millis(1000), move ||{
92+
let handle = pool.execute_after(Duration::from_millis(1000), move || {
8893
println!("Hello from a scheduled thread!");
8994
sender.send("done").unwrap();
9095
});
9196

92-
9397
let _ = handle;
9498
receiver.recv().unwrap();
99+
}
100+
101+
// workerpool-rs
102+
pub fn workerpool_rs_example() {
103+
use workerpool_rs::pool::WorkerPool;
104+
105+
let n_workers = 4;
106+
let n_jobs = 8;
107+
let pool = WorkerPool::new(n_workers);
108+
109+
let (tx, rx) = channel();
110+
let atx = Arc::new(Mutex::new(tx));
111+
for _ in 0..n_jobs {
112+
let atx = atx.clone();
113+
pool.execute(move || {
114+
let tx = atx.lock().unwrap();
115+
tx.send(1)
116+
.expect("channel will be there waiting for the pool");
117+
});
118+
}
119+
120+
// assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
121+
println!("{}", rx.iter().take(n_jobs).fold(0, |a, b| a + b))
122+
}
123+
124+
fn test(msg: usize) {
125+
println!("key: {}\tvalue: {}", msg, fib(msg));
126+
}
127+
128+
// poolite
129+
pub fn poolite_example() {
130+
let pool = poolite::Pool::new().unwrap();
131+
for i in 0..10 {
132+
pool.push(move || test(i));
133+
}
134+
135+
pool.join(); //wait for the pool
136+
}
137+
138+
pub fn poolite_example2() {
139+
let pool = poolite::Pool::new().unwrap();
140+
let mut array = (0..10usize).into_iter().map(|i| (i, 0)).collect::<Vec<_>>();
141+
142+
// scoped method will waiting scoped's task running finish.
143+
pool.scoped(|scope| {
144+
for i in array.iter_mut() {
145+
// have to move
146+
scope.push(move || i.1 = i.0 * i.0);
147+
}
148+
});
95149

150+
for (i, j) in array {
151+
println!("key: {}\tvalue: {}", i, j);
152+
}
96153
}
97154

98-
pub fn unblocking_smol() -> io::Result<()> {
99-
smol::block_on(async {
100-
let mut stream = smol::net::TcpStream::connect("example.com:80").await?;
101-
let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
102-
stream.write_all(req).await?;
155+
pub fn executor_service_example() {
156+
use executor_service::Executors;
157+
158+
let mut executor_service =
159+
Executors::new_fixed_thread_pool(10).expect("Failed to create the thread pool");
160+
161+
let counter = Arc::new(AtomicUsize::new(0));
162+
163+
for _ in 0..10 {
164+
let counter = counter.clone();
165+
executor_service.execute(move || {
166+
thread::sleep(Duration::from_millis(100));
167+
counter.fetch_add(1, Ordering::SeqCst);
168+
});
169+
}
170+
171+
thread::sleep(Duration::from_millis(1000));
172+
173+
assert_eq!(counter.load(Ordering::SeqCst), 10);
103174

104-
let mut stdout = smol::Unblock::new(std::io::stdout());
105-
io::copy(stream, &mut stdout).await?;
106-
Ok(())
175+
let mut executor_service =
176+
Executors::new_fixed_thread_pool(2).expect("Failed to create the thread pool");
177+
178+
let some_param = "Mr White";
179+
let res = executor_service
180+
.submit_sync(move || {
181+
sleep(Duration::from_secs(5));
182+
println!("Hello {:}", some_param);
183+
println!("Long computation finished");
184+
2
185+
})
186+
.expect("Failed to submit function");
187+
188+
println!("Result: {:#?}", res);
189+
assert_eq!(res, 2);
190+
}
191+
192+
pub fn threadpool_executor_example() {
193+
let pool = threadpool_executor::ThreadPool::new(1);
194+
let mut expectation = pool.execute(|| "hello, thread pool!").unwrap();
195+
assert_eq!(expectation.get_result().unwrap(), "hello, thread pool!");
196+
197+
let pool = threadpool_executor::threadpool::Builder::new()
198+
.core_pool_size(1)
199+
.maximum_pool_size(3)
200+
.keep_alive_time(std::time::Duration::from_secs(300))
201+
.exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::Wait)
202+
.build();
203+
204+
pool.execute(|| {
205+
std::thread::sleep(std::time::Duration::from_secs(3));
107206
})
207+
.unwrap();
208+
let mut exp = pool.execute(|| {}).unwrap();
209+
exp.cancel();
108210
}
109211

110-
// threads_pool
111-
// workerpool
112-
// poolite
212+
pub fn executors_example() {
213+
use executors::crossbeam_workstealing_pool;
214+
use executors::*;
215+
use std::sync::mpsc::channel;
113216

217+
let n_workers = 4;
218+
let n_jobs = 8;
219+
let pool = crossbeam_workstealing_pool::small_pool(n_workers);
220+
221+
let (tx, rx) = channel();
222+
for _ in 0..n_jobs {
223+
let tx = tx.clone();
224+
pool.execute(move || {
225+
tx.send(1)
226+
.expect("channel will be there waiting for the pool");
227+
});
228+
}
229+
230+
assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
231+
}
114232

233+
// slave-pool

pool/src/main.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ fn main() {
66
scoped_threadpool();
77
rusty_pool_example();
88
fast_threadpool_example().unwrap();
9-
109
scheduled_thread_pool();
11-
12-
unblocking_smol().unwrap();
10+
workerpool_rs_example();
11+
poolite_example();
12+
poolite_example2();
13+
executor_service_example();
14+
threadpool_executor_example();
15+
executors_example();
1316
}

tokio_examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ edition = "2021"
77

88
[dependencies]
99
tokio = { version = "1.21.2", features = ["full"] }
10+
tokio-rayon = "2.1.0"

tokio_examples/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,26 @@ pub fn watch_example() {
175175
Ok(())
176176
});
177177

178+
}
179+
180+
/// 实现fib
181+
pub fn fib(n: usize) -> usize {
182+
if n == 0 || n == 1 {
183+
return n;
184+
}
185+
186+
return fib(n-1) + fib(n-2);
187+
}
188+
189+
pub fn tokio_rayon_example() {
190+
let rt = tokio::runtime::Runtime::new().unwrap();
191+
192+
rt.block_on(async {
193+
let nft = tokio_rayon::spawn(|| {
194+
fib(20)
195+
}).await;
196+
197+
assert_eq!(nft, 6765);
198+
})
199+
178200
}

tokio_examples/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ fn main() {
1818
notify_example();
1919
notify_example2();
2020

21+
tokio_rayon_example();
2122
}

0 commit comments

Comments
 (0)