SQS Queue Consumer Wiki

Joe Martinez edited this page Jan 16, 2018 · 1 revision

SqsQueueConsumer is a simple Java library that monitors an Amazon SQS (Simple Queue Service) queue for new messages. Each message that arrives is passed to a method that you provide by implementing a single-method Java interface.

Two classes are available. Use QueueConsumerJson if the body of each message is expected to be a simple JSON key-value set, such as:

{
	"name":"Peter Baker",
	"address":"123 Fake St.",
	"phone":"555-123-4444"
}

This allows your code to receive the message as a Java Map<String,String> for ease of processing. If your message is in any other format, use QueueConsumer; your code will then receive a String containing the raw body of the message, which you can parse and process however you want.
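As an illustration of working with the map form, here is a minimal sketch of reading the fields of the sample message above. It assumes the map keys match the JSON field names; ContactFormatter and describe are hypothetical names used only for this example and are not part of the library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of SqsQueueConsumer: shows how the body of a
// JSON processMessage implementation might read the decoded key-value pairs.
class ContactFormatter
{
	// Builds a one-line summary from the fields of the sample message.
	static String describe(Map<String, String> messageData)
	{
		String name = messageData.get("name");
		String address = messageData.get("address");
		// getOrDefault guards against an optional field being absent.
		String phone = messageData.getOrDefault("phone", "(no phone)");
		return name + ", " + address + ", " + phone;
	}

	public static void main(String[] args)
	{
		// Stand-in for the map that would be handed to processMessage.
		Map<String, String> messageData = new LinkedHashMap<>();
		messageData.put("name", "Peter Baker");
		messageData.put("address", "123 Fake St.");
		messageData.put("phone", "555-123-4444");
		System.out.println(describe(messageData));
	}
}
```

In a real processor, the same lookups would sit inside your processMessage(Map<String, String>) method.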

Both classes implement the Java Runnable interface, so you can run them on separate threads if you want (see the examples below).

The library defines a Java exception called MessageProcessingException. Your processMessage method should throw this exception if it determines that the message cannot be processed.
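A minimal sketch of that pattern follows: validate the incoming data and throw when a required field is missing. The nested MessageProcessingException here is a local stand-in so the sketch compiles on its own; its (String) constructor is an assumption, and the library's own QueueConsumer.MessageProcessingException may offer different constructors. ValidationSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

class ValidationSketch
{
	// Local stand-in so the sketch compiles alone. In real code, use the
	// library's QueueConsumer.MessageProcessingException; its constructors
	// may differ from this assumed (String) form.
	static class MessageProcessingException extends Exception
	{
		MessageProcessingException(String message)
		{
			super(message);
		}
	}

	// Mirrors the shape of a JSON processMessage method: reject messages
	// that lack a usable "name" field, otherwise handle them.
	static void processMessage(Map<String, String> messageData) throws MessageProcessingException
	{
		String name = messageData.get("name");
		if (name == null || name.trim().isEmpty())
		{
			throw new MessageProcessingException("Message has no \"name\" field");
		}
		System.out.println("Processing message for " + name);
	}

	public static void main(String[] args)
	{
		// A message missing the required field.
		Map<String, String> incomplete = new HashMap<>();
		incomplete.put("phone", "555-123-4444");
		try
		{
			processMessage(incomplete);
		}
		catch (MessageProcessingException e)
		{
			System.out.println("Rejected: " + e.getMessage());
		}
	}
}
```

How the library reacts to the exception (for example, whether the message stays on the queue) is not covered here; check the library source for that behavior.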

The library also supports logging via log4j. It uses log.trace(), log.info(), and log.error() calls in various places in the code. The constructors of both classes accept a logger parameter. If you don't want to take advantage of the logging, simply pass in null for this parameter.

How to configure and use:

  1. Log into your Amazon AWS Console and go to the SQS Management Console.
  2. Create a new queue or navigate to an existing one that you want to consume.
  3. (Optional but recommended) - Enable long polling by setting the "Receive Message Wait Time". I recommend using the maximum value of 20 seconds. Doing this will cause SQS to wait up to 20 seconds on each poll request for any new messages to come in before returning an empty response. This will reduce network traffic.

[Screenshot: Enabling Long Polling]

  4. Make note of your queue's URL (circled in blue below) and region (boxed in red below), as you will need these values to instantiate the queue consumer.

[Screenshot: SQS queue URL and region]

  5. Implement either the QueueMessageProcessor or JsonQueueMessageProcessor interface, depending on whether you are going to use QueueConsumer or QueueConsumerJson.

  6. Instantiate the QueueConsumer or QueueConsumerJson class and call the run() method.

QueueConsumerJson Example:

import java.util.Map;

public class EmailProcessor implements QueueConsumerJson.JsonQueueMessageProcessor
{
	// Called once per message with the JSON body decoded into key-value pairs.
	public void processMessage(Map<String, String> messageData) throws QueueConsumer.MessageProcessingException
	{
		...
	}
}

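public static void main(String[] args)
{
	// log4j logger; pass null to the constructor instead to disable logging.
	org.apache.log4j.Logger log = ... ;

	// prop is assumed to be a java.util.Properties instance loaded elsewhere.
	EmailProcessor emailProcessor = new EmailProcessor();
	QueueConsumerJson emailQueueConsumer = new QueueConsumerJson(prop.getProperty("EmailQueueUrl"), prop.getProperty("EmailQueueRegion"), emailProcessor, log);
	// Run the consumer on its own thread so that main is not blocked.
	Thread emailProcessingThread = new Thread(emailQueueConsumer);
	emailProcessingThread.start();
}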

QueueConsumer Example:

public class EmailProcessor implements QueueConsumer.QueueMessageProcessor
{
	// Called once per message with the raw, unparsed message body.
	public void processMessage(String messageData) throws QueueConsumer.MessageProcessingException
	{
		...
	}
}

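public static void main(String[] args)
{
	// log4j logger; pass null to the constructor instead to disable logging.
	org.apache.log4j.Logger log = ... ;

	// prop is assumed to be a java.util.Properties instance loaded elsewhere.
	EmailProcessor emailProcessor = new EmailProcessor();
	QueueConsumer emailQueueConsumer = new QueueConsumer(prop.getProperty("EmailQueueUrl"), prop.getProperty("EmailQueueRegion"), emailProcessor, log);
	// Run the consumer on its own thread so that main is not blocked.
	Thread emailProcessingThread = new Thread(emailQueueConsumer);
	emailProcessingThread.start();
}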
