diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 5f80f325367a..2e7ed1d5f131 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -75,8 +75,7 @@ public ManagedSelector(SelectorManager selectorManager, int id) _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - Scheduler scheduler = selectorManager.getScheduler(); - _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class),scheduler); + _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class)); addBean(_strategy,true); setStopTimeout(5000); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentStack.java b/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentStack.java deleted file mode 100644 index 9d185bce645b..000000000000 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentStack.java +++ /dev/null @@ -1,89 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.concurrent.atomic.AtomicReference; - -/** - *

Nonblocking stack using variation of Treiber's algorithm - * that allows for reduced garbage.

- */ -public class ConcurrentStack -{ - private final NodeStack> stack = new NodeStack<>(); - - public void push(I item) - { - stack.push(new Holder<>(item)); - } - - public I pop() - { - Holder holder = stack.pop(); - if (holder == null) - return null; - return holder.item; - } - - public static class Node> - { - E next; - } - - private static class Holder extends Node> - { - private final I item; - - private Holder(I item) - { - this.item = item; - } - } - - public static class NodeStack> - { - private final AtomicReference stack = new AtomicReference<>(); - - public void push(N node) - { - while (true) - { - N top = stack.get(); - node.next = top; - if (stack.compareAndSet(top, node)) - break; - } - } - - public N pop() - { - while (true) - { - N top = stack.get(); - if (top == null) - return null; - if (stack.compareAndSet(top, top.next)) - { - top.next = null; - return top; - } - } - } - } -} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 23788d7cb8ce..66d19e0ac9bf 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -18,13 +18,13 @@ package org.eclipse.jetty.util.thread; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; -import org.eclipse.jetty.util.ConcurrentStack; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -49,7 +49,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { @Override public void run() - {} + { + } @Override public String toString() @@ -60,10 +61,9 @@ public String toString() private final Executor _executor; private final int _capacity; - private final ConcurrentStack.NodeStack _stack; + private final ConcurrentLinkedDeque _stack; private final AtomicInteger _size = new AtomicInteger(); private final AtomicInteger _pending = new AtomicInteger(); - private final AtomicInteger _waiting = new AtomicInteger(); private ThreadPoolBudget.Lease _lease; private Object _owner; @@ -96,7 +96,7 @@ public ReservedThreadExecutor(Executor executor,int capacity, Object owner) { _executor = executor; _capacity = reservedThreads(executor,capacity); - _stack = new ConcurrentStack.NodeStack<>(); + _stack = new ConcurrentLinkedDeque<>(); _owner = owner; LOG.debug("{}",this); @@ -145,12 +145,6 @@ public int getPending() return _pending.get(); } - @ManagedAttribute(value = "waiting reserved threads", readonly = true) - public int getWaiting() - { - return _waiting.get(); - } - @ManagedAttribute(value = "idletimeout in MS", readonly = true) public long getIdleTimeoutMs() { @@ -186,17 +180,13 @@ public void doStop() throws Exception _lease.close(); while(true) { - ReservedThread thread = _stack.pop(); - if (thread==null) - { - super.doStop(); - return; - } - + ReservedThread thread = _stack.pollFirst(); + if (thread == null) + break; _size.decrementAndGet(); - thread.stop(); } + super.doStop(); } @Override @@ -218,7 +208,7 @@ public boolean tryExecute(Runnable task) if (task==null) return false; - ReservedThread thread = _stack.pop(); + ReservedThread thread = _stack.pollFirst(); if (thread==null) { if (task!=STOP) @@ -263,16 +253,16 @@ private void startReservedThread() @Override public String toString() { - return String.format("%s@%s{s=%d/%d,p=%d,w=%d}", + return String.format("%s@%x{s=%d/%d,p=%d}@%s", getClass().getSimpleName(), - _owner != null ? _owner : Integer.toHexString(hashCode()), + hashCode(), _size.get(), _capacity, _pending.get(), - _waiting.get()); + _owner); } - private class ReservedThread extends ConcurrentStack.Node implements Runnable + private class ReservedThread implements Runnable { private final Locker _locker = new Locker(); private final Condition _wakeup = _locker.newCondition(); @@ -302,7 +292,7 @@ private Runnable reservedWait() LOG.debug("{} waiting", this); Runnable task = null; - while (isRunning() && task==null) + while (task==null) { boolean idle = false; @@ -312,7 +302,6 @@ private Runnable reservedWait() { try { - _waiting.incrementAndGet(); if (_idleTime == 0) _wakeup.await(); else @@ -322,10 +311,6 @@ private Runnable reservedWait() { LOG.ignore(e); } - finally - { - _waiting.decrementAndGet(); - } } task = _task; _task = null; @@ -347,7 +332,7 @@ private Runnable reservedWait() if (LOG.isDebugEnabled()) LOG.debug("{} task={}", this, task); - return task==null?STOP:task; + return task; } @Override @@ -355,8 +340,6 @@ public void run() { while (isRunning()) { - Runnable task = null; - // test and increment size BEFORE decrementing pending, // so that we don't have a race starting new pending. while(true) @@ -384,10 +367,10 @@ public void run() // Insert ourselves in the stack. Size is already incremented, but // that only effects the decision to keep other threads reserved. - _stack.push(this); + _stack.offerFirst(this); // Wait for a task - task = reservedWait(); + Runnable task = reservedWait(); if (task==STOP) // return on STOP poison pill diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 4a5f9f3ede66..db26786c0e4d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -35,9 +34,7 @@ import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker.Lock; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; -import org.eclipse.jetty.util.thread.Scheduler; /** *

A strategy where the thread that produces will run the resulting task if it @@ -78,8 +75,6 @@ private enum State { IDLE, PRODUCING, REPRODUCING } private final Producer _producer; private final Executor _executor; private final ReservedThreadExecutor _producers; - private final Scheduler _scheduler; - private final long _nonBlockingTaskTimeout = Long.getLong("org.eclipse.jetty.nonBlockingTaskTimeout", 2000); private State _state = State.IDLE; public EatWhatYouKill(Producer producer, Executor executor) @@ -93,16 +88,10 @@ public EatWhatYouKill(Producer producer, Executor executor, int maxReserved) } public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers) - { - this(producer, executor, producers, null); - } - - public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers, Scheduler scheduler) { _producer = producer; _executor = executor; _producers = producers; - _scheduler = scheduler; addBean(_producer); if (LOG.isDebugEnabled()) LOG.debug("{} created", this); @@ -255,7 +244,7 @@ public boolean doProduce() try { if (consume) - runTask(task); + task.run(); else _executor.execute(task); } @@ -284,33 +273,6 @@ public boolean doProduce() return producing; } - protected void runTask(Runnable task) - { - Scheduler.Task scheduled = null; - if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING && _scheduler != null && _nonBlockingTaskTimeout > 0) - { - scheduled = _scheduler.schedule(() -> - { - LOG.warn("Non-blocking task {} is taking more than {} ms", task, _nonBlockingTaskTimeout); - if (_executor instanceof QueuedThreadPool) - { - QueuedThreadPool threadPool = (QueuedThreadPool)_executor; - threadPool.setDetailedDump(true); - LOG.warn("Non-blocking task {} - executor dump{}{}", System.lineSeparator(), threadPool.dump()); - } - }, _nonBlockingTaskTimeout, TimeUnit.MILLISECONDS); - } - try - { - task.run(); - } - finally - { - if (scheduled != null) - scheduled.cancel(); - } - } - @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true) public long getNonBlockingTasksConsumed() {