Skip to content

Commit

Permalink
Set S3 Select record message length to 128KiB
Browse files Browse the repository at this point in the history
- Previously this limit was a little more than 1MiB, and it broke
  compatibility with AWS SDK Java causing a buffer overflow error.
  • Loading branch information
donatello committed Apr 3, 2019
1 parent 313a3a2 commit 02126a7
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions pkg/s3select/message.go
Expand Up @@ -61,6 +61,14 @@ var recordsHeader = []byte{
11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 7, 'R', 'e', 'c', 'o', 'r', 'd', 's',
}

const (
maxRecordMessageLength = 128 * 1024 // Chosen for compatibility with AWS JAVA SDK
)

var (
bufLength = payloadLenForMsgLen(maxRecordMessageLength)
)

// newRecordsMessage - creates new Records Message which can contain a single record, partial records,
// or multiple records. Depending on the size of the result, a response can contain one or more of these messages.
//
Expand All @@ -74,6 +82,14 @@ func newRecordsMessage(payload []byte) []byte {
return genMessage(recordsHeader, payload)
}

// payloadLenForMsgLen computes the length of the payload in a record
// message given the length of the message.
func payloadLenForMsgLen(messageLength int) int {
headerLength := len(recordsHeader)
payloadLength := messageLength - 4 - 4 - 4 - headerLength - 4
return payloadLength
}

// continuationMessage - S3 periodically sends this message to keep the TCP connection open.
// These messages appear in responses at random. The client must detect the message type and process accordingly.
//
Expand Down Expand Up @@ -292,16 +308,21 @@ func (writer *messageWriter) start() {
}
writer.write(endMessage)
} else {
// Write record payload to staging buffer
freeSpace := bufLength - writer.payloadBufferIndex
if len(payload) > freeSpace {
if !writer.flushRecords() {
quitFlag = true
break
for len(payload) > 0 {
copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
writer.payloadBufferIndex += copiedLen
payload = payload[copiedLen:]

// If buffer is filled, flush it now!
freeSpace := bufLength - writer.payloadBufferIndex
if freeSpace == 0 {
if !writer.flushRecords() {
quitFlag = true
break
}
}

}
copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
writer.payloadBufferIndex += len(payload)
}

case <-recordStagingTicker.C:
Expand Down Expand Up @@ -331,10 +352,6 @@ func (writer *messageWriter) start() {
}
}

const (
bufLength = maxRecordSize
)

// Sends a single whole record.
func (writer *messageWriter) SendRecord(payload []byte) error {
select {
Expand Down

0 comments on commit 02126a7

Please sign in to comment.