Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-34076: Improve timeout handling by MPGraphExecutor #174

Merged
merged 2 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 33 additions & 25 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,19 @@ def __init__(self, qnode):
self._state = JobState.PENDING
self.started = None
self._rcv_conn = None
self._terminated = False

@property
def state(self):
"""Job processing state (JobState)"""
return self._state

@property
def terminated(self):
"""Return True if job was killed by stop() method and negative exit
code is returned from child process. (`bool`)"""
return self._terminated and self.process.exitcode < 0

def start(self, butler, quantumExecutor, startMethod=None):
"""Start process which runs the task.

Expand Down Expand Up @@ -152,6 +159,7 @@ def stop(self):
else:
_LOG.debug("Killing process %s", self.process.name)
self.process.kill()
self._terminated = True

def cleanup(self):
"""Release processes resources, has to be called for each finished
Expand All @@ -176,6 +184,9 @@ def report(self) -> QuantumReport:
dataId=self.qnode.quantum.dataId,
taskLabel=self.qnode.taskDef.label,
)
if self.terminated:
# Means it was killed, assume it's due to timeout
report.status = ExecutionStatus.TIMEOUT
return report

def failMessage(self):
Expand Down Expand Up @@ -501,51 +512,48 @@ def _executeQuantaMP(self, graph, butler):
# finished
exitcode = job.process.exitcode
quantum_report = job.report()
if quantum_report:
self.report.quantaReports.append(quantum_report)
self.report.quantaReports.append(quantum_report)
if exitcode == 0:
jobs.setJobState(job, JobState.FINISHED)
job.cleanup()
_LOG.debug("success: %s took %.3f seconds", job, time.time() - job.started)
else:
self.report.status = ExecutionStatus.FAILURE
# failMessage() has to be called before cleanup()
message = job.failMessage()
jobs.setJobState(job, JobState.FAILED)
if job.terminated:
# Was killed due to timeout.
if self.report.status == ExecutionStatus.SUCCESS:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is killed due to time out and reports success why can't that be treated as success? How can you be timed out and successful?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is treated as success, terminated is False for that case, and check for exitcode == 0 is in the if branch above. This branch is only for the case when it was actually killed by a signal (had non-zero exitcode)

# Do not override global FAILURE status
self.report.status = ExecutionStatus.TIMEOUT
message = f"Timeout ({self.timeout} sec) for task {job}, task is killed"
jobs.setJobState(job, JobState.TIMED_OUT)
else:
self.report.status = ExecutionStatus.FAILURE
# failMessage() has to be called before cleanup()
message = job.failMessage()
jobs.setJobState(job, JobState.FAILED)

job.cleanup()
_LOG.debug("failed: %s", job)
if self.failFast or exitcode == InvalidQuantumError.EXIT_CODE:
# stop all running jobs
for stopJob in jobs.running:
if stopJob is not job:
stopJob.stop()
raise MPGraphExecutorError(message)
if job.state is JobState.TIMED_OUT:
raise MPTimeoutError(f"Timeout ({self.timeout} sec) for task {job}.")
else:
raise MPGraphExecutorError(message)
else:
_LOG.error("%s; processing will continue for remaining tasks.", message)
else:
# check for timeout
now = time.time()
if now - job.started > self.timeout:
# Do not override FAILURE status
if self.report.status == ExecutionStatus.SUCCESS:
self.report.status = ExecutionStatus.TIMEOUT
jobs.setJobState(job, JobState.TIMED_OUT)
# Try to kill it, and there is a chance that it
# finishes successfully before it gets killed. Exit
# status is handled by the code above on next
# iteration.
_LOG.debug("Terminating job %s due to timeout", job)
job.stop()
quantum_report = job.report()
if quantum_report:
quantum_report.status = ExecutionStatus.TIMEOUT
self.report.quantaReports.append(quantum_report)
job.cleanup()
if self.failFast:
raise MPTimeoutError(f"Timeout ({self.timeout} sec) for task {job}.")
else:
_LOG.error(
"Timeout (%s sec) for task %s; task is killed, processing continues "
"for remaining tasks.",
self.timeout,
job,
)

