Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asyncio.ProcessPoolExecutor tracing not working correctly #481

Open
nedbat opened this issue Mar 13, 2016 · 5 comments
Open

asyncio.ProcessPoolExecutor tracing not working correctly #481

nedbat opened this issue Mar 13, 2016 · 5 comments
Labels
exotic Different execution semantics to be measured

Comments

@nedbat
Copy link
Owner

nedbat commented Mar 13, 2016

Originally reported by Alexander Mohr (Bitbucket: thehesiod, GitHub: thehesiod)


first I had to monkey patch concurrent.futures.process._process_worker from concurrent.futures.ProcessPoolExecutor because it doesn't call the atexit handlers:

#!python

def _concurrent_futures_process_worker(*args, **kwargs):
    orig_call = kwargs.pop('_orig_call')
    result = orig_call(*args, **kwargs)

    for aeh in _concurrent_futures_process_worker._atexit_handlers:
        aeh()

    return result

_concurrent_futures_process_worker._atexit_handlers = []

def _init_coverage_monkey_patch():
    try:
        import coverage
        cps = os.environ.get("COVERAGE_PROCESS_START")
        if not cps:
            return

        # process pool executors don't call atexit handlers :(
        concurrent.futures.process._process_worker = functools.partial(_concurrent_futures_process_worker, _orig_call=concurrent.futures.process._process_worker)
    except:
        pass

def start_coverage():
    try:
       import coverage

        cps = os.environ.get("COVERAGE_PROCESS_START")
        if not cps:
            # No request for coverage, nothing to do.
            return

        cov = coverage.Coverage(config_file=cps, auto_data=True)
        cov.start()
        cov._warn_no_data = False
        cov._warn_unimported_source = False

        _concurrent_futures_process_worker._atexit_handlers.append(cov._atexit)
    except:
        pass

my first note is that this monkey patch would be a lot simpler if coverage.process_startup() returned the Coverage instance.

With the above patching, and running with "-p" for parallel, I actually get coverage data, however it's all bizarre. For example: in a function it will say the first line executed, but the next line did not, and the loop.run_until_complete line did not execute, but most of the lines in the coroutine did.


@nedbat
Copy link
Owner Author

nedbat commented Mar 13, 2016

Can you provide a reproducible description of the problem before we jump into the solution?

@nedbat
Copy link
Owner Author

nedbat commented Mar 13, 2016

Original comment by Alexander Mohr (Bitbucket: thehesiod, GitHub: thehesiod)


I will try. Its now intertwined in a fairly large project.

@nedbat
Copy link
Owner Author

nedbat commented Mar 14, 2016

Original comment by Alexander Mohr (Bitbucket: thehesiod, GitHub: thehesiod)


ok I have a reproducible testcase!

cov_tester.py:

#!python

import asyncio
import unittest
import concurrent.futures
import cov_helper


def async_test(f):
    def wrapper(*args, **kwargs):
        coro = asyncio.coroutine(f)
        future = coro(*args, **kwargs)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(future)
    return wrapper


def get_executor_props(method, **kwargs):
    if getattr(method, '_logger', None) is None:
        cov_helper.init_coverage()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        method._loop = loop

        return loop
    else:
        return method._loop


def executor_task():
    loop = get_executor_props(executor_task)

    async def doit():
        try:
            print("hello")
        finally:
            print("Done")

    loop.run_until_complete(doit())
    return None


class TransformerUnitTest(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        unittest.TestCase.__init__(self, *args, **kwargs)

    @async_test
    def test_cov(self):
        loop = asyncio.get_event_loop()

        cov_helper.init_coverage_monkey_patch()

        executor = concurrent.futures.ProcessPoolExecutor(1)

        yield from loop.run_in_executor(executor, executor_task)
        executor.shutdown()


if __name__ == '__main__':
    unittest.main()

cov_helper.py

#!python

import os
import concurrent.futures
import functools


def _concurrent_futures_process_worker(*args, **kwargs):
    try:
        orig_call = kwargs.pop('_orig_call')
        result = orig_call(*args, **kwargs)
    finally:
        for aeh in _concurrent_futures_process_worker._atexit_handlers:
            aeh()

    return result

_concurrent_futures_process_worker._atexit_handlers = []


def init_coverage_monkey_patch():
    try:
        import coverage
        cps = os.environ.get("COVERAGE_PROCESS_START")
        if not cps:
            return

        # process pool executors don't call atexit handlers :(
        concurrent.futures.process._process_worker = functools.partial(_concurrent_futures_process_worker, _orig_call=concurrent.futures.process._process_worker)
    except:
        pass


def init_coverage():
    if getattr(init_coverage, '_cov', None) is not None:
        return

    try:
        # unfortunately the ProcessPoolExecutor uses os._exit so it won't call the atexit handlers and thus use the
        # coverage.process_startup trick
        import coverage

        cps = os.environ.get("COVERAGE_PROCESS_START")
        if not cps:
            # No request for coverage, nothing to do.
            return

        cov = coverage.Coverage(config_file=cps, auto_data=True)
        cov.start()
        cov._warn_no_data = False
        cov._warn_unimported_source = False

        _concurrent_futures_process_worker._atexit_handlers.append(cov._atexit)

        init_coverage._cov = cov
    except:
        # not available
        init_coverage._cov = True

.coveragerc

[run]
parallel = True

test run:

#!bash

export COVERAGE_PROCESS_START=.coveragerc
python3.5 /usr/local/bin/coverage run -p cov_tester.py
coverage combine
coverage html
open htmlcov/index.html

now look at the results, note how nothing in get_executor_props is marked as run, further note that in executor_task, loop.run_until_complete is not marked as run, and yet the function that it runs is marked as run :) also "async def doit()" is not marked as run (would expect everything after init_coverage would be).

it seems like its only marking lines as hit in the async task, but not outside it. Perhaps there's some other initialization thats missing?

@nedbat
Copy link
Owner Author

nedbat commented May 8, 2016

Thanks, I'm looking into this. BTW, I took your suggestion of returning the Coverage instance from process_startup. That's in f3b4faef8280 (bb).

@nedbat
Copy link
Owner Author

nedbat commented Dec 18, 2016

Original comment by Loic Dachary (Bitbucket: dachary, GitHub: dachary)


@nedbat before starting with this issue I'd like to make sure I'm not missing a draft or notes you made when looking into this a few months ago. Just to avoid duplicating your effort ;-)

@nedbat nedbat added major bug Something isn't working labels Jun 23, 2018
@nedbat nedbat removed the 4.1 label Aug 17, 2018
@nedbat nedbat added exotic Different execution semantics to be measured and removed bug Something isn't working major labels Jan 15, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
exotic Different execution semantics to be measured
Projects
None yet
Development

No branches or pull requests

1 participant