Skip to content

Commit

Permalink
refactor: make new write/service.go state variables private rather th…
Browse files Browse the repository at this point in the history
…an public
  • Loading branch information
seth-hunter committed Jun 22, 2022
1 parent 29369c7 commit 337ba61
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 63 deletions.
22 changes: 11 additions & 11 deletions internal/write/service.go
Expand Up @@ -62,8 +62,8 @@ type Service struct {
writeOptions *write.Options
retryExponentialBase uint
errorCb BatchErrorCallback
RetryDelay uint
RetryAttempts uint
retryDelay uint
retryAttempts uint
}

// NewService creates new write service
Expand All @@ -89,8 +89,8 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
writeOptions: options,
retryQueue: newQueue(int(retryBufferLimit)),
retryExponentialBase: 2,
RetryDelay: 0,
RetryAttempts: 0,
retryDelay: 0,
retryAttempts: 0,
}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}

// Can we write? In case of retryable error we must wait a bit
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.RetryDelay))) {
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.retryDelay))) {
retrying = true
} else {
log.Warn("Write proc: cannot write yet, storing batch to queue")
Expand All @@ -164,9 +164,9 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
w.RetryDelay = perror.RetryAfter * 1000
w.retryDelay = perror.RetryAfter * 1000
} else {
w.RetryDelay = w.computeRetryDelay(w.RetryAttempts)
w.retryDelay = w.computeRetryDelay(w.retryAttempts)
}
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Warn("Callback rejected batch, discarding")
Expand All @@ -187,16 +187,16 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
}
batchToWrite.RetryAttempts++
w.RetryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.RetryDelay)
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}

w.RetryDelay = w.writeOptions.RetryInterval()
w.RetryDelay = 0
w.retryDelay = w.writeOptions.RetryInterval()
w.retryDelay = 0
if retrying && !batchToWrite.Evicted {
w.retryQueue.pop()
}
Expand Down
104 changes: 52 additions & 52 deletions internal/write/service_test.go
Expand Up @@ -87,37 +87,37 @@ func TestRetryStrategy(t *testing.T) {
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.EqualValues(t, 1, srv.RetryDelay)
assert.EqualValues(t, 1, srv.retryDelay)
assert.Equal(t, 1, srv.retryQueue.list.Len())

//wait retry delay + little more
<-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5)
<-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
// First batch will be tried to write again and this one will added to retry queue
b2 := NewBatch("2\n", opts.MaxRetryTime())
err = srv.HandleWrite(ctx, b2)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 2, 4)
assertBetween(t, srv.retryDelay, 2, 4)
assert.Equal(t, 2, srv.retryQueue.list.Len())

//wait retry delay + little more
<-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5)
<-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
// First batch will be tried to write again and this one will added to retry queue
b3 := NewBatch("3\n", opts.MaxRetryTime())
err = srv.HandleWrite(ctx, b3)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 4, 8)
assertBetween(t, srv.retryDelay, 4, 8)
assert.Equal(t, 3, srv.retryQueue.list.Len())

//wait retry delay + little more
<-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5)
<-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
// First batch will be tried to write again and this one will added to retry queue
b4 := NewBatch("4\n", opts.MaxRetryTime())
err = srv.HandleWrite(ctx, b4)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 8, 16)
assertBetween(t, srv.retryDelay, 8, 16)
assert.Equal(t, 4, srv.retryQueue.list.Len())

