Skip to content

Commit

Permalink
[Wisp] Thread-based asynchronous IO implementation
Browse files Browse the repository at this point in the history
Summary:
Use ThreadPool as asynchronous IO delegate for asynchronous IO implementation.
The feature can be used in Wisp flow and non-Wisp flow.

Test Plan: test/com/alibaba/wisp/io/ThreadPoolAIOTest.java

Reviewed-by: leiyu, zhengxiaolinX

Issue:  dragonwell-project/dragonwell8#213

thread local judge
  • Loading branch information
joeyleeeeeee97 authored and joeylee.lz committed Apr 25, 2021
1 parent 2ee14c2 commit 81237ca
Show file tree
Hide file tree
Showing 19 changed files with 632 additions and 26 deletions.
8 changes: 4 additions & 4 deletions make/mapfiles/libjava/mapfile-linux
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ SUNWprivate_1.1 {
Java_java_io_FileInputStream_initIDs;
Java_java_io_FileInputStream_open0;
Java_java_io_FileInputStream_read0;
Java_java_io_FileInputStream_readBytes;
Java_java_io_FileInputStream_readBytes0;
Java_java_io_FileInputStream_skip0;
Java_java_io_FileOutputStream_close0;
Java_java_io_FileOutputStream_initIDs;
Java_java_io_FileOutputStream_open0;
Java_java_io_FileOutputStream_write;
Java_java_io_FileOutputStream_writeBytes;
Java_java_io_FileOutputStream_writeBytes0;
Java_java_io_ObjectInputStream_bytesToDoubles;
Java_java_io_ObjectInputStream_bytesToFloats;
Java_java_io_ObjectOutputStream_doublesToBytes;
Expand All @@ -100,11 +100,11 @@ SUNWprivate_1.1 {
Java_java_io_RandomAccessFile_length;
Java_java_io_RandomAccessFile_open0;
Java_java_io_RandomAccessFile_read0;
Java_java_io_RandomAccessFile_readBytes;
Java_java_io_RandomAccessFile_readBytes0;
Java_java_io_RandomAccessFile_seek0;
Java_java_io_RandomAccessFile_setLength;
Java_java_io_RandomAccessFile_write0;
Java_java_io_RandomAccessFile_writeBytes;
Java_java_io_RandomAccessFile_writeBytes0;
Java_java_io_UnixFileSystem_canonicalize0;
Java_java_io_UnixFileSystem_checkAccess;
Java_java_io_UnixFileSystem_createDirectory;
Expand Down
8 changes: 4 additions & 4 deletions make/mapfiles/libjava/mapfile-vers
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ SUNWprivate_1.1 {
Java_java_io_FileInputStream_initIDs;
Java_java_io_FileInputStream_open0;
Java_java_io_FileInputStream_read0;
Java_java_io_FileInputStream_readBytes;
Java_java_io_FileInputStream_readBytes0;
Java_java_io_FileInputStream_skip0;
Java_java_io_FileOutputStream_close0;
Java_java_io_FileOutputStream_initIDs;
Java_java_io_FileOutputStream_open0;
Java_java_io_FileOutputStream_write;
Java_java_io_FileOutputStream_writeBytes;
Java_java_io_FileOutputStream_writeBytes0;
Java_java_io_ObjectInputStream_bytesToDoubles;
Java_java_io_ObjectInputStream_bytesToFloats;
Java_java_io_ObjectOutputStream_doublesToBytes;
Expand All @@ -100,11 +100,11 @@ SUNWprivate_1.1 {
Java_java_io_RandomAccessFile_length;
Java_java_io_RandomAccessFile_open0;
Java_java_io_RandomAccessFile_read0;
Java_java_io_RandomAccessFile_readBytes;
Java_java_io_RandomAccessFile_readBytes0;
Java_java_io_RandomAccessFile_seek0;
Java_java_io_RandomAccessFile_setLength;
Java_java_io_RandomAccessFile_write0;
Java_java_io_RandomAccessFile_writeBytes;
Java_java_io_RandomAccessFile_writeBytes0;
Java_java_io_UnixFileSystem_canonicalize0;
Java_java_io_UnixFileSystem_checkAccess;
Java_java_io_UnixFileSystem_createDirectory;
Expand Down
4 changes: 2 additions & 2 deletions make/mapfiles/libjava/reorder-x86
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ text: .text%Java_java_lang_Double_doubleToLongBits;
text: .text%Java_java_io_FileInputStream_open0;
text: .text%fileOpen;
text: .text%Java_java_io_UnixFileSystem_getLength;
text: .text%Java_java_io_FileInputStream_readBytes;
text: .text%Java_java_io_FileInputStream_readBytes0;
text: .text%readBytes;
text: .text%Java_java_io_FileInputStream_close0;
text: .text%Java_java_lang_Object_getClass;
Expand All @@ -76,7 +76,7 @@ text: .text%VerifyClassCodes;
# Test Exit
text: .text%Java_java_lang_Shutdown_halt;
# Test Hello
text: .text%Java_java_io_FileOutputStream_writeBytes;
text: .text%Java_java_io_FileOutputStream_writeBytes0;
text: .text%writeBytes;
# Test Sleep
# Test IntToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public Properties run() {
private static final int UNLOADED = 0, LOADING = 1, LOADED = 2;
private static final AtomicInteger bizLoadStatus = new AtomicInteger(UNLOADED);

private static void ensureBizConfigLoaded() {
public static void ensureBizConfigLoaded() {
if (bizLoadStatus.get() == LOADED) {
return;
}
Expand Down
121 changes: 121 additions & 0 deletions src/share/classes/com/alibaba/wisp/engine/WispAIOSupporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.alibaba.wisp.engine;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

public enum WispAIOSupporter {
INSTANCE;

private ExecutorService executor;

private ThreadGroup threadgroup;

WispAIOSupporter() {
}

enum Policy {
ABORT {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
}
},
CALLERRUNS {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
},
DISCARDOLDEST {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
}
},
DISCARD {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
};

abstract void handle(ThreadPoolExecutor executor);
}

void startDaemon(ThreadGroup g) {
threadgroup = g;
ThreadPoolExecutor workPool;
if (WispAsyncIO.AIO_QUEUE_LIMIT == -1) {
workPool = new ThreadPoolExecutor(WispAsyncIO.CORE_POOL_SIZE,
WispAsyncIO.MAX_POOL_SIZE, Long.MAX_VALUE,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new AIOThreadPoolFactory());
} else {
workPool = new ThreadPoolExecutor(WispAsyncIO.CORE_POOL_SIZE,
WispAsyncIO.MAX_POOL_SIZE, Long.MAX_VALUE,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(WispAsyncIO.AIO_QUEUE_LIMIT), new AIOThreadPoolFactory());
}
WispAsyncIO.REJECTED_POLICY.handle(workPool);
this.executor = workPool;
WispAsyncIO.wisp_aio_done = true;
}

public <T> T invokeIOTask(Callable<T> command) throws IOException {
Future<T> future;
try {
future = submitIOTask(command);
} catch (RejectedExecutionException e) {
throw new IOException("busy", e);
}
T result;
while (true) {
try {
if (WispAsyncIO.TIMEOUT == -1) {
result = future.get();
} else {
result = future.get(WispAsyncIO.TIMEOUT, TimeUnit.MILLISECONDS);
}
return result;
} catch (TimeoutException e) {
throw new IOException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Class<?> causeClass = cause.getClass();
if (IOException.class.isAssignableFrom(causeClass)) {
throw (IOException) cause;
} else if (RuntimeException.class.isAssignableFrom(causeClass)) {
throw (RuntimeException) cause;
} else if (Error.class.isAssignableFrom(causeClass)) {
throw (Error) cause;
} else {
throw new Error(e);
}
} catch (InterruptedException e) {
Thread.interrupted();
}
}
}

<T> Future<T> submitIOTask(Callable<T> command) {
return executor.submit(command);
}

private class AIOThreadPoolFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final static String namePrefix = "AIO-worker-thread-";
AIOThreadPoolFactory() {}

@Override
public Thread newThread(Runnable r) {
Thread t;
if (threadgroup != null) {
t = new Thread(threadgroup, r, namePrefix + threadNumber.getAndIncrement());
} else {
t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
t.setDaemon(true);
return t;
}
}
}
105 changes: 105 additions & 0 deletions src/share/classes/com/alibaba/wisp/engine/WispAsyncIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.alibaba.wisp.engine;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.Properties;
import java.security.AccessController;
import sun.misc.SharedSecrets;
import sun.misc.UnsafeAccess;
import sun.misc.WispAsyncIOAccess;
import sun.security.action.GetBooleanSecurityPropertyAction;
import sun.security.action.GetIntegerAction;

public class WispAsyncIO {
// AIO
static final int CORE_POOL_SIZE;
static final int MAX_POOL_SIZE;
static final int AIO_QUEUE_LIMIT;
static final int TIMEOUT;
static final WispAIOSupporter.Policy REJECTED_POLICY;
static boolean wisp_aio_done = false;

static {
Properties p = java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Properties>() {
public Properties run() {
return System.getProperties();
}
}
);
CORE_POOL_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.aio.corePoolSize",
Runtime.getRuntime().availableProcessors());
MAX_POOL_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.aio.maxPoolSize",
Runtime.getRuntime().availableProcessors());
AIO_QUEUE_LIMIT = parsePositiveIntegerParameter(p, "com.alibaba.aio.queueLimit", -1);
TIMEOUT = parsePositiveIntegerParameter(p, "com.alibaba.aio.timeout", -1);
REJECTED_POLICY = WispAIOSupporter.Policy.valueOf(
p.getProperty("com.alibaba.aio.policy", WispAIOSupporter.Policy.ABORT.name()));
}

static void setWispAsyncIOAccess() {
// initialize TenantAccess
if (SharedSecrets.getWispAsyncIOAccess() == null) {
SharedSecrets.setWispAsyncIOAccess(new WispAsyncIOAccess() {
@Override
public boolean usingAsyncIO() {
return wisp_aio_done;
}

@Override
public <T> T executeAsyncIO(Callable<T> command) throws IOException {
return WispAIOSupporter.INSTANCE.invokeIOTask(command);
}
});
}
}

public static boolean useAsyncIO() {
return SharedSecrets.getWispAsyncIOAccess() != null
&& SharedSecrets.getWispAsyncIOAccess().usingAsyncIO()
&& SharedSecrets.getWispEngineAccess() != null
&& SharedSecrets.getWispEngineAccess().runningAsCoroutine(Thread.currentThread());
}

/*
* Initialize the WispAsyncIO class, called after System.initializeSystemClass by VM.
**/
static void initializeAsyncIOClass() {
try {
Class.forName(WispAIOSupporter.class.getName());
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}

static void startAsyncIODaemonWithThreadGroup(ThreadGroup g) {
WispAIOSupporter.INSTANCE.startDaemon(g);
}

static void startAsyncIODaemon() {
WispAIOSupporter.INSTANCE.startDaemon(WispEngine.DAEMON_THREAD_GROUP);
setWispAsyncIOAccess();
}

private static boolean parseBooleanParameter(Properties p, String key, boolean defaultVal) {
String value;
if (p == null || (value = p.getProperty(key)) == null) {
return defaultVal;
}
return Boolean.valueOf(value);
}

private static int parsePositiveIntegerParameter(Properties p, String key, int defaultVal) {
String value;
if (p == null || (value = p.getProperty(key)) == null) {
return defaultVal;
}
int res = defaultVal;
try {
res = Integer.valueOf(value);
} catch (NumberFormatException e) {
return defaultVal;
}
return res <= 0 ? defaultVal : res;
}
}
32 changes: 31 additions & 1 deletion src/share/classes/java/io/FileInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

package java.io;

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.Callable;

import com.alibaba.wisp.engine.WispAsyncIO;
import sun.misc.SharedSecrets;
import sun.nio.ch.FileChannelImpl;


Expand Down Expand Up @@ -62,6 +67,10 @@ class FileInputStream extends InputStream
private final Object closeLock = new Object();
private volatile boolean closed = false;

private ByteBuffer bb = null;
private byte[] bs = null; // Invoker's previous array
private byte[] b1 = null;

/**
* Creates a <code>FileInputStream</code> by
* opening a connection to an actual file,
Expand Down Expand Up @@ -178,6 +187,7 @@ public FileInputStream(FileDescriptor fdObj) {
* Register this stream with FileDescriptor tracker.
*/
fd.attach(this);

}

/**
Expand All @@ -204,6 +214,14 @@ private void open(String name) throws FileNotFoundException {
* @exception IOException if an I/O error occurs.
*/
public int read() throws IOException {
if (WispAsyncIO.useAsyncIO()) {
return SharedSecrets.getWispAsyncIOAccess().executeAsyncIO(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return read0();
}
});
}
return read0();
}

Expand All @@ -216,7 +234,19 @@ public int read() throws IOException {
* @param len the number of bytes that are written
* @exception IOException If an I/O error has occurred.
*/
private native int readBytes(byte b[], int off, int len) throws IOException;
private native int readBytes0(byte b[], int off, int len) throws IOException;

private int readBytes(byte b[], int off, int len) throws IOException {
if (WispAsyncIO.useAsyncIO()) {
return SharedSecrets.getWispAsyncIOAccess().executeAsyncIO(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return readBytes0(b, off, len);
}
});
}
return readBytes0(b, off, len);
}

/**
* Reads up to <code>b.length</code> bytes of data from this input
Expand Down

0 comments on commit 81237ca

Please sign in to comment.