Skip to content

Commit

Permalink
fix(api): remove finished jobs from worker pool (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Feb 14, 2023
1 parent 38f8aa3 commit feb4603
Showing 1 changed file with 41 additions and 2 deletions.
43 changes: 41 additions & 2 deletions api/onnx_web/server/device_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,19 @@ class DevicePoolExecutor:
jobs: List[Job] = None
next_device: int = 0
pool: Union[ProcessPoolExecutor, ThreadPoolExecutor] = None
recent: List[Tuple[str, int]] = None

def __init__(
self,
devices: List[DeviceParams],
pool: Optional[Union[ProcessPoolExecutor, ThreadPoolExecutor]] = None,
recent_limit: int = 10,
):
self.devices = devices
self.jobs = []
self.next_device = 0
self.recent = []
self.recent_limit = recent_limit

device_count = len(devices)
if pool is None:
Expand Down Expand Up @@ -150,10 +154,18 @@ def cancel(self, key: str) -> bool:
return False

def done(self, key: str) -> Tuple[Optional[bool], int]:
for k, progress in self.recent:
if key == k:
return (True, progress)

for job in self.jobs:
if job.key == key:
done = job.future.done()
progress = job.get_progress()

if done:
self.prune()

return (done, progress)

logger.warn("checking status for unknown key: %s", key)
Expand Down Expand Up @@ -186,7 +198,21 @@ def get_next_device(self, needs_device: Optional[DeviceParams] = None) -> int:
return lowest_devices[0]

def prune(self):
self.jobs[:] = [job for job in self.jobs if job.future.done()]
pending_jobs = [job for job in self.jobs if job.future.done()]
logger.debug("pruning %s of %s pending jobs", len(pending_jobs), len(self.jobs))

for job in pending_jobs:
self.recent.append((job.key, job.get_progress()))
try:
self.jobs.remove(job)
except ValueError as e:
logger.warning("error removing pruned job from pending: %s", e)

# self.jobs[:] = [job for job in self.jobs if not job.future.done()]
recent_count = len(self.recent)
if recent_count > self.recent_limit:
logger.debug("pruning %s of %s recent jobs", recent_count - self.recent_limit, recent_count)
self.recent[:] = self.recent[-self.recent_limit :]

def submit(
self,
Expand All @@ -197,6 +223,7 @@ def submit(
needs_device: Optional[DeviceParams] = None,
**kwargs,
) -> None:
self.prune()
device = self.get_next_device(needs_device=needs_device)
logger.info(
"assigning job %s to device %s: %s", key, device, self.devices[device]
Expand All @@ -222,7 +249,7 @@ def job_done(f: Future):
future.add_done_callback(job_done)

def status(self) -> List[Tuple[str, int, bool, int]]:
return [
pending = [
(
job.key,
job.context.device_index.value,
Expand All @@ -231,3 +258,15 @@ def status(self) -> List[Tuple[str, int, bool, int]]:
)
for job in self.jobs
]
recent = [
(
key,
None,
True,
progress,
)
for key, progress in self.recent
]

pending.extend(recent)
return pending

0 comments on commit feb4603

Please sign in to comment.