Skip to content

Commit

Permalink
[Wisp] Thread-based asynchronous IO implementation
Browse files Browse the repository at this point in the history
Summary:
Use ThreadPool as asynchronous IO delegate for asynchronous IO implementation.
The feature can be used in Wisp flow and non-Wisp flow.

Test Plan: test/com/alibaba/wisp/io/ThreadPoolAIOTest.java

Reviewed-by: leiyu, zhengxiaolinX

Issue:  dragonwell-project/dragonwell8#213

thread local judge
  • Loading branch information
joeyleeeeeee97 authored and joeylee.lz committed Apr 25, 2021
1 parent 2ee14c2 commit 75b70bc
Show file tree
Hide file tree
Showing 19 changed files with 551 additions and 26 deletions.
8 changes: 4 additions & 4 deletions make/mapfiles/libjava/mapfile-linux
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ SUNWprivate_1.1 {
Java_java_io_FileInputStream_initIDs;
Java_java_io_FileInputStream_open0;
Java_java_io_FileInputStream_read0;
Java_java_io_FileInputStream_readBytes;
Java_java_io_FileInputStream_readBytes0;
Java_java_io_FileInputStream_skip0;
Java_java_io_FileOutputStream_close0;
Java_java_io_FileOutputStream_initIDs;
Java_java_io_FileOutputStream_open0;
Java_java_io_FileOutputStream_write;
Java_java_io_FileOutputStream_writeBytes;
Java_java_io_FileOutputStream_writeBytes0;
Java_java_io_ObjectInputStream_bytesToDoubles;
Java_java_io_ObjectInputStream_bytesToFloats;
Java_java_io_ObjectOutputStream_doublesToBytes;
Expand All @@ -100,11 +100,11 @@ SUNWprivate_1.1 {
Java_java_io_RandomAccessFile_length;
Java_java_io_RandomAccessFile_open0;
Java_java_io_RandomAccessFile_read0;
Java_java_io_RandomAccessFile_readBytes;
Java_java_io_RandomAccessFile_readBytes0;
Java_java_io_RandomAccessFile_seek0;
Java_java_io_RandomAccessFile_setLength;
Java_java_io_RandomAccessFile_write0;
Java_java_io_RandomAccessFile_writeBytes;
Java_java_io_RandomAccessFile_writeBytes0;
Java_java_io_UnixFileSystem_canonicalize0;
Java_java_io_UnixFileSystem_checkAccess;
Java_java_io_UnixFileSystem_createDirectory;
Expand Down
8 changes: 4 additions & 4 deletions make/mapfiles/libjava/mapfile-vers
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ SUNWprivate_1.1 {
Java_java_io_FileInputStream_initIDs;
Java_java_io_FileInputStream_open0;
Java_java_io_FileInputStream_read0;
Java_java_io_FileInputStream_readBytes;
Java_java_io_FileInputStream_readBytes0;
Java_java_io_FileInputStream_skip0;
Java_java_io_FileOutputStream_close0;
Java_java_io_FileOutputStream_initIDs;
Java_java_io_FileOutputStream_open0;
Java_java_io_FileOutputStream_write;
Java_java_io_FileOutputStream_writeBytes;
Java_java_io_FileOutputStream_writeBytes0;
Java_java_io_ObjectInputStream_bytesToDoubles;
Java_java_io_ObjectInputStream_bytesToFloats;
Java_java_io_ObjectOutputStream_doublesToBytes;
Expand All @@ -100,11 +100,11 @@ SUNWprivate_1.1 {
Java_java_io_RandomAccessFile_length;
Java_java_io_RandomAccessFile_open0;
Java_java_io_RandomAccessFile_read0;
Java_java_io_RandomAccessFile_readBytes;
Java_java_io_RandomAccessFile_readBytes0;
Java_java_io_RandomAccessFile_seek0;
Java_java_io_RandomAccessFile_setLength;
Java_java_io_RandomAccessFile_write0;
Java_java_io_RandomAccessFile_writeBytes;
Java_java_io_RandomAccessFile_writeBytes0;
Java_java_io_UnixFileSystem_canonicalize0;
Java_java_io_UnixFileSystem_checkAccess;
Java_java_io_UnixFileSystem_createDirectory;
Expand Down
4 changes: 2 additions & 2 deletions make/mapfiles/libjava/reorder-x86
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ text: .text%Java_java_lang_Double_doubleToLongBits;
text: .text%Java_java_io_FileInputStream_open0;
text: .text%fileOpen;
text: .text%Java_java_io_UnixFileSystem_getLength;
text: .text%Java_java_io_FileInputStream_readBytes;
text: .text%Java_java_io_FileInputStream_readBytes0;
text: .text%readBytes;
text: .text%Java_java_io_FileInputStream_close0;
text: .text%Java_java_lang_Object_getClass;
Expand All @@ -76,7 +76,7 @@ text: .text%VerifyClassCodes;
# Test Exit
text: .text%Java_java_lang_Shutdown_halt;
# Test Hello
text: .text%Java_java_io_FileOutputStream_writeBytes;
text: .text%Java_java_io_FileOutputStream_writeBytes0;
text: .text%writeBytes;
# Test Sleep
# Test IntToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public Properties run() {
private static final int UNLOADED = 0, LOADING = 1, LOADED = 2;
private static final AtomicInteger bizLoadStatus = new AtomicInteger(UNLOADED);

private static void ensureBizConfigLoaded() {
public static void ensureBizConfigLoaded() {
if (bizLoadStatus.get() == LOADED) {
return;
}
Expand Down
82 changes: 82 additions & 0 deletions src/share/classes/com/alibaba/wisp/engine/WispAIOSupporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.alibaba.wisp.engine;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

public enum WispAIOSupporter {
INSTANCE;

private ExecutorService executor;

private ThreadGroup threadgroup;

WispAIOSupporter() {
}

void startDaemon(ThreadGroup g) {
threadgroup = g;
ThreadPoolExecutor workPool;
workPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), Long.MAX_VALUE,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new AIOThreadPoolFactory());
workPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.executor = workPool;
WispAsyncIO.wispAIOLoaded = true;
}

public <T> T invokeIOTask(Callable<T> command) throws IOException {
Future<T> future;
try {
future = submitIOTask(command);
} catch (RejectedExecutionException e) {
throw new IOException("busy", e);
}
T result;
while (true) {
try {
result = future.get();
return result;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Class<?> causeClass = cause.getClass();
if (IOException.class.isAssignableFrom(causeClass)) {
throw (IOException) cause;
} else if (RuntimeException.class.isAssignableFrom(causeClass)) {
throw (RuntimeException) cause;
} else if (Error.class.isAssignableFrom(causeClass)) {
throw (Error) cause;
} else {
throw new Error(e);
}
} catch (InterruptedException e) {
Thread.interrupted();
}
}
}

<T> Future<T> submitIOTask(Callable<T> command) {
return executor.submit(command);
}

private class AIOThreadPoolFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final static String namePrefix = "AIO-worker-thread-";

AIOThreadPoolFactory() {
}

@Override
public Thread newThread(Runnable r) {
Thread t;
if (threadgroup != null) {
t = new Thread(threadgroup, r, namePrefix + threadNumber.getAndIncrement());
} else {
t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
t.setDaemon(true);
return t;
}
}
}
63 changes: 63 additions & 0 deletions src/share/classes/com/alibaba/wisp/engine/WispAsyncIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.alibaba.wisp.engine;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.Properties;

import sun.misc.SharedSecrets;
import sun.misc.WispAsyncIOAccess;

public class WispAsyncIO {
// AIO
static boolean wispAIOLoaded = false;

static {
Properties p = java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Properties>() {
public Properties run() {
return System.getProperties();
}
}
);
}

static void setWispAsyncIOAccess() {
// initialize TenantAccess
if (SharedSecrets.getWispAsyncIOAccess() == null) {
SharedSecrets.setWispAsyncIOAccess(new WispAsyncIOAccess() {
@Override
public boolean usingAsyncIO() {
return wispAIOLoaded;
}

@Override
public <T> T executeAsyncIO(Callable<T> command) throws IOException {
return WispAIOSupporter.INSTANCE.invokeIOTask(command);
}
});
}
}

public static boolean useAsyncIO() {
return SharedSecrets.getWispAsyncIOAccess() != null
&& SharedSecrets.getWispAsyncIOAccess().usingAsyncIO()
&& SharedSecrets.getWispEngineAccess() != null
&& SharedSecrets.getWispEngineAccess().runningAsCoroutine(Thread.currentThread());
}

/*
* Initialize the WispAsyncIO class, called after System.initializeSystemClass by VM.
**/
static void initializeAsyncIOClass() {
try {
Class.forName(WispAIOSupporter.class.getName());
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}

static void startAsyncIODaemon() {
WispAIOSupporter.INSTANCE.startDaemon(WispEngine.DAEMON_THREAD_GROUP);
setWispAsyncIOAccess();
}
}
32 changes: 31 additions & 1 deletion src/share/classes/java/io/FileInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

package java.io;

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.Callable;

import com.alibaba.wisp.engine.WispAsyncIO;
import sun.misc.SharedSecrets;
import sun.nio.ch.FileChannelImpl;


Expand Down Expand Up @@ -62,6 +67,10 @@ class FileInputStream extends InputStream
private final Object closeLock = new Object();
private volatile boolean closed = false;

private ByteBuffer bb = null;
private byte[] bs = null; // Invoker's previous array
private byte[] b1 = null;

/**
* Creates a <code>FileInputStream</code> by
* opening a connection to an actual file,
Expand Down Expand Up @@ -178,6 +187,7 @@ public FileInputStream(FileDescriptor fdObj) {
* Register this stream with FileDescriptor tracker.
*/
fd.attach(this);

}

/**
Expand All @@ -204,6 +214,14 @@ private void open(String name) throws FileNotFoundException {
* @exception IOException if an I/O error occurs.
*/
public int read() throws IOException {
if (WispAsyncIO.useAsyncIO()) {
return SharedSecrets.getWispAsyncIOAccess().executeAsyncIO(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return read0();
}
});
}
return read0();
}

Expand All @@ -216,7 +234,19 @@ public int read() throws IOException {
* @param len the number of bytes that are written
* @exception IOException If an I/O error has occurred.
*/
private native int readBytes(byte b[], int off, int len) throws IOException;
private native int readBytes0(byte b[], int off, int len) throws IOException;

private int readBytes(byte b[], int off, int len) throws IOException {
if (WispAsyncIO.useAsyncIO()) {
return SharedSecrets.getWispAsyncIOAccess().executeAsyncIO(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return readBytes0(b, off, len);
}
});
}
return readBytes0(b, off, len);
}

