A new List parameter *resources* has been added to the worker. This parameter
lists the resources the worker has. They are checked by the scheduler on each
work request, and tasks are held back from execution by workers that do not
have the required resources.
* Task resource requirements (*worker_resources*) are reported by the worker to
  the scheduler in _add_task()
* The list of resources the worker has (*worker_resources*) is sent along with
  every request for new work in _get_work() (see the usage sketch below)

The scheduler stores the worker resource requirements (*worker_resources*) with
each task. On a work request (*get_work()*), the scheduler checks whether the
decision is trivial; this check now also considers whether worker resource
requirements exist (see the sketch after this list):
* trivial: only tasks reported by the requesting worker are checked
* otherwise: tasks reported by all workers are checked
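
A simplified sketch of that decision, condensed from the *get_work()* changes below; the helper name is illustrative, and the non-trivial branch paraphrases scheduler code that is not part of this diff.

```python
from luigi.task_status import PENDING, RUNNING


def pick_relevant_tasks(state, worker, worker_resources, paused=False):
    """Illustrative helper: which tasks get_work() will consider."""
    if paused:
        return []
    if worker.is_trivial_worker(state) and not worker_resources:
        # Trivial case: only the tasks reported by the requesting worker.
        return worker.get_tasks(state, PENDING, RUNNING)
    # A worker that advertises resources may be handed any suitable task,
    # so tasks reported by all workers are considered (paraphrased).
    return state.get_pending_tasks()
```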

If the scheduler finds a suitable task (even if it was not scheduled by the
requesting worker), it sends it back, since the worker can load the task
dynamically. As a consequence, the in_workers check (a bool indicating whether
the requesting worker has the found task scheduled) was removed from the
scheduler code.
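
The per-task check added in *get_work()* then reduces to a subset test. A minimal, hypothetical restatement (the function name is illustrative):

```python
def worker_covers(task_worker_resources, worker_resources):
    """True when the requesting worker offers every resource the task needs."""
    offered = worker_resources or []
    return all(r in offered for r in (task_worker_resources or []))


# A task with no requirements can go to any worker; otherwise the worker's
# advertised list must cover the task's list.
assert worker_covers([], [])
assert worker_covers(["gpu"], ["gpu", "ssd"])
assert not worker_covers(["gpu"], ["ssd"])
```
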
kz0ltan committed Feb 7, 2021
1 parent f0a6328 commit 2bab868
Showing 2 changed files with 49 additions and 9 deletions.
37 changes: 30 additions & 7 deletions luigi/scheduler.py
@@ -234,7 +234,7 @@ def __eq__(self, other):
class Task:
def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
params=None, param_visibilities=None, accepts_messages=False, tracking_url=None, status_message=None,
progress_percentage=None, retry_policy='notoptional'):
progress_percentage=None, retry_policy='notoptional', worker_resources=None):
self.id = task_id
self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
@@ -271,6 +271,7 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
self.runnable = False
self.batchable = False
self.batch_id = None
self.worker_resources = worker_resources

def __repr__(self):
return "Task(%r)" % vars(self)
@@ -793,35 +794,42 @@ def mark_as_done(self, task_id=None):
@rpc_method()
def add_task(self, task_id=None, status=PENDING, runnable=True,
deps=None, new_deps=None, expl=None, resources=None,
priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False,
assistant=False, tracking_url=None, worker=None, batchable=None,
batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
priority=0, family='', module=None, params=None,
param_visibilities=None, accepts_messages=False,
assistant=False, tracking_url=None, worker=None,
batchable=None, batch_id=None, retry_policy_dict=None,
owners=None, worker_resources=None, **kwargs):
"""
* add task identified by task_id if it doesn't exist
* if deps is not None, update dependency list
* update status of task
* add additional workers/stakeholders
* update priority when needed
"""
# update worker last check-in
assert worker is not None
worker_id = worker
worker = self._update_worker(worker_id)

resources = {} if resources is None else resources.copy()
worker_resources = [] if worker_resources is None else worker_resources.copy()

if retry_policy_dict is None:
retry_policy_dict = {}

retry_policy = self._generate_retry_policy(retry_policy_dict)

# default task
if worker.enabled:
_default_task = self._make_task(
task_id=task_id, status=PENDING, deps=deps, resources=resources,
priority=priority, family=family, module=module, params=params, param_visibilities=param_visibilities,
worker_resources=worker_resources,
)
else:
_default_task = None

# get existing / create default task
task = self._state.get_task(task_id, setdefault=_default_task)

if task is None or (task.status != RUNNING and not worker.enabled):
@@ -914,6 +922,9 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
if resources is not None:
task.resources = resources

if worker_resources is not None:
task.worker_resources = worker_resources

if worker.enabled and not assistant:
task.stakeholders.add(worker_id)

@@ -1113,7 +1124,8 @@ def count_pending(self, worker):
}

@rpc_method(allow_null=False)
def get_work(self, host=None, assistant=False, current_tasks=None, worker=None, **kwargs):
def get_work(self, host=None, assistant=False, current_tasks=None,
worker=None, worker_resources=None, **kwargs):
# TODO: remove any expired nodes

# Algo: iterate over all nodes, find the highest priority node no dependencies and available
@@ -1165,7 +1177,8 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
worker = self._state.get_worker(worker_id)
if self._paused:
relevant_tasks = []
elif worker.is_trivial_worker(self._state):
elif worker.is_trivial_worker(self._state) and not worker_resources:
# Trivial: the worker is not an assistant and all its tasks are without requirements.
relevant_tasks = worker.get_tasks(self._state, PENDING, RUNNING)
used_resources = collections.defaultdict(int)
greedy_workers = dict() # If there's no resources, then they can grab any task
@@ -1180,6 +1193,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
tasks.sort(key=self._rank, reverse=True)

for task in tasks:
# batched task
if (best_task and batched_params and task.family == best_task.family and
len(batched_tasks) < max_batch_size and task.is_batchable() and all(
task.params.get(name) == value for name, value in unbatched_params.items()) and
@@ -1196,8 +1210,17 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
greedy_resources[resource] += amount

if self._schedulable(task) and self._has_resources(task.resources, greedy_resources):

# task.workers: workers ids that can perform task - task is
# 'BROKEN' if none of these workers are active
in_workers = (assistant and task.runnable) or worker_id in task.workers
if in_workers and self._has_resources(task.resources, used_resources):

worker_has_resources = all(r in (worker_resources or []) for r in task.worker_resources)

# in_workers check has been removed as the worker can load the
# task dynamically, even if it didn't schedule it before
#if in_workers and self._has_resources(task.resources, used_resources):
if self._has_resources(task.resources, used_resources) and worker_has_resources:
best_task = task
batch_param_names, max_batch_size = self._state.get_batcher(
worker_id, task.family)
21 changes: 19 additions & 2 deletions luigi/worker.py
@@ -57,7 +57,7 @@
from luigi.task import Task, flatten, getpaths, Config
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter, ListParameter

import json

@@ -461,6 +461,9 @@ class worker(Config):
'used for obtaining high level customizable monitoring or logging of '
'each individual Task run.')

resources = ListParameter(default=[],
config_path=dict(section='core', name='worker-resources'),
description='Resources available for the worker')

class KeepAliveThread(threading.Thread):
"""
@@ -743,6 +746,7 @@ def add(self, task, multiprocess=False, processes=0):
queue = DequeQueue()
pool = SingleProcessPool()
self._validate_task(task)
# check_complete(task, out_queue) -> runs task.complete() and puts (task, is_complete) into the queue
pool.apply_async(check_complete, [task, queue])

# we track queue size ourselves because len(queue) won't work for multiprocessing
@@ -753,6 +757,7 @@ def add(self, task, multiprocess=False, processes=0):
current = queue.get()
queue_size -= 1
item, is_complete = current
# add any dependencies if found
for next in self._add(item, is_complete):
if next.task_id not in seen:
self._validate_task(next)
@@ -857,7 +862,9 @@ def _add(self, task, is_complete):

deps = [d.task_id for d in deps]

# add to local scheduled_tasks
self._scheduled_tasks[task.task_id] = task
# add to scheduler
self._add_task(
worker=self._id,
task_id=task.task_id,
@@ -872,6 +879,7 @@ def _add(self, task, is_complete):
batchable=task.batchable,
retry_policy_dict=_get_retry_policy_dict(task),
accepts_messages=task.accepts_messages,
worker_resources=task.worker_resources,
)

def _validate_dependency(self, dependency):
@@ -948,6 +956,7 @@ def _get_work(self):
host=self.host,
assistant=self._assistant,
current_tasks=list(self._running_tasks.keys()),
worker_resources=self._config.resources,
)
else:
logger.debug("Checking if tasks are still pending")
@@ -1064,6 +1073,7 @@ def _handle_next_task(self):
except Queue.Empty:
return


task = self._scheduled_tasks[task_id]
if not task or task_id not in self._running_tasks:
continue
@@ -1075,6 +1085,7 @@ def _handle_next_task(self):
if status == FAILED and not external_task_retryable:
self._email_task_failure(task, expl)

# add new requirements
new_deps = []
if new_requirements:
new_req = [load_task(module, name, params)
@@ -1083,6 +1094,7 @@ def _handle_next_task(self):
self.add(t)
new_deps = [t.task_id for t in new_req]

# report the task's status back to the scheduler
self._add_task(worker=self._id,
task_id=task_id,
status=status,
@@ -1094,7 +1106,8 @@ def _handle_next_task(self):
module=task.task_module,
new_deps=new_deps,
assistant=self._assistant,
retry_policy_dict=_get_retry_policy_dict(task))
retry_policy_dict=_get_retry_policy_dict(task),
worker_resources=task.worker_resources)

self._running_tasks.pop(task_id)

@@ -1180,18 +1193,22 @@ def run(self):
self._add_worker()

while True:
# wait while all worker processes are busy
while len(self._running_tasks) >= self.worker_processes > 0:
logger.debug('%d running tasks, waiting for next task to finish', len(self._running_tasks))
self._handle_next_task()

get_work_response = self._get_work()

# if scheduler shut down this worker
if get_work_response.worker_state == WORKER_STATE_DISABLED:
self._start_phasing_out()

# if scheduler didn't send a task to run
if get_work_response.task_id is None:
if not self._stop_requesting_work:
self._log_remote_tasks(get_work_response)
# if no tasks are running currently
if len(self._running_tasks) == 0:
self._idle_since = self._idle_since or datetime.datetime.now()
if self._keep_alive(get_work_response):
