diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index db26786c0e4d..bb81fc6b5a7e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -19,6 +19,8 @@ package org.eclipse.jetty.util.thread.strategy; import java.io.Closeable; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.LongAdder; @@ -32,8 +34,6 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; -import org.eclipse.jetty.util.thread.Locker; -import org.eclipse.jetty.util.thread.Locker.Lock; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; /** @@ -66,9 +66,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); - private enum State { IDLE, PRODUCING, REPRODUCING } - - private final Locker _locker = new Locker(); + private enum State { IDLE, PENDING, PRODUCING, REPRODUCING } + private final LongAdder _nonBlocking = new LongAdder(); private final LongAdder _blocking = new LongAdder(); private final LongAdder _executed = new LongAdder(); @@ -94,19 +93,20 @@ public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecut _producers = producers; addBean(_producer); if (LOG.isDebugEnabled()) - LOG.debug("{} created", this); + LOG.debug("{} created", this); } @Override public void dispatch() { boolean execute = false; - try (Lock locked = _locker.lock()) + synchronized(this) { switch(_state) { case IDLE: execute = true; + _state = State.PENDING; break; case PRODUCING: @@ -136,19 +136,19 @@ public void produce() { if (LOG.isDebugEnabled()) LOG.debug("{} produce", this); - boolean reproduce = true; - while(isRunning() && tryProduce(reproduce) && doProduce()) - reproduce = false; + if (tryProduce()) + doProduce(); } - public boolean tryProduce(boolean reproduce) + private boolean tryProduce() { boolean producing = false; - try (Lock locked = _locker.lock()) + synchronized(this) { switch (_state) { case IDLE: + case PENDING: // Enter PRODUCING _state = State.PRODUCING; producing = true; @@ -156,8 +156,7 @@ public boolean tryProduce(boolean reproduce) case PRODUCING: // Keep other Thread producing - if (reproduce) - _state = State.REPRODUCING; + _state = State.REPRODUCING; break; default: @@ -167,7 +166,7 @@ public boolean tryProduce(boolean reproduce) return producing; } - public boolean doProduce() + private void doProduce() { boolean producing = true; while (isRunning() && producing) @@ -178,30 +177,28 @@ public boolean doProduce() { task = _producer.produce(); } - catch(Throwable e) + catch (Throwable e) { LOG.warn(e); } - - if (LOG.isDebugEnabled()) - LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task)); - + if (task==null) { - try (Lock locked = _locker.lock()) + synchronized(this) { - // Could another one just have been queued with a produce call? - if (_state==State.REPRODUCING) + // Could another task just have been queued with a produce call? + switch (_state) { - _state = State.PRODUCING; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{} IDLE",toStringLocked()); - _state = State.IDLE; - producing = false; - } + case PRODUCING: + _state = State.IDLE; + producing = false; + break; + case REPRODUCING: + _state = State.PRODUCING; + break; + default: + throw new IllegalStateException(toStringLocked()); + } } } else @@ -209,21 +206,19 @@ public boolean doProduce() boolean consume; if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING) { - // PRODUCE CONSUME (EWYK!) - if (LOG.isDebugEnabled()) - LOG.debug("{} PC t={}", this, task); + // PRODUCE CONSUME consume = true; - _nonBlocking.increment(); + _nonBlocking.increment(); } else { - try (Lock locked = _locker.lock()) + synchronized(this) { if (_producers.tryExecute(this)) { // EXECUTE PRODUCE CONSUME! // We have executed a new Producer, so we can EWYK consume - _state = State.IDLE; + _state = State.PENDING; producing = false; consume = true; _blocking.increment(); @@ -233,13 +228,13 @@ public boolean doProduce() // PRODUCE EXECUTE CONSUME! consume = false; _executed.increment(); - } + } } - - if (LOG.isDebugEnabled()) - LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task); } + if (LOG.isDebugEnabled()) + LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task)); + // Consume or execute task try { @@ -250,7 +245,10 @@ public boolean doProduce() } catch (RejectedExecutionException e) { - LOG.warn(e); + if (isRunning()) + LOG.warn(e); + else + LOG.ignore(e); if (task instanceof Closeable) { try @@ -269,8 +267,6 @@ public boolean doProduce() } } } - - return producing; } @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true) @@ -294,7 +290,7 @@ public long getBlockingTasksExecuted() @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) public boolean isIdle() { - try (Lock locked = _locker.lock()) + synchronized(this) { return _state==State.IDLE; } @@ -310,7 +306,7 @@ public void reset() public String toString() { - try (Lock locked = _locker.lock()) + synchronized(this) { return toStringLocked(); } @@ -339,5 +335,14 @@ private void getState(StringBuilder builder) builder.append(_state); builder.append('/'); builder.append(_producers); + builder.append("[nb="); + builder.append(getNonBlockingTasksConsumed()); + builder.append(",c="); + builder.append(getBlockingTasksConsumed()); + builder.append(",e="); + builder.append(getBlockingTasksExecuted()); + builder.append("]"); + builder.append("@"); + builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); } }