Improve stress tests (#1082)
* stress

* stress

* stress

* fmt

* nits

* format

* fmt
lzchen committed May 30, 2023
1 parent 2e4d396 commit 8935df0
Showing 2 changed files with 70 additions and 28 deletions.
2 changes: 1 addition & 1 deletion stress/Cargo.toml
@@ -13,6 +13,6 @@ doc = false
 ctrlc = "3.2.5"
 lazy_static = "1.4.0"
 num_cpus = "1.15.0"
-rayon = "1.7.0"
 opentelemetry_api = { path = "../opentelemetry-api", features = ["metrics"] }
 opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics"] }
+num-format = "0.4.4"
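
The dependency swap pairs with the code change below: rayon's thread pool is no longer needed once the workers are spawned by hand, while num-format supplies the locale-aware counter output used by the new monitor thread. A minimal standalone sketch of the num-format API the new code relies on (illustrative, not part of this commit):

use num_format::{Locale, ToFormattedString};

fn main() {
    let iterations: u64 = 2_500_000;
    // Renders with locale-aware digit grouping, e.g. "2,500,000".
    println!("{}", iterations.to_formatted_string(&Locale::en));
}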
96 changes: 69 additions & 27 deletions stress/src/throughput.rs
@@ -1,12 +1,23 @@
-use rayon::prelude::*;
+use num_format::{Locale, ToFormattedString};
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::Arc;
-use std::time::Instant;
+use std::thread;
+use std::time::{Duration, Instant};
 
 const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
+const BATCH_SIZE: u64 = 1000;
 
 static STOP: AtomicBool = AtomicBool::new(false);
 
+#[repr(C)]
+#[derive(Default)]
+struct WorkerStats {
+    count: AtomicU64,
+    /// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
+    /// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
+    padding: [u64; 15],
+}
+
 pub fn test_throughput<F>(func: F)
 where
     F: Fn() + Sync + Send + 'static,
@@ -15,37 +26,68 @@ where
         STOP.store(true, Ordering::SeqCst);
     })
     .expect("Error setting Ctrl-C handler");
-    let mut start_time = Instant::now();
-    let mut end_time = Instant::now();
-    let mut total_count_old: u64 = 0;
-    let total_count = Arc::new(AtomicU64::new(0));
-    let total_count_clone = Arc::clone(&total_count);
-
-    rayon::spawn(move || {
-        (0..num_cpus::get()).into_par_iter().for_each(|_| loop {
-            func();
-            total_count_clone.fetch_add(1, Ordering::SeqCst);
-        });
-    });
-
-    loop {
-        let elapsed = end_time.duration_since(start_time).as_secs();
-        if elapsed >= SLIDING_WINDOW_SIZE {
-            let total_count_u64 = total_count.load(Ordering::Relaxed);
-            let current_count = total_count_u64 - total_count_old;
-            total_count_old = total_count_u64;
-            let throughput = current_count as f64 / elapsed as f64;
-            println!("Throughput: {:.2} requests/sec", throughput);
-            start_time = Instant::now();
-        }
-
-        if STOP.load(Ordering::SeqCst) {
-            break;
-        }
-
-        end_time = Instant::now();
-    }
+
+    let num_threads = num_cpus::get_physical();
+    println!("Number threads: {}", num_threads);
+    let mut handles = Vec::with_capacity(num_threads);
+    let func_arc = Arc::new(func);
+    let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();
+
+    for _ in 0..num_threads {
+        worker_stats_vec.push(WorkerStats::default());
+    }
+    let worker_stats_shared = Arc::new(worker_stats_vec);
+    let worker_stats_shared_monitor = Arc::clone(&worker_stats_shared);
+
+    let handle_main_thread = thread::spawn(move || {
+        let mut start_time = Instant::now();
+        let mut end_time = start_time;
+        let mut total_count_old: u64 = 0;
+        loop {
+            let elapsed = end_time.duration_since(start_time).as_secs();
+            if elapsed >= SLIDING_WINDOW_SIZE {
+                let total_count_u64: u64 = worker_stats_shared_monitor
+                    .iter()
+                    .map(|worker_stat| worker_stat.count.load(Ordering::Relaxed))
+                    .sum();
+                let current_count = total_count_u64 - total_count_old;
+                total_count_old = total_count_u64;
+                let throughput = current_count / elapsed;
+                println!(
+                    "Throughput: {} iterations/sec",
+                    throughput.to_formatted_string(&Locale::en)
+                );
+                start_time = Instant::now();
+            }
+
+            if STOP.load(Ordering::SeqCst) {
+                break;
+            }
+
+            end_time = Instant::now();
+            thread::sleep(Duration::from_millis(5000));
+        }
+    });
+
+    handles.push(handle_main_thread);
+
+    for thread_index in 0..num_threads - 1 {
+        let worker_stats_shared = Arc::clone(&worker_stats_shared);
+        let func_arc_clone = Arc::clone(&func_arc);
+        let handle = thread::spawn(move || loop {
+            for _ in 0..BATCH_SIZE {
+                func_arc_clone();
+            }
+            worker_stats_shared[thread_index]
+                .count
+                .fetch_add(BATCH_SIZE, Ordering::Relaxed);
+            if STOP.load(Ordering::SeqCst) {
+                break;
+            }
+        });
+        handles.push(handle)
+    }
+
+    for handle in handles {
+        handle.join().unwrap();
    }
 }
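
The rewrite replaces the rayon pool with hand-rolled threads: a monitor thread prints a locale-formatted throughput figure once per sliding window, while each worker bumps its own WorkerStats slot in batches of 1000. The padding arithmetic behind the struct's doc comment: count (8 bytes) plus [u64; 15] (120 bytes) makes each slot 128 bytes, so under #[repr(C)] no two workers' counters ever share a 64-byte cache line. A minimal sketch of how a stress binary might drive the harness (the module layout and busy-work closure are illustrative assumptions, not part of this commit):

// Hypothetical stress binary; assumes throughput.rs sits alongside as a module.
mod throughput;

fn main() {
    // The workload must satisfy Fn() + Sync + Send + 'static. A cheap
    // stand-in computation is used here; a real stress test would record
    // a metric or emit a span instead.
    throughput::test_throughput(|| {
        std::hint::black_box(41u64.wrapping_add(1));
    });
}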
