-
Notifications
You must be signed in to change notification settings - Fork 173
/
working_set.go
151 lines (128 loc) · 2.75 KB
/
working_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package batch_table_scheduler
import (
"sync"
"time"
"github.com/juju/errors"
)
// workingElement is the bookkeeping record stored in workingSet.elements for
// one key.
type workingElement struct {
// ack is created together with the element and closed by workingSet.ack when
// nrWaitingItem reaches zero; checkAndPut and checkAndPutBatch block on it.
ack chan struct{}
// nrWaitingItem starts at 1 when the element is created, is incremented by
// put/putBatch for an existing key and decremented by workingSet.ack.
nrWaitingItem int
}
// workingSet maps string keys to their workingElement. Its methods take the
// embedded mutex before touching elements.
type workingSet struct {
// Embedded lock; methods call ws.Lock()/ws.Unlock() around every access to
// elements.
sync.Mutex
// ticker paces the background sweep started in newWorkingSet.
ticker *time.Ticker
// elements holds one entry per key that has been put and not yet removed.
elements map[string]*workingElement
}
// checkConflict reports whether key k currently has work in flight.
//
// It returns the entry's ack channel and true when k is present with a
// non-zero nrWaitingItem. A missing key, or a key whose nrWaitingItem is zero,
// yields (nil, false). The lookup runs with the working-set lock held.
func (ws *workingSet) checkConflict(k string) (ack chan struct{}, conflict bool) {
	ws.Lock()
	defer ws.Unlock()

	// Short-circuit keeps us from dereferencing the nil entry of a missing key.
	entry, found := ws.elements[k]
	if !found || entry.nrWaitingItem == 0 {
		return nil, false
	}
	return entry.ack, true
}
// checkConflictWithBatch looks up every key of batch under a single lock
// acquisition.
//
// The returned slice always has len(batch) entries: position i holds the ack
// channel of batch[i] when that key has work in flight, and nil otherwise. The
// bool is true when at least one key conflicts. As in checkConflict, an entry
// whose nrWaitingItem is zero is not treated as a conflict.
func (ws *workingSet) checkConflictWithBatch(batch []string) ([]chan struct{}, bool) {
	ws.Lock()
	defer ws.Unlock()

	// make zero-fills the slice, so non-conflicting positions stay nil.
	acks := make([]chan struct{}, len(batch))
	hasConflict := false
	for i, k := range batch {
		e, ok := ws.elements[k]
		if !ok || e.nrWaitingItem == 0 {
			continue
		}
		acks[i] = e.ack
		hasConflict = true
	}
	return acks, hasConflict
}
// put registers one more in-flight item under key k.
//
// An existing entry gets its nrWaitingItem incremented; otherwise a new entry
// is created with nrWaitingItem set to 1 and a fresh ack channel. The whole
// operation runs under the working-set lock.
func (ws *workingSet) put(k string) {
	ws.Lock()
	defer ws.Unlock()

	if existing, found := ws.elements[k]; found {
		existing.nrWaitingItem++
		return
	}
	ws.elements[k] = &workingElement{
		ack:           make(chan struct{}),
		nrWaitingItem: 1,
	}
}
// putBatch registers one in-flight item for every key in batch while holding
// the working-set lock once for the whole batch.
//
// Keys are processed in order, so a key that appears several times in batch is
// created on its first occurrence and incremented on each later one.
func (ws *workingSet) putBatch(batch []string) {
	ws.Lock()
	defer ws.Unlock()

	for i := range batch {
		key := batch[i]
		existing, found := ws.elements[key]
		if !found {
			ws.elements[key] = &workingElement{
				ack:           make(chan struct{}),
				nrWaitingItem: 1,
			}
			continue
		}
		existing.nrWaitingItem++
	}
}
// checkAndPut waits for any in-flight work on key k and then registers a new
// item for it.
//
// When checkConflict reports a conflict, the call blocks until the returned
// ack channel is closed. It returns true if it had to wait on a conflict.
// The check and the put each take the lock on their own, so the lock is not
// held while waiting.
func (ws *workingSet) checkAndPut(k string) (hadConflict bool) {
	if done, busy := ws.checkConflict(k); busy {
		hadConflict = true
		<-done
	}
	ws.put(k)
	return hadConflict
}
// checkAndPutBatch waits for in-flight work on every conflicting key of batch
// and then registers the whole batch.
//
// The ack channels returned by checkConflictWithBatch are awaited one after
// another, in batch order; nil entries are skipped. It returns true if at
// least one key was in conflict. The lock is not held while waiting.
func (ws *workingSet) checkAndPutBatch(batch []string) (hadConflict bool) {
	var pending []chan struct{}
	pending, hadConflict = ws.checkConflictWithBatch(batch)

	for i := 0; hadConflict && i < len(pending); i++ {
		if done := pending[i]; done != nil {
			<-done
		}
	}

	ws.putBatch(batch)
	return hadConflict
}
// ack marks one in-flight item of key k as finished.
//
// It returns an error when k is not in the working set. Otherwise it
// decrements the entry's nrWaitingItem; when the count reaches zero the entry's
// ack channel is closed, which releases anyone blocked on it, and the entry is
// removed from the map.
func (ws *workingSet) ack(k string) error {
	ws.Lock()
	defer ws.Unlock()

	entry, found := ws.elements[k]
	if !found {
		return errors.Errorf("no element found in working set")
	}

	// Repeated updates/inserts of one row can share a working item key, so
	// the entry is only released once every one of them has been acked.
	if entry.nrWaitingItem--; entry.nrWaitingItem != 0 {
		return nil
	}
	close(entry.ack)
	delete(ws.elements, k)
	return nil
}
// numElements returns how many keys are currently stored in the working set,
// read while holding the lock.
func (ws *workingSet) numElements() int {
	ws.Lock()
	count := len(ws.elements)
	ws.Unlock()
	return count
}
// newWorkingSet builds an empty working set and starts its background sweeper.
//
// The sweeper wakes on a 100ms ticker and, while holding the lock, removes
// every entry whose nrWaitingItem is zero. The original author's note gives
// the reason: some components may not send their ack back to the scheduler.
//
// NOTE(review): nothing in this function stops the ticker or ends the
// goroutine; confirm whether a shutdown path exists elsewhere.
func newWorkingSet() *workingSet {
	ws := &workingSet{
		ticker:   time.NewTicker(100 * time.Millisecond),
		elements: make(map[string]*workingElement),
	}

	// sweep drops idle entries; it holds the big lock for the whole pass.
	sweep := func() {
		ws.Lock()
		defer ws.Unlock()
		for key, entry := range ws.elements {
			if entry.nrWaitingItem != 0 {
				continue
			}
			delete(ws.elements, key)
		}
	}

	go func() {
		for range ws.ticker.C {
			sweep()
		}
	}()
	return ws
}