Skip to content

Commit e306d92

Browse files
Update graph test
1 parent 5607784 commit e306d92

File tree

1 file changed

+61
-34
lines changed

1 file changed

+61
-34
lines changed

csplib/tests/computational_graph.rs

Lines changed: 61 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,67 @@
11
use csplib::*;
22

3+
#[csplib::process]
4+
struct P1 {
5+
#[input]
6+
a: i32,
7+
#[output]
8+
b: i32,
9+
}
10+
// λx. x+2
11+
async fn run_p1(inner: P1Inner) -> Result<()> {
12+
let x = inner.a_r.reader().get().await?;
13+
inner.b_w.put(x + 2)?;
14+
Ok(())
15+
}
16+
#[csplib::process]
17+
struct P2 {
18+
#[input]
19+
a: i32,
20+
#[output]
21+
b: i32,
22+
}
23+
// λx. x*2
24+
async fn run_p2(inner: P2Inner) -> Result<()> {
25+
let x = inner.a_r.reader().get().await?;
26+
// Emulating expensive I/O
27+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
28+
inner.b_w.put(x * 2)?;
29+
Ok(())
30+
}
31+
#[csplib::process]
32+
struct P3 {
33+
#[input]
34+
a: i32,
35+
#[input]
36+
b: i32,
37+
#[output]
38+
c: i32,
39+
}
40+
// λxy. x*y
41+
async fn run_p3(inner: P3Inner) -> Result<()> {
42+
let x = inner.a_r.reader().get().await?;
43+
let y = inner.b_r.reader().get().await?;
44+
inner.c_w.put(x * y)?;
45+
Ok(())
46+
}
47+
348
#[tokio::test]
449
async fn computational_graph() {
5-
let (w1, ch1) = channel();
6-
let (w2, ch2) = channel();
7-
let (w3, ch3) = channel();
8-
let (w4, ch4) = channel();
9-
// λx. x+2
10-
tokio::spawn({
11-
let r1 = ch1.reader();
12-
async move {
13-
let x = r1.get().await.unwrap();
14-
w2.put(x + 2).unwrap();
15-
}
16-
});
17-
// λx. x*2
18-
tokio::spawn({
19-
let r1 = ch1.reader();
20-
async move {
21-
// Emulating expensive I/O
22-
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
23-
let x = r1.get().await.unwrap();
24-
w3.put(x * 2).unwrap();
25-
}
26-
});
27-
// λxy. x*y
28-
tokio::spawn({
29-
let r2 = ch2.reader();
30-
let r3 = ch3.reader();
31-
async move {
32-
let (x, y) = tokio::try_join!(r2.get(), r3.get()).unwrap();
33-
w4.put(x * y).unwrap();
34-
}
35-
});
36-
w1.put(1).unwrap();
37-
let r4 = ch4.reader();
38-
let ans = r4.get().await.unwrap();
50+
let (main_w, main_r) = channel();
51+
let (p1, p1_inner) = P1::new();
52+
let (p2, p2_inner) = P2::new();
53+
let (p3, p3_inner) = P3::new();
54+
55+
tokio::spawn(run_p1(p1_inner));
56+
tokio::spawn(run_p2(p2_inner));
57+
tokio::spawn(run_p3(p3_inner));
58+
59+
tokio::spawn(connect(main_r.reader(), p1.a_w));
60+
tokio::spawn(connect(main_r.reader(), p2.a_w));
61+
tokio::spawn(connect(p1.b_r.reader(), p3.a_w));
62+
tokio::spawn(connect(p2.b_r.reader(), p3.b_w));
63+
64+
main_w.put(1).unwrap();
65+
let ans = p3.c_r.reader().get().await.unwrap();
3966
assert_eq!(ans, 6);
4067
}

0 commit comments

Comments
 (0)