Skip to content

Commit 599da06

Browse files
committed
issue #477 / ansible: avoid a race in async job startup.
Ansible 2.3/Python 2.4 work revealed there is no guarantee a slow target will have written the initial job status file out before a fast controller makes an initial check for it. Therefore, provide AsyncRunner with a sender it should send a message to when the initial job file has been written. As a bonus, also catch and report exceptions happening early in AsyncRunner, rather than leaving them to end up in -vvv output.
1 parent 4095358 commit 599da06

File tree

3 files changed

+52
-20
lines changed

3 files changed

+52
-20
lines changed

ansible_mitogen/planner.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import ansible.errors
4747
import ansible.module_utils
4848
import mitogen.core
49+
import mitogen.select
4950

5051
import ansible_mitogen.loaders
5152
import ansible_mitogen.parsing
@@ -416,22 +417,33 @@ def _invoke_async_task(invocation, planner):
416417
job_id = '%016x' % random.randint(0, 2**64)
417418
context = invocation.connection.spawn_isolated_child()
418419
_propagate_deps(invocation, planner, context)
419-
context.call_no_reply(
420-
ansible_mitogen.target.run_module_async,
421-
job_id=job_id,
422-
timeout_secs=invocation.timeout_secs,
423-
kwargs=planner.get_kwargs(),
424-
)
425420

426-
return {
427-
'stdout': json.dumps({
428-
# modules/utilities/logic/async_wrapper.py::_run_module().
429-
'changed': True,
430-
'started': 1,
431-
'finished': 0,
432-
'ansible_job_id': job_id,
433-
})
434-
}
421+
with mitogen.core.Receiver(context.router) as started_recv:
422+
call_recv = context.call_async(
423+
ansible_mitogen.target.run_module_async,
424+
job_id=job_id,
425+
timeout_secs=invocation.timeout_secs,
426+
started_sender=started_recv.to_sender(),
427+
kwargs=planner.get_kwargs(),
428+
)
429+
430+
# Wait for run_module_async() to crash, or for AsyncRunner to indicate
431+
# the job file has been written.
432+
for msg in mitogen.select.Select([started_recv, call_recv]):
433+
if msg.receiver is call_recv:
434+
# It can only be an exception.
435+
raise msg.unpickle()
436+
break
437+
438+
return {
439+
'stdout': json.dumps({
440+
# modules/utilities/logic/async_wrapper.py::_run_module().
441+
'changed': True,
442+
'started': 1,
443+
'finished': 0,
444+
'ansible_job_id': job_id,
445+
})
446+
}
435447

436448

437449
def _invoke_isolated_task(invocation, planner):

ansible_mitogen/target.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,10 @@ def _get_async_dir():
430430

431431

432432
class AsyncRunner(object):
433-
def __init__(self, job_id, timeout_secs, econtext, kwargs):
433+
def __init__(self, job_id, timeout_secs, started_sender, econtext, kwargs):
434434
self.job_id = job_id
435435
self.timeout_secs = timeout_secs
436+
self.started_sender = started_sender
436437
self.econtext = econtext
437438
self.kwargs = kwargs
438439
self._timed_out = False
@@ -515,6 +516,7 @@ def _run(self):
515516
'finished': 0,
516517
'pid': os.getpid()
517518
})
519+
self.started_sender.send(True)
518520

519521
if self.timeout_secs > 0:
520522
self._install_alarm()
@@ -550,13 +552,26 @@ def run(self):
550552

551553

552554
@mitogen.core.takes_econtext
553-
def run_module_async(kwargs, job_id, timeout_secs, econtext):
555+
def run_module_async(kwargs, job_id, timeout_secs, started_sender, econtext):
554556
"""
555557
Execute a module with its run status and result written to a file,
556558
terminating on the process on completion. This function must run in a child
557559
forked using :func:`create_fork_child`.
558-
"""
559-
arunner = AsyncRunner(job_id, timeout_secs, econtext, kwargs)
560+
561+
@param mitogen.core.Sender started_sender:
562+
A sender that will receive :data:`True` once the job has reached a
563+
point where its initial job file has been written. This is required to
564+
avoid a race where an overly eager controller can check for a task
565+
before it has reached that point in execution, which is possible at
566+
least on Python 2.4, where forking is not available for async tasks.
567+
"""
568+
arunner = AsyncRunner(
569+
job_id,
570+
timeout_secs,
571+
started_sender,
572+
econtext,
573+
kwargs
574+
)
560575
arunner.run()
561576

562577

tests/ansible/integration/async/runner_one_job.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,16 @@
4040
- result1.cmd == "sleep 1;\n echo alldone"
4141
- result1.delta|length == 14
4242
- result1.start|length == 26
43-
- result1.failed == False
4443
- result1.finished == 1
4544
- result1.rc == 0
4645
- result1.start|length == 26
4746
- result1.stderr == ""
4847
- result1.stderr_lines == []
4948
- result1.stdout == "alldone"
5049
- result1.stdout_lines == ["alldone"]
50+
51+
- assert:
52+
that:
53+
- result1.failed == False
54+
when: ansible_version.full > '2.4'
55+

0 commit comments

Comments
 (0)