How to get a callgraph including asyncio.gather? #54

Closed
asodeur opened this issue Apr 16, 2020 · 13 comments

Comments

@asodeur

asodeur commented Apr 16, 2020

Coroutines run via asyncio.gather do not show up in the callgraph of the calling function.

asyncio.gather returns a Future gathering the results from the provided coroutines. Timings are correct (up to the caveats in #21) but the callgraph only shows the creation of the gathering future. The caller for the coroutines run via gather is the event loop.

Is there any way to provide hints to yappi to change the caller for the coroutines?

Example:

from asyncio import gather, run, sleep
import yappi

async def aio_worker(id):
    await sleep(1.)
    return id

async def doit():
    task1 = aio_worker(1)
    task2 = aio_worker(2)

    return await gather(task1, task2)

if __name__ == '__main__':
    yappi.set_clock_type('wall')
    with yappi.run(builtins=True):
        run(doit())

    stats = yappi.get_func_stats()

    print("\n\nCallers of 'aio_worker'")
    ps = yappi.convert2pstats(stats)
    ps.print_callees('doit')
    ps.print_callees('gather')  # <- this schedules a future collecting results, only see its __init__ in callees

    ps.print_callers('aio_worker')  # <- only caller is the event loop: "__builtin__:0(<method 'run' of 'Context' objects>)"

For me it would be ok if gather did not show up in the callgraph at all and aio_worker looked like a direct callee of doit.

@sumerc
Owner

sumerc commented Apr 16, 2020

Hey Andreas,

I understand it is a reasonable and practical request to identify caller/callee relationships between coroutines. However, the issue is that asyncio.gather adds a done_callback to the loop and returns a Future. From the profiler's perspective there is nothing that links doit() and the aio_workers, as they do not appear in the call stack as a parent/child relation. Unfortunately, there are more functions like this in asyncio that return a Future, and currently we have no way of knowing who the caller of that Future is (e.g. loop.call_soon or loop.call_later).
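To make the missing link concrete, here is a minimal sketch that uses only the standard library. The names `worker`, `doit_gather`, `doit_await` and `stack_names` are made up for illustration. The sketch records the Python call stack at the moment a worker starts running: with a plain await the awaiting coroutine is on the stack, while with asyncio.gather it is not, because the event loop drives the wrapped task.

```python
import asyncio
import traceback


async def worker(recorded):
    # Record the names of all functions on the call stack right now.
    recorded.append([frame.name for frame in traceback.extract_stack()])
    await asyncio.sleep(0)


async def doit_gather(recorded):
    # gather wraps the coroutine in a Task; the event loop runs it later.
    await asyncio.gather(worker(recorded))


async def doit_await(recorded):
    # A plain await runs the worker inside the caller's own stack.
    await worker(recorded)


def stack_names(entry_point):
    recorded = []
    asyncio.run(entry_point(recorded))
    return recorded[0]


gathered = stack_names(doit_gather)
awaited = stack_names(doit_await)

print("doit_gather on stack:", "doit_gather" in gathered)
print("doit_await on stack:", "doit_await" in awaited)
```

A profiler that derives caller/callee pairs from the call stack sees the same picture, which is why the event loop ends up as the only caller of the gathered coroutines.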

To make this work, one needs to hook into library functions to make sense of what is happening. Currently, I cannot see an easy way to work this out without going too deep into the library internals.

Needless to say, I am open to suggestions if you have any.

@asodeur
Author

asodeur commented Apr 17, 2020

I was hoping there might be some easy trick using set_*_callback.

Adding a callback to change the recorded callstack is probably a no-go (it is not obvious how this could work, and there are performance concerns). Do you think there is a chance that a monkey-patched gather putting hints in a contextvar, in combination with set_tag_callback, might leave enough information to clean the callgraph in a post-processing step? If there are no obvious reasons this won't work I could give it a try.
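The mechanism this idea relies on can be sketched with the standard library alone: a task copies the current context when it is created, so a hint placed in a context variable on the caller side stays visible inside the task even after the caller has reset it. The names `task_tag`, `current_tag`, `worker` and `caller` are hypothetical. A zero-argument function like `current_tag` is the kind of callable one would hand to set_tag_callback; that call is left out so the sketch runs without yappi.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical marker variable; 0 means "no tag".
task_tag = ContextVar("task_tag", default=0)


def current_tag():
    # A zero-argument function of this shape could serve as a tag callback.
    return task_tag.get()


async def worker():
    await asyncio.sleep(0)
    return current_tag()


async def caller():
    token = task_tag.set(7)               # hint set on the caller side
    task = asyncio.create_task(worker())  # the task copies the context here
    task_tag.reset(token)                 # the caller goes back to "no tag"
    tag_in_caller = current_tag()
    tag_in_task = await task
    return tag_in_caller, tag_in_task


tag_in_caller, tag_in_task = asyncio.run(caller())
print(tag_in_caller, tag_in_task)
```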

@sumerc
Owner

sumerc commented Apr 17, 2020

Do you think there is a chance a monkey-patched gather putting hints in a contextvar in combination with set_tag_callback might leave enough information to clean the callgraph in a post-processing step?

I am not sure I understand your suggested solution. You can use set_tag_callback to tag any profiled function, but I am still not sure how you will link the futures with the gather. If you can demonstrate a simple PoC, I would be more than happy to help.

@asodeur
Author

asodeur commented Apr 20, 2020

I think I have a prototype that does the tagging, but I am struggling with rewriting the statistics. I am already failing to filter the stats for tag == 1: YFuncStat.tag always seems to return 0, and YFuncStats.get seems to mutate the statistics in place.

stats = yappi.get_func_stats()
orig_len = len(stats)
assert [fs for fs in stats if fs.tag == 1] == []  # gives [], fs.tag always seems to be zero (but they are not)
assert 0 < len(stats.get({'tag': 1})) < orig_len  # there seem to be functions with tag == 1 as expected
assert len(stats) == len(stats.get({'tag': 1}))  # .get seems to have mutated stats

How do I filter for functions by tag?

@sumerc
Owner

sumerc commented Apr 20, 2020

If you are using the latest master branch, please retrieve the stats like this: yappi.get_func_stats(tag=xxxx)

But the problem seems to be different in your case. Are you sure you set the correct tag callback? This should work in any case: [fs for fs in stats if fs.tag == 1]. Maybe you could post the example code?

@asodeur
Author

asodeur commented Apr 20, 2020

My code is attached below. The code accessing the tags is inside the __main__ guard at the bottom. It looks to me as if there is an issue with the Python wrapper: YFuncStat.tag is zero until it is used by YFuncStats.get, YFuncStats.get mutates the object in place, and yappi.get_func_stats gives a new, clean YFuncStats.
For now, a feasible workaround should be to chain subsets for different tags (which works in my case because the set of all tags is known up front).

from asyncio import create_task, run, sleep
from contextvars import ContextVar
import inspect
import yappi


_marker = ContextVar('yappi_task_marker')
_task_counter = 0
_task_map = {}


CREATE_TASK_ID = (
    create_task.__code__.co_filename, create_task.__code__.co_firstlineno, create_task.__code__.co_name
)


def _task_tag_cbk():
    return _marker.get(0)


async def aio_worker():
    await sleep(1.)
    return _marker.get(0)


async def doit():
    # except for the lines marked with '# <- keep this' everything should go into the task factory (or monkey-patch)
    global _task_counter

    coro = aio_worker()  # <- keep this

    f = inspect.currentframe()
    caller_id = (f.f_code.co_filename, f.f_code.co_firstlineno, f.f_code.co_name)
    callee_id = (coro.cr_code.co_filename, coro.cr_code.co_firstlineno, coro.cr_code.co_name)

    fid = (*caller_id, *callee_id)
    tag = _task_map.get(fid, 0)
    if not tag:
        _task_counter += 1
        _task_map[fid] = tag = _task_counter
    token = _marker.set(tag)

    task = create_task(coro)  # <- keep this

    _marker.reset(token)

    return await task  # <- keep this

if __name__ == '__main__':
    yappi.set_tag_callback(_task_tag_cbk)
    yappi.set_clock_type('wall')
    with yappi.run(builtins=True):
        print('Task tag: ', run(doit(), debug=False))

    stats = yappi.get_func_stats()

    ## various attempts at retrieving functions with tag == 1
    assert len(stats) == 274
    assert [fs for fs in stats if fs.tag == 1] == []  # gives [], fs.tag always seems to be zero (but they are not)
    assert len(stats.get({'tag': 1})) == 61  # using .get and filter finds 61 functions which looks reasonable
    assert len([fs for fs in stats if fs.tag == 1]) == 61  # .get with filter seems to have populated the tags
    assert len(stats) == 61  # but also mutated stats in place
    ##

    stats = yappi.get_func_stats()  # this seems to get the original stats back
    assert len(stats) == 274

@sumerc
Owner

sumerc commented Apr 20, 2020

The correct and fastest way to retrieve per-tag func stats is yappi.get_func_stats({'tag': xxx}), as it happens entirely in C. The other version, [fs for fs in stats if fs.tag == 1], will not perform well on big datasets.

yappi.get_func_stats() traverses the snapshot that is held in memory by Yappi's C extension. That is why, when you call it multiple times, it traverses that memory again each time and returns the same data. Any filtering you do on the stats object is done entirely on your snapshot. The reason behind this is that profiling might continue on the C side between different calls to get_func_stats(). All other APIs on YFuncStats are actually a frontend for this snapshot.

Finally, the reason [fs for fs in stats if fs.tag == 1] does not match what yappi.get_func_stats() returns for that tag is actually a bug: the tag always returns zero when get_func_stats() does not specify a tag value. I will hopefully fix this soon. Fortunately, this should not block you in any way.

Please use:

stats = yappi.get_func_stats({'tag':1})

@asodeur
Author

asodeur commented Apr 21, 2020

Thx, this was helpful. Below is a rough sketch of the idea. The resulting callgraph for doit looks ok.

There are still some issues to work out:

  1. doit's ttot needs to be propagated up the call tree (and maybe should be removed from the loop's .select).
  2. The code setting up the context var needs to move from doit into a task factory.
  3. All create_task calls need to be aggregated per function, as the tag is lost during conversion to YChildFuncStat.
  4. The current code is likely to be pretty slow; it should be possible to avoid most of the loops.

But none of this looks impossible.

from asyncio import create_task, run, sleep
from contextvars import ContextVar
import inspect
import yappi


_marker = ContextVar('yappi_task_marker')
_task_counter = 0
_task_map = {}


CREATE_TASK_ID = (
    create_task.__code__.co_filename, create_task.__code__.co_firstlineno, create_task.__code__.co_name
)


def _task_tag_cbk():
    return _marker.get(0)


async def aio_worker():
    await sleep(1.)
    return _marker.get(0)


async def doit():
    # except for the lines marked with '# <- keep this' everything should go into a task factory (or monkey-patch)
    global _task_counter

    coro = aio_worker()  # <- keep this

    f = inspect.currentframe()
    caller_id = (f.f_code.co_filename, f.f_code.co_firstlineno, f.f_code.co_name)
    callee_id = (coro.cr_code.co_filename, coro.cr_code.co_firstlineno, coro.cr_code.co_name)

    fid = (*caller_id, *callee_id)
    tag = _task_map.get(fid, 0)
    if not tag:
        _task_counter += 1
        _task_map[fid] = tag = _task_counter
    token = _marker.set(tag)

    task = create_task(coro)  # <- keep this

    _marker.reset(token)

    return await task  # <- keep this


def get_func_stats_with_tags(tags):
    """yappi 1.2.4 does not populate YFuncStat.tag unless queried"""
    result = yappi.YFuncStats()

    for tag in tags:
        stats = yappi.get_func_stats()
        for fs in stats.get({'tag': tag}):
            result.append(fs)

    if 0 not in tags:
        stats = yappi.get_func_stats()
        for fs in stats.get({'tag': 0}):
            result.append(fs)

    return result


def to_child_func_stat(y_func_stat):
    return yappi.YChildFuncStat([
        y_func_stat.index,
        y_func_stat.ncall,
        y_func_stat.nactualcall,
        y_func_stat.ttot,
        y_func_stat.tsub,
        y_func_stat.tavg,
        y_func_stat.builtin,
        y_func_stat.full_name,
        y_func_stat.module,
        y_func_stat.lineno,
        y_func_stat.name
    ])


def fix_calltree(stats, task_map):
    # TODO: not working yet
    fixed_stats = yappi.YFuncStats()
    callee_map = {}
    caller_map = {}
    for (rmodule, rline, rname, emodule, eline, ename), tag in task_map.items():
        callee_map[emodule, eline, ename] = tag
        caller_map[rmodule, rline, rname] = tag

    callees = {}
    for fs in stats:
        tag = callee_map.get((fs.module, fs.lineno, fs.name))
        if tag and fs.tag == tag:
            callees[tag] = to_child_func_stat(fs)

    create_tasks = {}
    for fs in stats:
        if (fs.module, fs.lineno, fs.name) == CREATE_TASK_ID:
            callee = callees.get(fs.tag)
            if callee:
                new_fs = yappi.YFuncStat(fs)
                new_fs.ttot += callee.ttot
                new_fs.tavg += callee.tavg
                new_fs.children.append(callee)

                fixed_stats.append(new_fs)
                create_tasks[fs.tag] = to_child_func_stat(new_fs)

    for fs in stats:
        new_fs = yappi.YFuncStat(fs)
        tag = caller_map.get((fs.module, fs.lineno, fs.name))
        if tag:
            ct = create_tasks.get(tag)
            if ct:
                new_fs.tsub -= ct.ttot
                new_fs.children.append(ct)

        fixed_stats.append(new_fs)

    return fixed_stats


if __name__ == '__main__':
    yappi.set_tag_callback(_task_tag_cbk)
    yappi.set_clock_type('wall')
    with yappi.run(builtins=True):
        print('Task tag: ', run(doit(), debug=False))

    stats = get_func_stats_with_tags(set(_task_map.values()))

    fixed_stats = fix_calltree(stats, _task_map)

    # save to view with snakerun or similar
    fixed_stats.save('the_profile.pstat', type='pstat')

@asodeur
Author

asodeur commented Apr 23, 2020

Sorry to bother you again, but I bumped into this:

class YFuncStat(YStat):
    ...
    def __eq__(self, other):
        if other is None:
            return False
        return self.full_name == other.full_name

Shouldn't the comparison take the tag into account as well? As a knock-on effect, you cannot YFuncStats.append the same function with different tags.
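The knock-on effect can be illustrated with a toy model. Everything in this sketch is a stand-in: `NameOnlyStat`, `TagAwareStat` and `append_merging` are hypothetical names, and the merging container is an assumption about how an equality-based container might behave, not yappi's actual code.

```python
class NameOnlyStat:
    """Toy stand-in: equality looks at full_name only, as in the snippet above."""

    def __init__(self, full_name, tag, ttot):
        self.full_name = full_name
        self.tag = tag
        self.ttot = ttot

    def _key(self):
        return self.full_name

    def __eq__(self, other):
        if other is None:
            return False
        return self._key() == other._key()


class TagAwareStat(NameOnlyStat):
    """Variant whose equality also includes the tag."""

    def _key(self):
        return (self.full_name, self.tag)


def append_merging(stats, item):
    # Assumed container behaviour: an equal entry absorbs the new one.
    for existing in stats:
        if existing == item:
            existing.ttot += item.ttot
            return
    stats.append(item)


def collect(stat_cls):
    stats = []
    append_merging(stats, stat_cls("mod.py:1 aio_worker", tag=1, ttot=1.0))
    append_merging(stats, stat_cls("mod.py:1 aio_worker", tag=2, ttot=2.0))
    return [(s.tag, s.ttot) for s in stats]


name_only = collect(NameOnlyStat)
tag_aware = collect(TagAwareStat)
print(name_only)
print(tag_aware)
```

With name-only equality the second entry is folded into the first and its tag is lost; with tag-aware equality both entries survive.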

@sumerc
Owner

sumerc commented Apr 23, 2020

Sorry to bother you again,

Not at all :)

Shouldn't comparison take tag into account as well

I forgot to mention that it is not possible to traverse the stats with [fs for fs in stats if fs.tag == 1]. I had completely forgotten the implementation details of get_func_stats, so my previous comment about this being a bug was wrong. The snapshot generated by get_func_stats() is aggregated over the func_name or index fields of the underlying data. So, if you have multiple tags for the same function, they are aggregated into a single entry and you cannot traverse them that way. This has historic reasons which I don't remember, and it is very hard to do it another way without breaking backward compatibility.

Anyway, the only correct way to traverse per-tag or per-context-id stats is get_func_stats(tag=, ctx_id). I will update the docs for this (if they mention it at all).
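A toy model may help to show why the filter has to be applied while the snapshot is built rather than afterwards. The rows and the `snapshot` helper below are hypothetical and only mimic the aggregation described above; they are not yappi's implementation.

```python
from collections import defaultdict

# Hypothetical raw rows: (function name, tag, total time).
RAW_ROWS = [
    ("aio_worker", 1, 1.0),
    ("aio_worker", 2, 2.0),
    ("doit", 0, 3.0),
]


def snapshot(rows, tag=None):
    """Aggregate rows by function name, optionally keeping one tag only."""
    totals = defaultdict(float)
    for name, row_tag, ttot in rows:
        if tag is not None and row_tag != tag:
            continue  # filtering happens before aggregation
        totals[name] += ttot
    return dict(totals)


everything = snapshot(RAW_ROWS)
only_tag_1 = snapshot(RAW_ROWS, tag=1)
print(everything)
print(only_tag_1)
```

Once the unfiltered snapshot has merged both aio_worker rows into one entry, there is no way to recover the share that belonged to tag 1; asking for the tag up front keeps it separate.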

If you really, really want to enumerate the stats yourself, you can always call _yappi.enum_func_stats() which provides all the raw data but I feel get_func_stats() should be enough?

@asodeur
Author

asodeur commented May 7, 2020

Got a first prototype by now.

It turns out you have to set a context var on the caller side of loop.create_task to correlate caller and callee (as in doit above). This requires one of the following: (i) changing the profiled code, (ii) monkey patching asyncio, (iii) using a custom event loop implementation that overrides create_task, or (iv) customizing yappi's profile function.

The current prototype tries to customize the yappi profile function, but that does not seem to be supported at the moment (it does nothing with profile_threads==True and crashes for multi-threaded programs). Are there plans to support customizing the profile function?

(Without support for setting the profile function I would likely go with (iii). The only downside is that you need to run the whole program on a custom event loop even if you are only profiling small pieces.)

@sumerc
Owner

sumerc commented May 7, 2020

First: Wow!

Congrats on this!

Now I would like to help you on this but I think this is not something I can include in Yappi for the time being. Let me share my reasoning before moving further:

  • First: the code might work, but it is extremely fragile with respect to the internals of asyncio, as it requires monkey patching. I remember mentioning this earlier in this issue (How to get a callgraph including asyncio.gather? #54 (comment)). As a maintainer, I can hardly find time to work on the project, and I really would not want to depend on the internals of other libraries.
  • I am planning to integrate gevent, which means I think we should not be too specific to one library (even if it is the stdlib). I am hoping Yappi can profile greenlets as well, without too much effort.
  • I understand this adds value: seeing the initial callers of coroutines. But given the complexity of this implementation, I am not sure it is worth the effort.
  • The code also only covers cases like gather and create_task. What about call_soon, call_soon_threadsafe, call_later, etc., and all the other functions that can schedule a coroutine?
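On the last point, one observation from the standard library may be relevant: loop.call_soon also captures a copy of the current context when no explicit context is passed, so a caller-side hint in a context variable is visible inside the scheduled callback as well. The sketch below only demonstrates that behaviour; `task_tag` and `schedule_with_tag` are hypothetical names, and whether this would be enough to rebuild caller links for such callbacks remains open.

```python
import asyncio
from contextvars import ContextVar

task_tag = ContextVar("task_tag", default=0)  # hypothetical marker


async def schedule_with_tag():
    loop = asyncio.get_running_loop()
    seen = loop.create_future()

    def callback():
        # Runs later from the event loop, in the context captured by call_soon.
        seen.set_result(task_tag.get())

    token = task_tag.set(3)
    loop.call_soon(callback)  # captures a copy of the current context
    task_tag.reset(token)
    return await seen


tag_seen_by_callback = asyncio.run(schedule_with_tag())
print(tag_seen_by_callback)
```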

However, if you really would like to pursue this, I would suggest implementing it as a separate library that uses Yappi, or simply forking Yappi. I would try my best to help with your issues.

Let's clarify this first and then we can talk about potential hooks that you request.

@sumerc
Owner

sumerc commented Sep 8, 2022

Closing this issue as there is no progress.

@sumerc sumerc closed this as completed Sep 8, 2022