Skip to content

Commit

Permalink
fix: getWatermark to return-1 if any processor returns -1 (#402)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Dec 5, 2022
1 parent 7acf977 commit 6504a56
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
4 changes: 1 addition & 3 deletions pkg/reduce/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,7 @@ func fetcherAndPublisher(ctx context.Context, toBuffers map[string]isb.BufferWri
case <-ctx.Done():
return
default:
for key := range publishers {
_ = hb.PutKV(ctx, key, []byte(fmt.Sprintf("%d", time.Now().Unix())))
}
_ = hb.PutKV(ctx, fromBuffer.GetName(), []byte(fmt.Sprintf("%d", time.Now().Unix())))
time.Sleep(time.Duration(1) * time.Second)
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,17 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark {
for _, p := range allProcessors {
debugString.WriteString(fmt.Sprintf("[Processor: %v] \n", p))
var t = p.offsetTimeline.GetEventTime(inputOffset)
if t != -1 && t < epoch {
if t == -1 { // watermark cannot be computed, perhaps a new processing unit was added or offset fell off the timeline
epoch = t
} else if t < epoch {
epoch = t
}
if p.IsDeleted() && (offset > p.offsetTimeline.GetHeadOffset()) {
// if the pod is not active and the current offset is ahead of all offsets in Timeline
e.processorManager.DeleteProcessor(p.entity.GetName())
}
}
// if the offset is smaller than every offset in the timeline, set the value to be -1
// if there are no processors
if epoch == math.MaxInt64 {
epoch = -1
}
Expand Down

0 comments on commit 6504a56

Please sign in to comment.