Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Update apply_async and apply_async_with_reservations signatures #3002

Merged
merged 1 commit into from Jun 2, 2017

Conversation

fdobrovolny
Copy link
Contributor

Now one should be able to tell what is passing to where.

closes #2658
https://pulp.plan.io/issues/2658

@mention-bot
Copy link

@BrnoPCmaniak, thanks for your PR! By analyzing the history of the files in this pull request, we identified @bmbouter, @seandst and @werwty to be potential reviewers.

@bmbouter
Copy link
Member

bmbouter commented May 6, 2017

This needs a new push to resolve conflicts.

@@ -149,7 +149,7 @@ class UserFacingTask(PulpTask):
# this tells celery to not automatically log tracebacks for these exceptions
throws = (PulpException,)

def apply_async_with_reservation(self, resource_type, resource_id, *args, **kwargs):
def apply_async_with_reservation(self, resource_type, resource_id, *async_args, **async_kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the docstrings be updated to include async_args and async_kwargs.

return AsyncResult(inner_task_id)

def apply_async(self, *args, **kwargs):
def apply_async(self, *celery_args, tags=[], group_id=None, **celery_kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the docstring be updated to include celery_args and celery_kwargs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this also needs to be celery_args=None instead of *celery_args and celery_kwargs=None instead of **celery_kwargs.

@@ -149,7 +149,7 @@ class UserFacingTask(PulpTask):
# this tells celery to not automatically log tracebacks for these exceptions
throws = (PulpException,)

def apply_async_with_reservation(self, resource_type, resource_id, *args, **kwargs):
def apply_async_with_reservation(self, resource_type, resource_id, *async_args, **async_kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be args=None, kwargs=None instead of *async_args, **async_kwargs. It needs to match the function signature of celery's apply_async. http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async

@@ -190,8 +191,8 @@ def apply_async_with_reservation(self, resource_type, resource_id, *args, **kwar
resource_id = ":".join((resource_type, resource_id))
inner_task_id = str(uuid.uuid4())
task_name = self.name
tag_list = kwargs.get('tags', [])
group_id = kwargs.get('group_id', None)
tag_list = async_kwargs.get('tags', [])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think tags and group_id should become explicit arguments and not expected to be pulled out of async_kwargs. This way async_kwargs will only store kwargs that will be sent to celery.

Copy link
Member

@bmbouter bmbouter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments. Maybe I'm not thinking about it right so if my comments don't make sense just let me know. Thanks for working on this.

@fdobrovolny
Copy link
Contributor Author

@bmbouter I'm not sure if I get the task how it was ment. My understanding of the task was that all parameters of apply_async are in one *args and **kwargs and then in the function we popped some things from it. Which was what I think is the confusing part.

So I made changes to separate these things. Also I renamed the parameters in way that I found more understandable that they are not the args and kwargs parameters on the celery function but rather the whole *args and **kwargs of the function. And therefore our function would become a wrapper which just takes some more arguments and then everything else is passed to the celery apply_async.

I get this understatement from the fact that parameters on http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async
apply_async use **options so I thought that it's better to group all celery params into one **kwargs and pass them uniformly so it would be more clear, but probably it's not.

Also I don't understand what you mean with updating the docstring. Both of the function have a paragraph containing the parameters e.g. https://github.com/pulp/pulp/pull/3002/files#diff-6233f95b689d7b89c6debe53c073b2d7R166

But I can change it to correspond exactly to the way how celery signature looks like, if you think it's the way how it was meant.

@werwty
Copy link
Contributor

werwty commented May 8, 2017

@BrnoPCmaniak I think we do want to duplicate celery's function signature as-is.
The 'args' and 'kwargs' that celery takes are arguments passed onto the task, whereas the **options is used in the celery task. It would be good to make this distinction explicit in our apply_async function signature.

I envisioned our apply_async function signature to be:
apply_async(self, tags=[], group_id=None, task_args=None, task_kwargs=None, **celery_options):
and we would call the super with:
async_result = super(UserFacingTask, self).apply_async(args=task_args, kwargs=task_kwargs, **celery_options)

@bmbouter
Copy link
Member

bmbouter commented May 8, 2017

@BrnoPCmaniak I also imagined it as @werwty wrote in her comment. We want to mimic Celery's apply_async() signature but with our own keyword arguments tags and group_id to be accepted directly in the signature.

apply_async_with_reservation(self, resource_type, resource_id, tags=[], group_id=None, args=None, kwargs=None, **options):

Similarly for apply_async() it would be:

apply_async(self, tags=[], group_id=None, args=None, kwargs=None, **options):

and we would call the super with:

async_result = super(UserFacingTask, self).apply_async(args=args, kwargs=kwargs, **options)

@bmbouter
Copy link
Member

bmbouter commented May 8, 2017

@BrnoPCmaniak Ignore my comment about the docstrings, I didn't realize we documented already like parameters even though your diff is adding them as parameters. Thanks!

@fdobrovolny
Copy link
Contributor Author

@bmbouter @werwty Fixed.

Copy link
Contributor

@werwty werwty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrnoPCmaniak This looks good!

There's just one other refactoring change needed,
https://github.com/BrnoPCmaniak/pulp/blob/441d23b53544c6f627f432c9bb419e42f507e77a/platform/pulp/tasking/tasks.py#L99 should be refactored to pass in args=(inner_task_id, )

async_result = super(UserFacingTask, self).apply_async(*args, **kwargs)
async_result.tags = tag_list
async_kwargs = {
"tags": tags,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not be passing tags and group_id to celery

# Call the outer task which is a promise to call the real task when it can.
_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, args, kwargs],
_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, async_args,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like _queue_reserved_task wants inner_args, inner_kwargs not async_args and async_kwargs, so lines 208-215 can be removed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nevermind, I see _queue_reserved_task passes this back to apply_async.

async_kwargs = {
"args": args,
"kwargs": kwargs,
"options": options
Copy link
Contributor

@werwty werwty May 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options isn't an keyword argument, it's a dict of keywork arguments that should be unpacked before being passed in to apply_async

async_result = super(UserFacingTask, self).apply_async(args=args, kwargs=kwargs, **options)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fdobrovolny fdobrovolny force-pushed the 2658-apply_async branch 2 times, most recently from 13b69c5 to ae0fabc Compare May 12, 2017 19:57
Copy link
Contributor

@werwty werwty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrnoPCmaniak this looks good! I tested it in pulp3 with repository.delete.apply_async and repository.delete.apply_async_with_reservation

Thanks!

# Wrap celery signature into kwargs for _queue_reserved_task
async_args = tuple()
async_kwargs = {
"tags": tags,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't exactly part of the story, but I don't think that tags and group_id should be kept in the async_kwargs. They are only needed in the few statements above and do not need to be passed along to the task.

@fdobrovolny
Copy link
Contributor Author

@bmbouter Fixed.

@@ -96,7 +96,7 @@ def _queue_reserved_task(name, inner_task_id, resource_id, inner_args, inner_kwa
try:
celery.tasks[name].apply_async(*inner_args, **inner_kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be changed to be: celery.tasks[name].apply_async(args=inner_args, kwargs=inner_kwargs) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is correct as is. inner_kwargs in this case needs to be unpacked, since it's:

async_kwargs = {
            "args": args,
            "kwargs": kwargs,
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid calling apply_async() with *something, or **something_else because our args and kwargs to the task are being mixed with the options we are passing to apply_sync itself. Say one of our tasks wants to use a keyword that is also used by apply_async like max_retries. Couldn't this be an issue?

It would be avoided (I think) by always having apply_async have parameters (positional and keyword) that are meant for the task itself being delivered via a keyword named args and kwargs to the apply_async().

Does that make sense? I'm still maybe a little confused. (It's complicated). :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the end task's parameters always live inside their own dict and tuple and never get mixed with others. For example I call this https://github.com/BrnoPCmaniak/pulp/blob/b6e579ca3a8ec7d50bd0127738d2403b5620f7ee/platform/pulp/tasking/tasks.py#L152 this:

some_task.apply_async_with_reservation("RPM", "dnf-2.5.0", tags=[], group_id=None, args=(1, 2), kwargs={"max_retries": 5}, max_retries=10)

this leads to https://github.com/BrnoPCmaniak/pulp/blob/b6e579ca3a8ec7d50bd0127738d2403b5620f7ee/platform/pulp/tasking/tasks.py#L45

_queue_reserved_task.apply_async(args=["some_task", "123e4567-e89b-12d3-a456-426655440000", "dnf-2.5.0", tuple(), {"args": (1, 2),"kwargs": {"max_retries": 5}, "max_retries": 10}], queue=RESOURCE_MANAGER_QUEUE)

Thanks to this method on line https://github.com/BrnoPCmaniak/pulp/blob/b6e579ca3a8ec7d50bd0127738d2403b5620f7ee/platform/pulp/tasking/tasks.py#L45 will be called taht method with parameters:

_queue_reserved_task("some_task", "123e4567-e89b-12d3-a456-426655440000", "dnf-2.5.0", tuple(), {"args": (1, 2),"kwargs": {"max_retries": 5}, "max_retries": 10})

And then the apply async is finally called.
https://github.com/BrnoPCmaniak/pulp/blob/b6e579ca3a8ec7d50bd0127738d2403b5620f7ee/platform/pulp/tasking/tasks.py#L220

some_task.apply_async(args=(1, 2), kwargs={"max_retries": 5}, max_retries=10)

Which then calls the task itself:

some_task(1,2 max_retries=5)

Copy link
Contributor

@werwty werwty May 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if you update async_args here to be args

 async_args = args
 async_kwargs = {
            "kwargs": kwargs,
 }

Then at https://github.com/BrnoPCmaniak/pulp/blob/b6e579ca3a8ec7d50bd0127738d2403b5620f7ee/platform/pulp/tasking/tasks.py#L97
you don't have to unpack and can call apply_async with:
celery.tasks[name].apply_async(args=inner_args, kwargs=inner_kwargs)

Like this we can also stop passing the empty tuple around. The more I think about this, the more I think this is the correct way to go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@werwty I can't agree with you on that. What you propose is half solution because then on the end task you would end up with task(*args, kwargs={kwargs: {...}})"and also you would loose this way ability to pass all additional arguments to apply_async.

Copy link
Contributor

@werwty werwty May 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrnoPCmaniak I'm thinking of something like: werwty@528e44c

Where additional arguments are being passed through options, and you would end up with task(args=args, kwargs=kwargs, **options)

# Call the outer task which is a promise to call the real task when it can.
_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, args, kwargs],
_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, async_args,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should become:

_queue_reserved_task.apply_async(args=(task_name, inner_task_id, resource_id, args, kwargs), queue=RESOURCE_MANAGER_QUEUE)

The change includes a switch from a list to a tuple. Also it ensures args and kwargs are passed as positional arguments to _queue_reserve_task which expects them as positional arguments.

Copy link
Member

@bmbouter bmbouter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments. This stuff is really complicated so maybe some of my comments are wrong. Think about if there are right or not and let me know with questions and ideas.

@fdobrovolny
Copy link
Contributor Author

@bmbouter I'm definitely in for the change into tuple. But how @werwty pointed out I really think it should be this way.

@fdobrovolny fdobrovolny force-pushed the 2658-apply_async branch 3 times, most recently from 60b6dcf to b4af676 Compare May 19, 2017 16:48
Copy link
Contributor

@werwty werwty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I tested this PR manually and tasking works as expected.
Thanks @BrnoPCmaniak

:type inner_args: tuple
:param inner_kwargs: The keyword arguments to pass on to the task.
:type inner_kwargs: dict
:param **options: For all options accepted by apply_async please visit: http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async #NOQA
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a positional argument in the function signature this docstring should be s/**//.

@@ -64,6 +64,12 @@ def _queue_reserved_task(name, inner_task_id, resource_id, inner_args, inner_kwa
will ensure that no other tasks that want that same reservation will run
concurrently with yours.
:type resource_id: basestring
:param inner_args: The positional arguments to pass on to the task.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these docstrings be converted to Napoleon per PUP2? This can be done as a separate PR, but I think it would be good. Here is a regex to help with the conversation: https://pulp.plan.io/issues/2347?pn=1#note-14 Thank you for anything you do in this area.

task_status.tags.create(name=tag)

# Call the outer task which is a promise to call the real task when it can.
_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, args, kwargs],
_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, args,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this list [task_name, ... ] be converted to a tuple for more memory efficiency?

@@ -225,14 +230,18 @@ def apply_async(self, *args, **kwargs):

:param group_id: The id that identifies which group of tasks a task belongs to
:type group_id: uuid.UUID
:param args: The positional arguments to pass on to the task.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few lines above L#223 - L#225 has queue as a parameter but it's not accepted as a named parameter. Please remove queue from this docstring.

async_result.tags = tag_list

async_result = super(UserFacingTask, self).apply_async(args=args, kwargs=kwargs, **options)
async_result.tags = tags
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should be removed. The async_result returned does not need tags as an attribute. This is a holdover from the Pulp2 code which also did not use it.

Copy link
Member

@bmbouter bmbouter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the revisions @BrnoPCmaniak This PR is very close to what it needs to be. I left some comments requesting changes. They are all easy, straightforward change I think. Please let me know if you have questions or if these changes don't make sense. Thanks again for putting this PR together!

concurrently with yours.
inner_args (tuple): The positional arguments to pass on to the task.
inner_kwargs (dict): The keyword arguments to pass on to the task.
options: For all options accepted by apply_async please visit: http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async #NOQA
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have a type also. I think options (dict).

This is an optional argument which is pulled out of kwargs.
args (tuple): The positional arguments to pass on to the task.
kwargs (dict): The keyword arguments to pass on to the task.
options: For all options accepted by apply_async please visit: http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async #NOQA
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here. options (dict):

:param einfo: celery's ExceptionInfo instance, containing serialized traceback.
:type einfo: ???
Args:
exc: The exception raised by the task.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could maybe be exc (BaseException):

Copy link
Member

@bmbouter bmbouter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much @BrnoPCmaniak . I left just a few very small docstring comments if you feel like fixing those before merging. This has been a complex piece of work, but I think what you've made here is exactly right. Thanks again for the great contribution.

@fdobrovolny fdobrovolny merged commit 85b641c into pulp:3.0-dev Jun 2, 2017
@fdobrovolny fdobrovolny deleted the 2658-apply_async branch June 2, 2017 17:06
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
4 participants