-
Notifications
You must be signed in to change notification settings - Fork 839
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a320aa8
commit 8fc2007
Showing
1 changed file
with
63 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import functools | ||
import logging | ||
import pika | ||
import threading | ||
import time | ||
|
||
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' | ||
'-35s %(lineno) -5d: %(message)s') | ||
LOGGER = logging.getLogger(__name__) | ||
|
||
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) | ||
|
||
def do_work(connection, channel, delivery_tag, body): | ||
thread_id = threading.get_ident() | ||
fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}' | ||
LOGGER.info(fmt1.format(thread_id, delivery_tag, body)) | ||
# Sleeping to simulate 10 seconds of work | ||
time.sleep(10) | ||
if channel.is_open: | ||
fmt2 = 'Thread id: {} Delivery tag: {} sending ack' | ||
LOGGER.info(fmt2.format(thread_id, delivery_tag)) | ||
cb = functools.partial(channel.basic_ack, delivery_tag) | ||
connection.add_callback_threadsafe(cb) | ||
else: | ||
fmt2 = 'Thread id: {} Delivery tag: {} not sending ack, channel closed' | ||
LOGGER.info(fmt2.format(thread_id, delivery_tag)) | ||
|
||
def on_message(channel, method_frame, header_frame, body, args): | ||
(connection, threads) = args | ||
delivery_tag = method_frame.delivery_tag | ||
t = threading.Thread(target=do_work, args=(connection, channel, delivery_tag, body)) | ||
t.start() | ||
threads.append(t) | ||
|
||
credentials = pika.PlainCredentials('guest', 'guest') | ||
# Note: sending a short heartbeat to prove that heartbeats are still | ||
# sent even though the worker simulates long-running work | ||
parameters = pika.ConnectionParameters('localhost', credentials=credentials, heartbeat=5) | ||
connection = pika.BlockingConnection(parameters) | ||
|
||
channel = connection.channel() | ||
channel.exchange_declare(exchange="test_exchange", exchange_type="direct", passive=False, durable=True, auto_delete=False) | ||
channel.queue_declare(queue="standard", auto_delete=True) | ||
channel.queue_bind(queue="standard", exchange="test_exchange", routing_key="standard_key") | ||
# Note: prefetch is set to 1 here as an example only and to keep the number of threads created | ||
# to a reasonable amount. In production you will want to test with different prefetch values | ||
# to find which one provides the best performance and usability for your solution | ||
channel.basic_qos(prefetch_count=1) | ||
|
||
threads = [] | ||
on_message_callback = functools.partial(on_message, args=(connection, threads)) | ||
channel.basic_consume('standard', on_message_callback) | ||
|
||
try: | ||
channel.start_consuming() | ||
except KeyboardInterrupt: | ||
channel.stop_consuming() | ||
|
||
# Wait for all to complete | ||
for thread in threads: | ||
thread.join() | ||
|
||
connection.close() |