# Canvas: Designing Work-flows

## Signatures

You can create a signature for the add task using its name like this

In [3]:
from celery import signature
from tasks import add, hello, multi

In [4]:
signature('tasks.add', args=(2, 2), countdown=10)

tasks.add(2, 2)

In [5]:
add.signature((2, 2), countdown=10)

tasks.add(2, 2)

In [6]:
add.s(2, 2)

tasks.add(2, 2)

In [8]:
add.s(2, 2, debug=True)

tasks.add(2, 2, debug=True)

In [10]:
s = add.signature((2, 2), {'debug': True}, countdown=10)
s.args


(2, 2)

In [11]:
s.kwargs

{'debug': True}

In [12]:
s.options

{'countdown': 10}

### Calling API

In [14]:
add(2, 2)

4

In [16]:
add.s(2, 2)()

4

In [19]:
result = add.delay(2, 2)
result.get()

4

In [21]:
add.s(2, 2).set(countdown=1)

tasks.add(2, 2)

## Partials

In [23]:
add.s(2, 2).delay()
add.s(2, 2).apply_async(countdown=1)

<AsyncResult: 6d56e863-1e38-4c67-b211-99ab276b0dd5>

Signature을 사용하면 작업자에서 작업을 실행할 수 있습니다.

In [24]:
add.s(2, 2)()

4

또는 현재 프로세스에서 직접 호출 할 수 있습니다.

In [27]:
partial = add.s(2)          # incomplete signature
partial.delay(4)            # 4 + 2
partial.apply_async((4,))  # same

<AsyncResult: d4ca7411-69ab-4559-aa4d-8695383ba9f5>

추가 된 옵션은 서명의 옵션과 병합되며 새 옵션이 우선 적용됩니다.

In [29]:
s = add.signature((2, 2), countdown=10)
s.apply_async(countdown=1)  # countdown is now 1

<AsyncResult: 74b5a6db-2b85-463a-966d-b83ab1e3a2f9>

Clone Signature

In [32]:
s = add.s(2)
s.clone(args=(4,), kwargs={'debug': True})

tasks.add(4, 2, debug=True)

## Immutability

Partials 은 콜백, 연결된 모든 작업과 함께 사용하기 위한것이거나 코드 콜백이 부모 작업의 결과와 함께 적용됩니다.
때로는 추가 인수를 취하지 않는 콜백을 지정하기를 원합니다.이 경우 시그니처를 불변으로 설정할 수 있습니다.

In [36]:
add.apply_async((2, 2), link=hello.signature(immutable=True))

<AsyncResult: a9638092-4015-4182-9d0c-9fd584ec91ed>

**.si()**  shortcut 
immutable signatures:

In [38]:
add.apply_async((2, 2), link=hello.si())

<AsyncResult: 4a5c16f4-7997-412c-ac06-4e680e6ed8fb>

### Callbacks
apply_async에 link 인수를 사용하여 모든 작업에 콜백을 추가 할 수 있습니다.

In [39]:
add.apply_async((2, 2), link=hello.s())

<AsyncResult: 8d8772cc-f239-4102-a601-b4906e3f39ce>

콜백은 작업이 성공적으로 종료 된 경우에만 적용되며 부모 작업의 반환 값이 인수로 적용됩니다.
앞서 언급했듯이 Signature에 추가하는 인수는 Signature 자체에 지정된 인수 앞에 추가됩니다!

In [41]:
sig = add.s(10)

In [43]:
result = add.apply_async((2, 2), link=add.s(8))
result.get()

4


 first : 2 + 2

 second : 4 + 8

## The Primitives

* group
그룹 프리미티브는 병렬로 적용해야하는 작업 목록을 취하는 서명입니다.

* chain
체인 프리미티브는 서명을 서로 연결하여 하나가 다른 하나 뒤에 호출되도록하며 본질적으로 콜백 체인을 형성합니다.

* chord
chord는 그룹과 비슷하지만 콜백이 있습니다. chord는 헤더 그룹과 본문으로 구성되며, 본문은 헤더의 모든 작업이 완료된 후에 실행해야하는 작업입니다.

* map
맵 기본값은 내장 맵 함수와 같이 작동하지만 인수 목록이 작업에 적용되는 임시 작업을 만듭니다.
task.map ([1, 2]) - 단일 작업이 호출되어 결과가 다음과 같이 작업 함수에 인수를 적용합니다.
```
res = [task(1), task(2)]
```

* starmap
인수가 args로 적용되는 것을 제외하고는 맵과 정확히 비슷합니다. 예를 들어 add.starmap ([(2, 2)), (4, 4)])는 단일 작업 호출을 발생시킵니다.
```
res = [add(2, 2), add(4, 4)]
```

* chunks
Chunking은 긴 인수 목록을 부분으로 나눕니다. 
```
items = zip(xrange(1000), xrange(1000))  # 1000 items
add.chunks(items, 10)
```
항목 목록을 10 개의 청크로 분할하여 100 개의 작업 (각 항목을 순서대로 처리)을 수행합니다.

### Chain
Simple chain

In [46]:
from tasks import add, multi

res = add.apply_async((2, 2), link=multi.s(16))
res.get()
# result in mul(4, 16)

4

In [47]:
res.children
res.children[0].get()

64

결과는 원래 작업에서 호출된 하위 작업을 추적하며 결과 인스턴스에서 액세스할 수 있습니다.

In [48]:
list(res.collect())

[(<AsyncResult: eedd8a29-a7bd-4050-ac69-f7340f426499>, 4),
 (<AsyncResult: a520cae4-8646-41ca-ad6b-4e5f46953d48>, 64)]

결과 인스턴스에는 결과를 그래프로 처리하는 collect() 방법이 있으므로 결과를 반복할 수 있습니다.

