-
Notifications
You must be signed in to change notification settings - Fork 141
/
connection_pool.rb
148 lines (123 loc) · 3.32 KB
/
connection_pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
require_relative 'connection_pool/version'
require_relative 'connection_pool/timed_stack'
# Generic connection pool class for sharing a limited number of objects or network connections
# among many threads. Note: pool elements are lazily created.
#
# Example usage with block (faster):
#
# @pool = ConnectionPool.new { Redis.new }
#
# @pool.with do |redis|
# redis.lpop('my-list') if redis.llen('my-list') > 0
# end
#
# Using optional timeout override (for that single invocation)
#
# @pool.with(timeout: 2.0) do |redis|
# redis.lpop('my-list') if redis.llen('my-list') > 0
# end
#
# Example usage replacing an existing connection (slower):
#
# $redis = ConnectionPool.wrap { Redis.new }
#
# def do_work
# $redis.lpop('my-list') if $redis.llen('my-list') > 0
# end
#
# Accepts the following options:
# - :size - number of connections to pool, defaults to 5
# - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
#
class ConnectionPool
DEFAULTS = {size: 5, timeout: 5}
class Error < RuntimeError
end
def self.wrap(options, &block)
Wrapper.new(options, &block)
end
def initialize(options = {}, &block)
raise ArgumentError, 'Connection pool requires a block' unless block
options = DEFAULTS.merge(options)
@size = Integer(options.fetch(:size))
@timeout = options.fetch(:timeout)
@available = TimedStack.new(@size, &block)
@key = :"pool-#{@available.object_id}"
@key_count = :"pool-#{@available.object_id}-count"
end
def with(options = {})
Thread.handle_interrupt(Exception => :never) do
conn = checkout(options)
begin
Thread.handle_interrupt(Exception => :immediate) do
yield conn
end
ensure
checkin
end
end
end
def checkout(options = {})
if ::Thread.current[@key]
::Thread.current[@key_count] += 1
::Thread.current[@key]
else
::Thread.current[@key_count] = 1
::Thread.current[@key] = @available.pop(options[:timeout] || @timeout)
end
end
def checkin
if ::Thread.current[@key]
if ::Thread.current[@key_count] == 1
@available.push(::Thread.current[@key])
::Thread.current[@key] = nil
else
::Thread.current[@key_count] -= 1
end
else
raise ConnectionPool::Error, 'no connections are checked out'
end
nil
end
def shutdown(&block)
@available.shutdown(&block)
end
# Size of this connection pool
def size
@size
end
# Number of pool entries available for checkout at this instant.
def available
@available.length
end
private
class Wrapper < ::BasicObject
METHODS = [:with, :pool_shutdown, :wrapped_pool]
def initialize(options = {}, &block)
@pool = options.fetch(:pool) { ::ConnectionPool.new(options, &block) }
end
def wrapped_pool
@pool
end
def with(&block)
@pool.with(&block)
end
def pool_shutdown(&block)
@pool.shutdown(&block)
end
def pool_size
@pool.size
end
def pool_available
@pool.available
end
def respond_to?(id, *args)
METHODS.include?(id) || with { |c| c.respond_to?(id, *args) }
end
def method_missing(name, *args, &block)
with do |connection|
connection.send(name, *args, &block)
end
end
end
end