Skip to content

Commit

Permalink
Add message_batch size checking since SQS can only handle 256KiB
Browse files Browse the repository at this point in the history
in a batch. Flush queue if message_batch is 10 messages or >= 250KiB
  • Loading branch information
ronnocol committed Dec 16, 2014
1 parent 0c21523 commit 2fc514b
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions beaver/transports/sqs_transport.py
Expand Up @@ -41,15 +41,18 @@ def callback(self, filename, lines, **kwargs):
del kwargs['timestamp']

message_batch = []
message_batch_length = 0
for line in lines:
message_batch_length = message_batch_length+len(line)
message_batch.append((uuid.uuid4(), self.format(filename, line, timestamp, **kwargs), 0))
if len(message_batch) == 10: # SQS can only handle up to 10 messages in batch send
self._logger.debug('Flushing 10 messages to SQS queue')
if (len(message_batch) == 10) or (message_batch_length >= 250000): # SQS can only handle up to 10 messages in batch send and it can not exceed 256KiB (leave a little headroom)
self._logger.debug('Flushing last {0} messages to SQS queue {1} bytes'.format(len(message_batch),message_batch_length))
self._send_message_batch(message_batch)
message_batch = []
message_batch_length = 0

if len(message_batch) > 0:
self._logger.debug('Flushing last {0} messages to SQS queue'.format(len(message_batch)))
self._logger.debug('Flushing last {0} messages to SQS queue {1} bytes'.format(len(message_batch),message_batch_length))
self._send_message_batch(message_batch)
return True

Expand Down

0 comments on commit 2fc514b

Please sign in to comment.