Abstract:
I am experimenting with Concurrent::Hash. I expected it to block one of two concurrent callers (by locking on the object itself) so that they iterate over the hash one at a time.
CASE 1 of 2: Two threads reading the same Concurrent::Hash
When two threads read the hash by iterating with #each, it does not lock as I expected. Instead, while the 1st thread is partway through the hash, the 2nd thread kicks in and starts reading the hash from the beginning, as if no locking were happening under the hood.
(see code snippet to reproduce below)
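To make the expectation concrete, here is a minimal sketch of the one-at-a-time iteration I hoped for, written with an explicit Mutex around each full pass. The plain Hash, the `lock` and `visits` names, and the three passes per reader are assumptions made for illustration only; this says nothing about how Concurrent::Hash is implemented internally.

```ruby
# Sketch only: serialize whole iterations with an external Mutex.
# A plain Hash stands in for the shared hash; `lock` and `visits`
# are hypothetical names, not part of concurrent-ruby.
h = {}
50.times { |i| h[i] = i }

lock   = Mutex.new  # guards one complete #each pass at a time
visits = []         # which reader touched each entry, in order

readers = %w[o i].map do |delim|
  Thread.new do
    3.times do
      lock.synchronize do
        h.each do
          visits << delim  # safe: only appended while holding the lock
          Thread.pass      # invite a context switch mid-iteration
        end
      end
    end
  end
end
readers.each(&:join)

# Group the log into passes of h.size entries; each pass should
# contain visits from exactly one reader.
passes = visits.each_slice(h.size).map(&:uniq)
puts passes.map(&:join).join(" ")
```

With the lock held for the whole `#each` call, every group of 50 visits comes from a single reader, which is the behaviour I assumed Concurrent::Hash would give me on its own.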
CASE 2 of 2: One thread slowly reading while another thread writes to the same Concurrent::Hash
When one thread artificially sleeps for a fraction of a second inside the #each block, the 2nd thread tries to modify the same Concurrent::Hash and fails with the exception "can't add a new key into hash during iteration (RuntimeError)". This is exactly the same behaviour as Ruby's standard Hash.
(see code snippet to reproduce below)
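For comparison, this is a minimal sketch of the standard Hash behaviour I am referring to. It needs no threads at all: adding a new key from inside the #each block is enough to raise. The variable names are illustrative.

```ruby
# Sketch only: a plain Ruby Hash refuses new keys while it is being iterated.
h = { a: 1, b: 2 }
error = nil

begin
  # Inserting a key that does not exist yet, from inside the #each block.
  h.each { |key, _value| h["#{key}_copy"] = 0 }
rescue RuntimeError => e
  error = e
end

puts "#{error.class}: #{error.message}"
```

The exception class and message here match what the writer thread reports in case 2.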
OS: Ubuntu 14.04.5 LTS, Trusty Tahr
ruby version: ruby 2.2.5p319 (2016-04-26 revision 54774) [x86_64-linux-gnu]
concurrent version: concurrent-ruby-1.0.2
concurrent-ruby-ext installed: no
concurrent-ruby-edge used: no
gem list
*** LOCAL GEMS ***
bigdecimal (1.2.6)
bundler (1.13.6)
concurrent-ruby (1.0.2)
connection_pool (2.2.0)
daemons (1.2.4)
dogapi (1.23.0)
dogstatsd-ruby (2.0.0)
eventmachine (1.2.0.1)
ffi (1.9.14)
io-console (0.4.3)
json (1.8.1)
listen (3.1.5)
macaddr (1.7.1)
multi_json (1.12.1)
net-http-persistent (3.0.0)
posix_mq (2.3.0)
psych (2.0.8)
rack (1.6.4)
rack-protection (1.5.3)
rake (10.4.2)
rb-fsevent (0.9.8)
rb-inotify (0.9.7)
rdoc (4.2.0)
ruby-kafka (0.3.15)
ruby_dep (1.5.0)
sinatra (1.4.7)
sinatra-advanced-routes (0.5.3)
statsd-ruby (1.3.0)
sys-cpu (0.7.2)
sys-filesystem (1.1.7)
sys-proctable (1.1.3 universal-linux)
systemu (2.6.5)
thin (1.7.0)
thread_safe (0.3.5)
tilt (2.0.5)
uuid (2.3.8)
CASE 1 of 2: Two threads reading the same Concurrent::Hash code example:
#!/usr/bin/env ruby
require 'concurrent'
$h = Concurrent::Hash.new({})
def hash_reader(delim)
  loop{
    count = 0
    $h.each{
      count += 1
      if count != $h.size
        print delim
      else
        puts delim.upcase
      end
    }
  }
end
# fill hash with N=50 items
hash_size = 50
hash_size.times{|i|
  $h[i] = i
}
t1 = Thread.new(){
  hash_reader("o")
}
t2 = Thread.new(){
  hash_reader("i")
}
t1.join # will never happen, but for completeness
t2.join
Output (you can see that not every line is a complete pass by a single reader ending with its capitalized letter):
oooooooooooooooooooooooooooooooooooooooooooooooooO
oooooooooooooooooooooooooooooooooooooooooooooooooO
oooooooooooooooooooooooooooooooooooooooooooooooooO
oooooooooooooooooooooooooooooooooooooooooooooooooO
ooooooooooooooooooooooooooooiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiioooooooooooooooooooooO
oooooooooooooooooooooooooooooooooooooooooooooooooO
oooooooooooooooooooooooooooooiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiI
iiiiiiiiiooooooooooooooooooooO
oooooooooooooooooooooooooooooooooooooooooooooooooO
oooooooooooooooooooooooooooooooooooooooooooooooooO
CASE 2 of 2: One thread slowly reading while another thread writes to the same Concurrent::Hash, code example:
#!/usr/bin/env ruby
require 'concurrent'
$h = Concurrent::Hash.new({})
def hash_reader(delim)
  loop{
    count = 0
    $h.each{
      count += 1
      if count != $h.size
        print delim
      else
        puts delim.upcase
      end
      sleep 0.1 # lets make reader slow
    }
  }
end
def hash_writer()
  loop{
    begin
      epoch_ms = (Time.now.to_f * 1000).round
      $h[epoch_ms] = epoch_ms
    rescue Object => boom
      $stderr.puts "Exception: #{boom} (#{boom.class})"
      $stderr.puts "Exception message: #{boom.message}"
    end
    sleep 1
  }
end
# fill hash with N=50 items
hash_size = 50
hash_size.times{|i|
  $h[i] = i
}
t1 = Thread.new(){
  hash_reader("o")
}
t2 = Thread.new(){
  hash_writer()
}
t1.join
t2.join
CASE 2 of 2 example output:
oException: can't add a new key into hash during iteration (RuntimeError)
Exception message: can't add a new key into hash during iteration
oooooooooException: can't add a new key into hash during iteration (RuntimeError)
Exception message: can't add a new key into hash during iteration
ooooooooooException: can't add a new key into hash during iteration (RuntimeError)
Exception message: can't add a new key into hash during iteration
ooooooooooException: can't add a new key into hash during iteration (RuntimeError)
I couldn't find examples or docs on how to properly use Concurrent::Hash, so please tell me: am I using it the wrong way, or is this a bug?
Thank you in advance,
Dmitry Shevkoplyas