/**
* Reads up to <code>b.length</code> bytes of data from this input
Expand Down
37 changes: 35 additions & 2 deletions src/share/classes/java/io/FileOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

package java.io;

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.Callable;

import com.alibaba.wisp.engine.WispAsyncIO;
import sun.misc.SharedSecrets;
import sun.nio.ch.FileChannelImpl;


Expand Down Expand Up @@ -67,6 +72,10 @@ class FileOutputStream extends OutputStream
*/
private FileChannel channel;

private ByteBuffer bb = null;
private byte[] bs = null; // Invoker's previous array
private byte[] b1 = null;

/**
* The path of the referenced file
* (null if the stream is created with a file descriptor)
Expand Down Expand Up @@ -287,7 +296,17 @@ private void open(String name, boolean append)
* @exception IOException if an I/O error occurs.
*/
public void write(int b) throws IOException {
write(b, append);
if (WispAsyncIO.useAsyncIO()) {
SharedSecrets.getWispAsyncIOAccess().executeAsyncIO(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
write(b, append);
return 0;
}
});
} else {
write(b, append);
}
}

/**
Expand All @@ -299,9 +318,23 @@ public void write(int b) throws IOException {
* end of file
* @exception IOException If an I/O error has occurred.
*/
private native void writeBytes(byte b[], int off, int len, boolean append)
private native void writeBytes0(byte b[], int off, int len, boolean append)
throws IOException;

private void writeBytes(byte b[], int off, int len, boolean append) throws IOException {
if (WispAsyncIO.useAsyncIO()) {
SharedSecrets.getWispAsyncIOAccess().executeAsyncIO(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
writeBytes0(b, off, len, append);
return 0;
}
});
} else {
writeBytes0(b, off, len, append);
}
}

/**
* Writes <code>b.length</code> bytes from the specified byte array
* to this file output stream.
Expand Down

0 comments on commit 75b70bc

Please sign in to comment.