Merging release/0.5.0 into master.
RE:#BITBUCKET-13
Branch: master
phargogh committed May 4, 2018
2 parents 8d22923 + c6234e8 commit a633d85
Showing 7 changed files with 122 additions and 27 deletions.
9 changes: 9 additions & 0 deletions HISTORY.rst
@@ -2,6 +2,15 @@
.. Unreleased Changes
0.5.0 (2018-05-04)
------------------
* Taskgraph now supports Python 2 and 3 (tested with Python 2.7 and 3.6).
* Fixed an issue with ``taskgraph.TaskGraph`` that prevented a multiprocessed
  graph from executing on POSIX systems when ``psutil`` was installed.
* Added matrix-based test automation (Python 2.7, Python 3.6, with and
  without ``psutil``) via ``tox``.
* Updated the repository path to ``https://bitbucket.org/natcap/taskgraph``.

0.4.0 (2018-04-18)
------------------
* Auto-versioning now happens via ``setuptools_scm``, replacing previous calls to ``natcap.versioner``.
28 changes: 24 additions & 4 deletions README.rst
@@ -2,22 +2,22 @@ TaskGraph:
=================================================

About TaskGraph
-===============
+---------------

TaskGraph is a Python library for organizing interdependent tasks, running
them in parallel, and avoiding recomputation of results that already exist.

TaskGraph Dependencies
-======================
+----------------------

TaskGraph is written in pure Python, but if the ``psutil`` package is
installed, the distributed multiprocessing processes will be ``nice``\d.
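
For a sense of what that means, here is a rough sketch (an editor's
illustration, not part of the package) of the low-priority setting applied to
worker processes when ``psutil`` is importable::

    import psutil

    if psutil.WINDOWS:
        # Windows uses priority classes rather than POSIX niceness.
        low_priority = psutil.BELOW_NORMAL_PRIORITY_CLASS
    else:
        # Positive POSIX niceness lowers priority; 10 is a moderate value.
        low_priority = 10
    psutil.Process().nice(low_priority)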

Example Use
-===========
+-----------

Install taskgraph with

-`pip install taskgraph`
+``pip install taskgraph``

Then

@@ -64,3 +64,23 @@ Then
    # expect that result is a list `list_len` long with `value_a+value_b` in it
    result = pickle.load(open(result_path, 'rb'))

Running Tests
-------------

Taskgraph includes a ``tox`` configuration for automating test runs across
multiple Python versions, both with and without ``psutil``. To execute all
tests, run::

    $ tox

Alternatively, if you're only trying to run tests on a single configuration
(say, Python 3.6 without ``psutil``), you'd run::

    $ tox -e py36-base

Or if you'd like to run the tests for the combination of Python 2.7 with
``psutil``, you'd run::

    $ tox -e py27-psutil
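
To list every environment that ``tox`` generates from the matrix, use the
standard ``-l`` flag (expected output sketched here from the envlist)::

    $ tox -l
    py27-base
    py27-psutil
    py36-base
    py36-psutil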

6 changes: 4 additions & 2 deletions setup.py
@@ -12,7 +12,7 @@
    long_description=README,
    maintainer='Rich Sharp',
    maintainer_email='richpsharp@gmail.com',
-    url='https://bitbucket.org/richsharp/taskgraph',
+    url='https://bitbucket.org/natcap/taskgraph',
    packages=['taskgraph'],
    license='BSD',
    keywords='parallel multiprocessing distributed computing',
@@ -21,11 +21,13 @@
    },
    classifiers=[
        'Intended Audience :: Developers',
        'Topic :: System :: Distributed Computing',
        'Development Status :: 5 - Production/Stable',
        'Natural Language :: English',
        'Operating System :: MacOS :: MacOS X',
        'Operating System :: Microsoft',
        'Operating System :: POSIX',
-        'Programming Language :: Python :: 2 :: Only',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3.6',
        'License :: OSI Approved :: BSD License'
    ])
76 changes: 57 additions & 19 deletions taskgraph/Task.py
@@ -1,7 +1,6 @@
"""Task graph framework."""
import heapq
import pprint
import types
import collections
import hashlib
import json
@@ -11,19 +10,56 @@
import multiprocessing
import threading
import errno
-import Queue
+try:
+    import Queue as queue
+except ImportError:
+    # Python 3 renamed the Queue module to queue.
+    import queue
import inspect
import abc

# Superclass for ABCs, compatible with Python 2.7+, that replaces the
# __metaclass__ mechanism, which is no longer clearly documented in Python 3
# (__metaclass__ has been removed from the Python data model docs).
# Taken from https://stackoverflow.com/a/38668373/299084
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
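# Editor's illustration, not part of this commit: a class built on this ABC
# behaves like an abc-based abstract class on both interpreters, e.g.:
#
#     class _AbstractOp(ABC):
#         @abc.abstractmethod
#         def __call__(self):
#             pass
#
#     _AbstractOp()  # raises TypeError until __call__ is overridden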

try:
    import psutil
    HAS_PSUTIL = True
    if psutil.WINDOWS:
        # Windows' scheduler doesn't use POSIX niceness.
        PROCESS_LOW_PRIORITY = psutil.BELOW_NORMAL_PRIORITY_CLASS
    else:
        # On POSIX, use system niceness.
        # -20 is high priority, 0 is normal priority, 19 is low priority.
        # 10 here is an arbitrary selection that's probably nice enough.
        PROCESS_LOW_PRIORITY = 10
except ImportError:
    HAS_PSUTIL = False

LOGGER = logging.getLogger('Task')


try:
    dict.itervalues
except AttributeError:
    # Python 3
    # In Python 3, range returns a lazy sequence, standing in for xrange.
    xrange = range
    # In Python 2, basestring is the common superclass of str and unicode;
    # in Python 3 we'll only be dealing with str objects.
    basestring = str

    def itervalues(d):
        """Python 2/3 compatibility iterator over d.values()."""
        return iter(d.values())
else:
    # Python 2
    def itervalues(d):
        """Python 2/3 compatibility alias for d.itervalues()."""
        return d.itervalues()
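# Editor's illustration, not part of this commit: itervalues() yields lazily
# on both interpreters, so a large task map is never copied into a list:
#
#     for task in itervalues(task_id_map):
#         task.join()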


class TaskGraph(object):
    """Encapsulates the worker and task states for parallel processing."""

@@ -71,10 +107,10 @@ def __init__(self, taskgraph_cache_dir_path, n_workers):
            self.worker_pool = multiprocessing.Pool(n_workers)
            if HAS_PSUTIL:
                parent = psutil.Process()
-                parent.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
+                parent.nice(PROCESS_LOW_PRIORITY)
                for child in parent.children():
                    try:
-                        child.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
+                        child.nice(PROCESS_LOW_PRIORITY)
                    except psutil.NoSuchProcess:
                        LOGGER.warn(
                            "NoSuchProcess exception encountered when trying "
@@ -83,7 +119,7 @@ def __init__(self, taskgraph_cache_dir_path, n_workers):

        # used to synchronize a pass through potential tasks to add to the
        # work queue
-        self.work_queue = Queue.Queue()
+        self.work_queue = queue.Queue()
        self.worker_semaphore = threading.Semaphore(max(1, n_workers))
        # launch threads to manage the workers
        for thread_id in xrange(max(1, n_workers)):
@@ -94,7 +130,7 @@ def __init__(self, taskgraph_cache_dir_path, n_workers):
            worker_thread.start()

        # tasks that get passed add_task get put in this queue for scheduling
-        self.waiting_task_queue = Queue.Queue()
+        self.waiting_task_queue = queue.Queue()
        waiting_task_scheduler = threading.Thread(
            target=self._process_waiting_tasks,
            name='_waiting_task_scheduler')
@@ -103,7 +139,7 @@ def __init__(self, taskgraph_cache_dir_path, n_workers):

        # tasks in the work ready queue have dependencies satisfied but need
        # priority scheduling
-        self.work_ready_queue = Queue.Queue()
+        self.work_ready_queue = queue.Queue()
        priority_task_scheduler = threading.Thread(
            target=self._schedule_priority_tasks,
            name='_priority_task_scheduler')
@@ -237,7 +273,7 @@ def _schedule_priority_tasks(self):
                        break
                    # push task to priority queue
                    heapq.heappush(priority_queue, task)
-                except Queue.Empty:
+                except queue.Empty:
                    # this triggers when the work_ready_queue is empty and
                    # there may be tasks in priority_queue left to schedule
                    break
@@ -259,7 +295,7 @@ def _schedule_priority_tasks(self):
    def _process_waiting_tasks(self):
        """Process any tasks that are waiting on dependencies.

-        This worker monitors the self.waiting_task_queue Queue and looks for
+        This worker monitors the self.waiting_task_queue queue and looks for
        (task, 'wait'), or (task, 'done') tuples.

        If mode is 'wait' the task is indexed locally with reference to
@@ -334,7 +370,7 @@ def join(self, timeout=None):
            return True
        try:
            timedout = False
-            for task in self.task_id_map.itervalues():
+            for task in itervalues(self.task_id_map):
                timedout = not task.join(timeout)
                # if the last task timed out then we want to timeout for all
                # of the task graph
@@ -369,7 +405,7 @@ def _terminate(self):
        self.close()
        if self.n_workers > 0:
            self.worker_pool.terminate()
-        for task in self.task_id_map.itervalues():
+        for task in itervalues(self.task_id_map):
            task._terminate()
        self.terminated = True

@@ -463,7 +499,7 @@ def __init__(
            json.dumps(self.kwargs, sort_keys=True), source_code,
            self.target_path_list, str(file_stat_list))

-        self.task_hash = hashlib.sha1(task_string).hexdigest()
+        self.task_hash = hashlib.sha1(task_string.encode('utf-8')).hexdigest()

        # get ready to make a directory and target based on hashname
        # take the first 3 characters of the hash and make a subdirectory
@@ -480,6 +516,10 @@ def __eq__(self, other):
            return self.task_hash == other.task_hash
        return False

    def __hash__(self):
        """Return the hex digest ``task_hash`` parsed as a base-16 integer."""
        return int(self.task_hash, 16)
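    # (Editor's note, not part of this commit: Python 3 sets __hash__ to None
    # on classes that define __eq__, so without this override Task instances
    # could not be used in sets or as dict keys.)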

    def __ne__(self, other):
        """Inverse of __eq__."""
        return not self.__eq__(other)
@@ -619,23 +659,21 @@ def _terminate(self, exception_object=None):
        self._task_complete_event.set()


-class EncapsulatedTaskOp:
+class EncapsulatedTaskOp(ABC):
    """Used as a superclass for Task operations that need closures.

    This class will automatically hash the subclass's __call__ method source
    as well as the arguments to its __init__ function to calculate the
    Task's unique hash.
    """
-    __metaclass__ = abc.ABCMeta

    def __init__(self, *args, **kwargs):
        # try to get the source code of __call__ so task graph will recompute
        # if the function has changed
-        args_as_str = str([args, kwargs])
+        args_as_str = str([args, kwargs]).encode('utf-8')
        try:
            # hash the args plus source code of __call__
            id_hash = hashlib.sha1(args_as_str + inspect.getsource(
-                self.__class__.__call__)).hexdigest()
+                self.__class__.__call__).encode('utf-8')).hexdigest()
        except IOError:
            # this will fail if the code is compiled; that's okay, just hash
            # the args
@@ -663,7 +701,7 @@ def _get_file_stats(base_value, ignore_list, ignore_directories):
            base_value or nested in base value that are not otherwise
            ignored by the input parameters.
    """
-    if isinstance(base_value, types.StringTypes):
+    if isinstance(base_value, basestring):
        try:
            if base_value not in ignore_list and (
                    not os.path.isdir(base_value) or
@@ -673,7 +711,7 @@ def _get_file_stats(base_value, ignore_list, ignore_directories):
        except OSError:
            pass
    elif isinstance(base_value, collections.Mapping):
-        for key in sorted(base_value.iterkeys()):
+        for key in sorted(base_value.keys()):
            value = base_value[key]
            for stat in _get_file_stats(
                    value, ignore_list, ignore_directories):
8 changes: 7 additions & 1 deletion taskgraph/__init__.py
@@ -1,6 +1,12 @@
"""taskgraph module."""
from __future__ import unicode_literals
from __future__ import absolute_import
import pkg_resources
from taskgraph.Task import *
from .Task import TaskGraph, Task, EncapsulatedTaskOp


__all__ = ['TaskGraph', 'Task', 'EncapsulatedTaskOp']
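# (Editor's note, not part of this commit: an explicit __all__ means
# "from taskgraph import *" exposes exactly these three names rather than
# everything the old "from taskgraph.Task import *" dragged in.)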


try:
    __version__ = pkg_resources.get_distribution(__name__).version
Expand Down
10 changes: 9 additions & 1 deletion tests/test_task.py
@@ -8,16 +8,23 @@
import pickle
import logging

-import taskgraph
import mock

+import taskgraph

logging.basicConfig(level=logging.DEBUG)


# Python 3 relocated the reload function to imp.
if 'reload' not in __builtins__:
    from imp import reload
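# (Editor's note, not part of this commit: the tests use reload() to
# re-execute module-level logic, such as re-evaluating Task.HAS_PSUTIL
# while psutil imports are mocked away.)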


def _long_running_function():
    """Wait for 5 seconds."""
    time.sleep(5)


def _create_list_on_disk(value, length, target_path):
    """Create a list on disk, `length` elements long, filled with `value`."""
    target_list = [value] * length
@@ -38,6 +45,7 @@ def _div_by_zero():
"""Divide by zero to raise an exception."""
return 1/0


class TaskGraphTests(unittest.TestCase):
"""Tests for the taskgraph."""

12 changes: 12 additions & 0 deletions tox.ini
@@ -0,0 +1,12 @@
[tox]
envlist = {py27,py36}-{base,psutil}

[testenv]
commands = pytest --cov-report=term --cov-report=xml --cov --junitxml=testresults.xml

# only install psutil to the environments where we're testing psutil.
deps =
    mock
    pytest
    pytest-cov
    {py27,py36}-psutil: psutil
