Celery  
- https://tests4geeks.com/blog/python-celery-rabbitmq-tutorial/#:~:text=Python%20Celery%20%26%20RabbitMQ%20Tutorial%201%20Basic%20Concepts,7%20Conclusion%20...%208%20Coding%20skills%20assessment%20
- https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html

# Introduction

Dans ce chapitre nous allons explorer un modèle d'implementation du modèle producteur - consommateur avec utilisation d'une file d'attentes (queues) pour l'échange de messages.

Pour ce faire nous utiliserons les librairie pika et celery sur python.


Celery est une librairie de gestion des files d'attente de tâches asynchrone. Celery peut être utilisé pour tout ce qui doit être exécuté de manière asynchrone. Par exemple, le calcul en arrière-plan de requêtes coûteuses. 

Mais Celery ne construit pas elle-même les file d'attente de messages, pour ce elle utilise une application tierce appelé 'courtier' ou 'broker' en anglais.  
Le role du courtier est de gérer les fils d'attente et de s'assurer que toutes les informations placées dans la file d'attente par les applications 'producteurs' sont bien enregistrées et ne sont pas perdues en cas de perte de courant, et que toutes ces informations sont bien traitées par les applications 'consommatrices'.

Il existe plusieurs courtiers différents,comme RabbitMQ, Redis ou une base de données. Dans notre cas nous utiliserons RabbitMQ qui est un courtier de messages largement utilisé avec Celery car il est complet, stable et recommandé par Celery.

Pika est une librairie de gestion des files d'attentes. Elle est spécialement conçue pour fonctionner avec RabbitMq.




# Éléments d'une application utilisant Celery  

![image.png](attachment:image.png)

## Courtier
Le courtier (RabbitMQ) est responsable de la création des files d'attente de tâches, de la réception des messages et des taches des processus producteurs, de la répartition des tâches dans les files d'attente de tâches selon certaines règles de routage, puis de la livraison des tâches des files d'attente de tâches aux processus consommateur (travailleurs).

## Consommateur (travailleurs du céleri)
Le consommateur est le ou les processus Celery qui exécutent les tâches et consomment les informations provenant du courtier. L'on peut démarrer de nombreux nœuds de calcul consommateur en fonction des cas d'utilisation.

## Backend (base de données de résultats)
Le Backend est utilisé pour stocker les résultats des tâches. Cependant, ce n'est pas un élément obligatoire. Lorsqu'il est omis, Il devient impossible d'accéder aux résultats de vos tâches.

# Installation de l'environement de travail

### Installer rabitmq
1. installer erlang le langage de programation de rabit mq
https://github.com/erlang/otp/releases/download/OTP-25.3/otp_win64_25.3.exe

2. installer rabitmq
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.13/rabbitmq-server-3.11.13.exe

Start rabitmq service

Les fichiers de rabimq sont instalés dans :
- C:\Users\tm\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\RabbitMQ Server
lancer le fichier
- RabbitMQ Command Prompt (sbin dir)
Ca ouvre une console 
- rabbitmqctl.bat status


### DIAGNOSTICS

attempted to contact: [rabbit@Throne]

rabbit@Throne:
  * connected to epmd (port 4369) on Throne
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed
  * suggestion: check if the Erlang cookie is identical for all server nodes and CLI tools
  * suggestion: check if all server nodes and CLI tools use consistent hostnames when addressing each other
  * suggestion: check if inter-node connections may be configured to use TLS. If so, all nodes and CLI tools must do that
   * suggestion: see the CLI, clustering and networking guides on https://rabbitmq.com/documentation.html to learn more


Current node details:
 * node name: 'rabbitmqcli-419-rabbit@Throne'
 * effective user's home directory: c:/Users/tm
 * Erlang cookie hash: XVdr9niUdtzAPiZh4r7mwQ==


Si le service n'est pas lancé. Executer la commande 
qui est dans le fichier windows RabbitMQ Service - start 

## Configurer RabbitMQ pour le céleri
Avant de pouvoir utiliser RabbitMQ pour Celery, nous devons effectuer certaines configurations pour RabbitMQ. En bref, nous devons créer un hôte virtuel et un utilisateur, puis définir les autorisations de l'utilisateur afin qu'il puisse accéder à l'hôte virtuel.

### add user 'tm' with password 'tm123'
$ rabbitmqctl add_user tm tm123
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.13\sbin>rabbitmqctl add_user tm tm123
Adding user "tm" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.

