Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enables running of multiple tasks in batches #1538

Closed
wants to merge 11 commits into from
25 changes: 24 additions & 1 deletion luigi/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import abc
import datetime
import enum
import warnings
import json
from json import JSONEncoder
Expand All @@ -44,6 +45,25 @@
_no_value = object()


def join_with_commas(values):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this isn't public API, prepend an underscore: _join_with_commas

""" Join all values with a comma.

This is necessary because ','.join is not picklable but top-level functions
are.

"""
return ','.join(values)


class BatchAggregation(enum.Enum):
COMMA_LIST = join_with_commas # join all values with commas
MIN_VALUE = min # choose just the minimum value by string representation
MAX_VALUE = max # choose just the maximum value by string representation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are comments such as "# choose just the maximum value by string representation" visible on the readthedocs page?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to make these visible on readthedocs. Maybe I should move this documentation to the batch_method parameter description? It seems a little unfortunate to have it there :(


def __call__(self, values):
return self.value(values)


class ParameterException(Exception):
"""
Base exception.
Expand Down Expand Up @@ -139,7 +159,10 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip
``positional=False`` for abstract base classes and similar cases.
:param bool always_in_help: For the --help option in the command line
parsing. Set true to always show in --help.
:param str batch_method: How multiple
:param BatchAggregation batch_method: How multiple values of this argument are combined when
batching in the scheduler. See BatchAggregation for
details of what each one means. Default is None,
meaning that this parameter cannot be aggregated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are good docs, but for a new feature this big I would expect something similar to the docs we have for Range*. Why not add it to the same file, luigi_patterns.rst? In particular, make sure to explain these cases:

  • It should be clear to the user what happens when you have some batched and some non-batched params.
  • It should be clear what happens when you have an IntParameter but batch as COMMA_LIST. (that's the reason I don't like COMMA_LIST, can't we have a python list instead?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The COMMA_LIST refers to how we combine serializations. If we combined them as an actual list, it would break deserialization. It's up to the user to define the parameters to deserialize into a list if that's what they want. If we build in the list serialization, it would be very similar to re-implementing list parameters, which you were big on removing.

What we could do instead is provide a wrapper to translate between lists and commas. So instead of IntParameter(batch_method=COMMA_LIST) you would do batched_list(IntParameter()). That way we don't need to build any special-case code in the Parameter class just for COMMA_LIST.

"""
self._default = default
if is_global:
Expand Down
25 changes: 12 additions & 13 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,6 @@ class Scheduler(object):
BATCH_TASKS = 'batch_tasks'
RUNNING_BATCHES = 'running_batches'

# Lookup table from an aggregate-method name to the callable that collapses a
# list of raw parameter values into a single value:
#   'csv'   - join the values with commas
#   'max'   - keep the largest value
#   'min'   - keep the smallest value
#   'range' - format the smallest and largest values as 'min-max'
AGGREGATE_FUNCTIONS = {
'csv': ','.join,
'max': max,
'min': min,
'range': lambda args: '%s-%s' % (min(args), max(args)),
}


class scheduler(Config):
# TODO(erikbern): the config_path is needed for backwards compatibility. We
Expand Down Expand Up @@ -242,7 +235,7 @@ def task_id(self, tasks):
raw_vals = [task.params[arg] for task in tasks]
agg_function = self.aggregates.get(arg)
if agg_function is not None:
arg_val = AGGREGATE_FUNCTIONS[agg_function](raw_vals)
arg_val = agg_function(raw_vals)
elif any(v != raw_vals[0] for v in raw_vals):
return None, None
else:
Expand Down Expand Up @@ -341,14 +334,19 @@ def get_state(self):
}

def set_state(self, state):
# This check is for backward compatibility as of 2016-06-02. In the future we should be able
# to assume a dictionary.
if isinstance(state, dict):
self._tasks = state.get(TASKS, {})
self._active_workers = state.get(ACTIVE_WORKERS, {})
self._batch_tasks = state.get(BATCH_TASKS, {})
self._running_batches = state.get(RUNNING_BATCHES, {})

# TODO: remove this code path
else:
self._tasks, self._active_workers = state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a note that this is only for backward compatibility and this codepath will be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self._batch_tasks = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add self._running_batches = {}?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self._running_batches = {}

def dump(self):
try:
Expand Down Expand Up @@ -392,7 +390,7 @@ def get_pending_tasks(self):
return itertools.chain.from_iterable(six.itervalues(self._status_tasks[status])
for status in [PENDING, RUNNING])

def get_batch(self, worker_id, tasks):
def create_batch_task(self, worker_id, tasks):
if len(tasks) == 1:
return tasks[0]
families = set(task.family for task in tasks)
Expand Down Expand Up @@ -437,8 +435,8 @@ def get_task(self, task_id, default=None, setdefault=None):
else:
return self._tasks.get(task_id, default)

def get_batcher(self, worker, family):
return self._batch_tasks.get((worker, family))
# Return whatever is stored in self._batch_tasks under the
# (worker, task_family) key; the .get() lookup yields None when no entry
# exists for that pair.
def get_batcher(self, worker, task_family):
return self._batch_tasks.get((worker, task_family))

def set_batcher(self, worker, family, batcher_aggregate_args, max_batch_size):
batcher = TaskBatcher(batcher_aggregate_args, max_batch_size)
Expand Down Expand Up @@ -898,8 +896,9 @@ def get_work(self, host=None, assistant=False, current_tasks=None, **kwargs):
if len(task.workers) == 1 and not assistant:
n_unique_pending += 1

# batch as many tasks as possible together if multiple batchable tasks are available
if (in_workers and best_tasks and task.batchable and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment above this line # Batch as many tasks as possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self._state.get_batch(worker_id, best_tasks + [task]) is not None and
self._state.create_batch_task(worker_id, best_tasks + [task]) is not None and
self._schedulable(task) and
self._has_resources(task.resources, greedy_resources)):
best_tasks.append(task)
Expand Down Expand Up @@ -934,7 +933,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, **kwargs):
'n_unique_pending': n_unique_pending}

if best_tasks:
best_batch = self._state.get_batch(worker_id, best_tasks)
best_batch = self._state.create_batch_task(worker_id, best_tasks)
best_task = self._state.get_task(best_batch.id, setdefault=best_batch)
for task in best_tasks:
if task == best_task:
Expand Down
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# the License.

import os
import sys

from setuptools import setup

Expand Down Expand Up @@ -41,6 +42,9 @@ def get_static_files(path):
'python-daemon<3.0',
]

# The enum module only joined the standard library in Python 3.4; older
# interpreters need the enum34 backport to provide it.
if sys.version_info < (3, 4):
install_requires.append('enum34==1.1.6')

if os.environ.get('READTHEDOCS', None) == 'True':
# So that we can build documentation for luigi.db_task_history and luigi.contrib.sqla
install_requires.append('sqlalchemy')
Expand Down