Skip to content

Commit

Permalink
fix(api): wait for worker to become idle before enqueueing next job (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Apr 16, 2023
1 parent 17e7b6a commit cfdd926
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
15 changes: 13 additions & 2 deletions api/onnx_web/worker/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class WorkerContext:
active_pid: "Value[int]"
progress: "Queue[ProgressCommand]"
last_progress: Optional[ProgressCommand]
idle: "Value[bool]"

def __init__(
self,
Expand All @@ -30,6 +31,7 @@ def __init__(
pending: "Queue[JobCommand]",
progress: "Queue[ProgressCommand]",
active_pid: "Value[int]",
idle: "Value[bool]",
):
self.job = job
self.device = device
Expand All @@ -39,16 +41,21 @@ def __init__(
self.pending = pending
self.active_pid = active_pid
self.last_progress = None
self.idle = idle

def start(self, job: str) -> None:
    """Mark the beginning of a new job on this worker.

    Stores the job name on the context and resets both the cancel and
    idle flags to False.

    Args:
        job: name of the job that is about to run.
    """
    self.job = job
    # Clear any cancellation left over from a previous job.
    self.set_cancel(cancel=False)
    # The worker is busy from here on, so it is not idle.
    self.set_idle(idle=False)

def is_active(self) -> bool:
    """Return True when the PID from get_active() equals getpid()."""
    return self.get_active() == getpid()

def is_cancelled(self) -> bool:
    """Return whether the cancel flag is currently set.

    The flag is read without taking its lock.
    """
    # NOTE(review): if the cancel flag is created with an integer typecode
    # (as the idle flag appears to be), .value is 0/1 rather than a bool;
    # coerce so the annotated return type holds either way.
    return bool(self.cancel.value)

def is_active(self) -> bool:
    """Return True when the PID from get_active() equals getpid()."""
    return self.get_active() == getpid()
def is_idle(self) -> bool:
    """Return whether the idle flag is currently set.

    The flag is read without taking its lock.
    """
    # The flag is created with an integer typecode ("B"), so .value is an
    # int 0/1; coerce it so callers really get a bool as annotated.
    return bool(self.idle.value)

def get_active(self) -> int:
with self.active_pid.get_lock():
Expand Down Expand Up @@ -77,6 +84,10 @@ def set_cancel(self, cancel: bool = True) -> None:
with self.cancel.get_lock():
self.cancel.value = cancel

def set_idle(self, idle: bool = True) -> None:
    """Set or clear the idle flag while holding the flag's own lock.

    Args:
        idle: new value for the flag; defaults to True (worker is idle).
    """
    with self.idle.get_lock():
        self.idle.value = idle

def set_progress(self, progress: int) -> None:
if self.is_cancelled():
raise RuntimeError("job has been cancelled")
Expand Down
5 changes: 3 additions & 2 deletions api/onnx_web/worker/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def create_device_worker(self, device: DeviceParams) -> None:
logs=self.logs,
pending=self.pending[name],
active_pid=current,
idle=Value("B", False),
)
self.context[name] = context

Expand Down Expand Up @@ -567,7 +568,7 @@ def progress_main(pool: DevicePoolExecutor):
except Exception:
logger.exception("error in progress worker for device %s", device)

for device, queue in pool.pending.items():
if queue.empty():
for device, context in pool.context.items():
if context.is_idle():
logger.trace("enqueueing next job for idle worker")
pool.next_job(device)
3 changes: 2 additions & 1 deletion api/onnx_web/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def worker_main(worker: WorkerContext, server: ServerContext):
logger.info("job succeeded: %s", job.name)
worker.finish()
except Empty:
pass
logger.trace("worker reached end of queue, setting idle flag")
worker.set_idle()
except KeyboardInterrupt:
logger.info("worker got keyboard interrupt")
worker.fail()
Expand Down

0 comments on commit cfdd926

Please sign in to comment.