Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a current_tasks argument for retransmission in get_work #1341

Merged
merged 1 commit into from
Nov 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions luigi/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,15 @@ def add_task(self, worker, task_id, status=PENDING, runnable=True,
'assistant': assistant,
})

def get_work(self, worker, host=None, assistant=False):
def get_work(self, worker, host=None, assistant=False, current_tasks=None):
return self._request(
'/api/get_work',
{'worker': worker, 'host': host, 'assistant': assistant})
{
'worker': worker,
'host': host,
'assistant': assistant,
'current_tasks': current_tasks,
})

def graph(self):
return self._request('/api/graph', {})
Expand Down
9 changes: 8 additions & 1 deletion luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def _schedulable(self, task):
def _retry_time(self, task, config):
return time.time() + config.retry_delay

def get_work(self, host=None, assistant=False, **kwargs):
def get_work(self, host=None, assistant=False, current_tasks=None, **kwargs):
# TODO: remove any expired nodes

# Algo: iterate over all nodes, find the highest priority node no dependencies and available
Expand All @@ -729,7 +729,14 @@ def get_work(self, host=None, assistant=False, **kwargs):
self.update(worker_id, {'host': host}, get_work=True)
if assistant:
self.add_worker(worker_id, [('assistant', assistant)])

best_task = None
if current_tasks is not None:
ct_set = set(current_tasks)
for task in sorted(self._state.get_running_tasks(), key=self._rank):
if task.worker_running == worker_id and task.id not in ct_set:
best_task = task

locally_pending_tasks = 0
running_tasks = []
upstream_table = {}
Expand Down
7 changes: 6 additions & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,12 @@ def _get_work(self):
if self._stop_requesting_work:
return None, 0, 0, 0
logger.debug("Asking scheduler for work...")
r = self._scheduler.get_work(worker=self._id, host=self.host, assistant=self._assistant)
r = self._scheduler.get_work(
worker=self._id,
host=self.host,
assistant=self._assistant,
current_tasks=list(self._running_tasks.keys()),
)
n_pending_tasks = r['n_pending_tasks']
task_id = r['task_id']
running_tasks = r['running_tasks']
Expand Down
22 changes: 22 additions & 0 deletions test/central_planner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,28 @@ def test_retry(self):
self.sch.prune()
self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A')

def test_resend_task(self):
self.sch.add_task(worker=WORKER, task_id='A')
self.sch.add_task(worker=WORKER, task_id='B')
for _ in range(10):
self.assertEqual('A', self.sch.get_work(worker=WORKER, current_tasks=[])['task_id'])
self.assertEqual('B', self.sch.get_work(worker=WORKER, current_tasks=['A'])['task_id'])

def test_resend_multiple_tasks(self):
self.sch.add_task(worker=WORKER, task_id='A')
self.sch.add_task(worker=WORKER, task_id='B')
self.sch.add_task(worker=WORKER, task_id='C')

# get A and B running
self.assertEqual('A', self.sch.get_work(worker=WORKER)['task_id'])
self.assertEqual('B', self.sch.get_work(worker=WORKER)['task_id'])

for _ in range(10):
self.assertEqual('A', self.sch.get_work(worker=WORKER, current_tasks=[])['task_id'])
self.assertEqual('A', self.sch.get_work(worker=WORKER, current_tasks=['B'])['task_id'])
self.assertEqual('B', self.sch.get_work(worker=WORKER, current_tasks=['A'])['task_id'])
self.assertEqual('C', self.sch.get_work(worker=WORKER, current_tasks=['A', 'B'])['task_id'])

def test_disconnect_running(self):
# X and Y wants to run A.
# X starts but does not report back. Y does.
Expand Down
1 change: 1 addition & 0 deletions test/execution_summary_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def run(self):
old_func = luigi.scheduler.CentralPlannerScheduler.get_work

def new_func(*args, **kwargs):
kwargs['current_tasks'] = None
old_func(*args, **kwargs)
return old_func(*args, **kwargs)

Expand Down
1 change: 1 addition & 0 deletions test/retcodes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def run(self):
old_func = luigi.scheduler.CentralPlannerScheduler.get_work

def new_func(*args, **kwargs):
kwargs['current_tasks'] = None
old_func(*args, **kwargs)
res = old_func(*args, **kwargs)
res['running_tasks'][0]['worker'] = "not me :)" # Otherwise it will be filtered
Expand Down
8 changes: 4 additions & 4 deletions test/scheduler_visualisation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,15 +412,15 @@ def test_worker_list_running(self):
class X(luigi.Task):
n = luigi.IntParameter()

w = luigi.worker.Worker(scheduler=self.scheduler, worker_processes=3)
w = luigi.worker.Worker(worker_id='w', scheduler=self.scheduler, worker_processes=3)
w.add(X(0))
w.add(X(1))
w.add(X(2))
w.add(X(3))

w._get_work()
w._get_work()
w._get_work()
self.scheduler.get_work(worker='w')
self.scheduler.get_work(worker='w')
self.scheduler.get_work(worker='w')

workers = self._remote().worker_list()
self.assertEqual(1, len(workers))
Expand Down
19 changes: 19 additions & 0 deletions test/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ def requires(self):
self.assertTrue(a.complete())
self.assertTrue(b.complete())

def test_gets_missed_work(self):
class A(Task):
done = False

def complete(self):
return self.done

def run(self):
self.done = True

a = A()
self.assertTrue(self.w.add(a))

# simulate a missed get_work response
self.assertEqual('A()', self.sch.get_work(worker='X')['task_id'])

self.assertTrue(self.w.run())
self.assertTrue(a.complete())

def test_avoid_infinite_reschedule(self):
class A(Task):

Expand Down