From 391b53e1203d0989f71bcad4840446e0dda55324 Mon Sep 17 00:00:00 2001 From: Yashash H L <109710325+yhl25@users.noreply.github.com> Date: Mon, 24 Oct 2022 08:29:51 +0530 Subject: [PATCH] fix: retry only the failed offsets (#255) Signed-off-by: Yashash H L --- pkg/isb/forward/forward.go | 43 ++++++++++++++++++++++------- pkg/reduce/pnf/processandforward.go | 11 +++++--- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/pkg/isb/forward/forward.go b/pkg/isb/forward/forward.go index 1e5811404..1a7a113b2 100644 --- a/pkg/isb/forward/forward.go +++ b/pkg/isb/forward/forward.go @@ -6,12 +6,14 @@ package forward import ( "context" "fmt" + "math" "sync" "time" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/wait" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" @@ -285,24 +287,45 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { } // ackFromBuffer acknowledges an array of offsets back to fromBuffer and is a blocking call or until shutdown has been initiated. -func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []isb.Offset) (err error) { - for { - errs := isdf.fromBuffer.Ack(ctx, offsets) +func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []isb.Offset) error { + + var ackRetryBackOff = wait.Backoff{ + Factor: 1, + Jitter: 0.1, + Steps: math.MaxInt64, + Duration: time.Millisecond * 10, + } + var ackOffsets = offsets + ctxClosedErr := wait.ExponentialBackoffWithContext(ctx, ackRetryBackOff, func() (done bool, err error) { + errs := isdf.fromBuffer.Ack(ctx, ackOffsets) summarizedErr := errorArrayToMap(errs) + var failedOffsets []isb.Offset + if len(summarizedErr) > 0 { isdf.opts.logger.Errorw("failed to ack from buffer", zap.Any("errors", summarizedErr)) - // TODO: implement retry with backoff etc. - time.Sleep(isdf.opts.retryInterval) + + // retry only the failed offsets + for i, offset := range ackOffsets { + if errs[i] != nil { + failedOffsets = append(failedOffsets, offset) + } + } + ackOffsets = failedOffsets if ok, _ := isdf.IsShuttingDown(); ok { - err := fmt.Errorf("ackFromBuffer, Stop called while stuck on an internal error, %v", summarizedErr) - return err + ackErr := fmt.Errorf("ackFromBuffer, Stop called while stuck on an internal error, %v", summarizedErr) + return false, ackErr } - // TODO: only retry failed ones. + return false, nil } else { - break + return true, nil } + }) + + if ctxClosedErr != nil { + isdf.opts.logger.Errorw("Context closed while waiting to ack messages inside forward", zap.Error(ctxClosedErr)) } - return err + + return ctxClosedErr } // writeToBuffers is a blocking call until all the messages have be forwarded to all the toBuffers, or a shutdown diff --git a/pkg/reduce/pnf/processandforward.go b/pkg/reduce/pnf/processandforward.go index c99d5f28b..ddf68bae6 100644 --- a/pkg/reduce/pnf/processandforward.go +++ b/pkg/reduce/pnf/processandforward.go @@ -160,20 +160,23 @@ func (p *ProcessAndForward) writeToBuffer(ctx context.Context, bufferID string, Jitter: 0.1, } + writeMessages := resultMessages + // write to isb with infinite exponential backoff (until shutdown is triggered) - var failedMessages []isb.Message var offsets []isb.Offset ctxClosedErr := wait.ExponentialBackoffWithContext(ctx, ISBWriteBackoff, func() (done bool, err error) { var writeErrs []error - offsets, writeErrs = p.toBuffers[bufferID].Write(ctx, resultMessages) - for i, message := range resultMessages { + var failedMessages []isb.Message + offsets, writeErrs = p.toBuffers[bufferID].Write(ctx, writeMessages) + for i, message := range writeMessages { if writeErrs[i] != nil { failedMessages = append(failedMessages, message) } } // retry only the failed messages if len(failedMessages) > 0 { - resultMessages = failedMessages + p.log.Warnw("Failed to write messages to isb inside pnf", zap.Errors("errors", writeErrs)) + writeMessages = failedMessages return false, nil } return true, nil