feat: use exponential retry strategy #271

Merged (2 commits, Aug 17, 2021)
Changes shown below are from the first commit only.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## 2.5.0 [in progress]
### Features
- [#264](https://github.com/influxdata/influxdb-client-go/pull/264) Synced generated server API with the latest [oss.yml](https://github.com/influxdata/openapi/blob/master/contracts/oss.yml).
- [#271](https://github.com/influxdata/influxdb-client-go/pull/271) Use exponential _random_ retry strategy

### Bug fixes
- [#270](https://github.com/influxdata/influxdb-client-go/pull/270) Fixed duplicate `Content-Type` header in requests to management API
2 changes: 1 addition & 1 deletion api/write.go
@@ -146,7 +146,7 @@
func (w *WriteAPIImpl) flushBuffer() {
if len(w.writeBuffer) > 0 {
log.Info("sending batch")
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval())
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime())
w.writeCh <- batch
w.writeBuffer = w.writeBuffer[:0]
}
49 changes: 38 additions & 11 deletions api/write/options.go
@@ -21,14 +21,18 @@ type Options struct {
useGZip bool
// Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged.
defaultTags map[string]string
// Default retry interval in ms, if not sent by server. Default 5,000ms
// Default retry interval in ms, if not sent by server. Default 5,000.
retryInterval uint
// Maximum count of retry attempts of failed writes
// Maximum count of retry attempts of failed writes, default 5.
maxRetries uint
// Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 50,000
// Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 50,000.
retryBufferLimit uint
// Maximum retry interval, default 5min (300,000ms)
// The maximum delay between each retry attempt in milliseconds, default 125,000.
maxRetryInterval uint
// The maximum total retry timeout in milliseconds, default 180,000.
[Review thread on `maxRetryTime`]

sranka (Contributor): I didn't find a test herein that proves that it is a maximum retry time since the first retry attempt ... is it really so?

Author (Contributor): This line is intended to test that.

sranka: This is IMHO testing maxRetryInterval; I was looking for a test (or proof) that maxRetryTime works.

Author: Oh, sorry, this test should test that.

sranka (Aug 13, 2021): The test is really difficult to understand, as it is not obvious that a second srv.HandleWrite is required to return an error from the previously inserted expired batch; a comment might have helped me understand it. It is also not clear from the code that the second WriteBatch is actually tried immediately; IMHO it is not. Higher knowledge of the execution context is required even for the test. I was looking for a test that matches user expectations: simply checking that an error is signaled after maxRetryTime is reached and that the expired item is removed from the retry queue.

Author: Yes, for someone unfamiliar with how the write service works this seems complicated. I added more comments to improve readability. One thing that is not obvious is that retries are not scheduled to be sent automatically; retries are triggered by new writes.

sranka (Aug 16, 2021): I was also confused by the fact that WriteBatch can fail even without trying to write the data on input. It was this way before, so it is not in the scope of this PR. Thank you for your explanation.

[End of review thread; a sketch of the retry-triggering behavior discussed above follows the struct definition below.]

maxRetryTime uint
// The base for the exponential retry delay
exponentialBase uint
}
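
As referenced in the review thread above, here is a hedged sketch (not part of this PR) of the behavior being discussed: expired batches are reported only when a later write drains the retry queue, since nothing retries on a timer. The function name `demoMaxRetryTime` is hypothetical, and the construction of `svc` (a `*Service` wired to a server that keeps rejecting writes, e.g. with 503) is assumed; `NewBatch` and `HandleWrite` are the internal APIs shown in this diff.

```go
package write

import (
	"context"
	"fmt"
	"time"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

// demoMaxRetryTime is a hypothetical sketch: svc must be wired to a server
// that keeps rejecting writes, so every batch lands in the retry queue.
func demoMaxRetryTime(ctx context.Context, svc *Service, opts *write.Options) {
	// First write fails and the batch is queued for retry.
	b := NewBatch("m f=1", opts.RetryInterval(), opts.MaxRetryTime())
	fmt.Println(svc.HandleWrite(ctx, b))

	// Wait past the batch's expiry deadline (maxRetryTime), which with
	// default options is also well past its first retry delay.
	time.Sleep(time.Duration(opts.MaxRetryTime()) * time.Millisecond)

	// The next write finds the expired batch at the head of the retry queue,
	// drops it, and returns "max retry time exceeded" without sending b2.
	b2 := NewBatch("m f=2", opts.RetryInterval(), opts.MaxRetryTime())
	fmt.Println(svc.HandleWrite(ctx, b2))
}
```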

// BatchSize returns size of batch
@@ -53,18 +57,18 @@ func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options {
return o
}

// RetryInterval returns the retry interval in ms
// RetryInterval returns the default retry interval in ms, if not sent by server. Default 5,000.
func (o *Options) RetryInterval() uint {
return o.retryInterval
}

// SetRetryInterval sets retry interval in ms, which is set if not sent by server
// SetRetryInterval sets the time to wait before retrying an unsuccessful write, in ms, used if not sent by server
func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options {
o.retryInterval = retryIntervalMs
return o
}

// MaxRetries returns maximum count of retry attempts of failed writes
// MaxRetries returns maximum count of retry attempts of failed writes, default 5.
func (o *Options) MaxRetries() uint {
return o.maxRetries
}
@@ -76,7 +80,7 @@ func (o *Options) SetMaxRetries(maxRetries uint) *Options {
return o
}

// RetryBufferLimit returns retry buffer limit
// RetryBufferLimit returns retry buffer limit.
func (o *Options) RetryBufferLimit() uint {
return o.retryBufferLimit
}
@@ -87,17 +91,39 @@ func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options {
return o
}

// MaxRetryInterval return maximum retry interval in ms. Default 5min.
// MaxRetryInterval returns the maximum delay between each retry attempt in milliseconds, default 125,000.
func (o *Options) MaxRetryInterval() uint {
return o.maxRetryInterval
}

// SetMaxRetryInterval set maximum retry interval in ms
// SetMaxRetryInterval sets the maximum delay between each retry attempt in milliseconds
func (o *Options) SetMaxRetryInterval(maxRetryIntervalMs uint) *Options {
o.maxRetryInterval = maxRetryIntervalMs
return o
}

// MaxRetryTime returns the maximum total retry timeout in milliseconds, default 180,000.
func (o *Options) MaxRetryTime() uint {
return o.maxRetryTime
}

// SetMaxRetryTime sets the maximum total retry timeout in milliseconds.
func (o *Options) SetMaxRetryTime(maxRetryTimeMs uint) *Options {
o.maxRetryTime = maxRetryTimeMs
return o
}

// ExponentialBase returns the base for the exponential retry delay. Default 2.
func (o *Options) ExponentialBase() uint {
return o.exponentialBase
}

// SetExponentialBase sets the base for the exponential retry delay.
func (o *Options) SetExponentialBase(retryExponentialBase uint) *Options {
o.exponentialBase = retryExponentialBase
return o
}

// Precision returns time precision for writes
func (o *Options) Precision() time.Duration {
return o.precision
@@ -138,5 +164,6 @@ func (o *Options) DefaultTags() map[string]string {

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 5000, maxRetryInterval: 300000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 50000, defaultTags: make(map[string]string)}
return &Options{batchSize: 5_000, flushInterval: 1_000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 50_000, defaultTags: make(map[string]string),
maxRetries: 5, retryInterval: 5_000, maxRetryInterval: 125_000, maxRetryTime: 180_000, exponentialBase: 2}
}
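
For reference, a minimal sketch of configuring the new retry options, mirroring the updated test below; it assumes the v2 module import path and the setters shown in this diff:

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

func main() {
	// Start from the new defaults and tune the retry strategy.
	opts := write.DefaultOptions().
		SetRetryInterval(1_000).      // base of the first retry window: 1 s
		SetMaxRetryInterval(150_000). // cap any single delay at 150 s
		SetMaxRetryTime(200_000).     // drop a batch retried for over 200 s
		SetExponentialBase(3)         // widen the random window 3x per attempt

	fmt.Println(opts.MaxRetryTime(), opts.ExponentialBase()) // 200000 3
}
```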
44 changes: 25 additions & 19 deletions api/write/options_test.go
@@ -14,36 +14,42 @@ import (

func TestDefaultOptions(t *testing.T) {
opts := write.DefaultOptions()
assert.Equal(t, uint(5000), opts.BatchSize())
assert.Equal(t, false, opts.UseGZip())
assert.Equal(t, uint(1000), opts.FlushInterval())
assert.Equal(t, time.Nanosecond, opts.Precision())
assert.Equal(t, uint(50000), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(3), opts.MaxRetries())
assert.Equal(t, uint(300000), opts.MaxRetryInterval())
assert.EqualValues(t, 5_000, opts.BatchSize())
assert.EqualValues(t, false, opts.UseGZip())
assert.EqualValues(t, 1_000, opts.FlushInterval())
assert.EqualValues(t, time.Nanosecond, opts.Precision())
assert.EqualValues(t, 50_000, opts.RetryBufferLimit())
assert.EqualValues(t, 5_000, opts.RetryInterval())
assert.EqualValues(t, 5, opts.MaxRetries())
assert.EqualValues(t, 125_000, opts.MaxRetryInterval())
assert.EqualValues(t, 180_000, opts.MaxRetryTime())
assert.EqualValues(t, 2, opts.ExponentialBase())
assert.Len(t, opts.DefaultTags(), 0)
}

func TestSettingsOptions(t *testing.T) {
opts := write.DefaultOptions().
SetBatchSize(5).
SetUseGZip(true).
SetFlushInterval(5000).
SetFlushInterval(5_000).
SetPrecision(time.Millisecond).
SetRetryBufferLimit(5).
SetRetryInterval(1000).
SetRetryInterval(1_000).
SetMaxRetries(7).
SetMaxRetryInterval(150000).
SetMaxRetryInterval(150_000).
SetExponentialBase(3).
SetMaxRetryTime(200_000).
AddDefaultTag("a", "1").
AddDefaultTag("b", "2")
assert.Equal(t, uint(5), opts.BatchSize())
assert.Equal(t, true, opts.UseGZip())
assert.Equal(t, uint(5000), opts.FlushInterval())
assert.Equal(t, time.Millisecond, opts.Precision())
assert.Equal(t, uint(5), opts.RetryBufferLimit())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(7), opts.MaxRetries())
assert.Equal(t, uint(150000), opts.MaxRetryInterval())
assert.EqualValues(t, 5, opts.BatchSize())
assert.EqualValues(t, true, opts.UseGZip())
assert.EqualValues(t, 5000, opts.FlushInterval())
assert.EqualValues(t, time.Millisecond, opts.Precision())
assert.EqualValues(t, 5, opts.RetryBufferLimit())
assert.EqualValues(t, 1000, opts.RetryInterval())
assert.EqualValues(t, 7, opts.MaxRetries())
assert.EqualValues(t, 150_000, opts.MaxRetryInterval())
assert.EqualValues(t, 200_000, opts.MaxRetryTime())
assert.EqualValues(t, 3, opts.ExponentialBase())
assert.Len(t, opts.DefaultTags(), 2)
}
2 changes: 1 addition & 1 deletion api/writeAPIBlocking.go
@@ -72,7 +72,7 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval()))
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime()))
if err != nil {
return err.Unwrap()
}
75 changes: 55 additions & 20 deletions internal/write/service.go
@@ -8,7 +8,9 @@ package write
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"sort"
@@ -26,31 +28,37 @@

// Batch holds information for sending points batch
type Batch struct {
batch string
retryDelay uint
// lines to send
batch string
// current retry delay
retryDelay uint
// retry attempts so far
retryAttempts uint
evicted bool
// true if it was removed from queue
evicted bool
// time when this batch expires
expires time.Time
}

// NewBatch creates new batch
func NewBatch(data string, retryDelay uint) *Batch {
func NewBatch(data string, retryDelay uint, expireDelayMs uint) *Batch {
return &Batch{
batch: data,
retryDelay: retryDelay,
expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
}
}

// Service is responsible for reliable writing of batches
type Service struct {
org string
bucket string
httpService http2.Service
url string
lastWriteAttempt time.Time
retryQueue *queue
lock sync.Mutex
writeOptions *write.Options
retryExponentialBase uint
org string
bucket string
httpService http2.Service
url string
lastWriteAttempt time.Time
retryQueue *queue
lock sync.Mutex
writeOptions *write.Options
}

// NewService creates new write service
@@ -68,7 +76,7 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
params.Set("precision", precisionToString(options.Precision()))
u.RawQuery = params.Encode()
writeURL := u.String()
return &Service{org: org, bucket: bucket, httpService: httpService, url: writeURL, writeOptions: options, retryQueue: newQueue(int(retryBufferLimit)), retryExponentialBase: 5}
return &Service{org: org, bucket: bucket, httpService: httpService, url: writeURL, writeOptions: options, retryQueue: newQueue(int(retryBufferLimit))}
}

// HandleWrite writes batches and handles retrying
@@ -111,18 +119,20 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
// write batch
if batchToWrite != nil {
if time.Now().After(batchToWrite.expires) {
if !batchToWrite.evicted {
w.retryQueue.pop()
}
return fmt.Errorf("write failed (attempts %d): max retry time exceeded", batchToWrite.retryAttempts)
}
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
batchToWrite.retryDelay = perror.RetryAfter * 1000
} else {
exp := uint(1)
for i := uint(0); i < batchToWrite.retryAttempts; i++ {
exp = exp * w.retryExponentialBase
}
batchToWrite.retryDelay = min(w.writeOptions.RetryInterval()*exp, w.writeOptions.MaxRetryInterval())
batchToWrite.retryDelay = w.computeRetryDelay(batchToWrite.retryAttempts)
}
if batchToWrite.retryAttempts == 0 {
if w.retryQueue.push(batch) {
@@ -137,7 +147,7 @@
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
return perror
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.retryAttempts, perror)
}
if retrying && !batchToWrite.evicted {
w.retryQueue.pop()
@@ -150,6 +160,31 @@
return nil
}

// computeRetryDelay calculates the retry delay.
// The delay is a random value from the interval
// [retry_interval * exponential_base^attempts, retry_interval * exponential_base^(attempts+1)),
// capped at maxRetryInterval.
func (w *Service) computeRetryDelay(attempts uint) uint {
minDelay := int(w.writeOptions.RetryInterval() * pow(w.writeOptions.ExponentialBase(), attempts))
maxDelay := int(w.writeOptions.RetryInterval() * pow(w.writeOptions.ExponentialBase(), attempts+1))
retryDelay := uint(rand.Intn(maxDelay-minDelay) + minDelay)
if retryDelay > w.writeOptions.MaxRetryInterval() {
retryDelay = w.writeOptions.MaxRetryInterval()
}
return retryDelay
}

// pow computes x**y
func pow(x, y uint) uint {
p := uint(1)
if y == 0 {
return 1
}
for i := uint(1); i <= y; i++ {
p = p * x
}
return p
}
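
To make the numbers concrete, here is a small standalone sketch that replicates the delay computation above with the new defaults (retryInterval 5,000 ms, exponentialBase 2, maxRetryInterval 125,000 ms); successive attempts draw randomly from the windows [5 s, 10 s), [10 s, 20 s), [20 s, 40 s), [40 s, 80 s), then hit the 125 s cap:

```go
package main

import (
	"fmt"
	"math/rand"
)

// pow computes x**y for unsigned integers, as in the service code above.
func pow(x, y uint) uint {
	p := uint(1)
	for i := uint(0); i < y; i++ {
		p *= x
	}
	return p
}

func main() {
	const retryInterval, base, maxRetryInterval = uint(5_000), uint(2), uint(125_000)
	for attempts := uint(0); attempts < 6; attempts++ {
		minDelay := int(retryInterval * pow(base, attempts))
		maxDelay := int(retryInterval * pow(base, attempts+1))
		delay := uint(rand.Intn(maxDelay-minDelay) + minDelay)
		if delay > maxRetryInterval {
			delay = maxRetryInterval
		}
		fmt.Printf("attempt %d: window [%d, %d) ms, chosen %d ms\n",
			attempts, minDelay, maxDelay, delay)
	}
}
```

Summing these windows shows why the defaults fit together: the cumulative delay reaches the 180,000 ms maxRetryTime budget after roughly five retry attempts.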

// WriteBatch performs actual writing via HTTP service
func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error {
var body io.Reader