Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiprocess if available #368

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Changes
- Fix #373: read DOIT_CONFIG from TOML.
- Fix #405: Add Task attribute `meta`.
- Fix #349: Handle passing task args in "single" task execution.
- Fix #369: Support `multiprocess` if manually installed.

0.33.1 (*2020-09-04*)
=====================
Expand Down
2 changes: 2 additions & 0 deletions doc/cmd_run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ parallel execution
This allows different tasks to be run in parallel, as long as any dependencies are met.
By default the `multiprocessing <http://docs.python.org/library/multiprocessing.html>`_
module is used.
If the `multiprocess <https://pypi.org/project/multiprocess/>`_ module is installed,
it will be used instead.
So the same restrictions also apply to the use of multiprocessing in `doit`.

.. code-block:: console
Expand Down
7 changes: 6 additions & 1 deletion doc/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ If you are using python 2::
$ pip install "doit<0.30"


If you want to use non-local plugins you need to install `setuptools` as well.
If you want to use non-local plugins you need to install `setuptools` as well::

$ pip install doit[plugins]

If you are running with multiple processes and get a ``PicklingError``,
installing `multiprocess` may resolve the issue::

$ pip install doit[multiprocess]


Source
^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion doit/cmd_auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import os
import time
import sys
from multiprocessing import Process
from subprocess import call

from .compat import Process
from .exceptions import InvalidCommand
from .cmdparse import CmdParse
from .filewatch import FileModifyWatcher
Expand Down
25 changes: 25 additions & 0 deletions doit/compat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
"""stuff dealing with incompatibilities between python versions"""

try:
from multiprocess import Process, Queue as MQueue
HAS_MULTIPROCESS = True
except ImportError:
from multiprocessing import Process, Queue as MQueue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this will break for BSD users without any of them installed.

HAS_MULTIPROCESS = False
Process # pyflakes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be top level.

MQueue # pyflakes


def is_multiprocessing_available():
    """Return True when a process-based backend can actually be used.

    Importing the ``synchronize`` submodule is the canonical probe:
    on some BSD systems it fails with ImportError even though the
    package itself imports fine (see http://bugs.python.org/issue3770),
    in which case multiprocessing support must be reported as absent.
    Checks `multiprocess` when that package was picked up at import
    time, otherwise the stdlib `multiprocessing`.
    """
    try:
        if HAS_MULTIPROCESS:
            import multiprocess.synchronize as _sync
        else:
            import multiprocessing.synchronize as _sync
        _sync  # referenced so linters see the import is intentional
    except ImportError:  # pragma: no cover
        return False
    return True


def get_platform_system():
"""return platform.system
Expand Down
13 changes: 2 additions & 11 deletions doit/runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Task runner"""

from multiprocessing import Process, Queue as MQueue
from threading import Thread
import pickle
import queue

import cloudpickle

from .compat import is_multiprocessing_available, MQueue, Process
from .exceptions import InvalidTask, CatchedException
from .exceptions import TaskFailed, SetupError, DependencyError, UnmetDependency
from .task import Stream, DelayedLoaded
Expand Down Expand Up @@ -327,16 +327,7 @@ class MRunner(Runner):
@staticmethod
def available():
"""check if multiprocessing module is available"""
# see: https://bitbucket.org/schettino72/doit/issue/17
# http://bugs.python.org/issue3770
# not available on BSD systens
try:
import multiprocessing.synchronize
multiprocessing # pyflakes
except ImportError: # pragma: no cover
return False
else:
return True
return is_multiprocessing_available()

def __init__(self, dep_manager, reporter,
continue_=False, always_execute=False,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
extras_require={
':sys.platform == "darwin"': ['macfsevents'],
':sys.platform == "linux"': ['pyinotify'],
'multiprocess': ['multiprocess'],
'plugins': ['setuptools'],
'toml': ['toml >=0.10.1']
},
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cmd_auto.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import time
from multiprocessing import Process

import pytest

from doit.compat import Process
from doit.cmdparse import DefaultUpdate
from doit.task import Task
from doit.cmd_base import TaskLoader
Expand Down
28 changes: 14 additions & 14 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import os
import sys
import pickle
from multiprocessing import Queue
import platform
from unittest.mock import Mock

import pytest

from doit.compat import MQueue
from doit.exceptions import CatchedException, InvalidTask
from doit.dependency import DbmDB, Dependency
from doit.reporter import ConsoleReporter
Expand Down Expand Up @@ -557,7 +557,7 @@ def testSystemExitRaises(self, reporter, RunnerClass, dep_manager):
class TestMReporter(object):
class MyRunner(object):
def __init__(self):
self.result_q = Queue()
self.result_q = MQueue()

def testReporterMethod(self, reporter):
fake_runner = self.MyRunner()
Expand Down Expand Up @@ -696,8 +696,8 @@ def test_all_processes(self, reporter, monkeypatch, dep_manager):
td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2'])
run = runner.MRunner(dep_manager, reporter, num_process=2)
run._run_tasks_init(td)
result_q = Queue()
task_q = Queue()
result_q = MQueue()
task_q = MQueue()

proc_list = run._run_start_processes(task_q, result_q)
run.finish()
Expand All @@ -714,8 +714,8 @@ def test_less_processes(self, reporter, monkeypatch, dep_manager):
td = TaskDispatcher({'t1':t1}, [], ['t1'])
run = runner.MRunner(dep_manager, reporter, num_process=2)
run._run_tasks_init(td)
result_q = Queue()
task_q = Queue()
result_q = MQueue()
task_q = MQueue()

proc_list = run._run_start_processes(task_q, result_q)
run.finish()
Expand All @@ -732,8 +732,8 @@ def test_waiting_process(self, reporter, monkeypatch, dep_manager):
td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2'])
run = runner.MRunner(dep_manager, reporter, num_process=2)
run._run_tasks_init(td)
result_q = Queue()
task_q = Queue()
result_q = MQueue()
task_q = MQueue()

proc_list = run._run_start_processes(task_q, result_q)
run.finish()
Expand Down Expand Up @@ -779,10 +779,10 @@ def test_task_not_picklabe_thread(self, reporter, dep_manager):
class TestMRunner_execute_task(object):
def test_hold(self, reporter, dep_manager):
run = runner.MRunner(dep_manager, reporter)
task_q = Queue()
task_q = MQueue()
task_q.put(runner.JobHold()) # to test
task_q.put(None) # to terminate function
result_q = Queue()
result_q = MQueue()
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
run.finish()
# nothing was done
Expand All @@ -792,10 +792,10 @@ def test_full_task(self, reporter, dep_manager):
# test execute_task_subprocess can receive a full Task object
run = runner.MRunner(dep_manager, reporter)
t1 = Task('t1', [simple_result])
task_q = Queue()
task_q = MQueue()
task_q.put(runner.JobTask(t1)) # to test
task_q.put(None) # to terminate function
result_q = Queue()
result_q = MQueue()
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
run.finish()
# check result
Expand All @@ -812,10 +812,10 @@ def test_full_task_fail(self, reporter, dep_manager):
# test execute_task_subprocess can receive a full Task object
run = runner.MRunner(dep_manager, reporter)
t1 = Task('t1', [simple_fail])
task_q = Queue()
task_q = MQueue()
task_q.put(runner.JobTask(t1)) # to test
task_q.put(None) # to terminate function
result_q = Queue()
result_q = MQueue()
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
run.finish()
# check result
Expand Down