Skip to content
Permalink
Browse files

Add subdag to vertex mechanism

Signed-off-by: Swarvanu Sengupta <swarvanusg@gmail.com>
  • Loading branch information...
s8sg committed Dec 6, 2018
1 parent da874a3 commit 8b3c3eaa77ca50746f5bc3bd7f78032c405f424c
@@ -4,4 +4,4 @@
test -z "$(gofmt -l $(find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./example/*" -not -path "./template/*" -not -path "./doc/*" -not -path "./ci/" -not -path "./" ))" || { echo "Run \"gofmt -s -w\" on your Golang code"; exit 1; }

go test ./workflow.go ./context.go ./workflow_test.go ./workflow_dag.go -cover
go test sdk/dag.go sdk/operation.go sdk/pipeline.go -cover
go test sdk/dag.go sdk/operation.go sdk/pipeline.go sdk/dot.go -cover
@@ -7,12 +7,16 @@ import (
var (
// ERR_CYCLIC denotes that dag has a cycle
ERR_CYCLIC = fmt.Errorf("dag has cyclic dependency")
// ERR_DUPLICATE denotes that a dag edge is duplicate
ERR_DUPLICATE = fmt.Errorf("edge redefined")
// ERR_DUPLICATE_EDGE denotes that a dag edge is duplicate
ERR_DUPLICATE_EDGE = fmt.Errorf("edge redefined")
// ERR_DUPLICATE_VERTEX denotes that a dag edge is duplicate
ERR_DUPLICATE_VERTEX = fmt.Errorf("vertex redefined")
// ERR_MULTIPLE_START denotes that a dag has more than one start point
ERR_MULTIPLE_START = fmt.Errorf("only one start vertex is allowed")
// ERR_MULTIPLE_END denotes that a dag has more than one end point
ERR_MULTIPLE_END = fmt.Errorf("only one end vertex is allowed")
// ERR_RECURSIVE_DEP denotes that dag has a recursive dependecy
ERR_RECURSIVE_DEP = fmt.Errorf("dag has recursive dependency")
// NodeIndex
nodeIndex = 0
// Default forwarder
@@ -27,18 +31,28 @@ type Forwarder func([]byte) []byte

// Dag The whole dag
type Dag struct {
nodes map[string]*Node
Id string
nodes map[string]*Node // the nodes in a dag

parentNode *Node // In case the dag is a sub dag the node reference

initialNode *Node // The start of a valid dag
endNode *Node // The end of a valid dag
}

// Node The vertex
type Node struct {
Id string // The id of the vertex
index int // The index of the vertex

operations []*Operation // The list of operations
// Execution modes ([]operation / Dag)
subDag *Dag // Subdag
operations []*Operation // The list of operations

serializer Serializer // The serializer serialize multiple input to a node into one
forwarder map[string]Forwarder // The forwarder handle forwarding output to a children

parentDag *Dag // The reference of the dag this node part of
indegree int // The vertex dag indegree
outdegree int // The vertex dag outdegree
children []*Node // The children of the vertex
@@ -55,11 +69,31 @@ func NewDag() *Dag {
return this
}

// Append appends another dag into an existing dag
// Its a way to define and reuse subdags
// append causes disconnected dag which must be linked with edge in order to execute
func (this *Dag) Append(dag *Dag) error {
err := dag.Validate()
if err != nil {
return err
}
for nodeId, node := range dag.nodes {
_, duplicate := this.nodes[nodeId]
if duplicate {
return ERR_DUPLICATE_VERTEX
}
// add the node
this.nodes[nodeId] = node
}
return nil
}

// AddVertex create a vertex with id and operations
func (this *Dag) AddVertex(id string, operations []*Operation) *Node {

node := &Node{Id: id, operations: operations, index: nodeIndex + 1}
node.forwarder = make(map[string]Forwarder, 0)
node.parentDag = this
nodeIndex = nodeIndex + 1
this.nodes[id] = node
return node
@@ -77,12 +111,12 @@ func (this *Dag) AddEdge(from, to string) error {
toNode = this.AddVertex(to, []*Operation{})
}

// CHeck if duplicate
// CHeck if duplicate (TODO: Check if one way check is enough)
if toNode.inSlice(fromNode.children) || fromNode.inSlice(toNode.dependsOn) {
return ERR_DUPLICATE
return ERR_DUPLICATE_EDGE
}

// Check if cyclic dependency
// Check if cyclic dependency (TODO: Check if one way check if enough)
if fromNode.inSlice(toNode.next) || toNode.inSlice(fromNode.prev) {
return ERR_CYCLIC
}
@@ -119,14 +153,19 @@ func (this *Dag) GetNode(id string) *Node {
return this.nodes[id]
}

// GetInitialNode get the initial node
// GetParentNode returns parent node for a subdag
func (this *Dag) GetParentNode() *Node {
return this.parentNode
}

// GetInitialNode gets the initial node
func (this *Dag) GetInitialNode() *Node {
for _, b := range this.nodes {
if b.indegree == 0 {
return b
}
}
return nil
return this.initialNode
}

// GetEndNode gets the end node
func (this *Dag) GetEndNode() *Node {
return this.endNode
}

// Validate validates a dag as per faas-flow dag requirments
@@ -137,9 +176,17 @@ func (this *Dag) Validate() error {
for _, b := range this.nodes {
if b.indegree == 0 {
initialNodeCount = initialNodeCount + 1
this.initialNode = b
}
if b.outdegree == 0 {
endNodeCount = endNodeCount + 1
this.endNode = b
}
if b.subDag != nil {
err := b.subDag.Validate()
if err != nil {
return err
}
}
}

@@ -187,7 +234,17 @@ func (this *Node) Outdegree() int {
return this.outdegree
}

// AddOperation add an operation
// SubDag returns the subdag added in a node
func (this *Node) SubDag() *Dag {
return this.subDag
}

// ParentDag returns the parent dag of the node
func (this *Node) ParentDag() *Dag {
return this.parentDag
}

// AddOperation adds an operation
func (this *Node) AddOperation(operation *Operation) {
this.operations = append(this.operations, operation)
}
@@ -202,6 +259,28 @@ func (this *Node) AddForwarder(children string, forwarder Forwarder) {
this.forwarder[children] = forwarder
}

// AddSubDag adds a subdag to the node
func (this *Node) AddSubDag(subDag *Dag) error {
parent := this.parentDag
for parent != nil {
if parent == subDag {
return ERR_RECURSIVE_DEP
}
parentNode := parent.parentNode
if parentNode != nil {
parent = parentNode.subDag
continue
}
break
}
this.subDag = subDag
// Set the node the subdag belongs to
subDag.parentNode = this
subDag.Id = this.Id

return nil
}

// GetSerializer get a serializer from a node
func (this *Node) GetSerializer() Serializer {
return this.serializer
@@ -0,0 +1,129 @@
package sdk

import (
"fmt"
"log"
"strings"
)

func generateDag(dag *Dag, sb *strings.Builder) {
// generate nodes
for _, node := range dag.nodes {

log.Printf("%s - %s", dag.Id, node.Id)

sb.WriteString(fmt.Sprintf("\n\tsubgraph cluster_%d {", node.index))
nodeIndexStr := fmt.Sprintf("%d", node.index-1)
if nodeIndexStr != node.Id {
sb.WriteString(fmt.Sprintf("\n\t\tlabel=\"%d-%s\";", node.index, node.Id))
} else {
sb.WriteString(fmt.Sprintf("\n\t\tlabel=\"%d\";", node.index))
}
sb.WriteString("\n\t\tcolor=lightgrey;")
sb.WriteString("\n\t\tstyle=rounded;\n")

previousOperation := ""
subdag := node.SubDag()
if subdag != nil {
generateDag(subdag, sb)
} else {
for opsindex, operation := range node.Operations() {
operationStr := ""
switch {
case operation.Function != "":
operationStr = "func-" + operation.Function
case operation.CallbackUrl != "":
operationStr = "callback-" +
operation.CallbackUrl[len(operation.CallbackUrl)-4:]
default:
operationStr = "modifier"
}
operationKey := fmt.Sprintf("%s.%d.%d-%s", dag.Id, node.index, opsindex+1, operationStr)

switch {
case len(node.children) == 0 &&
opsindex == len(node.Operations())-1:
sb.WriteString(fmt.Sprintf("\n\t\t\"%s\" [color=pink];",
operationKey))
case node.indegree == 0 && opsindex == 0:
sb.WriteString(fmt.Sprintf("\n\t\t\"%s\" [color=lightblue];",
operationKey))
default:
sb.WriteString(fmt.Sprintf("\n\t\t\"%s\" [color=lightgrey];",
operationKey))
}

if previousOperation != "" {
sb.WriteString(fmt.Sprintf("\n\t\t\"%s\" -> \"%s\" [label=\"1:1\" color=grey];",
previousOperation, operationKey))
}
previousOperation = operationKey
}

sb.WriteString("\n\t}")

relation := ""

for _, child := range node.children {

// TODO: Later change to check if 1:N

relation = "1:1"
operationStr := ""
var operation sdk.Operation

node =
for true {
subDag = child.SubDag()
if subdag != nil {
operation = subdag.GetInitialNode()
} else {
operation = child.Operations()[0]
}
}

switch {
case operation.Function != "":
operationStr = "func-" + operation.Function
case operation.CallbackUrl != "":
operationStr = "callback-" +
operation.CallbackUrl[len(operation.CallbackUrl)-4:]
default:
operationStr = "modifier"
}

childOperationKey := fmt.Sprintf("%d.1-%s",
child.index, operationStr)

if node.GetForwarder(child.Id) == nil {
relation = relation + " - nodata"
}

sb.WriteString(fmt.Sprintf("\n\t\"%s\" -> \"%s\" [label=\"%s\" color=grey];",
previousOperation, childOperationKey, relation))
}

sb.WriteString("\n")
}
}
}

// MakeDotGraph create a dot graph of the pipeline
func (pipeline *Pipeline) MakeDotGraph() string {

var sb strings.Builder

sb.WriteString("digraph depgraph {")
sb.WriteString("\n\trankdir=TB;")
sb.WriteString("\n\tsplines=curved;")
sb.WriteString("\n\tfontname=\"Courier New\";")
sb.WriteString("\n\tfontcolor=grey;")

sb.WriteString("\n\tnode [style=filled fontname=\"Courier\" fontcolor=black]\n")

generateDag(pipeline.Dag, &sb)

sb.WriteString("}\n")
return sb.String()

}

0 comments on commit 8b3c3ea

Please sign in to comment.
You can’t perform that action at this time.