Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set S3 Select record message length to 128KiB #7475

Merged
merged 1 commit into from Apr 4, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 28 additions & 12 deletions pkg/s3select/message.go
Expand Up @@ -61,6 +61,14 @@ var recordsHeader = []byte{
11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 7, 'R', 'e', 'c', 'o', 'r', 'd', 's',
}

const (
maxRecordMessageLength = 128 * 1024 // Chosen for compatibility with AWS JAVA SDK
)

var (
bufLength = payloadLenForMsgLen(maxRecordMessageLength)
)

// newRecordsMessage - creates new Records Message which can contain a single record, partial records,
// or multiple records. Depending on the size of the result, a response can contain one or more of these messages.
//
Expand All @@ -74,6 +82,14 @@ func newRecordsMessage(payload []byte) []byte {
return genMessage(recordsHeader, payload)
}

// payloadLenForMsgLen computes the length of the payload in a record
// message given the length of the message.
func payloadLenForMsgLen(messageLength int) int {
headerLength := len(recordsHeader)
payloadLength := messageLength - 4 - 4 - 4 - headerLength - 4
return payloadLength
}

// continuationMessage - S3 periodically sends this message to keep the TCP connection open.
// These messages appear in responses at random. The client must detect the message type and process accordingly.
//
Expand Down Expand Up @@ -292,16 +308,20 @@ func (writer *messageWriter) start() {
}
writer.write(endMessage)
} else {
// Write record payload to staging buffer
freeSpace := bufLength - writer.payloadBufferIndex
if len(payload) > freeSpace {
if !writer.flushRecords() {
quitFlag = true
break
for len(payload) > 0 {
copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
writer.payloadBufferIndex += copiedLen
payload = payload[copiedLen:]

// If buffer is filled, flush it now!
freeSpace := bufLength - writer.payloadBufferIndex
if freeSpace == 0 {
if !writer.flushRecords() {
quitFlag = true
break
}
}
}
copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
writer.payloadBufferIndex += len(payload)
}

case <-recordStagingTicker.C:
Expand Down Expand Up @@ -331,10 +351,6 @@ func (writer *messageWriter) start() {
}
}

const (
bufLength = maxRecordSize
)

// Sends a single whole record.
func (writer *messageWriter) SendRecord(payload []byte) error {
select {
Expand Down