puka
The puka
module implements a client for AMQP 0-9-1 protocol. It's tuned to work with RabbitMQ broker, but should work fine with other message brokers that support this protocol.
AMQP protocol defines a model which consists of Exchanges, Bindings and Queues:
Exchange ---binding---> Queue
For more information see:
- RabbitMQ tutorials.
- AMQP 0-9-1 Specification and Definitions
The puka
module defines a single class:
Consturctor for Client
class. amqp_url is an url-like address of the RabbitMQ server. Default points to amqp://guest:guest@localhost:5672/. pubacks tells if the client should take advantage of the publiser-acks feature on the server -autodetect by default. client_properties is a dict of properties to add to the connection properties sent to the server.
The puka
module exposes the following exceptions, as defined by AMQP protocol.
NoRoute
NoConsumers
AccessRefused
NotFound
ResourceLocked
PreconditionFailed
Additionally puka
defines a connection error:
ConnectionBroken
Client object (Client
) provides the public methods described below. They can be grouped in three distinct subcategories.
The following methods are responsible for networking and socket handling.
Client.fileno()
Return a socket's file descriptor (a small integer). This is useful with select.select
.
Client.socket()
Return a socket.socket
object.
Client.on_read()
Inform the Client
object that the socket is now in readable state.
Client.on_write()
Inform the Client
object that the socket is now in writable state.
Client.needs_write()
Return true if the send buffer is full, and needs to be written to the socket.
Client.run_any_callbacks()
Run any outstanding user callbacks for any of the promises.
Client.wait(promise, timeout=None, raise_errors=True)
Wait up to timeout seconds for an event on given promise. If the event was received before timeout, run the callback for the promise and return AMQP response.
Client.loop(timeout=None)
Enter an event loop, keep on handling network and executing user callbacks for up to timeout seconds.
Client.loop_break()
Cause the event loop to break on next iteration.
Functions below return a promise. It's a small number, that identifies an asynchronous request. You can wait for a promise to be done and receive a response for it.
Client.connect()
Establishes an asynchronous connection with the server. You're forbidden to do any other action before this step is finished.
Client.close()
Immediately closes the connection. All buffered data will be lost. All outstanding promises will be closed with an error.
Client.exchange_declare(exchange, type='direct', durable=False, auto_delete=False, arguments={})
Client.exchange_delete(exchange, if_unused=False)
Client.exchange_bind(destination, source, routing_key="", arguments={})
For details see the documentation of exchange bindings RabbitMQ feature.
Client.exchange_unbind(destination, source, routing_key="", arguments={})
Client.queue_declare(queue="", durable=False, exclusive=False, auto_delete=False, arguments={})
Client.queue_delete(queue, if_unused=False, if_empty=False)
Client.queue_purge(queue)
Client.queue_bind(queue, exchange, routing_key="", arguments={})
Client.queue_unbind(queue, exchange, routing_key="", arguments={})
Client.basic_publish(exchange, routing_key, mandatory=False, immediate=False, headers={}, body="")
Client.basic_get(queue, no_ack=False)
Client.basic_consume(queue, prefetch_count=0, no_local=False, no_ack=False, exclusive=False, arguments={})
Return a consume_promise.
Client.basic_consume_multi(queues, prefetch_count=0, no_ack=False)
Return a consume_promise.
Client.basic_qos(consume_promise, prefetch_count=0)
Given a consume_promise returned by basic_consume
or basic_consume_multi
changes the prefetch_count for that consumes.
Client.basic_cancel(consume_promise)
Given a consume_promise returned by basic_consume
or basic_consume_multi
cancels the consume. You can wait on the returned promise.
Client.basic_ack(msg_result)
Given a result of basic_consume or basic_consume_multi promise (ie: a message) acknowledges it. It's an asynchronous method.
Client.basic_reject(msg_result)
Given a result of basic_consume or basic_consume_multi promise (ie: a message) rejects it. It's an asynchronous method.
Synchronously send a message:
import puka
client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)
promise = client.queue_declare(queue='test')
client.wait(promise)
promise = client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
client.wait(promise)
promise = client.close()
client.wait(promise)
Synchronously receive three messages:
import puka
client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)
promise = client.queue_declare(queue='test')
client.wait(promise)
consume_promise = client.basic_consume(queue='test')
for i in range(3):
result = client.wait(consume_promise)
print " [x] Received message %r" % (result,)
client.basic_ack(result)
promise = client.basic_cancel(consume_promise)
client.wait(promise)
promise = client.close()
client.wait(promise)
Asynchronously send a message:
import puka
def on_connection(promise, result):
client.queue_declare(queue='test', callback=on_queue_declare)
def on_queue_declare(promise, result):
client.basic_publish(exchange='', routing_key='test',
body="Hello world!",
callback=on_basic_publish)
def on_basic_publish(promise, result):
print " [*] Message sent"
client.loop_break()
client = puka.Client("amqp://localhost/")
client.connect(callback=on_connection)
client.loop()
promise = client.close()
client.wait(promise)