Skip to content

Commit

Permalink
Merge 720bce1 into d9ec4b4
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog authored Jul 10, 2022
2 parents d9ec4b4 + 720bce1 commit 856ddcf
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ serde_yaml = "0.8"
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
histogram = "0.6"
leaky-bucket = "0.10"
leaky-bucket = "0.12.1"
async-trait = "0.1"
bytesize = "1.0"
humantime = "2.0"
Expand Down
5 changes: 1 addition & 4 deletions src/bench_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ impl BenchRun {
let client = bench_protocol_adapter.build_client()?;

while self.has_more_work() {
self.rate_limiter
.acquire_one()
.await
.expect("Unexpected LeakyBucket.acquire error");
self.rate_limiter.acquire_one().await;

if STOP_ON_FATAL.load(Ordering::Relaxed) {
break;
Expand Down
59 changes: 24 additions & 35 deletions src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
/// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
/// option. This file may not be copied, modified, or distributed
/// except according to those terms.
use leaky_bucket::{LeakyBucket, LeakyBuckets};
use log::{debug, error};
use leaky_bucket::RateLimiter as InnerRateLimiter;
use log::debug;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct RateLimiter {
leaky_bucket: Option<LeakyBucket>,
leaky_bucket: Option<Arc<InnerRateLimiter>>,
}

impl RateLimiter {
Expand All @@ -30,40 +32,21 @@ impl RateLimiter {
amount / interval.as_secs_f64()
);

let mut buckets = LeakyBuckets::new();
let coordinator = buckets.coordinate().expect("no other running coordinator");
tokio::spawn(async move {
match coordinator.await {
Ok(_) => {
debug!("Rate limiter is done");
}
Err(e) => {
error!("Rate limiter crashed: {}", e);
}
}
});

RateLimiter {
leaky_bucket: Some(
buckets
.rate_limiter()
leaky_bucket: Some(Arc::new(
InnerRateLimiter::builder()
// to compensate overhead let's add a bit to the rate
.refill_amount((amount * 1.01) as usize)
.refill_interval(interval)
.refill((amount * 1.01) as usize)
.interval(interval)
.max(amount as usize * 100)
.build()
.expect("LeakyBucket builder failed"),
),
.build(),
)),
}
}

pub async fn acquire_one(&self) -> Result<(), String> {
match self.leaky_bucket.as_ref() {
None => Ok(()),
Some(leaky_bucket) => leaky_bucket.acquire_one().await.map_err(|e| {
error!("Error acquiring permit: {}", e);
e.to_string()
}),
pub async fn acquire_one(&self) {
if let Some(leaky_bucket) = self.leaky_bucket.as_ref() {
leaky_bucket.acquire_one().await;
}
}

Expand Down Expand Up @@ -95,6 +78,12 @@ impl RateLimiter {
}
}

impl fmt::Debug for RateLimiter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RateLimiter").finish()
}
}

#[cfg(test)]
mod tests {
use crate::rate_limiter::RateLimiter;
Expand All @@ -105,7 +94,7 @@ mod tests {
let rate_limiter = RateLimiter::build_rate_limiter(100.);
let begin = Instant::now();
for _ in 0..100 {
rate_limiter.acquire_one().await.expect("No reason to fail");
rate_limiter.acquire_one().await;
}
let elapsed = Instant::now().duration_since(begin);
println!("Elapsed: {:?}", elapsed);
Expand All @@ -117,7 +106,7 @@ mod tests {
let rate_limiter = RateLimiter::build_rate_limiter(0.5);
let begin = Instant::now();
for _ in 0..2 {
rate_limiter.acquire_one().await.expect("No reason to fail");
rate_limiter.acquire_one().await;
}
let elapsed = Instant::now().duration_since(begin);
println!("Elapsed: {:?}", elapsed);
Expand All @@ -130,7 +119,7 @@ mod tests {
let rate_limiter = RateLimiter::build_rate_limiter(0.);
let begin = Instant::now();
for _ in 0..1_000_000 {
rate_limiter.acquire_one().await.expect("No reason to fail");
rate_limiter.acquire_one().await;
}
let elapsed = Instant::now().duration_since(begin);
println!("Elapsed: {:?}", elapsed);
Expand Down

0 comments on commit 856ddcf

Please sign in to comment.