Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: write retry delay #335

Merged
merged 4 commits into from Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/write.go
Expand Up @@ -167,7 +167,7 @@ x:
func (w *WriteAPIImpl) flushBuffer() {
if len(w.writeBuffer) > 0 {
log.Info("sending batch")
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime())
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.MaxRetryTime())
w.writeCh <- batch
w.writeBuffer = w.writeBuffer[:0]
}
Expand Down
2 changes: 1 addition & 1 deletion api/writeAPIBlocking.go
Expand Up @@ -73,7 +73,7 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime()))
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions api/write_test.go
Expand Up @@ -174,7 +174,7 @@ func TestWriteErrorCallback(t *testing.T) {
return retryAttempts < 2
})
points := test.GenPoints(10)
// first two batches will be discarded by callback after 3 write attempts for each
// first batch will be discarded by callback after 3 write attempts, second batch should survive with only one failed attempt
for i, j := 0, 0; i < 6; i++ {
writeAPI.WritePoint(points[i])
writeAPI.waitForFlushing()
Expand All @@ -194,7 +194,7 @@ func TestWriteErrorCallback(t *testing.T) {
writeAPI.WritePoint(points[i])
}
writeAPI.waitForFlushing()
assert.Len(t, service.Lines(), 8)
assert.Len(t, service.Lines(), 9)

writeAPI.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/write/queue_test.go
Expand Up @@ -15,7 +15,7 @@ func TestQueue(t *testing.T) {
assert.True(t, que.isEmpty())
assert.Nil(t, que.first())
assert.Nil(t, que.pop())
b := &Batch{Batch: "batch", RetryDelay: 3, RetryAttempts: 3}
b := &Batch{Batch: "batch", RetryAttempts: 3}
que.push(b)
assert.False(t, que.isEmpty())
b2 := que.pop()
Expand Down
34 changes: 16 additions & 18 deletions internal/write/service.go
Expand Up @@ -30,8 +30,6 @@ import (
type Batch struct {
// lines to send
Batch string
// current retry delay
RetryDelay uint
// retry attempts so far
RetryAttempts uint
// true if it was removed from queue
Expand All @@ -41,11 +39,10 @@ type Batch struct {
}

// NewBatch creates new batch
func NewBatch(data string, retryDelay uint, expireDelayMs uint) *Batch {
func NewBatch(data string, expireDelayMs uint) *Batch {
return &Batch{
Batch: data,
RetryDelay: retryDelay,
Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
Batch: data,
Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
}
}

Expand All @@ -65,6 +62,8 @@ type Service struct {
writeOptions *write.Options
retryExponentialBase uint
errorCb BatchErrorCallback
RetryDelay uint
RetryAttempts uint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those fields don't need to be public

}

// NewService creates new write service
Expand All @@ -90,6 +89,8 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
writeOptions: options,
retryQueue: newQueue(int(retryBufferLimit)),
retryExponentialBase: 2,
RetryDelay: 0,
RetryAttempts: 0,
}
}

Expand Down Expand Up @@ -136,7 +137,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}

// Can we write? In case of retryable error we must wait a bit
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.RetryDelay))) {
retrying = true
} else {
log.Warn("Write proc: cannot write yet, storing batch to queue")
Expand All @@ -161,11 +162,11 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error())
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
batchToWrite.RetryDelay = perror.RetryAfter * 1000
w.RetryDelay = perror.RetryAfter * 1000
} else {
batchToWrite.RetryDelay = w.computeRetryDelay(batchToWrite.RetryAttempts)
w.RetryDelay = w.computeRetryDelay(w.RetryAttempts)
}
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Warn("Callback rejected batch, discarding")
Expand All @@ -186,12 +187,16 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
}
batchToWrite.RetryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", batchToWrite.RetryDelay)
w.RetryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.RetryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}

w.RetryDelay = w.writeOptions.RetryInterval()
w.RetryDelay = 0
if retrying && !batchToWrite.Evicted {
w.retryQueue.pop()
}
Expand Down Expand Up @@ -351,10 +356,3 @@ func precisionToString(precision time.Duration) string {
}
return prec
}

func min(a, b uint) uint {
if a > b {
return b
}
return a
}