Handle end of fetched batches of messages gracefully (#353)
The Batch read functions return io.EOF to signal the end of the batch. 
However, there was ambiguity because io.EOF errors could also surface 
for other reasons (e.g. a connection closed by the broker).  Due to this
ambiguity, the Reader would treat io.EOF as an error.  In turn, this
would result in an error message being logged and the connection being
closed after every single fetch request.  Such behavior is undesirable
from a performance perspective because the connection could be reused.
It is also misleading because it inflates error stats and error logs
when there is really no error.

This PR changes the Reader to treat io.EOF as an expected condition,
while ensuring that the Batch maps any underlying io.EOF errors to
io.ErrUnexpectedEOF.

This PR also restores a line of deadline management that was
accidentally deleted in #277.

Fixes #258
Steve van Loben Sels committed Sep 21, 2019
1 parent 46a3f29 commit 5341c5f
Showing 3 changed files with 23 additions and 11 deletions.
14 changes: 12 additions & 2 deletions batch.go
@@ -230,7 +230,12 @@ func (batch *Batch) readMessage(
err = batch.msgs.discard()
switch {
case err != nil:
- batch.err = err
+ // Since io.EOF is used by the batch to indicate that there are
+ // no more messages to consume, it is crucial that any io.EOF errors
+ // on the underlying connection are repackaged. Otherwise, the
+ // caller can't tell the difference between a batch that was fully
+ // consumed and a batch whose connection is in an error state.
+ batch.err = dontExpectEOF(err)
case batch.msgs.remaining() == 0:
// Because we use the adjusted deadline we could end up returning
// before the actual deadline occurred. This is necessary otherwise
@@ -243,7 +248,12 @@ func (batch *Batch) readMessage(
batch.err = err
}
default:
- batch.err = err
+ // Since io.EOF is used by the batch to indicate that there are
+ // no more messages to consume, it is crucial that any io.EOF errors
+ // on the underlying connection are repackaged. Otherwise, the
+ // caller can't tell the difference between a batch that was fully
+ // consumed and a batch whose connection is in an error state.
+ batch.err = dontExpectEOF(err)
}

return
6 changes: 5 additions & 1 deletion conn.go
@@ -851,7 +851,11 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
partition: int(c.partition), // partition is copied to Batch to prevent race with Batch.close
offset: offset,
highWaterMark: highWaterMark,
- err: dontExpectEOF(err),
+ // there shouldn't be a short read on initially setting up the batch.
+ // as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we
+ // don't accidentally signal that we successfully reached the end of the
+ // batch.
+ err: dontExpectEOF(err),
}
}

14 changes: 6 additions & 8 deletions reader.go
@@ -1191,6 +1191,12 @@ func (r *reader) run(ctx context.Context, offset int64) {
switch offset, err = r.read(ctx, offset, conn); err {
case nil:
errcount = 0
+ case io.EOF:
+ // done with this batch of messages...carry on. note that this
+ // block relies on the batch repackaging real io.EOF errors as
+ // io.ErrUnexpectedEOF. otherwise, we would end up swallowing real
+ // errors here.
+ break readLoop
case UnknownTopicOrPartition:
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or partition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
@@ -1265,14 +1271,6 @@ func (r *reader) run(ctx context.Context, offset int64) {
r.sendError(ctx, err)
break readLoop

- case io.EOF:
- r.withLogger(func(log Logger) {
- log.Printf("the kafka reader got an EOF for partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err)
- })
- r.stats.errors.observe(1)
- conn.Close()
- break readLoop
-
default:
if _, ok := err.(Error); ok {
r.sendError(ctx, err)
