Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved performance and benchmarks #1

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 22 additions & 4 deletions Cargo.toml
Expand Up @@ -7,9 +7,27 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
thread-id = { version = "4" }
bytes = "1.0.1"
futures-util = { version = "0.3.8", default-features = false, features = ["std"] }
tokio = { version = "1.5.0", features = ["macros", "net", "rt-multi-thread"] }
tokio-util = { version = "0.6.5", features = ["codec", "net"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
log = { version = "0.4" }
env_logger = "0.8.3"
log = "0.4"
env_logger = "0.8.3"

[dev-dependencies]
criterion = "0.3.4"
tokio = { version = "1.5.0", features = ["full"] }

[[bench]]
name = "filter_bench"
harness = false

[[bench]]
name = "codec_bench"
harness = false

[[bench]]
name = "integration_bench"
harness = false
52 changes: 52 additions & 0 deletions benches/codec_bench.rs
@@ -0,0 +1,52 @@
use bytes::{Bytes, BytesMut};
use criterion::{black_box, criterion_main, Criterion};
use statsd_filter_proxy_rs::filtered_codec::FilteredCodec;
use tokio_util::codec::Decoder;

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("Codec benchmark", |b| {
b.iter_custom(|iters| {
let block_list = black_box(vec![
String::from("foo"),
String::from("otherfoo"),
String::from("otherfoo2"),
String::from("otherfoo3"),
String::from("otherfoo4"),
String::from("otherfoo5"),
String::from("otherfoo6"),
String::from("otherfoo7"),
String::from("otherfoo8"),
String::from("otherfoo9"),
]);

let mut filter = FilteredCodec {
block_list: block_list.into_iter().map(Bytes::from).collect(),
};

let data = black_box(b"notfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\n");

let mut duration = std::time::Duration::from_secs(0);

for _ in 0..iters {
let mut src = BytesMut::from(&data[..]);
let now = std::time::Instant::now();

let _ = filter.decode(&mut src);

duration += now.elapsed();
}

duration
});
});
}

fn filter_bench() {
let mut c = Criterion::default()
.measurement_time(std::time::Duration::from_secs(10))
.sample_size(1000);

criterion_benchmark(&mut c);
}

criterion_main!(filter_bench);
71 changes: 71 additions & 0 deletions benches/filter_bench.rs
@@ -0,0 +1,71 @@
use bytes::Bytes;
use criterion::{black_box, criterion_main, Criterion};
use statsd_filter_proxy_rs::filter::{filter, filter_2};

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("Filter benchmark", |b| {
b.iter_custom(|iters| {
let block_list = black_box(vec![
String::from("otherfoo"),
String::from("otherfoo2"),
String::from("otherfoo3"),
String::from("otherfoo4"),
String::from("otherfoo5"),
String::from("otherfoo6"),
String::from("otherfoo7"),
String::from("otherfoo8"),
String::from("otherfoo9"),
String::from("foo"),
]);

let data = black_box(b"notfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\n");

let now = std::time::Instant::now();

for _ in 0..iters {
filter(&block_list, data);
}

now.elapsed()
});
});

c.bench_function("Filter benchmark 2", |b| {
b.iter_custom(|iters| {
let block_list = black_box(vec![
String::from("otherfoo"),
String::from("otherfoo2"),
String::from("otherfoo3"),
String::from("otherfoo4"),
String::from("otherfoo5"),
String::from("otherfoo6"),
String::from("otherfoo7"),
String::from("otherfoo8"),
String::from("otherfoo9"),
String::from("foo"),
]);

let block_list = block_list.into_iter().map(Bytes::from).collect::<Vec<_>>();

let data = black_box(b"notfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\nnotfoo:1|c\nfoo:2|c\nnotfoo:3|c\n");

let now = std::time::Instant::now();

for _ in 0..iters {
filter_2(&block_list, data);
}

now.elapsed()
});
});
}

fn filter_bench() {
let mut c = Criterion::default()
.measurement_time(std::time::Duration::from_secs(10))
.sample_size(1000);

criterion_benchmark(&mut c);
}

