incorrect RubyHash multithreaded behaviour when marshalling #1182

Closed
uujava opened this Issue Oct 29, 2013 · 4 comments

Projects

None yet

2 participants

@uujava
uujava commented Oct 29, 2013

It looks like RubyHash::marshalTo implementation has threading issues. It first saves hash size and then iterates over entries. When different thread modify the hash by adding entries this could result in broken dump. The test below could show what I mean.
We have two classes:
Test class holds a "hash" and an "obj" attributes.
Value class used for values stored in "hash" and in "obj" attribute. "obj" attribute in Test always holds the same Value.
When we start a thread that populate hash and begin marshal load/dump in main thread.
Eventually it will print "Wrong dump " if object attribute will not store Value object but some FixNum instance.

class ::Test
 attr_reader :hash, :obj 
 def initialize
   @hash={}
   @obj= ::Value.new(0,-2)
 end
end

class ::Value
  attr_reader :j
  def initialize i, j
   @i=i
   @j=j
  end
  
  def to_s
    "#{@i} #{@j}"
  end
end

test = ::Test.new
hash_size=200000
dump_count=100
class ::Test
 
 def initialize
   @hash={}
   @obj= ::Value.new(0,-2)
 end

 def obj
  @obj
 end

 def hash
   @hash
 end
 
 def marshal_dump
  [@hash, @obj]
 end

 def marshal_load arr
   @hash = arr[0]
   @obj = arr[1]
 end
end

class ::Value
  def initialize i, j
   @i=i
   @j=j
  end

  def j
    @j
  end  
  
  def to_s
    "#{@i} #{@j}"
  end
end

test = ::Test.new
hash_size=300000
dump_count=50
sample_count = dump_count/10

Thread.new do
   hash_size.times do |i|
    test.hash[i] = ::Value.new(i,1)
   end
end

stop = false

dump_count.times do |i|
  next if stop
  d = Marshal.dump test
  t= Marshal.load(d); 
  size = t.hash.size
  puts "sample: #{i} #{size} '#{test.hash[size-1]}'" if i.divmod(sample_count)[1]==0    
  puts "Wrong dump #{t.obj} #{t.obj.class}" unless t.obj.instance_of? ::Value
  stop = t.hash.size == hash_size
end

One could say that we should not share objects between threads, without proper locking, but still better behavior would be to behave like RubyArray::marshalTo throwing concurrent modification exception in almost the same cases. A patch (for 1.7.4 code base) could be as below:

