Skip to content

Commit

Permalink
feat(rust): Add CPU parallel runner for the exec manager
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenj committed May 5, 2024
1 parent 80b8173 commit 1c55d53
Showing 1 changed file with 55 additions and 3 deletions.
58 changes: 55 additions & 3 deletions utilities/scripts/python/exec_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from dataclasses import dataclass
import textwrap
import time
import multiprocessing
import concurrent.futures


def status_for_rc(rc: int) -> str:
Expand Down Expand Up @@ -220,17 +222,20 @@ def __init__(self, title: str) -> None:
self.title = title
self.results = []

def add(self, result: Result):
def add(self, result: Result | list[Result]) -> None:
"""
Add a result to the list of results.
Args:
result (Result): The result object to be added.
result (Result): The result object (or list of result objects) to be added.
Returns:
None
"""
self.results.append(result)
if isinstance(result, list):
self.results.extend(result)
else:
self.results.append(result)

def print(self):
"""
Expand Down Expand Up @@ -273,3 +278,50 @@ def ok(self) -> bool:
if not result.ok():
return False
return True


class ParallelRunner:
def __init__(self, name: str, max_workers: int = None) -> None:
self.max_workers = max_workers if max_workers else multiprocessing.cpu_count()
self.results = Results(name)
self.executor = concurrent.futures.ProcessPoolExecutor(
max_workers=self.max_workers
)
self.processes = []
self.start_time = time.perf_counter()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
# Stop the multiprocessing pool when done.
self.executor.shutdown()

def run(self, func, *args, **kwargs) -> None:
self.processes.append(self.executor.submit(func, *args, **kwargs))

def get_results(self) -> Results:
"""
A method that calculates the execution time of each process in self.processes
and adds the results to the Results object.
As execution time overlaps, the recalculated execution time is based on
when tasks complete vs how long they ran internally.
Returns a Results object.
"""
start_time = self.start_time
for complete in concurrent.futures.as_completed(self.processes):
res = complete.result()
end_time = time.perf_counter()
execution_time = end_time - start_time
start_time = end_time
if isinstance(res, list):
for r in res:
r.runtime = execution_time
execution_time = 0.0
else:
res.runtime = execution_time

self.results.add(res)

return self.results

0 comments on commit 1c55d53

Please sign in to comment.