Skip to content

Commit

Permalink
Get managedBlock working again
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanBateman committed Feb 18, 2020
1 parent 5f10582 commit f728c6d
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 102 deletions.
12 changes: 11 additions & 1 deletion src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import jdk.internal.misc.Blocker;

import static sun.nio.ch.EPoll.EPOLLIN;
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
Expand Down Expand Up @@ -98,6 +99,10 @@ private void ensureOpen() {
throw new ClosedSelectorException();
}

private int poll(int timeout) throws IOException {
return EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, timeout);
}

@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
Expand All @@ -117,7 +122,12 @@ protected int doSelect(Consumer<SelectionKey> action, long timeout)

do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
if (blocking && Thread.currentThread().isVirtual()) {
int millis = to;
numEntries = Blocker.managedBlock(() -> poll(millis));
} else {
numEntries = poll(to);
}
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
Expand Down
12 changes: 11 additions & 1 deletion src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import jdk.internal.misc.Blocker;

import static sun.nio.ch.KQueue.EVFILT_READ;
import static sun.nio.ch.KQueue.EVFILT_WRITE;
Expand Down Expand Up @@ -101,6 +102,10 @@ private void ensureOpen() {
throw new ClosedSelectorException();
}

private int poll(long timeout) throws IOException {
return KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, timeout);
}

@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
Expand All @@ -119,7 +124,12 @@ protected int doSelect(Consumer<SelectionKey> action, long timeout)

do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
if (blocking && Thread.currentThread().isVirtual()) {
long millis = to;
numEntries = Blocker.managedBlock(() -> poll(millis));
} else {
numEntries = poll(to);
}
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
Expand Down
21 changes: 17 additions & 4 deletions src/java.base/share/classes/java/io/FileInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
package java.io;

import java.nio.channels.FileChannel;
import jdk.internal.misc.Blocker;
import sun.nio.ch.FileChannelImpl;


