Skip to content

Commit

Permalink
[Wisp] Thread-based asynchronous IO implementation
Browse files Browse the repository at this point in the history
Summary:
Use ThreadPool as asynchronous IO delegate for asynchronous IO implementation.
The feature can be used in Wisp flow and non-Wisp flow.

Test Plan: test/com/alibaba/wisp/io/ThreadPoolAIOTest.java

Reviewed-by: leiyu, zhengxiaolinX

Issue:  dragonwell-project/dragonwell8#213
  • Loading branch information
识月 authored and joeylee.lz committed Mar 17, 2021
1 parent 2ee14c2 commit 95c7714
Show file tree
Hide file tree
Showing 20 changed files with 798 additions and 19 deletions.
8 changes: 4 additions & 4 deletions make/mapfiles/libjava/mapfile-vers
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ SUNWprivate_1.1 {
Java_java_io_FileInputStream_initIDs;
Java_java_io_FileInputStream_open0;
Java_java_io_FileInputStream_read0;
Java_java_io_FileInputStream_readBytes;
Java_java_io_FileInputStream_readBytes0;
Java_java_io_FileInputStream_skip0;
Java_java_io_FileOutputStream_close0;
Java_java_io_FileOutputStream_initIDs;
Java_java_io_FileOutputStream_open0;
Java_java_io_FileOutputStream_write;
Java_java_io_FileOutputStream_writeBytes;
Java_java_io_FileOutputStream_writeBytes0;
Java_java_io_ObjectInputStream_bytesToDoubles;
Java_java_io_ObjectInputStream_bytesToFloats;
Java_java_io_ObjectOutputStream_doublesToBytes;
Expand All @@ -100,11 +100,11 @@ SUNWprivate_1.1 {
Java_java_io_RandomAccessFile_length;
Java_java_io_RandomAccessFile_open0;
Java_java_io_RandomAccessFile_read0;
Java_java_io_RandomAccessFile_readBytes;
Java_java_io_RandomAccessFile_readBytes0;
Java_java_io_RandomAccessFile_seek0;
Java_java_io_RandomAccessFile_setLength;
Java_java_io_RandomAccessFile_write0;
Java_java_io_RandomAccessFile_writeBytes;
Java_java_io_RandomAccessFile_writeBytes0;
Java_java_io_UnixFileSystem_canonicalize0;
Java_java_io_UnixFileSystem_checkAccess;
Java_java_io_UnixFileSystem_createDirectory;
Expand Down
4 changes: 2 additions & 2 deletions make/mapfiles/libjava/reorder-x86
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ text: .text%Java_java_lang_Double_doubleToLongBits;
text: .text%Java_java_io_FileInputStream_open0;
text: .text%fileOpen;
text: .text%Java_java_io_UnixFileSystem_getLength;
text: .text%Java_java_io_FileInputStream_readBytes;
text: .text%Java_java_io_FileInputStream_readBytes0;
text: .text%readBytes;
text: .text%Java_java_io_FileInputStream_close0;
text: .text%Java_java_lang_Object_getClass;
Expand All @@ -76,7 +76,7 @@ text: .text%VerifyClassCodes;
# Test Exit
text: .text%Java_java_lang_Shutdown_halt;
# Test Hello
text: .text%Java_java_io_FileOutputStream_writeBytes;
text: .text%Java_java_io_FileOutputStream_writeBytes0;
text: .text%writeBytes;
# Test Sleep
# Test IntToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class WispConfiguration {
// wisp control group
static final int WISP_CONTROL_GROUP_CFS_PERIOD;

// AIO
static final boolean WISP_AIO_ENABLED;

private static List<String> THREAD_AS_WISP_BLACKLIST;

static {
Expand Down Expand Up @@ -131,6 +134,7 @@ public Properties run() {
// WISP_CONTROL_GROUP_CFS_PERIOD default value is 0(Us), WispControlGroup will estimate a cfs period according to SYSMON_TICK_US.
// If WISP_CONTROL_GROUP_CFS_PERIOD was configed by user, WispControlGroup will adopt it directly and won't estimate.
WISP_CONTROL_GROUP_CFS_PERIOD = parsePositiveIntegerParameter(p, "com.alibaba.wisp.controlGroup.cfsPeriod", 0);
WISP_AIO_ENABLED = parseBooleanParameter(p, "com.alibaba.wisp.enableAsyncIO", false);
checkCompatibility();
}

Expand Down Expand Up @@ -218,7 +222,7 @@ public Properties run() {
private static final int UNLOADED = 0, LOADING = 1, LOADED = 2;
private static final AtomicInteger bizLoadStatus = new AtomicInteger(UNLOADED);

private static void ensureBizConfigLoaded() {
public static void ensureBizConfigLoaded() {
if (bizLoadStatus.get() == LOADED) {
return;
}
Expand Down
143 changes: 143 additions & 0 deletions src/share/classes/com/alibaba/wisp/engine/WispAIOSupporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package com.alibaba.wisp.engine;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

public enum WispAIOSupporter {
INSTANCE;

private ExecutorService executor;

private ThreadGroup threadgroup;

WispAIOSupporter() {
}

enum Policy {
ABORT {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
}
},
CALLERRUNS {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
},
DISCARDOLDEST {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
}
},
DISCARD {
@Override
void handle(ThreadPoolExecutor executor) {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
};

abstract void handle(ThreadPoolExecutor executor);
}

void startDaemon(ThreadGroup g) {
threadgroup = g;
ThreadPoolExecutor workPool;
if (WispAsyncIO.AIO_QUEUE_LIMIT == -1) {
workPool = new ThreadPoolExecutor(WispAsyncIO.CORE_POOL_SIZE,
WispAsyncIO.MAX_POOL_SIZE, Long.MAX_VALUE,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new AIOThreadPoolFactory());
} else {
workPool = new ThreadPoolExecutor(WispAsyncIO.CORE_POOL_SIZE,
WispAsyncIO.MAX_POOL_SIZE, Long.MAX_VALUE,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(WispAsyncIO.AIO_QUEUE_LIMIT), new AIOThreadPoolFactory());
}
WispAsyncIO.REJECTED_POLICY.handle(workPool);
workPool.disableWispShift();
this.executor = workPool;
WispAsyncIO.wisp_aio_done = true;
}

public Integer invokeWriteTask(Callable<Integer> command) throws IOException {
if (WispAsyncIO.AIO_WRITE_NO_WAIT) {
try {
submitIOTask(command);
} catch (RejectedExecutionException e) {
throw new IOException("busy", e);
}
return 0;
} else {
return invokeIOTask(command);
}
}

public Long invokeWriteVTask(Callable<Long> command) throws IOException {
if (WispAsyncIO.AIO_WRITE_NO_WAIT) {
submitIOTask(command);
return 0L;
} else {
return invokeIOTask(command);
}
}


public <T> T invokeIOTask(Callable<T> command) throws IOException {
Future<T> future;
try {
future = submitIOTask(command);
} catch (RejectedExecutionException e) {
throw new IOException("busy", e);
}
T result;
try {
if (WispAsyncIO.TIMEOUT == -1) {
result = future.get();
} else {
result = future.get(WispAsyncIO.TIMEOUT, TimeUnit.MILLISECONDS);
}
} catch (TimeoutException e) {
throw new IOException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Class<?> causeClass = cause.getClass();
if (IOException.class.isAssignableFrom(causeClass)) {
throw (IOException) cause;
} else if (RuntimeException.class.isAssignableFrom(causeClass)) {
throw (RuntimeException) cause;
} else if (Error.class.isAssignableFrom(causeClass)) {
throw (Error) cause;
} else {
throw new Error(e);
}
} catch (InterruptedException e) {
throw new IOException(e);
}
return result;
}

<T> Future<T> submitIOTask(Callable<T> command) {
return executor.submit(command);
}

private class AIOThreadPoolFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final static String namePrefix = "AIO-worker-thread-";
AIOThreadPoolFactory() {}

@Override
public Thread newThread(Runnable r) {
Thread t;
if (threadgroup != null) {
t = new Thread(threadgroup, r, namePrefix + threadNumber.getAndIncrement());
} else {
t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
t.setDaemon(true);
return t;
}
}
}
110 changes: 110 additions & 0 deletions src/share/classes/com/alibaba/wisp/engine/WispAsyncIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.alibaba.wisp.engine;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.Properties;
import java.security.AccessController;
import sun.misc.SharedSecrets;
import sun.misc.UnsafeAccess;
import sun.misc.WispAsyncIOAccess;
import sun.security.action.GetBooleanSecurityPropertyAction;
import sun.security.action.GetIntegerAction;

public class WispAsyncIO {
// AIO
static final int CORE_POOL_SIZE;
static final int MAX_POOL_SIZE;
static final int AIO_QUEUE_LIMIT;
static final int TIMEOUT;
static final boolean AIO_WRITE_NO_WAIT;
static final WispAIOSupporter.Policy REJECTED_POLICY;
static boolean wisp_aio_done = false;

static {
Properties p = java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Properties>() {
public Properties run() {
return System.getProperties();
}
}
);
CORE_POOL_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.aio.corePoolSize",
Runtime.getRuntime().availableProcessors());
MAX_POOL_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.aio.maxPoolSize",
Runtime.getRuntime().availableProcessors());
AIO_QUEUE_LIMIT = parsePositiveIntegerParameter(p, "com.alibaba.aio.queueLimit", -1);
TIMEOUT = parsePositiveIntegerParameter(p, "com.alibaba.aio.timeout", -1);
AIO_WRITE_NO_WAIT = parseBooleanParameter(p, "com.alibaba.aio.nowait", false);
REJECTED_POLICY = WispAIOSupporter.Policy.valueOf(
p.getProperty("com.alibaba.aio.policy", WispAIOSupporter.Policy.ABORT.name()));
setWispAsyncIOAccess();
}

static void setWispAsyncIOAccess() {
// initialize TenantAccess
if (SharedSecrets.getWispAsyncIOAccess() == null) {
SharedSecrets.setWispAsyncIOAccess(new WispAsyncIOAccess() {
@Override
public boolean usingAsyncIO() {
return wisp_aio_done;
}

@Override
public <T> T executeAsyncIO(Callable<T> command) throws IOException {
return WispAIOSupporter.INSTANCE.invokeIOTask(command);
}

@Override
public Integer executeAsyncWrite(Callable<Integer> command) throws IOException {
return WispAIOSupporter.INSTANCE.invokeWriteTask(command);
}

@Override
public Long executeAsyncWriteV(Callable<Long> command) throws IOException {
return WispAIOSupporter.INSTANCE.invokeWriteVTask(command);
}
});
}
}

/*
* Initialize the WispAsyncIO class, called after System.initializeSystemClass by VM.
**/
static void initializeAsyncIOClass() {
try {
Class.forName(WispAIOSupporter.class.getName());
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}

static void startAsyncIODaemonWithThreadGroup(ThreadGroup g) {
WispAIOSupporter.INSTANCE.startDaemon(g);
}

static void startAsyncIODaemon() {
WispAIOSupporter.INSTANCE.startDaemon(null);
}

private static boolean parseBooleanParameter(Properties p, String key, boolean defaultVal) {
String value;
if (p == null || (value = p.getProperty(key)) == null) {
return defaultVal;
}
return Boolean.valueOf(value);
}

private static int parsePositiveIntegerParameter(Properties p, String key, int defaultVal) {
String value;
if (p == null || (value = p.getProperty(key)) == null) {
return defaultVal;
}
int res = defaultVal;
try {
res = Integer.valueOf(value);
} catch (NumberFormatException e) {
return defaultVal;
}
return res <= 0 ? defaultVal : res;
}
}

0 comments on commit 95c7714

Please sign in to comment.