diff --git a/pkg/isb/forward/forward.go b/pkg/isb/forward/forward.go index cef19d156..dac9f6b47 100644 --- a/pkg/isb/forward/forward.go +++ b/pkg/isb/forward/forward.go @@ -10,6 +10,7 @@ import ( "time" "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/publish" "go.uber.org/zap" @@ -179,7 +180,10 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // fetch watermark if available // TODO: make it async (concurrent and wait later) // let's track only the last element's watermark - processorWM := isdf.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset) + var processorWM processor.Watermark + if isdf.fetchWatermark != nil { + processorWM = isdf.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset) + } // create space for writeMessages specific to each step as we could forward to all the steps too. var messageToStep = make(map[string][]isb.Message) @@ -246,9 +250,11 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // forward the highest watermark to all the edges to avoid idle edge problem // TODO: sort and get the highest value - for edgeName, offsets := range writeOffsets { - if len(offsets) > 0 { - isdf.publishWatermark[edgeName].PublishWatermark(processorWM, offsets[len(offsets)-1]) + if isdf.publishWatermark != nil { + for edgeName, offsets := range writeOffsets { + if len(offsets) > 0 { + isdf.publishWatermark[edgeName].PublishWatermark(processorWM, offsets[len(offsets)-1]) + } } }