Skip to content

Commit

Permalink
stop Worker when runnable make a exception
Browse files Browse the repository at this point in the history
  • Loading branch information
johnverkim committed May 25, 2022
1 parent 692cc73 commit ab54bfa
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
1 change: 1 addition & 0 deletions integration_tests/operator_ITG_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ def check_interval():
dummy_callback.assert_called_with("dummy_report")

op.stop()
time.sleep(0.5)
self.assertEqual(op.state, "ready")
11 changes: 8 additions & 3 deletions smtm/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""입력받은 task를 별도의 thread에서 차례대로 수행하는 일꾼 역할의 Worker 클래스"""
import queue
import threading
import traceback
from .log_manager import LogManager


Expand Down Expand Up @@ -43,18 +44,22 @@ def looper():
while True:
self.logger.debug(f"Worker[{self.name}:{threading.get_ident()}] WAIT ==========")
task = self.task_queue.get()
self.task_queue.task_done()
if task is None:
self.logger.debug(
f"Worker[{self.name}:{threading.get_ident()}] Termanited .........."
)
if self.on_terminated is not None:
self.on_terminated()
self.task_queue.task_done()
break
self.logger.debug(f"Worker[{self.name}:{threading.get_ident()}] GO ----------")
runnable = task["runnable"]
runnable(task)
self.task_queue.task_done()
try:
runnable(task)
except Exception:
self.logger.error(traceback.format_exc())
self.thread = None
raise UserWarning("Worker catched exception. force stop!")

self.thread = threading.Thread(target=looper, name=self.name, daemon=True)
self.thread.start()
Expand Down

0 comments on commit ab54bfa

Please sign in to comment.