diff --git a/sky-bench/src/args.rs b/sky-bench/src/args.rs index 6ae1d81b..237b1004 100644 --- a/sky-bench/src/args.rs +++ b/sky-bench/src/args.rs @@ -49,7 +49,7 @@ pub enum Task { } #[derive(Debug, PartialEq, Clone, Copy)] -pub enum BenchEngine { +pub enum LegacyBenchEngine { Rookie, Fury, } @@ -57,7 +57,7 @@ pub enum BenchEngine { #[derive(Debug, PartialEq, Clone, Copy)] pub enum BenchType { Workload(BenchWorkload), - Legacy(BenchEngine), + Legacy(LegacyBenchEngine), } #[derive(Debug, PartialEq, Clone, Copy)] @@ -197,8 +197,8 @@ pub fn parse_and_setup() -> BenchResult { BenchType::Workload(BenchWorkload::UniformV1) } Some(engine) => BenchType::Legacy(match engine.as_str() { - "rookie" => BenchEngine::Rookie, - "fury" => BenchEngine::Fury, + "rookie" => LegacyBenchEngine::Rookie, + "fury" => LegacyBenchEngine::Fury, _ => { return Err(BenchError::ArgsErr(format!( "bad value for `--engine`. got `{engine}` but expected warp or rookie" @@ -211,7 +211,7 @@ pub fn parse_and_setup() -> BenchResult { None => num_cpus::get() * 8, Some(c) => match c.parse::() { Ok(s) if s != 0 => { - if workload == BenchType::Legacy(BenchEngine::Rookie) { + if workload == BenchType::Legacy(LegacyBenchEngine::Rookie) { return Err(BenchError::ArgsErr(format!( "the 'rookie' engine does not support explicit connection count. the number of threads is the connection count" ))); diff --git a/sky-bench/src/bench.rs b/sky-bench/src/bench.rs index ddb2d6f3..9c4fbe07 100644 --- a/sky-bench/src/bench.rs +++ b/sky-bench/src/bench.rs @@ -24,160 +24,34 @@ * */ -use { - crate::{ - args::{BenchConfig, BenchEngine, BenchType, BenchWorkload}, - error::{self, BenchResult}, - legacy::runtime::{fury, rookie}, - setup, - stats::RuntimeStats, - workload::{self, workloads}, - }, - skytable::{ - error::Error, - query, - response::{Response, Value}, - Config, Connection, Query, - }, - std::{fmt, time::Instant}, +use crate::{ + args::{BenchConfig, BenchType, BenchWorkload}, + error, legacy, stats, + stats::RuntimeStats, + workload::{self, workloads}, }; -pub const BENCHMARK_SPACE_ID: &'static str = "bench"; -pub const BENCHMARK_MODEL_ID: &'static str = "bench"; - -/* - task impl -*/ - -/// A bombard task used for benchmarking - -#[derive(Debug)] -pub struct BombardTask { - config: Config, -} - -impl BombardTask { - pub fn new(config: Config) -> Self { - Self { config } - } -} - -/// Errors while running a bombard -#[derive(Debug)] -pub enum BombardTaskError { - DbError(Error), - Mismatch, -} - -impl fmt::Display for BombardTaskError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::DbError(e) => write!(f, "a bombard subtask failed with {e}"), - Self::Mismatch => write!(f, "got unexpected response for bombard subtask"), - } - } -} - -impl From for BombardTaskError { - fn from(dbe: Error) -> Self { - Self::DbError(dbe) - } -} - -impl rookie::ThreadedBombardTask for BombardTask { - type Worker = Connection; - type WorkerTask = (Query, (BenchmarkTask, u64)); - type WorkerTaskSpec = BenchmarkTask; - type WorkerInitError = Error; - type WorkerTaskError = BombardTaskError; - fn worker_init(&self) -> Result { - let mut db = self.config.connect()?; - db.query_parse::<()>(&skytable::query!(format!("use {BENCHMARK_SPACE_ID}"))) - .map(|_| db) - } - fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask { - (spec.generate_query(current), (*spec, current)) - } - fn worker_drive_timed( - worker: &mut Self::Worker, - (query, (spec, current)): Self::WorkerTask, - ) -> Result { - let start = Instant::now(); - let ret = worker.query(&query)?; - let stop = Instant::now(); - if spec.verify_response(current, ret) { - Ok(stop.duration_since(start).as_nanos()) - } else { - Err(BombardTaskError::Mismatch) - } - } -} - /* runner */ pub fn run(bench: BenchConfig) -> error::BenchResult<()> { - let config_instance = unsafe { setup::instance() }; - let bench_config = BombardTask::new(Config::new( - config_instance.host(), - config_instance.port(), - config_instance.username(), - config_instance.password(), - )); - let mut main_thread_db = None; - let stats = match bench.workload { + let (total_queries, stats) = match bench.workload { BenchType::Workload(workload) => match workload { BenchWorkload::UniformV1 => workload::run_bench(workloads::UniformV1Std::new()), }, BenchType::Legacy(l) => { warn!("using `--engine` is now deprecated. please consider switching to `--workload`"); - info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: binary, pw: uint8}}`"); - let mut mt_db = bench_config.config.connect()?; - mt_db.query_parse::<()>(&query!("create space bench"))?; - mt_db.query_parse::<()>(&query!(format!( - "create model {BENCHMARK_SPACE_ID}.{BENCHMARK_MODEL_ID}(un: binary, pw: uint8)" - )))?; - main_thread_db = Some(mt_db); - match l { - BenchEngine::Rookie => bench_rookie(bench_config), - BenchEngine::Fury => bench_fury(), - } - } - }; - let (total_queries, stats) = match stats { - Ok(ret) => ret, - Err(e) => { - error!("benchmarking failed. attempting to clean up"); - match cleanup(main_thread_db) { - Ok(()) => return Err(e), - Err(e_cleanup) => { - error!("failed to clean up db: {e_cleanup}. please remove model `bench.bench` manually"); - return Err(e); - } - } + legacy::run_bench(l) } - }; + }?; info!( "{} queries executed. benchmark complete.", - fmt_u64(total_queries) + stats::fmt_u64(total_queries) ); warn!("benchmarks might appear to be slower. this tool is currently experimental"); // print results - print_table(stats); - cleanup(main_thread_db)?; - Ok(()) -} - -/* - util -*/ - -fn cleanup(main_thread_db: Option) -> Result<(), error::BenchError> { - trace!("dropping space and table"); - if let Some(mut db) = main_thread_db { - db.query_parse::<()>(&query!("drop space allow not empty bench"))?; - } + self::print_table(stats); Ok(()) } @@ -201,197 +75,3 @@ fn print_table(data: Vec<(&'static str, RuntimeStats)>) { "+---------+--------------------------+-----------------------+------------------------+" ); } - -/* - bench runner -*/ - -#[derive(Clone, Copy, Debug)] -pub struct BenchmarkTask { - gen_query: fn(&Self, u64) -> Query, - check_resp: fn(&Self, u64, Response) -> bool, -} - -impl BenchmarkTask { - fn new( - gen_query: fn(&Self, u64) -> Query, - check_resp: fn(&Self, u64, Response) -> bool, - ) -> Self { - Self { - gen_query, - check_resp, - } - } - pub fn generate_query(&self, current: u64) -> Query { - (self.gen_query)(self, current) - } - pub fn verify_response(&self, current: u64, resp: Response) -> bool { - (self.check_resp)(self, current, resp) - } -} - -struct BenchItem { - name: &'static str, - spec: BenchmarkTask, - count: usize, -} - -impl BenchItem { - fn new(name: &'static str, spec: BenchmarkTask, count: usize) -> Self { - Self { name, spec, count } - } - fn print_log_start(&self) { - info!( - "benchmarking `{}`. average payload size = {} bytes. queries = {}", - self.name, - self.spec.generate_query(0).debug_encode_packet().len(), - self.count - ) - } - fn run(self, pool: &mut rookie::BombardPool) -> BenchResult { - pool.blocking_bombard(self.spec, self.count) - .map_err(From::from) - } - async fn run_async(self, pool: &mut fury::Fury) -> BenchResult { - pool.bombard(self.count, self.spec) - .await - .map_err(From::from) - } -} - -fn prepare_bench_spec() -> Vec { - let config_instance = unsafe { setup::instance() }; - vec![ - BenchItem::new( - "INSERT", - BenchmarkTask::new( - |_, current| { - query!( - "insert into bench(?, ?)", - unsafe { setup::instance() }.fmt_pk(current), - 0u64 - ) - }, - |_, _, actual_resp| actual_resp == Response::Empty, - ), - config_instance.object_count(), - ), - BenchItem::new( - "SELECT", - BenchmarkTask::new( - |_, current| { - query!( - "select * from bench where un = ?", - unsafe { setup::instance() }.fmt_pk(current) - ) - }, - |_, current, resp| match resp { - Response::Row(r) => { - r.into_values() - == vec![ - Value::Binary(unsafe { setup::instance() }.fmt_pk(current)), - Value::UInt8(0), - ] - } - _ => false, - }, - ), - config_instance.object_count(), - ), - BenchItem::new( - "UPDATE", - BenchmarkTask::new( - |_, current| { - query!( - "update bench set pw += ? where un = ?", - 1u64, - unsafe { setup::instance() }.fmt_pk(current) - ) - }, - |_, _, resp| resp == Response::Empty, - ), - config_instance.object_count(), - ), - BenchItem::new( - "DELETE", - BenchmarkTask::new( - |_, current| { - query!( - "delete from bench where un = ?", - unsafe { setup::instance() }.fmt_pk(current) - ) - }, - |_, _, resp| resp == Response::Empty, - ), - config_instance.object_count(), - ), - ] -} - -fn fmt_u64(n: u64) -> String { - let num_str = n.to_string(); - let mut result = String::new(); - let chars_rev: Vec<_> = num_str.chars().rev().collect(); - for (i, ch) in chars_rev.iter().enumerate() { - if i % 3 == 0 && i != 0 { - result.push(','); - } - result.push(*ch); - } - result.chars().rev().collect() -} - -fn bench_rookie(task: BombardTask) -> BenchResult<(u64, Vec<(&'static str, RuntimeStats)>)> { - // initialize pool - let config_instance = unsafe { setup::instance() }; - info!( - "initializing connections. engine=rookie, threads={}, primary key size ={} bytes", - config_instance.threads(), - config_instance.object_size() - ); - let mut pool = rookie::BombardPool::new(config_instance.threads(), task)?; - // prepare benches - let benches = prepare_bench_spec(); - // bench - let total_queries = config_instance.object_count() as u64 * benches.len() as u64; - let mut results = vec![]; - for task in benches { - let name = task.name; - task.print_log_start(); - let this_result = task.run(&mut pool)?; - results.push((name, this_result)); - } - Ok((total_queries, results)) -} - -fn bench_fury() -> BenchResult<(u64, Vec<(&'static str, RuntimeStats)>)> { - let config_instance = unsafe { setup::instance() }; - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(config_instance.threads()) - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - info!( - "initializing connections. engine=fury, threads={}, connections={}, primary key size ={} bytes", - config_instance.threads(), config_instance.connections(), config_instance.object_size() - ); - let mut pool = fury::Fury::new( - config_instance.connections(), - Config::new(config_instance.host(), config_instance.port(), config_instance.username(), config_instance.password()), - ) - .await?; - // prepare benches - let benches = prepare_bench_spec(); - // bench - let total_queries = config_instance.object_count() as u64 * benches.len() as u64; - let mut results = vec![]; - for task in benches { - let name = task.name; - task.print_log_start(); - let this_result = task.run_async(&mut pool).await?; - results.push((name, this_result)); - } - Ok((total_queries,results)) - }) -} diff --git a/sky-bench/src/error.rs b/sky-bench/src/error.rs index b4ec8fcd..827de7be 100644 --- a/sky-bench/src/error.rs +++ b/sky-bench/src/error.rs @@ -26,8 +26,10 @@ use { crate::{ - bench::BombardTask, - legacy::runtime::{fury, rookie::BombardError}, + legacy::{ + runtime::{fury, rookie::BombardError}, + BombardTask, + }, workload::error::WorkloadError, }, core::fmt, diff --git a/sky-bench/src/legacy/mod.rs b/sky-bench/src/legacy/mod.rs index 7877f4d4..aad39b90 100644 --- a/sky-bench/src/legacy/mod.rs +++ b/sky-bench/src/legacy/mod.rs @@ -25,3 +25,316 @@ */ pub mod runtime; + +use { + self::runtime::{fury, rookie}, + crate::{args::LegacyBenchEngine, error::BenchResult, setup, stats::RuntimeStats}, + skytable::{ + error::Error, + query, + response::{Response, Value}, + Config, Connection, Query, + }, + std::{fmt, time::Instant}, +}; + +const BENCHMARK_SPACE_ID: &'static str = "bench"; +const BENCHMARK_MODEL_ID: &'static str = "bench"; + +/* + runner +*/ + +pub fn run_bench( + legacy_workload: LegacyBenchEngine, +) -> BenchResult<(u64, Vec<(&'static str, RuntimeStats)>)> { + let config_instance = unsafe { setup::instance() }; + let config = Config::new( + config_instance.host(), + config_instance.port(), + config_instance.username(), + config_instance.password(), + ); + let db = setup_db(&config)?; + let ret = match legacy_workload { + LegacyBenchEngine::Fury => bench_fury(), + LegacyBenchEngine::Rookie => bench_rookie(config), + }; + if let Err(e) = cleanup_db(db) { + error!("failed to clean up DB: {e}"); + } + ret +} + +/* + task +*/ + +#[derive(Clone, Copy, Debug)] +pub struct BenchmarkTask { + gen_query: fn(&Self, u64) -> Query, + check_resp: fn(&Self, u64, Response) -> bool, +} + +impl BenchmarkTask { + fn new( + gen_query: fn(&Self, u64) -> Query, + check_resp: fn(&Self, u64, Response) -> bool, + ) -> Self { + Self { + gen_query, + check_resp, + } + } + fn generate_query(&self, current: u64) -> Query { + (self.gen_query)(self, current) + } + fn verify_response(&self, current: u64, resp: Response) -> bool { + (self.check_resp)(self, current, resp) + } +} + +struct BenchItem { + name: &'static str, + spec: BenchmarkTask, + count: usize, +} + +impl BenchItem { + fn new(name: &'static str, spec: BenchmarkTask, count: usize) -> Self { + Self { name, spec, count } + } + fn print_log_start(&self) { + info!( + "benchmarking `{}`. average payload size = {} bytes. queries = {}", + self.name, + self.spec.generate_query(0).debug_encode_packet().len(), + self.count + ) + } + fn run(self, pool: &mut rookie::BombardPool) -> BenchResult { + pool.blocking_bombard(self.spec, self.count) + .map_err(From::from) + } + async fn run_async(self, pool: &mut fury::Fury) -> BenchResult { + pool.bombard(self.count, self.spec) + .await + .map_err(From::from) + } +} + +fn prepare_bench_spec() -> Vec { + let config_instance = unsafe { setup::instance() }; + vec![ + BenchItem::new( + "INSERT", + BenchmarkTask::new( + |_, current| { + query!( + "insert into bench(?, ?)", + unsafe { setup::instance() }.fmt_pk(current), + 0u64 + ) + }, + |_, _, actual_resp| actual_resp == Response::Empty, + ), + config_instance.object_count(), + ), + BenchItem::new( + "SELECT", + BenchmarkTask::new( + |_, current| { + query!( + "select * from bench where un = ?", + unsafe { setup::instance() }.fmt_pk(current) + ) + }, + |_, current, resp| match resp { + Response::Row(r) => { + r.into_values() + == vec![ + Value::Binary(unsafe { setup::instance() }.fmt_pk(current)), + Value::UInt8(0), + ] + } + _ => false, + }, + ), + config_instance.object_count(), + ), + BenchItem::new( + "UPDATE", + BenchmarkTask::new( + |_, current| { + query!( + "update bench set pw += ? where un = ?", + 1u64, + unsafe { setup::instance() }.fmt_pk(current) + ) + }, + |_, _, resp| resp == Response::Empty, + ), + config_instance.object_count(), + ), + BenchItem::new( + "DELETE", + BenchmarkTask::new( + |_, current| { + query!( + "delete from bench where un = ?", + unsafe { setup::instance() }.fmt_pk(current) + ) + }, + |_, _, resp| resp == Response::Empty, + ), + config_instance.object_count(), + ), + ] +} + +/* + util +*/ + +fn setup_db(cfg: &Config) -> BenchResult { + info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: binary, pw: uint8}}`"); + let mut mt_db = cfg.connect()?; + mt_db.query_parse::<()>(&query!("create space bench"))?; + mt_db.query_parse::<()>(&query!(format!( + "create model {BENCHMARK_SPACE_ID}.{BENCHMARK_MODEL_ID}(un: binary, pw: uint8)" + )))?; + Ok(mt_db) +} + +fn cleanup_db(mut db: Connection) -> BenchResult<()> { + trace!("dropping space and table"); + db.query_parse::<()>(&query!("drop space allow not empty bench"))?; + Ok(()) +} + +/* + fury runner +*/ + +fn bench_fury() -> BenchResult<(u64, Vec<(&'static str, RuntimeStats)>)> { + let config_instance = unsafe { setup::instance() }; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(config_instance.threads()) + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + info!( + "initializing connections. engine=fury, threads={}, connections={}, primary key size ={} bytes", + config_instance.threads(), config_instance.connections(), config_instance.object_size() + ); + let mut pool = fury::Fury::new( + config_instance.connections(), + Config::new(config_instance.host(), config_instance.port(), config_instance.username(), config_instance.password()), + ) + .await?; + // prepare benches + let benches = prepare_bench_spec(); + // bench + let total_queries = config_instance.object_count() as u64 * benches.len() as u64; + let mut results = vec![]; + for task in benches { + let name = task.name; + task.print_log_start(); + let this_result = task.run_async(&mut pool).await?; + results.push((name, this_result)); + } + Ok((total_queries,results)) + }) +} + +/* + rookie runner +*/ + +/// A bombard task used for benchmarking + +#[derive(Debug)] +pub struct BombardTask { + config: Config, +} + +impl BombardTask { + fn new(config: Config) -> Self { + Self { config } + } +} + +/// Errors while running a bombard +#[derive(Debug)] +pub enum BombardTaskError { + DbError(Error), + Mismatch, +} + +impl fmt::Display for BombardTaskError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::DbError(e) => write!(f, "a bombard subtask failed with {e}"), + Self::Mismatch => write!(f, "got unexpected response for bombard subtask"), + } + } +} + +impl From for BombardTaskError { + fn from(dbe: Error) -> Self { + Self::DbError(dbe) + } +} + +impl rookie::ThreadedBombardTask for BombardTask { + type Worker = Connection; + type WorkerTask = (Query, (BenchmarkTask, u64)); + type WorkerTaskSpec = BenchmarkTask; + type WorkerInitError = Error; + type WorkerTaskError = BombardTaskError; + fn worker_init(&self) -> Result { + let mut db = self.config.connect()?; + db.query_parse::<()>(&skytable::query!(format!("use {BENCHMARK_SPACE_ID}"))) + .map(|_| db) + } + fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask { + (spec.generate_query(current), (*spec, current)) + } + fn worker_drive_timed( + worker: &mut Self::Worker, + (query, (spec, current)): Self::WorkerTask, + ) -> Result { + let start = Instant::now(); + let ret = worker.query(&query)?; + let stop = Instant::now(); + if spec.verify_response(current, ret) { + Ok(stop.duration_since(start).as_nanos()) + } else { + Err(BombardTaskError::Mismatch) + } + } +} + +fn bench_rookie(cfg: Config) -> BenchResult<(u64, Vec<(&'static str, RuntimeStats)>)> { + // initialize pool + let config_instance = unsafe { setup::instance() }; + info!( + "initializing connections. engine=rookie, threads={}, primary key size ={} bytes", + config_instance.threads(), + config_instance.object_size() + ); + let mut pool = rookie::BombardPool::new(config_instance.threads(), BombardTask::new(cfg))?; + // prepare benches + let benches = prepare_bench_spec(); + // bench + let total_queries = config_instance.object_count() as u64 * benches.len() as u64; + let mut results = vec![]; + for task in benches { + let name = task.name; + task.print_log_start(); + let this_result = task.run(&mut pool)?; + results.push((name, this_result)); + } + Ok((total_queries, results)) +} diff --git a/sky-bench/src/legacy/runtime/fury.rs b/sky-bench/src/legacy/runtime/fury.rs index 7b461eb6..e21904c9 100644 --- a/sky-bench/src/legacy/runtime/fury.rs +++ b/sky-bench/src/legacy/runtime/fury.rs @@ -25,11 +25,11 @@ */ use { - super::WorkerTask, - crate::{ - bench::{BenchmarkTask, BENCHMARK_SPACE_ID}, - stats::{self, RuntimeStats, WorkerLocalStats}, + super::{ + super::{BenchmarkTask, BENCHMARK_SPACE_ID}, + WorkerTask, }, + crate::stats::{self, RuntimeStats, WorkerLocalStats}, skytable::Config, std::{ fmt, diff --git a/sky-bench/src/stats.rs b/sky-bench/src/stats.rs index f3c79b48..d4b2adc1 100644 --- a/sky-bench/src/stats.rs +++ b/sky-bench/src/stats.rs @@ -58,3 +58,16 @@ impl WorkerLocalStats { } } } + +pub fn fmt_u64(n: u64) -> String { + let num_str = n.to_string(); + let mut result = String::new(); + let chars_rev: Vec<_> = num_str.chars().rev().collect(); + for (i, ch) in chars_rev.iter().enumerate() { + if i % 3 == 0 && i != 0 { + result.push(','); + } + result.push(*ch); + } + result.chars().rev().collect() +}