Celery
==

Celery is a system for processing large volumes of messages in a distributed and reliable way.

It focuses on real-time processing while also supporting task scheduling.

```python
from celery import Celery

# Redis is used both as the message broker and as the result backend
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.task
def add(x, y):
    return x + y

```
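The notebook cells in this document also import `mul_list` from `tasks`, but its definition is not shown. Judging only from the results printed later (the product of 1 through 9 is 362880), it appears to multiply the elements of a list. A minimal sketch under that assumption, written as a plain function so it runs without a broker; in `tasks.py` it would presumably carry the `@app.task` decorator like `add`:

```python
from math import prod


def mul_list(values):
    """Return the product of all numbers in `values` (assumed behaviour)."""
    # Hypothetical body: the original definition is not part of this document.
    return prod(values)


print(mul_list([1, 2, 3, 4]))  # 24
```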

In [2]:
from tasks import add, mul_list
from datetime import datetime, timedelta
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [2]:
result = add.delay(1, 2)

```
[2021-09-16 17:36:11,621: INFO/MainProcess] Task tasks.add[45e9947e-d907-473e-a8f9-aaa2dc03b5b6] received
```

In [3]:
result.get()

3

```
[2021-09-16 17:36:11,625: INFO/ForkPoolWorker-7] Task tasks.add[45e9947e-d907-473e-a8f9-aaa2dc03b5b6] succeeded in 0.0019244389841333032s: 3
```

## Calling a task

**apply_async**
```python
add.apply_async((1, 2))
```
**delay**
```python
add.delay(1, 2)
```

**Call**
```python
add(1, 2)
```

### Call

This is an ordinary Python function call: it runs in the current process and does not involve any worker.

In [4]:
add(1, 2)

3

### delay

`delay` is a shortcut for sending a task to the broker, but it accepts only the task's own arguments, not execution options.

In [5]:
result = add.delay(2, 2)

In [6]:
result.get()

4
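The relationship between a direct call, `delay`, and `apply_async` can be pictured with a toy model. `ToyTask` below is a hypothetical stand-in written for illustration, not Celery's implementation: it returns the message that would be sent instead of contacting a broker. It only shows that `delay(*args, **kwargs)` forwards to `apply_async(args, kwargs)`, leaving no room for execution options:

```python
class ToyTask:
    """Toy stand-in for a task object; nothing is sent to a broker."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # Direct call: run the function in the current process.
        return self.func(*args, **kwargs)

    def apply_async(self, args=(), kwargs=None, **options):
        # Describe the message: task arguments plus execution options.
        return {"args": tuple(args), "kwargs": dict(kwargs or {}), "options": options}

    def delay(self, *args, **kwargs):
        # Shortcut: every argument belongs to the task, none to the options.
        return self.apply_async(args, kwargs)


@ToyTask
def add(x, y):
    return x + y


print(add(2, 2))                               # 4
print(add.delay(2, 2))                         # same message as apply_async((2, 2))
print(add.apply_async((2, 3), countdown=10))   # options carry countdown=10
```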

### apply_async

`apply_async` is the method that sends a task to the broker.

In [7]:
result = add.apply_async((2,3))
result.get()

5

`apply_async` gives finer control over how a task is executed, for example through the `countdown` and `eta` options.

In [8]:
datetime.now()
result = add.apply_async((2,3), countdown=10)
result.get()
datetime.now()

datetime.datetime(2021, 9, 16, 18, 42, 46, 848799)

5

datetime.datetime(2021, 9, 16, 18, 42, 56, 912566)

In [9]:
datetime.now()
eta = datetime.utcnow() + timedelta(seconds=5)
print(eta)
result = add.apply_async((2,6), eta=eta)
result.get()
datetime.now()

datetime.datetime(2021, 9, 16, 18, 42, 56, 922407)

2021-09-16 23:43:01.925259


8

datetime.datetime(2021, 9, 16, 18, 43, 2, 48115)
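The two cells above schedule the task in equivalent ways: `countdown` is a number of seconds from now, while `eta` is an absolute point in time. The printed `eta` (23:43) looks hours ahead of the surrounding `datetime.now()` values (18:42) only because it is expressed in UTC while `datetime.now()` returns local time. A small sketch of the conversion using only the standard library; `eta_from_countdown` is a hypothetical helper, not part of Celery, and it builds a timezone-aware datetime rather than the naive value returned by `datetime.utcnow()`:

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(seconds, now=None):
    """Turn a relative delay in seconds into an absolute, timezone-aware UTC time."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


# Fixed starting point so the output is reproducible
start = datetime(2021, 9, 16, 23, 42, 56, tzinfo=timezone.utc)
print(eta_from_countdown(5, now=start))  # 2021-09-16 23:43:01+00:00
```

A value built this way could then be passed as `eta=` to `apply_async`.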

### Signature

A signature is an object that wraps the arguments and execution options of a task call.

In [10]:
signature = add.signature((2,3), countdown=4)
result = signature.delay()
result.get()

5

In [11]:
signature = add.s(2,3)
result = signature.delay()
result.get()

5

In [12]:
signature = add.s(2,3)
result = signature.apply_async(countdown=5)
result.get()

5

In [8]:
signature = add.s(2,3)
signature.link(add.s(10))
signature.link(add.s(20))
result = signature.delay()
result.get()
result.children[0].get()

tasks.add(10)

tasks.add(20)

5

[15, 25]

In [14]:
signature = add.signature((2, ))
result = signature.delay(4)
result.get()

6
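The last cell builds a partial signature: `add.signature((2, ))` stores one argument, and the `4` passed to `delay` fills the remaining one. According to the Celery documentation, arguments supplied later are prepended to the stored ones, which is also how a `link` callback such as `add.s(10)` receives its parent's result. The plain-Python sketch below only models that ordering rule; `call_partial` and `sub` are hypothetical helpers, and `sub` is included because subtraction makes the argument order visible:

```python
def add(x, y):
    return x + y


def sub(x, y):
    return x - y


def call_partial(func, stored_args, *new_args):
    """Call `func` with `new_args` placed before `stored_args`."""
    return func(*new_args, *stored_args)


print(call_partial(add, (2,), 4))   # add(4, 2) -> 6
print(call_partial(add, (10,), 5))  # add(5, 10) -> 15, like the linked callback
print(call_partial(sub, (2,), 4))   # sub(4, 2) -> 2, not sub(2, 4)
```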

### Chain

In [15]:
from celery import chain

In [16]:
res = chain(add.s(2, 2), add.s(4), add.signature((9, ), countdown=8))()
res.get()

17
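A chain runs its tasks one after another and passes each result as the first argument of the next task, so the cell above computes `add(add(add(2, 2), 4), 9)`. The same data flow in plain Python, without a broker or workers (`run_chain` is a hypothetical helper used only for illustration):

```python
def add(x, y):
    return x + y


def run_chain(first_call, later_calls):
    """Run calls in sequence, feeding each result into the next call."""
    func, args = first_call
    result = func(*args)
    for func, args in later_calls:
        # The previous result becomes the first positional argument
        result = func(result, *args)
    return result


print(run_chain((add, (2, 2)), [(add, (4,)), (add, (9,))]))  # 17
```

The `countdown=8` option on the last signature delays only that step; it does not change the result.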

### Group

In [17]:
from celery import group


In [18]:
res = group(add.s(i, i) for i in range(10))()
res.get()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
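A group submits independent tasks that may run in parallel on the workers, and `get()` returns their results in the order the tasks were listed. A rough local analogy with a thread pool from the standard library (an analogy only: Celery distributes the work through the broker, not through threads in the calling process):

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map returns results in submission order,
    # regardless of which call finishes first
    results = list(pool.map(add, range(10), range(10)))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```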

### Chords

In [19]:
from celery import chord

In [26]:
chord(group([add.s(0, i) for i in range(1, 10)]))(mul_list.s()).get()

362880
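A chord is a group (the header) plus a callback that receives the list of all header results once every header task has finished. Here the header yields the numbers 1 through 9 and the callback multiplies them. The plain-Python sketch below mirrors that data flow; `mul_list` is assumed to return the product of a list, since its definition is not shown in this document:

```python
from math import prod


def add(x, y):
    return x + y


def mul_list(values):
    # Assumed behaviour of the task imported from tasks.py
    return prod(values)


# Header: the independent calls whose results are collected into a list
header_results = [add(0, i) for i in range(1, 10)]

# Callback: receives the whole list of header results
print(mul_list(header_results))  # 362880
```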

### Map

In [40]:
result = mul_list.map([list(range(1, 10)), list(range(1, 20))]).delay()
result.get()

[362880, 121645100408832000]
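`mul_list.map(...)` applies the task to each item of the iterable inside a single task call and returns the list of results. The plain-Python equivalent, again assuming that `mul_list` returns the product of a list:

```python
from math import prod


def mul_list(values):
    # Assumed behaviour of the task imported from tasks.py
    return prod(values)


inputs = [list(range(1, 10)), list(range(1, 20))]

# One result per input item, in the same order
print([mul_list(item) for item in inputs])  # [362880, 121645100408832000]
```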

In [16]:
from tasks import create_task, get_task_info

In [15]:
result = create_task.delay()
result.get()

'http://localhost:5000/status/9857b0ea-7256-4c49-890c-4f7e635315d0/'

In [20]:
func = create_task.s()
func.link(get_task_info.s())
result = func.delay()
result.get()
result.children[0].get()

tasks.get_task_info()

'http://localhost:5000/status/2f103101-59f7-4545-bd96-68a30236e6f6/'

{'current': 100,
 'result': 42,
 'state': 'SUCCESS',
 'status': 'Task completed!',
 'total': 100}