Skip to content
Permalink
Browse files

Offload InetAddress.getByXXX to background threads until new name ser…

…vice impl available
  • Loading branch information
AlanBateman committed Nov 15, 2019
1 parent b27af9e commit 7d71fcea4bb37c5def4726d96a14da41e2074c0f
@@ -49,6 +49,7 @@

import jdk.internal.access.JavaNetInetAddressAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Blocker;
import sun.security.action.*;
import sun.net.InetAddressCachePolicy;
import sun.net.util.IPAddressUtil;
@@ -922,15 +923,13 @@ public int compareTo(CachedAddresses other) {
public InetAddress[] lookupAllHostAddr(String host)
throws UnknownHostException
{
// TBD blocks lightweight thread
return impl.lookupAllHostAddr(host);
return Blocker.runBlocking(() -> impl.lookupAllHostAddr(host));
}

public String getHostByAddr(byte[] addr)
throws UnknownHostException
{
// TBD blocks lightweight thread
return impl.getHostByAddr(addr);
return Blocker.runBlocking(() -> impl.getHostByAddr(addr));
}
}

@@ -25,18 +25,22 @@

package jdk.internal.misc;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

/**
* Supports the execution of tasks with a ManagedBlocker.
* Defines static methods to execute blocking tasks in a thread pool. This class
* is intended to be used by lightweight threads need to off-load a blocking task
* to avoid pinning the carrier thread, e.g. InetAddress.getByXXXX.
*/

public class Blocker {
private Blocker() { }

private static final Unsafe U = Unsafe.getUnsafe();

public interface BlockingRunnable<X extends Throwable> {
void run() throws X;
}
@@ -45,61 +49,71 @@ private Blocker() { }
V call() throws X;
}

/**
* Runs the given task with a ManagedBlocker when invoked in the context of
* a lightweight thread and the carrier thread is a ForkJoinWorkerThread.
*/
public static <X extends Throwable> void run(BlockingRunnable<X> task) throws X {
BlockingCallable<Void, X> wrapper = () -> {
public static <X extends Throwable> void runBlocking(BlockingRunnable<X> task) throws X {
if (Thread.currentThread().isLightweight()) {
Callable<?> wrapper = () -> {
try {
task.run();
} catch (Throwable e) {
U.throwException(e);
}
return null;
};
runBlockingTask(wrapper);
} else {
task.run();
return null;
};
run(wrapper);
}
}

/**
* Runs the given task with a ManagedBlocker when invoked in the context of
* a lightweight thread and the carrier thread is a ForkJoinWorkerThread.
*/
public static <V, X extends Throwable> V run(BlockingCallable<V, X> task) throws X {
if (!(LightweightThreads.currentCarrierThread() instanceof ForkJoinWorkerThread)
|| !Thread.currentThread().isLightweight()) {
public static <V, X extends Throwable> V runBlocking(BlockingCallable<V, X> task) throws X {
if (Thread.currentThread().isLightweight()) {
Callable<V> wrapper = () -> {
try {
return task.call();
} catch (Throwable e) {
U.throwException(e);
return null;
}
};
return runBlockingTask(wrapper);
} else {
return task.call();
}
}

@SuppressWarnings("unchecked")
private static <V> V runBlockingTask(Callable<V> task) {
Future<?> future = THREAD_POOL.submit(task);

var blocker = new ForkJoinPool.ManagedBlocker() {
V result;
Throwable exception;
boolean done;
@Override
public boolean block() {
if (!done) {
try {
result = task.call();
} catch (Throwable e) {
exception = e;
} finally {
done = true;
}
V result = null;
boolean interrupted = false;
try {
boolean done = false;
while (!done) {
try {
result = (V) future.get();
done = true;
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
U.throwException(e.getCause());
done = true;
}
return true;
}
@Override
public boolean isReleasable() {
return done;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
};

try {
ForkJoinPool.managedBlock(blocker);
} catch (InterruptedException e) {
U.throwException(e);
}
return result;
}

Throwable e = blocker.exception;
if (e != null) {
U.throwException(e);
}
return blocker.result;
private static final Unsafe U = Unsafe.getUnsafe();
private static final ExecutorService THREAD_POOL;
static {
int parallelism = Runtime.getRuntime().availableProcessors() << 1;
ThreadFactory factory = task -> InnocuousThread.newThread(task);
THREAD_POOL = Executors.newFixedThreadPool(parallelism, factory);
}

}

0 comments on commit 7d71fce

Please sign in to comment.
You can’t perform that action at this time.