Skip to content

Commit f21d692

Browse files
committed
Simple UnboundedExecutor implementation
1 parent dccd14d commit f21d692

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed

src/java.base/share/classes/java/util/concurrent/Executors.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,30 @@ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
237237
threadFactory);
238238
}
239239

240+
/**
241+
* Creates an Executor that runs each task in its own thread. The number of
242+
* threads is unbounded.
243+
*
244+
* @apiNote This is a prototype API. It is intended to be used to run tasks
245+
* in virtual threads as in the following example:
246+
* <pre> {@code
247+
* ThreadFactory factory = Thread.builder().virtual().factory();
248+
* try (ExecutorService executor = Executors.newUnboundedExecutor(factory)) {
249+
* executor.submit(task1);
250+
* executor.submit(task2);
251+
* }
252+
* }</pre>
253+
*
254+
* @param threadFactory the factory to use when creating new threads
255+
* @return a newly created executor
256+
* @throws NullPointerException if factory is null
257+
258+
* @since 99
259+
*/
260+
public static ExecutorService newUnboundedExecutor(ThreadFactory threadFactory) {
261+
return new UnboundedExecutor(threadFactory);
262+
}
263+
240264
/**
241265
* Creates a single-threaded executor that can schedule commands
242266
* to run after a given delay, or to execute periodically.
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
package java.util.concurrent;
26+
27+
import java.lang.invoke.MethodHandles;
28+
import java.lang.invoke.VarHandle;
29+
import java.util.List;
30+
import java.util.Objects;
31+
import java.util.Set;
32+
import java.util.concurrent.locks.Condition;
33+
import java.util.concurrent.locks.ReentrantLock;
34+
35+
/**
36+
* ExecutorService that executes each task in its own thread. Threads are not
37+
* re-used and the number of threads/tasks is unbounded.
38+
*
39+
* This is a inefficient/simple implementation for now, it will likely be replaced.
40+
*/
41+
class UnboundedExecutor extends AbstractExecutorService {
42+
private final ThreadFactory factory;
43+
44+
private static final VarHandle STATE;
45+
static {
46+
try {
47+
MethodHandles.Lookup l = MethodHandles.lookup();
48+
STATE = l.findVarHandle(UnboundedExecutor.class, "state", int.class);
49+
} catch (Exception e) {
50+
throw new InternalError(e);
51+
}
52+
}
53+
private volatile int state;
54+
55+
// states: RUNNING -> SHUTDOWN -> TERMINATED
56+
private static final int RUNNING = 0;
57+
private static final int SHUTDOWN = 1;
58+
private static final int TERMINATED = 2;
59+
60+
private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
61+
private final ReentrantLock terminationLock = new ReentrantLock();
62+
private final Condition terminationCondition = terminationLock.newCondition();
63+
64+
UnboundedExecutor(ThreadFactory factory) {
65+
this.factory = Objects.requireNonNull(factory);
66+
}
67+
68+
/**
69+
* Sets the state to TERMINATED if there are no remaining threads.
70+
*/
71+
private boolean tryTerminate() {
72+
assert state >= SHUTDOWN;
73+
if (threads.isEmpty()) {
74+
terminationLock.lock();
75+
try {
76+
STATE.set(this, TERMINATED);
77+
terminationCondition.signalAll();
78+
} finally {
79+
terminationLock.unlock();
80+
}
81+
return true;
82+
} else {
83+
return false;
84+
}
85+
}
86+
87+
/**
88+
* Sets the state to SHUTDOWN and attempts to terminate if not already shutdown
89+
* @throws SecurityException if denied by security manager
90+
*/
91+
private void tryShutdownAndTerminate() {
92+
SecurityManager sm = System.getSecurityManager();
93+
if (sm != null) {
94+
sm.checkPermission(new RuntimePermission("modifyThread"));
95+
}
96+
if (STATE.compareAndSet(this, RUNNING, SHUTDOWN)) {
97+
tryTerminate();
98+
}
99+
}
100+
101+
/**
102+
* Removes the thread from the set of threads and attempts to terminate
103+
* the executor if shutdown but not terminated.
104+
*/
105+
private void onTerminate(Thread thread) {
106+
threads.remove(thread);
107+
if (state == SHUTDOWN) {
108+
tryTerminate();
109+
}
110+
}
111+
112+
@Override
113+
public void shutdown() {
114+
tryShutdownAndTerminate();
115+
}
116+
117+
@Override
118+
public List<Runnable> shutdownNow() {
119+
tryShutdownAndTerminate();
120+
threads.forEach(Thread::interrupt);
121+
return List.of();
122+
}
123+
124+
@Override
125+
public boolean isShutdown() {
126+
return state >= SHUTDOWN;
127+
}
128+
129+
@Override
130+
public boolean isTerminated() {
131+
return state >= TERMINATED;
132+
}
133+
134+
@Override
135+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
136+
Objects.requireNonNull(unit);
137+
if (isTerminated()) {
138+
return true;
139+
} else {
140+
terminationLock.lock();
141+
try {
142+
if (!isTerminated()) {
143+
terminationCondition.await(timeout, unit);
144+
}
145+
} finally {
146+
terminationLock.unlock();
147+
}
148+
return isTerminated();
149+
}
150+
}
151+
152+
@Override
153+
public void execute(Runnable task) {
154+
Objects.requireNonNull(task);
155+
if (state >= SHUTDOWN) {
156+
// shutdown or terminated
157+
reject();
158+
}
159+
Runnable wrapper = () -> {
160+
try {
161+
task.run();
162+
} finally {
163+
onTerminate(Thread.currentThread());
164+
}
165+
};
166+
Thread thread = factory.newThread(wrapper);
167+
threads.add(thread);
168+
boolean started = false;
169+
try {
170+
if (state == RUNNING) {
171+
thread.start();
172+
started = true;
173+
}
174+
} finally {
175+
if (!started) {
176+
onTerminate(thread);
177+
reject();
178+
}
179+
}
180+
}
181+
182+
private static void reject() {
183+
throw new RejectedExecutionException();
184+
}
185+
}

0 commit comments

Comments
 (0)