-
Notifications
You must be signed in to change notification settings - Fork 21
/
workqueue.go
120 lines (106 loc) · 2.96 KB
/
workqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package core
import (
"errors"
"sort"
)
type WQueue struct {
workMap map[string]*Workunit //all parsed workunits
wait map[string]bool //ids of waiting workunits
checkout map[string]bool //ids of workunits being checked out
suspend map[string]bool //ids of suspended workunits
}
func NewWQueue() *WQueue {
return &WQueue{
workMap: map[string]*Workunit{},
wait: map[string]bool{},
checkout: map[string]bool{},
suspend: map[string]bool{},
}
}
func (wq WQueue) Len() int {
return len(wq.workMap)
}
func (wq *WQueue) Add(workunit *Workunit) (err error) {
if workunit.Id == "" {
return errors.New("try to push a workunit with an empty id")
}
wq.workMap[workunit.Id] = workunit
wq.StatusChange(workunit.Id, WORK_STAT_QUEUED)
return nil
}
func (wq *WQueue) Get(id string) (work *Workunit, ok bool) {
work, ok = wq.workMap[id]
return
}
func (wq *WQueue) Delete(id string) {
delete(wq.wait, id)
delete(wq.checkout, id)
delete(wq.suspend, id)
delete(wq.workMap, id)
}
func (wq *WQueue) Has(id string) (has bool) {
if _, ok := wq.workMap[id]; ok {
has = true
} else {
has = false
}
return
}
func (wq *WQueue) StatusChange(id string, new_status string) (err error) {
if _, ok := wq.workMap[id]; !ok {
return errors.New("WQueue.statusChange: invalid workunit id:" + id)
}
//move workunit id between maps. no need to care about the old status because
//delete function will do nothing if the operated map has no such key.
if new_status == WORK_STAT_CHECKOUT {
wq.checkout[id] = true
delete(wq.wait, id)
delete(wq.suspend, id)
} else if new_status == WORK_STAT_QUEUED {
wq.wait[id] = true
delete(wq.checkout, id)
delete(wq.suspend, id)
} else if new_status == WORK_STAT_SUSPEND {
wq.suspend[id] = true
delete(wq.checkout, id)
delete(wq.wait, id)
} else {
return errors.New("WQueue.statusChange: invalid new status:" + new_status)
}
wq.workMap[id].State = new_status
return
}
//select workunits, return a slice of ids based on given queuing policy and requested count
func (wq *WQueue) selectWorkunits(workid []string, policy string, count int) (selected []*Workunit, err error) {
worklist := []*Workunit{}
for _, id := range workid {
worklist = append(worklist, wq.workMap[id])
}
//selected = []*Workunit{}
if policy == "FCFS" {
sort.Sort(byFCFS{worklist})
}
for i := 0; i < count; i++ {
selected = append(selected, worklist[i])
}
return
}
//queuing/prioritizing related functions
type WorkList []*Workunit
func (wl WorkList) Len() int { return len(wl) }
func (wl WorkList) Swap(i, j int) { wl[i], wl[j] = wl[j], wl[i] }
type byFCFS struct{ WorkList }
//compare priority first, then FCFS (if priorities are the same)
func (s byFCFS) Less(i, j int) (ret bool) {
p_i := s.WorkList[i].Info.Priority
p_j := s.WorkList[j].Info.Priority
switch {
case p_i > p_j:
return true
case p_i < p_j:
return false
case p_i == p_j:
return s.WorkList[i].Info.SubmitTime.Before(s.WorkList[j].Info.SubmitTime)
}
return
}