
fix: idle manager refactor for multi partitions #1512

Merged
merged 17 commits into numaproj:main on Feb 20, 2024

Conversation

jy4096 (Contributor) commented Feb 16, 2024

The idleManager is shared across all forwarders for map and sink vertices, but our current implementation doesn't take multiple forwarders into consideration, causing a race condition when publishing control messages.

In this PR:

  • Refactor the current idle manager interface to include fromBufferPartitionIndex.
  • Refactor the current idle manager structure to hold a map[toBufferPartitionName]uint64.
// forwarderActiveToPartition is a map[toPartitionName]uint64 recording whether a forwarder is sending to the toPartition.
// For each "toPartition" we keep an integer interpreted in binary, and set the #n bit to 1 if the #n forwarder
// is sending to the "toPartition".
// example:
//   if we have three forwarders, the initial value in binary format will be {"toPartition": 000}, which is 0 in decimal
//   if forwarder0 is sending data to the toPartition, the value becomes {"toPartition": 001}, which is 1 in decimal
//   if forwarder1 is also sending data to the toPartition, the value becomes {"toPartition": 011}, which is 3 in decimal
// when we do the ctrlMsg check, we rely on this value to decide whether we need to send a ctrlMsg
// note that we use uint64, so there is a limit of at most 64 forwarders per partition
forwarderActiveToPartition map[string]uint64
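The bitmask scheme the comment describes can be sketched as a small standalone helper. The type and method names here (ActivePartitionTracker, MarkActive, MarkIdle, NeedsCtrlMsg) are hypothetical illustrations, not the PR's actual API, and the real implementation would also need synchronization (e.g. atomics or a mutex) since the whole point of the PR is concurrent forwarders:

```go
package main

import "fmt"

// ActivePartitionTracker keeps one uint64 per toPartition; bit n is set
// while forwarder n is actively sending to that partition.
type ActivePartitionTracker struct {
	active map[string]uint64 // toPartitionName -> bitmask of active forwarders
}

func NewActivePartitionTracker() *ActivePartitionTracker {
	return &ActivePartitionTracker{active: make(map[string]uint64)}
}

// MarkActive sets bit forwarderIdx for the given partition.
func (t *ActivePartitionTracker) MarkActive(partition string, forwarderIdx int) {
	t.active[partition] |= 1 << uint(forwarderIdx)
}

// MarkIdle clears bit forwarderIdx for the given partition (&^ is Go's bit-clear).
func (t *ActivePartitionTracker) MarkIdle(partition string, forwarderIdx int) {
	t.active[partition] &^= 1 << uint(forwarderIdx)
}

// NeedsCtrlMsg reports whether no forwarder is sending to the partition,
// i.e. the mask is zero, so an idle control message may be required.
func (t *ActivePartitionTracker) NeedsCtrlMsg(partition string) bool {
	return t.active[partition] == 0
}

func main() {
	tr := NewActivePartitionTracker()
	tr.MarkActive("toPartition", 0) // mask 001 -> 1
	tr.MarkActive("toPartition", 1) // mask 011 -> 3
	fmt.Println(tr.active["toPartition"]) // 3
	tr.MarkIdle("toPartition", 0)
	tr.MarkIdle("toPartition", 1)
	fmt.Println(tr.NeedsCtrlMsg("toPartition")) // true
}
```

This mirrors the worked example in the comment: two active forwarders give binary 011 (decimal 3), and the control-message check fires only once every forwarder has cleared its bit.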

jyu6 added 6 commits February 16, 2024 10:41
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
@jy4096 jy4096 marked this pull request as ready for review February 16, 2024 21:39
@jy4096 jy4096 requested a review from yhl25 February 16, 2024 21:39
Signed-off-by: Vigith Maurice <vigith@gmail.com>
@vigith vigith marked this pull request as draft February 16, 2024 21:46
jyu6 added 5 commits February 16, 2024 13:54
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
@jy4096 jy4096 marked this pull request as ready for review February 16, 2024 23:13
@@ -251,7 +251,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
 	for toVertexName, toVertexBuffer := range df.toBuffers {
 		if publisher, ok := df.wmPublishers[toVertexName]; ok {
 			for _, bufferPartition := range toVertexBuffer {
-				idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, df.vertexName, df.pipelineName, dfv1.VertexTypeReduceUDF, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
+				idlehandler.PublishIdleWatermark(ctx, 0, bufferPartition, publisher, df.idleManager, df.log, df.vertexName, df.pipelineName, dfv1.VertexTypeReduceUDF, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
Contributor
Should we create a const for the 0th index called PARTITION_0? That would give a hint that we only have one partition here.
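The reviewer's suggestion would look something like the sketch below. PARTITION_0 and the stand-in publish function are hypothetical, reduced to the index argument; they are not code from this PR:

```go
package main

import "fmt"

// PARTITION_0 is the named constant the reviewer suggests for the 0th
// partition index, signalling at the call site that only one partition exists.
const PARTITION_0 = 0

// publishIdleWatermark is a hypothetical stand-in for the real
// idlehandler.PublishIdleWatermark call.
func publishIdleWatermark(fromPartitionIdx int) string {
	return fmt.Sprintf("idle watermark published for partition %d", fromPartitionIdx)
}

func main() {
	// The named constant replaces the bare 0 literal from the diff above.
	fmt.Println(publishIdleWatermark(PARTITION_0))
}
```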

jyu6 and others added 2 commits February 20, 2024 08:25
pkg/watermark/wmb/idle_manager.go (comment thread resolved)
Signed-off-by: Vigith Maurice <vigith@gmail.com>

Co-authored-by: Vigith Maurice <vigith@gmail.com>
@vigith vigith merged commit f05ce9e into numaproj:main Feb 20, 2024
19 of 20 checks passed
@jy4096 jy4096 deleted the idle-manager-multipartitions branch February 20, 2024 19:41