diff --git a/src/hotspot/share/c1/c1_Runtime1.cpp b/src/hotspot/share/c1/c1_Runtime1.cpp index 0e5e8727d8a..f5a32031aa9 100644 --- a/src/hotspot/share/c1/c1_Runtime1.cpp +++ b/src/hotspot/share/c1/c1_Runtime1.cpp @@ -496,7 +496,8 @@ JRT_ENTRY_NO_ASYNC(static address, exception_handler_for_pc_helper(JavaThread* t // Reset method handle flag. thread->set_is_method_handle_return(false); - Handle exception(thread, ex); + Handle exception(thread, WispThread::is_current_death_pending(thread)? + (oopDesc*) Universe::wisp_thread_death_exception() : ex); nm = CodeCache::find_nmethod(pc); assert(nm != NULL, "this is not an nmethod"); // Adjust the pc as needed/ diff --git a/src/hotspot/share/classfile/javaClasses.cpp b/src/hotspot/share/classfile/javaClasses.cpp index d46df5a6432..48b7e63146f 100644 --- a/src/hotspot/share/classfile/javaClasses.cpp +++ b/src/hotspot/share/classfile/javaClasses.cpp @@ -4522,6 +4522,8 @@ int com_alibaba_wisp_engine_WispTask::_activeCount_offset = 0; int com_alibaba_wisp_engine_WispTask::_stealCount_offset = 0; int com_alibaba_wisp_engine_WispTask::_stealFailureCount_offset = 0; int com_alibaba_wisp_engine_WispTask::_preemptCount_offset = 0; +int com_alibaba_wisp_engine_WispTask::_shutdownPending_offset = 0; + #define WISPTASK_FIELDS_DO(macro) \ macro(_jvmParkStatus_offset, k, vmSymbols::jvmParkStatus_name(), int_signature, false); \ @@ -4594,6 +4596,10 @@ int com_alibaba_wisp_engine_WispTask::get_stealFailureCount(oop obj) { return obj->int_field(_stealFailureCount_offset); } +bool com_alibaba_wisp_engine_WispTask::get_shutdownPending(oop obj) { + return obj->bool_field(_shutdownPending_offset); +} + #if INCLUDE_CDS void java_nio_Buffer::serialize_offsets(SerializeClosure* f) { BUFFER_FIELDS_DO(FIELD_SERIALIZE_OFFSET); diff --git a/src/hotspot/share/classfile/javaClasses.hpp b/src/hotspot/share/classfile/javaClasses.hpp index f0517fe8eff..10d4e496fc7 100644 --- a/src/hotspot/share/classfile/javaClasses.hpp +++ b/src/hotspot/share/classfile/javaClasses.hpp @@ -1643,6 +1643,7 @@ class com_alibaba_wisp_engine_WispTask: AllStatic { static int _stealCount_offset; static int _stealFailureCount_offset; static int _preemptCount_offset; + static int _shutdownPending_offset; public: static void set_jvmParkStatus(oop obj, jint status); static int get_jvmParkStatus(oop obj); @@ -1656,6 +1657,7 @@ class com_alibaba_wisp_engine_WispTask: AllStatic { static int get_stealFailureCount(oop obj); static int get_preemptCount(oop obj); static void set_preemptCount(oop obj, jint count); + static bool get_shutdownPending(oop obj); static void compute_offsets(); static void serialize_offsets(SerializeClosure* f) NOT_CDS_RETURN; diff --git a/src/hotspot/share/classfile/vmSymbols.hpp b/src/hotspot/share/classfile/vmSymbols.hpp index 2c19f90292d..0b4991d179f 100644 --- a/src/hotspot/share/classfile/vmSymbols.hpp +++ b/src/hotspot/share/classfile/vmSymbols.hpp @@ -820,6 +820,7 @@ template(string_string_string_string_bool_bool_void_signature, "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ZZ)V") \ template(isInCritical_name, "isInCritical") \ template(jdkParkStatus_name, "jdkParkStatus") \ + template(shutdownPending_name, "shutdownPending") \ template(jvmParkStatus_name, "jvmParkStatus") \ template(id_name, "id") \ template(threadWrapper_name, "threadWrapper") \ diff --git a/src/hotspot/share/interpreter/interpreterRuntime.cpp b/src/hotspot/share/interpreter/interpreterRuntime.cpp index fdc9fd3a410..b3af6422fc3 100644 --- a/src/hotspot/share/interpreter/interpreterRuntime.cpp +++ b/src/hotspot/share/interpreter/interpreterRuntime.cpp @@ -485,7 +485,12 @@ IRT_END IRT_ENTRY(address, InterpreterRuntime::exception_handler_for_exception(JavaThread* thread, oopDesc* exception)) LastFrameAccessor last_frame(thread); - Handle h_exception(thread, exception); +// Wisp relys on threadDeath as a special uncatchable exception to shutdown +// all running coroutines. However, exceptions throw in finally block +// will overwrite current threadDeath exception, thus we need to replace +// all exception with threadDeath after coroutine shutdown. + Handle h_exception(thread, WispThread::is_current_death_pending(thread)? + (oopDesc*) Universe::wisp_thread_death_exception() : exception); methodHandle h_method (thread, last_frame.method()); constantPoolHandle h_constants(thread, h_method->constants()); bool should_repeat; diff --git a/src/hotspot/share/memory/universe.cpp b/src/hotspot/share/memory/universe.cpp index 314e46d6623..791b8a476a7 100644 --- a/src/hotspot/share/memory/universe.cpp +++ b/src/hotspot/share/memory/universe.cpp @@ -122,6 +122,7 @@ oop Universe::_out_of_memory_error_class_metaspace = NULL; oop Universe::_out_of_memory_error_array_size = NULL; oop Universe::_out_of_memory_error_gc_overhead_limit = NULL; oop Universe::_out_of_memory_error_realloc_objects = NULL; +oop Universe::_wisp_thread_death_exception = NULL; oop Universe::_tenant_death_exception = NULL; oop Universe::_delayed_stack_overflow_error_message = NULL; objArrayOop Universe::_preallocated_out_of_memory_error_array = NULL; @@ -208,6 +209,9 @@ void Universe::oops_do(OopClosure* f, bool do_all) { } f->do_oop((oop*)&_delayed_stack_overflow_error_message); f->do_oop((oop*)&_preallocated_out_of_memory_error_array); + if (EnableCoroutine && Wisp2ThreadStop) { + f->do_oop((oop*)&_wisp_thread_death_exception); + } f->do_oop((oop*)&_null_ptr_exception_instance); f->do_oop((oop*)&_arithmetic_exception_instance); f->do_oop((oop*)&_virtual_machine_error_instance); @@ -1054,7 +1058,12 @@ bool universe_post_init() { Universe::_out_of_memory_error_gc_overhead_limit = ik->allocate_instance(CHECK_false); Universe::_out_of_memory_error_realloc_objects = ik->allocate_instance(CHECK_false); - + if (EnableCoroutine && Wisp2ThreadStop) { + // Create the special exception used to kill thread + k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_ThreadDeath(), true, CHECK_false); + assert(NULL != k, "pre-condition"); + Universe::_wisp_thread_death_exception = InstanceKlass::cast(k)->allocate_instance(CHECK_false); + } if (MultiTenant && TenantThreadStop) { // Create the special exception used to kill thread k = SystemDictionary::resolve_or_fail(vmSymbols::com_alibaba_tenant_TenantDeathException(), true, CHECK_false); diff --git a/src/hotspot/share/memory/universe.hpp b/src/hotspot/share/memory/universe.hpp index c712b89aa66..faeddfe4514 100644 --- a/src/hotspot/share/memory/universe.hpp +++ b/src/hotspot/share/memory/universe.hpp @@ -158,6 +158,7 @@ class Universe: AllStatic { static oop _out_of_memory_error_array_size; static oop _out_of_memory_error_gc_overhead_limit; static oop _out_of_memory_error_realloc_objects; + static oop _wisp_thread_death_exception; // preallocated cause message for delayed StackOverflowError static oop _delayed_stack_overflow_error_message; @@ -377,6 +378,7 @@ class Universe: AllStatic { static oop out_of_memory_error_gc_overhead_limit() { return gen_out_of_memory_error(_out_of_memory_error_gc_overhead_limit); } static oop out_of_memory_error_realloc_objects() { return gen_out_of_memory_error(_out_of_memory_error_realloc_objects); } static oop delayed_stack_overflow_error_message() { return _delayed_stack_overflow_error_message; } + static oop wisp_thread_death_exception() { return _wisp_thread_death_exception; } // special exception used for killing thread static oop tenant_death_exception() { return _tenant_death_exception; } diff --git a/src/hotspot/share/oops/method.cpp b/src/hotspot/share/oops/method.cpp index b5538f80915..96a160e319e 100644 --- a/src/hotspot/share/oops/method.cpp +++ b/src/hotspot/share/oops/method.cpp @@ -210,6 +210,9 @@ int Method::fast_exception_handler_bci_for(const methodHandle& mh, Klass* ex_kla is_tenant_death_exception = true; } } + bool is_force_thread_death_exception = (EnableCoroutine && Wisp2ThreadStop + && (ex_klass == SystemDictionary::ThreadDeath_klass() + || ex_klass->is_subtype_of(SystemDictionary::ThreadDeath_klass()))); // iterate through all entries sequentially constantPoolHandle pool(THREAD, mh->constants()); for (int i = 0; i < length; i ++) { @@ -238,6 +241,9 @@ int Method::fast_exception_handler_bci_for(const methodHandle& mh, Klass* ex_kla && is_tenant_death_exception && THREAD->is_Java_thread()) { continue; } + if (is_force_thread_death_exception) { + continue; + } return handler_bci; } } diff --git a/src/hotspot/share/opto/runtime.cpp b/src/hotspot/share/opto/runtime.cpp index c2f02dc15f7..d13edebab7f 100644 --- a/src/hotspot/share/opto/runtime.cpp +++ b/src/hotspot/share/opto/runtime.cpp @@ -1328,7 +1328,8 @@ JRT_ENTRY_NO_ASYNC(address, OptoRuntime::handle_exception_C_helper(JavaThread* t assert(thread->exception_oop() != NULL, "exception oop is found"); address handler_address = NULL; - Handle exception(thread, thread->exception_oop()); + Handle exception(thread, WispThread::is_current_death_pending(thread)? + Universe::wisp_thread_death_exception() : thread->exception_oop()); address pc = thread->exception_pc(); // Clear out the exception oop and pc since looking up an diff --git a/src/hotspot/share/prims/unsafe.cpp b/src/hotspot/share/prims/unsafe.cpp index 9bdf31195b3..ff7e2a1eea5 100644 --- a/src/hotspot/share/prims/unsafe.cpp +++ b/src/hotspot/share/prims/unsafe.cpp @@ -1150,7 +1150,7 @@ JVM_ENTRY(jboolean, CoroutineSupport_stealCoroutine(JNIEnv* env, jclass klass, j // The lock will also block coroutine switch operation, // so we must finish the steal operation as soon as possible. Coroutine* coro = (Coroutine*) coroPtr; - if (coro == NULL || coro->enable_steal_count() != coro->java_call_counter()) { + if (coro == NULL || coro->enable_steal_count() != coro->java_call_counter()|| coro->is_yielding()) { return false; // an Exception throws and the coroutine being stealed is exited } assert(coro->thread() != thread, "steal from self"); @@ -1176,7 +1176,7 @@ JVM_ENTRY (jboolean, CoroutineSupport_shouldThrowException0(JNIEnv* env, jclass assert(EnableCoroutine, "pre-condition"); Coroutine* coro = (Coroutine*)coroPtr; assert(coro == thread->current_coroutine(), "invariant"); - return !coro->is_yielding() && coro->clinit_call_count() == 0; + return coro->clinit_call_count() == 0; JVM_END UNSAFE_ENTRY(jint, Unsafe_GetMaxVectorSize(JNIEnv *env, jobject unsafe, jobject clazz)) diff --git a/src/hotspot/share/runtime/arguments.cpp b/src/hotspot/share/runtime/arguments.cpp index 2465972e651..335a72ccd93 100644 --- a/src/hotspot/share/runtime/arguments.cpp +++ b/src/hotspot/share/runtime/arguments.cpp @@ -4032,6 +4032,10 @@ jint Arguments::parse(const JavaVMInitArgs* initial_cmd_args) { } } + if (Wisp2ThreadStop && !UseWisp2) { + vm_exit_during_initialization("Wisp2ThreadStop only works with UseWisp2"); + } + if (EagerAppCDS && !FLAG_IS_CMDLINE(NotFoundClassOpt)) { NotFoundClassOpt = true; } diff --git a/src/hotspot/share/runtime/coroutine.cpp b/src/hotspot/share/runtime/coroutine.cpp index fcab6f446be..bf077a917fe 100644 --- a/src/hotspot/share/runtime/coroutine.cpp +++ b/src/hotspot/share/runtime/coroutine.cpp @@ -742,7 +742,10 @@ void WispThread::park(long millis, const ObjectWaiter* ow) { if (jt->has_pending_exception()) { assert((MultiTenant && TenantThreadStop && jt->has_tenant_death_exception()) || jt->pending_exception()->klass() == SystemDictionary::ThreadDeath_klass(), "tenant_death_exception expected"); - jt->clear_pending_exception(); + if (EnableCoroutine && Wisp2ThreadStop && jt->pending_exception()->klass() == SystemDictionary::ThreadDeath_klass()) { + jt->set_pending_async_exception(jt->pending_exception()); + } + jt->clear_pending_exception(); } ThreadStateTransition::transition(jt, _thread_in_vm, _thread_blocked); @@ -788,6 +791,10 @@ void WispThread::unpark(int task_id, bool using_wisp_park, bool proxy_unpark, Pa // due to the fact that we modify the priority of Wisp_lock from `non-leaf` to `special`, // so we'd use `MutexLockerEx` and `_no_safepoint_check_flag` to make our program run + // We don't want to yield a safepoint here, so we use the `special` rank to prevent it: + // In UnlockNode, we will call java in Wisp. We can't yield a safepoint that may cause + // deoptimization, which is very fatal for monitors. + NoSafepointVerifier nsv; MutexLockerEx mu(Wisp_lock, Mutex::_no_safepoint_check_flag); wisp_thread->_unpark_status = WispThread::_proxy_unpark_begin; _proxy_unpark->append(task_id); @@ -846,9 +853,18 @@ void WispThread::unpark(int task_id, bool using_wisp_park, bool proxy_unpark, Pa } int WispThread::get_proxy_unpark(jintArray res) { + // We need to hoist code of safepoint state out of MutexLocker to prevent safepoint deadlock problem + // See the same usage: SR_lock in `JavaThread::exit()` + ThreadBlockInVM tbivm(JavaThread::current()); + // When wait()ing, GC may occur. So we shouldn't verify GC. + NoSafepointVerifier nsv(true, false); MutexLockerEx mu(Wisp_lock, Mutex::_no_safepoint_check_flag); while (_proxy_unpark == NULL || _proxy_unpark->is_empty()) { - Wisp_lock->wait(); + // we need to use _no_safepoint_check_flag, which won't yield a safepoint. + // origin wait(false): first hold lock then do a safepoint. + // Other thread will stuck when grabbing the lock. + // current wait(true): first safepoint then hold lock to deal with the problem. + Wisp_lock->wait(Mutex::_no_safepoint_check_flag); } typeArrayOop a = typeArrayOop(JNIHandles::resolve_non_null(res)); if (a == NULL) { @@ -1012,6 +1028,9 @@ void Coroutine::after_safepoint(JavaThread* thread) { "Only SOF/OOM/ThreadDeath/TenantDeath happens here"); // If it's a SOF / OOM / ThreadDeath / TenantDeath exception, we'd clear it // because polling page stub shouldn't have a pending exception. + if (UseWisp2 && Wisp2ThreadStop && thread->pending_exception()->klass() == SystemDictionary::ThreadDeath_klass()) { + thread->set_pending_async_exception(thread->pending_exception()); + } thread->clear_pending_exception(); } diff --git a/src/hotspot/share/runtime/coroutine.hpp b/src/hotspot/share/runtime/coroutine.hpp index 1aea37886ff..c4d07a8451d 100644 --- a/src/hotspot/share/runtime/coroutine.hpp +++ b/src/hotspot/share/runtime/coroutine.hpp @@ -415,6 +415,20 @@ class WispThread: public JavaThread { static void set_wisp_booted(Thread* thread); static const char *print_os_park_reason(int reason); static const char *print_blocking_status(int status); + static const bool is_current_death_pending(JavaThread *thread) { + if (EnableCoroutine && Wisp2ThreadStop) { + if (thread->current_coroutine() == NULL) { + // Main Thread + return false; + } + if (thread->current_coroutine()->wisp_task() == NULL) { + // Blacklisted threads that are not converted to couroutines + return false; + } + return com_alibaba_wisp_engine_WispTask::get_shutdownPending(thread->current_coroutine()->wisp_task()); + } + return false; + } virtual bool is_Wisp_thread() const { return true; } diff --git a/src/hotspot/share/runtime/globals_ext.hpp b/src/hotspot/share/runtime/globals_ext.hpp index a569399624c..f58bf373c1d 100644 --- a/src/hotspot/share/runtime/globals_ext.hpp +++ b/src/hotspot/share/runtime/globals_ext.hpp @@ -74,6 +74,8 @@ \ product(bool, UseWisp2, false, \ "Enable Wisp2") \ + product(bool, Wisp2ThreadStop, false, \ + "ThreadDeath cannot be catched") \ \ manageable(bool, PrintThreadCoroutineInfo, false, \ "print the park/unpark information for thread coroutine") \ diff --git a/src/hotspot/share/runtime/safepoint.cpp b/src/hotspot/share/runtime/safepoint.cpp index 782c121773e..c6d20bc416c 100644 --- a/src/hotspot/share/runtime/safepoint.cpp +++ b/src/hotspot/share/runtime/safepoint.cpp @@ -985,6 +985,10 @@ void SafepointSynchronize::block(JavaThread *thread) { thread->handle_special_runtime_exit_condition( !thread->is_at_poll_safepoint() && (state != _thread_in_native_trans)); } + + if (EnableCoroutine) { + Coroutine::after_safepoint(thread); + } } // ------------------------------------------------------------------------------------------------------ diff --git a/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java index 92920ea90a8..7fcd8402f7b 100644 --- a/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java +++ b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java @@ -185,4 +185,8 @@ static ResourceContainer current() { * Then the container state will become {@link State#DEAD}. */ void destroy(); + + Long getId(); + + Long getConsumedAmount(ResourceType resourceType); } diff --git a/src/java.base/share/classes/com/alibaba/rcm/ResourceContainerMonitor.java b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainerMonitor.java new file mode 100644 index 00000000000..90d0f5c1c1c --- /dev/null +++ b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainerMonitor.java @@ -0,0 +1,49 @@ +package com.alibaba.rcm; + +import com.alibaba.rcm.Constraint; +import com.alibaba.rcm.ResourceContainer; +import com.alibaba.rcm.ResourceType; +import com.alibaba.rcm.internal.AbstractResourceContainer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class ResourceContainerMonitor { + private static Map tenantContainerMap = new ConcurrentHashMap<>(); + private static AtomicLong idGen = new AtomicLong(0); + + public static long register(ResourceContainer resourceContainer) { + long id = idGen.getAndIncrement(); + tenantContainerMap.put(id, resourceContainer); + return id; + } + + public static void deregister(long id) { + tenantContainerMap.remove(id); + } + + public static ResourceContainer getContainerById(long id) { + return tenantContainerMap.get(id); + } + + public static List getAllContainerIds() { + return new ArrayList<>(tenantContainerMap.keySet()); + } + + public static List getConstraintsById(long id) { + AbstractResourceContainer resourceContainer = (AbstractResourceContainer) tenantContainerMap.get(id); + return StreamSupport + .stream(resourceContainer.getConstraints().spliterator(), false) + .collect(Collectors.toList()); + } + + public long getContainerConsumedAmount(long id) { + return 0; + } + +} diff --git a/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java b/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java index 30c655d4692..55fca9ba397 100644 --- a/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java +++ b/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java @@ -2,11 +2,14 @@ import com.alibaba.rcm.Constraint; import com.alibaba.rcm.ResourceContainer; +import com.alibaba.rcm.ResourceContainerMonitor; +import com.alibaba.rcm.ResourceType; import jdk.internal.misc.RCMAccesss; import jdk.internal.misc.VM; import jdk.internal.misc.SharedSecrets; import java.util.Collections; +import java.util.List; import java.util.function.Predicate; /** @@ -43,6 +46,12 @@ public Predicate getResourceContainerInheritancePredicate(ResourceContai protected final static AbstractResourceContainer ROOT = new RootContainer(); private Predicate threadInherited = DEFAULT_PREDICATE; + final long id; + + protected AbstractResourceContainer() { + id = ResourceContainerMonitor.register(this); + } + public static AbstractResourceContainer root() { return ROOT; @@ -57,6 +66,12 @@ public static AbstractResourceContainer current() { return SharedSecrets.getJavaLangAccess().getResourceContainer(Thread.currentThread()); } + public abstract List getActiveContainerThreadIds(); + + public abstract Long getConsumedAmount(ResourceType resourceType); + + public abstract Long getResourceLimitReachedCount(ResourceType resourceType); + @Override public void run(Runnable command) { if (getState() != State.RUNNING) { @@ -79,6 +94,12 @@ public void run(Runnable command) { } } + + @Override + public Long getId() { + return id; + } + /** * Attach to this resource container. * Ensure {@link ResourceContainer#current()} as a root container @@ -155,5 +176,22 @@ public Iterable getConstraints() { public void destroy() { throw new UnsupportedOperationException("destroy() is not supported by root container"); } + + @Override + public List getActiveContainerThreadIds() { + // root resource container is not monitored + return Collections.emptyList(); + } + + @Override + public Long getConsumedAmount(ResourceType resourceType) { + return 0L; + } + + @Override + public Long getResourceLimitReachedCount(ResourceType resourceType) { + return 0L; + } + } } diff --git a/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java b/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java index 13743d2e1e5..d3631ed4eef 100644 --- a/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java +++ b/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java @@ -2,6 +2,7 @@ import com.alibaba.rcm.ResourceContainer; +import com.alibaba.rcm.ResourceContainerMonitor; import java.util.Objects; import java.util.function.Predicate; @@ -40,5 +41,14 @@ public static void killThreads(ResourceContainer resourceContainer) { assert resourceContainer instanceof AbstractResourceContainer; Objects.requireNonNull(resourceContainer); ((AbstractResourceContainer) resourceContainer).killThreads(); + ResourceContainerMonitor.deregister(resourceContainer.getId()); + } + + + public static void attach(ResourceContainer container) { + if (container != ResourceContainer.root()) + ((AbstractResourceContainer) container).attach(); + else + ((AbstractResourceContainer)ResourceContainer.current()).detach(); } } \ No newline at end of file diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java index d77b5ed6517..a786f361b42 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java @@ -125,7 +125,7 @@ final WispTask runTaskInternal(Runnable target, String name, Thread thread, Clas } finally { isInCritical = isInCritical0; } - yieldTo(wispTask); + yieldTo(wispTask, false); runWispTaskEpilog(); return wispTask; @@ -136,14 +136,20 @@ final WispTask runTaskInternal(Runnable target, String name, Thread thread, Clas * WispTask must call {@code taskExit()} to exit safely. */ void taskExit() { // and exit - current.status = WispTask.Status.ZOMBIE; TASK_COUNT_UPDATER.decrementAndGet(engine); current.countExecutionTime(switchTimestamp); switchTimestamp = 0; + current.setEpollArray(0); - unregisterEvent(); - returnTaskToCache(current); + boolean cached = !current.shutdownPending && returnTaskToCache(current); + TASK_COUNT_UPDATER.decrementAndGet(engine); + if (cached) { + current.status = WispTask.Status.CACHED; + } else { + current.status = WispTask.Status.DEAD; + WispTask.cleanExitedTask(current); + } // reset threadWrapper after call returnTaskToCache, // since the threadWrapper will be used in Thread.currentThread() @@ -153,7 +159,7 @@ void taskExit() { // and exit // In Tenant killing process, we have an pending exception, // WispTask.Coroutine's loop will be breaked // invoke an explicit reschedule instead of return - schedule(); + schedule(!cached); } /** @@ -182,16 +188,22 @@ private WispTask getTaskFromCache() { } /** - * return task back to global cache + * cache task back to global or local cache and return true, if beyond the capacity of + * cache will return false. */ - private void returnTaskToCache(WispTask task) { + private boolean returnTaskToCache(WispTask task) { // reuse exited wispTasks from shutdown wispEngine is very tricky, so we'd better not return // these tasks to global cache if (taskCache.size() > WispConfiguration.WISP_ENGINE_TASK_CACHE_SIZE && !engine.hasBeenShutdown) { - engine.groupTaskCache.add(task); + if (engine.groupTaskCache.size() > WispConfiguration.WISP_ENGINE_TASK_GLOBAL_CACHE_SIZE) { + return false; + } else { + engine.groupTaskCache.add(task); + } } else { taskCache.add(task); } + return true; } /** @@ -214,7 +226,7 @@ void destroy() { * Block current coroutine and do scheduling. * Typically called when resource is not ready. */ - final void schedule() { + final void schedule(boolean terminal) { assert WispCarrier.current() == this; WispTask current = this.current; current.countExecutionTime(switchTimestamp); @@ -224,21 +236,27 @@ final void schedule() { current.controlGroup.calcCpuTicks(current); } current.resumeEntry.setStealEnable(true); - yieldTo(threadTask); // letting the scheduler choose runnable task + yieldTo(threadTask, terminal); // letting the scheduler choose runnable task current.carrier.checkAndDispatchShutdown(); } - private void checkAndDispatchShutdown() { + void checkAndDispatchShutdown() { + assert WispCarrier.current() == this; WispTask current = WispCarrier.current().getCurrentTask(); - if ((engine.hasBeenShutdown - || (current.inDestoryedGroup() && current.inheritedFromNonRootContainer())) - && !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName()) - && current.isAlive() + if (shutdownPending(current) && CoroutineSupport.checkAndThrowException(current.ctx)) { + current.shutdownPending = true; throw new ThreadDeath(); } } + boolean shutdownPending(WispTask current) { + return (engine.hasBeenShutdown + || (current.inDestoryedGroup() && current.inheritedFromNonRootContainer())) + && !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName()) + && current.isAlive(); + } + /** * Wake up a {@link WispTask} that belongs to this carrier * @@ -291,11 +309,12 @@ public void run() { if (task.controlGroup != null) { long res = task.controlGroup.checkCpuLimit(task, false); if (res != 0) { + task.controlGroup.cpuLimitationReached++; current.resumeLater(System.nanoTime() + res, task); return; } } - current.yieldTo(task); + current.yieldTo(task, false); current.runWispTaskEpilog(); } @@ -350,7 +369,7 @@ private Coroutine.StealResult steal(WispTask task) { * * @param task coroutine to run */ - private boolean yieldTo(WispTask task) { + private boolean yieldTo(WispTask task, boolean terminal) { assert task != null; assert WispCarrier.current() == this; assert task.carrier == this; @@ -358,7 +377,7 @@ private boolean yieldTo(WispTask task) { schedTick++; - if (task.status == WispTask.Status.ZOMBIE) { + if (task.status != WispTask.Status.ALIVE) { unregisterEvent(task); return false; } @@ -368,7 +387,7 @@ private boolean yieldTo(WispTask task) { counter.incrementSwitchCount(); switchTimestamp = WispEngine.getNanoTime(); assert !isInCritical; - WispTask.switchTo(from, task); + WispTask.switchTo(from, task, terminal); // Since carrier is changed with stealing, we shouldn't directly access carrier's member any more. assert WispCarrier.current().current == from; assert !from.carrier.isInCritical; @@ -397,9 +416,9 @@ void yield() { assert yieldingTask == null; yieldingTask = current; // delay it, make sure wakeupTask is called after yield out - schedule(); + schedule(false); } - current.carrier.checkAndDispatchShutdown(); + WispCarrier.current().checkAndDispatchShutdown(); } else { WispEngine.JLA.yield0(); } diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java index 14c877114ad..c079caf1163 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java @@ -36,9 +36,11 @@ class WispConfiguration { static final boolean WISP_HIGH_PRECISION_TIMER; static final boolean WISP_USE_STEAL_LOCK; static final int WISP_ENGINE_TASK_CACHE_SIZE; + static final int WISP_ENGINE_TASK_GLOBAL_CACHE_SIZE; static final int WISP_SCHEDULE_STEAL_RETRY; static final int WISP_SCHEDULE_PUSH_RETRY; static final int WISP_SCHEDULE_HELP_STEAL_RETRY; + static final int WISP_SHUTDOWN_SLEEP_TIME; static final WispScheduler.SchedulingPolicy SCHEDULING_POLICY; static final boolean USE_DIRECT_SELECTOR_WAKEUP; static final boolean CARRIER_AS_POLLER; @@ -47,6 +49,9 @@ class WispConfiguration { // io static final boolean WISP_ENABLE_SOCKET_LOCK; + static final boolean WISP_ENABLE_ASYNC_FILE_IO; + static final int WISP_FILE_IO_WORKER_CORE; + static final int WISP_FILE_IO_WORKER_MAX; // wisp control group static final int WISP_CONTROL_GROUP_CFS_PERIOD; @@ -73,6 +78,8 @@ public Properties run() { PARK_ONE_MS_AT_LEAST = parseBooleanParameter(p, "com.alibaba.wisp.parkOneMs", true); WORKER_COUNT = parsePositiveIntegerParameter(p, "com.alibaba.wisp.carrierEngines", Runtime.getRuntime().availableProcessors()); + WISP_SHUTDOWN_SLEEP_TIME = parsePositiveIntegerParameter(p, "com.alibaba.wisp.shutdownSleepTime", + 100); POLLER_SHARDING_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.pollerShardingSize", 8); ENABLE_HANDOFF = parseBooleanParameter(p, "com.alibaba.wisp.enableHandOff", TRANSPARENT_WISP_SWITCH); @@ -98,6 +105,7 @@ public Properties run() { WISP_HIGH_PRECISION_TIMER = parseBooleanParameter(p, "com.alibaba.wisp.highPrecisionTimer", false); WISP_USE_STEAL_LOCK = parseBooleanParameter(p, "com.alibaba.wisp.useStealLock", true); WISP_ENGINE_TASK_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskCache", 20); + WISP_ENGINE_TASK_GLOBAL_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskGlobalCache", WORKER_COUNT * 10); WISP_SCHEDULE_STEAL_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.stealRetry", Math.max(1, WORKER_COUNT / 2)); WISP_SCHEDULE_PUSH_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.pushRetry", WORKER_COUNT); WISP_SCHEDULE_HELP_STEAL_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.helpStealRetry", Math.max(1, WORKER_COUNT / 4)); @@ -110,6 +118,9 @@ public Properties run() { // WISP_CONTROL_GROUP_CFS_PERIOD default value is 0(Us), WispControlGroup will estimate a cfs period according to SYSMON_TICK_US. // If WISP_CONTROL_GROUP_CFS_PERIOD was configed by user, WispControlGroup will adopt it directly and won't estimate. WISP_CONTROL_GROUP_CFS_PERIOD = parsePositiveIntegerParameter(p, "com.alibaba.wisp.controlGroup.cfsPeriod", 0); + WISP_ENABLE_ASYNC_FILE_IO = parseBooleanParameter(p, "com.alibaba.wisp.enableAsyncFileIO", false); + WISP_FILE_IO_WORKER_CORE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.fileIOCoreWorkerCnt", WORKER_COUNT); + WISP_FILE_IO_WORKER_MAX = parsePositiveIntegerParameter(p, "com.alibaba.wisp.fileIOMaxWorkerCnt", WORKER_COUNT); checkCompatibility(); } diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java index a2fa198d709..78045bfd69e 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java @@ -5,6 +5,7 @@ import com.alibaba.rcm.ResourceType; import com.alibaba.rcm.internal.AbstractResourceContainer; +import java.util.ArrayList; import java.util.List; import java.util.Collections; import java.util.Objects; @@ -31,6 +32,9 @@ class WispControlGroup extends AbstractExecutorService { private static final int MAX_PERIOD = 100_000; // limit min period duration 10ms. private static final int MIN_PERIOD = 10_000; + + private long totalConsume = 0; + long cpuLimitationReached = 0; /** * ESTIMATED_PERIOD is an estimated cpu_cfs period according to wisp preemptive * schedule period, make sure there are at least SCHEDULE_TIMES wisp schedule @@ -143,6 +147,7 @@ void calcCpuTicks(WispTask task) { long usage = System.nanoTime() - task.enterTs; remainQuota.addAndGet(-usage); task.enterTs = 0; + totalConsume += usage; } private void attach() { @@ -289,6 +294,33 @@ protected void killThreads() { } } } + + @Override + public List getActiveContainerThreadIds() { + List threadIdList = new ArrayList<>(); + for (WispTask task : WispTask.id2Task.values()) { + if (task.isAlive() + && task.getThreadWrapper() != null + && task.controlGroup == WispControlGroup.this) { + threadIdList.add(task.getThreadWrapper().getId()); + } + } + return threadIdList; + } + + @Override + public Long getConsumedAmount(ResourceType resourceType) { + if (resourceType != ResourceType.CPU_PERCENT) + return 0L; + return totalConsume; + } + + @Override + public Long getResourceLimitReachedCount(ResourceType resourceType) { + if (resourceType != ResourceType.CPU_PERCENT) + return 0L; + return cpuLimitationReached; + } }; } } diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java index e60fa4d0bd0..82853cfbe9a 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java @@ -113,6 +113,9 @@ public int compare(Thread o1, Thread o2) { initializeClasses(); JLA.wispBooted(); } + if (WispConfiguration.WISP_ENABLE_ASYNC_FILE_IO) { + WispFileIO.initializeWispFileIOClass(); + } } private static void initializeClasses() { @@ -181,6 +184,9 @@ public void run() { if (WispConfiguration.WISP_PROFILE_LOG_ENABLED) { WispPerfCounterMonitor.INSTANCE.startDaemon(); } + if (WispConfiguration.WISP_ENABLE_ASYNC_FILE_IO) { + WispFileIO.startWispFileIODaemon(); + } } } @@ -483,7 +489,7 @@ public void run() { } } // wait tasks to exit on fixed frequency instead of polling - WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(1)); + WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(WispConfiguration.WISP_SHUTDOWN_SLEEP_TIME)); } while (!tasks.isEmpty()); finishShutdown(); } @@ -549,15 +555,16 @@ private List getRunningTasks(WispControlGroup group) { && task.carrier.engine == WispEngine.this && !task.isThreadTask() && !task.getName().equals(WispTask.SHUTDOWN_TASK_NAME) - && (group == null - || task.inDestoryedGroup() && task.inheritedFromNonRootContainer())) { + && (group == null || (task.inDestoryedGroup() && task.controlGroup == group))) { runningTasks.add(task); } } - return runningTasks; + } catch (Exception e) { + e.printStackTrace(); } finally { carrier.isInCritical = isInCritical0; } + return runningTasks; } @Override diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispFileIO.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispFileIO.java new file mode 100644 index 00000000000..4dd8263f319 --- /dev/null +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispFileIO.java @@ -0,0 +1,111 @@ +package com.alibaba.wisp.engine; + +import java.io.IOException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import jdk.internal.misc.SharedSecrets; +import jdk.internal.misc.WispFileSyncIOAccess; + + +enum WispFileIO { + INSTANCE; + + private static volatile boolean wispFileWorkerStarted = false; + + static void setWispFileSyncIOIOAccess() { + if (SharedSecrets.getWispFileSyncIOAccess() == null) { + SharedSecrets.setWispFileSyncIOAccess(new WispFileSyncIOAccess() { + @Override + public boolean usingAsyncFileIO() { + return wispFileWorkerStarted + && WispEngine.runningAsCoroutine(Thread.currentThread()); + } + + @Override + public T executeAsAsyncFileIO(Callable command) throws IOException { + return WispFileIO.INSTANCE.invokeIOTask(command); + } + }); + } + } + + /* + * Initialize the WispFileIO class, called after System.initializeSystemClass by VM. + **/ + static void initializeWispFileIOClass() { + try { + Class.forName(WispFileIO.class.getName()); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + + static void startWispFileIODaemon() { + WispFileIO.INSTANCE.startDaemon(WispEngine.daemonThreadGroup); + setWispFileSyncIOIOAccess(); + } + + + private ExecutorService executor; + + private ThreadGroup threadGroup; + + void startDaemon(ThreadGroup g) { + threadGroup = g; + this.executor = new ThreadPoolExecutor(WispConfiguration.WISP_FILE_IO_WORKER_CORE, + WispConfiguration.WISP_FILE_IO_WORKER_MAX, Long.MAX_VALUE, + TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new WispFileIOThreadPoolFactory()); + wispFileWorkerStarted = true; + } + + public T invokeIOTask(Callable command) throws IOException { + Future future = submitIOTask(command); + T result; + boolean interrupted = false; + try { + while (true) { + try { + result = future.get(); + return result; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + throw new Error(e); + } + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + Future submitIOTask(Callable command) { + return executor.submit(command); + } + + private class WispFileIOThreadPoolFactory implements ThreadFactory { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final static String namePrefix = "Wisp-FIO-worker-thread-"; + + WispFileIOThreadPoolFactory() { + } + + @Override + public Thread newThread(Runnable r) { + assert threadGroup != null; + Thread t = new Thread(threadGroup, r, namePrefix + threadNumber.getAndIncrement()); + t.setDaemon(true); + return t; + } + } +} diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java index 5eb47d482be..3fca0d3ab0a 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java @@ -87,7 +87,8 @@ static void trackTask(WispTask task) { enum Status { ALIVE, // ALIVE - ZOMBIE // exited + CACHED, // exited + DEAD // quited and never be used } private Runnable runnable; // runnable for created task @@ -139,6 +140,8 @@ enum Status { WispControlGroup controlGroup; long enterTs; + boolean shutdownPending; + WispTask(WispCarrier carrier, Coroutine ctx, boolean isRealTask, boolean isThreadTask) { this.isThreadTask = isThreadTask; this.id = isRealTask ? idGenerator.addAndGet(1) : -1; @@ -234,7 +237,7 @@ protected void run() { carrier.taskExit(); } } else { - carrier.schedule(); + carrier.schedule(false); } } } @@ -269,7 +272,7 @@ static Runnable wrapRunOutsideWisp(Runnable runnable) { *

* {@link #stealLock} is used in {@link WispCarrier#steal(WispTask)} . */ - static void switchTo(WispTask current, WispTask next) { + static void switchTo(WispTask current, WispTask next, boolean terminal) { assert next.ctx != null; assert WispCarrier.current() == current.carrier; assert current.carrier == next.carrier; @@ -280,7 +283,13 @@ static void switchTo(WispTask current, WispTask next) { STEAL_LOCK_UPDATER.lazySet(next, 1); // store load barrier is not necessary } - current.carrier.thread.getCoroutineSupport().unsafeSymmetricYieldTo(next.ctx); + if (terminal == true) { + current.carrier.thread.getCoroutineSupport().terminateCoroutine(next.ctx); + // should never run here. + assert false: "should not reach here"; + } else { + current.carrier.thread.getCoroutineSupport().unsafeSymmetricYieldTo(next.ctx); + } if (WispConfiguration.WISP_USE_STEAL_LOCK) { assert current.stealLock != 0; STEAL_LOCK_UPDATER.lazySet(current.from, 0); @@ -334,12 +343,8 @@ public String getName() { static final String SHUTDOWN_TASK_NAME = "SHUTDOWN_TASK"; - boolean isRunnable() { - return status == Status.ALIVE; - } - boolean isAlive() { - return status != Status.ZOMBIE; + return status == Status.ALIVE; } /** @@ -377,7 +382,7 @@ private void parkInternal(long timeoutNano, boolean fromJvm) { try { if (WispEngine.runningAsCoroutine(threadWrapper)) { setParkTime(); - carrier.schedule(); + carrier.schedule(false); } else { UA.park0(false, timeoutNano < 0 ? 0 : timeoutNano); } diff --git a/src/java.base/share/classes/java/dyn/CoroutineBase.java b/src/java.base/share/classes/java/dyn/CoroutineBase.java index 3a2a0440d97..e09bbbc122d 100644 --- a/src/java.base/share/classes/java/dyn/CoroutineBase.java +++ b/src/java.base/share/classes/java/dyn/CoroutineBase.java @@ -81,7 +81,7 @@ private final void startInternal() { // threadSupport is fixed by steal() threadSupport.beforeResume(this); - threadSupport.terminateCoroutine(); + threadSupport.terminateCoroutine(null); } assert threadSupport.getThread() == SharedSecrets.getJavaLangAccess().currentThread0(); } diff --git a/src/java.base/share/classes/java/dyn/CoroutineSupport.java b/src/java.base/share/classes/java/dyn/CoroutineSupport.java index 23947194ea5..7d33959f27f 100644 --- a/src/java.base/share/classes/java/dyn/CoroutineSupport.java +++ b/src/java.base/share/classes/java/dyn/CoroutineSupport.java @@ -230,18 +230,24 @@ void symmetricExitInternal(Coroutine coroutine) { /** * terminate current coroutine and yield forward + * @param target target */ - void terminateCoroutine() { + public void terminateCoroutine(Coroutine target) { assert currentCoroutine != threadCoroutine : "cannot exit thread coroutine"; - assert currentCoroutine != getNextCoroutine(currentCoroutine.nativeCoroutine) : "last coroutine shouldn't call coroutineexit"; lock(); Coroutine old = currentCoroutine; - Coroutine forward = getNextCoroutine(old.nativeCoroutine); + Coroutine forward = target; + if (forward == null) { + forward = getNextCoroutine(old.nativeCoroutine); + } + assert forward == threadCoroutine : "switch to target must be thread coroutine"; currentCoroutine = forward; - unlockLater(forward); switchToAndTerminate(old, forward); + + // should never run here. + assert false; } /** diff --git a/src/java.base/share/classes/java/io/FileInputStream.java b/src/java.base/share/classes/java/io/FileInputStream.java index a2ab728ab9b..45789d86ab8 100644 --- a/src/java.base/share/classes/java/io/FileInputStream.java +++ b/src/java.base/share/classes/java/io/FileInputStream.java @@ -26,6 +26,9 @@ package java.io; import java.nio.channels.FileChannel; + + +import jdk.internal.misc.SharedSecrets; import sun.nio.ch.FileChannelImpl; @@ -228,7 +231,11 @@ private void open(String name) throws FileNotFoundException { * @exception IOException if an I/O error occurs. */ public int read() throws IOException { - return read0(); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> read0()); + } else { + return read0(); + } } private native int read0() throws IOException; @@ -240,7 +247,15 @@ public int read() throws IOException { * @param len the number of bytes that are written * @exception IOException If an I/O error has occurred. */ - private native int readBytes(byte b[], int off, int len) throws IOException; + private native int readBytes0(byte b[], int off, int len) throws IOException; + + private int readBytes(byte b[], int off, int len) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> readBytes0(b, off, len)); + } else { + return readBytes0(b, off, len); + } + } /** * Reads up to b.length bytes of data from this input diff --git a/src/java.base/share/classes/java/io/FileOutputStream.java b/src/java.base/share/classes/java/io/FileOutputStream.java index 569f5269b13..807cfd606ab 100644 --- a/src/java.base/share/classes/java/io/FileOutputStream.java +++ b/src/java.base/share/classes/java/io/FileOutputStream.java @@ -26,6 +26,7 @@ package java.io; import java.nio.channels.FileChannel; + import jdk.internal.misc.SharedSecrets; import jdk.internal.misc.JavaIOFileDescriptorAccess; import sun.nio.ch.FileChannelImpl; @@ -327,9 +328,20 @@ public void write(int b) throws IOException { * end of file * @exception IOException If an I/O error has occurred. */ - private native void writeBytes(byte b[], int off, int len, boolean append) + private native void writeBytes0(byte b[], int off, int len, boolean append) throws IOException; + private void writeBytes(byte b[], int off, int len, boolean append) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + writeBytes0(b, off, len, append); + return 0; + }); + } else { + writeBytes0(b, off, len, append); + } + } + /** * Writes b.length bytes from the specified byte array * to this file output stream. diff --git a/src/java.base/share/classes/java/io/RandomAccessFile.java b/src/java.base/share/classes/java/io/RandomAccessFile.java index 55abd462ce8..805b3205054 100644 --- a/src/java.base/share/classes/java/io/RandomAccessFile.java +++ b/src/java.base/share/classes/java/io/RandomAccessFile.java @@ -26,7 +26,10 @@ package java.io; import java.nio.channels.FileChannel; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; + + import jdk.internal.misc.JavaIORandomAccessFileAccess; import jdk.internal.misc.SharedSecrets; import sun.nio.ch.FileChannelImpl; @@ -363,7 +366,11 @@ private void open(String name, int mode) * end-of-file has been reached. */ public int read() throws IOException { - return read0(); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> read0()); + } else { + return read0(); + } } private native int read0() throws IOException; @@ -375,7 +382,15 @@ public int read() throws IOException { * @param len the number of bytes to read. * @exception IOException If an I/O error has occurred. */ - private native int readBytes(byte b[], int off, int len) throws IOException; + private native int readBytes0(byte b[], int off, int len) throws IOException; + + private int readBytes(byte b[], int off, int len) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> readBytes0(b, off ,len)); + } else { + return readBytes0(b, off, len); + } + } /** * Reads up to {@code len} bytes of data from this file into an @@ -533,8 +548,18 @@ public void write(int b) throws IOException { * @param len the number of bytes that are written * @exception IOException If an I/O error has occurred. */ - private native void writeBytes(byte b[], int off, int len) throws IOException; - + private native void writeBytes0(byte b[], int off, int len) throws IOException; + + private void writeBytes(byte b[], int off, int len) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + writeBytes0(b, off, len); + return 0; + }); + } else { + writeBytes0(b, off, len); + } + } /** * Writes {@code b.length} bytes from the specified byte array * to this file, starting at the current file pointer. diff --git a/src/java.base/share/classes/java/nio/MappedByteBuffer.java b/src/java.base/share/classes/java/nio/MappedByteBuffer.java index d9ead79c092..912aa9df0c0 100644 --- a/src/java.base/share/classes/java/nio/MappedByteBuffer.java +++ b/src/java.base/share/classes/java/nio/MappedByteBuffer.java @@ -26,8 +26,13 @@ package java.nio; import java.io.FileDescriptor; +import java.io.IOException; import java.lang.ref.Reference; + + import jdk.internal.misc.Unsafe; +import jdk.internal.misc.SharedSecrets; + /** @@ -207,11 +212,26 @@ public final MappedByteBuffer force() { } if ((address != 0) && (capacity() != 0)) { long offset = mappingOffset(); - force0(fd, mappingAddress(offset), mappingLength(offset)); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + asynchronousForce(this, fd, mappingAddress(offset), mappingLength(offset)); + } else { + force0(fd, mappingAddress(offset), mappingLength(offset)); + } } return this; } + private static void asynchronousForce(MappedByteBuffer mapBuf, FileDescriptor fd, long address, long length) { + try { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + mapBuf.force0(fd, address, length); + return 0; + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private native boolean isLoaded0(long address, long length, int pageCount); private native void load0(long address, long length); private native void force0(FileDescriptor fd, long address, long length); diff --git a/src/java.base/share/classes/jdk/internal/misc/SharedSecrets.java b/src/java.base/share/classes/jdk/internal/misc/SharedSecrets.java index e72e4b3ee46..cb80065e0b3 100644 --- a/src/java.base/share/classes/jdk/internal/misc/SharedSecrets.java +++ b/src/java.base/share/classes/jdk/internal/misc/SharedSecrets.java @@ -84,6 +84,7 @@ public class SharedSecrets { private static WispEngineAccess wispEngineAccess; private static EpollAccess epollAccess; private static RCMAccesss rcmAccesss; + private static WispFileSyncIOAccess wispFileSyncIOAccess; public static JavaUtilJarAccess javaUtilJarAccess() { if (javaUtilJarAccess == null) { @@ -416,4 +417,14 @@ public static RCMAccesss getRCMAccess() { public static void setRCMAccesss(RCMAccesss rcmAccesss) { SharedSecrets.rcmAccesss = rcmAccesss; } + + + public static WispFileSyncIOAccess getWispFileSyncIOAccess() { + return wispFileSyncIOAccess; + } + + public static void setWispFileSyncIOAccess(WispFileSyncIOAccess wispAsyncIOAccess) { + SharedSecrets.wispFileSyncIOAccess = wispAsyncIOAccess; + } + } diff --git a/src/java.base/share/classes/jdk/internal/misc/WispFileSyncIOAccess.java b/src/java.base/share/classes/jdk/internal/misc/WispFileSyncIOAccess.java new file mode 100644 index 00000000000..e65c7672b79 --- /dev/null +++ b/src/java.base/share/classes/jdk/internal/misc/WispFileSyncIOAccess.java @@ -0,0 +1,10 @@ +package jdk.internal.misc; + +import java.io.IOException; +import java.util.concurrent.Callable; + +public interface WispFileSyncIOAccess { + boolean usingAsyncFileIO(); + + T executeAsAsyncFileIO(Callable command) throws IOException; +} diff --git a/src/java.base/share/classes/module-info.java b/src/java.base/share/classes/module-info.java index e16a66a567d..2d69e33be96 100644 --- a/src/java.base/share/classes/module-info.java +++ b/src/java.base/share/classes/module-info.java @@ -342,7 +342,8 @@ java.prefs; exports sun.util.resources to jdk.localedata; - + exports com.alibaba.rcm.internal to + jdk.management; // the service types defined by the APIs in this module diff --git a/src/java.base/share/native/libjava/FileInputStream.c b/src/java.base/share/native/libjava/FileInputStream.c index 44122a87b27..2bdc3d0d29a 100644 --- a/src/java.base/share/native/libjava/FileInputStream.c +++ b/src/java.base/share/native/libjava/FileInputStream.c @@ -67,7 +67,7 @@ Java_java_io_FileInputStream_read0(JNIEnv *env, jobject this) { } JNIEXPORT jint JNICALL -Java_java_io_FileInputStream_readBytes(JNIEnv *env, jobject this, +Java_java_io_FileInputStream_readBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len) { return readBytes(env, this, bytes, off, len, fis_fd); } diff --git a/src/java.base/share/native/libjava/RandomAccessFile.c b/src/java.base/share/native/libjava/RandomAccessFile.c index d20dfe869a7..ca471bc7ff3 100644 --- a/src/java.base/share/native/libjava/RandomAccessFile.c +++ b/src/java.base/share/native/libjava/RandomAccessFile.c @@ -73,7 +73,7 @@ Java_java_io_RandomAccessFile_read0(JNIEnv *env, jobject this) { } JNIEXPORT jint JNICALL -Java_java_io_RandomAccessFile_readBytes(JNIEnv *env, +Java_java_io_RandomAccessFile_readBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len) { return readBytes(env, this, bytes, off, len, raf_fd); } @@ -84,7 +84,7 @@ Java_java_io_RandomAccessFile_write0(JNIEnv *env, jobject this, jint byte) { } JNIEXPORT void JNICALL -Java_java_io_RandomAccessFile_writeBytes(JNIEnv *env, +Java_java_io_RandomAccessFile_writeBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len) { writeBytes(env, this, bytes, off, len, JNI_FALSE, raf_fd); } diff --git a/src/java.base/unix/classes/java/io/UnixFileSystem.java b/src/java.base/unix/classes/java/io/UnixFileSystem.java index 068593cfba5..e8aea877167 100644 --- a/src/java.base/unix/classes/java/io/UnixFileSystem.java +++ b/src/java.base/unix/classes/java/io/UnixFileSystem.java @@ -26,6 +26,7 @@ package java.io; import java.util.Properties; +import jdk.internal.misc.SharedSecrets; import jdk.internal.util.StaticProperty; import sun.security.action.GetPropertyAction; @@ -281,12 +282,25 @@ public boolean rename(File f1, File f2) { // anyway. cache.clear(); javaHomePrefixCache.clear(); - return rename0(f1, f2); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return asynchronousRename(this, f1, f2); + } else { + return rename0(f1, f2); + } } private native boolean rename0(File f1, File f2); public native boolean setLastModifiedTime(File f, long time); public native boolean setReadOnly(File f); + private boolean asynchronousRename(UnixFileSystem fs, File f1, File f2) { + boolean result = false; + try { + result = SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> fs.rename0(f1, f2)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return result; + } /* -- Filesystem interface -- */ diff --git a/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java b/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java index 54a210c24e1..b54b71fa506 100644 --- a/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java @@ -27,9 +27,9 @@ import java.io.FileDescriptor; import java.io.IOException; +import jdk.internal.misc.SharedSecrets; import jdk.internal.misc.JavaIOFileDescriptorAccess; -import jdk.internal.misc.SharedSecrets; class FileDispatcherImpl extends FileDispatcher { @@ -45,33 +45,57 @@ class FileDispatcherImpl extends FileDispatcher { } int read(FileDescriptor fd, long address, int len) throws IOException { - return read0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> read0(fd, address, len)); + } else { + return read0(fd, address, len); + } } int pread(FileDescriptor fd, long address, int len, long position) - throws IOException + throws IOException { - return pread0(fd, address, len, position); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> pread0(fd, address, len, position)); + } else { + return pread0(fd, address, len, position); + } } long readv(FileDescriptor fd, long address, int len) throws IOException { - return readv0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> readv0(fd, address, len)); + } else { + return readv0(fd, address, len); + } } int write(FileDescriptor fd, long address, int len) throws IOException { - return write0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> write0(fd, address, len)); + } else { + return write0(fd, address, len); + } } int pwrite(FileDescriptor fd, long address, int len, long position) throws IOException { - return pwrite0(fd, address, len, position); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> pwrite0(fd, address, len, position)); + } else { + return pwrite0(fd, address, len, position); + } } long writev(FileDescriptor fd, long address, int len) throws IOException { - return writev0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> writev0(fd, address, len)); + } else { + return writev0(fd, address, len); + } } long seek(FileDescriptor fd, long offset) throws IOException { @@ -79,11 +103,19 @@ long seek(FileDescriptor fd, long offset) throws IOException { } int force(FileDescriptor fd, boolean metaData) throws IOException { - return force0(fd, metaData); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> force0(fd, metaData)); + } else { + return force0(fd, metaData); + } } int truncate(FileDescriptor fd, long size) throws IOException { - return truncate0(fd, size); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> truncate0(fd, size)); + } else { + return truncate0(fd, size); + } } long size(FileDescriptor fd) throws IOException { diff --git a/src/java.base/unix/native/libjava/FileOutputStream_md.c b/src/java.base/unix/native/libjava/FileOutputStream_md.c index 407cb9ad23b..4ec996eef45 100644 --- a/src/java.base/unix/native/libjava/FileOutputStream_md.c +++ b/src/java.base/unix/native/libjava/FileOutputStream_md.c @@ -65,7 +65,7 @@ Java_java_io_FileOutputStream_write(JNIEnv *env, jobject this, jint byte, jboole } JNIEXPORT void JNICALL -Java_java_io_FileOutputStream_writeBytes(JNIEnv *env, +Java_java_io_FileOutputStream_writeBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len, jboolean append) { writeBytes(env, this, bytes, off, len, append, fos_fd); } diff --git a/src/java.base/windows/native/libjava/FileOutputStream_md.c b/src/java.base/windows/native/libjava/FileOutputStream_md.c index 452a3a41187..e2cf2181cd9 100644 --- a/src/java.base/windows/native/libjava/FileOutputStream_md.c +++ b/src/java.base/windows/native/libjava/FileOutputStream_md.c @@ -61,7 +61,7 @@ Java_java_io_FileOutputStream_open0(JNIEnv *env, jobject this, } JNIEXPORT void JNICALL -Java_java_io_FileOutputStream_write(JNIEnv *env, jobject this, jint byte, jboolean append) { +Java_java_io_FileOutputStream_write0(JNIEnv *env, jobject this, jint byte, jboolean append) { writeSingle(env, this, byte, append, fos_fd); } diff --git a/src/jdk.management/share/classes/com/alibaba/management/ResourceContainerMXBean.java b/src/jdk.management/share/classes/com/alibaba/management/ResourceContainerMXBean.java new file mode 100644 index 00000000000..a0b8c481fa4 --- /dev/null +++ b/src/jdk.management/share/classes/com/alibaba/management/ResourceContainerMXBean.java @@ -0,0 +1,44 @@ +package com.alibaba.management; + +import java.lang.management.PlatformManagedObject; +import java.util.List; + +/** + * Platform-specific management interface for the resource container + * of the Java virtual machine. + */ +public interface ResourceContainerMXBean extends PlatformManagedObject { + /** + * Get all running containers uniq id as List + * @return all active containers' id + */ + List getAllContainerIds(); + + /** + * Get a specific container's constraints by id + * @param id container id + * @return constraints as list + */ + List getConstraintsById(long id); + + /** + * Get the total cpu time consumed by id specified container + * @param id container id + * @return consumed cpu time by nanosecond + */ + long getCPUResourceConsumedAmount(long id); + + /** + * Get how many times the resource limitation has been reached + * @param id + * @return + */ + long getCPUResourceLimitReachedCount(long id); + + /** + * Get how many active threads are running in container + * @param id container id + * @return thread id as list + */ + List getActiveContainerThreadIds(long id); +} diff --git a/src/jdk.management/share/classes/com/alibaba/management/internal/ResourceContainerMXBeanImpl.java b/src/jdk.management/share/classes/com/alibaba/management/internal/ResourceContainerMXBeanImpl.java new file mode 100644 index 00000000000..30f071674a4 --- /dev/null +++ b/src/jdk.management/share/classes/com/alibaba/management/internal/ResourceContainerMXBeanImpl.java @@ -0,0 +1,49 @@ +package com.alibaba.management.internal; + +import com.alibaba.management.ResourceContainerMXBean; +import com.alibaba.rcm.ResourceType; +import com.alibaba.rcm.ResourceContainerMonitor; +import com.alibaba.rcm.internal.AbstractResourceContainer; +import sun.management.Util; + +import javax.management.ObjectName; +import java.util.List; +import java.util.stream.Collectors; + +public class ResourceContainerMXBeanImpl implements ResourceContainerMXBean { + private final static String TENANT_CONTAINER_MXBEAN_NAME = "com.alibaba.management:type=ResourceContainer"; + + @Override + public List getAllContainerIds() { + return ResourceContainerMonitor.getAllContainerIds(); + } + + @Override + public List getConstraintsById(long id) { + return ResourceContainerMonitor.getConstraintsById(id).stream().map(c -> c.getValues()[0]).collect(Collectors.toList()); + } + + + @Override + public long getCPUResourceConsumedAmount(long id) { + AbstractResourceContainer container = (AbstractResourceContainer) ResourceContainerMonitor.getContainerById(id); + return container.getConsumedAmount(ResourceType.CPU_PERCENT); + } + + @Override + public ObjectName getObjectName() { + return Util.newObjectName(TENANT_CONTAINER_MXBEAN_NAME); + } + + @Override + public long getCPUResourceLimitReachedCount(long id) { + AbstractResourceContainer container = (AbstractResourceContainer) ResourceContainerMonitor.getContainerById(id); + return container.getResourceLimitReachedCount(ResourceType.CPU_PERCENT); + } + + @Override + public List getActiveContainerThreadIds(long id) { + AbstractResourceContainer container = (AbstractResourceContainer) ResourceContainerMonitor.getContainerById(id); + return container.getActiveContainerThreadIds(); + } +} diff --git a/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java b/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java index 475db20bc82..b771ceb5775 100644 --- a/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java +++ b/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java @@ -24,8 +24,10 @@ */ package com.sun.management.internal; +import com.alibaba.management.ResourceContainerMXBean; import com.alibaba.management.TenantContainerMXBean; import com.alibaba.management.WispCounterMXBean; +import com.alibaba.management.internal.ResourceContainerMXBeanImpl; import com.alibaba.management.internal.TenantContainerMXBeanImpl; import com.alibaba.management.internal.WispCounterMXBeanImpl; import com.sun.management.DiagnosticCommandMBean; @@ -57,6 +59,7 @@ public final class PlatformMBeanProviderImpl extends PlatformMBeanProvider { private static OperatingSystemMXBean osMBean = null; private static TenantContainerMXBean tenantMBean = null; private static WispCounterMXBean wispCounterMBean = null; + private static ResourceContainerMXBean resourceContainerMXBean = null; static { AccessController.doPrivileged((PrivilegedAction) () -> { @@ -302,6 +305,34 @@ public Map nameToMBeanMap( } }); + initMBeanList.add(new PlatformComponent() { + private final Set ResourceContainerMXBeanInterfaceNames = + Collections.unmodifiableSet(Collections.singleton( + "com.alibaba.management.ResourceContainerMXBean")); + + @Override + public Set> mbeanInterfaces() { + return Collections.singleton(com.alibaba.management.ResourceContainerMXBean.class); + } + + @Override + public Set mbeanInterfaceNames() { + return ResourceContainerMXBeanInterfaceNames; + } + + @Override + public String getObjectNamePattern() { + return "com.alibaba.management:type=ResourceContainer"; + } + + @Override + public Map nameToMBeanMap() { + return Collections.singletonMap( + "com.alibaba.management:type=ResourceContainer", + getResourceContainerMXBean()); + } + }); + /** * Wisp-Counter support. */ @@ -359,6 +390,13 @@ private static synchronized TenantContainerMXBean getTenantContainerMXBean() { return tenantMBean; } + private static synchronized ResourceContainerMXBean getResourceContainerMXBean() { + if (resourceContainerMXBean == null) { + resourceContainerMXBean = new ResourceContainerMXBeanImpl(); + } + return resourceContainerMXBean; + } + private static synchronized WispCounterMXBean getWispCounterMXBean() { if (wispCounterMBean == null) { wispCounterMBean = new WispCounterMXBeanImpl(); diff --git a/test/jdk/com/alibaba/rcm/TestKillThreads.java b/test/jdk/com/alibaba/rcm/TestKillThreads.java index 143bf3c710a..a82b1645b38 100644 --- a/test/jdk/com/alibaba/rcm/TestKillThreads.java +++ b/test/jdk/com/alibaba/rcm/TestKillThreads.java @@ -1,20 +1,28 @@ /* * @test * @library /lib/testlibrary - * @build TestRcmCpu RcmUtils + * @build TestKillThreads RcmUtils * @summary test RCM TestKillThreads * @modules java.base/com.alibaba.wisp.engine:+open * @modules java.base/com.alibaba.rcm.internal:+open - * @run main/othervm -XX:+UseWisp2 -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -XX:+UnlockExperimentalVMOptions -Dcom.alibaba.wisp.threadAsWisp.black=name:Tester* -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -Xcomp -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -Xint -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -client -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads */ + import com.alibaba.rcm.ResourceContainer; import com.alibaba.rcm.ResourceType; import com.alibaba.rcm.internal.RCMUnsafe; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static jdk.testlibrary.Asserts.assertTrue; import static jdk.testlibrary.Asserts.assertFalse; @@ -61,25 +69,106 @@ public class TestKillThreads { } }; + private static Runnable TIMER = () -> { + try { + new Timer().scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + } + }, 1, 1); + started = 1; + new CountDownLatch(1).await(); + } catch (InterruptedException e) { + fail(); + } finally { + flag.set(true); + } + }; + + private static Runnable UPDATER = () -> { + boolean running = true; + started = 1; + int preTimes = -1; + int version = 0; + AtomicInteger checkTimes = new AtomicInteger(0); + while (running) { + try { + version = 0; + AtomicInteger atomicInteger = checkTimes; + synchronized (atomicInteger) { + while (preTimes == checkTimes.get() && running) { + checkTimes.wait(); + } + preTimes = checkTimes.get(); + } + if (!running) continue; + } catch (Exception e) { + fail(); + } + } + }; + + private static Runnable POLL = () -> { + DelayQueue taksQueue = new DelayQueue(); + while (true) { + started = 1; + try { + while (true) { + Delayed task =taksQueue.take(); + System.out.println(taksQueue); + } + } + catch (Exception exception) { + exception.printStackTrace(); + continue; + } + } + }; + + private static Runnable OVERRIDE = () -> { + while (true) { + started = 1; + try { + try { + Thread.sleep(200); + } finally { + throw new Exception("123"); + } + + } catch (Throwable t) { + t.printStackTrace(); + } + + } + }; + + private static final List workload = new ArrayList() {{ -// add(VOID_LOOP); TODO:// support loop -// add(BUSY_LOOP); + add(VOID_LOOP); + add(BUSY_LOOP); add(PARK); add(FINALLY); + add(TIMER); + add(UPDATER); + add(POLL); + add(OVERRIDE); }}; - public static void main(String[] args) { + public static void main(String[] args) throws Exception { for (Runnable runnable : workload) { - ResourceContainer container = RcmUtils.createContainer( - ResourceType.CPU_PERCENT.newConstraint(40)); + ResourceContainer container = RcmUtils.createContainer(Collections.singletonList( + ResourceType.CPU_PERCENT.newConstraint(40))); started = 0; container.run(() -> { - Thread t = new Thread(runnable); + Thread t = new Thread(runnable); t.start(); }); - while(0 == started) {} + while (0 == started) { + } + Thread.sleep(100); RCMUnsafe.killThreads(container); container.destroy(); + System.out.println("pass"); } assertTrue(flag.get()); } diff --git a/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java b/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java index eb6b1dbee88..797edf8057f 100644 --- a/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java +++ b/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java @@ -41,6 +41,17 @@ public static void main(String[] args) throws Exception { t.start(); }); + container.run(() -> { + Thread t = new Thread(() -> { + assertInRoot(true); + latch.countDown(); + }, "TenantWorker"); + + RCMUnsafe.attach(ResourceContainer.root()); + t.start(); + RCMUnsafe.attach(container); + }); + assertTrue(latch.await(2, TimeUnit.SECONDS)); } diff --git a/test/jdk/com/alibaba/rcm/TestRcmCpu.java b/test/jdk/com/alibaba/rcm/TestRcmCpu.java index 13abdf5d73f..dcadd6efb0d 100644 --- a/test/jdk/com/alibaba/rcm/TestRcmCpu.java +++ b/test/jdk/com/alibaba/rcm/TestRcmCpu.java @@ -8,9 +8,13 @@ * @run main/othervm -XX:+UseWisp2 -XX:ActiveProcessorCount=4 TestRcmCpu */ +import com.alibaba.management.ResourceContainerMXBean; import com.alibaba.rcm.ResourceContainer; import com.alibaba.rcm.ResourceType; +import javax.management.MBeanServer; +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.security.MessageDigest; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -21,10 +25,12 @@ public class TestRcmCpu { + static ResourceContainerMXBean resourceContainerMXBean; + private static Callable taskFactory(int load) { return new Callable() { @Override - public Long call() throws Exception { + public Long call() interpreterRuntime.cppthrows Exception { long start = System.currentTimeMillis(); MessageDigest md5 = MessageDigest.getInstance("MD5"); int count = load; @@ -40,6 +46,14 @@ public Long call() throws Exception { } public static void main(String[] args) throws Exception { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try { + resourceContainerMXBean = ManagementFactory.newPlatformMXBeanProxy(mbs, + "com.alibaba.management:type=ResourceContainer", ResourceContainerMXBean.class); + } catch (IOException e) { + e.printStackTrace(); + } + ResourceContainer rc0 = RcmUtils.createContainer( ResourceType.CPU_PERCENT.newConstraint(40)); ResourceContainer rc1 = RcmUtils.createContainer( @@ -67,5 +81,10 @@ public static void main(String[] args) throws Exception { double ratio = (double) duration1 / duration0; assertLT(Math.abs(ratio - 0.5), 0.10, "deviation is out of reasonable scope"); + + for (long id : resourceContainerMXBean.getAllContainerIds()) { + System.out.println(resourceContainerMXBean.getConstraintsById(id)); + System.out.println(resourceContainerMXBean.getContainerConsumedAmount(id, ResourceType.CPU_PERCENT)); + } } } diff --git a/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java b/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java index ff2955b4237..bf2dc9e688e 100644 --- a/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java +++ b/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java @@ -1,6 +1,7 @@ package demo; import com.alibaba.rcm.Constraint; +import com.alibaba.rcm.ResourceType; import com.alibaba.rcm.internal.AbstractResourceContainer; import java.util.ArrayList; @@ -43,4 +44,9 @@ public Iterable getConstraints() { public void destroy() { dead = true; } + + @Override + public Long getConsumedAmount(ResourceType resourceType) { + return 0L; + } } diff --git a/test/jdk/com/alibaba/wisp/io/WispFileIOTest.java b/test/jdk/com/alibaba/wisp/io/WispFileIOTest.java new file mode 100644 index 00000000000..9e7da7d5eab --- /dev/null +++ b/test/jdk/com/alibaba/wisp/io/WispFileIOTest.java @@ -0,0 +1,224 @@ +/* + * @test + * @library /lib/testlibrary + * @summary test reuse WispUdpSocket buffer + * @modules java.base/jdk.internal.misc + * @run main/othervm -XX:+UseWisp2 -Dcom.alibaba.wisp.enableAsyncFileIO=true WispFileIOTest + */ + +import jdk.internal.misc.SharedSecrets; +import sun.misc.Unsafe; + +import java.io.*; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.util.concurrent.*; + +import static java.nio.file.StandardOpenOption.*; +import static jdk.testlibrary.Asserts.assertTrue; + +public class WispFileIOTest { + + public static void testNioFileChannel(File testFile) throws Exception { + resetTestFileContent(testFile); + RandomAccessFile file = new RandomAccessFile(testFile, "rw"); + FileChannel ch = file.getChannel(); + ByteBuffer buffer = ByteBuffer.allocate(1); + ch.read(buffer); + assertTrue("0".equals(new String(buffer.array()))); + buffer.flip(); + ch.write(buffer); + ch.close(); + String content = new String(Files.readAllBytes(testFile.toPath())); + assertTrue("00234".equals(content)); + } + + public static void testFileStream(File testFile) throws Exception { + //test RandomAccessFile + resetTestFileContent(testFile); + RandomAccessFile raf = new RandomAccessFile(testFile, "rw"); + byte[] buffer; + buffer = "5".getBytes(); + raf.write(buffer); + raf.seek(0); + buffer = new byte[1]; + raf.read(buffer, 0, 1); + assertTrue("5".equals(new String(buffer))); + + //test FileInputStream + resetTestFileContent(testFile); + FileInputStream fis = new FileInputStream(testFile); + buffer = new byte[1]; + fis.read(buffer); + assertTrue("0".equals(new String(buffer))); + + //test FileOutputStream + resetTestFileContent(testFile); + FileOutputStream fos = new FileOutputStream(testFile, true); + buffer = "5".getBytes(); + fos.write(buffer); + String content = new String(Files.readAllBytes(testFile.toPath())); + assertTrue("012345".equals(content)); + + } + + public static void testMappedByteBuffer() throws Exception { + File newfile = new File("/tmp/ThreadPoolAioTest_test_new2.file"); + newfile.deleteOnExit(); + RandomAccessFile raf = new RandomAccessFile(newfile, "rw"); + FileChannel fc = raf.getChannel(); + MappedByteBuffer map = fc.map(FileChannel.MapMode.READ_WRITE, 0, 2048); + fc.close(); + double current = map.getDouble(50); + map.putDouble(50, current + 0.1d); + map.force(); + } + + public static Thread workerThread = null; + + public static void resetTestFileContent(File testFile) throws IOException { + FileWriter writer = new FileWriter(testFile); + for (int i = 0; i < 5; i++) { + writer.write(String.valueOf(i)); + } + writer.close(); + } + + private static void testBlockingReadInterrupted(File testFile) throws IOException { + long block = 12; + long size = 4096; + long total = block * size; + createLargeFile(block * size, testFile); + for (int i = 0; i < block; i++) { + try (FileInputStream fis = new FileInputStream(testFile)) { + long skip = skipBytes(fis, size, total); + total -= skip; + assertTrue(skip == size || skip == 0); + } finally { + testFile.delete(); + } + } + + } + + // Skip toSkip number of bytes and expect that the available() method + // returns avail number of bytes. + private static long skipBytes(InputStream is, long toSkip, long avail) + throws IOException { + long skip = is.skip(toSkip); + if (skip != toSkip) { + throw new RuntimeException("skip() returns " + skip + + " but expected " + toSkip); + } + long remaining = avail - skip; + int expected = (remaining >= Integer.MAX_VALUE) + ? Integer.MAX_VALUE + : (remaining > 0 ? (int) remaining : 0); + + System.out.println("Skipped " + skip + " bytes, available() returns " + + expected + ", remaining " + remaining); + if (is.available() != expected) { + throw new RuntimeException("available() returns " + + is.available() + " but expected " + expected); + } + return skip; + } + + private static void createLargeFile(long filesize, + File file) throws IOException { + // Recreate a large file as a sparse file if possible + Files.delete(file.toPath()); + + try (FileChannel fc = + FileChannel.open(file.toPath(), CREATE_NEW, WRITE, APPEND)) { + ByteBuffer bb = ByteBuffer.allocate(1).put((byte) 1); + bb.rewind(); + int rc = fc.write(bb, filesize - 1); + + if (rc != 1) { + throw new RuntimeException("Failed to write 1 byte" + + " to the large file"); + } + } + } + + private static void mockIOException() throws Exception { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe) f.get(null); + try { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + throw new IOException("expected"); + }); + } catch (IOException e) { + unsafe.throwException(e); + } catch (Exception exception) { + // + } + } + + public static void main(String[] args) throws Exception { + + // submit by another thread + Thread t = new Thread(() -> { + try { + File f = new File("/tmp/ThreadPoolAioTest_test.file"); + f.deleteOnExit(); + // test java nio + testNioFileChannel(f); + // test java io + testFileStream(f); + // test rename + File newfile = new File("/tmp/ThreadPoolAioTest_test_new.file"); + newfile.deleteOnExit(); + f.renameTo(newfile); + // test MappedByteBuffer force + testMappedByteBuffer(); + resetTestFileContent(f); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false, "exception happened"); + } + }); + t.start(); + t.join(); + + CountDownLatch finished = new CountDownLatch(1); + Thread interrupt = new Thread(() -> { + try { + File f = new File("/tmp/ThreadPoolAioTest_test.file"); + testBlockingReadInterrupted(f); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(e instanceof ClosedByInterruptException, "exception happened"); + } finally { + finished.countDown(); + } + }); + interrupt.start(); + + while (finished.getCount() != 0) { + interrupt.interrupt(); + } + interrupt.join(); + + + boolean exceptionHappened = false; + try { + mockIOException(); + } catch (Exception e) { + e.printStackTrace(); + if (e instanceof IOException) { + exceptionHappened = true; + } + } finally { + assertTrue(exceptionHappened); + } + + System.out.println("Success!"); + } +}