### add virtual host 'tm_vhost'
$ rabbitmqctl add_vhost tm_vhost
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.13\sbin>rabbitmqctl add virtual host tm_vhost
Adding vhost "tm_vhost" ...

### add user tag 'tm_tag' for user 'tm'
$ rabbitmqctl set_user_tags tm tm_tag
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.13\sbin>rabbitmqctl set_user_tags tm tm_tag
Setting tags for user "tm" to [tm_tag] ...

### set permission for user 'tm' on virtual host 'tm_vhost'
$ rabbitmqctl set_permissions -p tm_vhost tm ".*" ".*" ".*"
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.13\sbin>rabbitmqctl set_permissions -p tm_vhost tm ".*" ".*" ".*"
Setting permissions for user "tm" in vhost "tm_vhost" ...

Il existe trois types d'opérations dans RabbitMQ : configure (configurer), write (écrire) et read (lire).

La chaîne de caractères ".*" ".*" ".*" à la fin de la commande ci-dessus signifie que l'utilisateur "tm" aura toutes les autorisations de configuration, d'écriture et de lecture. Pour trouver plus d'informations sur le contrôle des autorisations dans RabbitMQ, vous pouvez vous référer à http://www.rabbitmq.com/access-control.html

Ensuite vous devez copier le erlang.cookie du repertoire windows vers le fichier .erlang.cookie de votre fichier home.

# Installation de Pika

In [None]:
pip install pika

## Modèle producteur - consommateur

Nous allons montrer un exemple en utilisant la librairie pika qui nous permet de nous connecter a un courtier RabittMq et nous allons simuler le modèle suivant:

- 1 producteur produit plusieurs articles sur 3 sujets différents
- chaque article est placé dans une queue
- toutes ces queues sont placés dans un journal (exchange)
- 1 consommateur se connecte au journal, choisit son sujet et peut lire les articles au fur et a mesures qu'ils sont publiés

![image.png](attachment:image.png)

https://blog.eleven-labs.com/fr/rabbitmq-partie-1-les-bases/

![image.png](attachment:image.png)

https://medium.com/analytics-vidhya/how-to-use-rabbitmq-with-python-e0ccfe7fa959

1. Installer newspaper3k
    ```
    pip install newspaper3k
    ```
2. Installer html clean
    ```
    pip install --upgrade lxml_html_clean
    ```

In [None]:
import nltk
nltk.download('punkt')

In [6]:
import json
from newspaper import Article

url = 'https://edition.cnn.com/2023/04/27/asia/us-south-korea-nuclear-armed-submarine-explainer-intl-hnk-ml/index.html'
#url = 'http://rfi.fr'
article = Article(url, language='en')
article.download()
article.parse()
#print(article.title)

#print(article.text)

article.nlp()
article.summary

'Seoul, South Korea CNN —When the presidents of the United States and South Korea this week announced a landmark deal to deter North Korean aggression, one element of the pact stood out.\nHere’s what you need to know about the submarine and why it’s heading to South Korea.\nAn unarmed Trident II D5 missile is test-launched from the Ohio-class ballistic missile submarine USS Nebraska off the coast of California in 2018.\nOne arriving in South Korea on a port visit – which must be arranged 24 to 48 hours in advance – would be far more visible, giving North Korea an advantage, Schuster said.\nKim’s threats have prompted some in South Korea to call for Seoul to become a nuclear-armed power itself.'

In [7]:
#producteur 
import pika
from time import sleep
import newspaper
#Paramètre de connection host, port, username, password, exchange etc
credentials = pika.PlainCredentials('guest','guest')
connection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost',virtual_host='/',credentials= credentials))

# Établissement d'un canal de connection basé sur la connection 
channel= connection.channel()

# Connection a l'exchange appelé test, qui est durable et qui a le type topic
channel.exchange_declare('test', durable=True, exchange_type='topic')

# Création d'une queue appelée A
channel.queue_declare(queue= 'POLITICO')
# Attache de la queue A à l'échange test, avec l'utlisation de la clef de routage A
channel.queue_bind(exchange='test', queue='POLITICO', routing_key='A')

# Création d'une queue appelée B
channel.queue_declare(queue= 'CNN')
# Attache de la queue B à l'échange test, avec l'utlisation de la clef de routage B
channel.queue_bind(exchange='test', queue='CNN', routing_key='B')

