-
Notifications
You must be signed in to change notification settings - Fork 28
/
semaphore.go
174 lines (156 loc) · 4.46 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
"errors"
"sync"
)
// Semaphore is an implementation of unbounded semaphores. Abstractly, a
// semaphore holds a nonnegative integer value, and supports operations to
// increment and decrement the value. The semaphore value is not allowed to be
// negative; decrement operations block until the semaphore value is positive.
// http://en.wikipedia.org/wiki/Semaphore_%28programming%29
//
// The standard suggestion for implementing semaphores in Go is to use a
// buffered channel, where the number of elements in the channel is the max
// value of the semaphore. However, what we implement here is _unbounded_
// semaphores (up to a max value of 2^31-1).
//
// A mutex and integer is used to keep track of the numerical value of the
// and a channel is used for notification of changes.
// When decrementing, the value of the semaphore is decremented and if not
// sufficient, DecN will block until it can subtract more.
//
// Because of this looping, the semaphore is not fair.
// The reason for using a channel for notifications is for cancellation.
// The Dec(cancel <-chan struct{}) method takes a cancelation channel, so we use a
// "select" operation to determine whether to perform a semaphore operation or
// abort because the semaphore is close or the operation was canceled.
//
// NOTE: when the Semaphore is closed, the Dec (or DecN) operations are
// unblocked, returning an error (ErrClosed) if the semaphore value is 0 (or
// less than the DecN value), respectively. However, even with the Semaphore
// closed, if the semaphore value is non-zero (or sufficient to satisfy the DecN
// value), Dec (or DecN) performs the decrement successfully and returns without
// an error. Regardless of whether the Semaphore is closed or not, Inc/IncN
// succeed in incrementing the semaphore value.
type Semaphore struct {
mu sync.Mutex
value uint // GUARDED_BY(mu)
notify chan bool
isClosed bool
closed chan struct{}
}
var (
ErrClosed = errors.New("semaphore is closed")
ErrCanceled = errors.New("semaphore operation was canceled")
ErrTryAgain = errors.New("semaphore operation failed, try again")
)
// NewSemaphore allocates a semaphore with an initial value.
func NewSemaphore() *Semaphore {
return &Semaphore{notify: make(chan bool, 1), closed: make(chan struct{})}
}
// Close closes the semaphore. Subsequent operations do not block.
func (s *Semaphore) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.isClosed {
s.isClosed = true
close(s.closed)
}
}
// DecN decrements the semaphore. Blocks until the final value of the semaphore
// is nonnegative, or the <cancel> channel is closed (or has a value).
func (s *Semaphore) DecN(n uint, cancel <-chan struct{}) error {
s.mu.Lock()
if s.value >= n {
s.value -= n
s.mu.Unlock()
return nil
}
taken := s.value
s.value = 0
s.mu.Unlock()
for taken < n {
needed := n - taken
select {
case <-s.notify:
notify := true
s.mu.Lock()
if s.value <= needed {
needed = s.value
notify = false
}
taken += needed
s.value -= needed
s.mu.Unlock()
if notify {
select {
case s.notify <- true:
default:
}
}
case <-s.closed:
// Close does not zero out the semaphore, so check to
// see if the DecN can be satisfied.
s.mu.Lock()
if s.value >= needed {
s.value -= needed
taken += needed
}
s.mu.Unlock()
if taken < n {
return ErrClosed
}
case <-cancel:
s.IncN(taken)
return ErrCanceled
}
}
return nil
}
// TryDecN tries to decrement the semaphore.
func (s *Semaphore) TryDecN(n uint) error {
if n == 0 {
return nil
}
s.mu.Lock()
if s.value >= n {
s.value -= n
s.mu.Unlock()
return nil
}
closed := s.isClosed
s.mu.Unlock()
if closed {
return ErrClosed
}
return ErrTryAgain
}
// IncN increments the semaphore. Wakes any potential waiters.
func (s *Semaphore) IncN(n uint) {
if n == 0 {
// Avoid notifications when there is no change to the value.
return
}
s.mu.Lock()
s.value += n
s.mu.Unlock()
select {
case s.notify <- true:
default:
}
}
// Dec decrements the semaphore by 1.
func (s *Semaphore) Dec(cancel <-chan struct{}) error {
return s.DecN(1, cancel)
}
// TryDec tries to decrement the semaphore by 1.
func (s *Semaphore) TryDec() error {
return s.TryDecN(1)
}
// Inc increments the semaphore by 1.
func (s *Semaphore) Inc() {
s.IncN(1)
}