From 24a16a049f2f3a1752ee702f5020136a51d66e69 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Thu, 18 Aug 2022 14:20:58 -0700 Subject: [PATCH] bug: watermark needs nil check Signed-off-by: Vigith Maurice --- pkg/isb/forward/forward.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/isb/forward/forward.go b/pkg/isb/forward/forward.go index cef19d1567..dac9f6b47f 100644 --- a/pkg/isb/forward/forward.go +++ b/pkg/isb/forward/forward.go @@ -10,6 +10,7 @@ import ( "time" "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/publish" "go.uber.org/zap" @@ -179,7 +180,10 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // fetch watermark if available // TODO: make it async (concurrent and wait later) // let's track only the last element's watermark - processorWM := isdf.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset) + var processorWM processor.Watermark + if isdf.fetchWatermark != nil { + processorWM = isdf.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset) + } // create space for writeMessages specific to each step as we could forward to all the steps too. var messageToStep = make(map[string][]isb.Message) @@ -246,9 +250,11 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // forward the highest watermark to all the edges to avoid idle edge problem // TODO: sort and get the highest value - for edgeName, offsets := range writeOffsets { - if len(offsets) > 0 { - isdf.publishWatermark[edgeName].PublishWatermark(processorWM, offsets[len(offsets)-1]) + if isdf.publishWatermark != nil { + for edgeName, offsets := range writeOffsets { + if len(offsets) > 0 { + isdf.publishWatermark[edgeName].PublishWatermark(processorWM, offsets[len(offsets)-1]) + } } }