Expand Down Expand Up @@ -209,7 +210,11 @@ public FileInputStream(FileDescriptor fdObj) {
* @param name the name of the file
*/
private void open(String name) throws FileNotFoundException {
open0(name);
if (Thread.currentThread().isVirtual()) {
Blocker.managedBlock(() -> open0(name));
} else {
open0(name);
}
}

/**
Expand All @@ -221,7 +226,11 @@ private void open(String name) throws FileNotFoundException {
* @throws IOException if an I/O error occurs.
*/
public int read() throws IOException {
return read0();
if (Thread.currentThread().isVirtual()) {
return Blocker.managedBlock(() -> read0());
} else {
return read0();
}
}

private native int read0() throws IOException;
Expand All @@ -247,7 +256,7 @@ public int read() throws IOException {
* @throws IOException if an I/O error occurs.
*/
public int read(byte b[]) throws IOException {
return readBytes(b, 0, b.length);
return read(b, 0, b.length);
}

/**
Expand All @@ -269,7 +278,11 @@ public int read(byte b[]) throws IOException {
* @throws IOException if an I/O error occurs.
*/
public int read(byte b[], int off, int len) throws IOException {
return readBytes(b, off, len);
if (Thread.currentThread().isVirtual()) {
return Blocker.managedBlock(() -> readBytes(b, off, len));
} else {
return readBytes(b, off, len);
}
}

/**
Expand Down
26 changes: 20 additions & 6 deletions src/java.base/share/classes/java/io/FileOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.channels.FileChannel;
import jdk.internal.access.SharedSecrets;
import jdk.internal.access.JavaIOFileDescriptorAccess;
import jdk.internal.misc.Blocker;
import sun.nio.ch.FileChannelImpl;


Expand Down Expand Up @@ -286,9 +287,12 @@ private native void open0(String name, boolean append)
* @param name name of file to be opened
* @param append whether the file is to be opened in append mode
*/
private void open(String name, boolean append)
throws FileNotFoundException {
open0(name, append);
private void open(String name, boolean append) throws FileNotFoundException {
if (Thread.currentThread().isVirtual()) {
Blocker.managedBlock(() -> open0(name, append));
} else {
open0(name, append);
}
}

/**
Expand All @@ -308,7 +312,12 @@ private void open(String name, boolean append)
* @throws IOException if an I/O error occurs.
*/
public void write(int b) throws IOException {
write(b, fdAccess.getAppend(fd));
boolean append = fdAccess.getAppend(fd);
if (Thread.currentThread().isVirtual()) {
Blocker.managedBlock(() -> write(b, append));
} else {
write(b, append);
}
}

/**
Expand All @@ -331,7 +340,7 @@ private native void writeBytes(byte b[], int off, int len, boolean append)
* @throws IOException if an I/O error occurs.
*/
public void write(byte b[]) throws IOException {
writeBytes(b, 0, b.length, fdAccess.getAppend(fd));
write(b, 0, b.length);
}

/**
Expand All @@ -344,7 +353,12 @@ public void write(byte b[]) throws IOException {
* @throws IOException if an I/O error occurs.
*/
public void write(byte b[], int off, int len) throws IOException {
writeBytes(b, off, len, fdAccess.getAppend(fd));
boolean append = fdAccess.getAppend(fd);
if (Thread.currentThread().isVirtual()) {
Blocker.managedBlock(() -> writeBytes(b, off, len, append));
} else {
writeBytes(b, off, len, append);
}
}

/**
Expand Down
34 changes: 25 additions & 9 deletions src/java.base/share/classes/java/io/RandomAccessFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import jdk.internal.access.JavaIORandomAccessFileAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Blocker;
import sun.nio.ch.FileChannelImpl;


Expand Down Expand Up @@ -342,9 +343,12 @@ private native void open0(String name, int mode)
* @param mode the mode flags, a combination of the O_ constants
* defined above
*/
private void open(String name, int mode)
throws FileNotFoundException {
open0(name, mode);
private void open(String name, int mode) throws FileNotFoundException {
if (Thread.currentThread().isVirtual()) {
Blocker.managedBlock(() -> open0(name, mode));
} else {
open0(name, mode);
}
}

// 'Read' primitives
Expand All @@ -365,7 +369,11 @@ private void open(String name, int mode)
* end-of-file has been reached.
*/
public int read() throws IOException {
return read0();
if (Thread.currentThread().isVirtual()) {
return Blocker.managedBlock(() -> read0());
} else {
return read0();
}
}

private native int read0() throws IOException;
Expand Down Expand Up @@ -405,7 +413,11 @@ public int read() throws IOException {
* {@code b.length - off}
*/
public int read(byte b[], int off, int len) throws IOException {
return readBytes(b, off, len);
if (Thread.currentThread().isVirtual()) {
return Blocker.managedBlock(() -> readBytes(b, off, len));
} else {
return readBytes(b, off, len);
}
}

/**
Expand All @@ -428,7 +440,7 @@ public int read(byte b[], int off, int len) throws IOException {
* @throws NullPointerException If {@code b} is {@code null}.
*/
public int read(byte b[]) throws IOException {
return readBytes(b, 0, b.length);
return read(b, 0, b.length);
}

/**
Expand Down Expand Up @@ -522,7 +534,7 @@ public int skipBytes(int n) throws IOException {
* @throws IOException if an I/O error occurs.
*/
public void write(int b) throws IOException {
write0(b);
Blocker.managedBlock(() -> write0(b));
}

private native void write0(int b) throws IOException;
Expand All @@ -545,7 +557,7 @@ public void write(int b) throws IOException {
* @throws IOException if an I/O error occurs.
*/
public void write(byte b[]) throws IOException {
writeBytes(b, 0, b.length);
write(b, 0, b.length);
}

/**
Expand All @@ -558,7 +570,11 @@ public void write(byte b[]) throws IOException {
* @throws IOException if an I/O error occurs.
*/
public void write(byte b[], int off, int len) throws IOException {
writeBytes(b, off, len);
if (Thread.currentThread().isVirtual()) {
Blocker.managedBlock(() -> writeBytes(b, off, len));
} else {
writeBytes(b, off, len);
}
}

// 'Random access' stuff
Expand Down
3 changes: 2 additions & 1 deletion src/java.base/share/classes/java/lang/System.java
Original file line number Diff line number Diff line change
Expand Up @@ -2285,7 +2285,8 @@ public void loadLibrary(Class<?> caller, String library) {
public Thread currentCarrierThread() {
return Thread.currentCarrierThread();
}
public <R> R executeOnCarrierThread(Callable<R> task) throws Exception {

public <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
Thread carrier = Thread.currentCarrierThread();
VirtualThread vthread = carrier.getVirtualThread();
if (vthread != null) carrier.setVirtualThread(null);
Expand Down
12 changes: 10 additions & 2 deletions src/java.base/share/classes/java/net/InetAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -935,13 +935,21 @@ private static final class PlatformNameService implements NameService {
public InetAddress[] lookupAllHostAddr(String host)
throws UnknownHostException
{
return Blocker.runBlocking(() -> impl.lookupAllHostAddr(host));
if (Thread.currentThread().isVirtual()) {
return Blocker.managedBlock(() -> impl.lookupAllHostAddr(host));
} else {
return impl.lookupAllHostAddr(host);
}
}

public String getHostByAddr(byte[] addr)
throws UnknownHostException
{
return Blocker.runBlocking(() -> impl.getHostByAddr(addr));
if (Thread.currentThread().isVirtual()) {
return Blocker.managedBlock(() -> impl.getHostByAddr(addr));
} else {
return impl.getHostByAddr(addr);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public interface JavaLangAccess {
/**
* Executes the given value returning task on the current carrier thread.
*/
<R> R executeOnCarrierThread(Callable<R> task) throws Exception;
<V> V executeOnCarrierThread(Callable<V> task) throws Exception;

/**
* Returns the value of the current carrier thread's copy of a thread-local.
Expand Down
Loading

0 comments on commit f728c6d

Please sign in to comment.