# Création d'une queue appelée RFI
channel.queue_declare(queue= 'RFI')
# Attache de la queue C à l'échange test, avec l'utlisation de la clef de routage C
channel.queue_bind(exchange='test', queue='RFI', routing_key='C')

#-------------------
while True:
    try:
        cnn_paper = newspaper.build('http://cnn.com')
    except Exception:
                print(Exception)
    else:  #s'il n y a pas d'erreur
        i= 0
        for article in cnn_paper.articles:
            if i < 20 : # les 20 premiers articles
                print(article.url)
                channel.basic_publish(exchange='test', routing_key='B', body= article.url)
                #sleep(4)
                i = i+1
    finally:
        pass
    #--------------------
    try:
        rfi_paper = newspaper.build('http://rfi.fr')
    except Exception:
                print(Exception)
    else:  #s'il n y a pas d'erreur
        i= 0
        for article in rfi_paper.articles:
            if i < 20 :
                print(article.url)
                # Envoyer un message a l'échange test, le router sur la clef C (queue rfi)
                channel.basic_publish(exchange='test', routing_key='C', body= article.url)
                #sleep(4)
                i = i+1
    finally:
        pass
    #-----------
    try:        
        politico_paper = newspaper.build('http://politico.cd')
    except Exception:
                print(Exception)
    else:  #s'il n y a pas d'erreur
        i= 0
        for article in politico_paper.articles:
            if i < 20 :
                print(article.url)
                # Envoyer un message a l'échange test, le router sur la clef A (queue politico)
                channel.basic_publish(exchange='test', routing_key='A', body= article.url)
                #sleep(2)
                i = i+1
    finally:
        pass

# Fermer le canal
channel.close()


https://edition.cnn.com/business/media
https://edition.cnn.com/travel/news
http://cnn.com/2024/04/16/economy/india-election-modi-economic-performance-intl-hnk-dg/index.html
http://cnn.com/2024/04/16/politics/takeaways-trump-hush-money-trial-day-2/index.html
http://cnn.com/2024/04/16/weather/dubai-rain-flooding-climate/index.html
http://cnn.com/2024/04/16/travel/halo-space-near-space-balloon-trip/index.html
http://cnn.com/2024/04/17/sport/paris-olympics-virtual-medal-table-spt-intl/index.html
http://cnn.com/2024/04/17/entertainment/mccartney-lennon-sons-single-gbr-scli-intl/index.html
http://cnn.com/2024/04/16/sport/surfing-kelly-slater-hints-retirement-intl-hnk/index.html
http://cnn.com/2024/04/16/politics/donald-trump-trial-narrative-analysis/index.html
http://cnn.com/2024/04/17/opinions/trump-trial-problem-brown/index.html
http://cnn.com/2024/04/16/politics/trump-bodega-alvin-bragg/index.html
http://cnn.com/middleeast/live-news/israel-hamas-war-gaza-news-04-17-24/index.html
http://cn

KeyboardInterrupt: 

In [10]:
#consommateur
import pika
#Paramètre de connection host, port, username, password, exchange etc
credentials = pika.PlainCredentials('guest','guest')
connection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost',virtual_host='/',credentials= credentials))
channel = connection.channel()
# Connection a l'exchange appelé test, qui est durable et qui a le type topic
channel.exchange_declare('test', durable=True, exchange_type='topic')

#fonction de callback qui vas répondre chaque fois qu'il ya un message provenant de la queur
def callbackFunctionForQueueA(ch,method,properties,body):
 print('Message reçu de la Queue RFI: ', body)

def callbackFunctionForQueueB(ch,method,properties,body):
 print('Message reçu de la Queue CNN: ', body)
def callbackFunctionForQueueC(ch,method,properties,body):
 print('Message reçu de la Queue Queue POLITICO: ', body)

#Attacher la fonction de callback consomateur a sa queue respective 
channel.basic_consume(queue='POLITICO', on_message_callback=callbackFunctionForQueueA, auto_ack=True)
channel.basic_consume(queue='CNN', on_message_callback=callbackFunctionForQueueB, auto_ack=True)
channel.basic_consume(queue='RFI', on_message_callback=callbackFunctionForQueueC, auto_ack=True)

#Lancer le processus de consomation
channel.start_consuming()