<-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5)
<-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
// Clear error and let write pass
hs.SetReplyError(nil)
// Batches from retry queue will be sent first
Expand Down Expand Up @@ -148,39 +148,39 @@ func TestBufferOverwrite(t *testing.T) {
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.Equal(t, uint(1), srv.RetryDelay)
assert.Equal(t, uint(1), srv.retryDelay)
assert.Equal(t, 1, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b2 := NewBatch("2\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b2)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 2, 4)
assertBetween(t, srv.retryDelay, 2, 4)
assert.Equal(t, 2, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b3 := NewBatch("3\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b3)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 4, 8)
assertBetween(t, srv.retryDelay, 4, 8)
assert.Equal(t, 3, srv.retryQueue.list.Len())

// Write early and overwrite
b4 := NewBatch("4\n", opts.MaxRetryTime())
// No write will occur, because retry delay has not passed yet
// However new bach will be added to retry queue. Retry queue has limit 3,
// so first batch will be discarded
priorRetryDelay := srv.RetryDelay
priorRetryDelay := srv.retryDelay
err = srv.HandleWrite(ctx, b4)
assert.NoError(t, err)
assert.Equal(t, priorRetryDelay, srv.RetryDelay) // Accumulated retry delay should be retained despite batch discard
assert.Equal(t, priorRetryDelay, srv.retryDelay) // Accumulated retry delay should be retained despite batch discard
assert.Equal(t, 3, srv.retryQueue.list.Len())

// Overwrite
// TODO check time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.RetryDelay) / 2)
<-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2)
b5 := NewBatch("5\n", opts.MaxRetryTime())
// Second batch will be tried to write again
// However, write will fail and as new batch is added to retry queue
Expand All @@ -190,7 +190,7 @@ func TestBufferOverwrite(t *testing.T) {
//TODO assertBetween(t, srv.RetryDelay, 2, 4)
assert.Equal(t, 3, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
// Clear error and let write pass
hs.SetReplyError(nil)
// Batches from retry queue will be sent first
Expand Down Expand Up @@ -219,33 +219,33 @@ func TestMaxRetryInterval(t *testing.T) {
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.Equal(t, uint(1), srv.RetryDelay)
assert.Equal(t, uint(1), srv.retryDelay)
assert.Equal(t, 1, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b2 := NewBatch("2\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b2)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 2, 4)
assertBetween(t, srv.retryDelay, 2, 4)
assert.Equal(t, 2, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b3 := NewBatch("3\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b3)
assert.NotNil(t, err)
// New computed delay of first batch should be 4-8, is limited to 4
assert.EqualValues(t, 4, srv.RetryDelay)
assert.EqualValues(t, 4, srv.retryDelay)
assert.Equal(t, 3, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b4 := NewBatch("4\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b4)
assert.NotNil(t, err)
// New computed delay of first batch should be 8-116, is limited to 4
assert.EqualValues(t, 4, srv.RetryDelay)
assert.EqualValues(t, 4, srv.retryDelay)
assert.Equal(t, 4, srv.retryQueue.list.Len())
}

Expand All @@ -270,25 +270,25 @@ func TestMaxRetries(t *testing.T) {
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.EqualValues(t, 1, srv.RetryDelay)
assert.EqualValues(t, 1, srv.retryDelay)
assert.Equal(t, 1, srv.retryQueue.list.Len())
// Write so many batches as it is maxRetries (5)
// First batch will be written and it will reach max retry limit
for i, e := uint(1), uint(2); i <= opts.MaxRetries(); i++ {
//wait retry delay + little more
<-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5)
<-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
b := NewBatch(fmt.Sprintf("%d\n", i+1), opts.MaxRetryTime())
err = srv.HandleWrite(ctx, b)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, e, e*2)
assertBetween(t, srv.retryDelay, e, e*2)
exp := min(i+1, opts.MaxRetries())
assert.EqualValues(t, exp, srv.retryQueue.list.Len())
e *= 2
}
//Test if was removed from retry queue
assert.True(t, b1.Evicted)

<-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5)
<-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
// Clear error and let write pass
hs.SetReplyError(nil)
// Batches from retry queue will be sent first
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestMaxRetryTime(t *testing.T) {
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.EqualValues(t, 1, srv.RetryDelay)
assert.EqualValues(t, 1, srv.retryDelay)
assert.Equal(t, 1, srv.retryQueue.list.Len())

// Wait for batch expiration
Expand All @@ -337,7 +337,7 @@ func TestMaxRetryTime(t *testing.T) {
assert.Equal(t, 1, srv.retryQueue.list.Len())

//wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet
<-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.RetryDelay))))
<-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))))
// Clear error and let write pass
hs.SetReplyError(nil)
// A batch from retry queue will be sent first
Expand Down Expand Up @@ -366,28 +366,28 @@ func TestRetryOnConnectionError(t *testing.T) {
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.EqualValues(t, 1, srv.RetryDelay)
assert.EqualValues(t, 1, srv.retryDelay)
assert.Equal(t, 1, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))

b2 := NewBatch("2\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b2)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 2, 4)
assertBetween(t, srv.retryDelay, 2, 4)
assert.Equal(t, 2, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))

b3 := NewBatch("3\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b3)
assert.NotNil(t, err)
assertBetween(t, srv.RetryDelay, 4, 8)
assertBetween(t, srv.retryDelay, 4, 8)
assert.Equal(t, 3, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
// Clear error and let write pass
hs.SetReplyError(nil)
// Batches from retry queue will be sent first
Expand Down Expand Up @@ -484,13 +484,13 @@ func TestErrorCallback(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, 1, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b := NewBatch("2\n", opts.MaxRetryTime())
err = srv.HandleWrite(ctx, b)
assert.NotNil(t, err)
assert.Equal(t, 2, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b = NewBatch("3\n", opts.MaxRetryTime())
err = srv.HandleWrite(ctx, b)
assert.NotNil(t, err)
Expand Down Expand Up @@ -523,30 +523,30 @@ func TestRetryIntervalAccumulation(t *testing.T) {
hs.SetReplyError(&http.Error{StatusCode: 429})

lastInterval := uint(0)
assert.Equal(t, uint(0), srv.RetryDelay) // Should initialize to zero
assert.Equal(t, uint(0), srv.retryDelay) // Should initialize to zero
i := 1
for ; i <= 45; i++ {
b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b)
assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
assert.GreaterOrEqual(t, srv.RetryDelay, lastInterval) // Should not decrease while writes failing
assert.LessOrEqual(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should not grow larger than max
assert.GreaterOrEqual(t, srv.retryDelay, lastInterval) // Should not decrease while writes failing
assert.LessOrEqual(t, srv.retryDelay, opts.MaxRetryInterval()) // Should not grow larger than max
if err != nil {
if lastInterval == opts.MaxRetryInterval() {
// Write attempt failed, and interval was already at max, so should stay there
assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval())
log.Log.Infof("Retry interval capped at %d ms", srv.RetryDelay)
assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval())
log.Log.Infof("Retry interval capped at %d ms", srv.retryDelay)
} else {
// A write attempt was made and failed, so retry interval should have increased
assert.Greater(t, srv.RetryDelay, lastInterval)
log.Log.Infof("Retry interval increased to %d ms", srv.RetryDelay)
assert.Greater(t, srv.retryDelay, lastInterval)
log.Log.Infof("Retry interval increased to %d ms", srv.retryDelay)
}
} else {
// Write attempt was not made, so retry interval should remain the same
assert.Equal(t, srv.RetryDelay, lastInterval)
log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay)
assert.Equal(t, srv.retryDelay, lastInterval)
log.Log.Infof("Retry interval still at %d ms", srv.retryDelay)
}
lastInterval = srv.RetryDelay
lastInterval = srv.retryDelay

<-time.After(writeInterval)
}
Expand All @@ -555,15 +555,15 @@ func TestRetryIntervalAccumulation(t *testing.T) {
hs.SetReplyError(nil)

// Wait until write queue is ready to retry; in meantime, keep writing and confirming queue state
retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.RetryDelay))
retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))
log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds())
for ; time.Until(retryTimeout) >= 0; i++ {
b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b)
assert.Nil(t, err) // There should be no write attempt
assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should remain the same
log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay)
assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) // Should remain the same
log.Log.Infof("Retry interval still at %d ms", srv.retryDelay)
<-time.After(writeInterval)
}

Expand All @@ -572,7 +572,7 @@ func TestRetryIntervalAccumulation(t *testing.T) {
err := srv.HandleWrite(ctx, b)
assert.Nil(t, err)
assert.Equal(t, 0, srv.retryQueue.list.Len())
assert.Equal(t, srv.RetryDelay, uint(0)) // Should reset to zero
assert.Equal(t, srv.retryDelay, uint(0)) // Should reset to zero

// Ensure proper batches got written to server
require.Len(t, hs.Lines(), 5)
Expand Down

0 comments on commit 337ba61

Please sign in to comment.