Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] FetchHeartbeat option for Fetch and FetchBytes #1548

Merged
merged 1 commit into from Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions jetstream/consumer.go
Expand Up @@ -52,6 +52,12 @@ type (
// defaults to 30 seconds and can be configured using FetchMaxWait
// option.
//
// By default, Fetch uses a 5s idle heartbeat for requests longer than
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
// This can be configured using FetchHeartbeat option. If a client does
// not receive a heartbeat message from a stream for more than 2 times
// the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
//
// Fetch is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
Expand All @@ -65,6 +71,12 @@ type (
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
// By default, FetchBytes uses a 5s idle heartbeat for requests longer than
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
// This can be configured using FetchHeartbeat option. If a client does
// not receive a heartbeat message from a stream for more than 2 times
// the idle heartbeat setting, Fetch will return ErrNoHeartbeat.
//
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
Expand All @@ -75,9 +87,7 @@ type (
// FetchNoWait is used to retrieve up to a provided number of messages
// from a stream. Unlike Fetch, FetchNoWait will only deliver messages
// that are currently available in the stream and will not wait for new
// messages to arrive, even if batch size is not met. FetchNoWait
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
// messages to arrive, even if batch size is not met.
//
// FetchNoWait is non-blocking and returns MessageBatch, exposing a
// channel for delivered messages.
Expand Down
20 changes: 20 additions & 0 deletions jetstream/jetstream_options.go
Expand Up @@ -260,6 +260,8 @@ func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt {
}

// FetchMaxWait sets custom timeout for fetching predefined batch of messages.
//
// If not provided, a default of 30 seconds will be used.
func FetchMaxWait(timeout time.Duration) FetchOpt {
return func(req *pullRequest) error {
if timeout <= 0 {
Expand All @@ -270,6 +272,24 @@ func FetchMaxWait(timeout time.Duration) FetchOpt {
}
}

// FetchHeartbeat sets custom heartbeat for individual fetch request. If a
// client does not receive a heartbeat message from a stream for more than 2
// times the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
//
// Heartbeat value has to be lower than FetchMaxWait / 2.
//
// If not provided, heartbeat will is set to 5s for requests with FetchMaxWait > 30s
// and disabled otherwise.
func FetchHeartbeat(hb time.Duration) FetchOpt {
return func(req *pullRequest) error {
if hb <= 0 {
return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption)
}
req.Heartbeat = hb
return nil
}
}

// WithDeletedDetails can be used to display the information about messages
// deleted from a stream on a stream info request
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
Expand Down
48 changes: 35 additions & 13 deletions jetstream/pull.go
Expand Up @@ -729,17 +729,26 @@ func (s *pullSubscription) Drain() {
// It will wait up to provided expiry time if not all messages are available.
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
req := &pullRequest{
Batch: batch,
Expires: DefaultExpires,
Batch: batch,
Expires: DefaultExpires,
Heartbeat: unset,
}
for _, opt := range opts {
if err := opt(req); err != nil {
return nil, err
}
}
// for longer pulls, set heartbeat value
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
} else {
req.Heartbeat = 0
}
}
if req.Expires < 2*req.Heartbeat {
return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
}

return p.fetch(req)
Expand All @@ -748,26 +757,35 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
// FetchBytes is used to retrieve up to a provided bytes from the stream.
func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {
req := &pullRequest{
Batch: 1000000,
MaxBytes: maxBytes,
Expires: DefaultExpires,
Batch: 1000000,
MaxBytes: maxBytes,
Expires: DefaultExpires,
Heartbeat: unset,
}
for _, opt := range opts {
if err := opt(req); err != nil {
return nil, err
}
}
// for longer pulls, set heartbeat value
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
} else {
req.Heartbeat = 0
}
}
if req.Expires < 2*req.Heartbeat {
return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
}

return p.fetch(req)
}

// FetchNoWait sends a single request to retrieve given number of messages.
// If there are any messages available at the time of sending request,
// FetchNoWait will return immediately.
// FetchNoWait will only return messages that are available at the time of the
// request. It will not wait for more messages to arrive.
func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) {
req := &pullRequest{
Batch: batch,
Expand Down Expand Up @@ -842,6 +860,10 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
return
}
p.Unlock()
case err := <-sub.errs:
res.err = err
res.done = true
return
case <-time.After(req.Expires + 1*time.Second):
res.done = true
return
Expand Down