Skip to content

Commit

Permalink
fix(api): attempt to read progress updates from recycled workers
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Mar 26, 2023
1 parent 27500ec commit 2d2283e
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions api/onnx_web/worker/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DevicePoolExecutor:
progress_interval: float
recycle_interval: float

leaking: List[Tuple[str, Process]]
leaking: List[Tuple[str, Process, WorkerContext]]
context: Dict[str, WorkerContext] # Device -> Context
current: Dict[str, "Value[int]"] # Device -> pid
pending: Dict[str, "Queue[JobCommand]"]
Expand Down Expand Up @@ -256,7 +256,7 @@ def join(self):
worker.pid,
device,
)
self.leaking.append((device, worker))
self.leak_worker(device)
else:
logger.debug("worker for device %s has died", device)

Expand All @@ -273,10 +273,9 @@ def join(self):

def join_leaking(self):
if len(self.leaking) > 0:
logger.warning("cleaning up %s leaking workers", len(self.leaking))
for device, worker in self.leaking:
logger.debug(
"shutting down worker %s for device %s", worker.pid, device
for device, worker, _context in self.leaking:
logger.warning(
"shutting down leaking worker %s for device %s", worker.pid, device
)
worker.join(self.join_timeout)
if worker.is_alive():
Expand Down Expand Up @@ -312,7 +311,7 @@ def recycle(self):
worker.pid,
device,
)
self.leaking.append((device, worker))
self.leak_worker(device)
else:
del worker

Expand Down Expand Up @@ -463,6 +462,11 @@ def update_job(self, progress: ProgressCommand):
)
self.context[progress.device].set_cancel()

def leak_worker(self, device: str):
    """Record the current worker for ``device`` as leaking.

    Looks up the context and the worker registered for ``device`` and
    appends a ``(device, worker, context)`` tuple to ``self.leaking``.
    The context is kept next to the worker so that it can still be
    reached through the leaking list later on.

    Raises ``KeyError`` when ``device`` has no entry in ``self.context``
    or ``self.workers``.
    """
    # Context is looked up before the worker, matching the original order.
    leaked_context, leaked_worker = self.context[device], self.workers[device]
    self.leaking.append((device, leaked_worker, leaked_context))


def health_main(pool: DevicePoolExecutor):
logger.trace("checking in from health worker thread")
Expand Down Expand Up @@ -494,10 +498,25 @@ def logger_main(pool: DevicePoolExecutor, logs: "Queue[str]"):
logger.exception("error in log worker")


def progress_main(
pool: DevicePoolExecutor
):
def progress_main(pool: DevicePoolExecutor):
logger.trace("checking in from progress worker thread")

for device, _worker, context in pool.leaking:
# whether the worker is alive or not, try to clear its queues
try:
progress = context.progress.get_nowait()
while progress is not None:
pool.update_job(progress)
progress = context.progress.get_nowait()
except Empty:
logger.trace("empty queue in leaking worker for device %s", device)
pass
except ValueError as e:
logger.debug("value error in leaking worker for device %s: %s", device, e)
break
except Exception:
logger.exception("error in leaking worker for device %s", device)

for device, queue in pool.progress.items():
try:
progress = queue.get_nowait()
Expand Down

0 comments on commit 2d2283e

Please sign in to comment.