Skip to content

Commit

Permalink
benchmark: split launch into submit and start commands
Browse files Browse the repository at this point in the history
- adds two new commands: submit and start
- launch works as before, performing both submit and start

closes #572
  • Loading branch information
Vladyslav Moisieienkov committed Nov 2, 2021
1 parent 04e252a commit cee243c
Showing 1 changed file with 102 additions and 40 deletions.
142 changes: 102 additions & 40 deletions scripts/reana_bench.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
Expand Down Expand Up @@ -42,16 +43,28 @@ def cli():
Prerequisites:
- install reana-client 0.8.x, pandas and matplotlib Python packages
- set REANA_ACCESS_TOKEN and REANA_SERVER_URL
How to launch 50 concurrent workflows and collect results:
How to launch 50 concurrent workflows and collect results (option 1):
.. code-block:: console
\b
$ cd reana-demo-root6-roofit # find an example of REANA workflow
$ reana_bench.py launch -w roofit50yadage -n 50 -f reana-yadage.yaml # starts 50 workflows
$ reana_bench.py collect -w roofit50yadage # collect results
$ reana_bench.py launch -w roofit50yadage -n 50 -f reana-yadage.yaml # submit and start
$ reana_bench.py collect -w roofit50yadage # collect results and save them locally
$ reana_bench.py analyze -w roofit50yadage # analyzes results that were saved locally
How to launch 50 concurrent workflows and collect results (option 2):
.. code-block:: console
\b
$ cd reana-demo-root6-roofit # find an example of REANA workflow
$ reana_bench.py submit -w roofit50yadage -n 50 -f reana-yadage.yaml # submit, do not start
$ reana_bench.py start -w roofit50yadage # start workflows
$ reana_bench.py collect -w roofit50yadage # collect results and save them locally
$ reana_bench.py analyze -w roofit50yadage # analyzes results that were saved locally
"""
pass
Expand Down Expand Up @@ -144,8 +157,10 @@ def _start_workflows_and_record_submit_dates(
return df


def _get_workflows(workflow: str) -> pd.DataFrame:
cmd = _build_reana_client_list_command(workflow)
def _get_workflows(workflow_prefix: str) -> pd.DataFrame:
# TODO: in case of big number of workflows, this function can take a long time
# maybe, consider pagination and page size
cmd = _build_reana_client_list_command(workflow_prefix)
return pd.DataFrame(json.loads(subprocess.check_output(cmd).decode("ascii")))


Expand Down Expand Up @@ -340,20 +355,6 @@ def _create_plots(prefix: str, title: str, df: pd.DataFrame) -> None:
_pending_time_histogram(pending_time_histogram_path, df, title)


def _start_benchmark(
workflow_name: str, number_of_submissions: int, file: str, workers: int
) -> pd.DataFrame:
if _workflow_already_exists(workflow_name):
raise Exception("Found duplicated workflow name. Please use unique name.")

_create_and_upload_workflows(workflow_name, number_of_submissions, file, workers)

submitted_results = _start_workflows_and_record_submit_dates(
workflow_name, number_of_submissions, workers
)
return submitted_results


def _build_original_results_path(workflow: str) -> Path:
return Path(f"{workflow}_original_results.csv")

Expand All @@ -379,42 +380,103 @@ def _save_original_results(workflow: str, df: pd.DataFrame):
df.to_csv(original_results_path, index=False)


@cli.command()
@click.option("--workflow", "-w", help="Name of the workflow", required=True, type=str)
@click.option(
"--number", "-n", help="Number of workflows to start", required=True, type=int
def submit(
workflow_prefix: str, number_of_workflows: int, file: str, workers: int
) -> None:
"""Submit multiple workflows, do not start them."""
if _workflow_already_exists(workflow_prefix):
raise Exception("Found duplicated workflow name(s). Please use unique name.")

_create_and_upload_workflows(workflow_prefix, number_of_workflows, file, workers)
logging.info("Finished creating and uploading workflows.")


def start(workflow_name: str, workers: int) -> None:
"""Start already submitted workflows."""

number_of_workflows = len(_get_workflows(workflow_name))

if number_of_workflows == 0:
raise Exception("Cannot start. Workflow(s) do not exist.")

submitted_results = _start_workflows_and_record_submit_dates(
workflow_name, number_of_workflows, workers
)

logging.info("Saving intermediate submit results...")
submitted_results_path = _build_submitted_results_path(workflow_name)
submitted_results.to_csv(submitted_results_path, index=False)
logging.info("Finished. Don't forget to collect the results.")


workflow_option = click.option(
"--workflow", "-w", help="Name of the workflow", required=True, type=str
)
@click.option(
"--file",
"-f",
help="REANA YAML specification file",
default="reana.yaml",
type=click.Path(exists=True),
number_of_workflows_option = click.option(
"--number", "-n", help="Number of workflows to start", required=True, type=int
)
@click.option(
concurrency_option = click.option(
"--concurrency",
"-c",
help=f"Number of workers to submit workflows, default {WORKERS_DEFAULT_COUNT}",
type=int,
default=WORKERS_DEFAULT_COUNT,
)
def launch(workflow: str, number: int, file: str, concurrency: int) -> NoReturn:
"""Launch multiple workflows."""

reana_file_option = click.option(
"--file",
"-f",
help="REANA YAML specification file",
default="reana.yaml",
type=click.Path(exists=True),
)


@cli.command(name="submit")
@workflow_option
@number_of_workflows_option
@reana_file_option
@concurrency_option
def submit_command(workflow: str, number: int, file: str, concurrency: int) -> NoReturn:
"""Submit workflows, do not start them."""
try:
submit(workflow, number, file, concurrency)
except Exception as e:
logging.error(f"Something went wrong during workflow submission: {e}")


@cli.command(name="start")
@workflow_option
@concurrency_option
def start_command(workflow: str, concurrency: int) -> NoReturn:
"""Start submitted workflows and record intermediate results."""
try:
submitted_results = _start_benchmark(workflow, number, file, concurrency)
start(workflow, concurrency)
except Exception as e:
logging.error(f"Something went wrong during benchmark launch: {e}")
return

logging.info("Saving intermediate submit results...")
submitted_results_path = _build_submitted_results_path(workflow)
submitted_results.to_csv(submitted_results_path, index=False)

logging.info("Finished. Don't forget to collect the results.")
@cli.command()
@workflow_option
@number_of_workflows_option
@reana_file_option
@concurrency_option
def launch(workflow: str, number: int, file: str, concurrency: int) -> NoReturn:
"""Launch benchmark for multiple workflows. This command equals to running submit and start commands."""
try:
submit(workflow, number, file, concurrency)
except Exception as e:
logging.error(f"Something went wrong during workflow submission: {e}")
return

try:
start(workflow, concurrency)
except Exception as e:
logging.error(f"Something went wrong during benchmark launch: {e}")


@cli.command()
@click.option("--workflow", "-w", help="Name of the workflow", required=True, type=str)
@workflow_option
@click.option(
"--title",
"-t",
Expand All @@ -438,7 +500,7 @@ def analyze(workflow: str, title: str) -> NoReturn:


@cli.command()
@click.option("--workflow", "-w", help="Name of the workflow", required=True, type=str)
@workflow_option
@click.option(
"--force",
"-f",
Expand Down

0 comments on commit cee243c

Please sign in to comment.