Skip to content

Commit

Permalink
Add an event that tells how long run() took.
Browse files Browse the repository at this point in the history
Quintessential example:

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def be_mindful_of_time(task, x):
	my.metrics.send_value(type(task).__name__, x)
  • Loading branch information
paxan committed Aug 13, 2014
1 parent cf61799 commit c65da20
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
9 changes: 7 additions & 2 deletions luigi/worker.py
Expand Up @@ -49,6 +49,7 @@ class Event:
START = "event.core.start"
FAILURE = "event.core.failure"
SUCCESS = "event.core.success"
PROCESSING_TIME = "event.core.processing_time"


class Worker(object):
Expand Down Expand Up @@ -254,7 +255,7 @@ def _add(self, task):
self._validate_dependency(d)
task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
yield d # return additional tasks to add

deps = [d.task_id for d in deps]

self._scheduled_tasks[task.task_id] = task
Expand Down Expand Up @@ -286,7 +287,11 @@ def _run_task(self, task_id):
deps = 'dependency' if len(missing) == 1 else 'dependencies'
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
task.trigger_event(Event.START, task)
task.run()
t0 = time.time()
try:
task.run()
finally:
task.trigger_event(Event.PROCESSING_TIME, task, time.time() - t0)
error_message = json.dumps(task.on_success())
logger.info('[pid %s] Worker %s done %s', os.getpid(), self._id, task_id)
task.trigger_event(Event.SUCCESS, task)
Expand Down
15 changes: 15 additions & 0 deletions test/test_event_callbacks.py
@@ -1,4 +1,6 @@
from unittest import TestCase
from mock import patch
import random
from luigi import Task, build, Event
from luigi.mock import MockFile, MockFileSystem
from luigi.task import flatten
Expand Down Expand Up @@ -72,6 +74,19 @@ def story_dummy():
build([t], local_scheduler=True)
self.assertEquals(dummies[0], "foo")

def test_processing_time_handler(self):
@EmptyTask.event_handler(Event.PROCESSING_TIME)
def save_task(task, processing_time):
self.result = task, processing_time

times = [43.0, 1.0]
t = EmptyTask(random.choice([True, False]))
with patch('luigi.worker.time') as mock:
mock.time = times.pop
build([t], local_scheduler=True)
self.assertIs(self.result[0], t)
self.assertEqual(self.result[1], 42.0)


# A
# / \
Expand Down

0 comments on commit c65da20

Please sign in to comment.