From 4702849c298b17f908b7de47d96ce94699661cc7 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar <38965141+dpadhiar@users.noreply.github.com> Date: Thu, 14 Sep 2023 13:51:55 -0700 Subject: [PATCH] feat: implement pipeline validation for unsupported states (#1043) Signed-off-by: Dillen Padhiar Signed-off-by: Keran Yang --- pkg/reconciler/pipeline/validate.go | 23 +++++++++++++----- pkg/reconciler/pipeline/validate_test.go | 30 ++++++++++++++++++++---- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/pkg/reconciler/pipeline/validate.go b/pkg/reconciler/pipeline/validate.go index 13f02fb97..e25651bee 100644 --- a/pkg/reconciler/pipeline/validate.go +++ b/pkg/reconciler/pipeline/validate.go @@ -58,6 +58,9 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { if v.Sink != nil || v.UDF != nil { return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name) } + if len(pl.GetToEdges(v.Name)) == 0 || len(pl.GetFromEdges(v.Name)) > 0 { + return fmt.Errorf("invalid vertex %q, source must have 0 from edges and at least 1 to edge", v.Name) + } sources[v.Name] = v if v.Source.UDTransformer != nil { udTransformers[v.Name] = v @@ -67,12 +70,18 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { if v.Source != nil || v.UDF != nil { return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name) } + if len(pl.GetFromEdges(v.Name)) == 0 || len(pl.GetToEdges(v.Name)) > 0 { + return fmt.Errorf("invalid vertex %q, sink must have 0 to edges and at least 1 from edge", v.Name) + } sinks[v.Name] = v } if v.UDF != nil { if v.Source != nil || v.Sink != nil { return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name) } + if len(pl.GetToEdges(v.Name)) == 0 || len(pl.GetFromEdges(v.Name)) == 0 { + return fmt.Errorf("invalid vertex %q, UDF must have to and from edges", v.Name) + } if v.UDF.GroupBy != nil { reduceUdfs[v.Name] = v } else { @@ -140,6 +149,7 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { } namesInEdges := make(map[string]bool) + toFromEdge := make(map[string]bool) for _, e := range pl.Spec.Edges { if e.From == "" || e.To == "" { return fmt.Errorf("invalid edge: both from and to need to be specified") @@ -150,15 +160,16 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { if !names[e.To] { return fmt.Errorf("invalid edge: no vertex named %q", e.To) } - if _, existing := sources[e.To]; existing { - return fmt.Errorf("source vertex %q can not be define as 'to'", e.To) - } - if _, existing := sinks[e.From]; existing { - return fmt.Errorf("sink vertex %q can not be define as 'from'", e.To) - } namesInEdges[e.From] = true namesInEdges[e.To] = true + // check for redundant edges + if _, existing := toFromEdge[e.From+e.To]; existing { + return fmt.Errorf("cannot define multiple edges from vertex %q to vertex %q", e.From, e.To) + } else { + toFromEdge[e.From+e.To] = true + } } + if len(namesInEdges) != len(names) { return fmt.Errorf("not all the vertex names are defined in edges") } diff --git a/pkg/reconciler/pipeline/validate_test.go b/pkg/reconciler/pipeline/validate_test.go index 93b80092a..ea345f60f 100644 --- a/pkg/reconciler/pipeline/validate_test.go +++ b/pkg/reconciler/pipeline/validate_test.go @@ -294,7 +294,7 @@ func TestValidatePipeline(t *testing.T) { testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p1", To: "input"}) err := ValidatePipeline(testObj) assert.Error(t, err) - assert.Contains(t, err.Error(), "can not be define as 'to'") + assert.Contains(t, err.Error(), "source must have 0 from edges and at least 1 to edge") }) t.Run("edge - sink as from", func(t *testing.T) { @@ -302,21 +302,30 @@ func TestValidatePipeline(t *testing.T) { testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "output", To: "p1"}) err := ValidatePipeline(testObj) assert.Error(t, err) - assert.Contains(t, err.Error(), "can not be define as 'from'") + assert.Contains(t, err.Error(), "sink must have 0 to edges and at least 1 from edge") }) - t.Run("vertex not in edges", func(t *testing.T) { + t.Run("edge - duplicate", func(t *testing.T) { testObj := testPipeline.DeepCopy() - testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input1", Source: &dfv1.Source{}}) + testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input", To: "p1"}) err := ValidatePipeline(testObj) assert.Error(t, err) - assert.Contains(t, err.Error(), "not all the vertex names are defined") + assert.Contains(t, err.Error(), "cannot define multiple edges") + }) + + t.Run("UDF not connected to pipeline", func(t *testing.T) { + testObj := testPipeline.DeepCopy() + testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input1", UDF: &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}}}) + err := ValidatePipeline(testObj) + assert.Error(t, err) + assert.Contains(t, err.Error(), "UDF must have to and from edges") }) t.Run("pipeline has not source", func(t *testing.T) { testObj := testPipeline.DeepCopy() testObj.Spec.Vertices[0].Source = nil testObj.Spec.Vertices[0].UDF = &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}} + testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input", To: "input"}) err := ValidatePipeline(testObj) assert.Error(t, err) assert.Contains(t, err.Error(), "pipeline has no source") @@ -326,11 +335,22 @@ func TestValidatePipeline(t *testing.T) { testObj := testPipeline.DeepCopy() testObj.Spec.Vertices[2].Sink = nil testObj.Spec.Vertices[2].UDF = &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}} + testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "output", To: "output"}) err := ValidatePipeline(testObj) assert.Error(t, err) assert.Contains(t, err.Error(), "pipeline has no sink") }) + t.Run("last vertex is not sink", func(t *testing.T) { + testObj := testPipeline.DeepCopy() + testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "bad-output", UDF: &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}}}) + testObj.Spec.Edges[1] = dfv1.Edge{From: "p1", To: "bad-output"} + testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "bad-output", To: "p1"}) + err := ValidatePipeline(testObj) + assert.Error(t, err) + assert.Contains(t, err.Error(), "sink must have 0 to edges and at least 1 from edge") + }) + t.Run("or conditional forwarding", func(t *testing.T) { testObj := testPipeline.DeepCopy() operatorOr := dfv1.LogicOperatorOr