
Parallel execution #63

Open

bitprophet opened this issue Apr 29, 2013 · 26 comments
@bitprophet
Member

Clearinghouse for "how to execute N instances of a given task, parameterized in some fashion, in parallel?"

Fabric 1.x has a naive parallelization which assumes a single parameterization vector (host lists); we need to support this more generally while still honoring that sort of use case.

Fab 1.x's implementation has to leverage multiprocessing due to not being threadsafe; Invoke should be threadsafe, so threads are an option, as is multiprocessing or (if we decide it's worthwhile to add a dependency on them) coroutines/greenlets. EDIT: see also Tulip/asyncio in Python 3.x and its port in 2.x, Trollius.

In either case we have to solve the following problems:

  • Display of live output - how to tell different "channels" apart?
    • Fabric handles this by forcing 'linewise' output in parallel mode, and using a line prefix string. This is functional but not very readable (though troubleshooting-related "reading" can be fixed by using logging to per-channel files or data structures).
  • Communicating data between channels
    • Generally, not doable or desirable since you have no idea which permutations of the task will run before/after others.
    • Though if threading is used, users can simply opt to use locks/semaphores/queues/etc
  • Communication between channels and the "master" process, e.g. return values (see the sketch after this list)
  • Error handling - should errors partway through the run cause a halt, or just get tallied and displayed at the end?
  • Probably more.
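
For concreteness, here is a minimal sketch of the queue-based approach to the last two points, using only the stdlib rather than any Invoke API (the worker function and host names are hypothetical):

# Hypothetical sketch, not Invoke API: each "channel" is a thread that
# reports its return value back to the "master" via a shared queue.
import queue
import threading

def worker(name, results):
    # Stand-in for one parameterized instance of a task.
    results.put((name, "result-from-%s" % name))

results = queue.Queue()
threads = [threading.Thread(target=worker, args=(host, results))
           for host in ("host1", "host2", "host3")]
for t in threads:
    t.start()
for t in threads:
    t.join()
while not results.empty():
    name, value = results.get()
    print("%s -> %r" % (name, value))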
@riltsken

Some thoughts:

Error handling - I like options. I feel the default should be for errors not to cause a halt, but, tying into the 'Communication' bit, you could specify that a task returning an invalid value should halt the other tasks.

Display of live output - Not sure if you have experienced 'live' output as a big benefit for parallelized tasks. It tends to clutter things up for me when you have more than 3-4 processes spitting out information. I didn't realize there was a 'prefix' option in Fabric. I'll take a look at that for my own use actually.

@bitprophet
Member Author

@riltsken The prefix in Fabric is the default, so a simple setup like this will show you how it behaves there:

from fabric.api import task, parallel, run

@task
@parallel
def mytask():
    run("ls /") # or some other multiline-printing, ideally not-super-fast command

# Execute as: fab -H host1,host2,host3 mytask
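
For reference, Fabric's default linewise prefix output in that setup looks roughly like the following (illustrative and abridged; exact formatting varies by version):

[host1] Executing task 'mytask'
[host2] Executing task 'mytask'
[host1] run: ls /
[host2] run: ls /
[host2] out: bin
[host1] out: bin
[host1] out: boot
...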

Options, yeah. The setup I have right now is a user-selectable/subclassable "Executor" class; ideally, any sort of "how to go from parsed args + a task namespace to actual execution" pattern can be implemented in one.

The default and only instance right now is of course naively serial, but the idea is to use it to support parallelism, different error handling behaviors, etc. Having Fabric users locked into the core implementation has always been crummy.

@alendit

alendit commented Nov 21, 2013

Would concurrent execution (i.e. starting a watcher for CoffeeScript and another for SCSS at the same time) be the same feature, or should I open an additional issue?

@bitprophet
Member Author

I'd argue that's related strongly enough to live here, @alendit; it's really just making the concept that much more general.

That use case also lends weight to the "just make sure we are 100% threadsafe and ensure using threads is as easy as possible" option. Which is both less work for the core codebase, and maximum flexibility for users.

If I remember my threading correctly (dubious...) it means your use case would look something like this - one task using threads to do >1 thing "concurrently":

from threading import Thread

from invoke import task, run

@task
def coffee():
    run("coffeescript --watch")

@task
def sass():
    run("sass --scss --watch")

@task(default=True)
def main():
    # A list, not a lazy map, so we can iterate it twice below
    threads = [Thread(target=x) for x in (coffee, sass)]
    # Kick off
    [x.start() for x in threads]
    # Wait for completion - maybe pending KeyboardInterrupt or similar
    [x.join() for x in threads]

Then maybe store that in watch.py, add it as watch to your tasks.py's namespace, and call e.g. invoke watch (to watch both, via threads) or invoke sass (to just watch one) etc.
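
For what it's worth, a minimal sketch of that layout using Invoke's Collection namespace API (module and names are just illustrative):

# tasks.py - hypothetical sketch of mounting watch.py as a sub-namespace.
from invoke import Collection

import watch  # the module holding coffee/sass/main from above

ns = Collection()
ns.add_collection(watch)  # its tasks now live under the "watch" name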

Note that I haven't tried this myself yet - it might work fine now, but it's possible something in how run behaves would goof things up.

As the code comments note, this also fudges over control-flow concerns. That, plus the thread busywork, are places where Invoke could grow an API so users can save some boilerplate; but it would also be optional, and you could just do the above on your own.

@alendit

alendit commented Nov 29, 2013

Thanks, works perfectly :)

@ghost

ghost commented Oct 23, 2014

I'm interested in using invoke to write a bioinformatics pipeline that would be executed in parallel on an LSF or Amazon EC2 cluster, but it wasn't clear to me from this thread whether this is possible in the current incarnation of invoke. Do you have any suggestions by any chance, or would you suggest going with something that explicitly supports this and interfaces with the Python cluster-managing library DRMAA, like ruffus?

@bitprophet
Member Author

@erasmusthereformer No parallelization is in yet, though I expect to poke at it soon since it's a key part of the lib I'm writing on top of it (fabric 2).

@bitprophet
Member Author

See also paramiko/paramiko#389 which links to some asyncore/asyncio related prior art. Part of this ticket here is to figure out if an interface/implementation can work well for both local+remote execution or if they need to be separate concerns.

@bitprophet
Member Author

A great example of this, possibly distinct from the Fab use case, that I find myself wanting now: running >1 file-watch task at the same time. Specifically, two concurrent instances of watchmedo running with different arguments.

I can work around this by creating a single hybrid invocation of watchmedo but it feels more "natural" to implement them as distinct invocations so one could e.g. inv -P www.watch docs.watch.

@cdman

cdman commented Jun 27, 2016

Hello,

I'm looking into using invoke as a build tool, and "parallel execution" would be very useful to reduce build times / make use of the machines we actually have. For example, we have multiple modules in the same repository, and using parallel execution we could run the linting on all of them in parallel. Another example would be running the linting of our JS code in parallel with the linting of our PY code.

A simplistic way to do this (but which nonetheless works perfectly fine up to ~10k tasks at least) would be something like:

import collections

Task = collections.namedtuple('Task', 'dependencies')

tasks = {
  'a': Task(dependencies=[]),
  'b': Task(dependencies=[]),
  'c': Task(dependencies=['a', 'b']),
  'd': Task(dependencies=['a', 'c']),
}

# For each task, the set of dependencies it is still waiting on.
tasks_by_dependencies = {k: set(v.dependencies) for k, v in tasks.items()}
# Reverse edges: for each task, the set of tasks it is blocking.
blocked_by_me = {k: set() for k in tasks}
for k, v in tasks.items():
  for d in v.dependencies:
    blocked_by_me[d].add(k)

while tasks_by_dependencies:
  # Pick any task whose dependencies have all been satisfied.
  name = next(
    name for name, unmet_dependencies in tasks_by_dependencies.items()
    if not unmet_dependencies)
  del tasks_by_dependencies[name]

  print("Executed %s" % name)

  # Mark this task as done for everything it was blocking.
  blocked = blocked_by_me.pop(name)
  for k in blocked:
    tasks_by_dependencies[k].remove(name)
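
To get the actual make -j effect, the same bookkeeping can feed a worker pool. A hypothetical extension of the above sketch (not Invoke API; execute is whatever callable runs a single task):

import concurrent.futures

def run_parallel(tasks, execute, max_workers=4):
  # Same bookkeeping as above: unmet dependencies plus reverse edges.
  pending = {k: set(v.dependencies) for k, v in tasks.items()}
  blocked_by_me = {k: set() for k in tasks}
  for k, v in tasks.items():
    for d in v.dependencies:
      blocked_by_me[d].add(k)

  with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
    running = {}

    def submit_ready():
      # Move every task whose dependencies are all met into the pool.
      for name in [n for n, deps in pending.items() if not deps]:
        del pending[name]
        running[pool.submit(execute, name)] = name

    submit_ready()
    while running:
      done, _ = concurrent.futures.wait(
        running, return_when=concurrent.futures.FIRST_COMPLETED)
      for future in done:
        name = running.pop(future)
        future.result()  # re-raise any error from the task
        for blocked in blocked_by_me.pop(name):
          pending[blocked].discard(name)
      submit_ready()

# e.g. run_parallel(tasks, lambda name: print("Executed %s" % name))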

@chaos-ad

chaos-ad commented Jun 27, 2016

Just discovered this library and thought at first that it could be used to replace GNU Make.
And it would do so with bliss, at least for my use cases, if only it had an option that works like make's -j.

@cdman

cdman commented Jun 28, 2016

@chaos-ad - that was my thinking exactly!

What I liked about make: it's well supported and integrated (i.e. you get tab completion by default on many systems), and of course the -j option.

Why I would like to use invoke or an equivalent: because I know more Python than Makefile, and frankly the Makefile syntax seems like Bash: a lot of data structures shoved into strings, and a lot of things that "seem to work" but can actually break with little way to discover it in advance (see bash's handling of filenames with spaces in them).

@bitprophet
Member Author

bitprophet commented Jul 3, 2016

This should probably be linked to #45, since enhanced dependency resolution is pretty important re: the kinds of use cases @cdman describes.

If it wasn't clear from the original description, @chaos-ad / @cdman - yes, you've hit on one major reason why this feature's needed! I totally want Invoke to be a good make replacement and that includes -j type stuff. (The other reason, and one I'll personally be banging on soon, is for parallel remote execution, aka the Fabric use case).

I'll update this once I start poking that functionality, with notes / requests for API feedback / etc :)

@bitprophet
Member Author

This should also link to #15 since both are concerned with how to handle differentiating output from concurrently executing contexts (different hosts in Fabric, different tasks or subprocesses in Invoke, etc).

@perzizzle

I have a current makefile that has grown overly complex and was thinking invoke looked interesting. I use make -j to execute lots of concurrent tasks.

Following the issue links, I am struggling to determine whether this parallel feature is still actively under consideration.

@haydenflinner
Contributor

^ Agree. I currently use sh in the one spot in my project that needs parallelism, and the output and usability are far worse.

@anarcat

anarcat commented Feb 23, 2020

this would be a great improvement to invoke, and one i would dearly love to see in one tool i'm about to rewrite with an invoke backend (stressant).

i'll also note the main website promises support for parallel execution, yet this seems to have been in the planning stages for over 6 years now... maybe it would be best to remove the sentence from the website until this is completed? or at least call it "background execution" or something... :)

i'm curious to hear whether the "background execution" (async and disown) stuff that landed in 1.4 will help in pushing this issue forward, or if this is unrelated?

to be honest, i myself am not clear on what exactly "parallel execution" means in this context... in my use case, i am launching a bunch of "burn in" jobs (write a lot to the disk, run the CPU, test the network and so on). right now, it's done serially, and the output is sent to the console, a logfile and email (through the logging module). but i would hope to do some of those jobs in parallel, while retaining the logging behavior (ideally having jobs grouped in the email) and console output (which means logging would be sequential then!).

finally, i'll mention that the git-annex project has wrangled a lot with the "parallel execution console output" problem, i.e. how to multiplex the output of multiple commands on the console (rsync, in their case, is one such command). you might want to look at how he did things!

anyways, long story short: thanks for this great project. I can't believe i worked so long as a sysadmin without using fabric and invoke more extensively; this is going to replace so many horrible shell scripts. thanks!

@haydenflinner
Contributor

I tried reading the git-annex source to see how they handled parallel execution output, as I am also very interested in adding this feature. Unfortunately, I didn't derive a clear approach from a 5-minute skim of the source. Can you describe how its behavior works, or am I just supposed to use the tool and observe its output?

Btw, I'm not aware of a reason why one couldn't just fire up multiprocessing/threads and run the ctx.run function within, returning an AsyncResult with a .join member rather than a normal Result. I probably won't be taking a stab at this any time soon, but it might be doable to get it working in a fork of your own pretty readily.
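
For illustration, a minimal sketch of that idea; AsyncResult here is hypothetical, not part of Invoke's API:

import threading

class AsyncResult:
    """Run ctx.run(...) in a thread; .join() returns the usual Result."""
    def __init__(self, ctx, *args, **kwargs):
        self._result = None
        self._exc = None
        def target():
            try:
                self._result = ctx.run(*args, **kwargs)
            except Exception as e:  # surface errors on join(), not here
                self._exc = e
        self._thread = threading.Thread(target=target)
        self._thread.start()

    def join(self):
        self._thread.join()
        if self._exc is not None:
            raise self._exc
        return self._result

# Hypothetical usage inside a task body:
#   js = AsyncResult(c, "npm run lint")
#   py = AsyncResult(c, "flake8 .")
#   results = [js.join(), py.join()]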

@anarcat

anarcat commented Feb 24, 2020

@haydenflinner take a look at joey's dev blog. start when he worked on parallel get and work backwards (well, forward in time). look for concurrentprogress, which seems to be based on ascii-progress which in turn seems to be based on node.js progress.

that's just progress bars, of course... but he also worked on a concurrent output library that might specifically be interesting for our use case here.

@DylanYoung

Have you thought about just using celery with an in-memory broker? It has task chaining tools that will probably take years to implement independently and robustly. Possibly more work than it's worth, but a thought.

@castlez

castlez commented Mar 4, 2022

Are there plans to revisit this? Parallel tasks were kind of the main reason I liked the old fabric (and why I was using fabric3 by Mathiasertl), and not having them anymore is a bummer. I want to run parallel tasks using Python 3.9.

@neozenith

Hi, thanks for the interest to bump this issue up. The current roadmap is here:
https://bitprophet.org/projects/#roadmap

The key blocks of work that are getting cleared out first are:

  • Migrate CI to CircleCI
  • Drop Python2

As bitprophet has noted the next focus after that for invoke is:

Task-related features like sharing flags and data between tasks & calling tasks programmatically from other tasks, will probably be the next major focus.

This will enable building out a DAG of tasks that can draw from a pool of executors.

So the answer is yes, but no promises on timelines. Especially since invoke fits into the fabric and paramiko toolchains, and they also need equal amounts of maintenance love.

@neozenith

neozenith commented Mar 30, 2022

Not as a representative of invoke but just a dump of my own thoughts on the topic.

Escalation of needs / motivations

In my experience, people tend to go on a journey with automation:

  • Automate individual tasks
  • Chain well defined tasks into a longer sequence
  • Chain gets too long and they want parallelism
  • Start reaching for DAGs to maximise parallelism but respect sequential dependencies where needed.

I've seen this with data pipelines: Apache Airflow, AWS Step Functions, Databricks Multitask Jobs, and dbt all have some form of DAG, because it is a great data structure for mapping parallelism and paths that require some sequential queueing.

The nature of Makefiles

I've been thinking about this for weeks, and the Makefile syntax finally makes sense as a DSL for describing DAGs.
I think the clever constraint is that the nodes are always files: the targets are files and the dependencies are files.

For example, I want to build a C++ binary that has:

  • 50x .h files and 50x .cpp files.
  • That build 50x .o compiled object files
  • That link together into 5x .a libraries
  • Finally into 1x binary executable.

From scratch it'd run all those steps and create all those intermediate files. With -j 1 it'd do so sequentially. If I had a 5000-core CPU and -j 5000, it'd take only as long as the longest step at each of the 3 levels of the hierarchy, which is the theoretical best case. Mere mortals might be lucky to get -j 12, so tasks will queue up despite having no dependencies on each other.

So the DAG is a queue of tasks.
We have a pool of executors.
Each executor takes the next task with no pending dependencies.

So leaf nodes (.h and .cpp files) come first, and so on.

Q: But if I have completed a full run and then change only one source file, how does it know the minimal DAG to re-run?
A: The mtime of the dependency file is newer than the target's.

So a pending node in the DAG is either a missing target, or a dependency that is newer than its target. A beautifully simple way to lean on the filesystem as the cache and to query whether the cache is invalidated.
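
In code, that staleness rule is tiny. A minimal sketch using only the stdlib (file names are illustrative):

import os

def is_stale(target, deps):
    # Rebuild if the target is missing...
    if not os.path.exists(target):
        return True
    target_mtime = os.path.getmtime(target)
    # ...or if any dependency was modified after the target was built.
    return any(os.path.getmtime(d) > target_mtime for d in deps)

# e.g. is_stale("app.o", ["app.cpp", "app.h"])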

As a replacement for Makefiles?

This is a pretty audacious task.
But people seem to love the developer experience of invoke as a cleaner DSL.
We create some functions, then decorate them. Simple!

But it has all the power of Python and its library ecosystem. The standard library can take you a lot further than bash and unix tools, in a much more readable way.

Whichever direction this feature heads, it would be good to do it as a beta pre-release. Undoubtedly people will have opinions, and all the other contenders have had a decades-long head start.

Nested DAGs?

I am interested in the space of multi-project mono-repos. Each subproject could in theory build its own DAG, but I could also have a root-level DAG to orchestrate parts of those subprojects.
We now have nested DAGs.

And trust me, I have seen nested DAGs play out many times in data pipelines. For example, dbt has its own DAG to build tables and views in a data warehouse, but then we needed Airflow or AWS Step Functions to build the DAG of feed-in systems that determine whether that dbt project has pending dependencies.

Scope

  • My point here is, do we limit the scope to files like make?
    • dbt limited their scope to what is inside a given data warehouse.
  • Do we make Target and Dependency arbitrary concepts that can be expanded upon?
  • Are there existing libraries that have implemented this that we could lean on?
  • Do we scrap this idea as being part of invoke because it should be a library that wraps around invoke?
  • Do we integrate a basic version, but leave certain Concepts open for extension? eg Target, Dependency, Executor, ExecutorPool, Scheduler.

Takeaways

  • I think researching the current ecosystem and any prior art would be wise
  • Putting this out as a pre-release to solicit feedback
  • Developer experience ergonomics are likely going to be hard to nail down.
  • Making base concepts extensible lets users adopt arbitrary levels of complexity, rather than baking it all into this project itself.
    • An Executor could be: a thread, a multiprocess, an async concurrent method, a remote computer, a remote spark cluster, a kubernetes replicaset of pods.

Related Works

Maybe Actor Model implementations are worth reading about.

@haydenflinner
Contributor

haydenflinner commented Mar 30, 2022 via email

@neozenith

Ah that’s awesome! I’ve been meaning to look at magicinvoke to see what’s been added and how it works.

After bitprophet clears the Py2 dead weight, we should look at backporting the enhancements made in magicinvoke.

Your implementation sounds like it might be good as a reference implementation we can start from in a beta version to solicit further feedback.

Also thank you for carrying magicinvoke forward whilst things got quiet. It’s been interesting catching up on the 8-9 year history of issues and PRs of this project and related projects.

@ahrnbom

ahrnbom commented May 30, 2023

This issue is now 10 years old. I think it's probably a good idea to remove the "parallel execution" part of the website until it has actually been implemented and documented.
