Implements a new solution that jumps to the last offset
Also adds a test to ensure that truncation due to MaxBytes is
still handled properly.
nlsun committed Feb 8, 2022
1 parent 3eae360 commit f8b1f38
Showing 4 changed files with 136 additions and 24 deletions.
40 changes: 26 additions & 14 deletions batch.go
@@ -28,6 +28,15 @@ type Batch struct {
offset int64
highWaterMark int64
err error
// The last offset in the batch.
//
// We use lastOffset to skip offsets that have been compacted away.
//
// We store lastOffset because we get lastOffset when we read a new message
// but only try to handle compaction when we receive an EOF. However, when
// we get an EOF we do not get the lastOffset. So there is a mismatch
// between when we receive it and need to use it.
lastOffset int64
}

// Throttle gives the throttling duration applied by the kafka server on the
@@ -227,10 +236,12 @@ func (batch *Batch) readMessage(
return
}

offset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
var lastOffset int64
offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
switch err {
case nil:
batch.offset = offset + 1
batch.lastOffset = lastOffset
case errShortRead:
// As an "optimization" kafka truncates the returned response after
// producing MaxBytes, which could then cause the code to return
@@ -245,19 +256,6 @@
// consumed or a batch whose connection is in an error state.
batch.err = dontExpectEOF(err)
case batch.msgs.remaining() == 0:
// Log compaction can create batches with 0 unread messages.
//
// If the "next offset" reaches the "originally requested offset"
// and we have 0 messages remaining, then there were 0 unread
// messages in the batch.
//
// We normally set the batch offset to the "next" batch offset upon
// reading a message but since there were no messages to read we
// update it now instead.
if batch.offset == batch.conn.offset {
batch.offset++
}

// Because we use the adjusted deadline we could end up returning
// before the actual deadline occurred. This is necessary otherwise
// timing out the connection for real could end up leaving it in an
@@ -267,6 +265,20 @@
// read deadline management.
err = checkTimeoutErr(batch.deadline)
batch.err = err

// Checks:
// - `batch.err` for a "success" from the previous check
// - `batch.msgs.lengthRemain` to ensure that this EOF is not due
// to MaxBytes truncation
// - `batch.lastOffset` to ensure that the message format allows
// compaction
if batch.err == io.EOF && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
// Log compaction can create batches with 0 unread records.
//
// In order to reliably reach the next non-compacted offset we
// jump to the saved lastOffset.
batch.offset = batch.lastOffset + 1
}
}
default:
// Since io.EOF is used by the batch to indicate that there are
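
Condensed from the hunks above, the new end-of-batch path reads as follows (a sketch only; the surrounding switch cases and deadline bookkeeping are elided):

	// Inside (*Batch).readMessage, once msgs.remaining() == 0:
	err = checkTimeoutErr(batch.deadline)
	batch.err = err

	// A clean EOF with zero bytes left unread (so not MaxBytes
	// truncation) and a lastOffset the message format can supply
	// (v2 only; v0/v1 report -1) means the tail of the batch was
	// compacted away: jump straight past it.
	if batch.err == io.EOF && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
		batch.offset = batch.lastOffset + 1
	}
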
24 changes: 21 additions & 3 deletions message_reader.go
@@ -18,6 +18,10 @@ type messageSetReader struct {
*readerStack // used for decompressing compressed messages and record batches
empty bool // if true, short circuits messageSetReader methods
debug bool // enable debug log messages
// How many bytes are expected to remain in the response.
//
// This is used to detect truncation of the response.
lengthRemain int
}

type readerStack struct {
@@ -114,7 +118,7 @@ func (r *messageSetReader) discard() (err error) {
}

func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readBytesFunc) (
offset int64, timestamp int64, headers []Header, err error) {
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {

if r.empty {
err = RequestTimedOut
@@ -126,8 +130,10 @@ func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readByt
switch r.header.magic {
case 0, 1:
offset, timestamp, headers, err = r.readMessageV1(min, key, val)
// Set an invalid value so that it can be ignored
lastOffset = -1
case 2:
offset, timestamp, headers, err = r.readMessageV2(min, key, val)
offset, lastOffset, timestamp, headers, err = r.readMessageV2(min, key, val)
default:
err = r.header.badMagic()
}
@@ -239,7 +245,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
}

func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
offset int64, timestamp int64, headers []Header, err error) {
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
if err = r.readHeader(); err != nil {
return
}
@@ -282,10 +288,12 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
r.readerStack.parent.count = 0
}
}
remainBefore := r.remain
var length int64
if err = r.readVarInt(&length); err != nil {
return
}
lengthOfLength := remainBefore - r.remain
var attrs int8
if err = r.readInt8(&attrs); err != nil {
return
Expand Down Expand Up @@ -316,6 +324,8 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
return
}
}
lastOffset = r.header.firstOffset + int64(r.header.v2.lastOffsetDelta)
r.lengthRemain -= int(length) + lengthOfLength
r.markRead()
return
}
Expand Down Expand Up @@ -407,6 +417,9 @@ func (r *messageSetReader) readHeader() (err error) {
return
}
r.count = 1
// Set arbitrary non-zero length so that we always assume the
// message is truncated since bytes remain.
r.lengthRemain = 1
r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
case 1:
r.header.crc = crcOrLeaderEpoch
@@ -417,6 +430,9 @@
return
}
r.count = 1
// Set arbitrary non-zero length so that we always assume the
// message is truncated since bytes remain.
r.lengthRemain = 1
r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
case 2:
r.header.v2.leaderEpoch = crcOrLeaderEpoch
@@ -448,6 +464,8 @@
return
}
r.count = int(r.header.v2.count)
// Subtracts the header bytes from the length
r.lengthRemain = int(r.header.length) - 49
r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
default:
err = r.header.badMagic()
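
The constant 49 subtracted above is the portion of a v2 record batch header that header.length covers, i.e. everything after the 4-byte batch length field, per the standard Kafka RecordBatch layout:

	// Bytes counted by header.length before the records themselves
	// (the 8-byte base offset and 4-byte batch length precede these
	// and are excluded):
	//
	//	partitionLeaderEpoch  4
	//	magic                 1
	//	crc                   4
	//	attributes            2
	//	lastOffsetDelta       4
	//	firstTimestamp        8
	//	maxTimestamp          8
	//	producerId            8
	//	producerEpoch         2
	//	baseSequence          4
	//	recordCount           4
	//	                     --
	//	                     49
	const v2HeaderRemainder = 49 // hypothetical name; the diff uses the literal

So lengthRemain starts at the number of record bytes the batch claims to hold and reaches 0 only when every record has been fully read.
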
6 changes: 3 additions & 3 deletions message_test.go
@@ -541,7 +541,7 @@ func TestMessageSetReaderEmpty(t *testing.T) {
return 0, nil
}

offset, timestamp, headers, err := m.readMessage(0, noop, noop)
offset, _, timestamp, headers, err := m.readMessage(0, noop, noop)
if offset != 0 {
t.Errorf("expected offset of 0, get %d", offset)
}
@@ -737,12 +737,12 @@ func (r *readerHelper) readMessageErr() (msg Message, err error) {
}
var timestamp int64
var headers []Header
r.offset, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
r.offset, _, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
if err != nil {
return
}
msg.Offset = r.offset
msg.Time = time.Unix(timestamp / 1000, (timestamp % 1000) * 1000000)
msg.Time = time.Unix(timestamp/1000, (timestamp%1000)*1000000)
msg.Headers = headers
return
}
90 changes: 86 additions & 4 deletions reader_test.go
@@ -1,6 +1,7 @@
package kafka

import (
"bytes"
"context"
"fmt"
"io"
@@ -1672,8 +1673,85 @@ func TestErrorCannotConnectGroupSubscription(t *testing.T) {
}
}

// Tests that the reader can read zero message batches from log compacted
// topics.
// Tests that the reader can handle messages where the response is truncated
// due to reaching MaxBytes.
//
// If MaxBytes is too small to fit 1 record then it will never truncate, so
// we start from a small message size and increase it until we are sure
// truncation has happened at some point.
func TestReaderTruncatedResponse(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
defer deleteTopic(t, topic)

readerMaxBytes := 100
batchSize := 4
maxMsgPadding := 5
readContextTimeout := 10 * time.Second

var msgs []Message
// The key of each message
n := 0
// `i` is the amount of padding per message
for i := 0; i < maxMsgPadding; i++ {
bb := bytes.Buffer{}
for x := 0; x < i; x++ {
_, err := bb.WriteRune('0')
require.NoError(t, err)
}
padding := bb.Bytes()
// `j` is the number of times the message repeats
for j := 0; j < batchSize*4; j++ {
msgs = append(msgs, Message{
Key: []byte(fmt.Sprintf("%05d", n)),
Value: padding,
})
n++
}
}

wr := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
BatchSize: batchSize,
Async: false,
Topic: topic,
Balancer: &LeastBytes{},
})
err := wr.WriteMessages(context.Background(), msgs...)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), readContextTimeout)
defer cancel()
r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
MinBytes: 1,
MaxBytes: readerMaxBytes,
// Speed up testing
MaxWait: 100 * time.Millisecond,
})
defer r.Close()

expectedKeys := map[string]struct{}{}
for _, k := range msgs {
expectedKeys[string(k.Key)] = struct{}{}
}
keys := map[string]struct{}{}
for {
m, err := r.FetchMessage(ctx)
require.NoError(t, err)
keys[string(m.Key)] = struct{}{}

t.Logf("got key %s have %d keys expect %d\n", string(m.Key), len(keys), len(expectedKeys))
if len(keys) == len(expectedKeys) {
require.Equal(t, expectedKeys, keys)
return
}
}
}
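
A rough sizing sanity check (assumptions: v2 record framing with single-byte varints and the 5-byte keys above; these numbers are estimates, not part of the change) shows why readerMaxBytes = 100 forces truncation:

	// batch header: 8 (baseOffset) + 4 (batchLength) + 49 (rest) = 61 bytes
	// per record:   ~1 length + 1 attributes + 1 timestampDelta
	//               + 1 offsetDelta + 1 keyLen + 5 key + 1 valueLen
	//               + padding + 1 headerCount ≈ 12 + padding bytes
	//
	// Even with zero padding, a 4-record batch is ≈ 61 + 4*12 = 109 bytes,
	// already over the 100-byte MaxBytes, so the broker truncates the
	// fetch response and the reader exercises the errShortRead path.
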

// Tests that the reader can read record batches from log compacted topics
// where the batch ends with compacted records.
//
// This test forces varying sized chunks of duplicated messages along with
// configuring the topic with a minimal `segment.bytes` in order to
@@ -1698,8 +1776,12 @@ func TestReaderReadCompactedMessage(t *testing.T) {
for {
success := func() bool {
r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
Brokers: []string{"localhost:9092"},
Topic: topic,
MinBytes: 200,
MaxBytes: 200,
// Speed up testing
MaxWait: 100 * time.Millisecond,
})
defer r.Close()

