Skip to content

Commit

Permalink
feat: shuffling support (#306)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Nov 7, 2022
1 parent 81cf456 commit 34a6d70
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 33 deletions.
6 changes: 4 additions & 2 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ func (p Pipeline) ListAllEdges() []Edge {
toVertex := p.GetVertex(e.To)
if toVertex.UDF == nil || toVertex.UDF.GroupBy == nil {
// Clean up parallelism if downstream vertex is not a reduce UDF.
// This has been validated by the controller, harmless to do it here.
edgeCopy.Parallelism = nil
} else if edgeCopy.Parallelism == nil || *edgeCopy.Parallelism < 1 {
// Set parallelism = 1 if it's not set.
} else if edgeCopy.Parallelism == nil || *edgeCopy.Parallelism < 1 || !toVertex.UDF.GroupBy.Keyed {
// Set parallelism = 1 if it's not set, or it's a non-keyed reduce.
// Already validated by the controller to make sure parallelism is not > 1 if it's not keyed, harmless to check it again.
edgeCopy.Parallelism = pointer.Int32(1)
}
edges = append(edges, *edgeCopy)
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func Test_ListAllEdges(t *testing.T) {
es = pl.ListAllEdges()
assert.Equal(t, 2, len(es))
assert.NotNil(t, es[0].Parallelism)
assert.Equal(t, int32(1), *es[0].Parallelism)
pl.Spec.Vertices[1].UDF.GroupBy.Keyed = true
es = pl.ListAllEdges()
assert.Equal(t, int32(3), *es[0].Parallelism)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,13 @@ type VertexList struct {
// GenerateEdgeBufferNames generates buffer names for an edge
func GenerateEdgeBufferNames(namespace, pipelineName string, edge Edge) []string {
buffers := []string{}
// Pipeline controller makes sure the parallelism is always nil for an edge leading to a non-reduce vertex.
if edge.Parallelism == nil {
buffers = append(buffers, fmt.Sprintf("%s-%s-%s-%s", namespace, pipelineName, edge.From, edge.To))
return buffers
}
// Pipeline controller makes sure the parallelism is always not nil for an edge leading to a reduce vertex.
// It also makes sure parallelism = 1 if it's a non-keyed reduce.
for i := int32(0); i < *edge.Parallelism; i++ {
buffers = append(buffers, fmt.Sprintf("%s-%s-%s-%s-%d", namespace, pipelineName, edge.From, edge.To, i))
}
Expand Down
41 changes: 37 additions & 4 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
names := make(map[string]bool)
sources := make(map[string]dfv1.AbstractVertex)
sinks := make(map[string]dfv1.AbstractVertex)
udfs := make(map[string]dfv1.AbstractVertex)
mapUdfs := make(map[string]dfv1.AbstractVertex)
reduceUdfs := make(map[string]dfv1.AbstractVertex)
for _, v := range pl.Spec.Vertices {
if names[v.Name] {
return fmt.Errorf("duplicate vertex name %q", v.Name)
Expand All @@ -61,7 +62,11 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
if v.Source != nil || v.Sink != nil {
return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name)
}
udfs[v.Name] = v
if v.UDF.GroupBy != nil {
reduceUdfs[v.Name] = v
} else {
mapUdfs[v.Name] = v
}
}
}

Expand All @@ -73,7 +78,7 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("pipeline has no sink, at lease one vertex with 'sink' defined is required")
}

for k, u := range udfs {
for k, u := range mapUdfs {
if u.UDF.Container != nil {
if u.UDF.Container.Image == "" && u.UDF.Builtin == nil {
return fmt.Errorf("invalid vertex %q, either specify a builtin function, or a customized image", k)
Expand All @@ -86,6 +91,18 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
}
}

for k, u := range reduceUdfs {
if u.UDF.Builtin != nil {
// No builtin function supported for reduce vertices.
return fmt.Errorf("invalid vertex %q, there's no buildin function support in reduce vertices", k)
}
if u.UDF.Container != nil {
if u.UDF.Container.Image == "" {
return fmt.Errorf("invalid vertex %q, a customized image is required", k)
}
}
}

namesInEdges := make(map[string]bool)
for _, e := range pl.Spec.Edges {
if e.From == "" || e.To == "" {
Expand All @@ -108,7 +125,23 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
}
if e.Conditions != nil && len(e.Conditions.KeyIn) > 0 {
if _, ok := sources[e.From]; ok { // Source vertex should not do conditional forwarding
return fmt.Errorf("invalid edge, \"conditions.keysIn\" not allowed for %q", e.From)
return fmt.Errorf(`invalid edge, "conditions.keysIn" not allowed for %q`, e.From)
}
}
if e.Parallelism != nil {
if _, ok := reduceUdfs[e.To]; !ok {
return fmt.Errorf(`invalid edge (%s - %s), "parallelism" is not allowed for an edge leading to a non-reduce vertex`, e.From, e.To)
}
if *e.Parallelism < 1 {
return fmt.Errorf(`invalid edge (%s - %s), "parallelism" is < 1`, e.From, e.To)
}
if *e.Parallelism > 1 && !reduceUdfs[e.To].UDF.GroupBy.Keyed {
// We only support single partition non-keyed windowing.
return fmt.Errorf(`invalid edge (%s - %s), "parallelism" should not > 1 for non-keyed windowing`, e.From, e.To)
}
if _, ok := sources[e.From]; ok && reduceUdfs[e.To].UDF.GroupBy.Keyed {
// Source vertex can not lead to a keyed reduce vertex, because the keys coming from sources are undeterminable.
return fmt.Errorf(`invalid spec (%s - %s), "keyed" should not be true for a reduce vertex which has data coming from a source vertex`, e.From, e.To)
}
}
namesInEdges[e.From] = true
Expand Down
112 changes: 111 additions & 1 deletion pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package pipeline

import (
corev1 "k8s.io/api/core/v1"
"testing"
"time"

corev1 "k8s.io/api/core/v1"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -55,6 +57,64 @@ var (
},
},
}

testReducePipeline = &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl",
Namespace: "test-ns",
},
Spec: dfv1.PipelineSpec{
Vertices: []dfv1.AbstractVertex{
{
Name: "input",
Source: &dfv1.Source{},
},
{
Name: "p1",
UDF: &dfv1.UDF{
Container: &dfv1.Container{
Image: "my-image",
},
GroupBy: &dfv1.GroupBy{
Window: dfv1.Window{
Fixed: &dfv1.FixedWindow{
Length: &metav1.Duration{
Duration: time.Duration(60 * time.Second),
},
},
},
},
},
},
{
Name: "p2",
UDF: &dfv1.UDF{
Container: &dfv1.Container{
Image: "my-image",
},
GroupBy: &dfv1.GroupBy{
Window: dfv1.Window{
Fixed: &dfv1.FixedWindow{
Length: &metav1.Duration{
Duration: time.Duration(60 * time.Second),
},
},
},
},
},
},
{
Name: "output",
Sink: &dfv1.Sink{},
},
},
Edges: []dfv1.Edge{
{From: "input", To: "p1"},
{From: "p1", To: "p2"},
{From: "p2", To: "output"},
},
},
}
)

func TestValidatePipeline(t *testing.T) {
Expand All @@ -68,6 +128,14 @@ func TestValidatePipeline(t *testing.T) {
assert.Error(t, err)
})

t.Run("parallelism on non-reduce vertex", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Edges[0].Parallelism = pointer.Int32(3)
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), `"parallelism" is not allowed for an edge leading to a non-reduce vertex`)
})

