forked from harness/gitness
-
Notifications
You must be signed in to change notification settings - Fork 0
/
director.go
117 lines (101 loc) · 2.59 KB
/
director.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package director
import (
"sync"
"code.google.com/p/go.net/context"
"github.com/drone/drone/server/worker"
"github.com/drone/drone/server/worker/pool"
)
// Director keeps track of submitted work and hands it off to
// workers. It records which work is still waiting for a worker and
// which work has already been assigned to one.
type Director struct {
	// Mutex is locked by the Director's methods while they read or
	// modify the pending and started maps.
	sync.Mutex

	// pending is the set of work that has been submitted but has
	// not yet been handed to a worker.
	pending map[*worker.Work]bool

	// started maps each piece of in-flight work to the worker that
	// is processing it.
	started map[*worker.Work]worker.Worker
}
// New returns a Director whose pending and started maps are
// allocated and empty, ready to accept work.
func New() *Director {
	d := new(Director)
	d.pending = make(map[*worker.Work]bool)
	d.started = make(map[*worker.Work]worker.Worker)
	return d
}
// Do submits the work to the director and returns once processing
// has finished. The call itself runs synchronously in the caller's
// goroutine; callers that want asynchronous processing should invoke
// it with the go statement.
//
// Any panic raised while the work is being processed is recovered
// and discarded, so it never propagates to the caller.
func (d *Director) Do(c context.Context, work *worker.Work) {
	// Swallow panics from the blocking call below.
	defer func() { _ = recover() }()

	d.do(c, work)
}
// do is a blocking function that waits for an available worker,
// runs the work on it and then hands the worker back to the pool.
//
// The bookkeeping cleanup is deferred so that it also runs when the
// pool lookup or the worker panics. Do recovers such panics, so
// without the deferred cleanup the work would remain listed as
// pending or started and the reserved worker would never be released.
func (d *Director) do(c context.Context, work *worker.Work) {
	d.markPending(work)

	// release is swapped for the real pool release once a worker has
	// been reserved; until then there is nothing to hand back.
	release := func() {}
	defer func() {
		// Same order as the non-panicking path: forget the work
		// first, then make the worker available again.
		d.markComplete(work)
		release()
	}()

	// The locals are deliberately not named "pool" and "worker" so
	// that they do not shadow the imported packages.
	workers := pool.FromContext(c)
	w := <-workers.Reserve()
	if w == nil {
		// A nil worker cannot process anything, so bail out instead
		// of panicking on the method call below.
		// NOTE(review): the work is dropped here, which matches what
		// effectively happened before, when the resulting panic was
		// swallowed by Do — confirm that no retry is expected.
		return
	}
	release = func() { workers.Release(w) }

	d.markStarted(work, w)
	w.Do(c, work)
}
// GetStarted returns a list of all jobs that are assigned to a
// worker and being worked on.
//
// The result is a snapshot taken while holding the director's lock.
// It is never nil, and its order is unspecified because it follows
// map iteration order.
func (d *Director) GetStarted() []*worker.Work {
	d.Lock()
	defer d.Unlock()

	// The final length is known up front, so allocate once.
	started := make([]*worker.Work, 0, len(d.started))
	for work := range d.started {
		started = append(started, work)
	}
	return started
}
// GetPending returns a list of all work that is pending assignment
// to a worker.
//
// The result is a snapshot taken while holding the director's lock.
// It is never nil, and its order is unspecified because it follows
// map iteration order.
func (d *Director) GetPending() []*worker.Work {
	d.Lock()
	defer d.Unlock()

	// The final length is known up front, so allocate once.
	pending := make([]*worker.Work, 0, len(d.pending))
	for work := range d.pending {
		pending = append(pending, work)
	}
	return pending
}
// GetAssignments returns a list of assignments. The assignment type
// is a structure that stores the work being performed and the
// assigned worker.
//
// The result is a snapshot taken while holding the director's lock.
// It is never nil, and its order is unspecified because it follows
// map iteration order.
func (d *Director) GetAssignments() []*worker.Assignment {
	d.Lock()
	defer d.Unlock()

	// One assignment is produced per started entry, so the final
	// length is known up front.
	assignments := make([]*worker.Assignment, 0, len(d.started))
	for work, w := range d.started {
		assignments = append(assignments, &worker.Assignment{Work: work, Worker: w})
	}
	return assignments
}

// GetAssignemnts is the original, misspelled name of GetAssignments.
// It is kept so that existing callers continue to compile and simply
// forwards to GetAssignments.
//
// Deprecated: use GetAssignments instead.
func (d *Director) GetAssignemnts() []*worker.Assignment {
	// No locking here: GetAssignments takes the lock itself and the
	// embedded mutex is not reentrant.
	return d.GetAssignments()
}
// markPending records work as waiting for a worker. Any started
// entry for the same work is removed first, so the work ends up in
// the pending set only.
func (d *Director) markPending(work *worker.Work) {
	d.Lock()
	defer d.Unlock()

	// Drop the in-flight record, then queue the work.
	delete(d.started, work)
	d.pending[work] = true
}
// markStarted records that work has been handed to the given worker.
// The work is removed from the pending set and stored in the started
// map together with the worker that is processing it.
func (d *Director) markStarted(work *worker.Work, worker worker.Worker) {
	d.Lock()
	defer d.Unlock()

	// Move the work from the pending set to the started map.
	delete(d.pending, work)
	d.started[work] = worker
}
// markComplete forgets the work entirely by removing it from both
// the started map and the pending set. Removing work that is not
// present in either is a no-op.
func (d *Director) markComplete(work *worker.Work) {
	d.Lock()
	defer d.Unlock()

	delete(d.started, work)
	delete(d.pending, work)
}