Skip to content

Commit

Permalink
feat: implement pipeline validation for unsupported states (#1043)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
dpadhiar authored and whynowy committed Sep 14, 2023
1 parent c6f444e commit 4702849
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
23 changes: 17 additions & 6 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
if v.Sink != nil || v.UDF != nil {
return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name)
}
if len(pl.GetToEdges(v.Name)) == 0 || len(pl.GetFromEdges(v.Name)) > 0 {
return fmt.Errorf("invalid vertex %q, source must have 0 from edges and at least 1 to edge", v.Name)
}
sources[v.Name] = v
if v.Source.UDTransformer != nil {
udTransformers[v.Name] = v
Expand All @@ -67,12 +70,18 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
if v.Source != nil || v.UDF != nil {
return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name)
}
if len(pl.GetFromEdges(v.Name)) == 0 || len(pl.GetToEdges(v.Name)) > 0 {
return fmt.Errorf("invalid vertex %q, sink must have 0 to edges and at least 1 from edge", v.Name)
}
sinks[v.Name] = v
}
if v.UDF != nil {
if v.Source != nil || v.Sink != nil {
return fmt.Errorf("invalid vertex %q, only one of 'source', 'sink' and 'udf' can be specified", v.Name)
}
if len(pl.GetToEdges(v.Name)) == 0 || len(pl.GetFromEdges(v.Name)) == 0 {
return fmt.Errorf("invalid vertex %q, UDF must have to and from edges", v.Name)
}
if v.UDF.GroupBy != nil {
reduceUdfs[v.Name] = v
} else {
Expand Down Expand Up @@ -140,6 +149,7 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
}

namesInEdges := make(map[string]bool)
toFromEdge := make(map[string]bool)
for _, e := range pl.Spec.Edges {
if e.From == "" || e.To == "" {
return fmt.Errorf("invalid edge: both from and to need to be specified")
Expand All @@ -150,15 +160,16 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
if !names[e.To] {
return fmt.Errorf("invalid edge: no vertex named %q", e.To)
}
if _, existing := sources[e.To]; existing {
return fmt.Errorf("source vertex %q can not be define as 'to'", e.To)
}
if _, existing := sinks[e.From]; existing {
return fmt.Errorf("sink vertex %q can not be define as 'from'", e.To)
}
namesInEdges[e.From] = true
namesInEdges[e.To] = true
// check for redundant edges
if _, existing := toFromEdge[e.From+e.To]; existing {
return fmt.Errorf("cannot define multiple edges from vertex %q to vertex %q", e.From, e.To)
} else {
toFromEdge[e.From+e.To] = true
}
}

if len(namesInEdges) != len(names) {
return fmt.Errorf("not all the vertex names are defined in edges")
}
Expand Down
30 changes: 25 additions & 5 deletions pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,29 +294,38 @@ func TestValidatePipeline(t *testing.T) {
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p1", To: "input"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "can not be define as 'to'")
assert.Contains(t, err.Error(), "source must have 0 from edges and at least 1 to edge")
})

t.Run("edge - sink as from", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "output", To: "p1"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "can not be define as 'from'")
assert.Contains(t, err.Error(), "sink must have 0 to edges and at least 1 from edge")
})

t.Run("vertex not in edges", func(t *testing.T) {
t.Run("edge - duplicate", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input1", Source: &dfv1.Source{}})
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input", To: "p1"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not all the vertex names are defined")
assert.Contains(t, err.Error(), "cannot define multiple edges")
})

t.Run("UDF not connected to pipeline", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input1", UDF: &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}}})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "UDF must have to and from edges")
})

t.Run("pipeline has not source", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices[0].Source = nil
testObj.Spec.Vertices[0].UDF = &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}}
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input", To: "input"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "pipeline has no source")
Expand All @@ -326,11 +335,22 @@ func TestValidatePipeline(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices[2].Sink = nil
testObj.Spec.Vertices[2].UDF = &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}}
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "output", To: "output"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "pipeline has no sink")
})

t.Run("last vertex is not sink", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "bad-output", UDF: &dfv1.UDF{Builtin: &dfv1.Function{Name: "cat"}}})
testObj.Spec.Edges[1] = dfv1.Edge{From: "p1", To: "bad-output"}
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "bad-output", To: "p1"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "sink must have 0 to edges and at least 1 from edge")
})

t.Run("or conditional forwarding", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
operatorOr := dfv1.LogicOperatorOr
Expand Down

0 comments on commit 4702849

Please sign in to comment.