Skip to content

Commit dc861f6

Browse files
committed
add chapter 08
1 parent 53231e6 commit dc861f6

File tree

5 files changed

+71
-3
lines changed

5 files changed

+71
-3
lines changed

.DS_Store

0 Bytes
Binary file not shown.

book_cn/rust_concurrency_cookbook.pdf

131 KB
Binary file not shown.

channel/src/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub use others::*;
44
use std::sync::mpsc;
55
use std::sync::mpsc::sync_channel;
66
use std::thread;
7+
use std::time::Duration;
78

89
pub fn mpsc_example1() {
910
let (tx, rx) = mpsc::channel();
@@ -60,3 +61,27 @@ pub fn mpsc_example4() {
6061

6162
println!("mpsc_example4 completed");
6263
}
64+
65+
pub fn mpsc_drop_example() {
66+
// 创建一个有边界的多生产者、单消费者的通道
67+
let (sender, receiver) = mpsc::channel::<i32>(); // 指定通道中传递的数据类型为 i32
68+
69+
// 启动三个生产者线程
70+
for i in 0..3 {
71+
let tx = sender.clone(); // 克隆发送端,每个线程都拥有独立的发送端
72+
thread::spawn(move || {
73+
thread::sleep(Duration::from_secs(1)); // 等待所有线程启动完毕
74+
tx.send(i).expect("Failed to send message");
75+
});
76+
}
77+
78+
79+
// 丢弃发送端,不影响clone
80+
drop(sender);
81+
82+
83+
// 主线程作为消费者,接收来自生产者线程的消息
84+
for received_message in receiver {
85+
println!("Received message: {}", received_message);
86+
}
87+
}

channel/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,20 @@ fn main() {
55
mpsc_example2();
66
mpsc_example3();
77
mpsc_example4();
8+
mpsc_drop_example();
89

910
crossfire_mpsc();
1011
crossfire_mpmc();
12+
1113
flume_example();
14+
flume_select();
15+
flume_async();
16+
1217
async_channel_example();
1318
async_priority_channel_example();
1419
futures_channel_mpsc_example();
1520
futures_channel_oneshot_example();
1621
kanal_example();
1722
kanal_async_example();
23+
kanal_oneshot_example();
1824
}

channel/src/others.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ pub fn crossfire_mpmc() {
7474
});
7575
}
7676

77-
78-
7977
// has issues.
8078
pub fn broadcaster() {
8179
// let rt = tokio::runtime::Runtime::new().unwrap();
@@ -107,6 +105,35 @@ pub fn flume_example() {
107105
assert_eq!((0..10).sum::<u32>(), received);
108106
}
109107

108+
pub fn flume_select() {
109+
let (tx0, rx0) = flume::unbounded();
110+
let (tx1, rx1) = flume::unbounded();
111+
112+
std::thread::spawn(move || {
113+
tx0.send(true).unwrap();
114+
tx1.send(42).unwrap();
115+
});
116+
117+
flume::Selector::new()
118+
.recv(&rx0, |b| println!("Received {:?}", b))
119+
.recv(&rx1, |n| println!("Received {:?}", n))
120+
.wait();
121+
}
122+
123+
pub fn flume_async() {
124+
let rt = tokio::runtime::Runtime::new().unwrap();
125+
126+
let (tx, rx) = flume::unbounded();
127+
128+
rt.block_on(async move {
129+
tokio::spawn(async move {
130+
tx.send_async(5).await.unwrap();
131+
});
132+
133+
println!("flume async rx: {}", rx.recv_async().await.unwrap());
134+
});
135+
}
136+
110137
pub fn async_channel_example() {
111138
let rt = tokio::runtime::Runtime::new().unwrap();
112139

@@ -194,7 +221,7 @@ pub fn kanal_example() {
194221
});
195222

196223
let received: u32 = rx.sum();
197-
224+
198225
println!("received sum: {}", received);
199226
}
200227

@@ -210,4 +237,14 @@ pub fn kanal_async_example() {
210237

211238
println!("rx: {}", rx.recv().await.unwrap());
212239
});
240+
}
241+
242+
pub fn kanal_oneshot_example() {
243+
let (tx, rx) = kanal::oneshot();
244+
245+
thread::spawn(move || {
246+
tx.send(5).unwrap();
247+
});
248+
249+
println!("kanal oneshot rx: {}", rx.recv().unwrap());
213250
}

0 commit comments

Comments
 (0)