Skip to content

Commit

Permalink
Replaced Configuration constructors with ConfigurationBuilder (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrzenioszeniou committed Nov 5, 2021
1 parent e1794f9 commit fc57b91
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 115 deletions.
183 changes: 109 additions & 74 deletions src/load/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Strategy {
Duration(std::time::Duration),
PerThread(u32),
Overall(u32),
PerThread(u32),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -16,109 +17,143 @@ pub enum Monitor {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Configuration {
thread_no: usize,
strategy: Strategy,
step_delay: u64,
fetch_limit: Option<usize>,
monitor: Monitor,
shutdown_grace_period: u32,
status_pace: u64,
shutdown_grace_period: Duration,
status_pace: Duration,
step_delay: Duration,
strategy: Strategy,
thread_no: usize,
}

impl Configuration {
pub fn duration(
thread_no: usize,
duration: std::time::Duration,
step_delay: u64,
fetch_limit: Option<usize>,
monitor: Monitor,
shutdown_grace_period: u32,
status_pace: u64,
) -> Configuration {
Self {
thread_no,
strategy: Strategy::Duration(duration),
step_delay,
fetch_limit,
monitor,
shutdown_grace_period,
status_pace,
pub fn fetch_limit(&self) -> Option<usize> {
self.fetch_limit
}

pub fn monitor(&self) -> &Monitor {
&self.monitor
}

pub fn shutdown_grace_period(&self) -> Duration {
self.shutdown_grace_period
}
pub fn status_pace(&self) -> Duration {
self.status_pace
}

pub fn step_delay(&self) -> Duration {
self.step_delay
}

pub fn strategy(&self) -> &Strategy {
&self.strategy
}

pub fn thread_no(&self) -> usize {
self.thread_no
}

pub fn total_votes(&self) -> u32 {
match self.strategy() {
Strategy::Duration(duration) => {
(duration.as_millis() / self.step_delay.as_millis()) as u32
}
Strategy::PerThread(per_thread) => self.thread_no() as u32 * per_thread,
Strategy::Overall(overall) => *overall,
}
}
}

pub struct ConfigurationBuilder {
fetch_limit: Option<usize>,
monitor: Monitor,
shutdown_grace_period: Duration,
status_pace: Duration,
step_delay: Duration,
strategy: Strategy,
thread_no: usize,
}

pub fn requests_per_thread(
thread_no: usize,
requests_count: u32,
step_delay: u64,
fetch_limit: Option<usize>,
monitor: Monitor,
shutdown_grace_period: u32,
status_pace: u64,
) -> Configuration {
impl ConfigurationBuilder {
pub fn duration(duration: Duration) -> Self {
Self {
thread_no,
strategy: Strategy::PerThread(requests_count),
step_delay,
fetch_limit,
monitor,
shutdown_grace_period,
status_pace,
fetch_limit: None,
monitor: Monitor::Disabled(100),
shutdown_grace_period: Duration::ZERO,
status_pace: Duration::from_secs(1),
step_delay: Duration::from_millis(100),
strategy: Strategy::Duration(duration),
thread_no: 1,
}
}

pub fn overall_requests(
thread_no: usize,
requests_count: u32,
step_delay: u64,
fetch_limit: Option<usize>,
monitor: Monitor,
shutdown_grace_period: u32,
status_pace: u64,
) -> Configuration {
pub fn overall_requests(n_requests: u32) -> Self {
Self {
thread_no,
strategy: Strategy::Overall(requests_count),
step_delay,
fetch_limit,
monitor,
shutdown_grace_period,
status_pace,
fetch_limit: None,
monitor: Monitor::Disabled(100),
shutdown_grace_period: Duration::ZERO,
status_pace: Duration::from_secs(1),
step_delay: Duration::from_millis(100),
strategy: Strategy::Overall(n_requests),
thread_no: 1,
}
}

pub fn thread_no(&self) -> usize {
self.thread_no
pub fn requests_per_thread(n_requests: u32) -> Self {
Self {
fetch_limit: None,
monitor: Monitor::Disabled(100),
shutdown_grace_period: Duration::ZERO,
status_pace: Duration::from_secs(1),
step_delay: Duration::from_millis(100),
strategy: Strategy::PerThread(n_requests),
thread_no: 1,
}
}

pub fn strategy(&self) -> &Strategy {
&self.strategy
pub fn fetch_limit(self, fetch_limit: usize) -> Self {
Self {
fetch_limit: Some(fetch_limit),
..self
}
}

pub fn step_delay(&self) -> u64 {
self.step_delay
pub fn monitor(self, monitor: Monitor) -> Self {
Self { monitor, ..self }
}

pub fn status_pace(&self) -> u64 {
self.status_pace
pub fn shutdown_grace_period(self, shutdown_grace_period: Duration) -> Self {
Self {
shutdown_grace_period,
..self
}
}

pub fn fetch_limit(&self) -> Option<usize> {
self.fetch_limit
pub fn status_pace(self, status_pace: Duration) -> Self {
Self {
status_pace,
..self
}
}

pub fn monitor(&self) -> &Monitor {
&self.monitor
pub fn step_delay(self, step_delay: Duration) -> Self {
Self { step_delay, ..self }
}

pub fn shutdown_grace_period(&self) -> u32 {
self.shutdown_grace_period
pub fn thread_no(self, thread_no: usize) -> Self {
Self { thread_no, ..self }
}

pub fn total_votes(&self) -> u32 {
match self.strategy() {
Strategy::Duration(duration) => (duration.as_millis() / self.step_delay as u128) as u32,
Strategy::PerThread(per_thread) => self.thread_no() as u32 * per_thread,
Strategy::Overall(overall) => *overall,
pub fn build(self) -> Configuration {
Configuration {
fetch_limit: self.fetch_limit,
monitor: self.monitor,
shutdown_grace_period: self.shutdown_grace_period,
status_pace: self.status_pace,
step_delay: self.step_delay,
strategy: self.strategy,
thread_no: self.thread_no,
}
}
}
9 changes: 3 additions & 6 deletions src/load/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::{sync::Arc, time::Instant};

mod config;
mod monitor;
Expand All @@ -14,7 +11,7 @@ mod status;

use crate::load::rayon::{DurationRequestConsumer, Executor, FixedCountRequestConsumer};
use ::rayon::iter::plumbing::bridge_unindexed;
pub use config::{Configuration, Monitor, Strategy};
pub use config::{Configuration, ConfigurationBuilder, Monitor, Strategy};
pub use indicatif::{MultiProgress, ProgressBar};
pub use monitor::MonitorThread;
pub use progress::{use_as_monitor_progress_bar, use_as_status_progress_bar};
Expand Down Expand Up @@ -221,7 +218,7 @@ where
let request_generator = RayonWrapper::from(request_generator);
println!("Running load using {:?}", config.strategy());
let mut executor = Executor::new(config.thread_no());
let delay = Duration::from_millis(config.step_delay());
let delay = config.step_delay();
let strategy = config.strategy().clone();
let thread_no = config.thread_no();
executor.spawn(move || match strategy {
Expand Down
14 changes: 9 additions & 5 deletions src/load/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::{
thread::{self, JoinHandle},
};

const SHUTDOWN_PERIOD_INTERVAL: Duration = Duration::from_secs(1);

#[derive(Debug)]
pub struct StatusUpdaterThread {
stop_signal: Sender<()>,
Expand Down Expand Up @@ -138,8 +140,8 @@ impl StatusUpdaterThread {
monitor: Monitor,
fetch_limit: Option<usize>,
title: &str,
shutdown_grace_period: u32,
pace: u64,
mut shutdown_grace_period: Duration,
pace: Duration,
) -> Self
where
S: RequestStatusProvider + Send + 'static,
Expand All @@ -156,7 +158,7 @@ impl StatusUpdaterThread {
Ok(_) | Err(TryRecvError::Disconnected) => {
progress_bar.set_message("Waiting for all messages to be accepted or rejected");

for _ in 0..shutdown_grace_period {
while !shutdown_grace_period.is_zero() {
let statuses = update_statuses(
&responses_clone,
fetch_limit,
Expand All @@ -174,14 +176,16 @@ impl StatusUpdaterThread {
pending_statuses.len()
));
}
std::thread::sleep(std::time::Duration::from_secs(1));
std::thread::sleep(SHUTDOWN_PERIOD_INTERVAL);
shutdown_grace_period =
shutdown_grace_period.saturating_sub(SHUTDOWN_PERIOD_INTERVAL);
}
break;
}
Err(TryRecvError::Empty) => {}
}
update_statuses(&responses_clone, fetch_limit, &request_status_provider);
std::thread::sleep(std::time::Duration::from_secs(pace));
std::thread::sleep(pace);
});
Self {
stop_signal: tx,
Expand Down
48 changes: 18 additions & 30 deletions tests/load.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use jortestkit::load::ConfigurationBuilder;
pub use jortestkit::load::{
self, Configuration, Id, Monitor, Request, RequestFailure, RequestGenerator, RequestStatus,
RequestStatusProvider, Response,
Expand All @@ -21,35 +22,27 @@ impl RequestGenerator for SampleRequestGenerator {
}

fn split(self) -> (Self, Option<Self>) {
(self.clone(), None)
(self, None)
}
}

#[test]
pub fn load_sanity_sync() {
let config = Configuration::duration(
1,
std::time::Duration::from_secs(3),
50,
None,
Monitor::Progress(10),
0,
1,
);
let config = ConfigurationBuilder::duration(Duration::from_secs(3))
.step_delay(Duration::from_millis(50))
.monitor(Monitor::Progress(10))
.build();

load::start_sync(SampleRequestGenerator { counter: 1 }, config, "Mock load");
}

#[test]
pub fn load_sanity_multi_sync() {
let config = Configuration::duration(
5,
std::time::Duration::from_secs(5),
10,
None,
Monitor::Progress(100),
0,
1,
);
let config = ConfigurationBuilder::duration(Duration::from_secs(5))
.thread_no(5)
.step_delay(Duration::from_millis(10))
.monitor(Monitor::Progress(100))
.build();

load::start_multi_sync(vec![
(
Expand All @@ -64,7 +57,7 @@ pub fn load_sanity_multi_sync() {
),
(
SampleRequestGenerator { counter: 1 },
config.clone(),
config,
"Mock multi load #3".to_string(),
),
]);
Expand All @@ -87,7 +80,7 @@ impl RequestGenerator for AsyncSampleRequestGenerator {
}

fn split(self) -> (Self, Option<Self>) {
(self.clone(), None)
(self, None)
}
}

Expand All @@ -104,15 +97,10 @@ impl RequestStatusProvider for SampleRequestStatusProvider {

#[test]
pub fn load_sanity_async() {
let config = Configuration::duration(
1,
std::time::Duration::from_secs(3),
50,
None,
Monitor::Progress(10),
0,
1,
);
let config = ConfigurationBuilder::duration(Duration::from_secs(3))
.step_delay(Duration::from_millis(50))
.monitor(Monitor::Progress(10))
.build();
load::start_async(
AsyncSampleRequestGenerator { counter: 1 },
SampleRequestStatusProvider,
Expand Down

0 comments on commit fc57b91

Please sign in to comment.