Skip to content

Commit

Permalink
Merge pull request #335 from seth-hunter/fix-write-retry-delay
Browse files Browse the repository at this point in the history
fix: write retry delay
  • Loading branch information
vlastahajek committed Jun 22, 2022
2 parents fe6c7cb + 337ba61 commit af34012
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 95 deletions.
2 changes: 1 addition & 1 deletion api/write.go
Expand Up @@ -167,7 +167,7 @@ x:
func (w *WriteAPIImpl) flushBuffer() {
if len(w.writeBuffer) > 0 {
log.Info("sending batch")
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime())
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.MaxRetryTime())
w.writeCh <- batch
w.writeBuffer = w.writeBuffer[:0]
}
Expand Down
2 changes: 1 addition & 1 deletion api/writeAPIBlocking.go
Expand Up @@ -73,7 +73,7 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime()))
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions api/write_test.go
Expand Up @@ -174,7 +174,7 @@ func TestWriteErrorCallback(t *testing.T) {
return retryAttempts < 2
})
points := test.GenPoints(10)
// first two batches will be discarded by callback after 3 write attempts for each
// first batch will be discarded by callback after 3 write attempts, second batch should survive with only one failed attempt
for i, j := 0, 0; i < 6; i++ {
writeAPI.WritePoint(points[i])
writeAPI.waitForFlushing()
Expand All @@ -194,7 +194,7 @@ func TestWriteErrorCallback(t *testing.T) {
writeAPI.WritePoint(points[i])
}
writeAPI.waitForFlushing()
assert.Len(t, service.Lines(), 8)
assert.Len(t, service.Lines(), 9)

writeAPI.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/write/queue_test.go
Expand Up @@ -15,7 +15,7 @@ func TestQueue(t *testing.T) {
assert.True(t, que.isEmpty())
assert.Nil(t, que.first())
assert.Nil(t, que.pop())
b := &Batch{Batch: "batch", RetryDelay: 3, RetryAttempts: 3}
b := &Batch{Batch: "batch", RetryAttempts: 3}
que.push(b)
assert.False(t, que.isEmpty())
b2 := que.pop()
Expand Down
34 changes: 16 additions & 18 deletions internal/write/service.go
Expand Up @@ -30,8 +30,6 @@ import (
type Batch struct {
// lines to send
Batch string
// current retry delay
RetryDelay uint
// retry attempts so far
RetryAttempts uint
// true if it was removed from queue
Expand All @@ -41,11 +39,10 @@ type Batch struct {
}

// NewBatch creates new batch
func NewBatch(data string, retryDelay uint, expireDelayMs uint) *Batch {
func NewBatch(data string, expireDelayMs uint) *Batch {
return &Batch{
Batch: data,
RetryDelay: retryDelay,
Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
Batch: data,
Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
}
}

Expand All @@ -65,6 +62,8 @@ type Service struct {
writeOptions *write.Options
retryExponentialBase uint
errorCb BatchErrorCallback
retryDelay uint
retryAttempts uint
}

// NewService creates new write service
Expand All @@ -90,6 +89,8 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
writeOptions: options,
retryQueue: newQueue(int(retryBufferLimit)),
retryExponentialBase: 2,
retryDelay: 0,
retryAttempts: 0,
}
}

Expand Down Expand Up @@ -136,7 +137,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}

// Can we write? After a retryable error we must wait a bit before the next attempt
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.retryDelay))) {
retrying = true
} else {
log.Warn("Write proc: cannot write yet, storing batch to queue")
Expand All @@ -161,11 +162,11 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error())
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
batchToWrite.RetryDelay = perror.RetryAfter * 1000
w.retryDelay = perror.RetryAfter * 1000
} else {
batchToWrite.RetryDelay = w.computeRetryDelay(batchToWrite.RetryAttempts)
w.retryDelay = w.computeRetryDelay(w.retryAttempts)
}
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Warn("Callback rejected batch, discarding")
Expand All @@ -186,12 +187,16 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
}
batchToWrite.RetryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", batchToWrite.RetryDelay)
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}

w.retryDelay = w.writeOptions.RetryInterval()
w.retryDelay = 0
if retrying && !batchToWrite.Evicted {
w.retryQueue.pop()
}
Expand Down Expand Up @@ -351,10 +356,3 @@ func precisionToString(precision time.Duration) string {
}
return prec
}

// min returns the smaller of the two unsigned values a and b.
// When both are equal, that shared value is returned.
func min(a, b uint) uint {
	smaller := a
	if b < smaller {
		smaller = b
	}
	return smaller
}

0 comments on commit af34012

Please sign in to comment.