Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/java.base/share/classes/java/lang/System.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import java.util.function.Supplier;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.misc.Unsafe;
import jdk.internal.util.StaticProperty;
import jdk.internal.module.ModuleBootstrap;
Expand Down Expand Up @@ -2554,12 +2556,20 @@ public <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
}
}

public <T> T getCarrierThreadLocal(ThreadLocal<T> local) {
return local.getCarrierThreadLocal();
public <T> T getCarrierThreadLocal(CarrierThreadLocal<T> local) {
return ((ThreadLocal<T>)local).getCarrierThreadLocal();
}

public <T> void setCarrierThreadLocal(CarrierThreadLocal<T> local, T value) {
((ThreadLocal<T>)local).setCarrierThreadLocal(value);
}

public void removeCarrierThreadLocal(CarrierThreadLocal<?> local) {
((ThreadLocal<?>)local).removeCarrierThreadLocal();
}

public <T> void setCarrierThreadLocal(ThreadLocal<T> local, T value) {
local.setCarrierThreadLocal(value);
public boolean isCarrierThreadLocalPresent(CarrierThreadLocal<?> local) {
return ((ThreadLocal<?>)local).isCarrierThreadLocalPresent();
}

public Object[] extentLocalCache() {
Expand Down
31 changes: 24 additions & 7 deletions src/java.base/share/classes/java/lang/ThreadLocal.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.misc.TerminatingThreadLocal;

/**
Expand Down Expand Up @@ -172,6 +174,7 @@ public T get() {
* thread-local variable.
*/
T getCarrierThreadLocal() {
assert this instanceof CarrierThreadLocal<T>;
return get(Thread.currentCarrierThread());
}

Expand All @@ -193,14 +196,18 @@ private T get(Thread t) {
}

/**
* Returns {@code true} if there is a value in the current thread's copy of
* Returns {@code true} if there is a value in the current carrier thread's copy of
* this thread-local variable, even if that values is {@code null}.
*
* @return {@code true} if current thread has associated value in this
* @return {@code true} if current carrier thread has associated value in this
* thread-local variable; {@code false} if not
*/
boolean isPresent() {
Thread t = Thread.currentThread();
boolean isCarrierThreadLocalPresent() {
assert this instanceof CarrierThreadLocal<T>;
return isPresent(Thread.currentCarrierThread());
}

private boolean isPresent(Thread t) {
ThreadLocalMap map = getMap(t);
if (map != null && map != ThreadLocalMap.NOT_SUPPORTED) {
return map.getEntry(this) != null;
Expand All @@ -224,8 +231,8 @@ private T setInitialValue(Thread t) {
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
if (this instanceof TerminatingThreadLocal<?> ttl) {
TerminatingThreadLocal.register(ttl);
}
return value;
}
Expand All @@ -249,6 +256,7 @@ public void set(T value) {
}

void setCarrierThreadLocal(T value) {
assert this instanceof CarrierThreadLocal<T>;
set(Thread.currentCarrierThread(), value);
}

Expand Down Expand Up @@ -276,7 +284,16 @@ private void set(Thread t, T value) {
* @since 1.5
*/
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
remove(Thread.currentThread());
}

void removeCarrierThreadLocal() {
assert this instanceof CarrierThreadLocal<T>;
remove(Thread.currentCarrierThread());
}

private void remove(Thread t) {
ThreadLocalMap m = getMap(t);
if (m != null && m != ThreadLocalMap.NOT_SUPPORTED) {
m.remove(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.lang.annotation.Annotation;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.module.ModuleDescriptor;
import java.lang.reflect.Executable;
Expand All @@ -45,6 +44,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Stream;

import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.module.ServicesCatalog;
import jdk.internal.reflect.ConstantPool;
import jdk.internal.vm.Continuation;
Expand Down Expand Up @@ -456,12 +456,23 @@ public interface JavaLangAccess {
/**
* Returns the value of the current carrier thread's copy of a thread-local.
*/
<T> T getCarrierThreadLocal(ThreadLocal<T> local);
<T> T getCarrierThreadLocal(CarrierThreadLocal<T> local);

/**
* Sets the value of the current carrier thread's copy of a thread-local.
*/
<T> void setCarrierThreadLocal(ThreadLocal<T> local, T value);
<T> void setCarrierThreadLocal(CarrierThreadLocal<T> local, T value);

/**
* Removes the value of the current carrier thread's copy of a thread-local.
*/
void removeCarrierThreadLocal(CarrierThreadLocal<?> local);

/**
* Returns {@code true} if there is a value in the current carrier thread's copy of
* thread-local, even if that values is {@code null}.
*/
boolean isCarrierThreadLocalPresent(CarrierThreadLocal<?> local);

/**
* Returns the current thread's extent locals cache
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package jdk.internal.misc;

import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;

/**
* A {@link ThreadLocal} variant which binds its value to current thread's
* carrier thread.
*/
public class CarrierThreadLocal<T> extends ThreadLocal<T> {

@Override
public T get() {
return JLA.getCarrierThreadLocal(this);
}

@Override
public void set(T value) {
JLA.setCarrierThreadLocal(this, value);
}

@Override
public void remove() {
JLA.removeCarrierThreadLocal(this);
}

public boolean isPresent() {
return JLA.isCarrierThreadLocalPresent(this);
}

private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import java.util.IdentityHashMap;

/**
* A thread-local variable that is notified when a thread terminates and
* it has been initialized in the terminating thread (even if it was
* A per-carrier-thread-local variable that is notified when a thread terminates and
* it has been initialized in the terminating carrier thread or a virtual thread
* that had the terminating carrier thread as its carrier thread (even if it was
* initialized with a null value).
*/
public class TerminatingThreadLocal<T> extends ThreadLocal<T> {
public class TerminatingThreadLocal<T> extends CarrierThreadLocal<T> {

@Override
public void set(T value) {
Expand Down Expand Up @@ -79,8 +80,7 @@ public static void threadTerminated() {
* @param tl the ThreadLocal to register
*/
public static void register(TerminatingThreadLocal<?> tl) {
if (!Thread.currentThread().isVirtual())
REGISTRY.get().add(tl);
REGISTRY.get().add(tl);
}

/**
Expand All @@ -89,16 +89,15 @@ public static void register(TerminatingThreadLocal<?> tl) {
* @param tl the ThreadLocal to unregister
*/
private static void unregister(TerminatingThreadLocal<?> tl) {
if (!Thread.currentThread().isVirtual())
REGISTRY.get().remove(tl);
REGISTRY.get().remove(tl);
}

/**
* a per-thread registry of TerminatingThreadLocal(s) that have been registered
* but later not unregistered in a particular thread.
* a per-carrier-thread registry of TerminatingThreadLocal(s) that have been registered
* but later not unregistered in a particular carrier-thread.
*/
public static final ThreadLocal<Collection<TerminatingThreadLocal<?>>> REGISTRY =
new ThreadLocal<>() {
public static final CarrierThreadLocal<Collection<TerminatingThreadLocal<?>>> REGISTRY =
new CarrierThreadLocal<>() {
@Override
protected Collection<TerminatingThreadLocal<?>> initialValue() {
return Collections.newSetFromMap(new IdentityHashMap<>(4));
Expand Down
12 changes: 5 additions & 7 deletions src/java.base/share/classes/sun/nio/ch/IOVecWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

import java.nio.ByteBuffer;

import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.ref.CleanerFactory;

/**
Expand All @@ -46,7 +45,6 @@
*/

class IOVecWrapper {
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();

// Miscellaneous constants
private static final int BASE_OFFSET = 0;
Expand Down Expand Up @@ -83,8 +81,8 @@ public void run() {
}
}

// per thread IOVecWrapper
private static final ThreadLocal<IOVecWrapper> cached = new ThreadLocal<>();
// per carrier-thread IOVecWrapper
private static final CarrierThreadLocal<IOVecWrapper> cached = new CarrierThreadLocal<>();

private IOVecWrapper(int size) {
this.size = size;
Expand All @@ -97,7 +95,7 @@ private IOVecWrapper(int size) {
}

static IOVecWrapper get(int size) {
IOVecWrapper wrapper = JLA.getCarrierThreadLocal(cached);
IOVecWrapper wrapper = cached.get();
if (wrapper != null && wrapper.size < size) {
// not big enough; eagerly release memory
wrapper.vecArray.free();
Expand All @@ -106,7 +104,7 @@ static IOVecWrapper get(int size) {
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
CleanerFactory.cleaner().register(wrapper, new Deallocator(wrapper.vecArray));
JLA.setCarrierThreadLocal(cached, wrapper);
cached.set(wrapper);
}
return wrapper;
}
Expand Down
15 changes: 6 additions & 9 deletions src/java.base/share/classes/sun/nio/ch/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@
import java.util.Iterator;
import java.util.Set;

import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.TerminatingThreadLocal;
import jdk.internal.misc.Unsafe;
import sun.security.action.GetPropertyAction;

public class Util {
private static JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();

// -- Caches --

Expand All @@ -55,8 +52,8 @@ public class Util {
// The max size allowed for a cached temp buffer, in bytes
private static final long MAX_CACHED_BUFFER_SIZE = getMaxCachedBufferSize();

// Per-thread cache of temporary direct buffers
private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
// Per-carrier-thread cache of temporary direct buffers
private static TerminatingThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
@Override
protected BufferCache initialValue() {
return new BufferCache();
Expand Down Expand Up @@ -230,7 +227,7 @@ public static ByteBuffer getTemporaryDirectBuffer(int size) {
return ByteBuffer.allocateDirect(size);
}

BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
Expand All @@ -257,7 +254,7 @@ public static ByteBuffer getTemporaryAlignedDirectBuffer(int size,
.alignedSlice(alignment);
}

BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
if (buf.alignmentOffset(0, alignment) == 0) {
Expand Down Expand Up @@ -294,7 +291,7 @@ static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
}

assert buf != null;
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
if (!cache.offerFirst(buf)) {
// cache is full
free(buf);
Expand All @@ -316,7 +313,7 @@ static void offerLastTemporaryDirectBuffer(ByteBuffer buf) {
}

assert buf != null;
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
if (!cache.offerLast(buf)) {
// cache is full
free(buf);
Expand Down
Loading