IO.read_nonblock raises EOFError with concurrent reads #5706

Closed
jonathanswenson opened this issue Apr 24, 2019 · 12 comments

@jonathanswenson commented Apr 24, 2019

Environment

jruby -v:

jruby 9.1.13.0 (2.3.3) 2017-09-06 8e1c115 Java HotSpot(TM) 64-Bit Server VM 25.181-b13 on 1.8.0_181-b13 +jit [darwin-x86_64]

and

jruby 9.1.17.0 (2.3.3) 2018-04-20 d8b1ff9 Java HotSpot(TM) 64-Bit Server VM 25.181-b13 on 1.8.0_181-b13 +jit [darwin-x86_64]

and

jruby 9.2.7.0 (2.5.3) 2019-04-09 8a269e3 Java HotSpot(TM) 64-Bit Server VM 25.181-b13 on 1.8.0_181-b13 +jit [darwin-x86_64]

JRUBY_OPTS:

-J-Dapple.awt.UIElement=true -J-Djava.awt.headless=true -X+O

java -version:

java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

uname -a:

Darwin hades.local 18.5.0 Darwin Kernel Version 18.5.0: Mon Mar 11 20:40:32 PDT 2019; root:xnu-4903.251.3~3/RELEASE_X86_64 x86_64

Expected Behavior

When calling IO#read_nonblock on a socket that has no data to read while another thread is concurrently writing to the same socket, an EOFError can be raised. The expected behavior is for every read_nonblock call to raise Errno::EAGAIN or Errno::EWOULDBLOCK. The same behavior appears to happen with TCPSockets.

The socket still appears to be functional after these EOFErrors: data can still be written to and read from it.

In CRuby, this appears to be thread-safe (or at least more thread-safe).

require 'socket'

s1, s2 = UNIXSocket.pair
ITRS = 2000
failures = {}

t1 = Thread.new do
  ITRS.times do |i|
    begin
      s1.read_nonblock(1)
    rescue EOFError, IOError => eof
      failures[i] = eof
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # ignore
    end
  end
end

ITRS.times { |i| s1.write(".") }
t1.join

# verify that despite all the EOFErrors the socket is not actually EOF
s2.write('.')
puts "read from s1 (after all EOFs): #{s1.read_nonblock(1)}"

s1.close
s2.close

# show some info about failures
puts "#{failures.length} / #{ITRS} read_nonblock(1) failed"
iteration, error = failures.first
if error
  puts error
  puts error.backtrace 
end

Output on ruby-2.5.1 (MRI), ruby 2.5.1p57 (2018-03-29 revision 63029) [x86_64-darwin17]:

$ ruby socket_test.rb
read from s1 (after all EOFs): .
0 / 2000 read_nonblock(1) failed

Actual Behavior

Output on jruby-9.1.17.0:

$ ruby socket_test.rb
read from s1 (after all EOFs): .
87 / 2000 read_nonblock(1) failed

End of file reached
org/jruby/RubyIO.java:2855:in `read_nonblock'
socket_test.rb:13:in `block in socket_test.rb'
org/jruby/RubyFixnum.java:305:in `times'
socket_test.rb:11:in `block in socket_test.rb'

Stack trace with -Xbacktrace.style=full:

End of file reached
java/lang/Thread.java:1559:in `getStackTrace'
org/jruby/runtime/backtrace/TraceType.java:244:in `getBacktraceData'
org/jruby/runtime/backtrace/TraceType.java:47:in `getBacktrace'
org/jruby/RubyException.java:245:in `prepareBacktrace'
org/jruby/exceptions/RaiseException.java:216:in `preRaise'
org/jruby/exceptions/RaiseException.java:183:in `preRaise'
org/jruby/exceptions/RaiseException.java:111:in `<init>'
org/jruby/Ruby.java:4127:in `newRaiseException'
org/jruby/Ruby.java:4078:in `newEOFError'
org/jruby/RubyIO.java:2872:in `nonblockEOF'
org/jruby/RubyIO.java:2959:in `getPartial'
org/jruby/RubyIO.java:2864:in `doReadNonblock'
org/jruby/RubyIO.java:2855:in `read_nonblock'
org/jruby/RubyIO$INVOKER$i$0$2$read_nonblock.gen:-1:in `call'
org/jruby/internal/runtime/methods/JavaMethod.java:796:in `call'
org/jruby/internal/runtime/methods/DynamicMethod.java:202:in `call'
org/jruby/runtime/callsite/CachingCallSite.java:153:in `call'
socket_test.rb:13:in `invokeOther1:read_nonblock'
socket_test.rb:13:in `block in socket_test.rb'
org/jruby/runtime/CompiledIRBlockBody.java:156:in `yieldDirect'
org/jruby/runtime/BlockBody.java:114:in `yield'
org/jruby/runtime/Block.java:165:in `yield'
org/jruby/RubyFixnum.java:305:in `times'
org/jruby/RubyFixnum$INVOKER$i$0$0$times.gen:-1:in `call'
org/jruby/runtime/callsite/CachingCallSite.java:308:in `cacheAndCall'
org/jruby/runtime/callsite/CachingCallSite.java:137:in `call'
org/jruby/runtime/callsite/CachingCallSite.java:142:in `callIter'
socket_test.rb:11:in `invokeOther4:times'
socket_test.rb:11:in `block in socket_test.rb'
org/jruby/runtime/CompiledIRBlockBody.java:145:in `callDirect'
org/jruby/runtime/IRBlockBody.java:71:in `call'
org/jruby/runtime/Block.java:124:in `call'
org/jruby/RubyProc.java:289:in `call'
org/jruby/RubyProc.java:246:in `call'
org/jruby/internal/runtime/RubyRunnable.java:104:in `run'
java/lang/Thread.java:748:in `run'

@jonathanswenson (Author) commented Apr 24, 2019

This appears to work in JRuby 1.7.26:

$ ruby -v 
jruby 1.7.26 (1.9.3p551) 2016-08-26 69763b8 on Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 +jit [darwin-x86_64]

$ ruby socket_test.rb
read from s1 (after all EOFs): .
0 / 2000 read_nonblock(1) failed

It does not appear to work in JRuby 9.0.0.0:

$ ruby -v 
jruby 9.0.0.0 (2.2.2) 2015-07-21 e10ec96 Java HotSpot(TM) 64-Bit Server VM 25.181-b13 on 1.8.0_181-b13 +jit [darwin-x86_64]

$ ruby socket_test.rb
read from s1 (after all EOFs): .
236 / 2000 read_nonblock(1) failed

org/jruby/RubyIO.java:2768:in `read_nonblock'
socket_test.rb:13:in `block in socket_test.rb'
org/jruby/RubyFixnum.java:304:in `times'
socket_test.rb:11:in `block in socket_test.rb'

@enebo (Member) commented Apr 24, 2019

@jonathanswenson It took me a while to figure out that both the read and write sides of your script used the same variable, s1, but even past that I ran into two issues:

  1. EAGAIN can happen, so read_nonblock is sometimes not ready yet.
  2. Without Thread.pass or some IO in the read_nonblock loop, MRI seems to have trouble handing control back to the main thread.

Here is my revised script:

require 'socket'

s1, s2 = UNIXSocket.pair
ITRS = 2000
failures = {}
finished = 0

t1 = Thread.new do
  attempt = 0 # Kernel#loop yields no index, so count attempts explicitly
  loop do
    attempt += 1
    begin
      s2.read_nonblock(1)
      Thread.pass
      finished += 1
      break if finished >= ITRS
    rescue EOFError, IOError => eof
      failures[attempt] = eof
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # ignore
    end
  end
end

ITRS.times { |i| s1.write(".") }
t1.join

# verify that despite all the EOFErrors the socket is not actually EOF
s2.write('.')
puts "read from s1 (after all EOFs): #{s1.read_nonblock(1)}"

s1.close
s2.close

# show some info about failures
puts "#{failures.length} / #{ITRS} read_nonblock(1) failed"
iteration, error = failures.first
if error
  puts error
  puts error.backtrace 
end

On Linux (Fedora 29) I cannot get this script to fail at all. Either I somehow muddled which condition leads to the IOError with my changes, or Linux is just behaving differently. Can you rerun with this version and tell me whether you still see the IOErrors?

@enebo (Member) commented Apr 24, 2019

I should add that I was using master, but it should not differ from 9.2.7.0 with respect to IO.

@jonathanswenson (Author) commented Apr 24, 2019

@enebo The above script completes without any failures for me.

There is one important difference between the two scripts. In the original script, two threads concurrently access the same IO (one reading, in the spawned thread, and one writing, in the main thread), while a single thread reads from and writes to the other socket. The intent was to have a "server" that could concurrently read from and write to the "server" socket while the "client" could read from and write to the "client" socket. The read_nonblock call is NOT supposed to read the data written to that socket; it should get an EAGAIN every time it attempts to read.

The main assumption here is that the IO object is safe for concurrent reads and writes from multiple threads. Perhaps this assumption is simply incorrect.

My inspiration for this line of investigation was issue #4854, and the problem can also be modeled with a TCP socket (which is how I discovered it in the first place).
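At the Java level, the distinction the author relies on is visible in the NIO channels that PosixShim reads through (the `fd.chRead.read(buffer)` call in the PosixShim diff): a non-blocking read with nothing available returns 0, while -1 is reserved for a genuine end-of-stream. The sketch below is illustrative only. It uses a `java.nio.channels.Pipe` as a stand-in for the UNIX socket pair, the names `NonblockingReadDemo` and `readEmptyThenClosed` are hypothetical, and it assumes a Unix-like platform where the closed write end is observed immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// Hypothetical demo class; a Pipe stands in for the socket pair in the report.
final class NonblockingReadDemo {

    // Reads once from an empty non-blocking pipe, then again after the writer closes.
    static int[] readEmptyThenClosed() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);
            ByteBuffer buf = ByteBuffer.allocate(1);

            int whileOpen = source.read(buf);   // nothing written yet: 0 bytes, not EOF
            pipe.sink().close();                // the writing side hangs up
            int afterClose = source.read(buf);  // now -1: genuine end-of-stream (Unix-like systems)
            source.close();
            return new int[] { whileOpen, afterClose };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = readEmptyThenClosed();
        System.out.println("empty read: " + r[0] + ", after close: " + r[1]);
    }
}
```

Assuming the shim translates the 0 result of a nonblocking read into EAGAIN, an open but empty socket should never look like EOF, which is why the EOFErrors in this report point at something other than the channel itself.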

@jonathanswenson (Author) commented Apr 24, 2019

Perhaps the comparison to CRuby is not valid because CRuby has a different concurrency model.

I wrote up another example that is a bit closer to what I'm actually looking into (a client and a server that both read from and write to a socket). Maybe this better represents the problem I'm seeing. Again, this assumes that concurrent access to the same IO object is valid.

require 'socket'

ITRS = 2000

# server counter data
eof_count = 0
eagain_count = 0
data_read_count = 0

server = TCPServer.open(50000)
client_socket_to_server = TCPSocket.open('localhost', 50000)
server_socket_to_client = server.accept

# these are two threads that are concurrently reading / writing to the (same) socket

# this is the "server" write thread
server_write_thread = Thread.new do
  print "starting server thread that writes #{ITRS} characters to the client\n"
  ITRS.times { |i| server_socket_to_client.write(".") }
end

# this is the "server" read thread
server_read_thread = Thread.new do
  print "starting server thread the reads from the client until it receives a character\n"
  while true
    begin
      server_socket_to_client.read_nonblock(1)
      data_read_count += 1
      break
    rescue EOFError, IOError => eof
      eof_count += 1
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      eagain_count += 1
    end
  end
end

CHUNK_SIZE = 10
client_read_thread = Thread.new do
  print "starting client thread that reads until it has received all the data from the server (#{ITRS} characters)\n"
  remaining = ITRS

  while remaining > 0
    result = client_socket_to_server.read(CHUNK_SIZE)
    remaining -= result.length
  end
  puts "client has received all data. writing one character to the server"
  client_socket_to_server.write(".")
end

client_read_thread.join
server_read_thread.join
server_write_thread.join

puts "eof: #{eof_count} (expected to be 0)"
puts "eagain: #{eagain_count}"
puts "successful_read: #{data_read_count} (expected to be 1 because the client sends one character when it completes)"

server_socket_to_client.close
client_socket_to_server.close
server.close

Expected Output

(what I get when running on jruby-1.7.26)

starting server thread that writes 2000 characters to the client
starting server thread the reads from the client until it receives a character
starting client thread that reads until it has received all the data from the server (2000 characters)
client has received all data. writing one character to the server
eof: 0 (expected to be 0)
eagain: 489
successful_read: 1 (expected to be 1 because the client sends one character when it completes)

Actual Output

(what I get when I run against jruby 9k)

starting server thread the reads from the client until it receives a character
starting server thread that writes 2000 characters to the client
starting client thread that reads until it has received all the data from the server (2000 characters)
client has received all data. writing one character to the server
eof: 187 (expected to be 0)
eagain: 1502
successful_read: 1 (expected to be 1 because the client sends one character when it completes)

@enebo (Member) commented Apr 25, 2019

@jonathanswenson I will try this script tomorrow morning and see what I can see.

I am really confused about how the originally reported script works. You read from and write to the same end of the socket pair you created, except for the EOF check at the end. Are you sure you don't have one of those variables changed locally in your script?

@jonathanswenson (Author) commented Apr 25, 2019

@enebo Yes, both threads use the same IO object (s1): one reads from it and the other writes to it. However, the reads will not see the data written to this socket, so they should get an EAGAIN for all 2,000 reads. I can even reproduce the problem without ever reading from or writing to the other end of the pair (s2).

The two threads operating on s1 (the spawned thread doing the reading and the main thread doing the writing) appear to be racing somehow.

@headius (Member) commented Apr 25, 2019

I've found the problem.

In order to avoid holding a lock while doing a blocking read or write, the innermost part of our IO subsystem will release its lock on the IO stream immediately before entering the actual read/write logic.

Unfortunately, that read/write logic writes to some shared state: the errno field. So the sequence that leads to an EOFError goes like this:

  • Thread 1: read_nonblock sees no data to read and sets errno to EAGAIN
  • Thread 2: at that same moment, write attempts to begin its operation, which involves clearing the current errno
  • Thread 1: proceeds to return -1 back to the calling logic of read_nonblock, which then sees a -1 return value with a null errno and falls back on raising EOFError.
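The interleaving above can be replayed deterministically in a few lines. This is a toy model, not JRuby code: `SharedErrnoShim`, `ErrnoRaceReplay`, and `ToyErrno` are hypothetical names, and the three steps run in sequence on one thread instead of on two real threads so the outcome is reproducible. `classify` mimics the caller's rule described above: -1 with EAGAIN means "would block", and -1 with no errno falls back to EOFError.

```java
// Toy model of the shared-errno race; names are hypothetical, not JRuby's.
enum ToyErrno { EAGAIN }

final class SharedErrnoShim {
    ToyErrno errno;                      // a single field shared by all threads

    void clear() { errno = null; }

    // Nonblocking read that found no data: record EAGAIN and return -1.
    int readNoData() {
        clear();
        errno = ToyErrno.EAGAIN;
        return -1;
    }

    // Starting a write clears errno, like the clear() at the top of PosixShim.write.
    void beginWrite() { clear(); }
}

final class ErrnoRaceReplay {
    // Caller's rule: -1 + EAGAIN means "would block"; -1 + no errno falls back to EOF.
    static String classify(int ret, ToyErrno errno) {
        if (ret >= 0) return "data";
        return errno == ToyErrno.EAGAIN ? "EAGAIN" : "EOFError";
    }

    // Replays the three steps in order; interleaveWrite inserts Thread 2's clear().
    static String replay(boolean interleaveWrite) {
        SharedErrnoShim shim = new SharedErrnoShim();
        int ret = shim.readNoData();            // Thread 1: errno = EAGAIN, returns -1
        if (interleaveWrite) shim.beginWrite(); // Thread 2: clears the shared errno
        return classify(ret, shim.errno);       // Thread 1: inspects errno too late
    }

    public static void main(String[] args) {
        System.out.println("no interleaving:   " + replay(false)); // EAGAIN
        System.out.println("with interleaving: " + replay(true));  // EOFError
    }
}
```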

@headius (Member) commented Apr 25, 2019

A trivial patch that disables our releasing of the lock makes this run properly:

diff --git a/core/src/main/java/org/jruby/util/io/OpenFile.java b/core/src/main/java/org/jruby/util/io/OpenFile.java
index f7f2cbe160..a32c7fa358 100644
--- a/core/src/main/java/org/jruby/util/io/OpenFile.java
+++ b/core/src/main/java/org/jruby/util/io/OpenFile.java
@@ -1322,11 +1322,11 @@ public class OpenFile implements Finalizable {
 
             assert fptr.lockedByMe();
 
-            fptr.unlock();
+//            fptr.unlock();
             try {
                 return fptr.posix.read(fd, iis.bufBytes, iis.buf, iis.capa, fptr.nonblock);
             } finally {
-                fptr.lock();
+//                fptr.lock();
             }
         }
 
@@ -1343,11 +1343,11 @@ public class OpenFile implements Finalizable {
 
             assert fptr.lockedByMe();
 
-            fptr.unlock();
+//            fptr.unlock();
             try {
                 return iis.fptr.posix.write(iis.fd, iis.bufBytes, iis.buf, iis.capa, iis.fptr.nonblock);
             } finally {
-                fptr.lock();
+//                fptr.lock();
             }
         }
 
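A sketch of the trade-off in this patch, under stated assumptions: with the unlock commented out, and assuming the caller inspects errno before releasing the lock, another thread's `clear()` can no longer slip in between the read setting errno and the caller checking it. The cost is that a read which actually blocks now keeps the stream lock for its whole duration, which is what the unlock was there to avoid. The `ReentrantLock`, the latch standing in for a blocking `read(2)`, and the names `LockHeldDuringRead` and `writerCanProceed` are hypothetical illustrations, not JRuby's `OpenFile` locking.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of one IO stream's lock; not JRuby's OpenFile.
final class LockHeldDuringRead {

    // Returns true if a "writer" can take the stream lock while a "reader" is parked
    // in a simulated blocking read. holdLockDuringRead = true models the patch above.
    static boolean writerCanProceed(boolean holdLockDuringRead) {
        ReentrantLock streamLock = new ReentrantLock();
        CountDownLatch readerBlocked = new CountDownLatch(1);
        CountDownLatch dataArrives = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            streamLock.lock();
            try {
                if (!holdLockDuringRead) {
                    streamLock.unlock();          // original code: release around the syscall
                }
                try {
                    readerBlocked.countDown();
                    dataArrives.await();          // stands in for a blocking read(2)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (!holdLockDuringRead) {
                        streamLock.lock();        // re-acquire before returning to the caller
                    }
                }
            } finally {
                streamLock.unlock();
            }
        });
        reader.setDaemon(true);
        reader.start();

        try {
            readerBlocked.await();
            // The writer waits briefly; failing to get the lock means it is stuck behind the reader.
            boolean acquired = streamLock.tryLock(100, TimeUnit.MILLISECONDS);
            if (acquired) {
                streamLock.unlock();
            }
            dataArrives.countDown();              // let the simulated read complete
            reader.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("lock released around read: writer proceeds = " + writerCanProceed(false));
        System.out.println("lock held during read:     writer proceeds = " + writerCanProceed(true));
    }
}
```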
@headius (Member) commented Apr 25, 2019

Here's an attempt to move the lock release deeper into the read and write calls, but I think a different approach is necessary: this still allows threads to step on each other's errno, just under lock when they do it.

diff --git a/core/src/main/java/org/jruby/util/io/OpenFile.java b/core/src/main/java/org/jruby/util/io/OpenFile.java
index f7f2cbe160..6146af078d 100644
--- a/core/src/main/java/org/jruby/util/io/OpenFile.java
+++ b/core/src/main/java/org/jruby/util/io/OpenFile.java
@@ -1322,12 +1322,15 @@ public class OpenFile implements Finalizable {
 
             assert fptr.lockedByMe();
 
-            fptr.unlock();
-            try {
-                return fptr.posix.read(fd, iis.bufBytes, iis.buf, iis.capa, fptr.nonblock);
-            } finally {
-                fptr.lock();
-            }
+            return iis.fptr.posix.write(iis.fd, iis.bufBytes, iis.buf, iis.capa, iis.fptr.nonblock, (w) -> {
+                assert fptr.lockedByMe();
+                fptr.unlock();
+                try {
+                    return w.getAsInt();
+                } finally {
+                    fptr.lock();
+                }
+            });
         }
 
         @Override
@@ -1341,14 +1344,15 @@ public class OpenFile implements Finalizable {
         public Integer run(ThreadContext context, InternalWriteStruct iis) throws InterruptedException {
             OpenFile fptr = iis.fptr;
 
-            assert fptr.lockedByMe();
-
-            fptr.unlock();
-            try {
-                return iis.fptr.posix.write(iis.fd, iis.bufBytes, iis.buf, iis.capa, iis.fptr.nonblock);
-            } finally {
-                fptr.lock();
-            }
+            return iis.fptr.posix.write(iis.fd, iis.bufBytes, iis.buf, iis.capa, iis.fptr.nonblock, (r) -> {
+                assert fptr.lockedByMe();
+                fptr.unlock();
+                try {
+                    return r.getAsInt();
+                } finally {
+                    fptr.lock();
+                }
+            });
         }
 
         @Override
diff --git a/core/src/main/java/org/jruby/util/io/PosixShim.java b/core/src/main/java/org/jruby/util/io/PosixShim.java
index 53cacd79d1..ae61cb839f 100644
--- a/core/src/main/java/org/jruby/util/io/PosixShim.java
+++ b/core/src/main/java/org/jruby/util/io/PosixShim.java
@@ -9,6 +9,8 @@ import java.nio.channels.Channels;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.channels.Pipe;
+import java.util.function.IntSupplier;
+import java.util.function.ToIntFunction;
 
 import jnr.constants.platform.Errno;
 import jnr.constants.platform.Fcntl;
@@ -89,6 +91,10 @@ public class PosixShim {
     }
 
     public int write(ChannelFD fd, byte[] bytes, int offset, int length, boolean nonblock) {
+        return write(fd, bytes, offset, length, nonblock, w -> w.getAsInt());
+    }
+
+    public int write(ChannelFD fd, byte[] bytes, int offset, int length, boolean nonblock, ToIntFunction<IntSupplier> releaseLock) {
         clear();
 
         // FIXME: don't allocate every time
@@ -104,7 +110,8 @@ public class PosixShim {
                 errno = Errno.EACCES;
                 return -1;
             }
-            int written = fd.chWrite.write(tmp);
+
+            int written = writeWithoutLock(fd, tmp, releaseLock);
 
             if (written == 0 && length > 0) {
                 // if it's a nonblocking write against a file and we've hit EOF, do EAGAIN
@@ -116,16 +123,41 @@ public class PosixShim {
 
             return written;
         } catch (IOException ioe) {
+            ioe.printStackTrace();
             errno = Helpers.errnoFromException(ioe);
             error = ioe;
             return -1;
         }
     }
 
+    /**
+     * Innermost write, which uses the provided function to release any locks associated with this stream.
+     *
+     * @param fd
+     * @param buffer
+     * @param withLock
+     * @return
+     * @throws IOException
+     */
+    private final int writeWithoutLock(ChannelFD fd, ByteBuffer buffer, ToIntFunction<IntSupplier> withLock) throws IOException {
+        return withLock.applyAsInt(() -> {
+            try {
+                return fd.chWrite.write(buffer);
+            } catch (IOException e) {
+                Helpers.throwException(e);
+                return -1; // not reached
+            }
+        });
+    }
+
     private static final int NATIVE_EOF = 0;
     private static final int JAVA_EOF = -1;
 
     public int read(ChannelFD fd, byte[] target, int offset, int length, boolean nonblock) {
+        return read(fd, target, offset, length, nonblock, r -> r.getAsInt());
+    }
+
+    public int read(ChannelFD fd, byte[] target, int offset, int length, boolean nonblock, ToIntFunction<IntSupplier> withoutLock) {
         clear();
 
         try {
@@ -155,7 +187,7 @@ public class PosixShim {
 
             // FIXME: inefficient to recreate ByteBuffer every time
             ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
-            int read = fd.chRead.read(buffer);
+            int read = readWithoutLock(fd, buffer, withoutLock);
 
             if (nonblock) {
                 if (read == JAVA_EOF) {
@@ -168,16 +200,38 @@ public class PosixShim {
                 }
             } else {
                 // NIO channels will always raise for errors, so -1 only means EOF.
-                if (read == JAVA_EOF) read = NATIVE_EOF;
+                if (read == JAVA_EOF) {
+                    return NATIVE_EOF;
+                }
+                return read;
             }
-
-            return read;
         } catch (IOException ioe) {
+            ioe.printStackTrace();
             errno = Helpers.errnoFromException(ioe);
             return -1;
         }
     }
 
+    /**
+     * Innermost read, which uses the provided function to release any locks associated with this stream.
+     *
+     * @param fd
+     * @param buffer
+     * @param withoutLock
+     * @return
+     * @throws IOException
+     */
+    private final int readWithoutLock(ChannelFD fd, ByteBuffer buffer, ToIntFunction<IntSupplier> withoutLock) throws IOException {
+        return withoutLock.applyAsInt(() -> {
+            try {
+                return fd.chRead.read(buffer);
+            } catch (IOException e) {
+                Helpers.throwException(e);
+                return -1; // not reached
+            }
+        });
+    }
+
     // rb_thread_flock
     public int flock(ChannelFD fd, int lockMode) {
         // TODO: null channel always succeeds for all locking operations
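The shape of this second patch can be reduced to a small, self-contained sketch: the caller hands the innermost layer a `ToIntFunction<IntSupplier>` that releases the lock only around the actual channel call, so `clear()` and any errno bookkeeping run with the lock held. The class and method names here are hypothetical, and the trace merely records whether the lock is held at each step. headius reports that this still let threads step on each other's errno, so treat it as an illustration of the callback pattern rather than a fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntSupplier;
import java.util.function.ToIntFunction;

// Hypothetical stream: the caller owns the lock, the innermost layer decides when to drop it.
final class UnlockAroundCallback {
    private final ReentrantLock lock = new ReentrantLock();
    final List<String> trace = new ArrayList<>();

    // Policy passed inward: release the lock only while `call` runs, then re-acquire it.
    private final ToIntFunction<IntSupplier> withoutLock = call -> {
        lock.unlock();
        try {
            return call.getAsInt();
        } finally {
            lock.lock();
        }
    };

    // Stand-in for an innermost write: bookkeeping runs locked, the channel call does not.
    private int innermostWrite(ToIntFunction<IntSupplier> aroundCall) {
        trace.add("clear errno, locked=" + lock.isHeldByCurrentThread());
        int written = aroundCall.applyAsInt(() -> {
            trace.add("channel write, locked=" + lock.isHeldByCurrentThread());
            return 1;                       // pretend one byte was written
        });
        trace.add("inspect result, locked=" + lock.isHeldByCurrentThread());
        return written;
    }

    int write() {
        lock.lock();
        try {
            return innermostWrite(withoutLock);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        UnlockAroundCallback io = new UnlockAroundCallback();
        io.write();
        io.trace.forEach(System.out::println);
    }
}
```

The design cost is visible even in this toy: every call allocates a capturing lambda, which is part of why a thread-local errno was preferred.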

@headius (Member) commented Apr 25, 2019

Changing the errno field to be thread-local appears to be the better fix. It adds the overhead of reading and writing a thread-local on every errno access, but that is likely less than the overhead of the nested lambdas in my patch above.
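A minimal sketch of the thread-local approach, assuming a hypothetical `ThreadLocalErrnoShim` rather than the real `PosixShim`: each thread gets its own errno slot, so a `clear()` issued by a writer thread cannot erase the EAGAIN that a reader thread just recorded. The cost is a `ThreadLocal.get()` or `set()` on every access, which is the overhead mentioned above.

```java
// Hypothetical shim; only the errno handling is modeled.
final class ThreadLocalErrnoShim {
    enum Errno { EAGAIN }

    // One slot per thread: a clear() on one thread cannot erase another thread's value.
    private final ThreadLocal<Errno> errno = new ThreadLocal<>();

    Errno getErrno() { return errno.get(); }   // null when unset on this thread
    void setErrno(Errno e) { errno.set(e); }
    void clear() { errno.set(null); }

    // The interleaving from the race description: this thread records EAGAIN,
    // another thread clears errno, then this thread inspects its own errno.
    static Errno replayRace() {
        ThreadLocalErrnoShim shim = new ThreadLocalErrnoShim();
        shim.setErrno(Errno.EAGAIN);              // "read_nonblock" found no data
        Thread writer = new Thread(shim::clear);  // "write" clears errno on its own thread
        writer.start();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shim.getErrno();                   // still EAGAIN, so no EOFError fallback
    }

    public static void main(String[] args) {
        System.out.println("errno seen by reader: " + replayRace()); // prints "errno seen by reader: EAGAIN"
    }
}
```

This assumes each caller reads errno on the same thread that performed the operation, so the field itself needs no extra locking.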

In CRuby, errno is just inherited from the C library, which does not make it thread-safe. This is a problem for any implementations running C extensions in parallel, or for extensions that release the global lock while doing an IO operation. I believe this works in CRuby partly by accident; they may prevent enough concurrent IO from happening to avoid wiping out the errno value.

Our native subsystem also localizes errno to a thread-local, though I'm not certain how atomic that behavior is. Because errno is shared across all C calls in the C library, it's not possible to make it completely thread-safe. Our simulated errno, on the other hand, should actually be atomically safe across threads with this upcoming patch.

headius added a commit to headius/jruby that referenced this issue Apr 25, 2019

Make the PosixShim errno field into a thread-local.
In order to support concurrent reads and writes via the PosixShim,
we need to make errno be a thread-local. Some callers do their own
locking, releasing the lock before calling PosixShim, which causes
the errno updates and clears to step on other threads. It might be
possible to have those callers pass in their "unlock around" logic
as shown in jruby#5706, but my first attempt did not work exactly right
and may introduce object overhead for stateful lambdas.

This implementation introduces thread-local read/write overhead to
all accessors of errno, but appears to be a clean way to keep this
field safe across concurrent IO operations.

Fixes jruby#5706.

@enebo (Member) commented Apr 25, 2019

@jonathanswenson Ah, thanks for the explanation.

headius added this to the JRuby 9.2.8.0 milestone Apr 25, 2019
