This repository has been archived by the owner on Jan 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 153
/
pool.rb
187 lines (169 loc) · 5.08 KB
/
pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# encoding: utf-8
module Moped
  class Connection
    # This class contains behaviour of connection pools for specific addresses.
    #
    # Connections live in one of two places: a hash of connections pinned to
    # the thread that checked them out (keyed by the thread's object id), and
    # a queue of unpinned connections that are available for checkout. A
    # thread can hold at most one connection from the pool at a time.
    #
    # @since 2.0.0
    class Pool
      # The default max size for the connection pool.
      POOL_SIZE = 5

      # The default timeout for getting connections from the queue.
      TIMEOUT = 0.5

      # @!attribute host
      #   @return [ String ] The host the pool is for.
      # @!attribute port
      #   @return [ Integer ] The port on the host.
      # @!attribute options
      #   @return [ Hash ] The connection pool options.
      # @!attribute reaper
      #   @return [ Reaper ] The connection pool reaper.
      attr_reader :host, :port, :options, :reaper

      # Checkout a connection from the connection pool. The connection is
      # taken from the unpinned queue, pinned to the current thread and
      # leased. A thread that already has a connection pinned must check it
      # in before it can check out again.
      #
      # @example Checkout a connection.
      #   pool.checkout
      #
      # @raise [ Errors::ConnectionInUse ] If the current thread already has
      #   a connection checked out.
      #
      # @return [ Connection ] A connection.
      #
      # @since 2.0.0
      def checkout
        mutex.synchronize do
          id = thread_id
          if pinned[id]
            raise Errors::ConnectionInUse, "The connection on thread: #{id} is in use."
          end
          # Pin before leasing, so the connection is tracked from the moment
          # it leaves the queue.
          connection = pinned[id] = next_connection
          connection.lease
          connection
        end
      end

      # Checkin the connection, indicating that it is finished being used.
      # The connection is expired, unpinned from the current thread and
      # pushed back onto the unpinned queue. Passing nil, or a connection
      # that is not the one pinned to the current thread, is a no-op.
      #
      # @example Checkin the connection.
      #   pool.checkin(connection)
      #
      # @param [ Connection ] connection The connection to checkin.
      #
      # @since 2.0.0
      def checkin(connection)
        mutex.synchronize do
          id = thread_id
          # The nil guard matters: with nothing pinned, pinned[id] is nil and
          # a nil argument would otherwise compare equal and be "expired".
          if connection && connection == pinned[id]
            connection.expire
            unpinned.push(pinned.delete(id))
          end
        end
      end

      # Initialize the connection pool and start its reaper.
      #
      # @example Instantiate the connection pool.
      #   Pool.new("localhost", 27017, pool_size: 4)
      #
      # @param [ String ] host The host the pool is for.
      # @param [ Integer ] port The port on the host.
      # @param [ Hash ] options The connection pool options.
      #
      # @option options [ Integer ] :pool_size The max size of the pool,
      #   defaults to POOL_SIZE.
      # @option options [ Float ] :pool_timeout The checkout timeout,
      #   defaults to TIMEOUT.
      # @option options [ Object ] :reap_interval The interval handed to the
      #   reaper, defaults to Reaper::INTERVAL.
      # @option options [ Object ] :timeout The timeout handed to each new
      #   connection, defaults to Connection::TIMEOUT.
      #
      # @since 2.0.0
      def initialize(host, port, options = {})
        @host = host
        @port = port
        @options = options
        @reaper = Reaper.new(options[:reap_interval] || Reaper::INTERVAL, self)
        @mutex = Mutex.new
        @resource = ConditionVariable.new
        @pinned = {}
        # The block tells the queue how to build a connection for this pool.
        @unpinned = Queue.new(max_size, timeout) do
          Connection.new(host, port, options[:timeout] || Connection::TIMEOUT, options)
        end
        reaper.start
      end

      # Get the max size for the connection pool.
      #
      # @example Get the max size.
      #   pool.max_size
      #
      # @return [ Integer ] The max size of the pool.
      #
      # @since 2.0.0
      def max_size
        @max_size ||= (options[:pool_size] || POOL_SIZE)
      end

      # Reap all connections that are pinned to threads which are no longer
      # active: each one is expired and pushed back onto the unpinned queue.
      #
      # The pinned hash is walked via a snapshot of its keys rather than with
      # Hash#each. This method does not take the pool mutex itself (it is
      # reached from #checkout, which already holds it), so a checkout on
      # another thread may add a key while a reap is in progress; adding a
      # key to a hash that is being iterated raises a RuntimeError.
      #
      # @example Reap the connections.
      #   pool.reap([ 12351122313 ])
      #
      # @param [ Array<Integer> ] ids The ids of the current active threads.
      #
      # @return [ Pool ] The connection pool.
      #
      # @since 2.0.0
      def reap(ids = active_threads)
        dead = pinned.keys.reject { |id| ids.include?(id) }
        dead.each do |id|
          connection = pinned[id]
          next unless connection
          connection.expire
          # Only requeue if the entry is still ours to remove.
          unpinned.push(connection) if pinned.delete(id)
        end
        self
      end

      # Get the current size of the connection pool. Is the total of pinned
      # plus unpinned connections.
      #
      # @example Get the pool's current size.
      #   pool.size
      #
      # @return [ Integer ] The current size of the pool.
      #
      # @since 2.0.0
      def size
        unpinned.size + pinned.size
      end

      # Get the timeout when attempting to check out items from the pool.
      #
      # @example Get the checkout timeout.
      #   pool.timeout
      #
      # @return [ Float ] The pool timeout.
      #
      # @since 2.0.0
      def timeout
        @timeout ||= (options[:pool_timeout] || TIMEOUT)
      end

      # Execute the block with a connection, ensuring that the checkin/checkout
      # workflow is properly executed. The connection is checked in again
      # even when the block raises.
      #
      # @example Execute the block with a connection.
      #   pool.with_connection do |conn|
      #     conn.connect
      #   end
      #
      # @raise [ Errors::ConnectionInUse ] If the current thread already has
      #   a connection checked out.
      #
      # @return [ Object ] The result of the yield.
      #
      # @since 2.0.0
      def with_connection
        # Checkout stays outside the begin block: if it raises there is
        # nothing to check back in.
        connection = checkout
        begin
          yield(connection)
        ensure
          checkin(connection)
        end
      end

      private

      attr_reader :mutex, :resource, :pinned, :unpinned

      # Take the next connection off the unpinned queue, first reaping
      # connections held by dead threads when none are available.
      def next_connection
        reap if saturated?
        unpinned.shift
      end

      # Whether there are currently no unpinned connections to hand out.
      def saturated?
        unpinned.empty?
      end

      # The key under which the current thread's connection is pinned.
      def thread_id
        Thread.current.object_id
      end

      # The object ids of all threads that are currently alive.
      def active_threads
        Thread.list.select(&:alive?).map(&:object_id)
      end
    end
  end
end