diff --git a/beaver/transports/sqs_transport.py b/beaver/transports/sqs_transport.py index 055f3ea0..c97a07b5 100644 --- a/beaver/transports/sqs_transport.py +++ b/beaver/transports/sqs_transport.py @@ -41,15 +41,18 @@ def callback(self, filename, lines, **kwargs): del kwargs['timestamp'] message_batch = [] + message_batch_length = 0 for line in lines: + message_batch_length = message_batch_length+len(line) message_batch.append((uuid.uuid4(), self.format(filename, line, timestamp, **kwargs), 0)) - if len(message_batch) == 10: # SQS can only handle up to 10 messages in batch send - self._logger.debug('Flushing 10 messages to SQS queue') + if (len(message_batch) == 10) or (message_batch_length >= 250000): # SQS can only handle up to 10 messages in batch send and it can not exceed 256KiB (leave a little headroom) + self._logger.debug('Flushing last {0} messages to SQS queue {1} bytes'.format(len(message_batch),message_batch_length)) self._send_message_batch(message_batch) message_batch = [] + message_batch_length = 0 if len(message_batch) > 0: - self._logger.debug('Flushing last {0} messages to SQS queue'.format(len(message_batch))) + self._logger.debug('Flushing last {0} messages to SQS queue {1} bytes'.format(len(message_batch),message_batch_length)) self._send_message_batch(message_batch) return True