This repository has been archived by the owner on Aug 29, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 80
/
dispatcher.go
123 lines (108 loc) · 2.63 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Package dispatcher is a queue manager for executing jobs, deduplicating
// requests, and limiting the resources consumed by a server. It allows callers
// to "join" an existing job and listen to the output produced by that job
// without re-executing the actual task.
package dispatcher
import (
"errors"
"log"
"reflect"
"github.com/openshift/geard/jobs"
)
// Dispatcher runs jobs on worker goroutines fed by two bounded queues, one for
// fast jobs and one for slow jobs. Populate the exported fields and call Start
// before dispatching any job.
type Dispatcher struct {
	// QueueFast and QueueSlow are the buffer sizes Start uses when it creates
	// the fast and slow job queues.
	QueueFast, QueueSlow int
	// Concurrent is the number of workers Start launches for each queue.
	Concurrent int
	// TrackDuplicateIds is passed to NewRequestIdentifierMap by Start.
	// NOTE(review): presumably the number of recent request identifiers kept
	// for de-duplication; confirm against that constructor.
	TrackDuplicateIds int

	// fastJobs and slowJobs are the buffered queues drained by the workers.
	fastJobs, slowJobs chan jobTracker
	// recentJobs records trackers by request identifier so that a repeated
	// request can be detected by Dispatch.
	recentJobs *RequestIdentifierMap
}
// Fast is an optional interface a job may implement to choose its queue.
// A job whose Fast method reports true is placed on the fast queue; any other
// job, including one that does not implement Fast, goes to the slow queue.
type Fast interface{ Fast() bool }
// Start initialises the dispatcher's internal state and launches its workers.
//
// It creates the recent-request map from TrackDuplicateIds, allocates the fast
// and slow queues with capacities QueueFast and QueueSlow, and then starts
// Concurrent workers on each of the two queues. Start does not block.
func (d *Dispatcher) Start() {
	d.recentJobs = NewRequestIdentifierMap(d.TrackDuplicateIds)
	d.fastJobs = make(chan jobTracker, d.QueueFast)
	d.slowJobs = make(chan jobTracker, d.QueueSlow)

	// Each round adds one worker to the fast queue, then one to the slow queue.
	for round := 0; round < d.Concurrent; round++ {
		for _, queue := range []chan jobTracker{d.fastJobs, d.slowJobs} {
			d.work(queue)
		}
	}
}
// work starts one goroutine that executes trackers received from queue until
// the queue is closed. For every tracker it logs the start, runs the job
// against the tracker's response, logs the end, closes the tracker's
// completion channel, and finally records the identifier in recentJobs with a
// nil value.
func (d *Dispatcher) work(queue <-chan jobTracker) {
	// run processes a single tracker; the statement order is significant.
	run := func(t jobTracker) {
		requestID := t.id
		jobType := reflect.TypeOf(t.job).String()
		log.Printf("job START %s, %s: %+v", jobType, requestID.String(), t.job)

		t.job.Execute(t.response)
		log.Printf("job END %s", requestID.String())

		// Wake everyone waiting on this job before replacing its map entry.
		close(t.complete)
		d.recentJobs.Put(requestID, nil)
	}

	go func() {
		for {
			next, open := <-queue
			if !open {
				return
			}
			run(next)
		}
	}()
}
// jobTracker bundles a queued job with the data a worker needs to run it.
// Dispatch constructs it with a positional literal, so the field order below
// must not change.
type jobTracker struct {
	// id is the request identifier the job was dispatched under.
	id jobs.RequestIdentifier
	// job is the work executed by a worker.
	job jobs.Job
	// response is handed to job.Execute.
	response jobs.Response
	// complete is closed by the worker once job.Execute has returned.
	complete chan bool
}
// Dispatch submits job j under the request identifier id, with resp as the
// response the job will be executed against.
//
// The job is first recorded in the recent-request map. If an entry for id was
// already present, Dispatch tries to join instead of running again:
//
//   - if the entry still holds a tracker, that tracker's job must implement
//     jobs.Join and is asked to join j, using the tracked job's completion
//     channel;
//   - if the entry is nil (what a worker stores after the job has run), j
//     itself must implement jobs.Join and is asked to join using its own
//     completion channel.
//
// When the relevant job does not implement jobs.Join, jobs.ErrRanToCompletion
// is returned. When Join returns an error, that error is returned. When Join
// reports success, the channel returned by Join is handed back as done.
// Otherwise the job is queued as if it were new.
//
// Queueing is non-blocking: a job implementing Fast whose Fast method returns
// true goes to the fast queue, every other job to the slow queue, and an error
// is returned when the chosen queue is full. On success done is the channel a
// worker closes after the queued job has finished executing; done is nil
// whenever err is non-nil.
//
// NOTE(review): when the queue is full the tracker stored in the
// recent-request map is left in place although it will never run; whether
// that needs cleanup depends on RequestIdentifierMap, which is not visible
// here.
func (d *Dispatcher) Dispatch(id jobs.RequestIdentifier, j jobs.Job, resp jobs.Response) (done <-chan bool, err error) {
	complete := make(chan bool)
	tracker := jobTracker{id, j, resp, complete}

	if existing, found := d.recentJobs.Put(id, tracker); found {
		var join jobs.Join
		// waitOn is the completion channel passed to Join. It is kept separate
		// from complete so that a declined join can never change which channel
		// is returned for the job queued below.
		waitOn := complete
		if existing != nil {
			// A failed assertion leaves other zeroed, which is then reported
			// as ErrRanToCompletion by the Join check below.
			other, _ := existing.(jobTracker)
			running, ok := other.job.(jobs.Join)
			if !ok {
				return nil, jobs.ErrRanToCompletion
			}
			join = running
			waitOn = other.complete
		} else {
			self, ok := j.(jobs.Join)
			if !ok {
				return nil, jobs.ErrRanToCompletion
			}
			join = self
		}

		joined, joinDone, errj := join.Join(j, waitOn)
		if errj != nil {
			log.Println("Attempt to join job rejected ", j)
			return nil, errj
		}
		if joined {
			log.Println("Joined already running job ", j)
			return joinDone, nil
		}
		log.Println("Queueing an already existing job ", j)
	}

	queue := d.slowJobs
	if f, ok := j.(Fast); ok && f.Fast() {
		queue = d.fastJobs
	}

	select {
	case queue <- tracker:
	default:
		return nil, errors.New("The server is at maximum capacity - please try again shortly")
	}

	// Always report the channel the worker closes for this tracker.
	return tracker.complete, nil
}
// closedChannel returns a freshly created channel that has already been
// closed, so any receive on it completes immediately with the zero value.
func closedChannel() <-chan bool {
	signal := make(chan bool)
	// The deferred close runs before the caller sees the channel.
	defer close(signal)
	return signal
}