Skip to content

Commit

Permalink
Merge pull request #332 from seth-hunter/write-retry-fixes
Browse files Browse the repository at this point in the history
fix: write retry queue stops retrying
  • Loading branch information
vlastahajek committed Jun 20, 2022
2 parents 7ca3d22 + a67dec1 commit fe6c7cb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
17 changes: 11 additions & 6 deletions internal/write/service.go
Expand Up @@ -124,6 +124,17 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
log.Debug("Write proc: taking batch from retry queue")
if !retrying {
b := w.retryQueue.first()

// Discard batches at beginning of retryQueue that have already expired
if time.Now().After(b.Expires) {
log.Warn("Write proc: oldest batch in retry queue expired, discarding")
if !b.Evicted {
w.retryQueue.pop()
}

continue
}

// Can we write? In case of retryable error we must wait a bit
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
retrying = true
Expand All @@ -147,12 +158,6 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
// write batch
if batchToWrite != nil {
if time.Now().After(batchToWrite.Expires) {
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
return fmt.Errorf("write failed (attempts %d): max retry time exceeded", batchToWrite.RetryAttempts)
}
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
Expand Down
18 changes: 14 additions & 4 deletions internal/write/service_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
ilog "log"
"runtime"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -313,14 +314,23 @@ func TestMaxRetryTime(t *testing.T) {

// Wait for batch expiration
<-time.After(5 * time.Millisecond)
b := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime())
// First batch will be tried to write again and it will be checked against maxRetryTime. New batch will be added to retry queue

exp := opts.MaxRetryTime()
// sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
if runtime.GOOS == "windows" {
exp = 20
}
// create new batch for sending
b := NewBatch("2\n", opts.RetryInterval(), exp)
// First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will be added to retry queue
err = srv.HandleWrite(ctx, b)
require.NotNil(t, err)
// Error about batch expiration
assert.Equal(t, "write failed (attempts 1): max retry time exceeded", err.Error())
// 1st Batch expires and writing 2nd throws error
assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error())
assert.Equal(t, 1, srv.retryQueue.list.Len())

//wait minimum retry time
<-time.After(time.Millisecond)
// Clear error and let write pass
hs.SetReplyError(nil)
// A batch from retry queue will be sent first
Expand Down

0 comments on commit fe6c7cb

Please sign in to comment.