/
stream-ext-test.rs
141 lines (118 loc) · 3.83 KB
/
stream-ext-test.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#![allow(unused)]
#![allow(dead_code)]
use std::collections::*;
use std::fmt::Debug;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::{stream, FutureExt, SinkExt, StreamExt};
use futures::{Future, Stream};
use rust_box::queue_ext::{Action, QueueExt, Reply, Waker};
use rust_box::stream_ext::{IntoLimiter, Limiter, LimiterExt};
use tokio::task::spawn_local;
use tokio::time::{Instant, Sleep};
fn main() {
std::env::set_var("RUST_LOG", "stream_ext=info");
env_logger::init();
let runner = async move {
futures::future::join3(
test_with_limiter(),
test_with_leaky_bucket_limiter(),
test_with_governor_limiter(),
)
.await;
// test_with_limiter().await;
// test_with_leaky_bucket_limiter().await;
// test_with_governor_limiter().await;
};
// async_std::task::block_on(runner);
tokio::task::LocalSet::new().block_on(&tokio::runtime::Runtime::new().unwrap(), runner);
// tokio::runtime::Runtime::new().unwrap().block_on(runner);
}
async fn test_with_limiter() {
let s = stream::repeat(1);
const INTERVAL: Duration = Duration::from_millis(500);
#[derive(Debug)]
struct TheLimiter {
sleep: Pin<Box<Sleep>>,
}
impl TheLimiter {
fn new() -> Self {
Self {
sleep: Box::pin(tokio::time::sleep(INTERVAL)),
}
}
}
impl Limiter for TheLimiter {
fn acquire(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
match self.sleep.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => {
self.sleep.as_mut().reset(Instant::now() + INTERVAL);
Poll::Ready(Some(()))
}
}
}
}
let mut s = s.limiter(TheLimiter::new());
log::info!("stream limiter: {:?}", s);
while let Some(item) = s.next().await {
log::info!("recv limiter: {:?}", item);
}
}
async fn test_with_leaky_bucket_limiter() {
// let s = get_queue_stream(20);
let s = stream::repeat(2);
let rate_limiter = leaky_bucket::RateLimiter::builder()
.initial(3)
.refill(1)
.interval(Duration::from_millis(500))
.max(100)
.fair(false)
.build();
let mut s = s.leaky_bucket_limiter(rate_limiter);
while let Some(item) = s.next().await {
log::info!("recv leaky_bucket_limiter: {:?}", item);
}
}
async fn test_with_governor_limiter() {
use futures::Stream;
use governor::state::StreamRateLimitExt;
use governor::{clock, Quota, RateLimiter, RatelimitedStream};
use nonzero_ext::nonzero;
let s = stream::repeat(3);
// let s = get_queue_stream(10);
let lim = RateLimiter::direct(Quota::per_second(nonzero!(2u32)));
// let mut s = s.ratelimit_stream(&lim);
let mut s = s.governor_limiter(&lim);
while let Some(item) = s.next().await {
log::info!("recv governor_limiter: {:?}", item);
}
}
fn get_queue_stream(max: i32) -> impl Stream<Item = i32> + Debug {
use crossbeam_queue::SegQueue;
let s = Rc::new(SegQueue::default()).queue_stream::<i32, _>(|s, _| {
if s.is_empty() {
Poll::Pending
} else {
match s.pop() {
Some(m) => Poll::Ready(Some(m)),
None => Poll::Pending,
}
}
});
let mut tx = s.clone().queue_sender::<i32, _, _>(|s, act| match act {
Action::Send(item) => Reply::Send(s.push(item)),
Action::IsFull => Reply::IsFull(false),
Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
Action::Len => Reply::Len(s.len()),
});
spawn_local(async move {
for i in 0..max {
tx.send(i).await;
}
});
s
}