from aws_sqs_consumer import Consumer, Message
class SimpleConsumer(Consumer):
def handle_message(self, message: Message):
# Write your logic to handle a single `message`.
print("Received message: ", message.Body)
consumer = SimpleConsumer(
queue_url="https://sqs.eu-west-1.amazonaws.com/12345678901/test_queue",
polling_wait_time_ms=5
)
consumer.start()
consumer.start()
will block the main thread.- Consumer uses SQS long polling by default with configurable wait time between polls (
polling_wait_time_ms
). - By default, messages are processed one by one. The
handle_message
method must be finished for processing the next one. For receiving messages in batches, use thebatch_size
attribute. See all attributes. - Messages are deleted from the queue after
handle_message
is successfully completed. - Raising an exception in the handler function will not delete the message from the queue. Define your behavior for handling exceptions by overriding
handle_processing_exception(message, exception)
method. See Handling exceptions
SQS supports receiving messages in batches. Setting batch_size > 1
will fetch multiple messages in a single call to SQS API. Override handle_message_batch(messages)
method to process the message batch.
Note that only after handle_message_batch
is finished, the next batch of messages is fetched. Maximum supported batch_size
is 10
.
from typing import List
from aws_sqs_consumer import Consumer, Message
class BatchConsumer(Consumer):
def handle_message_batch(self, messages: List[Message]):
print(f"Received {len(messages)} Messages")
for message in messages:
print(f"\t{message.Body}")
consumer = BatchConsumer(
queue_url="https://sqs.eu-west-1.amazonaws.com/12345678901/test_queue",
batch_size=5,
polling_wait_time_ms=5,
)
consumer.start()
from aws_sqs_consumer import Consumer, Message
class SimpleConsumer(Consumer):
def handle_message(self, message: Message):
print(f"Processing message: {message.Body}")
raise Exception("Something went wrong!")
def handle_processing_exception(self, message: Message, exception):
# Define your logic to handle exception
print(f"Exception occured while processing: {exception}")
consumer = SimpleConsumer(
queue_url="https://sqs.eu-west-1.amazonaws.com/12345678901/test_queue",
polling_wait_time_ms=5
)
consumer.start()
- Override
handle_batch_processing_exception(messages: List[Message], exception)
in case ofbatch_size
> 1.
- Short polling - If you set
wait_time_seconds=0
, it is short polling. If you also setpolling_wait_time_ms=0
(which is default), you will be making a lot of (unregulated) HTTP calls to AWS. - Long polling - With
wait_time_seconds > 0
, it is long polling.
For a detailed explanation, refer Amazon SQS short and long polling.
Currently, there is no built-in support for running as a daemon. But, you can use nohup
.
nohup python my_sqs_consumer.py > sqs_consumer.log 2>&1 </dev/null &
Consumer uses boto3
for interacting with SQS. Simplest option is to set the following environment variables:
export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...
If you want to manually configure the credentials, pass custom boto3.Client
object to Consumer
:
import boto3
from aws_sqs_consumer import Consumer, Message
class SimpleConsumer(Consumer):
def handle_message(self, message: Message):
print(f"Received message: {message.Body}")
sqs_client = boto3.client(
'sqs',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
aws_session_token=SESSION_TOKEN
)
consumer = SimpleConsumer(
queue_url="https://sqs.eu-west-1.amazonaws.com/12345678901/test_queue",
polling_wait_time_ms=5,
sqs_client=sqs_client
)
consumer.start()