Extract record batch structure (#314)
* Replace argument of writeProduceRequestV3

* Calculate recordBatch size in init

* Extract writeTo method for record batch

* Extract recordBatch processing from ProduceRequestV7

* Add constructor for record batch
VictorDenisov authored and Achille committed Sep 10, 2019
1 parent 752cbf8 commit 0430ada
Showing 3 changed files with 132 additions and 123 deletions.
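
The net effect is a narrower contract for the version-specific writers: instead of each one receiving the codec and the raw messages and duplicating the compression and sizing logic, the caller now builds a recordBatch once and passes it in. The signature change, taken from the write.go diff below:

// Before: the writer compressed and sized the messages itself.
func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error)

// After: the writer only serializes a batch built by the caller.
func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error)

The same change applies to writeProduceRequestV3.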
22 changes: 18 additions & 4 deletions conn.go
@@ -1057,28 +1057,42 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
switch version := c.apiVersions[produceRequest].MaxVersion; {
case version >= 7:
recordBatch, err :=
newRecordBatch(
codec,
msgs...,
)
if err != nil {
return err
}
return c.wb.writeProduceRequestV7(
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
msgs...,
recordBatch,
)
case version >= 3:
recordBatch, err :=
newRecordBatch(
codec,
msgs...,
)
if err != nil {
return err
}
return c.wb.writeProduceRequestV3(
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
msgs...,
recordBatch,
)
default:
return c.wb.writeProduceRequestV2(
108 changes: 108 additions & 0 deletions recordbatch.go
@@ -0,0 +1,108 @@
package kafka

import (
"bytes"
"time"
)

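// recordBatchHeaderSize is the fixed size, in bytes, of the record
// batch header whose fields are itemized below (61 bytes in total).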
const recordBatchHeaderSize int32 = 0 +
8 + // base offset
4 + // batch length
4 + // partition leader epoch
1 + // magic
4 + // crc
2 + // attributes
4 + // last offset delta
8 + // first timestamp
8 + // max timestamp
8 + // producer id
2 + // producer epoch
4 + // base sequence
4 // msg count

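// recordBatchSize computes the wire size of an uncompressed batch:
// the fixed header plus every record, each framed by its own
// varint-encoded length.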
func recordBatchSize(msgs ...Message) (size int32) {
size = recordBatchHeaderSize
baseTime := msgs[0].Time

for i := range msgs {
msg := &msgs[i]
msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
size += int32(msz + varIntLen(int64(msz)))
}

return
}

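// compressRecordBatch runs the records through the codec's writer and
// reports the resulting attributes and total batch size. The caller is
// responsible for releasing the returned buffer back to the pool.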
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
compressed = acquireBuffer()
compressor := codec.NewWriter(compressed)
wb := &writeBuffer{w: compressor}

for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}

if err = compressor.Close(); err != nil {
releaseBuffer(compressed)
return
}

attributes = int16(codec.Code())
size = recordBatchHeaderSize + int32(compressed.Len())
return
}

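// recordBatch bundles a set of messages with everything the produce
// request writers need to serialize them; the compressed payload and
// the batch size are computed once, at construction time.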
type recordBatch struct {
// required input parameters
codec CompressionCodec
attributes int16
msgs []Message

// parameters calculated during init
compressed *bytes.Buffer
size int32
}

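// newRecordBatch sizes the batch eagerly; when a codec is given it
// also compresses the messages, so writeTo only has to copy bytes.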
func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) {
r = &recordBatch{
codec: codec,
msgs: msgs,
}
if r.codec == nil {
r.size = recordBatchSize(r.msgs...)
} else {
r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
}
return
}

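// writeTo emits the size-prefixed batch, either copying the payload
// compressed in the constructor or writing each record in place.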
func (r *recordBatch) writeTo(wb *writeBuffer) {
wb.writeInt32(r.size)

baseTime := r.msgs[0].Time
lastTime := r.msgs[len(r.msgs)-1].Time
if r.compressed != nil {
wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(r.compressed.Bytes())
})
releaseBuffer(r.compressed)
} else {
wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range r.msgs {
wb.writeRecord(0, r.msgs[0].Time, int64(i), msg)
}
})
}
}

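// recordSize returns the size of a single record, excluding the
// varint length prefix that frames it inside the batch.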
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
return 1 + // attributes
varIntLen(int64(milliseconds(timestampDelta))) +
varIntLen(offsetDelta) +
varBytesLen(msg.Key) +
varBytesLen(msg.Value) +
varArrayLen(len(msg.Headers), func(i int) int {
h := &msg.Headers[i]
return varStringLen(h.Key) + varBytesLen(h.Value)
})
}
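
For orientation, a minimal sketch of how the extracted pieces compose; writeBatch is a hypothetical name, not part of this commit, and everything it calls comes from the diff above:

// writeBatch (hypothetical) shows the intended flow: build the batch
// once, then hand it to a writer to serialize.
func writeBatch(wb *writeBuffer, codec CompressionCodec, msgs ...Message) error {
	batch, err := newRecordBatch(codec, msgs...)
	if err != nil {
		return err
	}
	batch.writeTo(wb) // size prefix + header + records, compressed or not
	return wb.Flush()
}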
125 changes: 6 additions & 119 deletions write.go
@@ -379,19 +379,7 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation
return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var attributes int16
var compressed *bytes.Buffer

if codec == nil {
size = recordBatchSize(msgs...)
} else {
compressed, attributes, size, err = compressRecordBatch(codec, msgs...)
if err != nil {
return
}
}
func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {

h := requestHeader{
ApiKey: int16(produceRequest),
@@ -409,7 +397,7 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation
4 + // partition array length
4 + // partition
4 + // message set size
size
recordBatch.size

h.writeTo(wb)
wb.writeNullableString(transactionalID)
@@ -424,39 +412,12 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation
wb.writeArrayLen(1)
wb.writeInt32(partition)

wb.writeInt32(size)
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

if compressed != nil {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(compressed.Bytes())
})
releaseBuffer(compressed)
} else {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}
})
}
recordBatch.writeTo(wb)

return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var attributes int16
var compressed *bytes.Buffer

if codec == nil {
size = recordBatchSize(msgs...)
} else {
compressed, attributes, size, err = compressRecordBatch(codec, msgs...)
if err != nil {
return
}
}
func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {

h := requestHeader{
ApiKey: int16(produceRequest),
@@ -473,7 +434,7 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation
4 + // partition array length
4 + // partition
4 + // message set size
size
recordBatch.size

h.writeTo(wb)
wb.writeNullableString(transactionalID)
@@ -488,22 +449,7 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation
wb.writeArrayLen(1)
wb.writeInt32(partition)

wb.writeInt32(size)
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

if compressed != nil {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(compressed.Bytes())
})
releaseBuffer(compressed)
} else {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}
})
}
recordBatch.writeTo(wb)

return wb.Flush()
}
@@ -572,25 +518,6 @@ func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *by
return
}

func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
compressed = acquireBuffer()
compressor := codec.NewWriter(compressed)
wb := &writeBuffer{w: compressor}

for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}

if err = compressor.Close(); err != nil {
releaseBuffer(compressed)
return
}

attributes = int16(codec.Code())
size = recordBatchHeaderSize + int32(compressed.Len())
return
}

func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
const magicByte = 1 // compatible with kafka 0.10.0.0+

@@ -685,43 +612,3 @@ func messageSetSize(msgs ...Message) (size int32) {
}
return
}

func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
return 1 + // attributes
varIntLen(int64(milliseconds(timestampDelta))) +
varIntLen(offsetDelta) +
varBytesLen(msg.Key) +
varBytesLen(msg.Value) +
varArrayLen(len(msg.Headers), func(i int) int {
h := &msg.Headers[i]
return varStringLen(h.Key) + varBytesLen(h.Value)
})
}

const recordBatchHeaderSize int32 = 0 +
8 + // base offset
4 + // batch length
4 + // partition leader epoch
1 + // magic
4 + // crc
2 + // attributes
4 + // last offset delta
8 + // first timestamp
8 + // max timestamp
8 + // producer id
2 + // producer epoch
4 + // base sequence
4 // msg count

func recordBatchSize(msgs ...Message) (size int32) {
size = recordBatchHeaderSize
baseTime := msgs[0].Time

for i := range msgs {
msg := &msgs[i]
msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
size += int32(msz + varIntLen(int64(msz)))
}

return
}
