benchmark: use output files instead of directories #564

Merged · 1 commit · Oct 27, 2021
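In short, reana_bench.py used to collect all CSV results and PNG plots for a given workflow inside a benchmark_<workflow> directory; with this change it writes flat files whose names are prefixed with the workflow name, so launch no longer needs to create a directory first. A rough before/after sketch of the resulting paths (illustrative only; the workflow name "demo" is made up):

    from pathlib import Path

    workflow = "demo"  # hypothetical workflow name, for illustration only

    # Before this PR: results and plots were grouped in a folder created by launch().
    old_csv = Path(f"benchmark_{workflow}") / "submitted_results.csv"   # benchmark_demo/submitted_results.csv
    old_png = Path(f"benchmark_{workflow}") / "execution_status.png"    # benchmark_demo/execution_status.png

    # After this PR: flat files in the working directory, prefixed with the workflow name.
    new_csv = Path(f"{workflow}_submitted_results.csv")                 # demo_submitted_results.csv
    new_png = Path(f"{workflow}_execution_status.png")                  # demo_execution_status.png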
86 changes: 40 additions & 46 deletions scripts/reana_bench.py
@@ -61,7 +61,7 @@ def _build_command(command_type: str, workflow_name: str) -> List[str]:
     return ["reana-client", command_type, "-w", workflow_name]
 
 
-def _create_workflow(workflow: str, file: str) -> NoReturn:
+def _create_workflow(workflow: str, file: str) -> None:
     reana_specification = load_reana_spec(
         click.format_filename(file),
         access_token=REANA_ACCESS_TOKEN,
@@ -70,12 +70,12 @@ def _create_workflow(workflow: str, file: str) -> NoReturn:
     create_workflow(reana_specification, workflow, REANA_ACCESS_TOKEN)
 
 
-def _upload_workflow(workflow: str) -> NoReturn:
+def _upload_workflow(workflow: str) -> None:
     upload_cmd = _build_command("upload", workflow)
     subprocess.run(upload_cmd, stdout=subprocess.DEVNULL)
 
 
-def _create_and_upload_single_workflow(workflow_name: str, file: str):
+def _create_and_upload_single_workflow(workflow_name: str, file: str) -> None:
     _create_workflow(workflow_name, file)
     _upload_workflow(workflow_name)
 
@@ -89,7 +89,7 @@ def _create_and_upload_workflows(
     n: int,
     file: Optional[str] = None,
     workers: int = WORKERS_DEFAULT_COUNT,
-) -> NoReturn:
+) -> None:
     logging.info(f"Creating and uploading {n} workflows...")
     with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
         futures = [
@@ -238,7 +238,7 @@ def _max_min_mean_median(series: pd.Series) -> (int, int, int, int):
     return int(max_value), int(min_value), int(mean_value), int(median_value)
 
 
-def _execution_progress_plot(folder_path: Path, df: pd.DataFrame) -> NoReturn:
+def _execution_progress_plot(path: Path, df: pd.DataFrame) -> None:
     plt.clf()
 
     sorted_df = df.sort_values(["submit_date", "submit_number"])
@@ -264,10 +264,10 @@ def _execution_progress_plot(folder_path: Path, df: pd.DataFrame) -> NoReturn:
     plt.xlabel("workflow run")
     plt.ylabel("time [s]")
     plt.legend()
-    plt.savefig(f"{folder_path}/execution_progress.png")
+    plt.savefig(path)
 
 
-def _execution_status_plot(folder_path: Path, df: pd.DataFrame) -> NoReturn:
+def _execution_status_plot(path: Path, df: pd.DataFrame) -> None:
     plt.clf()
     total = len(df)
     statuses = list(df["status"].unique())
@@ -279,12 +279,12 @@ def _execution_status_plot(folder_path: Path, df: pd.DataFrame) -> NoReturn:
     )
     plt.title("Status distribution")
     plt.text(-1, 1, f"total: {total}")
-    plt.savefig(f"{folder_path}/execution_status.png")
+    plt.savefig(path)
 
 
 def _create_histogram_plot(
-    folder_path: Path, series: pd.Series, bin_size: int, label: str,
-) -> NoReturn:
+    path: Path, series: pd.Series, bin_size: int, label: str,
+) -> None:
     plt.clf()
     plt.hist(series, bin_size, color="b", label=label)
     plt.xlabel(f"{label} [s] (bin size = {bin_size})")
@@ -297,30 +297,38 @@ def _create_histogram_plot(
         f"fastest: {fastest}, median: {median}," f" mean: {mean}, slowest: {slowest}"
     )
 
-    plt.savefig(f"{folder_path}/histogram_{label}.png")
+    plt.savefig(path)
 
 
-def _total_time_histogram(folder_path: Path, df: pd.DataFrame) -> NoReturn:
-    _create_histogram_plot(
-        folder_path, df["runtime"] + df["pending_time"], 10, "total_time"
-    )
+def _total_time_histogram(path: Path, df: pd.DataFrame) -> None:
+    _create_histogram_plot(path, df["runtime"] + df["pending_time"], 10, "total_time")
 
 
-def _runtime_histogram(folder_path: Path, df: pd.DataFrame) -> NoReturn:
-    _create_histogram_plot(folder_path, df["runtime"], 10, "runtime")
+def _runtime_histogram(path: Path, df: pd.DataFrame) -> None:
+    _create_histogram_plot(path, df["runtime"], 10, "runtime")
 
 
-def _pending_time_histogram(folder_path: Path, df: pd.DataFrame) -> NoReturn:
-    _create_histogram_plot(folder_path, df["pending_time"], 10, "pending_time")
+def _pending_time_histogram(path: Path, df: pd.DataFrame) -> None:
+    _create_histogram_plot(path, df["pending_time"], 10, "pending_time")
 
 
-def _create_plots(folder_path: Path, df: pd.DataFrame) -> NoReturn:
+def _create_plots(prefix: str, df: pd.DataFrame) -> None:
     logging.info("Creating plots...")
-    _execution_progress_plot(folder_path, df)
-    _execution_status_plot(folder_path, df)
-    _total_time_histogram(folder_path, df)
-    _runtime_histogram(folder_path, df)
-    _pending_time_histogram(folder_path, df)
+
+    progress_plot_path = Path(f"{prefix}_execution_progress.png")
+    _execution_progress_plot(progress_plot_path, df)
+
+    status_plot_path = Path(f"{prefix}_execution_status.png")
+    _execution_status_plot(status_plot_path, df)
+
+    total_time_histogram_path = Path(f"{prefix}_histogram_total_time.png")
+    _total_time_histogram(total_time_histogram_path, df)
+
+    runtime_histogram_path = Path(f"{prefix}_histogram_runtime.png")
+    _runtime_histogram(runtime_histogram_path, df)
+
+    pending_time_histogram_path = Path(f"{prefix}_histogram_pending_time.png")
+    _pending_time_histogram(pending_time_histogram_path, df)
 
 
 def _start_benchmark(
@@ -337,20 +337,16 @@ def _start_benchmark(
     return submitted_results
 
 
-def _build_results_folder_path(workflow: str) -> Path:
-    return Path(f"benchmark_{workflow}")
-
-
 def _build_original_results_path(workflow: str) -> Path:
-    return Path(f"{_build_results_folder_path(workflow)}/original_results.csv")
+    return Path(f"{workflow}_original_results.csv")
 
 
 def _build_submitted_results_path(workflow: str) -> Path:
-    return Path(f"{_build_results_folder_path(workflow)}/submitted_results.csv")
+    return Path(f"{workflow}_submitted_results.csv")
 
 
 def _build_processed_results_path(workflow: str) -> Path:
-    return Path(f"{_build_results_folder_path(workflow)}/processed_results.csv")
+    return Path(f"{workflow}_processed_results.csv")
 
 
 def _merge_workflows_and_submitted_results(
@@ -385,17 +385,8 @@ def _save_original_results(workflow: str, df: pd.DataFrame):
     default=WORKERS_DEFAULT_COUNT,
     type=int,
 )
-def launch(workflow: str, number: int, file: str, concurrency: int):
+def launch(workflow: str, number: int, file: str, concurrency: int) -> NoReturn:
     """Launch multiple workflows."""
-    results_folder_path = _build_results_folder_path(workflow)
-
-    try:
-        os.mkdir(results_folder_path)
-    except FileExistsError:
-        logging.info(
-            "Benchmark folder already exists. Will overwrite previous results."
-        )
-
     try:
         submitted_results = _start_benchmark(workflow, number, file, concurrency)
     except Exception as e:
@@ -411,7 +406,7 @@ def launch(workflow: str, number: int, file: str, concurrency: int):
 
 @cli.command()
 @click.option("--workflow", "-w", help="Name of the workflow", required=True, type=str)
-def analyze(workflow: str):
+def analyze(workflow: str) -> NoReturn:
     """Produce various plots and derive metrics based on launch results collected before."""
     original_results_path = _build_original_results_path(workflow)
     original_results = pd.read_csv(original_results_path)
@@ -422,8 +417,7 @@ def analyze(workflow: str):
     processed_results_path = _build_processed_results_path(workflow)
     processed_results.to_csv(processed_results_path, index=False)
 
-    results_folder_path = _build_results_folder_path(workflow)
-    _create_plots(results_folder_path, processed_results)
+    _create_plots(workflow, processed_results)
 
 
 @cli.command()
@@ -435,7 +429,7 @@ def analyze(workflow: str):
     default=False,
     is_flag=True,
 )
-def collect(workflow: str, force: bool):
+def collect(workflow: str, force: bool) -> NoReturn:
     """Collect workflows results, merge them with intermediate results and save."""
     submitted_results_path = _build_submitted_results_path(workflow)
     submitted_results = pd.read_csv(submitted_results_path)
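For reference, a quick sanity check of the new flat naming scheme (a sketch from an interactive Python session, not part of the PR; the workflow name "demo" is made up, and the PosixPath repr assumes a POSIX system):

    >>> _build_original_results_path("demo")
    PosixPath('demo_original_results.csv')
    >>> _build_submitted_results_path("demo")
    PosixPath('demo_submitted_results.csv')
    >>> _build_processed_results_path("demo")
    PosixPath('demo_processed_results.csv')

With the same prefix, analyze writes its plots alongside those CSV files as demo_execution_progress.png, demo_execution_status.png, demo_histogram_total_time.png, demo_histogram_runtime.png and demo_histogram_pending_time.png.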