-
Notifications
You must be signed in to change notification settings - Fork 567
/
etcd_queue.go
114 lines (103 loc) · 2.18 KB
/
etcd_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package task
import (
"context"
"sync"
"time"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/cevaris/ordered_map"
)
const (
// waitTime is how long the scheduling goroutine pauses when there are no
// groups, or when a full pass over the groups found no task to run.
waitTime = 10 * time.Millisecond
// bufSize is the buffer capacity of each group's task channel.
bufSize = 10
)
// taskFunc is a unit of work submitted on a group's task channel; the task
// queue's scheduling goroutine receives it and calls it with no arguments.
type taskFunc func()
// groupEntry is the per-group state stored in taskQueue.groups.
type groupEntry struct {
// cancel cancels the context that was handed to the group's callback; it is
// called when the group is deleted.
cancel context.CancelFunc
// taskFuncChan is the buffered channel given to the group's callback; the
// scheduling goroutine receives the group's tasks from it.
taskFuncChan chan taskFunc
}
// taskQueue schedules tasks from multiple groups, taking one group at a time
// from the front of groups and moving it to the back after it has been checked.
type taskQueue struct {
// groups maps a group ID (string) to its *groupEntry. The entry at the front
// is the next group the scheduling goroutine will check.
groups *ordered_map.OrderedMap
// mu is held around every access to groups in this file.
mu sync.Mutex
}
// newTaskQueue creates a task queue and starts its scheduling goroutine, which
// runs until ctx is done.
//
// The goroutine services groups round-robin: it looks at the group at the
// front of the ordered map, runs at most one pending task from that group's
// channel, and then moves the group to the back. When there are no groups, or
// when every group has been checked once without a task being run, it pauses
// for waitTime before trying again.
func newTaskQueue(ctx context.Context) *taskQueue {
	tq := &taskQueue{
		groups: ordered_map.NewOrderedMap(),
	}
	go func() {
		// cycleKey is the first group checked since a task last ran (or since
		// the last pause). Seeing it at the front again means a full pass over
		// the groups found nothing to do.
		var cycleKey string
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}
			// Get the next group.
			tq.mu.Lock()
			iter := tq.groups.IterFunc()
			kv, ok := iter()
			// If the group that marks the start of the cycle has been deleted it
			// will never reach the front again, so the "cycle completed" check
			// below could never fire and the loop would spin without pausing.
			// Forget the marker so a new cycle starts at the current group.
			if cycleKey != "" {
				if _, exists := tq.groups.Get(cycleKey); !exists {
					cycleKey = ""
				}
			}
			tq.mu.Unlock()
			var key string
			if ok {
				key = kv.Key.(string)
			}
			// Wait if there are no group entries or a cycle has been completed without finding a task.
			if !ok || key == cycleKey {
				// Pause for waitTime, but stop immediately if ctx is cancelled.
				select {
				case <-ctx.Done():
					return
				case <-time.After(waitTime):
				}
				cycleKey = ""
				continue
			}
			if cycleKey == "" {
				cycleKey = key
			}
			// Check if the next task is ready for the group and process it.
			// The task runs without tq.mu held.
			ge := kv.Value.(*groupEntry)
			select {
			case cb := <-ge.taskFuncChan:
				cb()
				// Work was found, so start counting a fresh cycle.
				cycleKey = ""
			default:
			}
			tq.requeueGroup(key)
		}
	}()
	return tq
}
// group registers a new task group under groupID and starts cb in its own
// goroutine. cb receives a context derived from ctx with pctx.WithCancel and a
// channel, buffered to bufSize, for the group's tasks. The derived context's
// cancel function is stored in the group's entry.
//
// It returns an error, without starting cb, if a group with the same ID is
// already registered.
func (tq *taskQueue) group(ctx context.Context, groupID string, cb func(context.Context, chan taskFunc)) error {
	tq.mu.Lock()
	defer tq.mu.Unlock()

	_, exists := tq.groups.Get(groupID)
	if exists {
		return errors.Errorf("errored creating group %v, which already exists", groupID)
	}

	groupCtx, cancel := pctx.WithCancel(ctx)
	tasks := make(chan taskFunc, bufSize)
	tq.groups.Set(groupID, &groupEntry{
		cancel:       cancel,
		taskFuncChan: tasks,
	})
	go func() {
		cb(groupCtx, tasks)
	}()
	return nil
}
// requeueGroup moves the group with the given ID to the back of the ordered
// map by deleting its entry and setting it again. It does nothing if no such
// group is registered.
func (tq *taskQueue) requeueGroup(groupID string) {
	tq.mu.Lock()
	defer tq.mu.Unlock()

	if entry, found := tq.groups.Get(groupID); found {
		tq.groups.Delete(groupID)
		tq.groups.Set(groupID, entry)
	}
}
// deleteGroup calls the stored cancel function of the group with the given ID
// and then removes the group from the queue. It does nothing if no such group
// is registered.
func (tq *taskQueue) deleteGroup(groupID string) {
	tq.mu.Lock()
	defer tq.mu.Unlock()

	if entry, found := tq.groups.Get(groupID); found {
		entry.(*groupEntry).cancel()
		tq.groups.Delete(groupID)
	}
}