Skip to content

Commit

Permalink
feat: Log debug messages when consuming and acknowledging messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Nov 19, 2021
1 parent 1fd06a0 commit 20b33ab
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 30 deletions.
3 changes: 1 addition & 2 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ Changed
~~~~~~~

- Pass a ``state`` object with a ``connection`` attribute to the consumer callback, instead of a ``connection`` object. Mixins can set a ``__safe__`` class attribute to list attributes that can be used safely in the consumer callback. These attributes are added to the ``state`` object.

- Log a debug message on :meth:`pika.clients.Publisher.publish`.
- Log debug messages when publishing, consuming and acknowledging messages.

0.0.1 (2021-11-19)
------------------
Expand Down
34 changes: 29 additions & 5 deletions tests/clients/test_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from yapw.clients import Base, Blocking, Threaded, Transient
from yapw.decorators import requeue
from yapw.methods import ack
from yapw.methods import ack, nack, publish

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +48,12 @@ def raiser(state, channel, method, properties, body):

def warner(state, channel, method, properties, body):
logger.warning("Oh!")
nack(state, channel, method.delivery_tag)


def writer(state, channel, method, properties, body):
publish(state, channel, {"message": "value"}, "r")
ack(state, channel, method.delivery_tag)


def kill(signum):
Expand Down Expand Up @@ -93,7 +99,26 @@ def test_decorator(message, caplog):
]


def test_declare_queue(caplog):
def test_publish(message, caplog):
caplog.set_level(logging.DEBUG)

consumer = get_client()
consumer.connection.call_later(DELAY, functools.partial(kill, signal.SIGTERM))
consumer.consume(writer, "q", decorator=requeue)

assert consumer.channel.is_closed
assert consumer.connection.is_closed

assert len(caplog.records) == 4
assert [(r.levelname, r.message) for r in caplog.records] == [
("DEBUG", "Consuming messages on channel 1 from queue yapw_test_q"),
("DEBUG", "Published message {'message': 'value'} to exchange yapw_test with routing key yapw_test_r"),
("DEBUG", "Ack'd message on channel 1 with delivery tag 1"),
("INFO", "Received SIGTERM, shutting down gracefully"),
]


def test_consume_declares_queue(caplog):
declarer = get_client()
declarer.connection.call_later(DELAY, functools.partial(kill, signal.SIGTERM))
declarer.consume(warner, "q", decorator=requeue)
Expand All @@ -111,6 +136,5 @@ def test_declare_queue(caplog):
assert consumer.channel.is_closed
assert consumer.connection.is_closed

assert len(caplog.records) == 1
assert caplog.records[-1].levelname == "WARNING"
assert caplog.records[-1].message == "Oh!"
assert len(caplog.records) > 1
assert all(r.levelname == "WARNING" and r.message == "Oh!" for r in caplog.records)
10 changes: 5 additions & 5 deletions tests/test_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from yapw.methods import ack, nack, publish

Connection = namedtuple("Connection", "is_open add_callback_threadsafe")
Channel = namedtuple("Channel", "is_open basic_ack basic_nack basic_publish")
Channel = namedtuple("Channel", "channel_number is_open basic_ack basic_nack basic_publish")
State = namedtuple("State", "format_routing_key connection exchange delivery_mode")

ack_nack_parameters = [(ack, "ack", [1]), (nack, "nack", [1])]
Expand All @@ -21,7 +21,7 @@ def format_routing_key(exchange, routing_key):