Message reçu de la Queue RFI:  b'https://www.politico.cd/grand-angle/2017/03/21/manifeste-de-politico-cd.html/12895/'
Message reçu de la Queue RFI:  b'https://www.politico.cd/actualite/encontinu/2024/04/17/beni-le-parlement-denfants-deplore-lutilisation-des-ecoliers-dans-des-manifestations.html/161316/'
Message reçu de la Queue RFI:  b'https://www.politico.cd/actualite/2024/04/17/rdc-remise-des-vehicules-refrigeres-pour-faciliter-le-transport-des-vaccins-dans-8-provinces.html/161312/'
Message reçu de la Queue RFI:  b'https://www.politico.cd/actualite/2024/04/17/rdc-remise-des-vehicules-refrigeres-pour-faciliter-le-transport-des-vaccins-dans-8-provinces.html/161312/#respond'
Message reçu de la Queue RFI:  b'https://www.politico.cd/actualite/2024/04/17/le-slovene-mitja-leskovar-nomme-nonce-apostolique-en-rdc.html/161283/'
Message reçu de la Queue RFI:  b'https://www.politico.cd/actualite/2024/04/17/le-slovene-mitja-leskovar-nomme-nonce-apostolique-en-rdc.html/161283/#respond'
Message reç

KeyboardInterrupt: 

# Installer celery

- Ouvrir un terminal
- activer l'environement
- conda install celery

In [None]:
!pip install celery

# File d'attente de tâches 
## Utilisation de Celery
https://docs.celeryq.dev/en/stable/getting-started/introduction.html#id2


Les files d'attente de tâches sont utilisées comme mécanisme pour répartir le travail entre les threads ou les machines.

L'entrée d'une file d'attente de tâches est une unité de travail appelée tâche. Des processus de travail dédiés surveillent en permanence les files d'attente de tâches à la recherche de nouveaux travaux à effectuer.

Le céleri communique via des messages, utilisant généralement un courtier pour assurer la médiation entre les clients et les travailleurs. Pour lancer une tâche, le client ajoute un message à la file d'attente, le courtier transmet ensuite ce message à un agent.

Un système Celery peut être composé de plusieurs travailleurs et courtiers (broker), laissant place à une haute disponibilité et à une mise à l'échelle horizontale.

Celery est écrit en Python, mais le protocole peut être implémenté dans n'importe quel langage. En plus de Python, il y a node-celery et node-celery-ts pour Node.js, et un client PHP .

La première chose dont vous avez besoin est une instance Celery. Nous appelons cela l' application Celery ou simplement l'application en abrégé. Comme cette instance est utilisée comme point d'entrée pour tout ce que vous voulez faire dans Celery, comme la création de tâches et la gestion des travailleurs, il doit être possible pour d'autres modules de l'importer.



Project structure

![image.png](attachment:image.png)

### celery.py

In [None]:
from __future__ import absolute_import
from celery import Celery

app = Celery('test_celery',
             broker='amqp://tm:tm123@192.168.241.74/tm_vhost',
             backend='rpc://',
             include=['test_celery.tasks'])


Ici, nous initialisons une instance de Celery appelée app, qui est utilisée plus tard pour créer une tâche.

Le premier argument de Celery est simplement le nom du package de projet, qui est "test_celery".

L'argument broker spécifie l'URL du courtier, qui devrait être le RabbitMQ que nous avons démarré plus tôt. Bien verifier l'adresse IP de rabiitMQ.
Notez que le format de l'URL du courtier doit être : transport://userid:password@hostname:port/virtual_host
Pour RabbitMQ, le transportestamqp.

L'argument backend spécifie une URL principale. Un backend dans Celery est utilisé pour stocker les résultats de la tâche. Donc, si vous avez besoin d'accéder aux résultats de votre tâche lorsqu'elle est terminée, vous devez définir un backend pour Celery.
rpc signifie renvoyer les résultats sous forme de messages AMQP, ce qui est un format acceptable pour notre démo. Plus de choix pour les formats de message peuvent être trouvés dans la documentation de RabbitMQ.

L'argument include spécifie une liste de modules que vous souhaitez importer lorsque le travailleur Celery démarre. Nous ajoutons ici le module tâches afin que le travailleur puisse trouver notre tâche.

### tasks.py


In [None]:
from __future__ import absolute_import
from test_celery.celery import app
import time


@app.task
def longtime_add(x, y):
    print('long time task begins')
    # sleep 5 seconds
    time.sleep(5)
    print('long time task finished')
    return x + y

