diff --git a/src/hotspot/share/c1/c1_Runtime1.cpp b/src/hotspot/share/c1/c1_Runtime1.cpp index 2348bc726cf..6bf29707fd8 100644 --- a/src/hotspot/share/c1/c1_Runtime1.cpp +++ b/src/hotspot/share/c1/c1_Runtime1.cpp @@ -498,7 +498,8 @@ JRT_ENTRY_NO_ASYNC(static address, exception_handler_for_pc_helper(JavaThread* c // Reset method handle flag. current->set_is_method_handle_return(false); - Handle exception(current, ex); + Handle exception(current, WispThread::is_current_death_pending(current) ? + (oopDesc*)Universe::wisp_thread_death_exception() : ex); // This function is called when we are about to throw an exception. Therefore, // we have to poll the stack watermark barrier to make sure that not yet safe @@ -707,7 +708,7 @@ JRT_BLOCK_ENTRY(void, Runtime1::monitorenter(JavaThread* current, oopDesc* obj, lock->set_obj(obj); } assert(obj == lock->obj(), "must match"); - WispPostStealHandleUpdateMark w(__hm); + WispPostStealHandleUpdateMark w(current, __hm); SharedRuntime::monitor_enter_helper(obj, lock->lock(), current); JRT_END diff --git a/src/hotspot/share/classfile/javaClasses.cpp b/src/hotspot/share/classfile/javaClasses.cpp index 0995ad10ce1..a6c41ca8fe8 100644 --- a/src/hotspot/share/classfile/javaClasses.cpp +++ b/src/hotspot/share/classfile/javaClasses.cpp @@ -4774,6 +4774,8 @@ int com_alibaba_wisp_engine_WispTask::_activeCount_offset = 0; int com_alibaba_wisp_engine_WispTask::_stealCount_offset = 0; int com_alibaba_wisp_engine_WispTask::_stealFailureCount_offset = 0; int com_alibaba_wisp_engine_WispTask::_preemptCount_offset = 0; +int com_alibaba_wisp_engine_WispTask::_shutdownPending_offset = 0; + #define WISPTASK_FIELDS_DO(macro) \ macro(_jvmParkStatus_offset, ik, vmSymbols::jvmParkStatus_name(), int_signature, false); \ @@ -4846,6 +4848,10 @@ int com_alibaba_wisp_engine_WispTask::get_stealFailureCount(oop obj) { return obj->int_field(_stealFailureCount_offset); } +bool com_alibaba_wisp_engine_WispTask::get_shutdownPending(oop obj) { + return obj->bool_field(_shutdownPending_offset); +} + #if INCLUDE_CDS void java_nio_Buffer::serialize_offsets(SerializeClosure* f) { BUFFER_FIELDS_DO(FIELD_SERIALIZE_OFFSET); diff --git a/src/hotspot/share/classfile/javaClasses.hpp b/src/hotspot/share/classfile/javaClasses.hpp index c0bdc9c8570..1d888b9c1a2 100644 --- a/src/hotspot/share/classfile/javaClasses.hpp +++ b/src/hotspot/share/classfile/javaClasses.hpp @@ -1837,6 +1837,7 @@ class com_alibaba_wisp_engine_WispTask: AllStatic { static int _stealCount_offset; static int _stealFailureCount_offset; static int _preemptCount_offset; + static int _shutdownPending_offset; public: static void set_jvmParkStatus(oop obj, jint status); static int get_jvmParkStatus(oop obj); @@ -1850,6 +1851,7 @@ class com_alibaba_wisp_engine_WispTask: AllStatic { static int get_stealFailureCount(oop obj); static int get_preemptCount(oop obj); static void set_preemptCount(oop obj, jint count); + static bool get_shutdownPending(oop obj); static void compute_offsets(); static void serialize_offsets(SerializeClosure* f) NOT_CDS_RETURN; diff --git a/src/hotspot/share/classfile/vmSymbols.hpp b/src/hotspot/share/classfile/vmSymbols.hpp index c2a67ba7335..47433e143c7 100644 --- a/src/hotspot/share/classfile/vmSymbols.hpp +++ b/src/hotspot/share/classfile/vmSymbols.hpp @@ -699,6 +699,7 @@ template(com_alibaba_wisp_engine_WispEventPump, "com/alibaba/wisp/engine/WispEventPump") \ template(isInCritical_name, "isInCritical") \ template(jdkParkStatus_name, "jdkParkStatus") \ + template(shutdownPending_name, "shutdownPending") \ template(jvmParkStatus_name, "jvmParkStatus") \ template(id_name, "id") \ template(threadWrapper_name, "threadWrapper") \ diff --git a/src/hotspot/share/interpreter/interpreterRuntime.cpp b/src/hotspot/share/interpreter/interpreterRuntime.cpp index ca709dd37f0..d8c0a55df10 100644 --- a/src/hotspot/share/interpreter/interpreterRuntime.cpp +++ b/src/hotspot/share/interpreter/interpreterRuntime.cpp @@ -459,7 +459,12 @@ JRT_ENTRY(address, InterpreterRuntime::exception_handler_for_exception(JavaThrea StackWatermarkSet::after_unwind(current); LastFrameAccessor last_frame(current); - Handle h_exception(current, exception); + // Wisp relys on threadDeath as a special uncatchable exception to shutdown + // all running coroutines. However, exceptions throw in finally block + // will overwrite current threadDeath exception, thus we need to replace + // all exception with threadDeath after coroutine shutdown. + Handle h_exception(current, WispThread::is_current_death_pending(current) ? + (oopDesc*)Universe::wisp_thread_death_exception() : exception); methodHandle h_method (current, last_frame.method()); constantPoolHandle h_constants(current, h_method->constants()); bool should_repeat; diff --git a/src/hotspot/share/jvmci/jvmciRuntime.cpp b/src/hotspot/share/jvmci/jvmciRuntime.cpp index f28c12133f1..d2f3b095457 100644 --- a/src/hotspot/share/jvmci/jvmciRuntime.cpp +++ b/src/hotspot/share/jvmci/jvmciRuntime.cpp @@ -392,7 +392,7 @@ address JVMCIRuntime::exception_handler_for_pc(JavaThread* current) { } JRT_BLOCK_ENTRY(void, JVMCIRuntime::monitorenter(JavaThread* current, oopDesc* obj, BasicLock* lock)) - WispPostStealHandleUpdateMark w(__hm); + WispPostStealHandleUpdateMark w(current, __hm); SharedRuntime::monitor_enter_helper(obj, lock, current); JRT_END diff --git a/src/hotspot/share/memory/universe.cpp b/src/hotspot/share/memory/universe.cpp index 8ba5fae9cb7..1c518dfb3ea 100644 --- a/src/hotspot/share/memory/universe.cpp +++ b/src/hotspot/share/memory/universe.cpp @@ -105,6 +105,7 @@ enum OutOfMemoryInstance { _oom_java_heap, _oom_count }; OopHandle Universe::_out_of_memory_errors; +OopHandle Universe::_wisp_thread_death_exception; OopHandle Universe::_delayed_stack_overflow_error_message; OopHandle Universe::_preallocated_out_of_memory_error_array; volatile jint Universe::_preallocated_out_of_memory_error_avail_count = 0; @@ -577,6 +578,7 @@ oop Universe::out_of_memory_error_realloc_objects() { // Throw default _out_of_memory_error_retry object as it will never propagate out of the VM oop Universe::out_of_memory_error_retry() { return out_of_memory_errors()->obj_at(_oom_retry); } +oop Universe::wisp_thread_death_exception() { return _wisp_thread_death_exception.resolve(); } oop Universe::delayed_stack_overflow_error_message() { return _delayed_stack_overflow_error_message.resolve(); } @@ -952,6 +954,14 @@ bool universe_post_init() { Universe::create_preallocated_out_of_memory_errors(CHECK_false); oop instance; + Klass* k; + if (EnableCoroutine && Wisp2ThreadStop) { + // Create the special exception used to kill thread + k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_ThreadDeath(), true, CHECK_false); + assert(NULL != k, "pre-condition"); + instance = InstanceKlass::cast(k)->allocate_instance(CHECK_false); + Universe::_wisp_thread_death_exception = OopHandle(Universe::vm_global(), instance); + } // Setup preallocated cause message for delayed StackOverflowError if (StackReservedPages > 0) { instance = java_lang_String::create_oop_from_str("Delayed StackOverflowError due to ReservedStackAccess annotated method", CHECK_false); @@ -960,7 +970,7 @@ bool universe_post_init() { // Setup preallocated NullPointerException // (this is currently used for a cheap & dirty solution in compiler exception handling) - Klass* k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_NullPointerException(), true, CHECK_false); + k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_NullPointerException(), true, CHECK_false); instance = InstanceKlass::cast(k)->allocate_instance(CHECK_false); Universe::_null_ptr_exception_instance = OopHandle(Universe::vm_global(), instance); diff --git a/src/hotspot/share/memory/universe.hpp b/src/hotspot/share/memory/universe.hpp index 39ee4c3d52d..e79c1e52c54 100644 --- a/src/hotspot/share/memory/universe.hpp +++ b/src/hotspot/share/memory/universe.hpp @@ -108,6 +108,7 @@ class Universe: AllStatic { // preallocated error objects (no backtrace) static OopHandle _out_of_memory_errors; + static OopHandle _wisp_thread_death_exception; // preallocated cause message for delayed StackOverflowError static OopHandle _delayed_stack_overflow_error_message; @@ -296,6 +297,7 @@ class Universe: AllStatic { // Throw default _out_of_memory_error_retry object as it will never propagate out of the VM static oop out_of_memory_error_retry(); static oop delayed_stack_overflow_error_message(); + static oop wisp_thread_death_exception(); // The particular choice of collected heap. static CollectedHeap* heap() { return _collectedHeap; } diff --git a/src/hotspot/share/oops/method.cpp b/src/hotspot/share/oops/method.cpp index e0870bcf521..1afd2e03ea6 100644 --- a/src/hotspot/share/oops/method.cpp +++ b/src/hotspot/share/oops/method.cpp @@ -235,6 +235,9 @@ int Method::fast_exception_handler_bci_for(const methodHandle& mh, Klass* ex_kla // access exception table ExceptionTable table(mh()); int length = table.length(); + bool is_force_thread_death_exception = (EnableCoroutine && Wisp2ThreadStop + && (ex_klass == vmClasses::ThreadDeath_klass() + || ex_klass->is_subtype_of(vmClasses::ThreadDeath_klass()))); // iterate through all entries sequentially constantPoolHandle pool(THREAD, mh->constants()); for (int i = 0; i < length; i ++) { @@ -287,6 +290,9 @@ int Method::fast_exception_handler_bci_for(const methodHandle& mh, Klass* ex_kla } assert(k != NULL, "klass not loaded"); if (ex_klass->is_subtype_of(k)) { + if (is_force_thread_death_exception) { + continue; + } if (log_is_enabled(Info, exceptions)) { ResourceMark rm(THREAD); log_info(exceptions)("Found matching handler for exception of type \"%s\" in method \"%s\" at BCI: %d", diff --git a/src/hotspot/share/opto/runtime.cpp b/src/hotspot/share/opto/runtime.cpp index 9ef68246ad0..d0632ad2614 100644 --- a/src/hotspot/share/opto/runtime.cpp +++ b/src/hotspot/share/opto/runtime.cpp @@ -1284,7 +1284,8 @@ JRT_ENTRY_NO_ASYNC(address, OptoRuntime::handle_exception_C_helper(JavaThread* c assert(current->exception_oop() != NULL, "exception oop is found"); address handler_address = NULL; - Handle exception(current, current->exception_oop()); + Handle exception(current, WispThread::is_current_death_pending(current) ? + Universe::wisp_thread_death_exception() : current->exception_oop()); address pc = current->exception_pc(); // Clear out the exception oop and pc since looking up an diff --git a/src/hotspot/share/prims/unsafe.cpp b/src/hotspot/share/prims/unsafe.cpp index 847eb332ec7..e039779c4b4 100644 --- a/src/hotspot/share/prims/unsafe.cpp +++ b/src/hotspot/share/prims/unsafe.cpp @@ -993,7 +993,7 @@ JVM_ENTRY(jboolean, CoroutineSupport_stealCoroutine(JNIEnv* env, jclass klass, j // The lock will also block coroutine switch operation, // so we must finish the steal operation as soon as possible. Coroutine* coro = (Coroutine*) coroPtr; - if (coro == NULL || coro->enable_steal_count() != coro->java_call_counter()) { + if (coro == NULL || coro->enable_steal_count() != coro->java_call_counter() || coro->is_yielding()) { return false; // an Exception throws and the coroutine being stealed is exited } assert(coro->thread() != thread, "steal from self"); @@ -1022,7 +1022,7 @@ JVM_ENTRY (jboolean, CoroutineSupport_shouldThrowException0(JNIEnv* env, jclass assert(EnableCoroutine, "pre-condition"); Coroutine* coro = (Coroutine*)coroPtr; assert(coro == thread->current_coroutine(), "invariant"); - return !coro->is_yielding() && coro->clinit_call_count() == 0; + return coro->clinit_call_count() == 0; JVM_END JVM_ENTRY (void, CoroutineSupport_printlnLockFree(JNIEnv* env, jclass klass, jstring info)) diff --git a/src/hotspot/share/runtime/arguments.cpp b/src/hotspot/share/runtime/arguments.cpp index 21fdb44f564..56a332a73d5 100644 --- a/src/hotspot/share/runtime/arguments.cpp +++ b/src/hotspot/share/runtime/arguments.cpp @@ -3985,6 +3985,10 @@ jint Arguments::parse(const JavaVMInitArgs* initial_cmd_args) { } } + if (Wisp2ThreadStop && !UseWisp2) { + vm_exit_during_initialization("Wisp2ThreadStop only works with UseWisp2"); + } + // Set object alignment values. set_object_alignment(); diff --git a/src/hotspot/share/runtime/coroutine.cpp b/src/hotspot/share/runtime/coroutine.cpp index fe8f3fa498d..55a5f4bd06b 100644 --- a/src/hotspot/share/runtime/coroutine.cpp +++ b/src/hotspot/share/runtime/coroutine.cpp @@ -737,7 +737,10 @@ void WispThread::park(long millis, const ObjectWaiter* ow) { // the runtime can not handle the exception on monitorenter bci // we need clear it to prevent jvm crash if (jt->has_pending_exception()) { - jt->clear_pending_exception(); + if (EnableCoroutine && Wisp2ThreadStop && jt->pending_exception()->klass() == vmClasses::ThreadDeath_klass()) { + jt->set_pending_async_exception(jt->pending_exception()); + } + jt->clear_pending_exception(); } ThreadStateTransition::transition(jt, _thread_in_vm, _thread_blocked); @@ -783,6 +786,10 @@ void WispThread::unpark(int task_id, bool using_wisp_park, bool proxy_unpark, Pa // due to the fact that we modify the priority of Wisp_lock from `non-leaf` to `special`, // so we'd use `MutexLocker` and `_no_safepoint_check_flag` to make our program run + // We don't want to yield a safepoint here, so we use the `special` rank to prevent it: + // In UnlockNode, we will call java in Wisp. We can't yield a safepoint that may cause + // deoptimization, which is very fatal for monitors. + NoSafepointVerifier nsv; MutexLocker mu(Wisp_lock, Mutex::_no_safepoint_check_flag); wisp_thread->_unpark_status = WispThread::_proxy_unpark_begin; _proxy_unpark->append(task_id); @@ -843,6 +850,7 @@ int WispThread::get_proxy_unpark(jintArray res) { NoSafepointVerifier nsv; MutexLocker mu(Wisp_lock, Mutex::_no_safepoint_check_flag); while (_proxy_unpark == NULL || _proxy_unpark->is_empty()) { + // we need to use _no_safepoint_check_flag, which won't yield a safepoint. Wisp_lock->wait_without_safepoint_check(); } typeArrayOop a = typeArrayOop(JNIHandles::resolve_non_null(res)); @@ -1002,6 +1010,9 @@ void Coroutine::after_safepoint(JavaThread* thread) { "Only SOF/OOM/ThreadDeath happens here"); // If it's a SOF / OOM / ThreadDeath exception, we'd clear it // because polling page stub shouldn't have a pending exception. + if (UseWisp2 && Wisp2ThreadStop && thread->pending_exception()->klass() == vmClasses::ThreadDeath_klass()) { + thread->set_pending_async_exception(thread->pending_exception()); + } thread->clear_pending_exception(); } @@ -1157,10 +1168,9 @@ WispPostStealHandleUpdateMark::WispPostStealHandleUpdateMark(JavaThread *& th1, WispPostStealHandle h2(&tiva); } -WispPostStealHandleUpdateMark::WispPostStealHandleUpdateMark(HandleMarkCleaner & hmc) +WispPostStealHandleUpdateMark::WispPostStealHandleUpdateMark(JavaThread *thread, HandleMarkCleaner & hmc) { - assert(hmc.thread_ref()->is_Java_thread(), "sanity"); - initialize((JavaThread*)hmc.thread_ref()); + initialize(thread); if (!_success) return; WispPostStealHandle h(&hmc); diff --git a/src/hotspot/share/runtime/coroutine.hpp b/src/hotspot/share/runtime/coroutine.hpp index fc88e9049e0..f9782a03722 100644 --- a/src/hotspot/share/runtime/coroutine.hpp +++ b/src/hotspot/share/runtime/coroutine.hpp @@ -433,6 +433,20 @@ class WispThread: public JavaThread { static void set_wisp_booted(JavaThread* thread); static const char *print_os_park_reason(int reason); static const char *print_blocking_status(int status); + static const bool is_current_death_pending(JavaThread *thread) { + if (EnableCoroutine && Wisp2ThreadStop) { + if (thread->current_coroutine() == NULL) { + // Main Thread + return false; + } + if (thread->current_coroutine()->wisp_task() == NULL) { + // Blacklisted threads that are not converted to couroutines + return false; + } + return com_alibaba_wisp_engine_WispTask::get_shutdownPending(thread->current_coroutine()->wisp_task()); + } + return false; + } virtual bool is_Wisp_thread() const { return true; } @@ -661,7 +675,7 @@ class WispPostStealHandleUpdateMark: public StackObj { ThreadInVMfromJava & tiva, HandleMarkCleaner & hmc); WispPostStealHandleUpdateMark(JavaThread *& th1, Thread *& th2, // constructor for other monitorenters ThreadInVMfromJava & tiva); - WispPostStealHandleUpdateMark(HandleMarkCleaner & hmc); + WispPostStealHandleUpdateMark(JavaThread *thread, HandleMarkCleaner & hmc); WispPostStealHandleUpdateMark(JavaThread *thread, ThreadBlockInVM & tbv); // constructor is used inside objectMonitor call, which is also within EnableStealMark scope. WispPostStealHandleUpdateMark(JavaThread *thread, ThreadStateTransition & tst); WispPostStealHandleUpdateMark(JavaThread *& th); // this is a special one, used for a fix inside EnableStealMark diff --git a/src/hotspot/share/runtime/globals.hpp b/src/hotspot/share/runtime/globals.hpp index 2eaa00c9fcc..4511d5d3c07 100644 --- a/src/hotspot/share/runtime/globals.hpp +++ b/src/hotspot/share/runtime/globals.hpp @@ -2100,6 +2100,9 @@ const intx ObjectAlignmentInBytes = 8; product(bool, UseWisp2, false, \ "Enable Wisp2") \ \ + product(bool, Wisp2ThreadStop, false, \ + "ThreadDeath cannot be catched") \ + \ product(bool, PrintThreadCoroutineInfo, false, MANAGEABLE, \ "print the park/unpark information for thread coroutine") \ \ diff --git a/src/hotspot/share/runtime/safepoint.cpp b/src/hotspot/share/runtime/safepoint.cpp index baaa7ddca4a..07cfa9d55e5 100644 --- a/src/hotspot/share/runtime/safepoint.cpp +++ b/src/hotspot/share/runtime/safepoint.cpp @@ -747,6 +747,10 @@ void SafepointSynchronize::block(JavaThread *thread) { guarantee(thread->safepoint_state()->get_safepoint_id() == InactiveSafepointCounter, "The safepoint id should be set only in block path"); + if (EnableCoroutine) { + Coroutine::after_safepoint(thread); + } + // cross_modify_fence is done by SafepointMechanism::process_if_requested // which is the only caller here. } diff --git a/src/hotspot/share/runtime/sharedRuntime.cpp b/src/hotspot/share/runtime/sharedRuntime.cpp index 5ef980d160e..b2a63fdffbf 100644 --- a/src/hotspot/share/runtime/sharedRuntime.cpp +++ b/src/hotspot/share/runtime/sharedRuntime.cpp @@ -2148,7 +2148,7 @@ void SharedRuntime::monitor_enter_helper(oopDesc* obj, BasicLock* lock, JavaThre // Handles the uncommon case in locking, i.e., contention or an inflated lock. JRT_BLOCK_ENTRY(void, SharedRuntime::complete_monitor_locking_C(oopDesc* obj, BasicLock* lock, JavaThread* current)) - WispPostStealHandleUpdateMark w(__hm); + WispPostStealHandleUpdateMark w(current, __hm); SharedRuntime::monitor_enter_helper(obj, lock, current); JRT_END diff --git a/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java index 92920ea90a8..7fcd8402f7b 100644 --- a/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java +++ b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainer.java @@ -185,4 +185,8 @@ static ResourceContainer current() { * Then the container state will become {@link State#DEAD}. */ void destroy(); + + Long getId(); + + Long getConsumedAmount(ResourceType resourceType); } diff --git a/src/java.base/share/classes/com/alibaba/rcm/ResourceContainerMonitor.java b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainerMonitor.java new file mode 100644 index 00000000000..d6ccdcfb596 --- /dev/null +++ b/src/java.base/share/classes/com/alibaba/rcm/ResourceContainerMonitor.java @@ -0,0 +1,52 @@ +package com.alibaba.rcm; + +import com.alibaba.rcm.Constraint; +import com.alibaba.rcm.ResourceContainer; +import com.alibaba.rcm.ResourceType; +import com.alibaba.rcm.internal.AbstractResourceContainer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class ResourceContainerMonitor { + private static Map tenantContainerMap = new ConcurrentHashMap<>(); + private static AtomicLong idGen = new AtomicLong(0); + + public ResourceContainerMonitor() { + } + + public static long register(ResourceContainer resourceContainer) { + long id = idGen.getAndIncrement(); + tenantContainerMap.put(id, resourceContainer); + return id; + } + + public static void deregister(long id) { + tenantContainerMap.remove(id); + } + + public static ResourceContainer getContainerById(long id) { + return tenantContainerMap.get(id); + } + + public static List getAllContainerIds() { + return new ArrayList<>(tenantContainerMap.keySet()); + } + + public static List getConstraintsById(long id) { + AbstractResourceContainer resourceContainer = (AbstractResourceContainer) tenantContainerMap.get(id); + return StreamSupport + .stream(resourceContainer.getConstraints().spliterator(), false) + .collect(Collectors.toList()); + } + + public long getContainerConsumedAmount(long id) { + return 0; + } + +} diff --git a/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java b/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java index 9c7e5da7d0b..25eeaada6c6 100644 --- a/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java +++ b/src/java.base/share/classes/com/alibaba/rcm/internal/AbstractResourceContainer.java @@ -2,11 +2,14 @@ import com.alibaba.rcm.Constraint; import com.alibaba.rcm.ResourceContainer; +import com.alibaba.rcm.ResourceContainerMonitor; +import com.alibaba.rcm.ResourceType; import jdk.internal.access.RCMAccesss; import jdk.internal.misc.VM; import jdk.internal.access.SharedSecrets; import java.util.Collections; +import java.util.List; import java.util.function.Predicate; /** @@ -43,6 +46,12 @@ public Predicate getResourceContainerInheritancePredicate(ResourceContai protected final static AbstractResourceContainer ROOT = new RootContainer(); private Predicate threadInherited = DEFAULT_PREDICATE; + final long id; + + protected AbstractResourceContainer() { + id = ResourceContainerMonitor.register(this); + } + public static AbstractResourceContainer root() { return ROOT; @@ -57,6 +66,12 @@ public static AbstractResourceContainer current() { return SharedSecrets.getJavaLangAccess().getResourceContainer(Thread.currentThread()); } + public abstract List getActiveContainerThreadIds(); + + public abstract Long getConsumedAmount(ResourceType resourceType); + + public abstract Long getResourceLimitReachedCount(ResourceType resourceType); + @Override public void run(Runnable command) { if (getState() != State.RUNNING) { @@ -79,6 +94,12 @@ public void run(Runnable command) { } } + + @Override + public Long getId() { + return id; + } + /** * Attach to this resource container. * Ensure {@link ResourceContainer#current()} as a root container @@ -155,5 +176,22 @@ public Iterable getConstraints() { public void destroy() { throw new UnsupportedOperationException("destroy() is not supported by root container"); } + + @Override + public List getActiveContainerThreadIds() { + // root resource container is not monitored + return Collections.emptyList(); + } + + @Override + public Long getConsumedAmount(ResourceType resourceType) { + return 0L; + } + + @Override + public Long getResourceLimitReachedCount(ResourceType resourceType) { + return 0L; + } + } } diff --git a/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java b/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java index 13743d2e1e5..d3631ed4eef 100644 --- a/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java +++ b/src/java.base/share/classes/com/alibaba/rcm/internal/RCMUnsafe.java @@ -2,6 +2,7 @@ import com.alibaba.rcm.ResourceContainer; +import com.alibaba.rcm.ResourceContainerMonitor; import java.util.Objects; import java.util.function.Predicate; @@ -40,5 +41,14 @@ public static void killThreads(ResourceContainer resourceContainer) { assert resourceContainer instanceof AbstractResourceContainer; Objects.requireNonNull(resourceContainer); ((AbstractResourceContainer) resourceContainer).killThreads(); + ResourceContainerMonitor.deregister(resourceContainer.getId()); + } + + + public static void attach(ResourceContainer container) { + if (container != ResourceContainer.root()) + ((AbstractResourceContainer) container).attach(); + else + ((AbstractResourceContainer)ResourceContainer.current()).detach(); } } \ No newline at end of file diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java index d77b5ed6517..a786f361b42 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispCarrier.java @@ -125,7 +125,7 @@ final WispTask runTaskInternal(Runnable target, String name, Thread thread, Clas } finally { isInCritical = isInCritical0; } - yieldTo(wispTask); + yieldTo(wispTask, false); runWispTaskEpilog(); return wispTask; @@ -136,14 +136,20 @@ final WispTask runTaskInternal(Runnable target, String name, Thread thread, Clas * WispTask must call {@code taskExit()} to exit safely. */ void taskExit() { // and exit - current.status = WispTask.Status.ZOMBIE; TASK_COUNT_UPDATER.decrementAndGet(engine); current.countExecutionTime(switchTimestamp); switchTimestamp = 0; + current.setEpollArray(0); - unregisterEvent(); - returnTaskToCache(current); + boolean cached = !current.shutdownPending && returnTaskToCache(current); + TASK_COUNT_UPDATER.decrementAndGet(engine); + if (cached) { + current.status = WispTask.Status.CACHED; + } else { + current.status = WispTask.Status.DEAD; + WispTask.cleanExitedTask(current); + } // reset threadWrapper after call returnTaskToCache, // since the threadWrapper will be used in Thread.currentThread() @@ -153,7 +159,7 @@ void taskExit() { // and exit // In Tenant killing process, we have an pending exception, // WispTask.Coroutine's loop will be breaked // invoke an explicit reschedule instead of return - schedule(); + schedule(!cached); } /** @@ -182,16 +188,22 @@ private WispTask getTaskFromCache() { } /** - * return task back to global cache + * cache task back to global or local cache and return true, if beyond the capacity of + * cache will return false. */ - private void returnTaskToCache(WispTask task) { + private boolean returnTaskToCache(WispTask task) { // reuse exited wispTasks from shutdown wispEngine is very tricky, so we'd better not return // these tasks to global cache if (taskCache.size() > WispConfiguration.WISP_ENGINE_TASK_CACHE_SIZE && !engine.hasBeenShutdown) { - engine.groupTaskCache.add(task); + if (engine.groupTaskCache.size() > WispConfiguration.WISP_ENGINE_TASK_GLOBAL_CACHE_SIZE) { + return false; + } else { + engine.groupTaskCache.add(task); + } } else { taskCache.add(task); } + return true; } /** @@ -214,7 +226,7 @@ void destroy() { * Block current coroutine and do scheduling. * Typically called when resource is not ready. */ - final void schedule() { + final void schedule(boolean terminal) { assert WispCarrier.current() == this; WispTask current = this.current; current.countExecutionTime(switchTimestamp); @@ -224,21 +236,27 @@ final void schedule() { current.controlGroup.calcCpuTicks(current); } current.resumeEntry.setStealEnable(true); - yieldTo(threadTask); // letting the scheduler choose runnable task + yieldTo(threadTask, terminal); // letting the scheduler choose runnable task current.carrier.checkAndDispatchShutdown(); } - private void checkAndDispatchShutdown() { + void checkAndDispatchShutdown() { + assert WispCarrier.current() == this; WispTask current = WispCarrier.current().getCurrentTask(); - if ((engine.hasBeenShutdown - || (current.inDestoryedGroup() && current.inheritedFromNonRootContainer())) - && !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName()) - && current.isAlive() + if (shutdownPending(current) && CoroutineSupport.checkAndThrowException(current.ctx)) { + current.shutdownPending = true; throw new ThreadDeath(); } } + boolean shutdownPending(WispTask current) { + return (engine.hasBeenShutdown + || (current.inDestoryedGroup() && current.inheritedFromNonRootContainer())) + && !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName()) + && current.isAlive(); + } + /** * Wake up a {@link WispTask} that belongs to this carrier * @@ -291,11 +309,12 @@ public void run() { if (task.controlGroup != null) { long res = task.controlGroup.checkCpuLimit(task, false); if (res != 0) { + task.controlGroup.cpuLimitationReached++; current.resumeLater(System.nanoTime() + res, task); return; } } - current.yieldTo(task); + current.yieldTo(task, false); current.runWispTaskEpilog(); } @@ -350,7 +369,7 @@ private Coroutine.StealResult steal(WispTask task) { * * @param task coroutine to run */ - private boolean yieldTo(WispTask task) { + private boolean yieldTo(WispTask task, boolean terminal) { assert task != null; assert WispCarrier.current() == this; assert task.carrier == this; @@ -358,7 +377,7 @@ private boolean yieldTo(WispTask task) { schedTick++; - if (task.status == WispTask.Status.ZOMBIE) { + if (task.status != WispTask.Status.ALIVE) { unregisterEvent(task); return false; } @@ -368,7 +387,7 @@ private boolean yieldTo(WispTask task) { counter.incrementSwitchCount(); switchTimestamp = WispEngine.getNanoTime(); assert !isInCritical; - WispTask.switchTo(from, task); + WispTask.switchTo(from, task, terminal); // Since carrier is changed with stealing, we shouldn't directly access carrier's member any more. assert WispCarrier.current().current == from; assert !from.carrier.isInCritical; @@ -397,9 +416,9 @@ void yield() { assert yieldingTask == null; yieldingTask = current; // delay it, make sure wakeupTask is called after yield out - schedule(); + schedule(false); } - current.carrier.checkAndDispatchShutdown(); + WispCarrier.current().checkAndDispatchShutdown(); } else { WispEngine.JLA.yield0(); } diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java index c64cc6fbc1f..c1235434696 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispConfiguration.java @@ -37,9 +37,11 @@ class WispConfiguration { static final boolean WISP_HIGH_PRECISION_TIMER; static final boolean WISP_USE_STEAL_LOCK; static final int WISP_ENGINE_TASK_CACHE_SIZE; + static final int WISP_ENGINE_TASK_GLOBAL_CACHE_SIZE; static final int WISP_SCHEDULE_STEAL_RETRY; static final int WISP_SCHEDULE_PUSH_RETRY; static final int WISP_SCHEDULE_HELP_STEAL_RETRY; + static final int WISP_SHUTDOWN_SLEEP_TIME; static final WispScheduler.SchedulingPolicy SCHEDULING_POLICY; static final boolean USE_DIRECT_SELECTOR_WAKEUP; static final boolean CARRIER_AS_POLLER; @@ -48,6 +50,9 @@ class WispConfiguration { // io static final boolean WISP_ENABLE_SOCKET_LOCK; + static final boolean WISP_ENABLE_ASYNC_FILE_IO; + static final int WISP_FILE_IO_WORKER_CORE; + static final int WISP_FILE_IO_WORKER_MAX; // wisp control group static final int WISP_CONTROL_GROUP_CFS_PERIOD; @@ -74,6 +79,8 @@ public Properties run() { PARK_ONE_MS_AT_LEAST = parseBooleanParameter(p, "com.alibaba.wisp.parkOneMs", true); WORKER_COUNT = parsePositiveIntegerParameter(p, "com.alibaba.wisp.carrierEngines", Runtime.getRuntime().availableProcessors()); + WISP_SHUTDOWN_SLEEP_TIME = parsePositiveIntegerParameter(p, "com.alibaba.wisp.shutdownSleepTime", + 100); POLLER_SHARDING_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.pollerShardingSize", 8); ENABLE_HANDOFF = parseBooleanParameter(p, "com.alibaba.wisp.enableHandOff", TRANSPARENT_WISP_SWITCH); @@ -99,6 +106,7 @@ public Properties run() { WISP_HIGH_PRECISION_TIMER = parseBooleanParameter(p, "com.alibaba.wisp.highPrecisionTimer", false); WISP_USE_STEAL_LOCK = parseBooleanParameter(p, "com.alibaba.wisp.useStealLock", true); WISP_ENGINE_TASK_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskCache", 20); + WISP_ENGINE_TASK_GLOBAL_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskGlobalCache", WORKER_COUNT * 10); WISP_SCHEDULE_STEAL_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.stealRetry", Math.max(1, WORKER_COUNT / 2)); WISP_SCHEDULE_PUSH_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.pushRetry", WORKER_COUNT); WISP_SCHEDULE_HELP_STEAL_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.helpStealRetry", Math.max(1, WORKER_COUNT / 4)); @@ -111,6 +119,9 @@ public Properties run() { // WISP_CONTROL_GROUP_CFS_PERIOD default value is 0(Us), WispControlGroup will estimate a cfs period according to SYSMON_TICK_US. // If WISP_CONTROL_GROUP_CFS_PERIOD was configed by user, WispControlGroup will adopt it directly and won't estimate. WISP_CONTROL_GROUP_CFS_PERIOD = parsePositiveIntegerParameter(p, "com.alibaba.wisp.controlGroup.cfsPeriod", 0); + WISP_ENABLE_ASYNC_FILE_IO = parseBooleanParameter(p, "com.alibaba.wisp.enableAsyncFileIO", false); + WISP_FILE_IO_WORKER_CORE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.fileIOCoreWorkerCnt", WORKER_COUNT); + WISP_FILE_IO_WORKER_MAX = parsePositiveIntegerParameter(p, "com.alibaba.wisp.fileIOMaxWorkerCnt", WORKER_COUNT); checkCompatibility(); } diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java index a2fa198d709..78045bfd69e 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispControlGroup.java @@ -5,6 +5,7 @@ import com.alibaba.rcm.ResourceType; import com.alibaba.rcm.internal.AbstractResourceContainer; +import java.util.ArrayList; import java.util.List; import java.util.Collections; import java.util.Objects; @@ -31,6 +32,9 @@ class WispControlGroup extends AbstractExecutorService { private static final int MAX_PERIOD = 100_000; // limit min period duration 10ms. private static final int MIN_PERIOD = 10_000; + + private long totalConsume = 0; + long cpuLimitationReached = 0; /** * ESTIMATED_PERIOD is an estimated cpu_cfs period according to wisp preemptive * schedule period, make sure there are at least SCHEDULE_TIMES wisp schedule @@ -143,6 +147,7 @@ void calcCpuTicks(WispTask task) { long usage = System.nanoTime() - task.enterTs; remainQuota.addAndGet(-usage); task.enterTs = 0; + totalConsume += usage; } private void attach() { @@ -289,6 +294,33 @@ protected void killThreads() { } } } + + @Override + public List getActiveContainerThreadIds() { + List threadIdList = new ArrayList<>(); + for (WispTask task : WispTask.id2Task.values()) { + if (task.isAlive() + && task.getThreadWrapper() != null + && task.controlGroup == WispControlGroup.this) { + threadIdList.add(task.getThreadWrapper().getId()); + } + } + return threadIdList; + } + + @Override + public Long getConsumedAmount(ResourceType resourceType) { + if (resourceType != ResourceType.CPU_PERCENT) + return 0L; + return totalConsume; + } + + @Override + public Long getResourceLimitReachedCount(ResourceType resourceType) { + if (resourceType != ResourceType.CPU_PERCENT) + return 0L; + return cpuLimitationReached; + } }; } } diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java index 438dbf53ed9..211e94c95bc 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispEngine.java @@ -113,6 +113,9 @@ public int compare(Thread o1, Thread o2) { initializeClasses(); JLA.wispBooted(); } + if (WispConfiguration.WISP_ENABLE_ASYNC_FILE_IO) { + WispFileIO.initializeWispFileIOClass(); + } } private static void initializeClasses() { @@ -194,6 +197,9 @@ public void run() { if (WispConfiguration.WISP_PROFILE_LOG_ENABLED) { WispPerfCounterMonitor.INSTANCE.startDaemon(); } + if (WispConfiguration.WISP_ENABLE_ASYNC_FILE_IO) { + WispFileIO.startWispFileIODaemon(); + } } } @@ -497,7 +503,7 @@ public void run() { } } // wait tasks to exit on fixed frequency instead of polling - WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(1)); + WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(WispConfiguration.WISP_SHUTDOWN_SLEEP_TIME)); } while (!tasks.isEmpty()); finishShutdown(); } @@ -563,15 +569,16 @@ private List getRunningTasks(WispControlGroup group) { && task.carrier.engine == WispEngine.this && !task.isThreadTask() && !task.getName().equals(WispTask.SHUTDOWN_TASK_NAME) - && (group == null - || task.inDestoryedGroup() && task.inheritedFromNonRootContainer())) { + && (group == null || (task.inDestoryedGroup() && task.controlGroup == group))) { runningTasks.add(task); } } - return runningTasks; + } catch (Exception e) { + e.printStackTrace(); } finally { carrier.isInCritical = isInCritical0; } + return runningTasks; } @Override diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispFileIO.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispFileIO.java new file mode 100644 index 00000000000..c295c16ec0b --- /dev/null +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispFileIO.java @@ -0,0 +1,111 @@ +package com.alibaba.wisp.engine; + +import java.io.IOException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import jdk.internal.access.SharedSecrets; +import jdk.internal.access.WispFileSyncIOAccess; + + +enum WispFileIO { + INSTANCE; + + private static volatile boolean wispFileWorkerStarted = false; + + static void setWispFileSyncIOIOAccess() { + if (SharedSecrets.getWispFileSyncIOAccess() == null) { + SharedSecrets.setWispFileSyncIOAccess(new WispFileSyncIOAccess() { + @Override + public boolean usingAsyncFileIO() { + return wispFileWorkerStarted + && WispEngine.runningAsCoroutine(Thread.currentThread()); + } + + @Override + public T executeAsAsyncFileIO(Callable command) throws IOException { + return WispFileIO.INSTANCE.invokeIOTask(command); + } + }); + } + } + + /* + * Initialize the WispFileIO class, called after System.initializeSystemClass by VM. + **/ + static void initializeWispFileIOClass() { + try { + Class.forName(WispFileIO.class.getName()); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + + static void startWispFileIODaemon() { + WispFileIO.INSTANCE.startDaemon(WispEngine.daemonThreadGroup); + setWispFileSyncIOIOAccess(); + } + + + private ExecutorService executor; + + private ThreadGroup threadGroup; + + void startDaemon(ThreadGroup g) { + threadGroup = g; + this.executor = new ThreadPoolExecutor(WispConfiguration.WISP_FILE_IO_WORKER_CORE, + WispConfiguration.WISP_FILE_IO_WORKER_MAX, Long.MAX_VALUE, + TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new WispFileIOThreadPoolFactory()); + wispFileWorkerStarted = true; + } + + public T invokeIOTask(Callable command) throws IOException { + Future future = submitIOTask(command); + T result; + boolean interrupted = false; + try { + while (true) { + try { + result = future.get(); + return result; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + throw new Error(e); + } + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + Future submitIOTask(Callable command) { + return executor.submit(command); + } + + private class WispFileIOThreadPoolFactory implements ThreadFactory { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final static String namePrefix = "Wisp-FIO-worker-thread-"; + + WispFileIOThreadPoolFactory() { + } + + @Override + public Thread newThread(Runnable r) { + assert threadGroup != null; + Thread t = new Thread(threadGroup, r, namePrefix + threadNumber.getAndIncrement()); + t.setDaemon(true); + return t; + } + } +} diff --git a/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java b/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java index 0d26d8281e1..32872e084fa 100644 --- a/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java +++ b/src/java.base/share/classes/com/alibaba/wisp/engine/WispTask.java @@ -86,7 +86,8 @@ static void trackTask(WispTask task) { enum Status { ALIVE, // ALIVE - ZOMBIE // exited + CACHED, // exited + DEAD // quited and never be used } private Runnable runnable; // runnable for created task @@ -138,6 +139,8 @@ enum Status { WispControlGroup controlGroup; long enterTs; + boolean shutdownPending; + WispTask(WispCarrier carrier, Coroutine ctx, boolean isRealTask, boolean isThreadTask) { this.isThreadTask = isThreadTask; this.id = isRealTask ? idGenerator.addAndGet(1) : -1; @@ -239,7 +242,7 @@ protected void run() { carrier.taskExit(); } } else { - carrier.schedule(); + carrier.schedule(false); } } } @@ -274,7 +277,7 @@ static Runnable wrapRunOutsideWisp(Runnable runnable) { *

* {@link #stealLock} is used in {@link WispCarrier#steal(WispTask)} . */ - static void switchTo(WispTask current, WispTask next) { + static void switchTo(WispTask current, WispTask next, boolean terminal) { assert next.ctx != null; assert WispCarrier.current() == current.carrier; assert current.carrier == next.carrier; @@ -285,7 +288,13 @@ static void switchTo(WispTask current, WispTask next) { STEAL_LOCK_UPDATER.lazySet(next, 1); // store load barrier is not necessary } - current.carrier.thread.getCoroutineSupport().unsafeSymmetricYieldTo(next.ctx); + if (terminal == true) { + current.carrier.thread.getCoroutineSupport().terminateCoroutine(next.ctx); + // should never run here. + assert false: "should not reach here"; + } else { + current.carrier.thread.getCoroutineSupport().unsafeSymmetricYieldTo(next.ctx); + } if (WispConfiguration.WISP_USE_STEAL_LOCK) { assert current.stealLock != 0; STEAL_LOCK_UPDATER.lazySet(current.from, 0); @@ -339,12 +348,8 @@ public String getName() { static final String SHUTDOWN_TASK_NAME = "SHUTDOWN_TASK"; - boolean isRunnable() { - return status == Status.ALIVE; - } - boolean isAlive() { - return status != Status.ZOMBIE; + return status == Status.ALIVE; } /** @@ -382,7 +387,7 @@ private void parkInternal(long timeoutNano, boolean fromJvm) { try { if (WispEngine.runningAsCoroutine(threadWrapper)) { setParkTime(); - carrier.schedule(); + carrier.schedule(false); } else { UA.park0(false, timeoutNano < 0 ? 0 : timeoutNano); } diff --git a/src/java.base/share/classes/java/dyn/CoroutineBase.java b/src/java.base/share/classes/java/dyn/CoroutineBase.java index 22e6d4ebea3..c27c9414bd4 100644 --- a/src/java.base/share/classes/java/dyn/CoroutineBase.java +++ b/src/java.base/share/classes/java/dyn/CoroutineBase.java @@ -81,7 +81,7 @@ private final void startInternal() { // threadSupport is fixed by steal() threadSupport.beforeResume(this); - threadSupport.terminateCoroutine(); + threadSupport.terminateCoroutine(null); } assert threadSupport.getThread() == SharedSecrets.getJavaLangAccess().currentThread0(); } diff --git a/src/java.base/share/classes/java/dyn/CoroutineSupport.java b/src/java.base/share/classes/java/dyn/CoroutineSupport.java index c286db5e980..1855ccd68b2 100644 --- a/src/java.base/share/classes/java/dyn/CoroutineSupport.java +++ b/src/java.base/share/classes/java/dyn/CoroutineSupport.java @@ -234,18 +234,24 @@ void symmetricExitInternal(Coroutine coroutine) { /** * terminate current coroutine and yield forward + * @param target target */ - void terminateCoroutine() { + public void terminateCoroutine(Coroutine target) { assert currentCoroutine != threadCoroutine : "cannot exit thread coroutine"; - assert currentCoroutine != getNextCoroutine(currentCoroutine.nativeCoroutine) : "last coroutine shouldn't call coroutineexit"; lock(); Coroutine old = currentCoroutine; - Coroutine forward = getNextCoroutine(old.nativeCoroutine); + Coroutine forward = target; + if (forward == null) { + forward = getNextCoroutine(old.nativeCoroutine); + } + assert forward == threadCoroutine : "switch to target must be thread coroutine"; currentCoroutine = forward; - unlockLater(forward); switchToAndTerminate(old, forward); + + // should never run here. + assert false; } /** diff --git a/src/java.base/share/classes/java/io/FileInputStream.java b/src/java.base/share/classes/java/io/FileInputStream.java index 1d078142512..e66ca54c393 100644 --- a/src/java.base/share/classes/java/io/FileInputStream.java +++ b/src/java.base/share/classes/java/io/FileInputStream.java @@ -27,6 +27,7 @@ import java.nio.channels.FileChannel; import java.util.Arrays; +import jdk.internal.access.SharedSecrets; import jdk.internal.util.ArraysSupport; import sun.nio.ch.FileChannelImpl; @@ -225,7 +226,11 @@ private void open(String name) throws FileNotFoundException { * @throws IOException if an I/O error occurs. */ public int read() throws IOException { - return read0(); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> read0()); + } else { + return read0(); + } } private native int read0() throws IOException; @@ -237,7 +242,15 @@ public int read() throws IOException { * @param len the number of bytes that are written * @throws IOException If an I/O error has occurred. */ - private native int readBytes(byte b[], int off, int len) throws IOException; + private native int readBytes0(byte b[], int off, int len) throws IOException; + + private int readBytes(byte b[], int off, int len) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> readBytes0(b, off, len)); + } else { + return readBytes0(b, off, len); + } + } /** * Reads up to {@code b.length} bytes of data from this input diff --git a/src/java.base/share/classes/java/io/FileOutputStream.java b/src/java.base/share/classes/java/io/FileOutputStream.java index 2e71b2bf700..394e6c31201 100644 --- a/src/java.base/share/classes/java/io/FileOutputStream.java +++ b/src/java.base/share/classes/java/io/FileOutputStream.java @@ -26,6 +26,7 @@ package java.io; import java.nio.channels.FileChannel; + import jdk.internal.access.SharedSecrets; import jdk.internal.access.JavaIOFileDescriptorAccess; import sun.nio.ch.FileChannelImpl; @@ -322,9 +323,20 @@ public void write(int b) throws IOException { * end of file * @throws IOException If an I/O error has occurred. */ - private native void writeBytes(byte b[], int off, int len, boolean append) + private native void writeBytes0(byte b[], int off, int len, boolean append) throws IOException; + private void writeBytes(byte b[], int off, int len, boolean append) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + writeBytes0(b, off, len, append); + return 0; + }); + } else { + writeBytes0(b, off, len, append); + } + } + /** * Writes {@code b.length} bytes from the specified byte array * to this file output stream. diff --git a/src/java.base/share/classes/java/io/RandomAccessFile.java b/src/java.base/share/classes/java/io/RandomAccessFile.java index 5443018ca8b..2d9affd319f 100644 --- a/src/java.base/share/classes/java/io/RandomAccessFile.java +++ b/src/java.base/share/classes/java/io/RandomAccessFile.java @@ -26,6 +26,8 @@ package java.io; import java.nio.channels.FileChannel; +import java.util.concurrent.Callable; + import jdk.internal.access.JavaIORandomAccessFileAccess; import jdk.internal.access.SharedSecrets; @@ -362,7 +364,11 @@ private void open(String name, int mode) * end-of-file has been reached. */ public int read() throws IOException { - return read0(); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> read0()); + } else { + return read0(); + } } private native int read0() throws IOException; @@ -374,7 +380,15 @@ public int read() throws IOException { * @param len the number of bytes to read. * @throws IOException If an I/O error has occurred. */ - private native int readBytes(byte b[], int off, int len) throws IOException; + private native int readBytes0(byte b[], int off, int len) throws IOException; + + private int readBytes(byte b[], int off, int len) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> readBytes0(b, off ,len)); + } else { + return readBytes0(b, off, len); + } + } /** * Reads up to {@code len} bytes of data from this file into an @@ -532,8 +546,18 @@ public void write(int b) throws IOException { * @param len the number of bytes that are written * @throws IOException If an I/O error has occurred. */ - private native void writeBytes(byte b[], int off, int len) throws IOException; - + private native void writeBytes0(byte b[], int off, int len) throws IOException; + + private void writeBytes(byte b[], int off, int len) throws IOException { + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + writeBytes0(b, off, len); + return 0; + }); + } else { + writeBytes0(b, off, len); + } + } /** * Writes {@code b.length} bytes from the specified byte array * to this file, starting at the current file pointer. diff --git a/src/java.base/share/classes/java/nio/MappedByteBuffer.java b/src/java.base/share/classes/java/nio/MappedByteBuffer.java index 54fb6c869e0..fe1f01d3b14 100644 --- a/src/java.base/share/classes/java/nio/MappedByteBuffer.java +++ b/src/java.base/share/classes/java/nio/MappedByteBuffer.java @@ -26,12 +26,15 @@ package java.nio; import java.io.FileDescriptor; +import java.io.IOException; import java.io.UncheckedIOException; import java.lang.ref.Reference; import java.util.Objects; + import jdk.internal.access.foreign.MemorySegmentProxy; import jdk.internal.access.foreign.UnmapperProxy; +import jdk.internal.access.SharedSecrets; import jdk.internal.misc.ScopedMemoryAccess; import jdk.internal.misc.Unsafe; @@ -247,11 +250,26 @@ public final MappedByteBuffer force() { } int capacity = capacity(); if (isSync || ((address != 0) && (capacity != 0))) { - return force(0, capacity); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return asynchronousForce(this); + } else { + return force(0, capacity); + } } return this; } + private static MappedByteBuffer asynchronousForce(MappedByteBuffer mapBuf) { + try { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + return mapBuf.force(0, mapBuf.capacity()); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return mapBuf; + } + /** * Forces any changes made to a region of this buffer's content to * be written to the storage device containing the mapped diff --git a/src/java.base/share/classes/jdk/internal/access/SharedSecrets.java b/src/java.base/share/classes/jdk/internal/access/SharedSecrets.java index fa6648ca13b..90408146d05 100644 --- a/src/java.base/share/classes/jdk/internal/access/SharedSecrets.java +++ b/src/java.base/share/classes/jdk/internal/access/SharedSecrets.java @@ -90,6 +90,7 @@ public class SharedSecrets { private static WispEngineAccess wispEngineAccess; private static EpollAccess epollAccess; private static RCMAccesss rcmAccesss; + private static WispFileSyncIOAccess wispFileSyncIOAccess; public static void setJavaUtilCollectionAccess(JavaUtilCollectionAccess juca) { javaUtilCollectionAccess = juca; @@ -495,4 +496,14 @@ public static RCMAccesss getRCMAccess() { public static void setRCMAccesss(RCMAccesss rcmAccesss) { SharedSecrets.rcmAccesss = rcmAccesss; } + + + public static WispFileSyncIOAccess getWispFileSyncIOAccess() { + return wispFileSyncIOAccess; + } + + public static void setWispFileSyncIOAccess(WispFileSyncIOAccess wispAsyncIOAccess) { + SharedSecrets.wispFileSyncIOAccess = wispAsyncIOAccess; + } + } diff --git a/src/java.base/share/classes/jdk/internal/access/WispFileSyncIOAccess.java b/src/java.base/share/classes/jdk/internal/access/WispFileSyncIOAccess.java new file mode 100644 index 00000000000..b21d4fda940 --- /dev/null +++ b/src/java.base/share/classes/jdk/internal/access/WispFileSyncIOAccess.java @@ -0,0 +1,10 @@ +package jdk.internal.access; + +import java.io.IOException; +import java.util.concurrent.Callable; + +public interface WispFileSyncIOAccess { + boolean usingAsyncFileIO(); + + T executeAsAsyncFileIO(Callable command) throws IOException; +} diff --git a/src/java.base/share/classes/module-info.java b/src/java.base/share/classes/module-info.java index 04bf81e50cb..5416ed309c3 100644 --- a/src/java.base/share/classes/module-info.java +++ b/src/java.base/share/classes/module-info.java @@ -368,6 +368,8 @@ jdk.localedata; exports jdk.internal.invoke to jdk.incubator.foreign; + exports com.alibaba.rcm.internal to + jdk.management; // the service types defined by the APIs in this module diff --git a/src/java.base/share/native/libjava/FileInputStream.c b/src/java.base/share/native/libjava/FileInputStream.c index e22499828f5..8078f672ab9 100644 --- a/src/java.base/share/native/libjava/FileInputStream.c +++ b/src/java.base/share/native/libjava/FileInputStream.c @@ -67,7 +67,7 @@ Java_java_io_FileInputStream_read0(JNIEnv *env, jobject this) { } JNIEXPORT jint JNICALL -Java_java_io_FileInputStream_readBytes(JNIEnv *env, jobject this, +Java_java_io_FileInputStream_readBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len) { return readBytes(env, this, bytes, off, len, fis_fd); } diff --git a/src/java.base/share/native/libjava/RandomAccessFile.c b/src/java.base/share/native/libjava/RandomAccessFile.c index 22c93b97233..aac9e9a7475 100644 --- a/src/java.base/share/native/libjava/RandomAccessFile.c +++ b/src/java.base/share/native/libjava/RandomAccessFile.c @@ -73,7 +73,7 @@ Java_java_io_RandomAccessFile_read0(JNIEnv *env, jobject this) { } JNIEXPORT jint JNICALL -Java_java_io_RandomAccessFile_readBytes(JNIEnv *env, +Java_java_io_RandomAccessFile_readBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len) { return readBytes(env, this, bytes, off, len, raf_fd); } @@ -84,7 +84,7 @@ Java_java_io_RandomAccessFile_write0(JNIEnv *env, jobject this, jint byte) { } JNIEXPORT void JNICALL -Java_java_io_RandomAccessFile_writeBytes(JNIEnv *env, +Java_java_io_RandomAccessFile_writeBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len) { writeBytes(env, this, bytes, off, len, JNI_FALSE, raf_fd); } diff --git a/src/java.base/unix/classes/java/io/UnixFileSystem.java b/src/java.base/unix/classes/java/io/UnixFileSystem.java index 501b5d8883b..7b150657fc8 100644 --- a/src/java.base/unix/classes/java/io/UnixFileSystem.java +++ b/src/java.base/unix/classes/java/io/UnixFileSystem.java @@ -26,6 +26,7 @@ package java.io; import java.util.Properties; +import jdk.internal.access.SharedSecrets; import jdk.internal.util.StaticProperty; import sun.security.action.GetPropertyAction; @@ -336,7 +337,11 @@ public boolean rename(File f1, File f2) { if (useCanonPrefixCache) { javaHomePrefixCache.clear(); } - return rename0(f1, f2); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return asynchronousRename(this, f1, f2); + } else { + return rename0(f1, f2); + } } private native boolean rename0(File f1, File f2); @@ -346,6 +351,16 @@ public boolean rename(File f1, File f2) { @Override public native boolean setReadOnly(File f); + private boolean asynchronousRename(UnixFileSystem fs, File f1, File f2) { + boolean result = false; + try { + result = SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> fs.rename0(f1, f2)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return result; + } + /* -- Filesystem interface -- */ @Override diff --git a/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java b/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java index 7c5bbcaca36..cd9a6fe3d50 100644 --- a/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/FileDispatcherImpl.java @@ -45,33 +45,57 @@ class FileDispatcherImpl extends FileDispatcher { } int read(FileDescriptor fd, long address, int len) throws IOException { - return read0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> read0(fd, address, len)); + } else { + return read0(fd, address, len); + } } int pread(FileDescriptor fd, long address, int len, long position) - throws IOException + throws IOException { - return pread0(fd, address, len, position); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> pread0(fd, address, len, position)); + } else { + return pread0(fd, address, len, position); + } } long readv(FileDescriptor fd, long address, int len) throws IOException { - return readv0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> readv0(fd, address, len)); + } else { + return readv0(fd, address, len); + } } int write(FileDescriptor fd, long address, int len) throws IOException { - return write0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> write0(fd, address, len)); + } else { + return write0(fd, address, len); + } } int pwrite(FileDescriptor fd, long address, int len, long position) throws IOException { - return pwrite0(fd, address, len, position); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> pwrite0(fd, address, len, position)); + } else { + return pwrite0(fd, address, len, position); + } } long writev(FileDescriptor fd, long address, int len) throws IOException { - return writev0(fd, address, len); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> writev0(fd, address, len)); + } else { + return writev0(fd, address, len); + } } long seek(FileDescriptor fd, long offset) throws IOException { @@ -79,11 +103,19 @@ long seek(FileDescriptor fd, long offset) throws IOException { } int force(FileDescriptor fd, boolean metaData) throws IOException { - return force0(fd, metaData); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> force0(fd, metaData)); + } else { + return force0(fd, metaData); + } } int truncate(FileDescriptor fd, long size) throws IOException { - return truncate0(fd, size); + if (SharedSecrets.getWispFileSyncIOAccess() != null && SharedSecrets.getWispFileSyncIOAccess().usingAsyncFileIO()) { + return SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> truncate0(fd, size)); + } else { + return truncate0(fd, size); + } } long size(FileDescriptor fd) throws IOException { diff --git a/src/java.base/unix/native/libjava/FileOutputStream_md.c b/src/java.base/unix/native/libjava/FileOutputStream_md.c index 407cb9ad23b..4ec996eef45 100644 --- a/src/java.base/unix/native/libjava/FileOutputStream_md.c +++ b/src/java.base/unix/native/libjava/FileOutputStream_md.c @@ -65,7 +65,7 @@ Java_java_io_FileOutputStream_write(JNIEnv *env, jobject this, jint byte, jboole } JNIEXPORT void JNICALL -Java_java_io_FileOutputStream_writeBytes(JNIEnv *env, +Java_java_io_FileOutputStream_writeBytes0(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len, jboolean append) { writeBytes(env, this, bytes, off, len, append, fos_fd); } diff --git a/src/java.base/windows/native/libjava/FileOutputStream_md.c b/src/java.base/windows/native/libjava/FileOutputStream_md.c index 452a3a41187..e2cf2181cd9 100644 --- a/src/java.base/windows/native/libjava/FileOutputStream_md.c +++ b/src/java.base/windows/native/libjava/FileOutputStream_md.c @@ -61,7 +61,7 @@ Java_java_io_FileOutputStream_open0(JNIEnv *env, jobject this, } JNIEXPORT void JNICALL -Java_java_io_FileOutputStream_write(JNIEnv *env, jobject this, jint byte, jboolean append) { +Java_java_io_FileOutputStream_write0(JNIEnv *env, jobject this, jint byte, jboolean append) { writeSingle(env, this, byte, append, fos_fd); } diff --git a/src/jdk.management/share/classes/com/alibaba/management/ResourceContainerMXBean.java b/src/jdk.management/share/classes/com/alibaba/management/ResourceContainerMXBean.java new file mode 100644 index 00000000000..a0b8c481fa4 --- /dev/null +++ b/src/jdk.management/share/classes/com/alibaba/management/ResourceContainerMXBean.java @@ -0,0 +1,44 @@ +package com.alibaba.management; + +import java.lang.management.PlatformManagedObject; +import java.util.List; + +/** + * Platform-specific management interface for the resource container + * of the Java virtual machine. + */ +public interface ResourceContainerMXBean extends PlatformManagedObject { + /** + * Get all running containers uniq id as List + * @return all active containers' id + */ + List getAllContainerIds(); + + /** + * Get a specific container's constraints by id + * @param id container id + * @return constraints as list + */ + List getConstraintsById(long id); + + /** + * Get the total cpu time consumed by id specified container + * @param id container id + * @return consumed cpu time by nanosecond + */ + long getCPUResourceConsumedAmount(long id); + + /** + * Get how many times the resource limitation has been reached + * @param id + * @return + */ + long getCPUResourceLimitReachedCount(long id); + + /** + * Get how many active threads are running in container + * @param id container id + * @return thread id as list + */ + List getActiveContainerThreadIds(long id); +} diff --git a/src/jdk.management/share/classes/com/alibaba/management/internal/ResourceContainerMXBeanImpl.java b/src/jdk.management/share/classes/com/alibaba/management/internal/ResourceContainerMXBeanImpl.java new file mode 100644 index 00000000000..30f071674a4 --- /dev/null +++ b/src/jdk.management/share/classes/com/alibaba/management/internal/ResourceContainerMXBeanImpl.java @@ -0,0 +1,49 @@ +package com.alibaba.management.internal; + +import com.alibaba.management.ResourceContainerMXBean; +import com.alibaba.rcm.ResourceType; +import com.alibaba.rcm.ResourceContainerMonitor; +import com.alibaba.rcm.internal.AbstractResourceContainer; +import sun.management.Util; + +import javax.management.ObjectName; +import java.util.List; +import java.util.stream.Collectors; + +public class ResourceContainerMXBeanImpl implements ResourceContainerMXBean { + private final static String TENANT_CONTAINER_MXBEAN_NAME = "com.alibaba.management:type=ResourceContainer"; + + @Override + public List getAllContainerIds() { + return ResourceContainerMonitor.getAllContainerIds(); + } + + @Override + public List getConstraintsById(long id) { + return ResourceContainerMonitor.getConstraintsById(id).stream().map(c -> c.getValues()[0]).collect(Collectors.toList()); + } + + + @Override + public long getCPUResourceConsumedAmount(long id) { + AbstractResourceContainer container = (AbstractResourceContainer) ResourceContainerMonitor.getContainerById(id); + return container.getConsumedAmount(ResourceType.CPU_PERCENT); + } + + @Override + public ObjectName getObjectName() { + return Util.newObjectName(TENANT_CONTAINER_MXBEAN_NAME); + } + + @Override + public long getCPUResourceLimitReachedCount(long id) { + AbstractResourceContainer container = (AbstractResourceContainer) ResourceContainerMonitor.getContainerById(id); + return container.getResourceLimitReachedCount(ResourceType.CPU_PERCENT); + } + + @Override + public List getActiveContainerThreadIds(long id) { + AbstractResourceContainer container = (AbstractResourceContainer) ResourceContainerMonitor.getContainerById(id); + return container.getActiveContainerThreadIds(); + } +} diff --git a/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java b/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java index c27d7e38148..4508737bd37 100644 --- a/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java +++ b/src/jdk.management/share/classes/com/sun/management/internal/PlatformMBeanProviderImpl.java @@ -24,7 +24,9 @@ */ package com.sun.management.internal; +import com.alibaba.management.ResourceContainerMXBean; import com.alibaba.management.WispCounterMXBean; +import com.alibaba.management.internal.ResourceContainerMXBeanImpl; import com.alibaba.management.internal.WispCounterMXBeanImpl; import com.sun.management.DiagnosticCommandMBean; import com.sun.management.HotSpotDiagnosticMXBean; @@ -55,6 +57,7 @@ public final class PlatformMBeanProviderImpl extends PlatformMBeanProvider { private static HotSpotDiagnostic hsDiagMBean = null; private static OperatingSystemMXBean osMBean = null; private static WispCounterMXBean wispCounterMBean = null; + private static ResourceContainerMXBean resourceContainerMXBean = null; static { AccessController.doPrivileged((PrivilegedAction) () -> { @@ -267,6 +270,34 @@ public Map nameToMBeanMap() { }); } + initMBeanList.add(new PlatformComponent() { + private final Set ResourceContainerMXBeanInterfaceNames = + Collections.unmodifiableSet(Collections.singleton( + "com.alibaba.management.ResourceContainerMXBean")); + + @Override + public Set> mbeanInterfaces() { + return Collections.singleton(com.alibaba.management.ResourceContainerMXBean.class); + } + + @Override + public Set mbeanInterfaceNames() { + return ResourceContainerMXBeanInterfaceNames; + } + + @Override + public String getObjectNamePattern() { + return "com.alibaba.management:type=ResourceContainer"; + } + + @Override + public Map nameToMBeanMap() { + return Collections.singletonMap( + "com.alibaba.management:type=ResourceContainer", + getResourceContainerMXBean()); + } + }); + /** * Wisp-Counter support. */ @@ -317,6 +348,13 @@ private static synchronized OperatingSystemMXBean getOperatingSystemMXBean() { return osMBean; } + private static synchronized ResourceContainerMXBean getResourceContainerMXBean() { + if (resourceContainerMXBean == null) { + resourceContainerMXBean = new ResourceContainerMXBeanImpl(); + } + return resourceContainerMXBean; + } + private static synchronized WispCounterMXBean getWispCounterMXBean() { if (wispCounterMBean == null) { wispCounterMBean = new WispCounterMXBeanImpl(); diff --git a/test/jdk/com/alibaba/rcm/TestKillThreads.java b/test/jdk/com/alibaba/rcm/TestKillThreads.java index 57e67644aff..4091b9246d7 100644 --- a/test/jdk/com/alibaba/rcm/TestKillThreads.java +++ b/test/jdk/com/alibaba/rcm/TestKillThreads.java @@ -1,20 +1,28 @@ /* * @test * @library /test/lib - * @build TestRcmCpu RcmUtils + * @build TestKillThreads RcmUtils * @summary test RCM TestKillThreads * @modules java.base/com.alibaba.wisp.engine:+open * @modules java.base/com.alibaba.rcm.internal:+open - * @run main/othervm -XX:+UseWisp2 -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -XX:+UnlockExperimentalVMOptions -Dcom.alibaba.wisp.threadAsWisp.black=name:Tester* -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -Xcomp -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -Xint -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads + * @run main/othervm -client -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:+Wisp2ThreadStop -XX:ActiveProcessorCount=4 TestKillThreads */ + import com.alibaba.rcm.ResourceContainer; import com.alibaba.rcm.ResourceType; import com.alibaba.rcm.internal.RCMUnsafe; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static jdk.test.lib.Asserts.assertTrue; import static jdk.test.lib.Asserts.assertFalse; @@ -61,25 +69,106 @@ public class TestKillThreads { } }; + private static Runnable TIMER = () -> { + try { + new Timer().scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + } + }, 1, 1); + started = 1; + new CountDownLatch(1).await(); + } catch (InterruptedException e) { + fail(); + } finally { + flag.set(true); + } + }; + + private static Runnable UPDATER = () -> { + boolean running = true; + started = 1; + int preTimes = -1; + int version = 0; + AtomicInteger checkTimes = new AtomicInteger(0); + while (running) { + try { + version = 0; + AtomicInteger atomicInteger = checkTimes; + synchronized (atomicInteger) { + while (preTimes == checkTimes.get() && running) { + checkTimes.wait(); + } + preTimes = checkTimes.get(); + } + if (!running) continue; + } catch (Exception e) { + fail(); + } + } + }; + + private static Runnable POLL = () -> { + DelayQueue taksQueue = new DelayQueue(); + while (true) { + started = 1; + try { + while (true) { + Delayed task =taksQueue.take(); + System.out.println(taksQueue); + } + } + catch (Exception exception) { + exception.printStackTrace(); + continue; + } + } + }; + + private static Runnable OVERRIDE = () -> { + while (true) { + started = 1; + try { + try { + Thread.sleep(200); + } finally { + throw new Exception("123"); + } + + } catch (Throwable t) { + t.printStackTrace(); + } + + } + }; + + private static final List workload = new ArrayList() {{ -// add(VOID_LOOP); TODO:// support loop -// add(BUSY_LOOP); + add(VOID_LOOP); + add(BUSY_LOOP); add(PARK); add(FINALLY); + add(TIMER); + add(UPDATER); + add(POLL); + add(OVERRIDE); }}; - public static void main(String[] args) { + public static void main(String[] args) throws Exception { for (Runnable runnable : workload) { - ResourceContainer container = RcmUtils.createContainer( - ResourceType.CPU_PERCENT.newConstraint(40)); + ResourceContainer container = RcmUtils.createContainer(Collections.singletonList( + ResourceType.CPU_PERCENT.newConstraint(40))); started = 0; container.run(() -> { - Thread t = new Thread(runnable); + Thread t = new Thread(runnable); t.start(); }); - while(0 == started) {} + while (0 == started) { + } + Thread.sleep(100); RCMUnsafe.killThreads(container); container.destroy(); + System.out.println("pass"); } assertTrue(flag.get()); } diff --git a/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java b/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java index b3f3b859455..1b3c3c949f0 100644 --- a/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java +++ b/test/jdk/com/alibaba/rcm/TestRCMInheritanceCallBack.java @@ -41,6 +41,17 @@ public static void main(String[] args) throws Exception { t.start(); }); + container.run(() -> { + Thread t = new Thread(() -> { + assertInRoot(true); + latch.countDown(); + }, "TenantWorker"); + + RCMUnsafe.attach(ResourceContainer.root()); + t.start(); + RCMUnsafe.attach(container); + }); + assertTrue(latch.await(2, TimeUnit.SECONDS)); } diff --git a/test/jdk/com/alibaba/rcm/TestRcmCpu.java b/test/jdk/com/alibaba/rcm/TestRcmCpu.java index c77bd558e7a..9e2c6a255e3 100644 --- a/test/jdk/com/alibaba/rcm/TestRcmCpu.java +++ b/test/jdk/com/alibaba/rcm/TestRcmCpu.java @@ -8,9 +8,13 @@ * @run main/othervm -XX:+UseWisp2 -XX:ActiveProcessorCount=4 TestRcmCpu */ +import com.alibaba.management.ResourceContainerMXBean; import com.alibaba.rcm.ResourceContainer; import com.alibaba.rcm.ResourceType; +import javax.management.MBeanServer; +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.security.MessageDigest; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -21,6 +25,8 @@ public class TestRcmCpu { + static ResourceContainerMXBean resourceContainerMXBean; + private static Callable taskFactory(int load) { return new Callable() { @Override @@ -40,6 +46,14 @@ public Long call() throws Exception { } public static void main(String[] args) throws Exception { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try { + resourceContainerMXBean = ManagementFactory.newPlatformMXBeanProxy(mbs, + "com.alibaba.management:type=ResourceContainer", ResourceContainerMXBean.class); + } catch (IOException e) { + e.printStackTrace(); + } + ResourceContainer rc0 = RcmUtils.createContainer( ResourceType.CPU_PERCENT.newConstraint(40)); ResourceContainer rc1 = RcmUtils.createContainer( @@ -67,5 +81,9 @@ public static void main(String[] args) throws Exception { double ratio = (double) duration1 / duration0; assertLT(Math.abs(ratio - 0.5), 0.10, "deviation is out of reasonable scope"); + + for (long id : resourceContainerMXBean.getAllContainerIds()) { + System.out.println(resourceContainerMXBean.getConstraintsById(id)); + } } } diff --git a/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java b/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java index ff2955b4237..2932d5c1c31 100644 --- a/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java +++ b/test/jdk/com/alibaba/rcm/demo/MyResourceContainer.java @@ -1,6 +1,7 @@ package demo; import com.alibaba.rcm.Constraint; +import com.alibaba.rcm.ResourceType; import com.alibaba.rcm.internal.AbstractResourceContainer; import java.util.ArrayList; @@ -43,4 +44,19 @@ public Iterable getConstraints() { public void destroy() { dead = true; } + + @Override + public List getActiveContainerThreadIds() { + return null; + } + + @Override + public Long getConsumedAmount(ResourceType resourceType) { + return 0L; + } + + @Override + public Long getResourceLimitReachedCount(ResourceType resourceType) { + return null; + } } diff --git a/test/jdk/com/alibaba/wisp/io/WispFileIOTest.java b/test/jdk/com/alibaba/wisp/io/WispFileIOTest.java new file mode 100644 index 00000000000..cea9cb2afdd --- /dev/null +++ b/test/jdk/com/alibaba/wisp/io/WispFileIOTest.java @@ -0,0 +1,224 @@ +/* + * @test + * @library /test/lib + * @summary test reuse WispUdpSocket buffer + * @modules java.base/jdk.internal.access + * @run main/othervm -XX:+UseWisp2 -Dcom.alibaba.wisp.enableAsyncFileIO=true WispFileIOTest + */ + +import jdk.internal.access.SharedSecrets; +import sun.misc.Unsafe; + +import java.io.*; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.util.concurrent.*; + +import static java.nio.file.StandardOpenOption.*; +import static jdk.test.lib.Asserts.assertTrue; + +public class WispFileIOTest { + + public static void testNioFileChannel(File testFile) throws Exception { + resetTestFileContent(testFile); + RandomAccessFile file = new RandomAccessFile(testFile, "rw"); + FileChannel ch = file.getChannel(); + ByteBuffer buffer = ByteBuffer.allocate(1); + ch.read(buffer); + assertTrue("0".equals(new String(buffer.array()))); + buffer.flip(); + ch.write(buffer); + ch.close(); + String content = new String(Files.readAllBytes(testFile.toPath())); + assertTrue("00234".equals(content)); + } + + public static void testFileStream(File testFile) throws Exception { + //test RandomAccessFile + resetTestFileContent(testFile); + RandomAccessFile raf = new RandomAccessFile(testFile, "rw"); + byte[] buffer; + buffer = "5".getBytes(); + raf.write(buffer); + raf.seek(0); + buffer = new byte[1]; + raf.read(buffer, 0, 1); + assertTrue("5".equals(new String(buffer))); + + //test FileInputStream + resetTestFileContent(testFile); + FileInputStream fis = new FileInputStream(testFile); + buffer = new byte[1]; + fis.read(buffer); + assertTrue("0".equals(new String(buffer))); + + //test FileOutputStream + resetTestFileContent(testFile); + FileOutputStream fos = new FileOutputStream(testFile, true); + buffer = "5".getBytes(); + fos.write(buffer); + String content = new String(Files.readAllBytes(testFile.toPath())); + assertTrue("012345".equals(content)); + + } + + public static void testMappedByteBuffer() throws Exception { + File newfile = new File("/tmp/ThreadPoolAioTest_test_new2.file"); + newfile.deleteOnExit(); + RandomAccessFile raf = new RandomAccessFile(newfile, "rw"); + FileChannel fc = raf.getChannel(); + MappedByteBuffer map = fc.map(FileChannel.MapMode.READ_WRITE, 0, 2048); + fc.close(); + double current = map.getDouble(50); + map.putDouble(50, current + 0.1d); + map.force(); + } + + public static Thread workerThread = null; + + public static void resetTestFileContent(File testFile) throws IOException { + FileWriter writer = new FileWriter(testFile); + for (int i = 0; i < 5; i++) { + writer.write(String.valueOf(i)); + } + writer.close(); + } + + private static void testBlockingReadInterrupted(File testFile) throws IOException { + long block = 12; + long size = 4096; + long total = block * size; + createLargeFile(block * size, testFile); + for (int i = 0; i < block; i++) { + try (FileInputStream fis = new FileInputStream(testFile)) { + long skip = skipBytes(fis, size, total); + total -= skip; + assertTrue(skip == size || skip == 0); + } finally { + testFile.delete(); + } + } + + } + + // Skip toSkip number of bytes and expect that the available() method + // returns avail number of bytes. + private static long skipBytes(InputStream is, long toSkip, long avail) + throws IOException { + long skip = is.skip(toSkip); + if (skip != toSkip) { + throw new RuntimeException("skip() returns " + skip + + " but expected " + toSkip); + } + long remaining = avail - skip; + int expected = (remaining >= Integer.MAX_VALUE) + ? Integer.MAX_VALUE + : (remaining > 0 ? (int) remaining : 0); + + System.out.println("Skipped " + skip + " bytes, available() returns " + + expected + ", remaining " + remaining); + if (is.available() != expected) { + throw new RuntimeException("available() returns " + + is.available() + " but expected " + expected); + } + return skip; + } + + private static void createLargeFile(long filesize, + File file) throws IOException { + // Recreate a large file as a sparse file if possible + Files.delete(file.toPath()); + + try (FileChannel fc = + FileChannel.open(file.toPath(), CREATE_NEW, WRITE, APPEND)) { + ByteBuffer bb = ByteBuffer.allocate(1).put((byte) 1); + bb.rewind(); + int rc = fc.write(bb, filesize - 1); + + if (rc != 1) { + throw new RuntimeException("Failed to write 1 byte" + + " to the large file"); + } + } + } + + private static void mockIOException() throws Exception { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe) f.get(null); + try { + SharedSecrets.getWispFileSyncIOAccess().executeAsAsyncFileIO(() -> { + throw new IOException("expected"); + }); + } catch (IOException e) { + unsafe.throwException(e); + } catch (Exception exception) { + // + } + } + + public static void main(String[] args) throws Exception { + + // submit by another thread + Thread t = new Thread(() -> { + try { + File f = new File("/tmp/ThreadPoolAioTest_test.file"); + f.deleteOnExit(); + // test java nio + testNioFileChannel(f); + // test java io + testFileStream(f); + // test rename + File newfile = new File("/tmp/ThreadPoolAioTest_test_new.file"); + newfile.deleteOnExit(); + f.renameTo(newfile); + // test MappedByteBuffer force + testMappedByteBuffer(); + resetTestFileContent(f); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false, "exception happened"); + } + }); + t.start(); + t.join(); + + CountDownLatch finished = new CountDownLatch(1); + Thread interrupt = new Thread(() -> { + try { + File f = new File("/tmp/ThreadPoolAioTest_test.file"); + testBlockingReadInterrupted(f); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(e instanceof ClosedByInterruptException, "exception happened"); + } finally { + finished.countDown(); + } + }); + interrupt.start(); + + while (finished.getCount() != 0) { + interrupt.interrupt(); + } + interrupt.join(); + + + boolean exceptionHappened = false; + try { + mockIOException(); + } catch (Exception e) { + e.printStackTrace(); + if (e instanceof IOException) { + exceptionHappened = true; + } + } finally { + assertTrue(exceptionHappened); + } + + System.out.println("Success!"); + } +}