t.Run("no type", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "abc"})
Expand Down Expand Up @@ -198,6 +266,48 @@ func TestValidatePipeline(t *testing.T) {
})
}

func TestValidateReducePipeline(t *testing.T) {
t.Run("test good reduce pipeline", func(t *testing.T) {
err := ValidatePipeline(testReducePipeline)
assert.NoError(t, err)
})

t.Run("test builtin and container co-existing", func(t *testing.T) {
testObj := testReducePipeline.DeepCopy()
testObj.Spec.Vertices[1].UDF.Builtin = &dfv1.Function{
Name: "cat",
}
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "no buildin function support in reduce vertices")
})

t.Run("test no image in container", func(t *testing.T) {
testObj := testReducePipeline.DeepCopy()
testObj.Spec.Vertices[1].UDF.Container.Image = ""
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "a customized image is required")
})

t.Run("test source with keyed", func(t *testing.T) {
testObj := testReducePipeline.DeepCopy()
testObj.Spec.Edges[0].Parallelism = pointer.Int32(2)
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), `"parallelism" should not > 1 for non-keyed windowing`)
testObj.Spec.Edges[0].Parallelism = pointer.Int32(-1)
err = ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), `"parallelism" is < 1`)
testObj.Spec.Edges[0].Parallelism = pointer.Int32(1)
testObj.Spec.Vertices[1].UDF.GroupBy.Keyed = true
err = ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), `"keyed" should not be true for a reduce vertex which has data coming from a source vertex`)
})
}

