diff --git a/CHANGELOG.md b/CHANGELOG.md index b936015b..366dea90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ All changes in this project will be noted in this file. +## Version 0.8.3 + +### Additions + +- Benchmark tool: + - Switch to using `workload` based benchmarks + +### Fixes + +- Benchmark tool: + - Running a SIGINT now gracefully terminates the workload + ## Version 0.8.2 ### Additions diff --git a/sky-bench/src/args.rs b/sky-bench/src/args.rs index 03da99be..da7930a6 100644 --- a/sky-bench/src/args.rs +++ b/sky-bench/src/args.rs @@ -180,7 +180,7 @@ pub fn parse_and_setup() -> BenchResult { }; let workload = match args.remove("--workload") { Some(workload) => match workload.as_str() { - "uniform_v1" => BenchType::Workload(BenchWorkload::UniformV1), + "uniform_v1_std" => BenchType::Workload(BenchWorkload::UniformV1), _ => { return Err(BenchError::ArgsErr(format!( "unknown workload choice {workload}" @@ -188,7 +188,10 @@ pub fn parse_and_setup() -> BenchResult { } }, None => match args.remove("--engine") { - None => BenchType::Workload(BenchWorkload::UniformV1), + None => { + warn!("workload not specified. choosing default workload 'uniform_v1_std'"); + BenchType::Workload(BenchWorkload::UniformV1) + } Some(engine) => BenchType::Legacy(match engine.as_str() { "rookie" => BenchEngine::Rookie, "fury" => BenchEngine::Fury, diff --git a/sky-bench/src/bench.rs b/sky-bench/src/bench.rs index f0eb4aa7..a20369d9 100644 --- a/sky-bench/src/bench.rs +++ b/sky-bench/src/bench.rs @@ -125,17 +125,20 @@ pub fn run(bench: BenchConfig) -> error::BenchResult<()> { config_instance.username(), config_instance.password(), )); - let mut main_thread_db; + let mut main_thread_db = None; let stats = match bench.workload { - BenchType::Workload(BenchWorkload::UniformV1) => return workload::run_bench(), + BenchType::Workload(BenchWorkload::UniformV1) => { + workload::run_bench(workload::UniformV1Std::new()) + } BenchType::Legacy(l) => { warn!("using `--engine` is now deprecated. 
please consider switching to `--workload`"); info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: binary, pw: uint8}}`"); - main_thread_db = bench_config.config.connect()?; - main_thread_db.query_parse::<()>(&query!("create space bench"))?; - main_thread_db.query_parse::<()>(&query!(format!( + let mut mt_db = bench_config.config.connect()?; + mt_db.query_parse::<()>(&query!("create space bench"))?; + mt_db.query_parse::<()>(&query!(format!( "create model {BENCHMARK_SPACE_ID}.{BENCHMARK_MODEL_ID}(un: binary, pw: uint8)" )))?; + main_thread_db = Some(mt_db); match l { BenchEngine::Rookie => bench_rookie(bench_config), BenchEngine::Fury => bench_fury(), @@ -170,9 +173,11 @@ pub fn run(bench: BenchConfig) -> error::BenchResult<()> { util */ -fn cleanup(mut main_thread_db: Connection) -> Result<(), error::BenchError> { +fn cleanup(main_thread_db: Option) -> Result<(), error::BenchError> { trace!("dropping space and table"); - main_thread_db.query_parse::<()>(&query!("drop space allow not empty bench"))?; + if let Some(mut db) = main_thread_db { + db.query_parse::<()>(&query!("drop space allow not empty bench"))?; + } Ok(()) } diff --git a/sky-bench/src/error.rs b/sky-bench/src/error.rs index c8d820f0..b4ec8fcd 100644 --- a/sky-bench/src/error.rs +++ b/sky-bench/src/error.rs @@ -28,7 +28,7 @@ use { crate::{ bench::BombardTask, legacy::runtime::{fury, rookie::BombardError}, - workload::error::WorkloadDriverError, + workload::error::WorkloadError, }, core::fmt, skytable::error::Error, @@ -40,7 +40,7 @@ pub type BenchResult = Result; pub enum BenchError { ArgsErr(String), DirectDbError(Error), - WorkloadDriverError(WorkloadDriverError), + WorkloadDriverError(WorkloadError), // legacy LegacyRookieEngineError(BombardError), LegacyFuryEngineError(fury::FuryError), @@ -52,8 +52,8 @@ impl From for BenchError { } } -impl From for BenchError { - fn from(e: WorkloadDriverError) -> Self { +impl From for BenchError { + fn from(e: WorkloadError) -> Self { Self::WorkloadDriverError(e) } } diff --git a/sky-bench/src/setup.rs b/sky-bench/src/setup.rs index 926790d0..89c5e383 100644 --- a/sky-bench/src/setup.rs +++ b/sky-bench/src/setup.rs @@ -37,6 +37,7 @@ static mut SETUP: RunnerSetup = RunnerSetup { object_count: 0, }; +#[derive(Debug)] pub struct RunnerSetup { username: String, password: String, diff --git a/sky-bench/src/workload/driver.rs b/sky-bench/src/workload/driver.rs index e9457fc1..bc4d10d1 100644 --- a/sky-bench/src/workload/driver.rs +++ b/sky-bench/src/workload/driver.rs @@ -26,125 +26,59 @@ use { super::{ - error::{WorkloadDriverError, WorkloadResult}, + error::{WorkloadError, WorkloadResult}, Workload, }, crate::{ setup, - stats::{self, WorkerLocalStats}, - workload::GeneratedWorkload, + stats::{self, RuntimeStats, WorkerLocalStats}, }, - skytable::Config, std::{ marker::PhantomData, time::{Duration, Instant}, }, - tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, - sync::{broadcast, mpsc}, - }, + tokio::sync::{broadcast, mpsc}, }; -const TIMEOUT_DURATION: Duration = Duration::from_secs(60); - -pub type EncodedQueryList = Vec>; - mod global { - use { - std::sync::atomic::{AtomicUsize, Ordering}, - tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, - }; - - #[derive(Debug, PartialEq)] - struct Workload { - expected_response_size: usize, - packets: super::EncodedQueryList, - } - pub static POSITION: AtomicUsize = AtomicUsize::new(0); + use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; static WORKLOAD_LOCK: RwLock = 
RwLock::const_new(true); - static mut WORKLOAD: Workload = Workload { - expected_response_size: 0, - packets: vec![], - }; - pub fn gset_position(position: usize) { - POSITION.store(position, Ordering::Release) - } pub async fn glck_exclusive() -> RwLockWriteGuard<'static, bool> { WORKLOAD_LOCK.write().await } pub async fn glck_shared() -> RwLockReadGuard<'static, bool> { WORKLOAD_LOCK.read().await } - pub unsafe fn gworkload_step() -> Option<&'static [u8]> { - let mut current = POSITION.load(Ordering::Acquire); - loop { - if current == 0 { - return None; - } - match POSITION.compare_exchange( - current, - current - 1, - Ordering::Relaxed, - Ordering::Acquire, - ) { - Ok(new) => return Some(&*WORKLOAD.packets.as_ptr().add(new - 1)), - Err(_current) => current = _current, - } - } - } - pub unsafe fn report_crash() { - POSITION.store(0, Ordering::SeqCst); - } - pub unsafe fn gworkload_resp_size() -> usize { - WORKLOAD.expected_response_size - } - pub unsafe fn deallocate_workload() { - WORKLOAD.packets.clear() - } - pub unsafe fn push_workload(workload: Vec>, expected_resp_size: usize) { - WORKLOAD.packets = workload; - WORKLOAD.expected_response_size = expected_resp_size; - } } #[derive(Debug)] pub struct WorkloadDriver { connection_count: usize, work_result_rx: mpsc::Receiver>, - work_tx: broadcast::Sender, + work_tx: broadcast::Sender>, _wl: PhantomData, } impl WorkloadDriver { - pub async fn initialize(w: &W, config: Config) -> WorkloadResult { + pub async fn initialize() -> WorkloadResult { let connection_count = unsafe { setup::instance() }.connections(); let (online_tx, mut online_rx) = mpsc::channel::>(connection_count); - let (work_tx, _) = broadcast::channel::(connection_count); + let (work_tx, _) = + broadcast::channel::>(connection_count); let (work_result_tx, work_result_rx) = mpsc::channel(connection_count); - for id in 0..connection_count { + for _ in 0..connection_count { let this_online_tx = online_tx.clone(); let this_work_rx = work_tx.subscribe(); let this_work_result_tx = work_result_tx.clone(); - let this_config = config.clone(); - let winit_packets = w.worker_init_packets(); tokio::spawn(async move { - worker_task( - id, - this_config, - this_online_tx, - this_work_rx, - this_work_result_tx, - winit_packets, - ) - .await + worker_task::(this_online_tx, this_work_rx, this_work_result_tx).await }); } let mut initialized = 0; while initialized != connection_count { match online_rx.recv().await { Some(result) => result?, - None => return Err(WorkloadDriverError::Driver("worker task crashed".into())), + None => return Err(WorkloadError::Driver("worker task crashed".into())), } initialized += 1; } @@ -156,129 +90,102 @@ impl WorkloadDriver { _wl: PhantomData, }) } - pub async fn run_workload(mut self, workload: &W) -> WorkloadResult> { + pub async fn run_workload(mut self) -> WorkloadResult> { let mut results = vec![]; - // insert - self.run_workload_task( - "upsert", - workload.generate_upsert(), - &mut results, - workload.get_query_count(), - ) - .await?; - self.run_workload_task( - "insert", - workload.generate_insert(), - &mut results, - workload.get_query_count(), - ) - .await?; - // update - self.run_workload_task( - "update", - workload.generate_update(), - &mut results, - workload.get_query_count(), - ) - .await?; - // select - self.run_workload_task( - "select", - workload.generate_select(), - &mut results, - workload.get_query_count(), - ) - .await?; - // delete - self.run_workload_task( - "delete", - workload.generate_delete(), - &mut results, - 
workload.get_query_count(), - ) - .await?; - let _ = self.work_tx.send(WorkerTask::Terminate); - Ok(results) - } - async fn run_workload_task( - &mut self, - task_name: &'static str, - task: GeneratedWorkload<(EncodedQueryList, usize)>, - results: &mut Vec<(&'static str, f64)>, - count: usize, - ) -> WorkloadResult<()> { - if let GeneratedWorkload::Workload((encoded_packets, resp_size)) = task { - info!("executing workload task '{task_name}' with {count} queries"); - // lock - let workload_lock = global::glck_exclusive().await; - unsafe { - global::gset_position(count); - global::push_workload(encoded_packets, resp_size); - } - if self.work_tx.send(WorkerTask::GetReady).is_err() { - return Err(WorkloadDriverError::Driver(format!( - "a background worker crashed or exited due to an unknown reason" + for task in W::generate_tasks() { + let permit_exclusive = global::glck_exclusive().await; + info!("running workload task '{}'", W::task_id(&task)); + // setup task + W::task_setup(&task); + // tell workers to get ready + if self + .work_tx + .send(WorkerCommand::GetReady(task.clone())) + .is_err() + { + W::signal_stop(); + return Err(WorkloadError::Driver(format!( + "a background worker crashed" ))); } - // FIXME(@ohsayan): there is a lot of unnecessary time spent in threading etc., so this is very very imprecise - drop(workload_lock); - info!("workload task '{task_name}' execution started"); - let mut i = 0; + // prepare env and start let mut global_start = None; let mut global_stop = None; - while i != self.connection_count { + let mut global_head = u128::MAX; + let mut global_tail = 0; + drop(permit_exclusive); + // wait for all tasks to complete + let mut workers_completed = 0; + while workers_completed != self.connection_count { match self.work_result_rx.recv().await { - Some(Ok(r)) => { + Some(Ok(WorkerLocalStats { + start, + elapsed, + head, + tail, + })) => { + let stop = start + .checked_add(Duration::from_nanos(elapsed.try_into().unwrap())) + .unwrap(); match global_start.as_mut() { - Some(global) => { - if r.start < *global { - *global = r.start; + Some(gs) => { + if start < *gs { + *gs = start; } } - None => global_start = Some(r.start), + None => global_start = Some(start), } - let this_stop = - r.start + Duration::from_nanos(r.elapsed.try_into().unwrap()); match global_stop.as_mut() { - Some(global) => { - if this_stop > *global { - *global = this_stop; + Some(gs) => { + if stop > *gs { + *gs = stop; } } - None => global_stop = Some(this_stop), + None => global_stop = Some(stop), + } + if head < global_head { + global_head = head; + } + if tail > global_tail { + global_tail = tail; } } Some(Err(e)) => { - return Err(WorkloadDriverError::Driver(format!("a worker failed. {e}"))) + W::signal_stop(); + return Err(WorkloadError::Driver(format!("a worker failed. 
{e}"))); } None => { - return Err(WorkloadDriverError::Driver(format!( - "a background worker crashed or exited due to an unknown reason" - ))) + W::signal_stop(); + return Err(WorkloadError::Driver(format!( + "a background worker failed due to an unknown reason" + ))); } } - i += 1; + workers_completed += 1; } - let qps = stats::qps( - count, - global_stop - .unwrap() - .duration_since(global_start.unwrap()) - .as_nanos(), - ); - results.push((task_name, qps)); - unsafe { global::deallocate_workload() } - info!("workload task '{task_name}' completed"); - } else { - info!("workload task '{task_name}' skipped by workload"); + results.push(( + W::task_id(&task), + RuntimeStats { + qps: stats::qps( + W::task_query_count(&task), + global_stop + .unwrap() + .duration_since(global_start.unwrap()) + .as_nanos(), + ), + head: global_head, + tail: global_tail, + }, + )); + W::task_cleanup(&task); } - Ok(()) + Ok(results) } } impl Drop for WorkloadDriver { fn drop(&mut self) { - let _ = self.work_tx.send(WorkerTask::Terminate); + let _ = self.work_tx.send(WorkerCommand::Terminate); } } @@ -286,179 +193,64 @@ impl Drop for WorkloadDriver { worker */ -macro_rules! timeout { - ($e_tx:expr, $op:expr, $f:expr) => { - match tokio::time::timeout(TIMEOUT_DURATION, $f).await { - Ok(r) => r, - Err(_) => { - let _ = $e_tx - .send(Err(WorkloadDriverError::Driver(format!( - "{} timed out", - $op - )))) - .await; - return; - } - } - }; -} - #[derive(Debug, Clone, Copy)] -enum WorkerTask { - GetReady, +enum WorkerCommand { + GetReady(W), Terminate, } -async fn worker_task( - worker_id: usize, - config: Config, +async fn worker_task( online_tx: mpsc::Sender>, - mut work_rx: broadcast::Receiver, + mut work_rx: broadcast::Receiver>, result_rx: mpsc::Sender>, - (post_init_request, post_init_resp): (Vec, Vec), ) { - let init = async { - /* - initialize the worker connection. 
- we use a TCP connection instead of the client library's TCP connection since that allows us to precisely time - when the server responds, which is otherwise not possible - */ - // connect - let mut con = TcpStream::connect((config.host(), config.port())).await?; - // prepare handshake - let hs = format!( - "H\0\0\0\0\0{username_length}\n{password_length}\n{username}{password}", - username_length = config.username().len(), - password_length = config.password().len(), - username = config.username(), - password = config.password(), - ) - .into_bytes(); - // send client handshake - con.write_all(&hs).await?; - // read server handshake - let mut hs_resp = [0u8; 4]; - con.read_exact(&mut hs_resp).await?; - Ok((con, hs_resp)) - }; - let (con, hs_resp) = match init.await { - Ok(hs_resp) => hs_resp, + // initialize the worker connection + let mut worker_connection = match W::setup_data_connection().await { + Ok(con) => con, Err(e) => { - let _ = online_tx.send(Err(WorkloadDriverError::Io(e))).await; + let _ = online_tx.send(Err(e)).await; return; } }; - match hs_resp { - [b'H', 0, 0, 0] => {} - [b'H', 0, 1, e] => { - let _ = online_tx - .send(Err(WorkloadDriverError::Db(format!( - "connection rejected by server with hs error code: {}", - e - )))) - .await; - return; - } - _ => { - let _ = online_tx - .send(Err(WorkloadDriverError::Db(format!( - "server responded with unknown handshake {hs_resp:?}" - )))) - .await; - return; - } - } - // now run post init packets - let mut con = con; - if let Err(e) = timeout!( - result_rx, - "sending worker init query", - con.write_all(&post_init_request) - ) { - let _ = online_tx.send(Err(WorkloadDriverError::Io(e))).await; - return; - } - let mut post_init_resp_actual = vec![0; post_init_resp.len()]; - if let Err(e) = timeout!( - result_rx, - "reading worker init response", - con.read_exact(&mut post_init_resp_actual) - ) { - let _ = online_tx.send(Err(WorkloadDriverError::Io(e))).await; - return; - } - if post_init_resp_actual != post_init_resp { - let _ = online_tx.send(Err(WorkloadDriverError::Db(format!("expected post init responses do not match. 
received {post_init_resp_actual:?} from server")))).await; - return; - } - // we are ready to go let _ = online_tx.send(Ok(())).await; - // wait to act loop { - match work_rx.recv().await.unwrap() { - WorkerTask::GetReady => {} - WorkerTask::Terminate => break, + let workload_ctx = match work_rx.recv().await { + Ok(WorkerCommand::GetReady(wctx)) => wctx, + Ok(WorkerCommand::Terminate) | Err(_) => break, }; + let mut exec_ctx = W::task_exec_context_init(&workload_ctx); + let work_permit = global::glck_shared().await; + let local_start = Instant::now(); let mut head = u128::MAX; - let mut tail = 0u128; - let mut elapsed = 0u128; - let mut read_buffer = vec![ - 0; - unsafe { - // UNSAFE(@ohsayan): getting a ready command indicates that the main task has already set the workload up - global::gworkload_resp_size() - } - ]; - let _work_permit_that_is_hard_to_get = global::glck_shared().await; - let start = Instant::now(); - while let Some(work) = unsafe { - // UNSAFE(@ohsayan): since we received an execution request, this is safe to do - global::gworkload_step() - } { - let start = Instant::now(); - if con.write_all(work).await.is_err() { - unsafe { - // UNSAFE(@ohsayan): we hit an error, no matter how many workers call this in parallel, this is safe - global::report_crash(); - let _ = result_rx - .send(Err(WorkloadDriverError::Db(format!( - "worker-{worker_id} failed to send query from the server" - )))) - .await; - return; - } - } - if con.read_exact(&mut read_buffer).await.is_err() { - unsafe { - // UNSAFE(@ohsayan): see above - global::report_crash(); - let _ = result_rx - .send(Err(WorkloadDriverError::Db(format!( - "worker-{worker_id} failed to read response from the server" - )))) - .await; - return; - } - } - let stop = Instant::now(); - let full_query_execution_time = stop.duration_since(start).as_nanos(); - if full_query_execution_time < head { - head = full_query_execution_time; + let mut tail = 0; + let mut net_elapsed = 0; + while let Some(task) = W::fetch_next_payload() { + let (start, stop) = + match W::execute_payload(&mut exec_ctx, &mut worker_connection, task).await { + Ok(time) => time, + Err(e) => { + W::signal_stop(); + let _ = result_rx.send(Err(e)).await; + return; + } + }; + let this_elapsed = stop.duration_since(start).as_nanos(); + if this_elapsed > tail { + tail = this_elapsed; } - if full_query_execution_time > tail { - tail = full_query_execution_time + if this_elapsed < head { + head = this_elapsed; } - elapsed += full_query_execution_time; - // FIXME(@ohsayan): validate the response here + net_elapsed += this_elapsed; } - // we're done here + drop(work_permit); let _ = result_rx - .send(Ok(WorkerLocalStats { - start, + .send(Ok(WorkerLocalStats::new( + local_start, + net_elapsed, head, tail, - elapsed, - })) + ))) .await; } } diff --git a/sky-bench/src/workload/error.rs b/sky-bench/src/workload/error.rs index 6d05b8ed..4a36fe92 100644 --- a/sky-bench/src/workload/error.rs +++ b/sky-bench/src/workload/error.rs @@ -26,21 +26,37 @@ use std::{fmt, io}; +use skytable::error::Error; + #[derive(Debug)] -pub enum WorkloadDriverError { +pub enum WorkloadError { Io(io::Error), Db(String), Driver(String), } -pub type WorkloadResult = Result; +pub type WorkloadResult = Result; -impl fmt::Display for WorkloadDriverError { +impl fmt::Display for WorkloadError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Io(e) => write!(f, "i/o error: {e}"), Self::Db(e) => write!(f, "db error: {e}"), - Self::Driver(e) => write!(f, "workload driver error: {e}"), 
+ Self::Driver(e) => write!(f, "driver error: {e}"), } } } + +impl From for WorkloadError { + fn from(e: Error) -> Self { + Self::Db(format!( + "direct operation on control connection failed: {e}" + )) + } +} + +impl From for WorkloadError { + fn from(e: io::Error) -> Self { + Self::Io(e) + } +} diff --git a/sky-bench/src/workload/mod.rs b/sky-bench/src/workload/mod.rs index df37fdab..1c27bbb6 100644 --- a/sky-bench/src/workload/mod.rs +++ b/sky-bench/src/workload/mod.rs @@ -26,191 +26,92 @@ pub mod driver; pub mod error; +mod uniform_v1_std; + +pub use uniform_v1_std::UniformV1Std; use { - crate::{ - error::BenchResult, - setup::{self, RunnerSetup}, - workload::driver::WorkloadDriver, - }, - skytable::{query, Config, ConnectionAsync}, + self::error::WorkloadResult, + crate::{error::BenchResult, setup, stats::RuntimeStats, workload::driver::WorkloadDriver}, + std::{future::Future, process, time::Instant}, + tokio::runtime::Builder, }; -#[tokio::main] -pub async fn run_bench() -> BenchResult<()> { - let setup = unsafe { setup::instance() }; - let config = Config::new( - setup.host(), - setup.port(), - setup.username(), - setup.password(), - ); - let mut main_thread_db = config.connect_async().await?; - let workload = UniformV1::new(setup); - workload.initialize(&mut main_thread_db).await?; - let ret = run(&workload, config).await; - if let Err(e) = workload.cleanup(&mut main_thread_db).await { - info!("failed to clean up DB: {e}"); - } - ret -} - -async fn run(workload: &W, config: Config) -> BenchResult<()> { - info!("initializing workload driver"); - let driver = WorkloadDriver::initialize(workload, config).await?; - info!("beginning execution of workload {}", W::NAME); - for (query, qps) in driver.run_workload(workload).await? { - println!("{query}={qps:?}/sec"); - } - Ok(()) -} - -/* - uniform_v1 - ----- - - 1:1:1:1 Insert, select, update, delete - - all unique rows -*/ - -pub struct UniformV1 { - key_size: usize, - query_count: usize, -} - -impl UniformV1 { - pub const DEFAULT_SPACE: &'static str = "db"; - pub const DEFAULT_MODEL: &'static str = "db"; - pub fn new(setup: &RunnerSetup) -> Self { - Self { - key_size: setup.object_size(), - query_count: setup.object_count(), +pub fn run_bench(w: W) -> BenchResult<(u64, Vec<(&'static str, RuntimeStats)>)> { + let runtime = Builder::new_multi_thread() + .worker_threads(unsafe { setup::instance().threads() }) + .enable_all() + .build() + .unwrap(); + runtime.block_on(async move { + let sig = tokio::signal::ctrl_c(); + let mut control_connection = w.setup_control_connection().await?; + info!("initializing workload '{}'", W::ID); + let wl_drv = WorkloadDriver::::initialize().await?; + info!("executing workload '{}'", W::ID); + tokio::select! { + r_ = wl_drv.run_workload() => { + let r = r_?; + if let Err(e) = w.cleanup(&mut control_connection).await { + error!("failed to clean up database. {e}"); + } + Ok((w.total_queries() as u64, r)) + } + _ = sig => { + W::signal_stop(); + info!("received termination signal. cleaning up"); + if let Err(e) = w.cleanup(&mut control_connection).await { + error!("failed to clean up database. 
{e}"); + } + process::exit(0x00); + } } - } - fn fmt_pk(&self, current: usize) -> Vec { - format!("{current:0>width$}", width = self.key_size).into_bytes() - } -} - -impl Workload for UniformV1 { - const NAME: &'static str = "uniform_v1"; - fn get_query_count(&self) -> usize { - self.query_count - } - fn worker_init_packets(&self) -> (Vec, Vec) { - ( - query!(format!("use {}", Self::DEFAULT_SPACE)).debug_encode_packet(), - [0x12].into(), - ) - } - async fn initialize(&self, db: &mut ConnectionAsync) -> BenchResult<()> { - db.query_parse::<()>(&query!(format!("create space {}", Self::DEFAULT_SPACE))) - .await?; - db.query_parse::<()>(&query!(format!( - "create model {}.{}(k: binary, v: uint8)", - Self::DEFAULT_SPACE, - Self::DEFAULT_MODEL - ))) - .await?; - Ok(()) - } - async fn cleanup(&self, db: &mut ConnectionAsync) -> BenchResult<()> { - db.query_parse::<()>(&query!(format!( - "drop model allow not empty {}.{}", - Self::DEFAULT_SPACE, - Self::DEFAULT_MODEL - ))) - .await?; - db.query_parse::<()>(&query!(format!("drop space {}", Self::DEFAULT_SPACE))) - .await?; - Ok(()) - } - // workload generation - fn generate_upsert(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)> { - GeneratedWorkload::Skipped - } - fn generate_insert(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)> { - let mut queries = vec![]; - for i in 0..self.query_count { - queries.push( - query!( - format!("ins into {}(?,?)", Self::DEFAULT_MODEL), - self.fmt_pk(i), - 0u8 - ) - .debug_encode_packet() - .into_boxed_slice(), - ); - } - // resp is the empty byte - GeneratedWorkload::Workload((queries, 1)) - } - fn generate_select(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)> { - let mut queries = vec![]; - for i in 0..self.query_count { - queries.push( - query!( - format!("sel v from {} where k = ?", Self::DEFAULT_MODEL), - self.fmt_pk(i) - ) - .debug_encode_packet() - .into_boxed_slice(), - ); - } - // resp is {row_code}{row_size}\n{int_code}{int}\n - GeneratedWorkload::Workload((queries, 6)) - } - fn generate_update(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)> { - let mut queries = vec![]; - for i in 0..self.query_count { - queries.push( - query!( - format!("upd {} set v += ? 
where k = ?", Self::DEFAULT_MODEL), - 1u8, - self.fmt_pk(i) - ) - .debug_encode_packet() - .into_boxed_slice(), - ); - } - // resp is the empty byte - GeneratedWorkload::Workload((queries, 1)) - } - fn generate_delete(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)> { - let mut queries = vec![]; - for i in 0..self.query_count { - queries.push( - query!( - format!("del from {} where k = ?", Self::DEFAULT_MODEL), - self.fmt_pk(i) - ) - .debug_encode_packet() - .into_boxed_slice(), - ); - } - // resp is the empty byte - GeneratedWorkload::Workload((queries, 1)) - } -} - -/* - workload definition -*/ - -#[derive(Debug)] -pub enum GeneratedWorkload { - Workload(T), - Skipped, + }) } -pub trait Workload: Sized { - const NAME: &'static str; - fn get_query_count(&self) -> usize; - fn worker_init_packets(&self) -> (Vec, Vec); - async fn initialize(&self, db: &mut ConnectionAsync) -> BenchResult<()>; - fn generate_upsert(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)>; - fn generate_insert(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)>; - fn generate_select(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)>; - fn generate_update(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)>; - fn generate_delete(&self) -> GeneratedWorkload<(driver::EncodedQueryList, usize)>; - async fn cleanup(&self, db: &mut ConnectionAsync) -> BenchResult<()>; +pub trait Workload { + /// name of the workload + const ID: &'static str; + /// the control connection + type ControlPort; + /// workload context, forming a part of the full workload + type WorkloadContext: Clone + Send + Sync + 'static; + /// a workload task + type WorkloadPayload: Clone + Send + Sync + 'static; + /// the data connection + type DataPort: Send + Sync; + /// task execution context + type TaskExecContext: Send + Sync; + // main thread + async fn setup_control_connection(&self) -> WorkloadResult; + /// clean up + async fn cleanup(&self, control: &mut Self::ControlPort) -> WorkloadResult<()>; + // task + fn total_queries(&self) -> usize; + /// get the tasks for this workload + fn generate_tasks() -> Vec; + /// get the ID of this workload task + fn task_id(t: &Self::WorkloadContext) -> &'static str; + /// get the number of queries run for this task + fn task_query_count(t: &Self::WorkloadContext) -> usize; + /// set up this task + fn task_setup(t: &Self::WorkloadContext); + /// clean up this task's generated data + fn task_cleanup(t: &Self::WorkloadContext); + /// initialize the task execution context + fn task_exec_context_init(t: &Self::WorkloadContext) -> Self::TaskExecContext; + // worker methods + /// setup up the worker connection + fn setup_data_connection( + ) -> impl Future> + Send + 'static; + /// get the next payload + fn fetch_next_payload() -> Option; + /// execute this payload + fn execute_payload( + ctx: &mut Self::TaskExecContext, + data_port: &mut Self::DataPort, + pl: Self::WorkloadPayload, + ) -> impl Future> + Send; + /// signal to terminate all worker threads + fn signal_stop(); } diff --git a/sky-bench/src/workload/uniform_v1_std.rs b/sky-bench/src/workload/uniform_v1_std.rs new file mode 100644 index 00000000..3ed23103 --- /dev/null +++ b/sky-bench/src/workload/uniform_v1_std.rs @@ -0,0 +1,229 @@ +/* + * Created on Sun Apr 28 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling 
without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +/*! + * # `uniform_v1_std` workload + * + * This workload is a very real-world workload where we first create multiple unique rows using an `INSERT`, then mutate these rows using an `UPDATE`, + * select a column using a `SELECT` and finally remove the row using `DELETE`. + * + * **We time a full execution**: Including encoding the entire query at execution time, sending the query, waiting to receive a response, validating and + * parsing a response and allocating structures to store this response, like you would do in the real-world when using Skytable. + * + * This is very different from tools like `redis-benchmark` which simply send the same encoded query multiple times over. We instead aim to do a + * real-world benchmark. +*/ + +use { + super::{error::WorkloadResult, Workload}, + crate::setup, + skytable::{query, Config, ConnectionAsync, Query}, + std::{ + future::Future, + sync::atomic::{AtomicUsize, Ordering}, + time::Instant, + }, +}; + +static TARGET: AtomicUsize = AtomicUsize::new(0); +static mut WL: WorkloadData = WorkloadData { + queries: Vec::new(), +}; + +const DEFAULT_SPACE: &'static str = "db"; +const DEFAULT_MODEL: &'static str = "db"; + +struct WorkloadData { + queries: Vec, +} + +#[derive(Debug)] +/// A real-worl, uniform distribution (INSERT,UPDATE,SELECT,DELETE in a 1:1:1:1 distribution) workload that inserts multiple unique rows, manipulates them, +/// selects them and finally removes them +pub struct UniformV1Std(()); + +impl UniformV1Std { + pub fn new() -> Self { + Self(()) + } +} + +#[derive(Clone, Copy)] +pub struct UniformV1Task { + id: &'static str, + f: fn(usize) -> Query, +} + +impl UniformV1Task { + fn new(id: &'static str, f: fn(usize) -> Query) -> Self { + Self { id, f } + } + fn fmt_pk(current: usize) -> Vec { + Self::_fmt_pk(current, unsafe { setup::instance() }.object_size()) + } + fn _fmt_pk(current: usize, width: usize) -> Vec { + format!("{current:0>width$}",).into_bytes() + } +} + +impl Workload for UniformV1Std { + const ID: &'static str = "uniform_v1_std"; + type ControlPort = ConnectionAsync; + type WorkloadContext = UniformV1Task; + type WorkloadPayload = &'static Query; + type DataPort = ConnectionAsync; + type TaskExecContext = (); + async fn setup_control_connection(&self) -> WorkloadResult { + let setup = unsafe { setup::instance() }; + let mut con = Config::new( + setup.host(), + setup.port(), + setup.username(), + setup.password(), + ) + .connect_async() + .await?; + con.query_parse::<()>(&query!(format!("create space {DEFAULT_SPACE}"))) + .await?; + con.query_parse::<()>(&query!(format!( + "create model {DEFAULT_SPACE}.{DEFAULT_MODEL}(k: binary, v: uint8)" + ))) + .await?; + Ok(con) + } + async fn cleanup(&self, c: &mut Self::ControlPort) -> WorkloadResult<()> { + 
c.query_parse::<()>(&query!(format!( + "DROP SPACE ALLOW NOT EMPTY {DEFAULT_SPACE}" + ))) + .await?; + Ok(()) + } + fn total_queries(&self) -> usize { + unsafe { setup::instance() }.object_count() * 4 + } + fn generate_tasks() -> Vec { + vec![ + UniformV1Task::new("INSERT", |unique_id| { + query!( + format!("INS into {DEFAULT_MODEL}(?, ?)"), + UniformV1Task::fmt_pk(unique_id), + 0u8 + ) + }), + UniformV1Task::new("UPDATE", |unique_id| { + query!( + format!("UPD {DEFAULT_MODEL} SET v += ? WHERE k = ?"), + 1u8, + UniformV1Task::fmt_pk(unique_id), + ) + }), + UniformV1Task::new("SELECT", |unique_id| { + query!( + format!("SEL v FROM {DEFAULT_MODEL} WHERE k = ?"), + UniformV1Task::fmt_pk(unique_id), + ) + }), + UniformV1Task::new("DELETE", |unique_id| { + query!( + format!("DEL FROM {DEFAULT_MODEL} WHERE k = ?"), + UniformV1Task::fmt_pk(unique_id), + ) + }), + ] + } + fn task_id(t: &Self::WorkloadContext) -> &'static str { + t.id + } + fn task_query_count(_: &Self::WorkloadContext) -> usize { + unsafe { setup::instance() }.object_count() + } + fn task_setup(t: &Self::WorkloadContext) { + let setup = &unsafe { setup::instance() }; + for i in 0..setup.object_count() { + unsafe { + WL.queries.push((t.f)(i)); + } + } + TARGET.store(setup.object_count(), Ordering::Release); + } + fn task_cleanup(_: &Self::WorkloadContext) { + unsafe { + WL.queries.clear(); + } + } + fn task_exec_context_init(_: &Self::WorkloadContext) -> Self::TaskExecContext {} + async fn setup_data_connection() -> WorkloadResult { + let setup = unsafe { setup::instance() }; + let mut con = Config::new( + setup.host(), + setup.port(), + setup.username(), + setup.password(), + ) + .connect_async() + .await?; + con.query_parse::<()>(&query!(format!("use {DEFAULT_SPACE}"))) + .await?; + Ok(con) + } + #[inline(always)] + fn fetch_next_payload() -> Option { + let mut current = TARGET.load(Ordering::Acquire); + loop { + if current == 0 { + return None; + } + match TARGET.compare_exchange( + current, + current - 1, + Ordering::Relaxed, + Ordering::Acquire, + ) { + Ok(new) => return unsafe { Some(&*WL.queries.as_ptr().add(new - 1)) }, + Err(_current) => current = _current, + } + } + } + #[inline(always)] + fn execute_payload( + _: &mut Self::TaskExecContext, + data_port: &mut Self::DataPort, + pl: Self::WorkloadPayload, + ) -> impl Future> + Send { + async move { + let start = Instant::now(); + // fully run a query by validating it, allocating any lists or maps or blobs + // like you would in a real world application + let _ = data_port.query(&pl).await?; + let stop = Instant::now(); + Ok((start, stop)) + } + } + #[inline(always)] + fn signal_stop() { + TARGET.store(0, Ordering::SeqCst); + } +}
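
Note on the SIGINT handling added in `workload/mod.rs`: `run_bench` now races the workload driver against `tokio::signal::ctrl_c()` inside `tokio::select!`, running the workload's `cleanup` on the control connection before exiting on either path — which is what the CHANGELOG's "SIGINT now gracefully terminates the workload" refers to. A minimal, self-contained sketch of that shutdown pattern (the `run_workload`/`cleanup` bodies below are placeholders, not the patch's; assumes `tokio = { version = "1", features = ["full"] }`):

use std::time::Duration;

async fn run_workload() {
    // placeholder for the driver's real benchmark loop
    tokio::time::sleep(Duration::from_secs(10)).await;
}

async fn cleanup() {
    // placeholder for dropping the benchmark space over the control connection
    println!("dropping benchmark space");
}

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = run_workload() => {
            // normal completion: clean up and report
            cleanup().await;
            println!("workload finished");
        }
        _ = tokio::signal::ctrl_c() => {
            // SIGINT: stop the run, clean up, then exit
            println!("received termination signal. cleaning up");
            cleanup().await;
            std::process::exit(0);
        }
    }
}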
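
Note on payload distribution in `uniform_v1_std.rs`: workers pull pre-generated queries by counting a shared `AtomicUsize` (`TARGET`) down with `compare_exchange`, so no lock is held on the hot path and `signal_stop` can halt everyone by storing zero. A minimal sketch of the same countdown pattern with illustrative names (`REMAINING`, `PAYLOADS` are not from the patch, and the orderings here are chosen conservatively rather than copied from it):

use std::sync::atomic::{AtomicUsize, Ordering};

// number of payloads still to be handed out; workers decrement it lock-free
static REMAINING: AtomicUsize = AtomicUsize::new(0);
// illustrative payload store; the patch fills a Vec<Query> in task_setup
static PAYLOADS: &[&str] = &["q1", "q2", "q3"];

fn stage(count: usize) {
    // publish the payload count before workers start pulling
    REMAINING.store(count, Ordering::Release);
}

fn fetch_next() -> Option<&'static str> {
    let mut current = REMAINING.load(Ordering::Acquire);
    loop {
        if current == 0 {
            return None; // everything handed out, or a stop was signalled
        }
        match REMAINING.compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Acquire) {
            // on success the returned value equals the count we observed,
            // so the payload we now own sits at index prev - 1
            Ok(prev) => return Some(PAYLOADS[prev - 1]),
            // another worker won the race; retry with the value it left behind
            Err(actual) => current = actual,
        }
    }
}

fn main() {
    stage(PAYLOADS.len());
    while let Some(p) = fetch_next() {
        println!("executing {p}");
    }
}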
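
Note on result aggregation in `workload/driver.rs`: each worker reports a `WorkerLocalStats { start, elapsed, head, tail }`, and the driver folds these into a global head (fastest query), tail (slowest query), and a QPS figure computed over the earliest start and latest stop. The real `RuntimeStats` and `stats::qps` live in `sky-bench/src/stats.rs`, which this diff does not touch, so the struct shape and the queries-per-second formula below are assumptions used only to illustrate the fold:

use std::time::{Duration, Instant};

// assumed shape; mirrors the fields the driver destructures in the patch
struct WorkerLocalStats {
    start: Instant,
    elapsed: u128, // total time spent executing queries, in ns
    head: u128,    // fastest single query, in ns
    tail: u128,    // slowest single query, in ns
}

fn merge(workers: &[WorkerLocalStats], query_count: usize) -> (f64, u128, u128) {
    assert!(!workers.is_empty());
    let mut global_start: Option<Instant> = None;
    let mut global_stop: Option<Instant> = None;
    let (mut global_head, mut global_tail) = (u128::MAX, 0u128);
    for w in workers {
        let stop = w.start + Duration::from_nanos(w.elapsed as u64);
        global_start = Some(global_start.map_or(w.start, |s| s.min(w.start)));
        global_stop = Some(global_stop.map_or(stop, |s| s.max(stop)));
        global_head = global_head.min(w.head);
        global_tail = global_tail.max(w.tail);
    }
    // queries per second over the wall-clock window spanned by all workers
    let nanos = global_stop.unwrap().duration_since(global_start.unwrap()).as_nanos();
    let qps = query_count as f64 / (nanos as f64 / 1_000_000_000.0);
    (qps, global_head, global_tail)
}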