-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.cr
115 lines (103 loc) · 2.63 KB
/
pool.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# A simple thread-safe generic pool.
#
# A pool can be used to store resources that are expensive to create (like connections).
# [See here](https://en.wikipedia.org/wiki/Pool_(computer_science)) for more explanations.
#
# ```
# require "pool"
#
# pool = Pool.new { IO::Memory.new }
# pool.get do |resource|
# # do something
# end
# ```
class Pool(T)
  # With `-Dpreview_mt` every mutation of the pool state goes through a mutex;
  # without it `synchronize` simply yields.
  {% if flag?(:preview_mt) %}
    @mutex = Mutex.new

    private def synchronize
      @mutex.synchronize { yield }
    end
  {% else %}
    private def synchronize
      yield
    end
  {% end %}

  # All resources currently tracked by this pool (idle and checked out).
  @total_resources : Set(T)
  # Resources available for checkout.
  @idle_resources : Deque(T)

  # Creates a new Pool.
  #
  # The block is stored and called whenever a new resource has to be created.
  # *initial_capacity* is handed to `Deque.new` together with the block, so that
  # many resources are created up front and start out idle.
  #
  # ```
  # require "pool"
  #
  # Pool.new { IO::Memory.new }
  # ```
  def initialize(initial_capacity : Int = 0, &@block : -> T)
    @idle_resources = Deque(T).new initial_capacity, &@block
    @total_resources = Set(T).new @idle_resources
  end

  # Adds a resource to the idle queue.
  #
  # The resource is also registered in the set of tracked resources, so a resource
  # that was not created by this pool is counted in `total_size` and `used_size`
  # never becomes negative. Registering an already tracked resource is a no-op.
  def add(resource : T) : Nil
    synchronize do
      @total_resources.add resource
      @idle_resources.push resource
    end
  end

  # Creates a resource with the block given to `new` and starts tracking it.
  # Callers are expected to hold the lock.
  private def new_resource : T
    resource = @block.call
    @total_resources.add resource
    resource
  end

  # Removes one idle resource from the pool while more than *new_size* resources
  # are tracked. Returns `nil` when the target is reached or nothing idle is left.
  # Callers are expected to hold the lock.
  private def evict_idle(new_size : Int32) : T?
    return nil unless @total_resources.size > new_size
    if resource = @idle_resources.shift?
      @total_resources.delete resource
      resource
    end
  end

  # Gets a resource: the oldest idle one, or a newly created one if none is idle.
  def get : T
    synchronize { @idle_resources.shift? || new_resource }
  end

  # Gets a resource, yields it, then adds it back.
  #
  # Note that if an exception was raised in the block, the resource won't be added back.
  # This behavior is used to prevent adding an invalid resource, for example a connection causing an IO error.
  # In that case the resource is also dropped from the tracked set before the exception is re-raised.
  def get(& : T ->) : Nil
    resource = get
    begin
      yield resource
    rescue ex
      # Same lock as every other mutation of the tracked set.
      synchronize { @total_resources.delete resource }
      raise ex
    else
      add resource
    end
  end

  # Returns the pool's used resources size (`total_size - idle_size`).
  def used_size : Int32
    total_size - idle_size
  end

  # Returns the pool's idle size.
  def idle_size : Int32
    @idle_resources.size
  end

  # Returns the pool's total size.
  def total_size : Int32
    @total_resources.size
  end

  # Resizes the pool to match the new `total_size`, and yields each deleted resource on size shrinking.
  #
  # Growing creates new idle resources until `total_size` reaches *new_size*.
  # Shrinking only removes idle resources: when checked-out resources keep the pool
  # above *new_size*, it stops once no idle resource is left instead of raising.
  # Each removed resource is yielded after the lock has been released, so the block
  # may call back into the pool.
  def resize(new_size : Int32, & : T ->) : Nil
    synchronize do
      while total_size < new_size
        @idle_resources.push new_resource
      end
    end

    while resource = synchronize { evict_idle new_size }
      yield resource
    end
  end

  # Resizes the pool to match the new size, discarding removed resources.
  def resize(new_size : Int32) : Nil
    resize(new_size) { }
  end
end