[fixed] Task status messages #1625

Merged
merged 17 commits into from Apr 7, 2016

@riga riga commented Apr 1, 2016

This PR is a fixed version of PR #1621 which has a faulty commit history.

This PR adds status messages to tasks which are also visible on the scheduler GUI.

Examples:

[Screenshot: task list]

[Screenshot: worker list]

Status messages are meant to change during the run method in an opportunistic way. Especially for long-running non-Hadoop tasks, the ability to read those messages directly in the scheduler GUI is quite helpful (at least for us). Internally, message changes are propagated to the scheduler via a _status_message_callback which is - in contrast to tracking_url_callback - not passed to the run method, but set by the TaskProcess.

Usage example:

class MyTask(luigi.Task):
    ...
    def run(self):
        for i in range(100):
            # do some hard work here
            if i % 10 == 0:
                self.set_status_message("Progress: %s / 100" % i)

I know that you don't like PRs that affect core code (which is reasonable =) ), but IMHO this feature is both lightweight and really helpful.

@riga riga mentioned this pull request Apr 1, 2016

riga commented Apr 1, 2016

Todos from the discussion in #1621:

  • Remove status_message cache variable (done)
  • Add pydocs (done)

def set_status_message(self, message):
    """
    Sets the status message of the task to message, i.e., invokes _status_message_callback if it
    is a callable. This propagates the message down to the scheduler.

Review comment (Contributor):

In the docs here, can you add a reference to the _Task.set_status_message: section? I think there are a few examples in this file that you can copy and paste.

Tarrasch commented Apr 1, 2016

Cool. This looks totally merge-worthy. Feel free to fix the minor doc-comment.

riga commented Apr 1, 2016

Done ;)

Tarrasch commented Apr 1, 2016

Nice!

erikbern commented Apr 1, 2016

can you resolve the failing tests? https://travis-ci.org/spotify/luigi/jobs/120027335

riga commented Apr 1, 2016

Ok, this is a tough one.

update_status_message is created at worker-level because it needs information on the scheduler. It is then passed to the task process where it is stored as _status_message_callback in the task itself. This is the reason why it can't be pickled.

This is not the case with tracking_url_callback, since it's not saved in the task but rather passed to its run method in TaskProcess._run_get_new_deps. I'm afraid that the status message callback should use the same mechanism here.
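To illustrate the pickling problem described above, here is a minimal sketch that does not use luigi at all. The task is a hypothetical stand-in built from `types.SimpleNamespace`, and `make_callback` and `can_pickle` are names invented for this example. The point is only that a callback created inside another function is a local closure, and the standard `pickle` module cannot serialize an object that holds one as an attribute.

```python
import pickle
import types


def make_callback(scheduler_log):
    # The closure captures worker-level state, much like a callback that
    # needs a handle on the scheduler. (Hypothetical stand-in.)
    def update_status_message(message):
        scheduler_log.append(message)
    return update_status_message


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A stand-in "task" without any callback attached.
plain_task = types.SimpleNamespace(name="plain", _status_message_callback=None)

# The same kind of object, but with a locally defined callback stored on it.
task_with_callback = types.SimpleNamespace(
    name="with_callback",
    _status_message_callback=make_callback([]),
)

print("plain task picklable:", can_pickle(plain_task))
print("task with callback picklable:", can_pickle(task_with_callback))
```

Keeping the callback out of the task object, as the tracking URL callback does, sidesteps the problem entirely.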

Here is a suggestion for changes in worker.py that would clean up this mechanism so that it can cope with arbitrary callbacks (like update_status_message) as well (and would solve the "problem" that @Tarrasch mentioned here):

import multiprocessing
import types
from inspect import getargspec

class TaskProcess(multiprocessing.Process):

    def __init__(self, <current kw/args except tracking_url_callback>, run_callbacks=None):
        ...
        if run_callbacks is None:
            run_callbacks = {}
        self.run_callbacks = run_callbacks

    def _run_get_new_deps(self):
        run_spec = getargspec(self.task.run)
        if run_spec.keywords:
            # run() accepts **kwargs, so pass every callback
            run_kwargs = self.run_callbacks.copy()
        else:
            # pass only the callbacks that run() names in its signature
            run_kwargs = {key: cb for key, cb in self.run_callbacks.items() if key in run_spec.args}

        task_gen = self.task.run(**run_kwargs)

        if not isinstance(task_gen, types.GeneratorType):
            return None
        ...

This way, the code in Worker._create_task_process becomes:

def _create_task_process(self, task):
    ...
    run_callbacks = {
        "tracking_url_callback": update_tracking_url,
        "status_message_callback": update_status_message
    }

    return TaskProcess(
        task, self._id, self._task_result_queue,
        random_seed=bool(self.worker_processes > 1),
        worker_timeout=self._config.timeout,
        run_callbacks=run_callbacks
    )

I'm not a fan of using inspect too much myself, but in this case it's quite robust. I can add this to the PR if you want.
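The signature-based filtering proposed above can be tried out in isolation. The following is a sketch under stated assumptions, not the code from this PR: it uses `inspect.signature` instead of `getargspec` (which was removed in Python 3.11), and `filter_callbacks` plus the three `run_*` functions are hypothetical names made up for the illustration.

```python
import inspect


def filter_callbacks(run, callbacks):
    """Return only the callbacks that `run` can accept as keyword arguments."""
    params = inspect.signature(run).parameters
    accepts_any_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_any_kwargs:
        # run(**kwargs) takes everything.
        return dict(callbacks)
    # Otherwise keep only the callbacks named in the signature.
    return {key: cb for key, cb in callbacks.items() if key in params}


def run_plain():
    return "no callbacks"


def run_with_status(status_message_callback):
    status_message_callback("Progress: 10 / 100")
    return "status only"


def run_with_kwargs(**kwargs):
    return sorted(kwargs)


messages = []
callbacks = {
    "tracking_url_callback": lambda url: None,
    "status_message_callback": messages.append,
}

for run in (run_plain, run_with_status, run_with_kwargs):
    print(run.__name__, run(**filter_callbacks(run, callbacks)))
```

Each `run` variant is called exactly once, with exactly the keyword arguments it declares, so no retry on `TypeError` is needed.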

erikbern commented Apr 1, 2016

I remember running into similar issues before

What's the reason we pickle the Tasks in the first place?

riga commented Apr 1, 2016

I think parallel scheduling requires that tasks can be pickled. At least it's mentioned in the docs at "parallel-scheduling".

Many contrib tasks also make use of it, e.g. in contrib/spark.py.

erikbern commented Apr 1, 2016

Hm right makes sense. Tasks are in theory interchangeable given the same name and parameters so instead of pickling you could just de/serialize it using that mechanism (that's how the assistant works). It would be quite easy to fix in worker.py
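As a rough illustration of the "same name and parameters" idea, the sketch below rebuilds a task from a small JSON description instead of pickling it. This is not luigi's own registry or parameter serialization: `TASK_REGISTRY`, `register`, `CountTask`, `to_spec`, and `from_spec` are all hypothetical names invented for the example.

```python
import json

# Hypothetical registry mapping task names to classes.
TASK_REGISTRY = {}


def register(cls):
    """Record a task class under its name so it can be looked up later."""
    TASK_REGISTRY[cls.__name__] = cls
    return cls


@register
class CountTask(object):
    def __init__(self, date, n):
        self.date = date
        self.n = n
        # Runtime-only state; it never becomes part of the serialized form.
        self._status_message_callback = lambda message: None

    def to_spec(self):
        # Only the name and the parameters identify the task.
        return {"name": type(self).__name__, "params": {"date": self.date, "n": self.n}}


def from_spec(spec):
    """Recreate a task from its name and parameters."""
    return TASK_REGISTRY[spec["name"]](**spec["params"])


wire = json.dumps(CountTask("2016-04-01", 3).to_spec())
clone = from_spec(json.loads(wire))
print(clone.date, clone.n)
```

Because only the name and parameters travel over the wire, unpicklable attributes on the instance never get in the way.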

But you are right that some tasks use pickling (e.g. Hadoop mapreduce), so it would be a mess to avoid pickling everywhere.

Wouldn't your suggestion with inspect run into the same issue when contrib tasks use pickle?

riga commented Apr 1, 2016

> Wouldn't your suggestion with inspect run into the same issue when contrib tasks use pickle?

The run_callbacks would be owned by the TaskProcess instance, the task itself still doesn't know anything about the callbacks. I just ran the tests, looks good.

riga commented Apr 1, 2016

Actually the use of inspect has another advantage. Users might use decorators on Task.run, which are called twice in the current implementation if tracking_url_callback is missing in the signature of run.
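The double invocation is easy to reproduce outside luigi. The sketch below assumes a try-then-retry fallback on `TypeError`; `logged`, `run`, and `call_with_fallback` are hypothetical names used only for this illustration.

```python
import functools

calls = []


def logged(func):
    """Hypothetical user decorator that records every invocation."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        calls.append(kwargs)
        return func(*args, **kwargs)
    return wrapper


@logged
def run():
    # A run() without tracking_url_callback in its signature.
    return "done"


def call_with_fallback(run_func, tracking_url_callback):
    # First try to pass the callback; if run() rejects it, call again without.
    try:
        return run_func(tracking_url_callback=tracking_url_callback)
    except TypeError as ex:
        if "unexpected keyword argument" not in str(ex):
            raise
    return run_func()


result = call_with_fallback(run, lambda url: None)
print(result, "decorator invoked", len(calls), "times")
```

The decorator body runs on both attempts, so any side effect in it (logging, timing, locking) happens twice. Inspecting the signature before the call avoids the first, failing attempt.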

@dlstadther

Due to the changes in worker.py, do any of its tests need to be added/updated?

riga commented Apr 4, 2016

LGTM. test_tracking_url and test_type_error_in_tracking_run seem to be the only tests that make use of tracking_url_callback. The changes to TaskProcess and Worker should be pretty much covered by many other test cases.

luigi.notifications.DEBUG = True


class TaskStatusMessageTest(LuigiTestCase):

Review comment (Contributor):

Actually we do have a test suite which is run in two modes automatically (in-memory scheduler and rpc to external process scheduler). Can you make sure that the rpc code path is also tested?

Tarrasch commented Apr 5, 2016

We also have a set of scheduler tests (they are disabled in Travis for flakiness reasons). Can you add that case there too and make sure it passes locally?

for i in range(100):
    # do some hard work here
    if i % 10 == 0:
        status_message_callback("Progress: %d / 100" % i)

Review comment (Contributor):

Is this still correct?! I thought you pass a dict now.

Reply (Contributor Author):

Keyword argument unpacking also works for regular named parameters, so

def func(a, b):
    print(a, b)

func(**{"b": "bVal", "a": "aVal"})
# -> aVal bVal on Python 3, ('aVal', 'bVal') on Python 2

Tarrasch commented Apr 5, 2016

I appreciate that you introduced inspect so that @decorators are not called twice. But feel free to leave that change for a different PR if that simplifies things. Such a change requires, for example, tests showing that decorators are indeed called only once.

What about making the dict callable, as I suggested in an inline comment? We would then keep run to only one positional parameter. Less magic, no inspection of argument names.

riga commented Apr 6, 2016

Ok. How about this solution:

class TaskProcess(multiprocessing.Process):

    def __init__(self, ..., tracking_url_callback=None, status_message_callback=None):
        ...
        self.tracking_url_callback = tracking_url_callback
        self.status_message_callback = status_message_callback
        ...

    def _run_get_new_deps(self):
        self.task.set_tracking_url = self.tracking_url_callback
        self.task.set_status_message = self.status_message_callback

        def deprecated_tracking_url_callback(*args, **kwargs):
            warnings.warn("tracking_url_callback in run() args is deprecated, use "
                          "set_tracking_url instead.", DeprecationWarning)
            self.tracking_url_callback(*args, **kwargs)

        run_again = False
        try:
            task_gen = self.task.run(tracking_url_callback=deprecated_tracking_url_callback)
        except TypeError as ex:
            if 'unexpected keyword argument' not in str(ex):
                raise
            run_again = True
        if run_again:
            task_gen = self.task.run()

        self.task.set_tracking_url = None
        self.task.set_status_message = None
        ...

The callbacks are set right before run() is called, and then reset. This way pickling is not affected, it's backward compatible, and at some point the try-except block can be refactored.

I just ran the tests and everything looks good.
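The "set right before run(), then reset" idea can be checked with a small standalone sketch. It is not the PR's code: the task is a hypothetical stand-in built from `types.SimpleNamespace`, `make_callback` and `run_with_callbacks` are invented names, and the `try`/`finally` is an addition of this sketch so that the reset also happens when `run` raises.

```python
import pickle
import types


def make_callback(sink):
    # Local closure, like a worker-level callback; not picklable by itself.
    def set_status_message(message):
        sink.append(message)
    return set_status_message


def run_with_callbacks(task, run, status_message_callback):
    """Attach the callback only for the duration of run(), then reset it."""
    task.set_status_message = status_message_callback
    try:
        return run(task)
    finally:
        # Reset so the task holds no unpicklable state afterwards.
        task.set_status_message = None


def run(task):
    for i in range(0, 100, 50):
        task.set_status_message("Progress: %d / 100" % i)


received = []
task = types.SimpleNamespace(name="MyTask", set_status_message=None)
run_with_callbacks(task, run, make_callback(received))

# The task can be pickled again once run() has finished.
restored = pickle.loads(pickle.dumps(task))
print(received, restored.name)
```

Outside of `run`, the attribute is `None`, so calling it raises an error instead of silently doing nothing.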

Tarrasch commented Apr 6, 2016

Sure. But maybe let's hear from @daveFNbuck first; perhaps there were other reasons he opted for the extra arg in run().

I'm very positive about this, though. In hindsight, introducing that extra arg in run() feels a bit hasty.

Also, of course you'll have to change the places in the code-base that use run(tracking_url_callback) already, but it seems like your new suggested interface is pretty easy to use. :)

riga commented Apr 6, 2016

> Also, of course you'll have to change the places in the code-base that use run(tracking_url_callback) already, but it seems like your new suggested interface is pretty easy to use. :)

Yep, already done (mainly in hadoop, hadoop_jar, hive and scalding) =) I'm preparing the commits right now.

Update: working on the failed tests ...

riga commented Apr 6, 2016

I changed hadoop.JobTask.dump to disregard callbacks, as they're not needed in the deserialized job anyway. E.g. the tracking URL is not set actively but parsed from the Hadoop job's stderr.

All tests pass now.

@daveFNbuck

This looks like a good alternative to my solution, and more scalable. I see that the example attached here is of a progress percentage. If this is the main use case, I've been thinking it would be nice to add an optional progress bar that can be automatically displayed in the visualizer. That would be a bit better than having a popup to show a single number.

riga commented Apr 6, 2016

> it would be nice to add an optional progress bar that can be automatically displayed in the visualizer

Good idea. Maybe I can add an additional PR that implements a progress bar. However, I think there are more use cases than just sending the progress. We use it for important intermediate output and, to some extent, for a summary of final results. Maybe one can parse the status message with a regexp (e.g. /^progress:\s(\d+)%.*$/i) to decide whether to show a progress bar or a pop-up.
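The parsing idea can be sketched in Python for illustration. The visualizer itself is JavaScript, so this is only a model of the matching logic; `PROGRESS_RE` and `parse_progress` are hypothetical names, and the message format "progress: NN%" is the one suggested in the comment above, not an established convention.

```python
import re

# Case-insensitive match for messages that start with "progress: <digits>%".
PROGRESS_RE = re.compile(r"^progress:\s(\d+)%.*$", re.IGNORECASE)


def parse_progress(status_message):
    """Return the percentage as an int, or None for any other message."""
    match = PROGRESS_RE.match(status_message)
    return int(match.group(1)) if match else None


for message in ("Progress: 40% of files processed",
                "Intermediate result: 3 files written"):
    percent = parse_progress(message)
    if percent is None:
        print("show pop-up:", message)
    else:
        print("show progress bar at", percent)
```

Messages that do not follow the pattern fall through to the pop-up, so free-form status text keeps working.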

    return self.has_run

def run(self):
    if self.set_tracking_url is not None:

Review comment (Contributor):

Wait. Does the user really have to do this check before using the tracking_url? Can't we somehow guarantee that it's always present?

Tarrasch commented Apr 7, 2016

This looks very good. Thank you so much for doing this! Just see my inline comment/worry.

Other than that, this is good to merge, right?

riga commented Apr 7, 2016

> Can't we somehow guarantee that it's always present?

Yep, Worker._create_task_process is the only place where TaskProcesses are created, and the callbacks are always present/created here. I made some changes to reflect that. Of course, outside the run() scope, users will get an exception when a callback is used, but I think that's fine/wanted.

Tarrasch commented Apr 7, 2016

@riga, OK, do you think this is "fine to merge" now?

riga commented Apr 7, 2016

Yes =)

Tarrasch commented Apr 7, 2016

Ok. Let's wait for test results and then I'll merge. :)

riga commented Apr 7, 2016

@Tarrasch Looks good, glad I could help.

@Tarrasch Tarrasch merged commit f46efc1 into spotify:master Apr 7, 2016
Tarrasch pushed a commit that referenced this pull request Apr 7, 2016
This fixes a bug resulting from the interference of PR #1625 (Task status messages) and PR #1631 (Add explicit whitelist of RPC commands for luigid) task status messages.

To fix this, I simply added ``set_task_status_message`` and ``get_task_status_message`` to the white-listing.
@@ -870,6 +871,7 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
'priority': task.priority,
'resources': task.resources,
'tracking_url': getattr(task, "tracking_url", None),
'status_message': task.status_message

Review comment (Contributor):

This line is causing a stack-trace like this:

Traceback (most recent call last):
  File "/zserver/Python-3.4.3/lib/python3.4/site-packages/tornado/web.py", line 1443, in _execute
    result = method(*self.path_args, **self.path_kwargs)
  File "/zserver/apps/luigi/code-for-luigid/luigi/server.py", line 101, in get
    result = getattr(self._scheduler, method)(**arguments)
  File "/zserver/apps/luigi/code-for-luigid/luigi/scheduler.py", line 986, in task_list
    serialized = self._serialize_task(task.id, False)
  File "/zserver/apps/luigi/code-for-luigid/luigi/scheduler.py", line 868, in _serialize_task
    'status_message': task.status_message

This happens for anyone using an old pickled state file.

Reply (Contributor Author):

Do we need to do anything to fix this?

Reply (Contributor):

I think you can copy the line above. Make it

getattr(task, "status_message", None) instead of task.status_message. Then, a couple of months later, we can change it back to task.status_message. Do you see how it'll work? Do you mind submitting a PR? (You can use the online editor.)
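Here is a minimal sketch of why the `getattr` fallback helps with old state files. It does not use luigi's scheduler classes: the task is a hypothetical stand-in built from `types.SimpleNamespace`, and `serialize_task` is an invented helper that mirrors only the two dictionary entries shown in the diff above.

```python
import pickle
import types

# Simulate a task object that was pickled before status_message existed.
old_state = pickle.dumps(types.SimpleNamespace(id="A", tracking_url=None))
old_task = pickle.loads(old_state)

# A task created by newer code carries the attribute.
new_task = types.SimpleNamespace(id="B", tracking_url=None, status_message="Progress: 10 / 100")


def serialize_task(task):
    """Read optional attributes defensively so old objects still serialize."""
    return {
        "tracking_url": getattr(task, "tracking_url", None),
        "status_message": getattr(task, "status_message", None),
    }


print(serialize_task(old_task))
print(serialize_task(new_task))

# Direct attribute access is what fails on the old object.
try:
    old_task.status_message
except AttributeError as ex:
    print("direct access failed:", ex)
```

Once all old state files have been rewritten by newer code, the fallback is no longer needed.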

Reply (Contributor Author):

I created PR #1645 for this issue.

@michcio1234

@riga @Tarrasch
Hey, sorry for pinging you about something this old. I don't want to open a new issue since I'm not sure whether anything is wrong, but...
where can I see a status message that I set using self.set_status_message('msg')? I can't find it in the scheduler GUI...

riga commented Dec 19, 2017

Hi @michcio1234 ,
the status messages are visible in the scheduler GUI. You just have to click on the chat/message icon in the "Actions" column in the task table. If a task has no status message, that icon won't be visible.

michcio1234 commented Dec 19, 2017

Then I'm probably doing something wrong since I can't see this icon. So it seems that setting a message is not as simple as stated in the documentation.
Okay, thank you anyway for your response. If I can't make it work, I'll open a new issue.
