Skip to content

Commit

Permalink
fix: retry only the failed offsets (#255)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 authored and whynowy committed Oct 27, 2022
1 parent cf5d734 commit 391b53e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
43 changes: 33 additions & 10 deletions pkg/isb/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package forward
import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
Expand Down Expand Up @@ -285,24 +287,45 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
}

// ackFromBuffer acknowledges an array of offsets back to fromBuffer. It is a blocking call that returns once the offsets have been acknowledged or shutdown has been initiated.
func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []isb.Offset) (err error) {
for {
errs := isdf.fromBuffer.Ack(ctx, offsets)
func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []isb.Offset) error {

var ackRetryBackOff = wait.Backoff{
Factor: 1,
Jitter: 0.1,
Steps: math.MaxInt64,
Duration: time.Millisecond * 10,
}
var ackOffsets = offsets
ctxClosedErr := wait.ExponentialBackoffWithContext(ctx, ackRetryBackOff, func() (done bool, err error) {
errs := isdf.fromBuffer.Ack(ctx, ackOffsets)
summarizedErr := errorArrayToMap(errs)
var failedOffsets []isb.Offset

if len(summarizedErr) > 0 {
isdf.opts.logger.Errorw("failed to ack from buffer", zap.Any("errors", summarizedErr))
// TODO: implement retry with backoff etc.
time.Sleep(isdf.opts.retryInterval)

// retry only the failed offsets
for i, offset := range ackOffsets {
if errs[i] != nil {
failedOffsets = append(failedOffsets, offset)
}
}
ackOffsets = failedOffsets
if ok, _ := isdf.IsShuttingDown(); ok {
err := fmt.Errorf("ackFromBuffer, Stop called while stuck on an internal error, %v", summarizedErr)
return err
ackErr := fmt.Errorf("ackFromBuffer, Stop called while stuck on an internal error, %v", summarizedErr)
return false, ackErr
}
// TODO: only retry failed ones.
return false, nil
} else {
break
return true, nil
}
})

if ctxClosedErr != nil {
isdf.opts.logger.Errorw("Context closed while waiting to ack messages inside forward", zap.Error(ctxClosedErr))
}
return err

return ctxClosedErr
}

// writeToBuffers is a blocking call until all the messages have been forwarded to all the toBuffers, or a shutdown
Expand Down
11 changes: 7 additions & 4 deletions pkg/reduce/pnf/processandforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,23 @@ func (p *ProcessAndForward) writeToBuffer(ctx context.Context, bufferID string,
Jitter: 0.1,
}

writeMessages := resultMessages

// write to isb with infinite exponential backoff (until shutdown is triggered)
var failedMessages []isb.Message
var offsets []isb.Offset
ctxClosedErr := wait.ExponentialBackoffWithContext(ctx, ISBWriteBackoff, func() (done bool, err error) {
var writeErrs []error
offsets, writeErrs = p.toBuffers[bufferID].Write(ctx, resultMessages)
for i, message := range resultMessages {
var failedMessages []isb.Message
offsets, writeErrs = p.toBuffers[bufferID].Write(ctx, writeMessages)
for i, message := range writeMessages {
if writeErrs[i] != nil {
failedMessages = append(failedMessages, message)
}
}
// retry only the failed messages
if len(failedMessages) > 0 {
resultMessages = failedMessages
p.log.Warnw("Failed to write messages to isb inside pnf", zap.Errors("errors", writeErrs))
writeMessages = failedMessages
return false, nil
}
return true, nil
Expand Down

0 comments on commit 391b53e

Please sign in to comment.