Skip to content

Commit

Permalink
add SwitchNode
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang0 committed Apr 5, 2024
1 parent 7134817 commit 5871492
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 2 deletions.
10 changes: 9 additions & 1 deletion flow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ func (e *Executor) Execute(ctx context.Context) error {
return errors.Newf("inconsist flow name: %s != %s", flowName, f.Name)
}
node := dagNode.(*flowNode) // nolint
resp, err := node.fn(e.Body, nil)
nodeFn := node.fn
if node.condFn != nil {
key := node.condFn(e.Body)
nodeFn = node.cases[key]
}
if nodeFn == nil {
return errors.Newf("failed to get function for node: %s", nodeName)
}
resp, err := nodeFn(e.Body, nil)
if err != nil {
if node.execOpts.failureHandler != nil {
err = node.execOpts.failureHandler(err)
Expand Down
21 changes: 21 additions & 0 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

type NodeFunc func([]byte, map[string][]string) ([]byte, error)
type SwitchCondFunc func([]byte) string

type Flow struct {
Name string
Expand Down Expand Up @@ -43,6 +44,10 @@ type flowNode struct {
name string
fn NodeFunc
execOpts *ExecutionOptions

// for switch node
condFn SwitchCondFunc
cases map[string]NodeFunc
}

func (f *Flow) Node(name string, fn NodeFunc, opts ...Option) error {
Expand All @@ -57,6 +62,22 @@ func (f *Flow) Node(name string, fn NodeFunc, opts ...Option) error {
})
}

func (f *Flow) SwitchNode(
name string, condFn SwitchCondFunc,
cases map[string]NodeFunc, opts ...Option,
) error {
execOpts := &ExecutionOptions{}
for _, opt := range opts {
opt(execOpts)
}
return f.DAG.AddVertexByID(name, &flowNode{
name: name,
execOpts: execOpts,
condFn: condFn,
cases: cases,
})
}

func (f *Flow) Edge(src, dst string) error {
return f.DAG.AddEdge(src, dst)
}
Expand Down
70 changes: 69 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,72 @@ a DAG task engine based on asynq
5. submit dagflow tasks
```golang
svc.Submit("f1", []byte(`1`))
```
```

## DAG
### single node DAG

```golang
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
```

### complex DAG
```golang

func prepareFlow(f *flow.Flow) error {
if err := f.Node("l1n1", incOp); err != nil {
return err
}
if err := f.Node("l2n1", incOp); err != nil {
return err
}
if err := f.Node("l2n2", decOp); err != nil {
return err
}
if err := f.Node("l3n1", mulOp, flow.WithAggregator(func (dataMap map[string][]byte) ([]byte, error) {
l2n1Result := dataMap["l2n1"]
l2n2Result := dataMap["l2n2"]
// do anything you want to construct input data for node l3n1
})); err != nil {
return err
}
if err := f.Edge("l1n1", "l2n1"); err != nil {
return err
}
if err := f.Edge("l1n1", "l2n2"); err != nil {
return err
}
if err := f.Edge("l2n1", "l3n1"); err != nil {
return err
}
if err := f.Edge("l2n2", "l3n1"); err != nil {
return err
}
return nil
}
```

### SwitchNode
SwitchNode is a special type node which works like switch case statment in golang

```golang

f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.SwitchNode("n1", func(data []byte) string {
return "+"
}, map[string]flow.NodeFunc{
"+": incOp,
"-": decOp,
}); err != nil {
log.Fatal("failed to create node", err)
}
```

0 comments on commit 5871492

Please sign in to comment.