
Dynamic requirements #424

Merged · merged 10 commits into spotify:master on Sep 24, 2014

Conversation

tommyengstrom
Contributor

This is a prototype for dynamic requirements, based on erikbern's dynamic-requires branch.

It differs in that multiple requirements can be returned at once and that it supports multiple threads. There is no concept of suspending tasks: if the requirements are not all available when yielded, the task is put back as pending.

I struggled with getting access to the dynamic requirements from other processes. How it works now is that you have to add a dynamic_requires function that returns the Tasks that may be yielded later on. I'm very open to suggestions on how to improve this.
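
A minimal sketch of what this looks like from the task author's side (Fetch and Aggregate are made-up example tasks, not code from this branch):

import luigi

class Fetch(luigi.Task):
    name = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('fetch-%s.txt' % self.name)

    def run(self):
        with self.output().open('w') as f:
            f.write(self.name + '\n')

class Aggregate(luigi.Task):
    def output(self):
        return luigi.LocalTarget('aggregate.txt')

    def run(self):
        # These requirements are only known at run time, so they cannot
        # be listed in requires(). Yield them instead; if they are not
        # all complete, the task is put back as pending and re-run from
        # the start once they are.
        names = ['a', 'b', 'c']  # pretend this comes from an input
        deps = [Fetch(name=n) for n in names]
        yield deps
        # Reached only once all yielded requirements are complete.
        with self.output().open('w') as out:
            for d in deps:
                out.write(d.output().open().read())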

Again, just a prototype... Comments please!

@tommyengstrom
Contributor Author

At first I used a "tunnel" function in the task in order to get the tasks, i.e.:

def _tunnel(self, expr):
    # evaluate an expression string in the task's context to recover
    # objects (e.g. Tasks) that can't be passed between processes
    return eval(expr)

I had hoped to figure out a way to do this in the background but so far I have not.

@erikbern
Contributor

erikbern commented Sep 4, 2014

I'm not sure I understand the point of the dynamic_requires function. Is it because you need to pass references to classes across processes? If that's the case, could you pass the task name and parameters in a serialized form instead? I've been working a bit on an interface that lets you instantiate any Task given its module and arguments; see #378. (If you make this work, then as a side effect you have pretty much also created a Luigi "slave" mode, which is another feature we want.)

@tommyengstrom
Contributor Author

Yes. And as I said, I'm not happy about it... I'll look at your branch tomorrow; it would be awesome if it's possible to use it.

@tommyengstrom
Contributor Author

Okay, so there are still no tests for it, but the code is significantly improved.

I took your advice but ended up using the normal ArgParseInterface and importing the module myself, since I didn't manage to befriend DynamicArgParseInterface.

Also changed the example to my typical use case.

@tommyengstrom
Contributor Author

This needs pyparsing to work...

@tommyengstrom
Contributor Author

@erikbern Could you add pyparsing to Travis CI?

Also, should we not specify the requirements?

@erikbern
Contributor

erikbern commented Sep 8, 2014

I'm strongly against string-to-task parsing in Luigi – use a dict of parameters (see how add_task works)

@erikbern
Contributor

erikbern commented Sep 8, 2014

You can add any package dependencies to .travis.yml.

This looks interesting but is a lot to digest – I will get back later this week. I think my main issue is that casting between strings and tasks is bad. The way to do it should be to instantiate tasks from a (module, task, params) tuple and store those underlying values instead.

@erikbern
Contributor

erikbern commented Sep 8, 2014

Would it be helpful if I added some code in base Luigi to instantiate a class dynamically based on the module and a dict of string parameters?

@tommyengstrom
Contributor Author

Regarding string-to-task parsing: I see where you're coming from, but that would mean that to support slave mode we would have to add it in rpc and scheduler, which would mean using both the str representation and a (name, dict-of-str) representation, thus duplicating information?

Btw, while trying the to_str_params method I noticed that it currently doesn't support lists:

import luigi
from datetime import date

class T(luigi.Task):
    date = luigi.DateParameter()
    dates = luigi.DateParameter(is_list=True)

t = T(date=date(2014, 2, 6), dates=[date(2014, 3, 2), date(2014, 7, 2)])
print t.to_str_params()

will print:

{'date': '2014-02-06', 'dates': '(datetime.date(2014, 3, 2), datetime.date(2014, 7, 2))'}

@erikbern
Contributor

erikbern commented Sep 8, 2014

I don't think data redundancy is a big deal. The task names are meant to be a human-readable version of the task and a one-way hash of the task's representation. I don't think we should require it to be invertible.

Now that I think about it, for complex data structures, it probably makes more sense to store a str -> pickle dict and pass it around. I was originally thinking that the type information is already determined by the parameter types, but in many cases that's not true. I see two options:

  1. Enforce that all parameters have to be statically typed
  2. Accept the reality, make it backwards compatible, and support arbitrary types
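
To make option 2 concrete, the str -> pickle dict could look something like this (a rough sketch; the function names are made up and none of this is in the PR):

import base64
import pickle

def params_to_wire(params):
    # serialize arbitrary parameter values into a str -> str dict
    # that can be passed over the wire
    return dict((name, base64.b64encode(pickle.dumps(value)).decode('ascii'))
                for name, value in params.items())

def params_from_wire(wire):
    # the inverse; note that unpickling only works if the receiving
    # process can import the classes of the pickled objects
    return dict((name, pickle.loads(base64.b64decode(blob)))
                for name, blob in wire.items())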

@erikbern
Contributor

erikbern commented Sep 8, 2014

Btw – sorry for being picky about everything. I really think you're onto something interesting (both dynamic requirements, and slave mode for workers) and I've been thinking a lot about how to build it. I'm happy to spend some time refactoring this, and/or provide interfaces for whatever stuff you need.

@tommyengstrom
Contributor Author

I'm not offended by comments; it's kind of a big change, so I was expecting them.

I was leaning more towards the static typing option since it is much simpler. Personally I would also lean towards calling it bad practice to make a task that accepts an image as an argument. I guess there are more legitimate use cases, though.

Question: if we want to support any data, would we not have to unpickle it in the task, or import everything from the task somewhere? Or maybe you have a solution for this?

@erikbern
Contributor

erikbern commented Sep 8, 2014

A bunch of people have tasks as parameters to other tasks and it seems to be a common pattern.

What do you mean by supporting data? I could put together a function that takes a module + task + params and returns a task object

@tommyengstrom
Contributor Author

To support Tasks we could add a TaskParameter and use the current string parsing that is now available :P
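
For what it's worth, such a TaskParameter could look roughly like this (a hypothetical sketch, not code from this branch; a real version would presumably use luigi's task register instead of scanning subclasses):

import luigi

class TaskParameter(luigi.Parameter):
    def serialize(self, cls):
        # represent a Task class by its name
        return cls.__name__

    def parse(self, s):
        # resolve the name back to a class; this simplistic version
        # only finds direct subclasses of luigi.Task
        for cls in luigi.Task.__subclasses__():
            if cls.__name__ == s:
                return cls
        raise ValueError('no Task class named %r' % s)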

I meant that if you want to allow any data type in parameters and pickle them, you'll have to have access to that object's class when unpickling. E.g.:

file1.py

import luigi

class Task1(luigi.Task):
    p = luigi.Parameter()
    # ...

file2.py

import luigi
from file1 import Task1

class MyClass(object):
    # ...
    pass

class Task2(luigi.Task):
    def requires(self):
        # whoever unpickles the parameter needs MyClass importable,
        # but file1 knows nothing about it
        return Task1(p=MyClass())
    # ...

@tommyengstrom
Contributor Author

We can of course change the identification of tasks to be (task, params) instead of task_id, and the import step could be moved from worker.py to some other place; I have no problem fixing it.

I do however still think that allowing arbitrary arguments will lead to a bunch of gotchas, but maybe that is because I really like that all tasks are callable from the command line.

@erikbern
Contributor

erikbern commented Sep 8, 2014

Yeah, the fact that tasks can be instantiated from the command line means we already have a mechanism to instantiate a task given a dict of str -> str parameters. We should use the same mechanism for serializing tasks over the wire.

@tommyengstrom
Contributor Author

Hmm, I think I may have misunderstood you.

You just want me to change it so that the TaskProcess puts (task_name, dict_of_str) on the queue instead of task_id, and then use this to instantiate the task? Still using ArgParseInterface as is done now, but with that functionality broken out into a function that accepts (module, task_name, dict_of_str)?

No changes in rpc or scheduler would be required in this branch (though they would be needed later, in order to implement slave mode without using the task_id parsing)?

@erikbern
Contributor

erikbern commented Sep 8, 2014

Yeah, I think that would be ideal, except don't use ArgParseInterface – let's just add a new function to interface.py that takes a (module, task_name, dict_of_str). I can add that function if you want.
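
A function along those lines might look roughly like this (a sketch of the idea only, not the load_task that was actually added; it assumes each parameter is reachable as a Parameter object on the class):

import importlib

def load_task(module, task_name, params_str):
    # import the module, look up the task class by name, and parse
    # each parameter from its string form using the corresponding
    # Parameter object on the class
    mod = importlib.import_module(module)
    cls = getattr(mod, task_name)
    kwargs = dict((name, getattr(cls, name).parse(value))
                  for name, value in params_str.items())
    return cls(**kwargs)

# e.g. load_task('file1', 'Task1', {'p': 'some value'})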

@tommyengstrom
Contributor Author

Something like this?

@erikbern
Contributor

erikbern commented Sep 8, 2014

Yeah, I would move the module magic stuff into interface.py, but this is starting to look great!

@tommyengstrom
Contributor Author

I couldn't come up with a use case other than from within the worker, so I left it there. I can fix it tomorrow.

Should I leave the id_to_parsable function in task.py? It could be useful sometime, maybe. I'll at least leave the id_to_name_and_params function, since it used to be there, just without supporting lists.

So how about tests?

@tommyengstrom
Contributor Author

Rebased on master and added a load_task function to interface.

@erikbern
Contributor

Just glanced through it – this is starting to look great! My only complaint is id_to_name_and_params and id_to_parsable, but it seems like those aren't even used?

@tommyengstrom
Contributor Author

id_to_name_and_params was there from before; I just fixed it so that it supports lists (it used to just do a string split). I feel like it could be useful at some point and someone could be using it currently, but I can remove it if you're fundamentally against initializing tasks from task_id.

Should I remove only id_to_parsable or both?

@erikbern
Contributor

I'd prefer if you remove it. I think it opens up a can of worms of really weird bugs that I don't want to deal with. Also fundamentally I don't think you should rely on the task id being meaningful – it should just be an id

@tommyengstrom
Contributor Author

I left id_to_name_and_params in there since it is used by task_history.

I also added some tests.

@tommyengstrom
Contributor Author

@erikbern Could you have another look at this?

@erikbern
Contributor

I'll take a look at it. Sorry that it's taking a lot of time – this PR is pretty overwhelming. We also have another PR that conflicts with this one so we need to figure out what to do. I think we might merge #445 first and then if you don't mind I might just clone your branch and rebase + play around with it for a bit.

@tommyengstrom
Contributor Author

Sure, I have no problem with that. I just want the functionality to get into master.

@erikbern
Contributor

I'm planning to merge #445 and then release to PyPI. After that, I want to tackle this one. I think it's big and fundamental enough to warrant its own release.

@tommyengstrom
Contributor Author

Makes sense to me.

@erikbern
Contributor

So if I understand your implementation – if a task goes into SUSPENDED mode, it is re-run from scratch later. In my approach, the task was resumed at the point where it was suspended. Is it possible to make it do that in your branch? The main complication is that you need to keep the task on the same worker because you can't transfer execution state over the wire.

This would be a nice optimization although it's not needed since tasks should be idempotent by construction.

@tommyengstrom
Contributor Author

Correct. I tried to figure out a way to get both but I came up short. I personally don't think it's a very big improvement to have the state resumed by the same worker later on. I also think there is some potential for problems with it, e.g. if the sub-task requires some resource that is not currently available, it could lead to the worker starting some other long-running task and creating a bottleneck once the resource is released and the sub-task finishes quickly.

A bigger downside with the current implementation is that if you connect another worker, e.g. by starting the same task from two terminals, the second worker will not be a stakeholder of the dynamic requirements spawned by the first. I saw this as something to do in a second iteration, since such logic would need to be added for slave mode as well.

@erikbern
Contributor

I'll think about the resuming. The options seem to be:

  1. A worker running a task in a suspended state sits around doing nothing until its dependencies are fulfilled. Could potentially lead to a deadlock if the chain of dependencies is deeper than the number of workers.
  2. The worker running the task that gets suspended is put aside and not counted towards the pool of workers. Could lead to issues since you might end up with a huge number of child processes.

I think there could be some golden mean where we opportunistically try to resume tasks but don't enforce it. Maybe we allow up to X suspended tasks sitting in worker processes doing nothing, but if we go above that we kill child processes. Not sure. Anyway, this sounds like something for a v2.

Let me think about your other example; I'm not sure I understand why it wouldn't work.

@tommyengstrom
Contributor Author

Better safe than sorry, imo. The second one could lead to memory issues: say each task includes reading in a big file; we may suddenly have a bunch of them in memory at once.

Regarding the second example: this is because that task will never mention the dynamic requirements and thus will not be registered as a stakeholder. It shouldn't be a big deal to fix. Also not a big deal to wait with, imo.

@erikbern
Contributor

Ok, this is now at the top of my list of PRs to review.

@tommyengstrom
Contributor Author

Awesome!

Is it a problem that it's not based on current master? I don't have time to rebase right now but may be able to do it tonight.

@erikbern
Contributor

No stress, I won't have time to review everything today anyway. Hoping to get it in this week at least.

A review comment on this diff in the scheduler's add_task:

@@ -227,13 +229,16 @@ def add_task(self, worker, task_id, status=PENDING, runnable=True, deps=None, ex
         # We also check for status == PENDING b/c that's the default value
         # (so checking for status != task.status would lie)
         self._update_task_history(task_id, status)
-        task.status = status
+        task.status = PENDING if status == SUSPENDED else status

Contributor:

Given the line above (if task.status not in (PENDING, SUSPENDED)), it doesn't seem like this is needed?

@erikbern
Contributor

LGTM, I think this could be merged now. Some minor comments for things I think we should address in an upcoming PR:

  • Documentation (this is a big TODO for a lot of stuff in Luigi atm, we should really spend some time on it)
  • TaskProcess.run needs a refactoring
  • _handle_next_task is also pretty convoluted, we should refactor
  • It seems like the SUSPENDED state isn't used on the scheduler side. Not sure if that's good/bad. In your implementation, SUSPENDED and PENDING are identical but there might be reasons why we want to separate them later.
  • I still don't like encouraging the parsing of task ids back; I would prefer to remove it

@freider
Contributor

freider commented Sep 23, 2014

Just read through the code and I have a couple of concerns:

  1. Do I understand correctly that suspended tasks are picked back up, but not from where the generator left off; instead they are re-run altogether? This has the advantage that any worker can pick up a suspended task, but it can cause problems if additional requirements are added after some amount of persistent/heavy work has been done, since any work up until the yield statement will be re-run in the new attempt after the requirement has been satisfied, not just the remaining work (see the sketch after this list).
  2. As Erik noted, it would be great if SUSPENDED were actually set as the status of tasks in a suspended mode, to indicate that they have started running but are not actively running right now, allowing users of the visualizer to distinguish those from pending or running tasks.
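
To make point 1 concrete (Heavy and Dep are hypothetical tasks, not code from the PR):

import time
import luigi

class Dep(luigi.Task):
    def output(self):
        return luigi.LocalTarget('dep.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('done\n')

class Heavy(luigi.Task):
    def output(self):
        return luigi.LocalTarget('heavy.txt')

    def run(self):
        time.sleep(60)  # stands in for heavy work; repeated on every
                        # attempt, since generator state is not resumed
        yield Dep()     # back to pending if Dep is not complete yet
        with self.output().open('w') as f:
            f.write(Dep().output().open().read())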

Tasks may now yield new requirements in run, just as they are returned in requires. If the requirements are fulfilled when yielded, the task will continue; if not, it will be rescheduled to run when the requirements are finished. Note that this means the task will be re-run from the start.

Tasks used to be initialized using task_id; now it's done using task_family + str_params.

Also fix an import problem where it would try to import worker_test from luigi/test/test.py, due to luigi/test having higher priority in sys.path and worker_test being located in the test folder.
@tommyengstrom
Contributor Author

Regarding the task_id parsing: are you saying you want me to revert to the older version that didn't work for lists? Again, task_history is using it, so I cannot remove it. I also felt that since I did improve it, I might just as well leave it in for now.

@erikbern
Contributor

No, I think we should keep it for now, but I'll see if I can remove it later.

erikbern pushed a commit that referenced this pull request on Sep 24, 2014.

@erikbern merged commit d7fc6f4 into spotify:master on Sep 24, 2014.

erikbern pushed a commit that referenced this pull request on Sep 25, 2014:
send outputs back into yield statement (for dynamic requirements, #424)