Skip to content

Commit

Permalink
fix: toVertexPartitions for reduce was incorrectly populated to 1 (#756)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed May 30, 2023
1 parent bea1478 commit 7cb399e
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 77 deletions.
14 changes: 0 additions & 14 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
)

// +kubebuilder:validation:Enum="";Running;Succeeded;Failed;Pausing;Paused;Deleting
Expand Down Expand Up @@ -88,19 +87,6 @@ func (p Pipeline) ListAllEdges() []Edge {
edges := []Edge{}
for _, e := range p.Spec.Edges {
edgeCopy := e.DeepCopy()
toVertex := p.GetVertex(e.To)
if toVertex == nil {
continue
}
if toVertex.UDF == nil || toVertex.UDF.GroupBy == nil {
// Clean up parallelism if downstream vertex is not a reduce UDF.
// This has been validated by the controller, harmless to do it here.
edgeCopy.DeprecatedParallelism = nil
} else if edgeCopy.DeprecatedParallelism == nil || *edgeCopy.DeprecatedParallelism < 1 || !toVertex.UDF.GroupBy.Keyed {
// Set parallelism = 1 if it's not set, or it's a non-keyed reduce.
// Already validated by the controller to make sure parallelism is not > 1 if it's not keyed, harmless to check it again.
edgeCopy.DeprecatedParallelism = pointer.Int32(1)
}
edges = append(edges, *edgeCopy)
}
return edges
Expand Down
10 changes: 0 additions & 10 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,13 @@ var (
func Test_ListAllEdges(t *testing.T) {
es := testPipeline.ListAllEdges()
assert.Equal(t, 2, len(es))
assert.Nil(t, es[0].DeprecatedParallelism)
assert.Nil(t, es[1].DeprecatedParallelism)
pl := testPipeline.DeepCopy()
pl.Spec.Vertices[1].UDF.GroupBy = &GroupBy{}
es = pl.ListAllEdges()
assert.Equal(t, 2, len(es))
assert.NotNil(t, es[0].DeprecatedParallelism)
assert.Equal(t, int32(1), *es[0].DeprecatedParallelism)
assert.Nil(t, es[1].DeprecatedParallelism)
pl.Spec.Edges[0].DeprecatedParallelism = pointer.Int32(3)
es = pl.ListAllEdges()
assert.Equal(t, 2, len(es))
assert.NotNil(t, es[0].DeprecatedParallelism)
assert.Equal(t, int32(1), *es[0].DeprecatedParallelism)
pl.Spec.Vertices[1].UDF.GroupBy.Keyed = true
es = pl.ListAllEdges()
assert.Equal(t, int32(3), *es[0].DeprecatedParallelism)
}

func Test_GetToEdges(t *testing.T) {
Expand Down
10 changes: 0 additions & 10 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,18 +529,8 @@ func mergeLimits(plLimits dfv1.PipelineLimits, vLimits *dfv1.VertexLimits) dfv1.
}

func copyEdges(pl *dfv1.Pipeline, edges []dfv1.Edge) []dfv1.CombinedEdge {
plLimits := pl.GetPipelineLimits()
result := []dfv1.CombinedEdge{}
for _, e := range edges {
if e.DeprecatedLimits == nil {
e.DeprecatedLimits = &dfv1.DeprecatedEdgeLimits{}
}
if e.DeprecatedLimits.BufferMaxLength == nil {
e.DeprecatedLimits.BufferMaxLength = plLimits.BufferMaxLength
}
if e.DeprecatedLimits.BufferUsageLimit == nil {
e.DeprecatedLimits.BufferUsageLimit = plLimits.BufferUsageLimit
}
vFrom := pl.GetVertex(e.From)
vTo := pl.GetVertex(e.To)
fromVertexLimits := mergeLimits(pl.GetPipelineLimits(), vFrom.Limits)
Expand Down
78 changes: 51 additions & 27 deletions pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,34 +194,58 @@ func Test_copyVertexLimits(t *testing.T) {
}

func Test_copyEdges(t *testing.T) {
pl := testPipeline.DeepCopy()
edges := []dfv1.Edge{{From: "input", To: "p1"}}
result := copyEdges(pl, edges)
for _, e := range result {
assert.NotNil(t, e.ToVertexLimits)
assert.Equal(t, int64(dfv1.DefaultBufferLength), int64(*e.ToVertexLimits.BufferMaxLength))
}
onethouand := uint64(1000)
eighty := uint32(80)
pl.Spec.Limits = &dfv1.PipelineLimits{BufferMaxLength: &onethouand, BufferUsageLimit: &eighty}
result = copyEdges(pl, edges)
for _, e := range result {
assert.NotNil(t, e.ToVertexLimits)
assert.NotNil(t, e.ToVertexLimits.BufferMaxLength)
assert.NotNil(t, e.ToVertexLimits.BufferUsageLimit)
}
t.Run("test copy map", func(t *testing.T) {
pl := testPipeline.DeepCopy()
edges := []dfv1.Edge{{From: "input", To: "p1"}}
result := copyEdges(pl, edges)
for _, e := range result {
assert.NotNil(t, e.ToVertexLimits)
assert.Equal(t, int64(dfv1.DefaultBufferLength), int64(*e.ToVertexLimits.BufferMaxLength))
}
onethouand := uint64(1000)
eighty := uint32(80)
pl.Spec.Limits = &dfv1.PipelineLimits{BufferMaxLength: &onethouand, BufferUsageLimit: &eighty}
result = copyEdges(pl, edges)
for _, e := range result {
assert.NotNil(t, e.ToVertexLimits)
assert.NotNil(t, e.ToVertexLimits.BufferMaxLength)
assert.NotNil(t, e.ToVertexLimits.BufferUsageLimit)
}

twothouand := uint64(2000)
pl.Spec.Vertices[2].Limits = &dfv1.VertexLimits{BufferMaxLength: &twothouand}
edges = []dfv1.Edge{{From: "p1", To: "output"}}
result = copyEdges(pl, edges)
for _, e := range result {
assert.NotNil(t, e.ToVertexLimits)
assert.NotNil(t, e.ToVertexLimits.BufferMaxLength)
assert.Equal(t, twothouand, *e.ToVertexLimits.BufferMaxLength)
assert.NotNil(t, e.ToVertexLimits.BufferUsageLimit)
assert.Equal(t, eighty, *e.ToVertexLimits.BufferUsageLimit)
}
})

t.Run("test copy reduce", func(t *testing.T) {
pl := testReducePipeline.DeepCopy()
edges := []dfv1.Edge{{From: "p1", To: "p2"}}
result := copyEdges(pl, edges)
assert.Equal(t, 1, len(result))
assert.Equal(t, "p1", result[0].From)
assert.Equal(t, "p2", result[0].To)
assert.NotNil(t, result[0].ToVertexLimits)
assert.Equal(t, int64(dfv1.DefaultBufferLength), int64(*result[0].ToVertexLimits.BufferMaxLength))
assert.Equal(t, int32(2), *result[0].ToVertexPartitions)
assert.Equal(t, int32(1), *result[0].FromVertexPartitions)

edges = []dfv1.Edge{{From: "p2", To: "p3"}}
result = copyEdges(pl, edges)
assert.Equal(t, 1, len(result))
assert.Equal(t, "p2", result[0].From)
assert.Equal(t, "p3", result[0].To)
assert.Equal(t, int32(1), *result[0].ToVertexPartitions)
assert.Equal(t, int32(2), *result[0].FromVertexPartitions)
})

twothouand := uint64(2000)
pl.Spec.Vertices[2].Limits = &dfv1.VertexLimits{BufferMaxLength: &twothouand}
edges = []dfv1.Edge{{From: "p1", To: "output"}}
result = copyEdges(pl, edges)
for _, e := range result {
assert.NotNil(t, e.ToVertexLimits)
assert.NotNil(t, e.ToVertexLimits.BufferMaxLength)
assert.Equal(t, twothouand, *e.ToVertexLimits.BufferMaxLength)
assert.NotNil(t, e.ToVertexLimits.BufferUsageLimit)
assert.Equal(t, eighty, *e.ToVertexLimits.BufferUsageLimit)
}
}

func Test_buildISBBatchJob(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ var (
},
},
{
Name: "p2",
Name: "p2",
Partitions: pointer.Int32(2),
UDF: &dfv1.UDF{
Container: &dfv1.Container{
Image: "my-image",
Expand All @@ -115,6 +116,7 @@ var (
},
},
},
Keyed: true,
Storage: &dfv1.PBQStorage{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/function/uds_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package function
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"strconv"
"sync"
"time"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down
14 changes: 7 additions & 7 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ import (
// edgeFetcher is a fetcher between two vertices.
type edgeFetcher struct {
ctx context.Context
bufferName string
bucketName string
storeWatcher store.WatermarkStoreWatcher
processorManager *processor.ProcessorManager
log *zap.SugaredLogger
}

// NewEdgeFetcher returns a new edge fetcher.
func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.WatermarkStoreWatcher, manager *processor.ProcessorManager) Fetcher {
log := logging.FromContext(ctx).With("bufferName", bufferName)
func NewEdgeFetcher(ctx context.Context, bucketName string, storeWatcher store.WatermarkStoreWatcher, manager *processor.ProcessorManager) Fetcher {
log := logging.FromContext(ctx).With("bucketName", bucketName)
log.Info("Creating a new edge watermark fetcher")
return &edgeFetcher{
ctx: ctx,
bufferName: bufferName,
bucketName: bucketName,
storeWatcher: storeWatcher,
processorManager: manager,
log: log,
Expand Down Expand Up @@ -87,7 +87,7 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) wmb.Watermark {
if epoch == math.MaxInt64 {
epoch = -1
}
e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.bufferName, offset, epoch)
e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.bucketName, offset, epoch)

return wmb.Watermark(time.UnixMilli(epoch))
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (e *edgeFetcher) GetHeadWMB() wmb.WMB {
// we only consider the latest wmb in the offset timeline
var curHeadWMB = p.GetOffsetTimeline().GetHeadWMB()
if !curHeadWMB.Idle {
e.log.Debugf("[%s] GetHeadWMB finds an active head wmb for offset, return early", e.bufferName)
e.log.Debugf("[%s] GetHeadWMB finds an active head wmb for offset, return early", e.bucketName)
return wmb.WMB{}
}
if curHeadWMB.Watermark != -1 {
Expand All @@ -160,7 +160,7 @@ func (e *edgeFetcher) GetHeadWMB() wmb.WMB {
// there is no valid watermark yet
return wmb.WMB{}
}
e.log.Debugf("GetHeadWMB: %s[%s] get idle head wmb for offset", debugString.String(), e.bufferName)
e.log.Debugf("GetHeadWMB: %s[%s] get idle head wmb for offset", debugString.String(), e.bucketName)
return headWMB
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/watermark/fetch/edge_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
b := &edgeFetcher{
ctx: ctx,
bufferName: "testBuffer",
bucketName: "testBucket",
processorManager: tt.processorManager,
log: zaptest.NewLogger(t).Sugar(),
}
Expand All @@ -161,7 +161,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
func Test_edgeFetcher_GetHeadWatermark(t *testing.T) {
var (
ctx = context.Background()
bufferName = "testBuffer"
bucketName = "testBucket"
hbWatcher = noop.NewKVOpWatch()
otWatcher = noop.NewKVOpWatch()
storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
Expand Down Expand Up @@ -192,7 +192,7 @@ func Test_edgeFetcher_GetHeadWatermark(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
e := &edgeFetcher{
ctx: ctx,
bufferName: bufferName,
bucketName: bucketName,
storeWatcher: storeWatcher,
processorManager: tt.processorManager,
log: zaptest.NewLogger(t).Sugar(),
Expand Down Expand Up @@ -289,7 +289,7 @@ func getHeadWMTest2(ctx context.Context, processorManager1 *processor.ProcessorM
func Test_edgeFetcher_GetHeadWMB(t *testing.T) {
var (
ctx = context.Background()
bufferName = "testBuffer"
bucketName = "testBucket"
hbWatcher = noop.NewKVOpWatch()
otWatcher = noop.NewKVOpWatch()
storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
Expand Down Expand Up @@ -338,7 +338,7 @@ func Test_edgeFetcher_GetHeadWMB(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
e := &edgeFetcher{
ctx: ctx,
bufferName: bufferName,
bucketName: bucketName,
storeWatcher: storeWatcher,
processorManager: tt.processorManager,
log: zaptest.NewLogger(t).Sugar(),
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.NoError(t, err)
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
var processorManager = processor.NewProcessorManager(ctx, storeWatcher)
var fetcher = NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager)
var fetcher = NewEdgeFetcher(ctx, "testBucket", storeWatcher, processorManager)
// start p1 heartbeat for 3 loops
wg.Add(1)
go func() {
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
assert.NoError(t, err)
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
processorManager := processor.NewProcessorManager(ctx, storeWatcher)
fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager)
fetcher := NewEdgeFetcher(ctx, "testBucket", storeWatcher, processorManager)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down

0 comments on commit 7cb399e

Please sign in to comment.