# Fail jobs whose inputs failed, this may need several iterations
# if the order is not right, will be done in the next loop.
Expand Down
38 changes: 14 additions & 24 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,6 @@ def runQuantum(self):
signal.raise_signal(signal.SIGILL)


class TaskMockSleep:
"""Simple mock class for task which "runs" for some time."""

canMultiprocess = True

def runQuantum(self):
_LOG.debug("TaskMockSleep.runQuantum")
time.sleep(5.0)


class TaskMockLongSleep:
"""Simple mock class for task which "runs" for very long time."""

Expand Down Expand Up @@ -330,7 +320,7 @@ def test_mpexec_timeout(self):
"""Fail due to timeout"""

taskDef = TaskDefMock()
taskDefSleep = TaskDefMock(taskClass=TaskMockSleep)
taskDefSleep = TaskDefMock(taskClass=TaskMockLongSleep)
qgraph = QuantumGraphMock(
[
QuantumIterDataMock(index=0, taskDef=taskDef, detector=0),
Expand All @@ -348,7 +338,7 @@ def test_mpexec_timeout(self):
self.assertEqual(report.status, ExecutionStatus.TIMEOUT)
self.assertEqual(report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPTimeoutError")
self.assertGreater(len(report.quantaReports), 0)
self.assertEquals(_count_status(report, ExecutionStatus.TIMEOUT), 1)
self.assertEqual(_count_status(report, ExecutionStatus.TIMEOUT), 1)
self.assertTrue(any(qrep.exitCode < 0 for qrep in report.quantaReports))
self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports))

Expand Down Expand Up @@ -396,8 +386,8 @@ def test_mpexec_failure(self):
report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError"
)
self.assertGreater(len(report.quantaReports), 0)
self.assertEquals(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEquals(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEqual(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertTrue(any(qrep.exitCode > 0 for qrep in report.quantaReports))
self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports))

Expand Down Expand Up @@ -431,9 +421,9 @@ def test_mpexec_failure_dep(self):
)
# Dependencies of failed tasks do not appear in quantaReports
self.assertGreater(len(report.quantaReports), 0)
self.assertEquals(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEquals(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertEquals(_count_status(report, ExecutionStatus.SKIPPED), 2)
self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEqual(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertEqual(_count_status(report, ExecutionStatus.SKIPPED), 2)
self.assertTrue(any(qrep.exitCode > 0 for qrep in report.quantaReports))
self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports))

Expand Down Expand Up @@ -467,9 +457,9 @@ def test_mpexec_failure_dep_nomp(self):
)
# Dependencies of failed tasks do not appear in quantaReports
self.assertGreater(len(report.quantaReports), 0)
self.assertEquals(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEquals(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertEquals(_count_status(report, ExecutionStatus.SKIPPED), 2)
self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEqual(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertEqual(_count_status(report, ExecutionStatus.SKIPPED), 2)
self.assertTrue(all(qrep.exitCode is None for qrep in report.quantaReports))
self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports))

Expand Down Expand Up @@ -509,7 +499,7 @@ def test_mpexec_failure_failfast(self):
)
# Dependencies of failed tasks do not appear in quantaReports
self.assertGreater(len(report.quantaReports), 0)
self.assertEquals(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertTrue(any(qrep.exitCode > 0 for qrep in report.quantaReports))
self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports))

Expand Down Expand Up @@ -537,8 +527,8 @@ def test_mpexec_crash(self):
)
# Dependencies of failed tasks do not appear in quantaReports
self.assertGreater(len(report.quantaReports), 0)
self.assertEquals(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEquals(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEqual(_count_status(report, ExecutionStatus.SUCCESS), 2)
self.assertTrue(any(qrep.exitCode == -signal.SIGILL for qrep in report.quantaReports))
self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports))

Expand All @@ -564,7 +554,7 @@ def test_mpexec_crash_failfast(self):
self.assertEqual(
report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError"
)
self.assertEquals(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1)
self.assertTrue(any(qrep.exitCode == -signal.SIGILL for qrep in report.quantaReports))
self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports))

Expand Down