# Celery
#### _Un sistema distribuito semplice, flessibile e affidabile per elaborare grandi quantità di operazioni_

## Introduzione
Le code di attività ([**task**](https://docs.celeryproject.org/en/stable/userguide/tasks.html) queue) vengono utilizzate come meccanismo per distribuire il lavoro tra [(**workers**)](https://docs.celeryproject.org/en/stable/userguide/workers.html#guide-workers) thread o macchine.

[Project Link](https://docs.celeryproject.org/en/stable/getting-started/index.html) - [Celery GitHub Link](https://github.com/celery/celery)

Celery comunica tramite messaggi, di solito utilizzando un broker per mediare tra client e workers. Per avviare un'attività, il client aggiunge un messaggio alla coda, il broker consegna quindi quel messaggio a un worker.

Un sistema Celery può essere costituito da più worker (CPU) e broker, lasciando il posto a un'elevata disponibilità e al ridimensionamento orizzontale.

Supporta:
* Brokers: RabbitMQ, Redis, Amazon SQS, ...
* Concurrency: prefork, thread, …
* Result Stores: Redis, AMQP, Memcached, SQLAlchemy, MongoDB, …
* Serialization: pickle, json, …

Ulteriori caratteristiche:
* Monitoring
* [Work-flows](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas): grouping, chaining, chunking 
* [Time & Rate Limits](https://docs.celeryproject.org/en/stable/userguide/workers.html#worker-time-limits): È possibile controllare quante attività possono essere eseguite al secondo / minuto / ora o per quanto tempo può essere consentita l'esecuzione di un'attività e questo può essere impostato come predefinito, per un lavoratore specifico o individualmente per ogni tipo di attività.
* [Scheduling](https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#guide-beat): È possibile specificare il tempo per eseguire un'attività in secondi o un datetime oppure è possibile utilizzare attività periodiche per eventi ricorrenti basati su un intervallo semplice o espressioni Crontab che supportano minuto, ora, giorno della settimana, giorno del mese e mese dell'anno.
* Resource Leak Protection (Protezione dalla perdita di risorse)
* User Components: Ogni componente worker può essere personalizzato

## Installazione di Celery e avvio del Broker

```pip install -U Celery```

oppure

```pip install flower``` (flower è uno strumento di monitoring)

```pip install "celery[redis]"```

In questo caso lo installiamo unitamente all'estensione con Redis.

Redis è il nostro broker di messaggi, per completezza di esposizione si prefersce usare la sua versione in Docker.

```sudo docker pull redis```

```docker run -d -p 6379:6379 redis```

```sudo ip addr show docker0```

*Quest'ultima istruzione serve per conoscere l'indirizzo verso cui puntare da usare nello script.*

In [1]:
from celery import Celery

##### Configurazione iniziale

In [2]:
app = Celery(main='tasks', backend='redis://172.17.0.1/', broker='redis://172.17.0.1/')

In [10]:
app.AsyncResult('b2caf158-1f6f-4be2-bf44-5787ae12e37e').status

'SUCCESS'

##### Il nostro task (o attività)

In [None]:
@app.task
def add(x, y):
    return x + y

# oppure

@app.task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)
    return x + y

##### La semplice chiamata asincrona

In [None]:
r = add.apply_async([1,1], countdown=10)
r.id

Il parametro "countdown" è opzionale. Indica quanti secondi devono trascorrere prima che il task venga eseguito.

##### Alcuni parametri del task

* **name**

In [None]:
@app.task(name='task.add')
def add(x, y):
    return x + y

* **time_limit** (in mancanza usa il valore definito del worker)

In [None]:
@app.task(time_limit=3600)
def add(x, y):
    return x + y

* **max_retries** è il numero di volte per le quali sarà ripetuto il task in caso di errore (default = 3). Con "None" ripeterà all'infinito

* Mentre **default_retry_delay** è il tempo (in secondi) tra ogni tentativo (default 3 minuti).

In [None]:
@app.task(max_retries=3)
def add(x, y):
    return x + y

### [Work-flows](https://docs.celeryproject.org/en/stable/userguide/canvas.html)

##### Task raggruppati

In [None]:
from celery import group

res = group(add.s(i, i) for i in range(10))()
res.get()

##### Task in sequenza (stile Map and Reduce)

In [None]:
from celery import chord

@app.task
def xsum(numbers):
    return sum(numbers)

res = chord((add.s(i, i) for i in range(10)), xsum.s()).apply_async()
res.get()

### Completamente compatibile con Flask

In [None]:
"""
nome file: myapp.py
"""

appFlask = Flask(__name__)

client = Celery(appFlask.name, ............)



@client.task(time_limit=3600)
def add(x, y):
    return x + y

@app.route('/somma', methods=['GET','POST'])
def somma():
    
    """
    codice
    """
    
    add.apply_async([args])
    
    return "Ok"
    



if __name__ == '__main__':
    app.run(debug=True)

### Lanciare lo script e il sistema di monitoring (flower)

```celery -A myapp worker -E --loglevel=INFO```

```celery flower --broker=redis://172.17.0.1:6379/0```