Skip to content

Commit

Permalink
recycle compression buffers (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille committed Aug 9, 2019
1 parent 46aa607 commit b4c376a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 36 deletions.
27 changes: 27 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kafka

import (
"bytes"
"sync"
)

var bufferPool = sync.Pool{
New: func() interface{} { return newBuffer() },
}

func newBuffer() *bytes.Buffer {
b := new(bytes.Buffer)
b.Grow(65536)
return b
}

func acquireBuffer() *bytes.Buffer {
return bufferPool.Get().(*bytes.Buffer)
}

func releaseBuffer(b *bytes.Buffer) {
if b != nil {
b.Reset()
bufferPool.Put(b)
}
}
2 changes: 0 additions & 2 deletions compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
)

const (
CompressionNoneCode = 0

compressionCodecMask = 0x07
)

Expand Down
69 changes: 35 additions & 34 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,19 @@ func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, t
}

func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
attributes := int8(CompressionNoneCode)
if codec != nil {
if msgs, err = compressMessageSet(codec, msgs...); err != nil {
return err
var size int32
var attributes int8
var compressed *bytes.Buffer

if codec == nil {
size = messageSetSize(msgs...)
} else {
compressed, attributes, size, err = compressMessageSet(codec, msgs...)
if err != nil {
return
}
attributes = codec.Code()
msgs = []Message{{Value: compressed.Bytes()}}
}
size := messageSetSize(msgs...)

h := requestHeader{
ApiKey: int16(produceRequest),
Expand Down Expand Up @@ -365,13 +370,14 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation
wb.writeMessage(msg.Offset, attributes, msg.Time, msg.Key, msg.Value, cw)
}

releaseBuffer(compressed)
return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var compressed []byte
var attributes int16
var compressed *bytes.Buffer

if codec == nil {
size = recordBatchSize(msgs...)
Expand Down Expand Up @@ -417,10 +423,11 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

if codec != nil {
if compressed != nil {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(compressed)
wb.Write(compressed.Bytes())
})
releaseBuffer(compressed)
} else {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range msgs {
Expand All @@ -434,8 +441,8 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation

func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var compressed []byte
var attributes int16
var compressed *bytes.Buffer

if codec == nil {
size = recordBatchSize(msgs...)
Expand Down Expand Up @@ -480,10 +487,11 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

if codec != nil {
if compressed != nil {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(compressed)
wb.Write(compressed.Bytes())
})
releaseBuffer(compressed)
} else {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range msgs {
Expand Down Expand Up @@ -539,49 +547,42 @@ func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int,
write(wb)
}

var maxDate = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)

func compressMessageSet(codec CompressionCodec, msgs ...Message) ([]Message, error) {
estimatedLen := 0

for _, msg := range msgs {
estimatedLen += int(messageSize(msg.Key, msg.Value))
}

buffer := &bytes.Buffer{}
buffer.Grow(estimatedLen / 2)
compressor := codec.NewWriter(buffer)
func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) {
compressed = acquireBuffer()
compressor := codec.NewWriter(compressed)
wb := &writeBuffer{w: compressor}
cw := &crc32Writer{table: crc32.IEEETable}

for offset, msg := range msgs {
wb.writeMessage(int64(offset), CompressionNoneCode, msg.Time, msg.Key, msg.Value, cw)
wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw)
}

if err := compressor.Close(); err != nil {
return nil, err
if err = compressor.Close(); err != nil {
releaseBuffer(compressed)
return
}

return []Message{{Value: buffer.Bytes()}}, nil
attributes = codec.Code()
size = messageSetSize(Message{Value: compressed.Bytes()})
return
}

func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed []byte, attributes int16, size int32, err error) {
recordBuf := &bytes.Buffer{}
recordBuf.Grow(int(recordBatchSize(msgs...)) / 2)
compressor := codec.NewWriter(recordBuf)
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
compressed = acquireBuffer()
compressor := codec.NewWriter(compressed)
wb := &writeBuffer{w: compressor}

for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}

if err = compressor.Close(); err != nil {
releaseBuffer(compressed)
return
}

compressed = recordBuf.Bytes()
attributes = int16(codec.Code())
size = recordBatchHeaderSize + int32(len(compressed))
size = recordBatchHeaderSize + int32(compressed.Len())
return
}

Expand Down

0 comments on commit b4c376a

Please sign in to comment.