From a9d9daa33183e418c772096aed70594c1e88eb25 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 29 Jun 2022 22:54:06 +0300 Subject: [PATCH 1/2] initial rewrite Signed-off-by: Vasiliy Tolstov --- flow/default.go | 63 ++++++++++++++++++++----------------------------- flow/options.go | 9 ------- go.mod | 6 +++-- go.sum | 18 ++++++++++---- 4 files changed, 43 insertions(+), 53 deletions(-) diff --git a/flow/default.go b/flow/default.go index 57f0b8e9..67a4225a 100644 --- a/flow/default.go +++ b/flow/default.go @@ -6,7 +6,7 @@ import ( "path/filepath" "sync" - "github.com/silas/dag" + "github.com/heimdalr/dag" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" @@ -21,7 +21,7 @@ type microFlow struct { type microWorkflow struct { opts Options - g *dag.AcyclicGraph + g *dag.DAG steps map[string]Step id string status Status @@ -42,29 +42,32 @@ func (w *microWorkflow) Status() Status { } func (w *microWorkflow) AppendSteps(steps ...Step) error { + var err error w.Lock() for _, s := range steps { w.steps[s.String()] = s - w.g.Add(s) + if _, err = w.g.AddVertex(s); err != nil { + w.Unlock() + return err + } } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { + w.Unlock() return ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + if err = w.g.AddEdge(src.String(), dst.String()); err != nil { + w.Unlock() + return err + } } } - if err := w.g.Validate(); err != nil { - w.Unlock() - return err - } - - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.Unlock() @@ -78,32 +81,28 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { for _, s := range steps { delete(w.steps, s.String()) - w.g.Remove(s) + w.g.DeleteVertex(s.String()) } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { + w.Unlock() return ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + w.g.AddEdge(src.String(), dst.String()) } } - if err := w.g.Validate(); err != nil { - w.Unlock() - return err - } - - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.Unlock() return nil } -func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { +func (w *microWorkflow) getSteps(start string) ([][]Step, error) { var steps [][]Step var root dag.Vertex var err error @@ -137,11 +136,8 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { } } - if reverse { - err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) - } else { - err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) - } + err = w.g.SortedDepthFirstWalk(root, fn) + if err != nil { return nil, err } @@ -167,11 +163,7 @@ func (w *microWorkflow) Resume(ctx context.Context, id string) error { func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { w.Lock() if !w.init { - if err := w.g.Validate(); err != nil { - w.Unlock() - return "", err - } - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.init = true } w.Unlock() @@ -186,7 +178,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu options := NewExecuteOptions(opts...) - steps, err := w.getSteps(options.Start, options.Reverse) + steps, err := w.getSteps(options.Start) if err != nil { if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) @@ -386,11 +378,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { } func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { - w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} + w := µWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))} for _, s := range steps { w.steps[s.String()] = s - w.g.Add(s) + w.g.AddVertex(s) } for _, dst := range steps { @@ -399,14 +391,11 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step if !ok { return nil, ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + w.g.AddEdge(src.String(), dst.String()) } } - if err := w.g.Validate(); err != nil { - return nil, err - } - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.init = true diff --git a/flow/options.go b/flow/options.go index 2e03b561..a8901471 100644 --- a/flow/options.go +++ b/flow/options.go @@ -123,8 +123,6 @@ type ExecuteOptions struct { Start string // Timeout for execution Timeout time.Duration - // Reverse execution - Reverse bool // Async enables async execution Async bool } @@ -167,13 +165,6 @@ func ExecuteContext(ctx context.Context) ExecuteOption { } } -// ExecuteReverse says that dag must be run in reverse order -func ExecuteReverse(b bool) ExecuteOption { - return func(o *ExecuteOptions) { - o.Reverse = b - } -} - // ExecuteTimeout pass timeout time.Duration for execution func ExecuteTimeout(td time.Duration) ExecuteOption { return func(o *ExecuteOptions) { diff --git a/go.mod b/go.mod index b0b4ccf1..575137ce 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,15 @@ module go.unistack.org/micro/v3 go 1.16 require ( - github.com/google/gnostic v0.6.8 // indirect + github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.7 // indirect + github.com/heimdalr/dag v1.2.1 github.com/imdario/mergo v0.3.13 github.com/kr/pretty v0.2.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 + github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 go.unistack.org/micro-proto/v3 v3.2.7 google.golang.org/protobuf v1.28.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c4a660fa..3e9f86c1 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -22,6 +24,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M= +github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -39,8 +43,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/gnostic v0.6.6/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= -github.com/google/gnostic v0.6.8 h1:bT56GPYBWh1tvBuBEd94qcS3+60b+y0HQur0ITkGuCk= -github.com/google/gnostic v0.6.8/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= +github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0= +github.com/google/gnostic v0.6.9/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -49,8 +53,11 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/heimdalr/dag v1.2.1 h1:XJOMaoWqJK1UKdp+4zaO2uwav9GFbHMGCirdViKMRIQ= +github.com/heimdalr/dag v1.2.1/go.mod h1:Of/wUB7Yoj4dwiOcGOOYIq6MHlPF/8/QMBKFJpwg+yc= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -65,8 +72,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTK github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= -github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ= -github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -155,7 +162,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= From 79d4318d423a89a7e079c9958468b690e23cf7c9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 8 Jul 2022 22:12:44 +0300 Subject: [PATCH 2/2] fixup Signed-off-by: Vasiliy Tolstov --- flow/context_test.go | 2 + flow/dag.go | 21 --- flow/dag_test.go | 68 -------- flow/default.go | 403 +++++++++++++++++++++++++------------------ flow/flow.go | 2 - 5 files changed, 235 insertions(+), 261 deletions(-) delete mode 100644 flow/dag.go delete mode 100644 flow/dag_test.go diff --git a/flow/context_test.go b/flow/context_test.go index 03262697..8821df07 100644 --- a/flow/context_test.go +++ b/flow/context_test.go @@ -1,3 +1,5 @@ +//go:build ignore + package flow import ( diff --git a/flow/dag.go b/flow/dag.go deleted file mode 100644 index 4cc728ea..00000000 --- a/flow/dag.go +++ /dev/null @@ -1,21 +0,0 @@ -package flow - -type node struct { - name string -} - -func (n *node) ID() string { - return n.name -} - -func (n *node) Name() string { - return n.name -} - -func (n *node) String() string { - return n.name -} - -func (n *node) Hashcode() interface{} { - return n.name -} diff --git a/flow/dag_test.go b/flow/dag_test.go deleted file mode 100644 index 6563200c..00000000 --- a/flow/dag_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package flow - -import ( - "fmt" - "testing" - - "github.com/silas/dag" -) - -func checkErr(t *testing.T, err error) { - if err != nil { - t.Fatal(err) - } -} - -func TestDag(t *testing.T) { - d1 := &dag.AcyclicGraph{} - d2 := &dag.AcyclicGraph{} - d2v1 := d2.Add(&node{"Substep.Create"}) - v1 := d1.Add(&node{"AccountService.Create"}) - v2 := d1.Add(&node{"AuthzService.Create"}) - v3 := d1.Add(&node{"AuthnService.Create"}) - v4 := d1.Add(&node{"ProjectService.Create"}) - v5 := d1.Add(&node{"ContactService.Create"}) - v6 := d1.Add(&node{"NetworkService.Create"}) - v7 := d1.Add(&node{"MailerService.Create"}) - v8 := d1.Add(&node{"NestedService.Create"}) - v9 := d1.Add(d2v1) - d1.Connect(dag.BasicEdge(v1, v2)) - d1.Connect(dag.BasicEdge(v1, v3)) - d1.Connect(dag.BasicEdge(v1, v4)) - d1.Connect(dag.BasicEdge(v1, v5)) - d1.Connect(dag.BasicEdge(v1, v6)) - d1.Connect(dag.BasicEdge(v1, v7)) - d1.Connect(dag.BasicEdge(v7, v8)) - d1.Connect(dag.BasicEdge(v8, v9)) - - if err := d1.Validate(); err != nil { - t.Fatal(err) - } - - d1.TransitiveReduction() - - var steps [][]string - fn := func(n dag.Vertex, idx int) error { - if idx == 0 { - steps = make([][]string, 1) - steps[0] = make([]string, 0, 1) - } else if idx >= len(steps) { - tsteps := make([][]string, idx+1) - copy(tsteps, steps) - steps = tsteps - steps[idx] = make([]string, 0, 1) - } - steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) - return nil - } - - start := &node{"AccountService.Create"} - err := d1.SortedDepthFirstWalk([]dag.Vertex{start}, fn) - checkErr(t, err) - if len(steps) != 4 { - t.Fatalf("invalid steps: %#+v", steps) - } - if steps[3][0] != "Substep.Create" { - t.Fatalf("invalid last step: %#+v", steps) - } -} diff --git a/flow/default.go b/flow/default.go index 67a4225a..4d741abb 100644 --- a/flow/default.go +++ b/flow/default.go @@ -33,10 +33,6 @@ func (w *microWorkflow) ID() string { return w.id } -func (w *microWorkflow) Steps() ([][]Step, error) { - return w.getSteps("", false) -} - func (w *microWorkflow) Status() Status { return w.status } @@ -44,11 +40,11 @@ func (w *microWorkflow) Status() Status { func (w *microWorkflow) AppendSteps(steps ...Step) error { var err error w.Lock() + defer w.Unlock() for _, s := range steps { w.steps[s.String()] = s if _, err = w.g.AddVertex(s); err != nil { - w.Unlock() return err } } @@ -57,11 +53,9 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { - w.Unlock() return ErrStepNotExists } if err = w.g.AddEdge(src.String(), dst.String()); err != nil { - w.Unlock() return err } } @@ -69,8 +63,6 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error { w.g.ReduceTransitively() - w.Unlock() - return nil } @@ -78,6 +70,7 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { // TODO: handle case when some step requires or required by removed step w.Lock() + defer w.Unlock() for _, s := range steps { delete(w.steps, s.String()) @@ -88,7 +81,6 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { - w.Unlock() return ErrStepNotExists } w.g.AddEdge(src.String(), dst.String()) @@ -97,54 +89,9 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { w.g.ReduceTransitively() - w.Unlock() - return nil } -func (w *microWorkflow) getSteps(start string) ([][]Step, error) { - var steps [][]Step - var root dag.Vertex - var err error - - fn := func(n dag.Vertex, idx int) error { - if idx == 0 { - steps = make([][]Step, 1) - steps[0] = make([]Step, 0, 1) - } else if idx >= len(steps) { - tsteps := make([][]Step, idx+1) - copy(tsteps, steps) - steps = tsteps - steps[idx] = make([]Step, 0, 1) - } - steps[idx] = append(steps[idx], n.(Step)) - return nil - } - - if start != "" { - var ok bool - w.RLock() - root, ok = w.steps[start] - w.RUnlock() - if !ok { - return nil, ErrStepNotExists - } - } else { - root, err = w.g.Root() - if err != nil { - return nil, err - } - } - - err = w.g.SortedDepthFirstWalk(root, fn) - - if err != nil { - return nil, err - } - - return steps, nil -} - func (w *microWorkflow) Abort(ctx context.Context, id string) error { workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())}) @@ -173,26 +120,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu return "", err } - stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) + // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) options := NewExecuteOptions(opts...) - steps, err := w.getSteps(options.Start) - if err != nil { - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) - } - return "", err - } - - var wg sync.WaitGroup - cherr := make(chan error, 1) - chstatus := make(chan Status, 1) - - nctx, cancel := context.WithCancel(ctx) - defer cancel() - nopts := make([]ExecuteOption, 0, len(opts)+5) nopts = append(nopts, @@ -202,143 +134,274 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu ExecuteMeter(w.opts.Meter), ) nopts = append(nopts, opts...) - done := make(chan struct{}) if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) return eid, werr } - for idx := range steps { - for nidx := range steps[idx] { - cstep := steps[idx][nidx] - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { - return eid, werr + + var startID string + if options.Start == "" { + mp := w.g.GetRoots() + if len(mp) != 1 { + return eid, ErrStepNotExists + } + for k := range mp { + startID = k + } + } else { + for k, v := range w.g.GetVertices() { + if v == options.Start { + startID = k } } } - go func() { - for idx := range steps { - for nidx := range steps[idx] { - wStatus := &codec.Frame{} - if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil { - cherr <- werr - return - } - if status := StringStatus[string(wStatus.Data)]; status != StatusRunning { - chstatus <- status - return - } - if w.opts.Logger.V(logger.TraceLevel) { - w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) + if startID == "" { + return eid, ErrStepNotExists + } + + if options.Async { + go w.handleWorkflow(startID, nopts...) + return eid, nil + } + + return eid, w.handleWorkflow(startID, nopts...) +} + +func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error { + w.RLock() + defer w.RUnlock() + + // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) + // workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) + + // Get IDs of all descendant vertices. + flowIDs, errDes := w.g.GetDescendants(startID) + if errDes != nil { + return errDes + } + + // inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex). + inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1) + + // Iterate vertex IDs and create an input channel for each of them and a single + // output channel for leaves. Note, this "pre-flight" is needed to ensure we + // really have an input channel regardless of how we traverse the tree and spawn + // workers. + leafCount := 0 + + for id := range flowIDs { + + // Get all parents of this vertex. + parents, errPar := w.g.GetParents(id) + if errPar != nil { + return errPar + } + + // Create a buffered input channel that has capacity for all parent results. + inputChannels[id] = make(chan FlowResult, len(parents)) + + if w.g.isLeaf(id) { + leafCount += 1 + } + } + + // outputChannel caries the results of leaf vertices. + outputChannel := make(chan FlowResult, leafCount) + + // To also process the start vertex and to have its results being passed to its + // children, add it to the vertex IDs. Also add an input channel for the start + // vertex and feed the inputs to this channel. + flowIDs[startID] = struct{}{} + inputChannels[startID] = make(chan FlowResult, len(inputs)) + for _, i := range inputs { + inputChannels[startID] <- i + } + + wg := sync.WaitGroup{} + + // Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl. + // inputs and outputs) in a separate goroutine. + for id := range flowIDs { + + // Get all children of this vertex that later need to be notified. Note, we + // collect all children before the goroutine to be able to release the read + // lock as early as possible. + children, errChildren := w.g.GetChildren(id) + if errChildren != nil { + return errChildren + } + + // Remember to wait for this goroutine. + wg.Add(1) + + go func(id string) { + // Get this vertex's input channel. + // Note, only concurrent read here, which is fine. + c := inputChannels[id] + + // Await all parent inputs and stuff them into a slice. + parentCount := cap(c) + parentResults := make([]FlowResult, parentCount) + for i := 0; i < parentCount; i++ { + parentResults[i] = <-c + } + + // Execute the worker. + errWorker := callback(w.g, id, parentResults) + if errWorker != nil { + return errWorker + } + + // Send this worker's FlowResult onto all children's input channels or, if it is + // a leaf (i.e. no children), send the result onto the output channel. + if len(children) > 0 { + for child := range children { + inputChannels[child] <- flowResult } - cstep := steps[idx][nidx] - // nolint: nestif - if len(cstep.Requires()) == 0 { - wg.Add(1) - go func(step Step) { - defer wg.Done() - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { + } else { + outputChannel <- flowResult + } + + // "Sign off". + wg.Done() + }(id) + } + + // Wait for all go routines to finish. + wg.Wait() + + // Await all leaf vertex results and stuff them into a slice. + resultCount := cap(outputChannel) + results := make([]FlowResult, resultCount) + for i := 0; i < resultCount; i++ { + results[i] = <-outputChannel + } + + /* + go func() { + for idx := range steps { + for nidx := range steps[idx] { + wStatus := &codec.Frame{} + if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil { + cherr <- werr + return + } + if status := StringStatus[string(wStatus.Data)]; status != StatusRunning { + chstatus <- status + return + } + if w.opts.Logger.V(logger.TraceLevel) { + w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) + } + cstep := steps[idx][nidx] + // nolint: nestif + if len(cstep.Requires()) == 0 { + wg.Add(1) + go func(step Step) { + defer wg.Done() + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { + cherr <- werr + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { + cherr <- werr + return + } + rsp, serr := step.Execute(nctx, req, nopts...) + if serr != nil { + step.SetStatus(StatusFailure) + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + cherr <- serr + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + return + } + }(cstep) + wg.Wait() + } else { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { cherr <- werr return } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { cherr <- werr return } - rsp, serr := step.Execute(nctx, req, nopts...) + rsp, serr := cstep.Execute(nctx, req, nopts...) if serr != nil { - step.SetStatus(StatusFailure) - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + cstep.SetStatus(StatusFailure) + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } cherr <- serr return } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr return } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { cherr <- werr return } - }(cstep) - wg.Wait() - } else { - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { - cherr <- werr - return - } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { - cherr <- werr - return - } - rsp, serr := cstep.Execute(nctx, req, nopts...) - if serr != nil { - cstep.SetStatus(StatusFailure) - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) - } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) - } - cherr <- serr - return - } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) - cherr <- werr - return - } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - cherr <- werr - return } } } - } - close(done) - }() - - if options.Async { - return eid, nil - } - - logger.Tracef(ctx, "wait for finish or error") - select { - case <-nctx.Done(): - err = nctx.Err() - case cerr := <-cherr: - err = cerr - case <-done: - close(cherr) - case <-chstatus: - close(chstatus) - return eid, nil - } + close(done) + }() - switch { - case nctx.Err() != nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + if options.Async { + return eid, nil } - case err == nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) - } - case err != nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + + logger.Tracef(ctx, "wait for finish or error") + select { + case <-nctx.Done(): + err = nctx.Err() + case cerr := <-cherr: + err = cerr + case <-done: + close(cherr) + case <-chstatus: + close(chstatus) + return eid, nil } - } - return eid, err + switch { + case nctx.Err() != nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + case err == nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + case err != nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + } + */ + return err } // NewFlow create new flow diff --git a/flow/flow.go b/flow/flow.go index 0124db3b..2930701b 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -125,8 +125,6 @@ type Workflow interface { AppendSteps(steps ...Step) error // Status returns workflow status Status() Status - // Steps returns steps slice where parallel steps returned on the same level - Steps() ([][]Step, error) // Suspend suspends execution Suspend(ctx context.Context, id string) error // Resume resumes execution