def test_publish():
connection = create_autospec(Connection, is_open=True)
channel = create_autospec(Channel, is_open=True)
channel = create_autospec(Channel, channel_number=1, is_open=True)
function = functools.partial(format_routing_key, "exch")
state = create_autospec(
State, connection=connection, format_routing_key=function, exchange="exch", delivery_mode=2
Expand All @@ -44,7 +44,7 @@ def test_publish():
@pytest.mark.parametrize("kwargs", [{}, {"multiple": True}])
def test_ack_nack(function, infix, args, kwargs):
connection = create_autospec(Connection, is_open=True)
channel = create_autospec(Channel, is_open=True)
channel = create_autospec(Channel, channel_number=1, is_open=True)
state = create_autospec(State, connection=connection)

function(state, channel, *args, **kwargs)
Expand All @@ -60,7 +60,7 @@ def test_ack_nack(function, infix, args, kwargs):
@pytest.mark.parametrize("function,infix,args", parameters)
def test_channel_closed(function, infix, args, caplog):
connection = create_autospec(Connection, is_open=True)
channel = create_autospec(Channel, is_open=False)
channel = create_autospec(Channel, channel_number=1, is_open=False)
state = create_autospec(State, connection=connection)

function(state, channel, *args)
Expand All @@ -80,7 +80,7 @@ def test_channel_closed(function, infix, args, caplog):
@pytest.mark.parametrize("function,infix,args", parameters)
def test_connection_closed(function, infix, args, caplog):
connection = create_autospec(Connection, is_open=False)
channel = create_autospec(Channel, is_open=True)
channel = create_autospec(Channel, channel_number=1, is_open=True)
state = create_autospec(State, connection=connection)

function(state, channel, *args)
Expand Down
10 changes: 3 additions & 7 deletions yapw/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Client(clients.Threaded, clients.Durable, clients.Blocking, clients.Base):

from yapw.decorators import rescue
from yapw.ossignal import install_signal_handlers, signal_names
from yapw.util import basic_publish_kwargs
from yapw.util import basic_publish_debug_args, basic_publish_kwargs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -190,12 +190,7 @@ def publish(self, message, routing_key):
keywords = basic_publish_kwargs(self, message, routing_key)

self.channel.basic_publish(**keywords)
logger.debug(
"Published message %r to exchange %s with routing key %s",
message,
keywords["exchange"],
keywords["routing_key"],
)
logger.debug(*basic_publish_debug_args(message, keywords))


class Transient(Publisher):
Expand Down Expand Up @@ -252,6 +247,7 @@ def consume(self, callback, routing_key, decorator=rescue):
on_message_callback = functools.partial(_on_message, args=(state, threads, callback, decorator))
self.channel.basic_consume(formatted, on_message_callback)

logger.debug("Consuming messages on channel %s from queue %s", self.channel.channel_number, formatted)
try:
self.channel.start_consuming()
finally:
Expand Down
21 changes: 10 additions & 11 deletions yapw/methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import functools
import logging

from yapw.util import basic_publish_kwargs
from yapw.util import basic_publish_debug_args, basic_publish_kwargs

logger = logging.getLogger(__name__)

Expand All @@ -23,32 +23,31 @@ def publish(state, channel, message, routing_key, *args, **kwargs):
keywords.update(kwargs)

_channel_method_from_thread(state.connection, channel, "publish", *args, **keywords)
logger.debug(
"Published message %r to exchange %s with routing key %s",
message,
keywords["exchange"],
keywords["routing_key"],
)
logger.debug(*basic_publish_debug_args(message, keywords))


def ack(state, channel, *args, **kwargs):
def ack(state, channel, delivery_tag=0, **kwargs):
"""
Acks a message by its delivery tag.
:param state: an object with a ``connection`` attribute
:param channel: the channel from which to call ``basic_ack``
:param int delivery_tag: the delivery tag
"""
_channel_method_from_thread(state.connection, channel, "ack", *args, **kwargs)
_channel_method_from_thread(state.connection, channel, "ack", delivery_tag, **kwargs)
logger.debug("Ack'd message on channel %s with delivery tag %s", channel.channel_number, delivery_tag)


def nack(state, channel, *args, **kwargs):
def nack(state, channel, delivery_tag=0, **kwargs):
"""
Nacks a message by its delivery tag.
:param state: an object with a ``connection`` attribute
:param channel: the channel from which to call ``basic_nack``
:param int delivery_tag: the delivery tag
"""
_channel_method_from_thread(state.connection, channel, "nack", *args, **kwargs)
_channel_method_from_thread(state.connection, channel, "nack", delivery_tag, **kwargs)
logger.debug("Nack'd message on channel %s with delivery tag %s", channel.channel_number, delivery_tag)


def _channel_method_from_thread(connection, channel, method, *args, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions yapw/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ def basic_publish_kwargs(state, message, routing_key):
properties = pika.BasicProperties(delivery_mode=state.delivery_mode, content_type="application/json")

return {"exchange": state.exchange, "routing_key": formatted, "body": body, "properties": properties}


def basic_publish_debug_args(msg, kwargs):
return "Published message %r to exchange %s with routing key %s", msg, kwargs["exchange"], kwargs["routing_key"]

0 comments on commit 20b33ab

Please sign in to comment.