Skip to content

Commit

Permalink
fix: add idle handler offset nil check (#1489)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
  • Loading branch information
jy4096 committed Feb 3, 2024
1 parent 9dcbce8 commit 345e7ca
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 8 additions & 1 deletion pkg/shared/idlehandler/idlehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ func PublishIdleWatermark(ctx context.Context, toBufferPartition isb.BufferWrite
// publish WMB (this will naturally incr or set the timestamp of rl.wmbOffset)
if vertexType == dfv1.VertexTypeSource || vertexType == dfv1.VertexTypeMapUDF ||
vertexType == dfv1.VertexTypeReduceUDF {
wmPublisher.PublishIdleWatermark(wm, idleManager.Get(toPartitionName), toVertexPartition)
// We create one forwarder for each fromPartitions, and all the forwarders share one idleManager.
// Therefore, it's possible that one forwarder marks the toPartition to be "idling" and tries to
// publish a valid idle watermark while another forwarder just marks the toPartition to be "active"
// right after. In that case, the offset we get here will be nil, and we ignore the "idling"
// and consider the toPartition to be "active"
if offset := idleManager.Get(toPartitionName); offset != nil {
wmPublisher.PublishIdleWatermark(wm, offset, toVertexPartition)
}
} else {
// for Sink vertex, and it does not care about the offset during watermark publishing
wmPublisher.PublishIdleWatermark(wm, nil, toVertexPartition)
Expand Down
4 changes: 2 additions & 2 deletions pkg/udf/map_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
defer finalWg.Done()
log.Infow("Start processing udf messages", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("from", fromBufferPartitionName), zap.Any("to", u.VertexInstance.Vertex.GetToBuffers()))

stopped := forwarder.Start()
stopped := isdf.Start()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
Expand All @@ -268,7 +268,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {

<-ctx.Done()
log.Info("SIGTERM, exiting inside partition...", zap.String("partition", fromBufferPartitionName))
forwarder.Stop()
isdf.Stop()
wg.Wait()
log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName))
}(bufferPartition, forwarder)
Expand Down

0 comments on commit 345e7ca

Please sign in to comment.