-
Notifications
You must be signed in to change notification settings - Fork 22
/
playpar.go
91 lines (87 loc) · 2.32 KB
/
playpar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//
// Copyright © 2018 Aljabr, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package flow
import "github.com/kocircuit/kocircuit/lang/circuit/model"
type stepResult struct {
Step *model.Step
Result Edge
Error error
Panic interface{}
}
func playPar(f *model.Func, StepPlayer StepPlayer) (r map[*model.Step]Edge, err error) {
cross := map[*model.Step]chan Edge{}
for _, s := range f.Step {
cross[s] = make(chan Edge, len(f.Spread[s]))
}
done := make(chan *stepResult, len(f.Step))
abort := make(chan bool)
for i := 0; i < len(f.Step); i++ {
s := f.Step[len(f.Step)-1-i] // iterate steps in forward time order
// start step co-routine
go func() {
// wait for each incoming edge to pass a value, or an abort signal
gather := make([]GatherEdge, len(s.Gather))
for j, g := range s.Gather {
select {
case <-abort:
return
case edge := <-cross[g.Step]:
gather[j] = GatherEdge{Field: g.Field, Edge: edge}
}
}
// catch a panic from the step execution
defer func() {
if r := recover(); r != nil {
done <- &stepResult{Step: s, Panic: r}
}
}()
if sReturns, err := StepPlayer.PlayStep(s, gather); err != nil {
done <- &stepResult{Step: s, Error: err}
} else {
for j := 0; j < len(f.Spread[s]); j++ {
cross[s] <- sReturns
}
done <- &stepResult{Step: s, Result: sReturns}
}
}()
}
r = map[*model.Step]Edge{}
aborting := false
var stepPanic interface{}
for i := 0; !aborting && i < len(f.Step); i++ {
select {
case sr := <-done:
if sr.Panic != nil {
stepPanic = sr.Panic
close(abort)
aborting = true
} else if sr.Error != nil {
err = sr.Error
close(abort)
aborting = true
} else {
r[sr.Step] = sr.Result
}
}
}
if stepPanic != nil {
panic(stepPanic)
}
if err != nil {
return nil, err
}
return
}