func TestValidateVertex(t *testing.T) {
t.Run("bad min", func(t *testing.T) {
v := dfv1.AbstractVertex{
Expand Down
20 changes: 13 additions & 7 deletions pkg/shuffle/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package shuffle

import (
"github.com/numaproj/numaflow/pkg/isb"
"hash"
"hash/fnv"

"github.com/numaproj/numaflow/pkg/isb"
)

// Shuffle shuffles messages among ISB
Expand All @@ -39,16 +40,21 @@ func NewShuffle(bufferIdentifiers []string) *Shuffle {
}
}

// ShuffleMessages accepts list of isb messages and returns the mapping of isb to messages
func (s *Shuffle) ShuffleMessages(messages []*isb.Message) map[string][]*isb.Message {

// Shuffle functions returns a shuffled identifier.
func (s *Shuffle) Shuffle(key string) string {
// hash of the message key returns a unique hashValue
// mod of hashValue will decide which isb it will belong
hashValue := s.generateHash(key)
hashValue = hashValue % uint64(s.buffersCount)
return s.bufferIdentifiers[hashValue]
}

// ShuffleMessages accepts list of isb messages and returns the mapping of isb to messages
func (s *Shuffle) ShuffleMessages(messages []*isb.Message) map[string][]*isb.Message {
hashMap := make(map[string][]*isb.Message)
for _, message := range messages {
hashValue := s.generateHash(message.Key)
hashValue = hashValue % uint64(s.buffersCount)
hashMap[s.bufferIdentifiers[hashValue]] = append(hashMap[s.bufferIdentifiers[hashValue]], message)
identifier := s.Shuffle(message.Key)
hashMap[identifier] = append(hashMap[identifier], message)
}
return hashMap
}
Expand Down
31 changes: 21 additions & 10 deletions pkg/udf/map_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
"sync"
"time"

"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/forward"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"

"github.com/numaproj/numaflow/pkg/shuffle"
"github.com/numaproj/numaflow/pkg/udf/function"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
"go.uber.org/zap"
)

type MapUDFProcessor struct {
Expand Down Expand Up @@ -70,18 +71,28 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
return fmt.Errorf("unrecognized isbsvc type %q", u.ISBSvcType)
}

// Populate shuffle function map
shuffleFuncMap := make(map[string]*shuffle.Shuffle)
for _, edge := range u.VertexInstance.Vertex.Spec.ToEdges {
if edge.Parallelism != nil && *edge.Parallelism > 1 {
s := shuffle.NewShuffle(dfv1.GenerateEdgeBufferNames(u.VertexInstance.Vertex.Namespace, u.VertexInstance.Vertex.Spec.PipelineName, edge))
shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)] = s
}
}

conditionalForwarder := forward.GoWhere(func(key string) ([]string, error) {
result := []string{}
_key := string(key)
if _key == dfv1.MessageKeyAll || _key == dfv1.MessageKeyDrop {
result = append(result, _key)
if key == dfv1.MessageKeyDrop {
return result, nil
}
for _, to := range u.VertexInstance.Vertex.Spec.ToEdges {
// If returned key is not "ALL" or "DROP", and there's no conditions defined in the edge,
// treat it as "ALL"?
if to.Conditions == nil || len(to.Conditions.KeyIn) == 0 || sharedutil.StringSliceContains(to.Conditions.KeyIn, _key) {
result = append(result, dfv1.GenerateEdgeBufferNames(u.VertexInstance.Vertex.Namespace, u.VertexInstance.Vertex.Spec.PipelineName, to)...)
for _, edge := range u.VertexInstance.Vertex.Spec.ToEdges {
// If returned key is not "DROP", and there's no conditions defined in the edge, treat it as "ALL"?
if edge.Conditions == nil || len(edge.Conditions.KeyIn) == 0 || sharedutil.StringSliceContains(edge.Conditions.KeyIn, key) {
if edge.Parallelism != nil && *edge.Parallelism > 1 { // Need to shuffle
result = append(result, shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(key))
} else {
result = append(result, dfv1.GenerateEdgeBufferNames(u.VertexInstance.Vertex.Namespace, u.VertexInstance.Vertex.Spec.PipelineName, edge)...)
}
}
}
return result, nil
Expand Down
Loading

0 comments on commit 34a6d70

Please sign in to comment.