Skip to content

Commit

Permalink
AsyncScheduler: use async_start method
Browse files Browse the repository at this point in the history
Convert AsyncScheduler to use the async_start method, since
eventually this method will need to be a coroutine in order to write
messages to the build log as discussed in bug 709746.

Also fix async_iter_completed to be compatible with callback
scheduling differences introduced by migration to the async_start
method.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico@gentoo.org>
  • Loading branch information
zmedico committed Feb 20, 2020
1 parent 036c644 commit 8f47d3f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 15 deletions.
8 changes: 3 additions & 5 deletions lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,16 @@ def testDoebuild(self):
output_fd: pw,
},
"prev_mtimes": {}})
producer.addStartListener(lambda producer: os.close(pw))

# PipeReader closes pr
consumer = PipeReader(
input_files={"producer" : pr})

task_scheduler = TaskScheduler(iter([producer, consumer]),
max_jobs=2)

try:
loop.run_until_complete(task_scheduler.async_start())
finally:
# PipeReader closes pr
os.close(pw)
loop.run_until_complete(task_scheduler.async_start())

task_scheduler.wait()
output = portage._unicode_decode(
Expand Down
2 changes: 2 additions & 0 deletions lib/portage/tests/util/futures/test_iter_completed.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ def future_generator():

for future_done_set in async_iter_completed(future_generator(),
max_jobs=True, max_load=True, loop=loop):
while not input_futures:
loop.run_until_complete(asyncio.sleep(0, loop=loop))
future_done_set.cancel()
break

Expand Down
20 changes: 18 additions & 2 deletions lib/portage/util/_async/AsyncScheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Copyright 2012-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import functools

from portage import os
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollScheduler import PollScheduler

Expand Down Expand Up @@ -62,8 +66,8 @@ def _schedule_tasks(self):
else:
self._running_tasks.add(task)
task.scheduler = self._sched_iface
task.addExitListener(self._task_exit)
task.start()
future = asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface)
future.add_done_callback(functools.partial(self._task_coroutine_done, task))

if self._loadavg_check_id is not None:
self._loadavg_check_id.cancel()
Expand All @@ -73,6 +77,18 @@ def _schedule_tasks(self):
# Triggers cleanup and exit listeners if there's nothing left to do.
self.poll()

@coroutine
def _task_coroutine(self, task):
yield task.async_start()
yield task.async_wait()

def _task_coroutine_done(self, task, future):
try:
future.result()
except asyncio.CancelledError:
self.cancel()
self._task_exit(task)

def _task_exit(self, task):
self._running_tasks.discard(task)
if task.returncode != os.EX_OK:
Expand Down
38 changes: 30 additions & 8 deletions lib/portage/util/futures/iter_completed.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.TaskScheduler import TaskScheduler
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from portage.util.cpuinfo import get_cpu_count


Expand Down Expand Up @@ -90,21 +91,42 @@ def cancel_callback(wait_result, future_done_set):
if future_done_set.cancelled() and not wait_result.done():
wait_result.cancel()

@coroutine
def fetch_wait_result(scheduler, first, loop=None):
if first:
yield scheduler.async_start()

# If the current coroutine awakens just after a call to
# done_callback but before scheduler has been notified of
# corresponding done future(s), then wait here until scheduler
# is notified (which will cause future_map to populate).
while not future_map and scheduler.poll() is None:
yield asyncio.sleep(0, loop=loop)

if not future_map:
if scheduler.poll() is not None:
coroutine_return((set(), set()))
else:
raise AssertionError('expected non-empty future_map')

wait_result = yield asyncio.wait(list(future_map.values()),
return_when=asyncio.FIRST_COMPLETED, loop=loop)

coroutine_return(wait_result)

first = True
try:
scheduler.start()

# scheduler should ensure that future_map is non-empty until
# task_generator is exhausted
while future_map:
wait_result = asyncio.ensure_future(
asyncio.wait(list(future_map.values()),
return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
while True:
wait_result = asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop)
first = False
future_done_set = loop.create_future()
future_done_set.add_done_callback(
functools.partial(cancel_callback, wait_result))
wait_result.add_done_callback(
functools.partial(done_callback, future_done_set))
yield future_done_set
if not future_map and scheduler.poll() is not None:
break
finally:
# cleanup in case of interruption by SIGINT, etc
scheduler.cancel()
Expand Down

0 comments on commit 8f47d3f

Please sign in to comment.