diff --git a/batch.go b/batch.go index 8fc6c4e3..ff9fae10 100644 --- a/batch.go +++ b/batch.go @@ -230,7 +230,12 @@ func (batch *Batch) readMessage( err = batch.msgs.discard() switch { case err != nil: - batch.err = err + // Since io.EOF is used by the batch to indicate that there are + // no more messages to consume, it is crucial that any io.EOF errors + // on the underlying connection are repackaged. Otherwise, the + // caller can't tell the difference between a batch that was fully + // consumed or a batch whose connection is in an error state. + batch.err = dontExpectEOF(err) case batch.msgs.remaining() == 0: // Because we use the adjusted deadline we could end up returning // before the actual deadline occurred. This is necessary otherwise @@ -243,7 +248,12 @@ func (batch *Batch) readMessage( batch.err = err } default: - batch.err = err + // Since io.EOF is used by the batch to indicate that there are + // no more messages to consume, it is crucial that any io.EOF errors + // on the underlying connection are repackaged. Otherwise, the + // caller can't tell the difference between a batch that was fully + // consumed or a batch whose connection is in an error state. + batch.err = dontExpectEOF(err) } return diff --git a/conn.go b/conn.go index 75408d01..221cbd46 100644 --- a/conn.go +++ b/conn.go @@ -851,7 +851,11 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { partition: int(c.partition), // partition is copied to Batch to prevent race with Batch.close offset: offset, highWaterMark: highWaterMark, - err: dontExpectEOF(err), + // there shouldn't be a short read on initially setting up the batch. + // as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we + // don't accidentally signal that we successfully reached the end of the + // batch. 
+ err: dontExpectEOF(err), } } diff --git a/reader.go b/reader.go index 0c3d55e9..8347bcab 100644 --- a/reader.go +++ b/reader.go @@ -1191,6 +1191,12 @@ func (r *reader) run(ctx context.Context, offset int64) { switch offset, err = r.read(ctx, offset, conn); err { case nil: errcount = 0 + case io.EOF: + // done with this batch of messages...carry on. note that this + // block relies on the batch repackaging real io.EOF errors as + // io.ErrUnexpectedEOF. otherwise, we would end up swallowing real + // errors here. + break readLoop case UnknownTopicOrPartition: r.withErrorLogger(func(log Logger) { log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers) @@ -1265,14 +1271,6 @@ func (r *reader) run(ctx context.Context, offset int64) { r.sendError(ctx, err) break readLoop - case io.EOF: - r.withLogger(func(log Logger) { - log.Printf("the kafka reader got an EOF for partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err) - }) - r.stats.errors.observe(1) - conn.Close() - break readLoop - default: if _, ok := err.(Error); ok { r.sendError(ctx, err)