celery-rmq is a Python library for using Celery with a RabbitMQ message broker. It is a standalone library that requires celery and kombu (the Python messaging library used to talk to RabbitMQ). celery-rmq makes it easy to set up Celery and RabbitMQ producers and consumers.
First, install RabbitMQ and start it.
Create a Python app with a virtual environment, then activate the virtual environment.
1:: Install it:
pip install celery-rmq
2:: Basic Usage:
** app.py **
from celery_rmq.app import CeleryAppProvider
from celery_rmq.exchange import register_exchange
from celery_rmq.queue import register_queue
from .consumers.testconsumer import BasicTestConsumer
apps = ['tests.testapp']
app_provider = CeleryAppProvider(app_name='test_celery_broker', installed_apps=apps)
app = app_provider.get_app()
# adding exchanges
def register_exchanges():
    register_exchange(app_provider, "test_exchange")

def register_queues():
    register_queue(app_provider, "test_queue", "test_routing", "test_exchange")

def add_consumers():
    app_provider.add_consumer(BasicTestConsumer)
# register_exchanges() and register_queues() must run in order,
# one after the other, because queues depend on exchanges:
# an exchange has to be registered before a queue can use it.
register_exchanges()
register_queues()
add_consumers()
app.start()
You can register as many exchanges, queues, and consumers as you need.
3:: consumer.py
import kombu
from celery import bootsteps
from celery_rmq.registry import get_queue
class BasicTestConsumer(bootsteps.ConsumerStep):
    def handle_message(self, body, message):
        print(body)
        message.ack()

    def get_consumers(self, channel):
        queue = get_queue("test_queue", "test_routing")
        return [kombu.Consumer(
            channel,
            queues=[queue],
            callbacks=[self.handle_message],
            accept=['json']
        )]
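The handler above acknowledges every message unconditionally. A common variation is to acknowledge only after processing succeeds and to reject the message otherwise. The sketch below shows that pattern outside of Celery; `process` and `FakeMessage` are hypothetical stand-ins for illustration (a real kombu message provides `ack()` and `reject()`).

```python
class FakeMessage:
    """Hypothetical stand-in for a kombu message, for local experimentation only."""

    def __init__(self):
        self.state = "pending"

    def ack(self):
        self.state = "acked"

    def reject(self, requeue=False):
        self.state = "requeued" if requeue else "rejected"


def process(body):
    """Hypothetical business logic: require an 'id' field in the body."""
    if "id" not in body:
        raise ValueError("message body has no 'id' field")
    return body["id"]


def handle_message(body, message):
    """Ack the message only if processing succeeds; reject it otherwise."""
    try:
        process(body)
    except Exception as exc:
        print(f"rejecting message: {exc}")
        message.reject()
    else:
        message.ack()


good, bad = FakeMessage(), FakeMessage()
handle_message({"id": 1}, good)
handle_message({"text": "no id"}, bad)
print(good.state, bad.state)
```

In a real consumer the same try/except/else body would go inside `BasicTestConsumer.handle_message`, with the message object supplied by kombu.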
4:: To use this with a framework, create an app (for example, a Django app) and add a tasks.py file inside it:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from tests.app import app_provider
@shared_task
def simple_json_message(message, exchange_name, route_key):
    producer = app_provider.get_producer()
    producer.publish(message, content_type='application/json', exchange=exchange_name, routing_key=route_key)
Then call simple_json_message from a view as needed.
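Because simple_json_message is a Celery shared task, a view would typically queue it with `.delay(...)` rather than run it inline. One possible shape, sketched with a hypothetical helper `build_publish_args` (not part of celery-rmq) that checks the payload is JSON-serializable before it is handed to the task; the default exchange and routing key are the ones registered in app.py:

```python
import json


def build_publish_args(payload, exchange_name="test_exchange", route_key="test_routing"):
    """Return the positional arguments for simple_json_message.

    Hypothetical helper: fails early with TypeError if the payload
    cannot be encoded as JSON.
    """
    json.dumps(payload)  # raises TypeError for non-serializable values
    return (payload, exchange_name, route_key)


args = build_publish_args({"order_id": 42, "status": "created"})
print(args)

# Inside a view, assuming tasks.py lives in a package named "myapp" (hypothetical):
#   from myapp.tasks import simple_json_message
#   simple_json_message.delay(*args)
```

Validating in the view keeps a bad payload from surfacing later as a failure inside the worker.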
5:: Run the celery worker:
celery -A app worker -l info
Here, app refers to the app.py file.
For testing, you can send messages directly from the RabbitMQ web panel. Go to the queue section and publish a message.