/
advisory.rb
128 lines (106 loc) · 3.53 KB
/
advisory.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
require "zlib"
class Locker
  # Runs a block of code while holding a PostgreSQL advisory lock obtained
  # with pg_try_advisory_xact_lock inside a transaction on a connection
  # checked out from the ActiveRecord connection pool.
  #
  # While the block runs, a background thread periodically checks that the
  # connection is still active and raises LockConnectionLost in the thread
  # running the block if it is not.
  class Advisory
    # Raised in the thread running the locked block when the connection used
    # to take the lock is found to be no longer active.
    class LockConnectionLost < StandardError; end

    attr_reader :key, :crc, :lockspace, :blocking, :locked

    # The advisory lock function we use from PostgreSQL needs its arguments
    # to be INT, so these are the bounds of the PostgreSQL INT range.
    MAX_LOCK = 2147483647
    MIN_LOCK = -2147483648
    # Subtracted from an unsigned 32-bit CRC that exceeds MAX_LOCK so that it
    # wraps around into the signed INT range.
    OVERFLOW_ADJUSTMENT = 2**32

    # key     - String identifying the lock; it is hashed into an INT (#crc).
    # options - Hash of optional settings:
    #           :lockspace       - Integer between MIN_LOCK and MAX_LOCK used
    #                              as the first lock argument (default: 1).
    #           :blocking        - when truthy, keep retrying until the lock
    #                              is obtained (default: false).
    #           :block_timeout   - seconds after which a blocking attempt
    #                              gives up (default: nil, retry forever).
    #           :block_spin_wait - seconds to sleep between attempts while
    #                              blocking (default: 0.005).
    #
    # Raises ArgumentError if key is not a String or :lockspace is invalid.
    def initialize(key, options={})
      raise ArgumentError, "key must be a string" unless key.is_a?(String)

      @key             = key
      @crc             = convert_to_crc(key)
      @lockspace       = (options[:lockspace] || 1)
      @blocking        = !!options[:blocking]
      @locked          = false
      @block_timeout   = options[:block_timeout]
      @block_spin_wait = options[:block_spin_wait] || 0.005

      if !@lockspace.is_a?(Integer) || @lockspace < MIN_LOCK || @lockspace > MAX_LOCK
        raise ArgumentError, "The :lockspace option must be an integer between #{MIN_LOCK} and #{MAX_LOCK}"
      end
    end

    # Convenience wrapper: builds an Advisory for key/options and runs the
    # block under it. See #initialize and #run.
    def self.run(key, options={}, &block)
      advisory = new(key, options)
      advisory.run(&block)
    end

    # Tries to obtain the lock and, if successful, calls the block while the
    # lock is held.
    #
    # Returns true if the lock was obtained and the block ran to completion,
    # false if the lock could not be obtained (the block is not called).
    # Exceptions raised by the block propagate to the caller. The connection
    # is always checked back in to the pool.
    def run(&block)
      connection = ActiveRecord::Base.connection_pool.checkout
      connection.transaction(:requires_new => true) do
        acquire(connection)

        if @locked
          call_with_watchdog(connection, &block)
          true
        else
          false
        end
      end
    ensure
      ActiveRecord::Base.connection_pool.checkin(connection) if connection
    end

    protected

    # Attempts to take the lock once, or repeatedly when blocking, sleeping
    # @block_spin_wait seconds between attempts until the lock is obtained or
    # the optional timeout elapses. The outcome is left in @locked.
    def acquire(connection)
      # A monotonic clock is used so that wall-clock adjustments during the
      # wait can neither cut the timeout short nor extend it.
      deadline = monotonic_now + @block_timeout if @blocking && @block_timeout

      until get(connection)
        break unless @blocking
        break if deadline && deadline < monotonic_now
        sleep @block_spin_wait
      end
    end

    # Calls the block while a checker thread verifies, roughly every five
    # seconds (ten sleeps of half a second), that the connection is still
    # active. The checker is stopped once the block finishes or raises.
    def call_with_watchdog(connection, &block)
      parent_thread = Thread.current
      mutex = Mutex.new
      # Assigned before the begin/ensure so the cleanup below never touches
      # an undefined checker if the thread cannot be created.
      checker = nil

      begin
        checker = Thread.new do
          while @locked
            # Sleep in short slices so the loop notices quickly when the
            # lock has been released.
            10.times { sleep 0.5 if @locked }
            mutex.synchronize do
              check(connection, parent_thread) if @locked
            end
          end
        end

        block.call
      ensure
        @locked = false
        # Using a mutex to synchronize so that we're sure we're not
        # executing a query when we kill the thread.
        mutex.synchronize do
          if checker && checker.alive?
            begin
              checker.exit
            rescue StandardError
              # Best effort: a failure to stop the checker must not mask the
              # result (or exception) of the block.
              nil
            end
          end
        end
      end
    end

    # Issues a single non-blocking lock attempt on the connection and stores
    # whether it succeeded in @locked. Returns that boolean.
    def get(connection)
      lockspace_quote = connection.quote(@lockspace)
      crc_quote = connection.quote(@crc)
      result = exec_query(
        connection,
        "SELECT pg_try_advisory_xact_lock(#{lockspace_quote}, #{crc_quote})"
      )
      @locked = successful_result?(result)
    end

    # If the connection is no longer active, marks the lock as lost and
    # raises LockConnectionLost in the given thread.
    def check(connection, thread)
      unless connection.active?
        @locked = false
        thread.raise LockConnectionLost
      end
    end

    # CRC32 digest to get a decimal number for the key used, making sure the
    # resulting number is within PostgreSQL's max and min INT values.
    def convert_to_crc(key)
      crc = Zlib.crc32(key)
      crc -= OVERFLOW_ADJUSTMENT if crc > MAX_LOCK
      crc
    end

    # Returns true when the result holds exactly one row with one column
    # whose value signals success, false otherwise.
    def successful_result?(result)
      rows = result.rows
      return false unless rows.size == 1 && rows[0].size == 1

      case rows[0][0]
      when 't'  then true # Checking for old ActiveRecord
      when true then true # Checking for the value true
      else false
      end
    end

    # Runs the query on the connection, labelled "Locker::Advisory".
    def exec_query(connection, query)
      connection.exec_query(query, "Locker::Advisory")
    end

    # Current reading of the monotonic clock, in seconds.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end