diff --git a/pkg/s3select/message.go b/pkg/s3select/message.go index 6fabd97c6355b3..2d62af44a3e69e 100644 --- a/pkg/s3select/message.go +++ b/pkg/s3select/message.go @@ -61,6 +61,14 @@ var recordsHeader = []byte{ 11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 7, 'R', 'e', 'c', 'o', 'r', 'd', 's', } +const ( + maxRecordMessageLength = 128 * 1024 // Chosen for compatibility with AWS JAVA SDK +) + +var ( + bufLength = payloadLenForMsgLen(maxRecordMessageLength) +) + // newRecordsMessage - creates new Records Message which can contain a single record, partial records, // or multiple records. Depending on the size of the result, a response can contain one or more of these messages. // @@ -74,6 +82,14 @@ func newRecordsMessage(payload []byte) []byte { return genMessage(recordsHeader, payload) } +// payloadLenForMsgLen computes the length of the payload in a record +// message given the length of the message. +func payloadLenForMsgLen(messageLength int) int { + headerLength := len(recordsHeader) + payloadLength := messageLength - 4 - 4 - 4 - headerLength - 4 + return payloadLength +} + // continuationMessage - S3 periodically sends this message to keep the TCP connection open. // These messages appear in responses at random. The client must detect the message type and process accordingly. // @@ -292,16 +308,21 @@ func (writer *messageWriter) start() { } writer.write(endMessage) } else { - // Write record payload to staging buffer - freeSpace := bufLength - writer.payloadBufferIndex - if len(payload) > freeSpace { - if !writer.flushRecords() { - quitFlag = true - break + for len(payload) > 0 { + copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload) + writer.payloadBufferIndex += copiedLen + payload = payload[copiedLen:] + + // If buffer is filled, flush it now! + freeSpace := bufLength - writer.payloadBufferIndex + if freeSpace == 0 { + if !writer.flushRecords() { + quitFlag = true + break + } } + } - copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload) - writer.payloadBufferIndex += len(payload) } case <-recordStagingTicker.C: @@ -331,10 +352,6 @@ func (writer *messageWriter) start() { } } -const ( - bufLength = maxRecordSize -) - // Sends a single whole record. func (writer *messageWriter) SendRecord(payload []byte) error { select {