-
Notifications
You must be signed in to change notification settings - Fork 1
/
workflow.go
153 lines (141 loc) · 3 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package godist
// Workflow is data structure which allows Job structures to be
// submitted.
type workflow struct {
done chan bool
queue chan Job
}
// NewBasicWorkflow creates a Workflow data structure, with 'count' number
// of go routines for each phase of Job processing.
func NewBasicWorkflow(count int) *workflow {
toParse := make(chan Job)
toExpand := make(chan Job)
toPerform := make(chan Task)
toIntegrate := make(chan Task)
toRespond := make(chan Job)
done := make(chan bool)
self := &workflow{
queue: toParse,
done: done,
}
go func() {
done := make(chan bool)
for index := 0; index < count; index++ {
go func() {
for job := range toParse {
if err := job.Parse(); err != nil {
toRespond <- job
continue
}
toExpand <- job
}
done <- true
}()
}
for index := 0; index < count; index++ {
<-done
}
close(toExpand)
}()
go func() {
done := make(chan bool)
for index := 0; index < count; index++ {
go func() {
for job := range toExpand {
tasks, err := job.Expand()
if err != nil {
toRespond <- job
continue
}
for _, task := range tasks {
if task.Error() == nil {
toPerform <- task
} else {
toIntegrate <- task
}
}
}
done <- true
}()
}
for index := 0; index < count; index++ {
<-done
}
close(toPerform)
}()
go func() {
done := make(chan bool)
for index := 0; index < count; index++ {
go func() {
for task := range toPerform {
// NOTE: even tasks that have
// resulted in errors must
// have their erroroneous
// results integrated...
task.Perform()
toIntegrate <- task
}
done <- true
}()
}
for index := 0; index < count; index++ {
<-done
}
close(toIntegrate)
}()
go func() {
done := make(chan bool)
for index := 0; index < count; index++ {
go func() {
for task := range toIntegrate {
job := task.Integrate()
if job != nil {
toRespond <- job
}
}
done <- true
}()
}
for index := 0; index < count; index++ {
<-done
}
close(toRespond)
}()
go func() {
done := make(chan bool)
for index := 0; index < count; index++ {
go func() {
for job := range toRespond {
job.Respond()
}
done <- true
}()
}
for index := 0; index < count; index++ {
<-done
}
self.done <- true
}()
return self
}
// SubmitWithCallback asynchronously sends a Job to the workflow, and
// calls back the given anonymous function when the Job is completed.
func (self *workflow) SubmitWithCallback(job Job, fn func()) {
self.queue <- job
go func() {
job.Wait()
fn()
}()
}
// SubmitAndWait synchronously sends a Job to the workflow, and waits
// until the job is complete prior to returning to the caller
func (self *workflow) SubmitAndWait(job Job) {
self.queue <- job
job.Wait()
}
// Quit shuts down a workflow, closing each channel in sequence until
// all go routines have stopped.
func (self *workflow) Quit() {
close(self.queue)
<-self.done
}