In [49]:
s = add.s(2, 2)
s.link(multi.s(4))
s.link(multi.s())

tasks.multi()

In [50]:
# error callbacks 
from tasks import add, log_error
add.s(2, 2).on_error(log_error.s()).delay()
add.apply_async((2, 2), link_error=log_error.s())

<AsyncResult: 071ad126-5e7c-4735-9751-971a3c882fe6>

In [51]:
from celery import chain
from tasks import multi
res = chain(add.s(4, 4), multi.s(8), multi.s(10))()
print(res.get())

640


In [52]:
res.parent.get()

64

In [53]:
res.parent.parent.get()

8

In [54]:
# pipe operator
res = (add.s(2, 2) | multi.s(8) | multi.s(10)).apply_async()
print(res.get())

320


## Graphs

In [55]:
with open('graph.dot', 'w') as fh:
    res.parent.parent.graph.to_dot(fh)

```python
with open('graph.dot', 'w') as fh:
	res.parent.parent.graph.to_dot(fh)
```
generated graph.dot 

## Groups

In [56]:
from celery import group
group(add.s(2, 2), add.s(4, 4))

group((tasks.add(2, 2), add(4, 4)))

In [57]:
g = group(add.s(2, 2), add.s(4, 4))
res = g()
res.get()

[4, 8]

In [60]:
# iterators 
group(add.s(i, i) for i in range(100))()

<GroupResult: 692ead43-f148-4a0a-bda5-700e3b7a348c [1030a8d0-f107-4634-919c-5c57c6e7542c, 3ed2d630-234f-498d-98d0-b6381113aaec, 39678c4e-d0cd-4ee9-a5c7-d4d1dc2cbe1d, 6d0c6c12-b780-4ab9-bd80-c798b6e52e1a, 3a2ea61a-c10f-4739-92de-434c321bca97, 125059ff-cb16-410c-9e85-3e46e5dc5492, 876a23c2-c7d6-4fe2-a055-76946313be66, 0b9b4f17-5b59-461f-8e63-61fc5e1253b4, 24e5c58b-cdd1-489a-b904-6a0df640eef8, d3f2bc9e-7922-42d1-906d-e263dd99bc20, 787e625a-ed8d-4de5-ae59-4709464a1ed5, 1c75957a-b60e-4f63-9a16-637a5dbb798b, 7473197a-721e-4fe9-9d1f-7709edfb2504, 7b23dfde-8f49-48f2-af51-0d86a208e889, adeca925-854c-4876-b3d9-ff17ac085d3f, 019db95c-161c-4469-bb38-5cf7ce1e050c, 5f6902db-b126-43cd-af8e-f29f51393dd5, aa019bcc-821b-40c9-bcfc-67b41a7553f6, 16305904-c53f-4ceb-9c45-1d7ca513b8bc, b56fa7a5-29b7-437c-96d8-38acf44586d4, 071b10e3-96e3-4c5d-b38b-195081cf6c79, 276d9f47-bc45-4f7d-a23c-c6b7e0516a9c, 8662bbaf-6400-4a9f-94a8-9bce0c06399a, ad8c67e5-92b9-4059-aa88-84ee8d777e62, 1de36b33-a833-4b91-a543-a69105a82bd6

## Group Results

In [67]:

job = group([add.s(2, 2), add.s(4, 4), add.s(8, 8), add.s(16, 16), add.s(32, 32),]).apply_async()
job.ready()  # have all subtasks completed?

False

In [68]:
job.successful() # were all subtasks successful?

True

In [76]:
job.get()

[4, 8, 16, 32, 64]

successful()
	Return True if all of the subtasks finished successfully (e.g., didn’t raise an exception).

failed()
	Return True if any of the subtasks failed.

waiting()
	Return True if any of the subtasks isn’t ready yet.

ready()
	Return True if all of the subtasks are ready.

completed_count()
	Return the number of completed subtasks.

revoke()
	Revoke all of the subtasks.

join()
	Gather the results of all subtasks and return them in the same order as they were called (as a list).

## Chords

In [2]:
from celery import chord
from tasks import add,  tsum

In [5]:
chord(add.s(i, i) for i in range(100))(tsum.s()).get()

9900

코드 표현식을 분해 해 봅시다.

In [8]:
sum(i + i for i in range(100))
callback = tsum.s()
header = [add.s(i, i) for i in range(100)]
result = chord(header)(callback)
result.get()

9900

## Error handling

chord 콜백 결과는 실패 상태로 전환되고 오류는 ChordError 예외로 설정됩니다.

In [7]:

from celery import chord, group
from tasks import add, throw, log_result, on_chord_error
c = chord([add.s(4, 4), throw.s(), add.s(8, 8)])

In [8]:
c = (group(add.s(i, i) for i in range(10)) | log_result.s().on_error(on_chord_error.s())).delay()

## Important Notes
chord 내에서 사용되는 작업은 결과를 무시해서는 안됩니다. 실제로 이것은 코드를 사용하려면 **result_backend**를 사용하도록 설정해야 함을 의미합니다. 
또한 task_ignore_result가 구성에서 True로 설정되면 코드 내에서 사용할 개별 작업은 ignore_result=False 로 정의되어야 합니다. 이것은 작업 하위 클래스와 장식된 작업에 모두 적용됩니다.

## Map & Starmap

In [19]:
# using Map
# ~add.map([range(10), range(100)])

# using starmap:
~add.starmap(zip(range(10), range(10)))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

In [22]:
add.starmap(zip(range(10), range(10))).apply_async().get()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

## Chunks

In [24]:
res = add.chunks(zip(range(100), range(100)), 10)()
res.get()

[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]