Skip to content

Commit

Permalink
enable support to set task execution time limit in WfBench (closes #33)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelfsilva committed Jul 21, 2023
1 parent a52f922 commit 50060a8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
8 changes: 7 additions & 1 deletion wfcommons/wfbench/bench.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021-2022 The WfCommons Team.
# Copyright (c) 2021-2023 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -79,6 +79,7 @@ def create_benchmark(self,
percent_cpu: Union[float, Dict[str, float]] = 0.6,
cpu_work: Union[int, Dict[str, int]] = None,
gpu_work: Union[int, Dict[str, int]] = None,
time: Optional[int] = None,
data: Optional[Union[int, Dict[str, str]]] = None,
mem: Optional[float] = None,
lock_files_folder: Optional[pathlib.Path] = None,
Expand All @@ -91,6 +92,8 @@ def create_benchmark(self,
:type percent_cpu: Union[float, Dict[str, float]]
:param cpu_work: CPU work per workflow task.
:type cpu_work: Union[int, Dict[str, int]]
:param time: Time limit for running each task (in seconds).
:type time: Optional[int]
:param data: Dictionary of input size files per workflow task type or total workflow data footprint (in MB).
:type data: Optional[Union[int, Dict[str, str]]]
:param mem: Maximum amount of memory consumption per task (in MB).
Expand Down Expand Up @@ -155,6 +158,9 @@ def create_benchmark(self,

if mem:
params.extend([f"--mem {mem}"])

if time:
params.extend([f"--time {time}"])

task.runtime = 0
task.files = []
Expand Down
23 changes: 17 additions & 6 deletions wfcommons/wfbench/wfbench.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021-2022 The WfCommons Team.
# Copyright (c) 2021-2023 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -14,9 +14,12 @@
import subprocess
import time
import json
from io import StringIO
import signal
import sys
import pandas as pd

from io import StringIO

from filelock import FileLock
from typing import List, Optional

Expand Down Expand Up @@ -147,6 +150,7 @@ def get_parser() -> argparse.ArgumentParser:
help="Path to cores file.")
parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.")
parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.")
parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works)")
parser.add_argument("--mem", default=None, help="Max amount (in MB) of memory consumption.")
parser.add_argument("--out", help="output files name.")
return parser
Expand Down Expand Up @@ -193,7 +197,7 @@ def main():
else:
device = available_gpus[0]
print(f"Running on GPU {device}")
gpu_benchmark(args.gpu_work, device)
gpu_benchmark(args.gpu_work, device, time=args.time)

if args.cpu_work:
print("[WfBench] Starting CPU and Memory Benchmarks...")
Expand All @@ -202,11 +206,18 @@ def main():

cpu_procs = cpu_mem_benchmark(cpu_threads=int(10 * args.percent_cpu),
mem_threads=int(10 - 10 * args.percent_cpu),
cpu_work=int(args.cpu_work),
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
core=core,
total_mem=args.mem)
for proc in cpu_procs:
proc.wait()

if args.time:
time.sleep(int(args.time))
for proc in cpu_procs:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
else:
for proc in cpu_procs:
proc.wait()

mem_kill = subprocess.Popen(["killall", "stress-ng"])
mem_kill.wait()
print("[WfBench] Completed CPU and Memory Benchmarks!\n")
Expand Down

0 comments on commit 50060a8

Please sign in to comment.