Skip to content

Commit

Permalink
Adds a current_tasks argument for retransmission in get_work
Browse files Browse the repository at this point in the history
Sometimes a worker will not receive the result of a get_work call from the
scheduler. This leaves the scheduler thinking that the worker is running tasks
that it is not actually running. To safely clear up the misunderstanding, the
worker tells the scheduler which tasks it is running with each get_work call.
If the scheduler finds that the worker isn't aware of a task that it should be
running, the scheduler will re-send that task to the worker.

To maintain backward compatibility and avoid rewriting every unit test, a
get_work call that leaves out current_tasks is treated as if the worker had
sent every currently running task.

Note that this does not check that every task reported in current_tasks is
actually running.
  • Loading branch information
daveFNbuck committed Oct 22, 2015
1 parent 6aae26f commit f2e6fff
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
9 changes: 8 additions & 1 deletion luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def _schedulable(self, task):
def _retry_time(self, task, config):
    """Return the ``time.time()`` value at which a retry becomes due.

    The result is the current ``time.time()`` reading plus
    ``config.retry_delay``. ``task`` is accepted but not used here.
    """
    now = time.time()
    return now + config.retry_delay

def get_work(self, host=None, assistant=False, **kwargs):
def get_work(self, host=None, assistant=False, current_tasks=None, **kwargs):
# TODO: remove any expired nodes

# Algo: iterate over all nodes, find the highest priority node no dependencies and available
Expand All @@ -729,7 +729,14 @@ def get_work(self, host=None, assistant=False, **kwargs):
self.update(worker_id, {'host': host}, get_work=True)
if assistant:
self.add_worker(worker_id, [('assistant', assistant)])

best_task = None
if current_tasks is not None:
ct_set = set(current_tasks)
for task in sorted(self._state.get_running_tasks(), key=self._rank):
if task.worker_running == worker_id and task.id not in ct_set:
best_task = task

locally_pending_tasks = 0
running_tasks = []
upstream_table = {}
Expand Down
7 changes: 6 additions & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,12 @@ def _get_work(self):
if self._stop_requesting_work:
return None, 0, 0, 0
logger.debug("Asking scheduler for work...")
r = self._scheduler.get_work(worker=self._id, host=self.host, assistant=self._assistant)
r = self._scheduler.get_work(
worker=self._id,
host=self.host,
assistant=self._assistant,
current_tasks=list(self._running_tasks.keys()),
)
n_pending_tasks = r['n_pending_tasks']
task_id = r['task_id']
running_tasks = r['running_tasks']
Expand Down
22 changes: 22 additions & 0 deletions test/central_planner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,28 @@ def test_retry(self):
self.sch.prune()
self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A')

def test_resend_task(self):
    """A task the worker does not report in ``current_tasks`` is handed out again."""
    for task_id in ('A', 'B'):
        self.sch.add_task(worker=WORKER, task_id=task_id)

    for _ in range(10):
        # Nothing reported as running: the scheduler keeps answering with A.
        nothing_reported = self.sch.get_work(worker=WORKER, current_tasks=[])
        self.assertEqual('A', nothing_reported['task_id'])

        # A reported as running: the scheduler moves on to B.
        a_reported = self.sch.get_work(worker=WORKER, current_tasks=['A'])
        self.assertEqual('B', a_reported['task_id'])

def test_resend_multiple_tasks(self):
    """With several tasks handed out, the answer depends on which ones are reported."""
    for task_id in ('A', 'B', 'C'):
        self.sch.add_task(worker=WORKER, task_id=task_id)

    # get A and B running
    for expected in ('A', 'B'):
        first_handout = self.sch.get_work(worker=WORKER)
        self.assertEqual(expected, first_handout['task_id'])

    # (tasks the worker reports as running, task id the scheduler should answer with)
    cases = (
        ((), 'A'),
        (('B',), 'A'),
        (('A',), 'B'),
        (('A', 'B'), 'C'),
    )
    for _ in range(10):
        for reported, expected in cases:
            # Pass a fresh list on every call, as the individual calls did before.
            response = self.sch.get_work(worker=WORKER, current_tasks=list(reported))
            self.assertEqual(expected, response['task_id'])

def test_disconnect_running(self):
# X and Y want to run A.
# X starts but does not report back. Y does.
Expand Down
19 changes: 19 additions & 0 deletions test/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ def requires(self):
self.assertTrue(a.complete())
self.assertTrue(b.complete())

def test_gets_missed_work(self):
    """The worker still runs a task whose get_work response it never saw."""
    class A(Task):
        # Flipped to True by run(); complete() simply reports it.
        done = False

        def run(self):
            self.done = True

        def complete(self):
            return self.done

    task = A()
    self.assertTrue(self.w.add(task))

    # simulate a missed get_work response
    # (presumably 'X' is the id of self.w -- confirm against setUp)
    missed = self.sch.get_work(worker='X')
    self.assertEqual('A()', missed['task_id'])

    self.assertTrue(self.w.run())
    self.assertTrue(task.complete())

def test_avoid_infinite_reschedule(self):
class A(Task):

Expand Down

0 comments on commit f2e6fff

Please sign in to comment.