Index: src/org/jruby/RubyHash.java
===================================================================
--- src/org/jruby/RubyHash.java (date 1381739421000)
+++ src/org/jruby/RubyHash.java (revision )
@@ -626,16 +626,25 @@
     }
 
     public void visitAll(Visitor visitor) {
+       visitLimited(visitor, -1);
+    }
+
+   private void visitLimited(Visitor visitor, long size) {
-        int startGeneration = generation;
+       int startGeneration = generation;
-        for (RubyHashEntry entry = head.nextAdded; entry != head; entry = entry.nextAdded) {
+       long count = size;
+       for (RubyHashEntry entry = head.nextAdded; entry != head && count != 0; entry = entry.nextAdded) {
+           count--;
-            if (startGeneration != generation) {
-                startGeneration = generation;
-                entry = head.nextAdded;
-                if (entry == head) break;
-            }
-            if (entry.isLive()) visitor.visit(entry.key, entry.value);
-        }
+           if (startGeneration != generation) {
+               startGeneration = generation;
+               entry = head.nextAdded;
+               if (entry == head) break;
+           }
+           if (entry.isLive()) visitor.visit(entry.key, entry.value);
+       }
+       // it does not handle all concurrent modification cases,
+       // but at least provdes correct marshal
+       if(count > 0) concurrentModification();
-    }
+   }
 
     /* ============================
      * End of hash internals
@@ -1949,9 +1958,10 @@
     // to totally change marshalling to work with overridden core classes.
     public static void marshalTo(final RubyHash hash, final MarshalStream output) throws IOException {
         output.registerLinkTarget(hash);
-        output.writeInt(hash.size);
+       int hashSize = hash.size;
+       output.writeInt(hashSize);
         try {
-            hash.visitAll(new Visitor() {
+            hash.visitLimited(new Visitor() {
                 public void visit(IRubyObject key, IRubyObject value) {
                     try {
                         output.dumpObject(key);
@@ -1960,7 +1970,7 @@
                         throw new VisitorIOException(e);
                     }
                 }
-            });
+            }, hashSize);
         } catch (VisitorIOException e) {
             throw (IOException)e.getCause();
         }
@headius
Member
headius commented Oct 29, 2013

The patch looks good, but I would propose we modify it to check that count == 0 at the end of visitLimited. This would catch cases where size decreases during iteration (which is what your patch catches) as well as where size increases during iteration (not caught by your patch).

I'm testing that change now and if it looks ok we'll go with it.

@headius headius closed this in c4f13b5 Oct 29, 2013
@uujava
uujava commented Oct 30, 2013

Sorry for late comment, but adding concurrent modification exception to visitAll method could brake a lot more existing programs. Instead of having issue with not visiting part of hash entries they will have an exception. Not sure it's Ok.
So, my preference was to modify only marshalTo as it already brakes the dumped objects and would not brake existing code. Adding proper thread safety support to RubyHash currently not possible as there are a lot of missing synchronization, and I guess it's by design.
So, I'd go with visitAll never throwing concurrent exception for now.

@uujava
uujava commented Oct 30, 2013

Better test case checks for puts and deletions:

class ::Test
 attr_reader :hash, :obj 
 def initialize
   @hash={}
   @obj= ::Value.new(0,-2)
 end
end
class ::Value
  attr_reader :j
  def initialize i, j
   @i=i
   @j=j
  end
  
  def to_s
    "#{@i} #{@j}"
  end
end
test = ::Test.new
hash_size=200000
dump_count=100
class ::Test
 
 def initialize
   @hash={}
   @obj= ::Value.new(0,-2)
 end
 def obj
  @obj
 end
 def hash
   @hash
 end
 
 def marshal_dump
  [@hash, @obj]
 end
 def marshal_load arr
   @hash = arr[0]
   @obj = arr[1]
 end
end
class ::Value
  def initialize i, j
   @i=i
   @j=j
  end
  def j
    @j
  end  
  
  def to_s
    "#{@i} #{@j}"
  end
end
hash_size=1000
dump_count=30
sample_count = dump_count/20
test = ::Test.new
def test_puts msg
  puts msg
end
Thread.new do
   hash_size.times do |i|
    test.hash[i] = ::Value.new(i,1)
    sleep 0.001 if i.divmod(sample_count)[1]==0   
   end
  hash_size.times do |i|
    test.hash.delete(rand(hash_size - i))
    sleep 0.001 if i.divmod(sample_count)[1]==0   
   end
end
dump_count.times do |i|
 begin 
  d = Marshal.dump test
  t = Marshal.load(d); 
  
  size = t.hash.size
  test_puts "sample: #{i} #{size} '#{test.hash[size-1]}'" if i.divmod(sample_count)[1]==0    
  test_puts "Wrong dump #{t.obj} #{t.obj.class}" unless t.obj.instance_of? ::Value
  sleep 0.05
 rescue ConcurrencyError => ex
   test_puts "Handle concurrency ..."
 rescue Exception => ex
   test_puts "Wrong dump: another case of broken data ..."
 end
end

And the patch I'd like to propose (1.7.4 codebase):

===================================================================
--- src/org/jruby/RubyHash.java (revision bb98e4b8decffc0361b70a6c99c1ade5fed0abcb)
+++ src/org/jruby/RubyHash.java (revision )
@@ -626,16 +626,30 @@
     }
 
     public void visitAll(Visitor visitor) {
+       // use -1 to disable concurrency checks
+       visitLimited(visitor, -1);
+    }
+
+   private void visitLimited(Visitor visitor, long size) {
-        int startGeneration = generation;
+       int startGeneration = generation;
-        for (RubyHashEntry entry = head.nextAdded; entry != head; entry = entry.nextAdded) {
+       long count = size;
+       // visit not more than size entries
+       for (RubyHashEntry entry = head.nextAdded; entry != head && count != 0; entry = entry.nextAdded) {
-            if (startGeneration != generation) {
-                startGeneration = generation;
-                entry = head.nextAdded;
-                if (entry == head) break;
-            }
+           if (startGeneration != generation) {
+               startGeneration = generation;
+               entry = head.nextAdded;
+               if (entry == head) break;
+           }
-            if (entry.isLive()) visitor.visit(entry.key, entry.value);
+           if (entry != null && entry.isLive()){
+               visitor.visit(entry.key, entry.value);
+               count--;
-        }
-    }
+           }
+       }
+       // it does not handle all concurrent modification cases,
+       // but at least provides correct marshal as we have exactly size entries visited (count == 0)
+       // or if count < 0 - skipped concurrent modification checks
+       if(count > 0) throw concurrentModification();
+   }
 
     /* ============================
      * End of hash internals
@@ -1949,9 +1963,10 @@
     // to totally change marshalling to work with overridden core classes.
     public static void marshalTo(final RubyHash hash, final MarshalStream output) throws IOException {
         output.registerLinkTarget(hash);
-        output.writeInt(hash.size);
+       int hashSize = hash.size;
+       output.writeInt(hashSize);
         try {
-            hash.visitAll(new Visitor() {
+            hash.visitLimited(new Visitor() {
                 public void visit(IRubyObject key, IRubyObject value) {
                     try {
                         output.dumpObject(key);
@@ -1960,7 +1975,7 @@
                         throw new VisitorIOException(e);
                     }
                 }
-            });
+            }, hashSize);
         } catch (VisitorIOException e) {
             throw (IOException)e.getCause();
         }
@headius
Member
headius commented Oct 30, 2013

Ok, I will agree that causing errors for iterations that don't see every element is perhaps going too far, since it would not have been fatal before. Working on incorporating your modified patch and test now.

@headius headius added a commit that referenced this issue Oct 30, 2013
@headius headius New fix for #1182, plus a regression spec.
Revert "Raise error if iteration ends without walking size elements."

This reverts commit c4f13b5.
a2c0adb
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment