Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
1460 lines (1231 sloc) 39.7 KB
package kafka
import (
"context"
"errors"
"fmt"
"io"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
)
// Sentinel offsets that can be used in place of an absolute offset to
// designate one end of a partition.
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)
const (
	// defaultCommitRetries holds the number of commit attempts to make
	// before giving up.
	defaultCommitRetries = 3
)
var (
	// errOnlyAvailableWithGroup is returned by operations that require the
	// Reader to be configured with a GroupID.
	errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
	// errNotAvailableWithGroup is returned by operations that are not
	// supported when the Reader is configured with a GroupID.
	errNotAvailableWithGroup = errors.New("unavailable when GroupID is set")
)
// Default bounds of the delay the reader waits before polling for new
// messages. NewReader applies them when ReaderConfig.ReadBackoffMin or
// ReaderConfig.ReadBackoffMax is left at zero.
const (
	defaultReadBackoffMin = 100 * time.Millisecond
	defaultReadBackoffMax = time.Second
)
// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
type Reader struct {
	// immutable fields of the reader
	config ReaderConfig

	// communication channels between the parent reader and its subreaders
	msgs chan readerMessage

	// mutable fields of the reader (synchronized on the mutex)
	mutex   sync.Mutex
	join    sync.WaitGroup     // tracks the goroutines running the partition readers
	cancel  context.CancelFunc // cancels the context of the current generation of partition readers
	stop    context.CancelFunc // cancels stctx
	done    chan struct{}      // closed when the consumer group loop exits; nil without a GroupID
	commits chan commitRequest // commit requests consumed by the commit loop
	version int64              // version holds the generation of the spawned readers
	offset  int64
	lag     int64
	closed  bool

	// once is an atomic flag ensuring the read lag goroutine is started at
	// most once; Close also sets it so the goroutine is never started
	// afterwards.
	once uint32
	// stctx is the reader-wide context, canceled by stop.
	stctx context.Context

	// reader stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *readerStats
}
// useConsumerGroup reports whether the Reader was configured with a GroupID,
// i.e. whether it consumes as a member of a consumer group.
func (r *Reader) useConsumerGroup() bool {
	return len(r.config.GroupID) > 0
}
// useSyncCommits reports whether commits are performed synchronously, which
// is the case when no CommitInterval has been configured.
func (r *Reader) useSyncCommits() bool {
	interval := r.config.CommitInterval
	return interval == 0
}
// unsubscribe stops the partition readers of the current generation: it
// cancels their context and blocks until every goroutine tracked by r.join
// has returned.
func (r *Reader) unsubscribe() {
	r.cancel()
	r.join.Wait()
	// it would be interesting to drain the r.msgs channel at this point since
	// it will contain buffered messages for partitions that may not be
	// re-assigned to this reader in the next consumer group generation.
	// however, draining the channel could race with the client calling
	// ReadMessage, which could result in messages delivered and/or committed
	// with gaps in the offset. for now, we will err on the side of caution and
	// potentially have those messages be reprocessed in the next generation by
	// another consumer to avoid such a race.
}
// subscribe starts one partition reader per assignment, each beginning at the
// offset carried by its assignment, and logs the resulting partition/offset
// map.
func (r *Reader) subscribe(assignments []PartitionAssignment) {
	offsets := make(map[int]int64, len(assignments))
	for i := range assignments {
		offsets[assignments[i].ID] = assignments[i].Offset
	}

	// start must be called with the mutex held.
	r.mutex.Lock()
	r.start(offsets)
	r.mutex.Unlock()

	r.withLogger(func(l Logger) {
		l.Printf("subscribed to partitions: %+v", offsets)
	})
}
// waitThrottleTime blocks for throttleTimeMS milliseconds, returning early if
// the reader-wide context is canceled. A value of zero returns immediately.
func (r *Reader) waitThrottleTime(throttleTimeMS int32) {
	if throttleTimeMS != 0 {
		delay := time.Duration(throttleTimeMS) * time.Millisecond
		timer := time.NewTimer(delay)
		defer timer.Stop()

		select {
		case <-timer.C:
		case <-r.stctx.Done():
		}
	}
}
// commitOffsetsWithRetry commits offsetStash through gen, making at most
// retries attempts with a backoff between consecutive attempts.
//
// It returns nil as soon as one attempt succeeds. Otherwise it returns the
// error of the last attempt made, either because all attempts failed or
// because the reader-wide context was canceled while backing off. When
// retries is not positive no attempt is made and nil is returned.
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
	const (
		backoffDelayMin = 100 * time.Millisecond
		backoffDelayMax = 5 * time.Second
	)

	for attempt := 0; attempt < retries; attempt++ {
		// Only back off between attempts, never before the first one.
		if attempt > 0 && !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
			return err
		}

		err = gen.CommitOffsets(offsetStash)
		if err == nil {
			return nil
		}
	}

	return err
}
// offsetStash holds offsets by topic => partition => offset. It accumulates
// the offsets waiting to be committed by the commit loops.
type offsetStash map[string]map[int]int64
// merge folds the given commits into the stash, keeping for every topic and
// partition the highest offset seen so far.
func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		byPartition, found := o[c.topic]
		if !found {
			byPartition = make(map[int]int64)
			o[c.topic] = byPartition
		}

		current, known := byPartition[c.partition]
		if known && c.offset <= current {
			// The stash already holds an offset at least as recent.
			continue
		}
		byPartition[c.partition] = c.offset
	}
}
// reset empties the stash in place by removing every topic entry.
func (o offsetStash) reset() {
	for topic := range o {
		delete(o, topic)
	}
}
// commitLoopImmediate serves commit requests one at a time: each request is
// committed (with retries) as soon as it is received and the outcome is sent
// back on the request's error channel. The loop ends when ctx is canceled.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
	pending := offsetStash{}

	for {
		select {
		case <-ctx.Done():
			return

		case request := <-r.commits:
			pending.merge(request.commits)
			err := r.commitOffsetsWithRetry(gen, pending, defaultCommitRetries)
			request.errch <- err
			// The stash is cleared whether or not the commit succeeded; the
			// caller is informed of failures through errch.
			pending.reset()
		}
	}
}
// commitLoopInterval handles each commit asynchronously with a period defined
// by ReaderConfig.CommitInterval.
//
// Commit requests are accumulated in an offset stash which is flushed on
// every tick. When ctx is canceled, the requests still queued on r.commits
// are drained into the stash and one final flush is attempted before
// returning.
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
	ticker := time.NewTicker(r.config.CommitInterval)
	defer ticker.Stop()

	// the offset stash should not survive rebalances b/c the consumer may
	// receive new assignments.
	offsets := offsetStash{}

	// flush commits the stashed offsets. The stash is only cleared on
	// success, so offsets of a failed commit are retried on the next flush.
	flush := func() {
		if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
			// Never use the error text as the format string: a '%' in the
			// message would be interpreted as a formatting verb.
			r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
		} else {
			offsets.reset()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel in order to prepare the final commit.
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
				default:
					hasCommits = false
				}
			}
			flush()
			return

		case <-ticker.C:
			flush()

		case req := <-r.commits:
			offsets.merge(req.commits)
		}
	}
}
// commitLoop consumes the commits channel until ctx is canceled, using the
// immediate strategy when commits are synchronous and the interval strategy
// otherwise. The start and the end of the loop are logged.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
	r.withLogger(func(l Logger) {
		l.Printf("started commit for group %s\n", r.config.GroupID)
	})
	defer r.withLogger(func(l Logger) {
		l.Printf("stopped commit for group %s\n", r.config.GroupID)
	})

	if r.useSyncCommits() {
		r.commitLoopImmediate(ctx, gen)
		return
	}
	r.commitLoopInterval(ctx, gen)
}
// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.
//
// For every generation obtained from cg, run subscribes to the partitions
// assigned for the configured topic and starts two generation-bound
// goroutines: the commit loop, and a watcher that unsubscribes once the
// generation ends or the reader is closed.
//
// This function is responsible for closing the consumer group upon exit; it
// also closes r.done so Close can wait for the loop to terminate.
func (r *Reader) run(cg *ConsumerGroup) {
	defer close(r.done)
	defer cg.Close()

	r.withLogger(func(l Logger) {
		l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
	})

	for {
		gen, err := cg.Next(r.stctx)
		if err != nil {
			// The reader-wide context was canceled: the reader is closing.
			if err == r.stctx.Err() {
				return
			}
			r.stats.errors.observe(1)
			r.withErrorLogger(func(l Logger) {
				// Never use the error text as the format string: a '%' in
				// the message would be interpreted as a formatting verb.
				l.Printf("%v", err)
			})
			continue
		}

		r.stats.rebalances.observe(1)

		r.subscribe(gen.Assignments[r.config.Topic])

		gen.Start(func(ctx context.Context) {
			r.commitLoop(ctx, gen)
		})
		gen.Start(func(ctx context.Context) {
			// wait for the generation to end and then unsubscribe.
			select {
			case <-ctx.Done():
				// continue to next generation
			case <-r.stctx.Done():
				// this will be the last loop because the reader is closed.
			}
			r.unsubscribe()
		})
	}
}
// ReaderConfig is a configuration object used to create new instances of
// Reader.
type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id. If GroupID is specified, then
	// Partition should NOT be specified e.g. 0
	GroupID string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from. Either Partition or GroupID may
	// be assigned, but not both
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// Min and max number of bytes to fetch from kafka in each request.
	//
	// NewReader defaults MaxBytes to 1e6 and MinBytes to MaxBytes when they
	// are left at zero.
	MinBytes int
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	//
	// Default: 10s
	MaxWait time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	//
	// Default: 1m
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker. If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int
}
// Validate checks the ReaderConfig for invalid values and returns an error
// describing the first problem found, or nil when the configuration is
// acceptable.
//
// The checks are: Brokers and Topic must not be empty; Partition must be in
// [0, math.MaxInt32); MinBytes and MaxBytes must not be negative and MinBytes
// must not exceed MaxBytes; Partition must be zero when GroupID is set; and
// ReadBackoffMin and ReadBackoffMax must not be negative.
func (config *ReaderConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
	}

	if len(config.Topic) == 0 {
		return errors.New("cannot create a new kafka reader with an empty topic")
	}

	if config.Partition < 0 || config.Partition >= math.MaxInt32 {
		return fmt.Errorf("partition number out of bounds: %d", config.Partition)
	}

	if config.MinBytes < 0 {
		return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
	}

	if config.MaxBytes < 0 {
		return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
	}

	if config.GroupID != "" && config.Partition != 0 {
		return errors.New("either Partition or GroupID may be specified, but not both")
	}

	if config.MinBytes > config.MaxBytes {
		return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
	}

	// The durations are deliberately formatted with %d, which prints them as
	// an integer number of nanoseconds.
	if config.ReadBackoffMax < 0 {
		return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
	}

	if config.ReadBackoffMin < 0 {
		return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
	}

	return nil
}
// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
//
// Each field carries a `metric` tag naming the metric and, where applicable,
// a `type` tag (counter or gauge); the `tag` fields identify the reader the
// snapshot was taken from.
type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count"      type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count"     type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count"   type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes"   type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count"   type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count"     type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset"          type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag"             type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max"  type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length"    type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity"  type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}
// readerStats is a struct that contains statistics on a reader. It is the
// live, internal counterpart of ReaderStats: Reader.Stats builds its result
// by taking a snapshot of each of these values.
type readerStats struct {
	dials      counter
	fetches    counter
	messages   counter
	bytes      counter
	rebalances counter
	timeouts   counter
	errors     counter
	dialTime   summary
	readTime   summary
	waitTime   summary
	fetchSize  summary
	fetchBytes summary
	offset     gauge
	lag        gauge
	// partition is the string form of the partition number reported in the
	// stats; NewReader computes it once (as "-1" for consumer groups).
	partition string
}
// NewReader builds a Reader from config after filling in the defaults of the
// fields left at their zero value. The reader's offset starts at FirstOffset.
//
// NewReader panics if config fails validation, if ReadBackoffMax ends up
// smaller than ReadBackoffMin, or if the consumer group cannot be created.
// When a GroupID is set, the consumer group management loop is started in a
// background goroutine before returning.
func NewReader(config ReaderConfig) *Reader {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	groupMode := config.GroupID != ""

	if groupMode && len(config.GroupBalancers) == 0 {
		config.GroupBalancers = []GroupBalancer{
			RangeGroupBalancer{},
			RoundRobinGroupBalancer{},
		}
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.MaxBytes == 0 {
		config.MaxBytes = 1e6 // 1 MB
	}

	if config.MinBytes == 0 {
		config.MinBytes = config.MaxBytes
	}

	if config.MaxWait == 0 {
		config.MaxWait = 10 * time.Second
	}

	if config.ReadLagInterval == 0 {
		config.ReadLagInterval = 1 * time.Minute
	}

	if config.ReadBackoffMin == 0 {
		config.ReadBackoffMin = defaultReadBackoffMin
	}

	if config.ReadBackoffMax == 0 {
		config.ReadBackoffMax = defaultReadBackoffMax
	}

	if config.ReadBackoffMax < config.ReadBackoffMin {
		panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 3
	}

	// As a consumer group member the stats report a partition of -1, and the
	// version starts at 1 so that only the rebalance path starts partition
	// readers (FetchMessage only starts one while the version is still 0).
	statsPartition, version := config.Partition, int64(0)
	if groupMode {
		statsPartition, version = -1, 1
	}

	stctx, stop := context.WithCancel(context.Background())

	r := &Reader{
		config:  config,
		msgs:    make(chan readerMessage, config.QueueCapacity),
		cancel:  func() {},
		commits: make(chan commitRequest, config.QueueCapacity),
		stop:    stop,
		offset:  FirstOffset,
		stctx:   stctx,
		stats: &readerStats{
			dialTime:   makeSummary(),
			readTime:   makeSummary(),
			waitTime:   makeSummary(),
			fetchSize:  makeSummary(),
			fetchBytes: makeSummary(),
			// The partition number is rendered as a string only once, here.
			partition: strconv.Itoa(statsPartition),
		},
		version: version,
	}

	if !r.useConsumerGroup() {
		return r
	}

	r.done = make(chan struct{})
	cg, err := NewConsumerGroup(ConsumerGroupConfig{
		ID:                     r.config.GroupID,
		Brokers:                r.config.Brokers,
		Dialer:                 r.config.Dialer,
		Topics:                 []string{r.config.Topic},
		GroupBalancers:         r.config.GroupBalancers,
		HeartbeatInterval:      r.config.HeartbeatInterval,
		PartitionWatchInterval: r.config.PartitionWatchInterval,
		WatchPartitionChanges:  r.config.WatchPartitionChanges,
		SessionTimeout:         r.config.SessionTimeout,
		RebalanceTimeout:       r.config.RebalanceTimeout,
		JoinGroupBackoff:       r.config.JoinGroupBackoff,
		RetentionTime:          r.config.RetentionTime,
		StartOffset:            r.config.StartOffset,
		Logger:                 r.config.Logger,
		ErrorLogger:            r.config.ErrorLogger,
	})
	if err != nil {
		panic(err)
	}
	go r.run(cg)

	return r
}
// Config returns a copy of the configuration the reader is running with,
// including the defaults filled in by NewReader.
func (r *Reader) Config() ReaderConfig {
	cfg := r.config
	return cfg
}
// Close closes the stream, preventing the program from reading any more
// messages from it.
//
// Close cancels the partition readers and the reader-wide context, waits for
// the goroutines tracked by the reader to return, and then closes the message
// channel. It always returns nil.
func (r *Reader) Close() error {
	// Flag the read lag goroutine as already started so that activateReadLag
	// never launches it from this point on.
	atomic.StoreUint32(&r.once, 1)

	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait()

	// done is only allocated for consumer groups; it is closed when the
	// group management loop exits.
	if r.done != nil {
		<-r.done
	}

	// Only the first call to Close closes the channel, closing it twice
	// would panic.
	if !closed {
		close(r.msgs)
	}

	return nil
}
// ReadMessage returns the next message available on r. The call blocks until
// a message can be delivered or an error occurs; ctx may be used to cancel
// the wait asynchronously.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// When the reader is part of a consumer group, the offset of the message is
// committed before it is returned; if that commit fails, the error is
// returned together with a zero Message.
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
	msg, err := r.FetchMessage(ctx)
	switch {
	case err != nil:
		return Message{}, err
	case !r.useConsumerGroup():
		return msg, nil
	}

	if err := r.CommitMessages(ctx, msg); err != nil {
		return Message{}, err
	}
	return msg, nil
}
// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
	r.activateReadLag()

	for {
		r.mutex.Lock()

		// version is only zero for a reader that is not part of a consumer
		// group (NewReader starts consumer group readers at version 1) and
		// whose partition reader has not been started yet, so it is started
		// lazily here at the current offset.
		if !r.closed && r.version == 0 {
			r.start(map[int]int64{r.config.Partition: r.offset})
		}

		version := r.version
		r.mutex.Unlock()

		select {
		case <-ctx.Done():
			return Message{}, ctx.Err()

		case m, ok := <-r.msgs:
			// The channel is closed by Close.
			if !ok {
				return Message{}, io.EOF
			}

			// Messages stamped with a version older than the one observed
			// above are dropped and the loop waits for the next message.
			if m.version >= version {
				r.mutex.Lock()

				switch {
				case m.error != nil:
					// errors do not move the offset
				case version == r.version:
					// only advance the offset and lag if the reader was not
					// restarted while waiting for the message
					r.offset = m.message.Offset + 1
					r.lag = m.watermark - r.offset
				}

				r.mutex.Unlock()

				switch m.error {
				case nil:
				case io.EOF:
					// io.EOF is used as a marker to indicate that the stream
					// has been closed, in case it was received from the inner
					// reader we don't want to confuse the program and replace
					// the error with io.ErrUnexpectedEOF.
					m.error = io.ErrUnexpectedEOF
				}

				return m.message, m.error
			}
		}
	}
}
// CommitMessages queues the offsets of msgs to be committed. It is only
// available on readers that are part of a consumer group and returns
// errOnlyAvailableWithGroup otherwise.
//
// With synchronous commits the call blocks until the commit completed and
// returns its outcome; with interval commits it returns as soon as the
// request is queued. ctx cancels either wait, and io.ErrClosedPipe is
// returned if the reader is closed before the request could be queued.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if !r.useConsumerGroup() {
		return errOnlyAvailableWithGroup
	}

	blocking := r.useSyncCommits()
	request := commitRequest{commits: makeCommits(msgs...)}

	// The result channel is only needed when waiting for the commit outcome;
	// it is buffered so the commit loop never blocks on an abandoned request.
	var result chan error
	if blocking {
		result = make(chan error, 1)
		request.errch = result
	}

	select {
	case r.commits <- request:
	case <-ctx.Done():
		return ctx.Err()
	case <-r.stctx.Done():
		// This context is used to ensure we don't allow commits after the
		// reader was closed.
		return io.ErrClosedPipe
	}

	if !blocking {
		return nil
	}

	select {
	case err := <-result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
//
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs to get
// an up to date estimation of how far behind the reader is. For example when
// the consumer is not ready to process the next message.
//
// When the reader's current offset is FirstOffset the lag is the distance
// between the first and last offsets of the partition, and when it is
// LastOffset the lag is zero.
//
// The method fails with errNotAvailableWithGroup if the reader is part of a
// consumer group.
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
	if r.useConsumerGroup() {
		return 0, errNotAvailableWithGroup
	}

	type offsets struct {
		first int64
		last  int64
	}

	// Both channels are buffered so the goroutine below can always deliver
	// its result and exit, even if this call returned early because ctx was
	// canceled.
	offch := make(chan offsets, 1)
	errch := make(chan error, 1)

	go func() {
		var off offsets
		var err error

		// Try the brokers in order until one yields the partition offsets;
		// err holds the last failure if none does.
		for _, broker := range r.config.Brokers {
			var conn *Conn

			if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
				continue
			}

			deadline, _ := ctx.Deadline()
			conn.SetDeadline(deadline)

			off.first, off.last, err = conn.ReadOffsets()
			conn.Close()

			if err == nil {
				break
			}
		}

		if err != nil {
			errch <- err
		} else {
			offch <- off
		}
	}()

	select {
	case off := <-offch:
		switch cur := r.Offset(); {
		case cur == FirstOffset:
			lag = off.last - off.first

		case cur == LastOffset:
			lag = 0

		default:
			lag = off.last - cur
		}
	case err = <-errch:
	case <-ctx.Done():
		err = ctx.Err()
	}

	return
}
// Offset reports the absolute offset the reader is currently positioned at.
// It returns -1 when r is backed by a consumer group. Each lookup is reported
// to the configured Logger, if any.
func (r *Reader) Offset() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	current := r.offset
	r.mutex.Unlock()

	// Log outside of the critical section.
	if logger := r.config.Logger; logger != nil {
		logger.Printf("looking up offset of kafka reader for partition %d of %s: %d", r.config.Partition, r.config.Topic, current)
	}
	return current
}
// Lag reports the lag recorded when the last message was fetched. It returns
// -1 when r is backed by a consumer group.
func (r *Reader) Lag() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()
	return r.lag
}
// SetOffset moves the reader to offset, so that the next batch of messages is
// read from there. It fails with io.ErrClosedPipe if the reader has already
// been closed, and with errNotAvailableWithGroup if the reader is part of a
// consumer group. Setting the offset the reader is already at is a no-op.
//
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
// or last available offset in the partition. Please note while -1 and -2 were accepted
// to indicate the first or last offset in previous versions, the meanings of the numbers
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
// specification.
func (r *Reader) SetOffset(offset int64) error {
	if r.useConsumerGroup() {
		return errNotAvailableWithGroup
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	if r.closed {
		return io.ErrClosedPipe
	}
	if offset == r.offset {
		return nil
	}

	r.withLogger(func(log Logger) {
		log.Printf("setting the offset of the kafka reader for partition %d of %s from %d to %d",
			r.config.Partition, r.config.Topic, r.offset, offset)
	})
	r.offset = offset

	// A partition reader is only running once the version is non-zero; in
	// that case it is restarted from the new offset.
	if r.version != 0 {
		r.start(map[int]int64{r.config.Partition: r.offset})
	}

	r.activateReadLag()
	return nil
}
// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The brokers are dialed in order until a connection to the partition leader
// is established; the offset for t is then read from it and applied with
// SetOffset.
//
// The method fails with io.ErrClosedPipe if the reader has been closed, with
// errNotAvailableWithGroup if the reader is part of a consumer group, if the
// offset for t cannot be read, or if no partition leader could be reached (in
// which case the last dial error is included in the returned error).
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
	r.mutex.Lock()
	closed := r.closed
	r.mutex.Unlock()
	if closed {
		return io.ErrClosedPipe
	}

	// SetOffset rejects consumer group readers anyway; fail before doing any
	// network round trip, as SetOffset and ReadLag do.
	if r.useConsumerGroup() {
		return errNotAvailableWithGroup
	}

	var dialErr error
	for _, broker := range r.config.Brokers {
		conn, err := r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
		if err != nil {
			// Remember why the broker was skipped so the cause is not lost
			// if every broker fails.
			dialErr = err
			continue
		}

		// Without a context deadline this passes the zero time
		// (presumably meaning "no deadline", as with net.Conn — confirm).
		deadline, _ := ctx.Deadline()
		conn.SetDeadline(deadline)
		offset, err := conn.ReadOffset(t)
		conn.Close()
		if err != nil {
			return err
		}

		return r.SetOffset(offset)
	}

	if dialErr != nil {
		return fmt.Errorf("error setting offset for timestamp %+v: %v", t, dialErr)
	}
	return fmt.Errorf("error setting offset for timestamp %+v", t)
}
// Stats takes a snapshot of the reader's statistics, covering the period
// since the previous call (or since the reader was created on the first
// call).
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka reader and report the metrics to a stats collection
// system.
func (r *Reader) Stats() ReaderStats {
	live := r.stats
	cfg := &r.config

	snap := ReaderStats{
		Dials:         live.dials.snapshot(),
		Fetches:       live.fetches.snapshot(),
		Messages:      live.messages.snapshot(),
		Bytes:         live.bytes.snapshot(),
		Rebalances:    live.rebalances.snapshot(),
		Timeouts:      live.timeouts.snapshot(),
		Errors:        live.errors.snapshot(),
		DialTime:      live.dialTime.snapshotDuration(),
		ReadTime:      live.readTime.snapshotDuration(),
		WaitTime:      live.waitTime.snapshotDuration(),
		FetchSize:     live.fetchSize.snapshot(),
		FetchBytes:    live.fetchBytes.snapshot(),
		Offset:        live.offset.snapshot(),
		Lag:           live.lag.snapshot(),
		MinBytes:      int64(cfg.MinBytes),
		MaxBytes:      int64(cfg.MaxBytes),
		MaxWait:       cfg.MaxWait,
		QueueLength:   int64(len(r.msgs)),
		QueueCapacity: int64(cap(r.msgs)),
		ClientID:      cfg.Dialer.ClientID,
		Topic:         cfg.Topic,
		Partition:     live.partition,
	}

	// TODO: remove when we get rid of the deprecated field.
	// The deprecated field mirrors Fetches; the counter must not be
	// snapshotted a second time to fill it.
	snap.DeprecatedFetchesWithTypo = snap.Fetches
	return snap
}
// withLogger invokes do with the configured Logger; it does nothing when no
// Logger was configured.
func (r *Reader) withLogger(do func(Logger)) {
	logger := r.config.Logger
	if logger == nil {
		return
	}
	do(logger)
}
// withErrorLogger invokes do with the configured ErrorLogger, falling back to
// the regular Logger (if any) when no ErrorLogger was configured.
func (r *Reader) withErrorLogger(do func(Logger)) {
	if errLogger := r.config.ErrorLogger; errLogger != nil {
		do(errLogger)
		return
	}
	r.withLogger(do)
}
// activateReadLag starts the background goroutine that periodically measures
// the read lag. The goroutine is started at most once, only when
// ReadLagInterval is positive, and never for consumer group readers.
func (r *Reader) activateReadLag() {
	if r.config.ReadLagInterval <= 0 {
		return
	}
	// Only the caller that flips the flag from 0 to 1 may start the goroutine.
	if !atomic.CompareAndSwapUint32(&r.once, 0, 1) {
		return
	}
	// read lag will only be calculated when not using consumer groups
	// todo discuss how capturing read lag should interact with rebalancing
	if r.useConsumerGroup() {
		return
	}
	go r.readLag(r.stctx)
}
// readLag measures the read lag once per ReadLagInterval until ctx is
// canceled. Each measurement is bounded by a timeout of half the interval;
// successful measurements are recorded in the lag stat, failures are counted
// in the error stat and reported to the error logger together with their
// cause.
func (r *Reader) readLag(ctx context.Context) {
	ticker := time.NewTicker(r.config.ReadLagInterval)
	defer ticker.Stop()

	for {
		timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
		lag, err := r.ReadLag(timeout)
		cancel()

		if err != nil {
			r.stats.errors.observe(1)
			r.withErrorLogger(func(log Logger) {
				log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
			})
		} else {
			r.stats.lag.observe(lag)
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}
// start cancels the partition readers of the previous generation and spawns
// one new partition reader per entry of offsetsByPartition, each starting at
// the associated offset. It does nothing if the Reader is closed.
//
// start reads and updates mutable fields of the Reader; callers in this file
// invoke it with r.mutex held.
func (r *Reader) start(offsetsByPartition map[int]int64) {
	if r.closed {
		// don't start child reader if parent Reader is closed
		return
	}

	ctx, cancel := context.WithCancel(context.Background())

	r.cancel() // always cancel the previous reader
	r.cancel = cancel
	r.version++

	// Capture the generation here: the goroutines below run without the
	// mutex, so reading r.version from inside them would race with a later
	// call to start and could stamp a stale reader with a newer version.
	version := r.version

	r.join.Add(len(offsetsByPartition))
	for partition, offset := range offsetsByPartition {
		sub := &reader{
			dialer:          r.config.Dialer,
			logger:          r.config.Logger,
			errorLogger:     r.config.ErrorLogger,
			brokers:         r.config.Brokers,
			topic:           r.config.Topic,
			partition:       partition,
			minBytes:        r.config.MinBytes,
			maxBytes:        r.config.MaxBytes,
			maxWait:         r.config.MaxWait,
			backoffDelayMin: r.config.ReadBackoffMin,
			backoffDelayMax: r.config.ReadBackoffMax,
			version:         version,
			msgs:            r.msgs,
			stats:           r.stats,
			isolationLevel:  r.config.IsolationLevel,
			maxAttempts:     r.config.MaxAttempts,
		}

		go func(ctx context.Context, sub *reader, offset int64, join *sync.WaitGroup) {
			defer join.Done()
			sub.run(ctx, offset)
		}(ctx, sub, offset, &r.join)
	}
}
// A reader reads messages from kafka and produces them on its channels, it's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
//
// One reader is created per partition by Reader.start, which copies most of
// these fields from the parent Reader's configuration.
type reader struct {
	dialer          *Dialer
	logger          Logger
	errorLogger     Logger
	brokers         []string
	topic           string
	partition       int
	minBytes        int
	maxBytes        int
	maxWait         time.Duration
	backoffDelayMin time.Duration
	backoffDelayMax time.Duration
	version         int64                // generation of the parent Reader this reader was spawned for
	msgs            chan<- readerMessage // message channel shared with the parent Reader
	stats           *readerStats         // stats shared with the parent Reader
	isolationLevel  IsolationLevel
	maxAttempts     int
}
// readerMessage is the unit exchanged on the channel between the partition
// readers and the parent Reader: either a message or an error.
type readerMessage struct {
	version   int64   // generation of the reader that produced the value
	message   Message // the message read; only meaningful when error is nil
	watermark int64   // FetchMessage derives the lag as watermark minus the next offset
	error     error   // non-nil when the reader reports an error instead of a message
}
func (r *reader) run(ctx context.Context, offset int64) {
// This is the reader's main loop, it only ends if the context is canceled
// and will keep attempting to reader messages otherwise.
//
// Retrying indefinitely has the nice side effect of preventing Read calls
// on the parent reader to block if connection to the kafka server fails,
// the reader keeps reporting errors on the error channel which will then
// be surfaced to the program.
// If the reader wasn't retrying then the program would block indefinitely
// on a Read call after reading the first error.
for attempt := 0; true; attempt++ {
if attempt != 0 {
if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
return
}
}
r.withLogger(func(log Logger) {
log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, offset)
})
conn, start, err := r.initialize(ctx, offset)
switch err {
case nil:
case OffsetOutOfRange:
// This would happen if the requested offset is passed the last
// offset on the partition leader. In that case we're just going
// to retry later hoping that enough data has been produced.
r.withErrorLogger(func(log Logger) {
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, OffsetOutOfRange)
})
continue
default:
// Wait 4 attempts before reporting the first errors, this helps
// mitigate situations where the kafka server is temporarily
// unavailable.
if attempt >= r.maxAttempts {
r.sendError(ctx, err)
} else {
r.stats.errors.observe(1)
r.withErrorLogger(func(log Logger) {
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
})
}
continue
}
// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
// timeout.
attempt = 0
// Now we're sure to have an absolute offset number, may anything happen
// to the connection we know we'll want to restart from this offset.
offset = start
errcount := 0
readLoop:
for {
if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
conn.Close()
return
}
switch offset, err = r.read(ctx, offset, conn); err {
case nil:
errcount = 0
case io.EOF:
// done with this batch of messages...carry on. note that this
// block relies on the batch repackaging real io.EOF errors as
// io.UnexpectedEOF. otherwise, we would end up swallowing real
// errors here.
break readLoop
case UnknownTopicOrPartition:
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
})
conn.Close()
// The next call to .initialize will re-establish a connection to the proper
// topic/partition broker combo.
r.stats.rebalances.observe(1)
break readLoop
case NotLeaderForPartition:
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset)
})
conn.Close()
// The next call to .initialize will re-establish a connection to the proper
// partition leader.
r.stats.rebalances.observe(1)
break readLoop
case RequestTimedOut:
// Timeout on the kafka side, this can be safely retried.
errcount = 0
r.withLogger(func(log Logger) {
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset)
})
r.stats.timeouts.observe(1)
continue
case OffsetOutOfRange:
first, last, err := r.readOffsets(conn)
if err != nil {
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
})
conn.Close()
break readLoop
}
switch {
case offset < first:
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, offset, first, first-offset)
})
offset, errcount = first, 0
continue // retry immediately so we don't keep falling behind due to the backoff
case offset < last:
errcount = 0
continue // more messages have already become available, retry immediately
default:
// We may be reading past the last offset, will retry later.
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, offset)
})
}
case context.Canceled:
// Another reader has taken over, we can safely quit.
conn.Close()
return
case errUnknownCodec:
// The compression codec is either unsupported or has not been
// imported. This is a fatal error b/c the reader cannot
// proceed.
r.sendError(ctx, err)
break readLoop
default:
if _, ok := err.(Error); ok {
r.sendError(ctx, err)
} else {
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err)
})
r.stats.errors.observe(1)
conn.Close()
break readLoop
}
}
errcount++
}
}
}
// initialize establishes a connection to the leader of the reader's
// topic/partition and positions it at the requested offset.
//
// The configured brokers are tried in order. A failed dial moves on to the
// next broker; every dial attempt is recorded in the dials and dialTime stats.
// Once a dial succeeds, the partition's first and last offsets are read and
// the requested offset is resolved:
//
//   - FirstOffset, or any offset below the first available one, becomes the
//     first offset of the partition;
//   - LastOffset becomes the last offset of the partition;
//   - any other value is used as-is.
//
// The connection is then seeked to the resolved offset and its deadline is
// cleared. If reading the offsets or seeking fails, the connection is closed
// and the error is returned without trying the remaining brokers.
//
// On success conn is the ready connection, start is the offset reported by
// Seek and err is nil. When every dial fails, conn is nil and err is the last
// dial error. Note that with an empty broker list the loop never runs and the
// result is a nil conn together with a nil error.
func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
	for idx := 0; idx < len(r.brokers) && conn == nil; idx++ {
		broker := r.brokers[idx]

		dialStart := time.Now()
		conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
		dialElapsed := time.Since(dialStart)

		r.stats.dials.observe(1)
		r.stats.dialTime.observeDuration(dialElapsed)

		if err != nil {
			// Could not reach the leader through this broker, try the next one.
			continue
		}

		var oldest, newest int64
		if oldest, newest, err = r.readOffsets(conn); err != nil {
			conn.Close()
			return nil, start, err
		}

		// Translate the symbolic offsets into absolute ones and never seek
		// before the first offset still available on the partition.
		if offset == LastOffset {
			offset = newest
		} else if offset == FirstOffset || offset < oldest {
			offset = oldest
		}

		r.withLogger(func(log Logger) {
			log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, offset)
		})

		if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
			conn.Close()
			return nil, start, err
		}

		// Drop the deadline that readOffsets installed on the connection.
		conn.SetDeadline(time.Time{})
	}

	return conn, start, err
}
// read fetches a single batch from conn and forwards every message it
// contains to the parent reader through sendMessage.
//
// The offset argument is only used for bookkeeping: it is reported to the
// offset stat and returned unchanged if no message could be forwarded. After
// each forwarded message it advances to that message's offset plus one.
//
// The fetch itself is bounded by r.maxWait. While the batch is being consumed
// the read deadline is kept at least safetyTimeout/2 in the future, and it is
// cleared again before returning.
//
// The loop only stops when batch.ReadMessage or sendMessage fails, so the
// returned error is always the non-nil error that ended the batch. The
// returned offset is the position to resume from.
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	// safetyTimeout is the rolling read deadline applied while draining the
	// batch.
	const safetyTimeout = 10 * time.Second

	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	fetchStart := time.Now()
	conn.SetReadDeadline(fetchStart.Add(r.maxWait))
	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	watermark := batch.HighWaterMark()

	batchReady := time.Now()
	r.stats.waitTime.observeDuration(batchReady.Sub(fetchStart))

	var (
		err       error
		delivered int64 // number of messages forwarded from this batch
		payload   int64 // key+value bytes of the forwarded messages
	)

	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)

	for {
		// Push the deadline out once less than half of the safety window is
		// left, rather than resetting it on every message.
		if now := time.Now(); deadline.Sub(now) < safetyTimeout/2 {
			deadline = now.Add(safetyTimeout)
			conn.SetReadDeadline(deadline)
		}

		var msg Message
		if msg, err = batch.ReadMessage(); err != nil {
			break
		}

		msgBytes := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(msgBytes)

		if err = r.sendMessage(ctx, msg, watermark); err != nil {
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(watermark - offset)
		delivered++
		payload += msgBytes
	}

	// Both ways out of the loop are failures, the batch is always released.
	batch.Close()

	conn.SetReadDeadline(time.Time{})
	r.stats.readTime.observeDuration(time.Since(batchReady))
	r.stats.fetchSize.observe(delivered)
	r.stats.fetchBytes.observe(payload)
	return offset, err
}
// readOffsets returns the first and last offsets reported by conn.ReadOffsets.
//
// A deadline of 10 seconds from now is installed on the connection before the
// request is made; it is left in place, so callers that keep using the
// connection are responsible for resetting it.
func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
	deadline := time.Now().Add(10 * time.Second)
	conn.SetDeadline(deadline)

	first, last, err = conn.ReadOffsets()
	return first, last, err
}
// sendMessage publishes msg on r.msgs, tagged with the reader's version and
// the given watermark. It blocks until the message has been handed over, in
// which case it returns nil, or until ctx is done, in which case it returns
// ctx.Err().
func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
	envelope := readerMessage{
		version:   r.version,
		message:   msg,
		watermark: watermark,
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case r.msgs <- envelope:
		return nil
	}
}
// sendError publishes err on r.msgs, tagged with the reader's version. It
// blocks until the error has been handed over, in which case it returns nil,
// or until ctx is done, in which case it returns ctx.Err().
func (r *reader) sendError(ctx context.Context, err error) error {
	envelope := readerMessage{
		version: r.version,
		error:   err,
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case r.msgs <- envelope:
		return nil
	}
}
// withLogger invokes do with the reader's logger. It does nothing when no
// logger is configured, so callers can build log lines inside do without
// checking for nil themselves.
func (r *reader) withLogger(do func(Logger)) {
	if r.logger == nil {
		return
	}
	do(r.logger)
}
// withErrorLogger invokes do with the reader's error logger. When no error
// logger is configured it falls back to withLogger, which in turn is a no-op
// if the regular logger is missing as well.
func (r *reader) withErrorLogger(do func(Logger)) {
	if r.errorLogger == nil {
		r.withLogger(do)
		return
	}
	do(r.errorLogger)
}
// extractTopics collects every topic listed by the given group members,
// drops duplicates, and returns the result sorted in ascending order. The
// result is nil when the members list no topic at all.
func extractTopics(members []GroupMember) []string {
	seen := make(map[string]struct{})
	var unique []string

	for i := range members {
		for _, name := range members[i].Topics {
			if _, dup := seen[name]; !dup {
				seen[name] = struct{}{}
				unique = append(unique, name)
			}
		}
	}

	sort.Strings(unique)
	return unique
}
You can’t perform that action at this time.