-
Notifications
You must be signed in to change notification settings - Fork 21.4k
/
queue.rb
209 lines (183 loc) · 6.32 KB
/
queue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# frozen_string_literal: true
require "thread"
require "monitor"
module ActiveRecord
module ConnectionAdapters
class ConnectionPool
# Threadsafe, fair, LIFO queue. Meant to be used by ConnectionPool
# with which it shares a Monitor.
class Queue
  # Builds an empty queue guarded by +lock+. The condition variable used to
  # park and wake waiting threads is created from that same lock.
  def initialize(lock = Monitor.new)
    @lock = lock
    @cond = @lock.new_cond
    @num_waiting = 0
    @queue = []
  end

  # True when at least one thread is currently blocked inside a timed #poll.
  def any_waiting?
    synchronize { @num_waiting > 0 }
  end

  # Number of threads currently blocked inside a timed #poll.
  def num_waiting
    synchronize { @num_waiting }
  end

  # Appends +element+ and signals the condition variable so that one
  # waiter (if any) gets a chance to pick it up. Never blocks.
  def add(element)
    synchronize do
      @queue.push element
      @cond.signal
    end
  end

  # Removes +element+ from the queue and returns it; returns +nil+ when the
  # element is not present.
  def delete(element)
    synchronize { @queue.delete(element) }
  end

  # Drops every element currently held by the queue.
  def clear
    synchronize { @queue.clear }
  end

  # Takes an element off the queue.
  #
  # Without +timeout+: hands out an element only when there are strictly
  # more elements than waiting threads (so the caller never jumps ahead of
  # a waiter); otherwise returns +nil+ immediately.
  #
  # With +timeout+: when nothing can be handed out right away, blocks for
  # up to +timeout+ seconds until an element shows up.
  #
  # Raises:
  # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no
  #   element becomes available within +timeout+ seconds.
  def poll(timeout = nil)
    synchronize do
      internal_poll(timeout)
    end
  end

  private

  # Try the non-blocking path first; fall back to a timed wait only when a
  # timeout was supplied.
  def internal_poll(timeout)
    immediate = no_wait_poll
    return immediate if immediate

    timeout && wait_poll(timeout)
  end

  # Runs +block+ while holding the queue's lock.
  def synchronize(&block)
    @lock.synchronize(&block)
  end

  # True when the queue holds at least one element.
  def any?
    !@queue.empty?
  end

  # An element may be taken without waiting only when the elements
  # outnumber the threads that are already waiting for one.
  def can_remove_no_wait?
    @num_waiting < @queue.size
  end

  # Pops the most recently added element, or returns +nil+ when empty.
  def remove
    @queue.pop
  end

  # Non-blocking take: returns an element when #can_remove_no_wait? allows
  # it, +nil+ otherwise.
  def no_wait_poll
    return unless can_remove_no_wait?

    remove
  end

  # Blocking take. Counts the caller as a waiter for the duration of the
  # call, waits on the condition variable for whatever is left of
  # +timeout+, and returns an element as soon as one is present. Raises
  # ConnectionTimeoutError once +timeout+ seconds have elapsed.
  def wait_poll(timeout)
    @num_waiting += 1

    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    waited = 0

    loop do
      # NOTE(review): presumably lets other threads proceed with code
      # loading while this thread is parked — confirm against ActiveSupport.
      ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
        @cond.wait(timeout - waited)
      end

      return remove if any?

      # Woken up (or timed out) with nothing to take: re-check the budget.
      waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      next unless waited >= timeout

      msg = format(
        "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use",
        timeout, waited
      )
      raise ConnectionTimeoutError, msg
    end
  ensure
    # Always un-register the waiter, whether we returned or raised.
    @num_waiting -= 1
  end
end
# Adds the ability to turn a basic fair LIFO queue into one
# biased to some thread.
module BiasableQueue # :nodoc:
  # Condition variable wrapper that routes one designated thread to its own
  # private condition variable and every other thread to a pre-existing one.
  class BiasedConditionVariable # :nodoc:
    # As with any condition variable, +broadcast+, +broadcast_on_biased+,
    # +signal+ and +wait+ are expected to be called only while the lock is
    # held.
    #
    # +lock+ supplies the private condition variable (via +new_cond+),
    # +other_cond+ is the condition variable used by non-preferred threads,
    # and +preferred_thread+ is the thread that gets the private one.
    def initialize(lock, other_cond, preferred_thread)
      @real_cond = lock.new_cond
      @other_cond = other_cond
      @preferred_thread = preferred_thread
      @num_waiting_on_real_cond = 0
    end

    # Wakes every waiter on both condition variables.
    def broadcast
      broadcast_on_biased
      @other_cond.broadcast
    end

    # Wakes every waiter on the private condition variable and resets the
    # count of waiters tracked for it.
    def broadcast_on_biased
      @num_waiting_on_real_cond = 0
      @real_cond.broadcast
    end

    # Signals the private condition variable when a waiter is recorded on
    # it (consuming one from the count); otherwise signals the shared one.
    def signal
      target =
        if @num_waiting_on_real_cond > 0
          @num_waiting_on_real_cond -= 1
          @real_cond
        else
          @other_cond
        end
      target.signal
    end

    # Waits up to +timeout+ on the private condition variable when called
    # from the preferred thread (recording the waiter), and on the shared
    # condition variable from any other thread.
    def wait(timeout)
      target =
        if Thread.current == @preferred_thread
          @num_waiting_on_real_cond += 1
          @real_cond
        else
          @other_cond
        end
      target.wait(timeout)
    end
  end

  # Runs the given block with the queue's condition variable temporarily
  # replaced by one biased towards +thread+. The previous condition
  # variable is put back afterwards, even if the block raises.
  def with_a_bias_for(thread)
    original_cond = nil
    biased_cond = nil

    synchronize do
      original_cond = @cond
      biased_cond = BiasedConditionVariable.new(@lock, original_cond, thread)
      @cond = biased_cond
    end

    yield
  ensure
    synchronize do
      @cond = original_cond if original_cond
      # Wake up anything still sleeping on the now-discarded private
      # condition variable.
      biased_cond.broadcast_on_biased if biased_cond
    end
  end
end
# Connections must be leased while holding the main pool mutex. This is
# an internal subclass that also +.leases+ returned connections while
# still in queue's critical section (queue synchronizes with the same
# <tt>@lock</tt> as the main pool) so that a returned connection is already
# leased and there is no need to re-enter synchronized block.
class ConnectionLeasingQueue < Queue # :nodoc:
  include BiasableQueue

  private

  # Polls via the parent implementation and, when that yields a
  # connection, calls +lease+ on it before handing it back. A +nil+ or
  # +false+ result is passed through untouched.
  def internal_poll(timeout)
    super.tap { |conn| conn.lease if conn }
  end
end
end
end
end