criterion_main!(filter_bench);
169 changes: 169 additions & 0 deletions benches/integration_bench.rs
@@ -0,0 +1,169 @@
use bytes::BytesMut;
use statsd_filter_proxy_rs::{old_run_server, run_server, Config};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::Notify;
use tokio::time::{self, error::Elapsed};

#[tokio::main]
async fn main() {
let msg_count = 1_000;
let threads = 4;

let config = Config {
listen_host: String::from("127.0.0.1"),
listen_port: 8125,
target_host: String::from("127.0.0.1"),
target_port: 8126,
metric_blocklist: vec![
String::from("foo"),
String::from("otherfoo"),
String::from("otherfoo2"),
String::from("otherfoo3"),
String::from("otherfoo4"),
String::from("otherfoo5"),
String::from("otherfoo6"),
String::from("otherfoo7"),
String::from("otherfoo8"),
String::from("otherfoo9"),
],
multi_thread: None,
};

let stop = Arc::new(Notify::new());

tokio::spawn(spawn_old(config.clone(), false, stop.clone()));

let mut received = 0;
match run(msg_count, threads, &mut received).await {
Ok(duration) => {
println!(
"[Filter classic] Processed {} messages in {:?} | {:?}/msg",
received,
duration,
duration / received as u32
);
}
Err(_) => {
println!(
"[Filter classic] Test timed out after 60s and {} messages",
received
);
}
}

stop.notify_waiters();

// Cooldown for cleanup
time::sleep(Duration::from_secs(10)).await;

let stop = Arc::new(Notify::new());

tokio::spawn(spawn_old(config.clone(), true, stop.clone()));

let mut received = 0;
match run(msg_count, threads, &mut received).await {
Ok(duration) => {
println!(
"[Filter 2] Processed {} messages in {:?} | {:?}/msg",
received,
duration,
duration / received as u32
);
}
Err(_) => {
println!(
"[Filter 2] Test timed out after 60s and {} messages",
received
);
}
}

stop.notify_waiters();

// Cooldown for cleanup
time::sleep(Duration::from_secs(10)).await;

let stop = Arc::new(Notify::new());

tokio::spawn(spawn_new(config, stop.clone()));

let mut received = 0;
match run(msg_count, threads, &mut received).await {
Ok(duration) => {
println!(
"[Codec] Processed {} messages in {:?} | {:?}/msg",
received,
duration,
duration / received as u32
);
}
Err(_) => {
println!("[Codec] Test timed out after 60s and {} messages", received);
}
}

stop.notify_waiters();

// Cooldown for cleanup
time::sleep(Duration::from_secs(10)).await;
}

async fn spawn_new(config: Config, stop: Arc<Notify>) {
tokio::select! {
_ = run_server(config) => {}
_ = stop.notified() => {}
}
}

async fn spawn_old(config: Config, use_fn_2: bool, stop: Arc<Notify>) {
tokio::select! {
_ = old_run_server(config, use_fn_2) => {}
_ = stop.notified() => {}
}
}

async fn run(msg_count: usize, threads: usize, received: &mut usize) -> Result<Duration, Elapsed> {
let notify = Arc::new(Notify::new());

for i in 0..threads {
let notify = notify.clone();
let addr = format!("127.0.0.1:1000{}", i);
let sock = UdpSocket::bind(addr).await.unwrap();
sock.connect("127.0.0.1:8125").await.unwrap();

tokio::spawn(async move {
notify.notified().await;

for i in 0..msg_count {
let _ = sock.send(format!("metric:{}|c\n", i).as_bytes()).await;
}
});
}

let expected = msg_count * threads;

time::timeout(Duration::from_secs(10), receive(notify, expected, received)).await
}

async fn receive(notify: Arc<Notify>, expected: usize, received: &mut usize) -> Duration {
let sock = UdpSocket::bind("127.0.0.1:8126").await.unwrap();
let mut buffer = BytesMut::with_capacity(10_000_000);

let start = Instant::now();

notify.notify_waiters();

while let Ok((_, _)) = sock.recv_from(&mut buffer).await {
*received += buffer[..].split(|x| x == &b'\n').count();

buffer.clear();

if *received >= expected {
break;
}
}

start.elapsed()
}
2 changes: 1 addition & 1 deletion src/config.rs
Expand Up @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
pub listen_host: String,
pub listen_port: u16,
Expand Down