diff --git a/HISTORY.rst b/HISTORY.rst index 21f0b1c..65a154b 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,6 +2,15 @@ .. Unreleased Changes +0.5.0 (2018-05-04) +------------------ +* Taskgraph now supports python versions 2 and 3 (tested with python 2.7, 3.6). +* Fixed an issue with ``taskgraph.TaskGraph`` that prevented a multiprocessed + graph from executing on POSIX systems when ``psutil`` was installed. +* Adding matrix-based test automation (python 2.7, python 3.6, with/without + ``psutil``) via ``tox``. +* Updating repository path to ``https://bitbucket.org/natcap/taskgraph``. + 0.4.0 (2018-04-18) ------------------ * Auto-versioning now happens via ``setuptools_scm``, replacing previous calls to ``natcap.versioner``. diff --git a/README.rst b/README.rst index 135d9bf..3cda868 100644 --- a/README.rst +++ b/README.rst @@ -2,22 +2,22 @@ TaskGraph: ================================================= About TaskGraph -=============== +--------------- TaskGraph is great. TaskGraph Dependencies -====================== +---------------------- Task Graph is written in pure Python, but if the ``psutils`` package is installed the distributed multiprocessing processes will be ``nice``\d. Example Use -=========== +----------- Install taskgraph with -`pip install taskgraph` +``pip install taskgraph`` Then @@ -64,3 +64,23 @@ Then # expect that result is a list `list_len` long with `value_a+value_b` in it result = pickle.load(open(result_path, 'rb')) + +Running Tests +------------- + +Taskgraph includes a ``tox`` configuration for automating builds across +multiple python versions and whether ``psutil`` is installed. 
To +execute all tests, run:: + + $ tox + +Alternatively, if you're only trying to run tests on a single configuration +(say, python 3.6 without ``psutil``), you'd run:: + + $ tox -e py36-base + +Or if you'd like to run the tests for the combination of Python 2.7 with +``psutil``, you'd run:: + + $ tox -e py27-psutil + diff --git a/setup.py b/setup.py index 96184bf..c12d08c 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ long_description=README, maintainer='Rich Sharp', maintainer_email='richpsharp@gmail.com', - url='https://bitbucket.org/richsharp/taskgraph', + url='https://bitbucket.org/natcap/taskgraph', packages=['taskgraph'], license='BSD', keywords='parallel multiprocessing distributed computing', @@ -21,11 +21,13 @@ }, classifiers=[ 'Intended Audience :: Developers', + 'Topic :: System :: Distributed Computing', 'Development Status :: 5 - Production/Stable', 'Natural Language :: English', 'Operating System :: MacOS :: MacOS X', 'Operating System :: Microsoft', 'Operating System :: POSIX', - 'Programming Language :: Python :: 2 :: Only', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.6', 'License :: OSI Approved :: BSD License' ]) diff --git a/taskgraph/Task.py b/taskgraph/Task.py index 26304c9..bb6a4ea 100644 --- a/taskgraph/Task.py +++ b/taskgraph/Task.py @@ -1,7 +1,6 @@ """Task graph framework.""" import heapq import pprint -import types import collections import hashlib import json @@ -11,19 +10,56 @@ import multiprocessing import threading import errno -import Queue +try: + import Queue as queue +except ImportError: + # Python 3 renamed Queue to queue + import queue import inspect import abc +# Superclass for ABCs, compatible with python 2.7+ that replaces __metaclass__ +# usage that is no longer clearly documented in python 3 (if it's even present +# at all ... 
__metaclass__ has been removed from the python data model docs) +# Taken from https://stackoverflow.com/a/38668373/299084 +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + try: import psutil HAS_PSUTIL = True + if psutil.WINDOWS: + # Windows' scheduler doesn't use POSIX niceness. + PROCESS_LOW_PRIORITY = psutil.BELOW_NORMAL_PRIORITY_CLASS + else: + # On POSIX, use system niceness. + # -20 is high priority, 0 is normal priority, 19 is low priority. + # 10 here is an arbitrary selection that's probably nice enough. + PROCESS_LOW_PRIORITY = 10 except ImportError: HAS_PSUTIL = False LOGGER = logging.getLogger('Task') +try: + dict.itervalues +except AttributeError: + # Python 3 + # range is an iterator in python3. + xrange = range + # In python2, basestring is the common superclass of str and unicode. In + # python3, we'll probably only be dealing with str objects. + basestring = str + def itervalues(d): + """Python 2/3 compatibility iterator over d.values()""" + return iter(d.values()) +else: + # Python 2 + def itervalues(d): + """Python 2/3 compatibility alias for d.itervalues()""" + return d.itervalues() + + class TaskGraph(object): """Encapsulates the worker and tasks states for parallel processing.""" @@ -71,10 +107,10 @@ def __init__(self, taskgraph_cache_dir_path, n_workers): self.worker_pool = multiprocessing.Pool(n_workers) if HAS_PSUTIL: parent = psutil.Process() - parent.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + parent.nice(PROCESS_LOW_PRIORITY) for child in parent.children(): try: - child.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + child.nice(PROCESS_LOW_PRIORITY) except psutil.NoSuchProcess: LOGGER.warn( "NoSuchProcess exception encountered when trying " @@ -83,7 +119,7 @@ def __init__(self, taskgraph_cache_dir_path, n_workers): # used to synchronize a pass through potential tasks to add to the # work queue - self.work_queue = Queue.Queue() + self.work_queue = queue.Queue() self.worker_semaphore = threading.Semaphore(max(1, n_workers)) # launch 
threads to manage the workers for thread_id in xrange(max(1, n_workers)): @@ -94,7 +130,7 @@ def __init__(self, taskgraph_cache_dir_path, n_workers): worker_thread.start() # tasks that get passed add_task get put in this queue for scheduling - self.waiting_task_queue = Queue.Queue() + self.waiting_task_queue = queue.Queue() waiting_task_scheduler = threading.Thread( target=self._process_waiting_tasks, name='_waiting_task_scheduler') @@ -103,7 +139,7 @@ def __init__(self, taskgraph_cache_dir_path, n_workers): # tasks in the work ready queue have dependencies satisfied but need # priority scheduling - self.work_ready_queue = Queue.Queue() + self.work_ready_queue = queue.Queue() priority_task_scheduler = threading.Thread( target=self._schedule_priority_tasks, name='_priority_task_scheduler') @@ -237,7 +273,7 @@ def _schedule_priority_tasks(self): break # push task to priority queue heapq.heappush(priority_queue, task) - except Queue.Empty: + except queue.Empty: # this triggers when work_ready_queue is empty and # there's something in the work_ready_queue break @@ -259,7 +295,7 @@ def _schedule_priority_tasks(self): def _process_waiting_tasks(self): """Process any tasks that are waiting on dependencies. - This worker monitors the self.waiting_task_queue Queue and looks for + This worker monitors the self.waiting_task_queue queue and looks for (task, 'wait'), or (task, 'done') tuples. 
If mode is 'wait' the task is indexed locally with reference to @@ -334,7 +370,7 @@ def join(self, timeout=None): return True try: timedout = False - for task in self.task_id_map.itervalues(): + for task in itervalues(self.task_id_map): timedout = not task.join(timeout) # if the last task timed out then we want to timeout for all # of the task graph @@ -369,7 +405,7 @@ def _terminate(self): self.close() if self.n_workers > 0: self.worker_pool.terminate() - for task in self.task_id_map.itervalues(): + for task in itervalues(self.task_id_map): task._terminate() self.terminated = True @@ -463,7 +499,7 @@ def __init__( json.dumps(self.kwargs, sort_keys=True), source_code, self.target_path_list, str(file_stat_list)) - self.task_hash = hashlib.sha1(task_string).hexdigest() + self.task_hash = hashlib.sha1(task_string.encode('utf-8')).hexdigest() # get ready to make a directory and target based on hashname # take the first 3 characters of the hash and make a subdirectory @@ -480,6 +516,10 @@ def __eq__(self, other): return self.task_hash == other.task_hash return False + def __hash__(self): + """Return the base-16 integer hash of this hash string.""" + return int(self.task_hash, 16) + def __ne__(self, other): """Inverse of __eq__.""" return not self.__eq__(other) @@ -619,23 +659,21 @@ def _terminate(self, exception_object=None): self._task_complete_event.set() -class EncapsulatedTaskOp: +class EncapsulatedTaskOp(ABC): """Used as a superclass for Task operations that need closures. This class will automatically hash the subclass's __call__ method source as well as the arguments to its __init__ function to calculate the Task's unique hash. 
""" - __metaclass__ = abc.ABCMeta - def __init__(self, *args, **kwargs): # try to get the source code of __call__ so task graph will recompute # if the function has changed - args_as_str = str([args, kwargs]) + args_as_str = str([args, kwargs]).encode('utf-8') try: # hash the args plus source code of __call__ id_hash = hashlib.sha1(args_as_str + inspect.getsource( - self.__class__.__call__)).hexdigest() + self.__class__.__call__).encode('utf-8')).hexdigest() except IOError: # this will fail if the code is compiled, that's okay just do # the args @@ -663,7 +701,7 @@ def _get_file_stats(base_value, ignore_list, ignore_directories): base_value or nested in base value that are not otherwise ignored by the input parameters. """ - if isinstance(base_value, types.StringTypes): + if isinstance(base_value, basestring): try: if base_value not in ignore_list and ( not os.path.isdir(base_value) or @@ -673,7 +711,7 @@ def _get_file_stats(base_value, ignore_list, ignore_directories): except OSError: pass elif isinstance(base_value, collections.Mapping): - for key in sorted(base_value.iterkeys()): + for key in sorted(base_value.keys()): value = base_value[key] for stat in _get_file_stats( value, ignore_list, ignore_directories): diff --git a/taskgraph/__init__.py b/taskgraph/__init__.py index c930e78..9e6913d 100644 --- a/taskgraph/__init__.py +++ b/taskgraph/__init__.py @@ -1,6 +1,12 @@ """taskgraph module.""" +from __future__ import unicode_literals +from __future__ import absolute_import import pkg_resources -from taskgraph.Task import * +from .Task import TaskGraph, Task, EncapsulatedTaskOp + + +__all__ = ['TaskGraph', 'Task', 'EncapsulatedTaskOp'] + try: __version__ = pkg_resources.get_distribution(__name__).version diff --git a/tests/test_task.py b/tests/test_task.py index 6540d4d..68e5500 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -8,16 +8,23 @@ import pickle import logging -import taskgraph import mock +import taskgraph + 
logging.basicConfig(level=logging.DEBUG) +# Python 3 relocated the reload function to imp. +if 'reload' not in __builtins__: + from imp import reload + + def _long_running_function(): """Wait for 5 seconds.""" time.sleep(5) + def _create_list_on_disk(value, length, target_path): """Create a numpy array on disk filled with value of `size`.""" target_list = [value] * length @@ -38,6 +45,7 @@ def _div_by_zero(): """Divide by zero to raise an exception.""" return 1/0 + class TaskGraphTests(unittest.TestCase): """Tests for the taskgraph.""" diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..a5ffe37 --- /dev/null +++ b/tox.ini @@ -0,0 +1,12 @@ +[tox] +envlist = {py27,py36}-{base,psutil} + +[testenv] +commands = pytest --cov-report=term --cov-report=xml --cov --junitxml=testresults.xml + +# only install psutil to the environments where we're testing psutil. +deps = + mock + pytest + pytest-cov + {py27,py36}-psutil: psutil