Vous pouvez voir que nous importons le app défini dans le module précédent celery et l'utilisons comme décorateur pour notre méthode de tâche. Notez que ce app.taskn'est qu'un décorateur. De plus, nous dormons 5 secondes dans notre longtime_addtâche pour simuler une tâche chronophage :)

### run_tasks.py



In [None]:
from .tasks import longtime_add
import time

if __name__ == '__main__':
    result = longtime_add.delay(1,2)
    # at this time, our task is not finished, so it will return False
    print ('Task finished? ', result.ready())
    print ('Task result: ', result.result)
    # sleep 10 seconds to ensure the task has been finished
    time.sleep(10)
    # now the task should be finished and ready method will return True
    print ('Task finished? ', result.ready())
    print ('Task result: ', result.result)

Ici, nous appelons la tâche longtime_add à l'aide de la méthode delay , qui est nécessaire si nous voulons traiter la tâche de manière asynchrone. De plus, nous conservons les résultats de la tâche et imprimons certaines informations. La méthode ready renverra True si la tâche est terminée, sinon False. L'attribut result est le résultat de la tâche (« 3 » dans notre cas). Si la tâche n'est pas terminée, elle renvoie None .

### Démarrer le travailleur du céleri

Maintenant, nous pouvons démarrer Celery worker en utilisant la commande ci-dessous (exécutée dans le dossier parent de notre dossier de projet test_celery ):


$ celery -A test_celery worker --loglevel=info

Vous verrez quelque chose comme ceci si Celery se connecte avec succès à RabbitMQ :

(pp) tm@Throne:/mnt/c/upc_pp/Celery$ celery -A test_celery worker --loglevel=info
 
 -------------- celery@Throne v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.15.90.1-microsoft-standard-WSL2-x86_64-with-glibc2.35 2023-04-20 07:02:46
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         test_celery:0x7f74bd6c04d0
- ** ---------- .> transport:   amqp://tm:**@192.168.135.74:5672/tm_vhost
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . test_celery.tasks.longtime_add

[2023-04-20 07:02:47,109: INFO/MainProcess] Connected to amqp://tm:**@192.168.135.74:5672/tm_vhost
[2023-04-20 07:02:47,115: INFO/MainProcess] mingle: searching for neighbors
[2023-04-20 07:02:48,149: INFO/MainProcess] mingle: all alone
[2023-04-20 07:02:48,182: INFO/MainProcess] celery@Throne ready.
^X^C
worker: Hitting Ctrl+C again will terminate all running tasks!

### Exécuter des tâches

Dans une autre console, saisissez ce qui suit (exécuté dans le dossier parent de notre dossier de projet test_celery ):


$ python -m test_celery.run_tasks

Maintenant, si vous regardez la console Celery, vous verrez que notre travailleur a reçu la tâche :

(pp) tm@Throne:/mnt/c/upc_pp/Celery$ python -m test_celery.run_tasks
Task finished?  False
Task result:  None
Task finished?  False
Task result:  None

Comme vous pouvez le voir, lorsque notre travailleur Celery a reçu une tâche, il a imprimé le nom de la tâche avec un identifiant de tâche (entre parenthèses) :
Tâche reçue : test_celery.tasks.longtime_add[7d942984-8ea6-4e4d-8097-225616f797d5]

En dessous de cette ligne se trouvent deux lignes qui ont été imprimées par notre tâche longtime_add, avec un délai de 5 secondes :
tâche longue durée commence
tâche longue durée terminée

La dernière ligne montre que notre tâche s'est terminée en 5 secondes environ et le résultat de la tâche est 3 : La
tâche test_celery.tasks.longtime_add[7d942984-8ea6-4e4d-8097-225616f797d5] a réussi en 5.025242167s : 3

Dans la console actuelle, vous verrez le résultat suivant :

$ python -m test_celery.run_tasks
Task finished? False
Task result: None
Task finished? False
Task result: None

C'est le comportement attendu. Au début, notre tâche n'était pas prête et le résultat était Aucun. Après 10 secondes, notre tâche est terminée et le résultat est 3.

### Surveillez le céleri en temps réel
Flower est un moniteur Web en temps réel pour Celery. En utilisant Flower, vous pouvez facilement surveiller la progression et l'historique de vos tâches.

Nous pouvons utiliser pip pour installer Flower :

In [None]:
pip install flower

Pour démarrer la console Web Flower, nous devons exécuter la commande suivante (exécutée dans le dossier parent de notre dossier de projet test_celery ):


$ celery -A test_celery flower