
Thread pool barrier executor #9

Merged

huyng merged 9 commits into master from thread_pool_barrier_executor on Dec 16, 2017
Conversation

@huyng (Contributor) commented Dec 14, 2017

@tobibaum

usage:

pipeline.execmethod = "parallel"
results = pipeline({"x": 10}, ["co", "go", "do"])

@codecov-io commented Dec 14, 2017

Codecov Report

Merging #9 into master will increase coverage by 3.26%.
The diff coverage is 94.33%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master       #9      +/-   ##
==========================================
+ Coverage   73.68%   76.94%   +3.26%     
==========================================
  Files           5        5              
  Lines         285      334      +49     
==========================================
+ Hits          210      257      +47     
- Misses         75       77       +2
Impacted Files Coverage Δ
graphkit/base.py 77.96% <100%> (+1.18%) ⬆️
graphkit/functional.py 93.75% <50%> (+0.16%) ⬆️
graphkit/network.py 68.78% <95.74%> (+8.09%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d7039f8...1c71ed4. Read the comment docs.

@codecov-io commented Dec 14, 2017

Codecov Report

Merging #9 into master will increase coverage by 3.53%.
The diff coverage is 94.73%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master       #9      +/-   ##
==========================================
+ Coverage   73.68%   77.21%   +3.53%     
==========================================
  Files           5        5              
  Lines         285      338      +53     
==========================================
+ Hits          210      261      +51     
- Misses         75       77       +2
Impacted Files Coverage Δ
graphkit/base.py 79.36% <100%> (+2.57%) ⬆️
graphkit/functional.py 93.75% <50%> (+0.16%) ⬆️
graphkit/network.py 68.78% <95.74%> (+8.09%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d7039f8...7d63219. Read the comment docs.

graphkit/base.py Outdated
class NetworkOperation(Operation):
def __init__(self, **kwargs):
self.net = kwargs.pop('net')
self.execmethod = None

maybe something like:

self._execmethod = kwargs.get('execmethod', None)

making execmethod a "private" attribute

self.net = kwargs.pop('net')
self.execmethod = None
Operation.__init__(self, **kwargs)


and then also add a function to set execmethod explicitly?

def set_execmethod(self, method):
    options = ['parallel', 'sequential']
    assert method in options
    self._execmethod = method

from multiprocessing.dummy import Pool

# if we have not already created a thread_pool, create one
if not hasattr(self, "_pool"):

where could the _pool attribute have been initialized? will this always be initialized on the first run?

huyng (Contributor, Author) replied:

Yeah, I was mainly thinking of doing this on the first run, because I couldn't think of a common use case where we'd want different behavior.
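A minimal sketch of the lazy-initialization pattern being discussed, assuming a hypothetical `Runner` class standing in for the class that owns the pool (graphkit's actual class lives in graphkit/network.py):

```python
from multiprocessing.dummy import Pool  # Pool backed by threads, not processes


class Runner:
    """Hypothetical stand-in for the class that owns the thread pool."""

    def _get_pool(self):
        # Create the pool lazily on first parallel run; instances that
        # only ever run sequentially never pay the thread startup cost.
        if not hasattr(self, "_pool"):
            self._pool = Pool()
        return self._pool
```

Because the attribute is only ever set here, the `hasattr` check is always False on the first run and True on every run after, which answers the question above: the pool is always created on the first parallel execution.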

A boolean indicating whether the operation may be scheduled for
execution based on what has already been executed.
"""
dependencies = set(v for v in nx.ancestors(graph, op) if isinstance(v, Operation))

Same thing, but more explicit that it's only filtering:

dependencies = set(filter(lambda v: isinstance(v, Operation), nx.ancestors(graph, op)))

huyng (Contributor, Author) replied:

okay i'll update.

if len(upnext) == 0:
break

done_iterator = pool.imap_unordered(

Do I understand correctly that this is layer-wise parallelism? As in: if at a certain depth in the graph we have a pool of 5 workers and 1 worker takes a long time, then the others have to wait?

@huyng (Contributor, Author) replied Dec 14, 2017:

Yes, your description is correct. The current scheduling method for the parallel thread pool is to schedule everything that is able to be scheduled within the current iteration, wait for all answers to come back, and then repeat the process. This may sometimes be inefficient when, for example, we have one task that takes very long and many tasks that finish quickly.

There is another way to do the scheduling which could potentially be more efficient, but I have not figured out how to implement it yet. The idea would be to schedule operations that are "schedulable" as soon as there is a free worker slot.
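The barrier scheduling described above can be sketched as follows. This is an illustrative simplification, not graphkit's actual code: `layers` is a hypothetical pre-computed list of layers, each a list of `(name, thunk)` pairs whose dependencies all live in earlier layers.

```python
from multiprocessing.dummy import Pool  # thread-backed pool


def run_layerwise(layers, n_workers=4):
    """Run a DAG layer by layer: every op in a layer runs in parallel,
    and the whole layer must finish before the next one is scheduled."""
    results = {}
    pool = Pool(n_workers)
    try:
        for layer in layers:
            # Everything in the layer is scheduled at once; fully
            # draining imap_unordered before moving on is the barrier,
            # so one slow op stalls the rest of its layer.
            for name, value in pool.imap_unordered(
                    lambda job: (job[0], job[1]()), layer):
                results[name] = value
    finally:
        pool.close()
        pool.join()
    return results
```

One slow thunk in a layer keeps all `n_workers` threads idle until it returns, which is exactly the inefficiency the comment describes.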


Ok, thanks for clarifying :) You're right, I have no idea how that would work using threads from within Python. The pattern I'd use in other places is to have the outermost loop iterate over threads: whenever a job is available from the job queue, run it. I don't know whether you can address threads like that with multiprocessing.

@huyng (Contributor, Author) commented Dec 15, 2017

Here's the updated usage string:

pipeline.set_execution_method("parallel")
results = pipeline({"x": 10}, ["co", "go", "do"])

@tobibaum

Awesome :) Looks good! I think the point further up about switching the parallelism mechanism can be pushed to the backlog for now?
👍

@huyng (Contributor, Author) commented Dec 16, 2017

Cool, yeah, the approach you mention is what we want to get to. It was a little tricky figuring out how to write it in Python, but we can implement it as a more efficient parallelism method down the line.

@huyng huyng changed the title [wip] thread pool barrier executor Thread pool barrier executor Dec 16, 2017
@huyng huyng merged commit d3f8a8a into master Dec 16, 2017