-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
tasks.rst
378 lines (274 loc) · 12.6 KB
/
tasks.rst
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
Tasks
-----
Tasks are where the execution takes place.
Tasks depend on each other and output targets.
An outline of how a task can look like:
.. figure:: task_breakdown.png
:alt: Task breakdown
.. _Task.requires:
Task.requires
~~~~~~~~~~~~~
The :func:`~luigi.task.Task.requires` method is used to specify dependencies on other Task object,
which might even be of the same class.
For instance, an example implementation could be
.. code:: python
def requires(self):
return OtherTask(self.date), DailyReport(self.date - datetime.timedelta(1))
In this case, the DailyReport task depends on two inputs created earlier,
one of which is the same class.
requires can return other Tasks in any way wrapped up within dicts/lists/tuples/etc.
Requiring another Task
~~~~~~~~~~~~~~~~~~~~~~
Note that :func:`~luigi.task.Task.requires` can *not* return a :class:`~luigi.target.Target` object.
If you have a simple Target object that is created externally
you can wrap it in a Task class like this:
.. code:: python
class LogFiles(luigi.ExternalTask):
def output(self):
return luigi.contrib.hdfs.HdfsTarget('/log')
This also makes it easier to add parameters:
.. code:: python
class LogFiles(luigi.ExternalTask):
date = luigi.DateParameter()
def output(self):
return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/log/%Y-%m-%d'))
.. _Task.output:
Task.output
~~~~~~~~~~~
The :func:`~luigi.task.Task.output` method returns one or more :class:`~luigi.target.Target` objects.
Similarly to requires, you can return them wrapped up in any way that's convenient for you.
However we recommend that any :class:`~luigi.task.Task` only return one single :class:`~luigi.target.Target` in output.
If multiple outputs are returned,
atomicity will be lost unless the :class:`~luigi.task.Task` itself can ensure that each :class:`~luigi.target.Target` is atomically created.
(If atomicity is not of concern, then it is safe to return multiple :class:`~luigi.target.Target` objects.)
.. code:: python
class DailyReport(luigi.Task):
date = luigi.DateParameter()
def output(self):
return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/reports/%Y-%m-%d'))
# ...
.. _Task.run:
Task.run
~~~~~~~~
The :func:`~luigi.task.Task.run` method now contains the actual code that is run.
When you are using Task.requires_ and Task.run_ Luigi breaks down everything into two stages.
First it figures out all dependencies between tasks,
then it runs everything.
The :func:`~luigi.task.Task.input` method is an internal helper method that just replaces all Task objects in requires
with their corresponding output.
An example:
.. code:: python
class GenerateWords(luigi.Task):
def output(self):
return luigi.LocalTarget('words.txt')
def run(self):
# write a dummy list of words to output file
words = [
'apple',
'banana',
'grapefruit'
]
with self.output().open('w') as f:
for word in words:
f.write('{word}\n'.format(word=word))
class CountLetters(luigi.Task):
def requires(self):
return GenerateWords()
def output(self):
return luigi.LocalTarget('letter_counts.txt')
def run(self):
# read in file as list
with self.input().open('r') as infile:
words = infile.read().splitlines()
# write each word to output file with its corresponding letter count
with self.output().open('w') as outfile:
for word in words:
outfile.write(
'{word} | {letter_count}\n'.format(
word=word,
letter_count=len(word)
)
)
It's useful to note that if you're writing to a binary file, Luigi automatically
strips the ``'b'`` flag due to how atomic writes/reads work. In order to write a binary
file, such as a pickle file, you should instead use ``format=Nop`` when calling
LocalTarget. Following the above example:
.. code:: python
from luigi.format import Nop
class GenerateWords(luigi.Task):
def output(self):
return luigi.LocalTarget('words.pckl', format=Nop)
def run(self):
import pickle
# write a dummy list of words to output file
words = [
'apple',
'banana',
'grapefruit'
]
with self.output().open('w') as f:
pickle.dump(words, f)
It is your responsibility to ensure that after running :func:`~luigi.task.Task.run`, the task is
complete, i.e. :func:`~luigi.task.Task.complete` returns ``True``. Unless you have overridden
:func:`~luigi.task.Task.complete`, :func:`~luigi.task.Task.run` should generate all the targets
defined as outputs. Luigi verifies that you adhere to the contract before running downstream
dependencies, and reports ``Unfulfilled dependencies at run time`` if a violation is detected.
.. _Task.input:
Task.input
~~~~~~~~~~
As seen in the example above, :func:`~luigi.task.Task.input` is a wrapper around Task.requires_ that
returns the corresponding Target objects instead of Task objects.
Anything returned by Task.requires_ will be transformed, including lists,
nested dicts, etc.
This can be useful if you have many dependencies:
.. code:: python
class TaskWithManyInputs(luigi.Task):
def requires(self):
return {'a': TaskA(), 'b': [TaskB(i) for i in xrange(100)]}
def run(self):
f = self.input()['a'].open('r')
g = [y.open('r') for y in self.input()['b']]
Dynamic dependencies
~~~~~~~~~~~~~~~~~~~~
Sometimes you might not know exactly what other tasks to depend on until runtime.
In that case, Luigi provides a mechanism to specify dynamic dependencies.
If you yield another :class:`~luigi.task.Task` in the Task.run_ method,
the current task will be suspended and the other task will be run.
You can also yield a list of tasks.
.. code:: python
class MyTask(luigi.Task):
def run(self):
other_target = yield OtherTask()
# dynamic dependencies resolve into targets
f = other_target.open('r')
This mechanism is an alternative to Task.requires_ in case
you are not able to build up the full dependency graph before running the task.
It does come with some constraints:
the Task.run_ method will resume from scratch each time a new task is yielded.
In other words, you should make sure your Task.run_ method is idempotent.
(This is good practice for all Tasks in Luigi, but especially so for tasks with dynamic dependencies).
As this might entail redundant calls to tasks' :func:`~luigi.task.Task.complete` methods,
you should consider setting the "cache_task_completion" option in the :ref:`worker-config`.
To further control how dynamic task requirements are handled internally by worker nodes,
there is also the option to wrap dependent tasks by :class:`~luigi.task.DynamicRequirements`.
For an example of a workflow using dynamic dependencies, see
`examples/dynamic_requirements.py <https://github.com/spotify/luigi/blob/master/examples/dynamic_requirements.py>`_.
Task status tracking
~~~~~~~~~~~~~~~~~~~~
For long-running or remote tasks it is convenient to see extended status information not only on
the command line or in your logs but also in the GUI of the central scheduler. Luigi implements
dynamic status messages, progress bar and tracking urls which may point to an external monitoring system.
You can set this information using callbacks within Task.run_:
.. code:: python
class MyTask(luigi.Task):
def run(self):
# set a tracking url
self.set_tracking_url("http://...")
# set status messages during the workload
for i in range(100):
# do some hard work here
if i % 10 == 0:
self.set_status_message("Progress: %d / 100" % i)
# displays a progress bar in the scheduler UI
self.set_progress_percentage(i)
.. _Events:
Events and callbacks
~~~~~~~~~~~~~~~~~~~~
Luigi has a built-in event system that
allows you to register callbacks to events and trigger them from your own tasks.
You can both hook into some pre-defined events and create your own.
Each event handle is tied to a Task class and
will be triggered only from that class or
a subclass of it.
This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).
.. code:: python
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def celebrate_success(task):
"""Will be called directly after a successful execution
of `run` on any Task subclass (i.e. all luigi Tasks)
"""
...
@luigi.contrib.hadoop.JobTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
"""Will be called directly after a failed execution
of `run` on any JobTask subclass
"""
...
luigi.run()
But I just want to run a Hadoop job?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The Hadoop code is integrated in the rest of the Luigi code because
we really believe almost all Hadoop jobs benefit from being part of some sort of workflow.
However, in theory, nothing stops you from using the :class:`~luigi.contrib.hadoop.JobTask` class (and also :class:`~luigi.contrib.hdfs.target.HdfsTarget`)
without using the rest of Luigi.
You can simply run it manually using
.. code:: python
MyJobTask('abc', 123).run()
You can use the hdfs.target.HdfsTarget class anywhere by just instantiating it:
.. code:: python
t = luigi.contrib.hdfs.target.HdfsTarget('/tmp/test.gz', format=format.Gzip)
f = t.open('w')
# ...
f.close() # needed
.. _Task.priority:
Task priority
~~~~~~~~~~~~~
The scheduler decides which task to run next from
the set of all tasks that have all their dependencies met.
By default, this choice is pretty arbitrary,
which is fine for most workflows and situations.
If you want to have some control on the order of execution of available tasks,
you can set the ``priority`` property of a task,
for example as follows:
.. code:: python
# A static priority value as a class constant:
class MyTask(luigi.Task):
priority = 100
# ...
# A dynamic priority value with a "@property" decorated method:
class OtherTask(luigi.Task):
@property
def priority(self):
if self.date > some_threshold:
return 80
else:
return 40
# ...
Tasks with a higher priority value will be picked before tasks with a lower priority value.
There is no predefined range of priorities,
you can choose whatever (int or float) values you want to use.
The default value is 0.
Warning: task execution order in Luigi is influenced by both dependencies and priorities, but
in Luigi dependencies come first.
For example:
if there is a task A with priority 1000 but still with unmet dependencies and
a task B with priority 1 without any pending dependencies,
task B will be picked first.
.. _Task.namespaces_famlies_and_ids:
Namespaces, families and ids
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In order to avoid name clashes and to be able to have an identifier for tasks,
Luigi introduces the concepts *task_namespace*, *task_family* and
*task_id*. The namespace and family operate on class level meanwhile the task
id only exists on instance level. The concepts are best illustrated using code.
.. code:: python
import luigi
class MyTask(luigi.Task):
my_param = luigi.Parameter()
task_namespace = 'my_namespace'
my_task = MyTask(my_param='hello')
print(my_task) # --> my_namespace.MyTask(my_param=hello)
print(my_task.get_task_namespace()) # --> my_namespace
print(my_task.get_task_family()) # --> my_namespace.MyTask
print(my_task.task_id) # --> my_namespace.MyTask_hello_890907e7ce
print(MyTask.get_task_namespace()) # --> my_namespace
print(MyTask.get_task_family()) # --> my_namespace.MyTask
print(MyTask.task_id) # --> Error!
The full documentation for this machinery exists in the :py:mod:`~luigi.task` module.
Instance caching
~~~~~~~~~~~~~~~~
In addition to the stuff mentioned above,
Luigi also does some metaclass logic so that
if e.g. ``DailyReport(datetime.date(2012, 5, 10))`` is instantiated twice in the code,
it will in fact result in the same object.
See :ref:`Parameter-instance-caching` for more info