Skip to content

Commit

Permalink
EWYK cleanups from #1970
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw authored and sbordet committed Dec 19, 2017
1 parent 2c0f695 commit 0c021f2
Showing 1 changed file with 52 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.eclipse.jetty.util.thread.strategy;

import java.io.Closeable;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -32,8 +34,6 @@
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;

/**
Expand Down Expand Up @@ -66,9 +66,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);

private enum State { IDLE, PRODUCING, REPRODUCING }

private final Locker _locker = new Locker();
private enum State { IDLE, PENDING, PRODUCING, REPRODUCING }

private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
private final LongAdder _executed = new LongAdder();
Expand All @@ -94,19 +93,20 @@ public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecut
_producers = producers;
addBean(_producer);
if (LOG.isDebugEnabled())
LOG.debug("{} created", this);
LOG.debug("{} created", this);
}

@Override
public void dispatch()
{
boolean execute = false;
try (Lock locked = _locker.lock())
synchronized(this)
{
switch(_state)
{
case IDLE:
execute = true;
_state = State.PENDING;
break;

case PRODUCING:
Expand Down Expand Up @@ -136,28 +136,27 @@ public void produce()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce", this);
boolean reproduce = true;
while(isRunning() && tryProduce(reproduce) && doProduce())
reproduce = false;
if (tryProduce())
doProduce();
}

public boolean tryProduce(boolean reproduce)
private boolean tryProduce()
{
boolean producing = false;
try (Lock locked = _locker.lock())
synchronized(this)
{
switch (_state)
{
case IDLE:
case PENDING:
// Enter PRODUCING
_state = State.PRODUCING;
producing = true;
break;

case PRODUCING:
// Keep other Thread producing
if (reproduce)
_state = State.REPRODUCING;
_state = State.REPRODUCING;
break;

default:
Expand All @@ -167,7 +166,7 @@ public boolean tryProduce(boolean reproduce)
return producing;
}

public boolean doProduce()
private void doProduce()
{
boolean producing = true;
while (isRunning() && producing)
Expand All @@ -178,52 +177,48 @@ public boolean doProduce()
{
task = _producer.produce();
}
catch(Throwable e)
catch (Throwable e)
{
LOG.warn(e);
}

if (LOG.isDebugEnabled())
LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task));


if (task==null)
{
try (Lock locked = _locker.lock())
synchronized(this)
{
// Could another one just have been queued with a produce call?
if (_state==State.REPRODUCING)
// Could another task just have been queued with a produce call?
switch (_state)
{
_state = State.PRODUCING;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE",toStringLocked());
_state = State.IDLE;
producing = false;
}
case PRODUCING:
_state = State.IDLE;
producing = false;
break;
case REPRODUCING:
_state = State.PRODUCING;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
else
{
boolean consume;
if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{
// PRODUCE CONSUME (EWYK!)
if (LOG.isDebugEnabled())
LOG.debug("{} PC t={}", this, task);
// PRODUCE CONSUME
consume = true;
_nonBlocking.increment();
_nonBlocking.increment();
}
else
{
try (Lock locked = _locker.lock())
synchronized(this)
{
if (_producers.tryExecute(this))
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
_state = State.IDLE;
_state = State.PENDING;
producing = false;
consume = true;
_blocking.increment();
Expand All @@ -233,13 +228,13 @@ public boolean doProduce()
// PRODUCE EXECUTE CONSUME!
consume = false;
_executed.increment();
}
}
}

if (LOG.isDebugEnabled())
LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task);
}

if (LOG.isDebugEnabled())
LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task));

// Consume or execute task
try
{
Expand All @@ -250,7 +245,10 @@ public boolean doProduce()
}
catch (RejectedExecutionException e)
{
LOG.warn(e);
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable)
{
try
Expand All @@ -269,8 +267,6 @@ public boolean doProduce()
}
}
}

return producing;
}

@ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
Expand All @@ -294,7 +290,7 @@ public long getBlockingTasksExecuted()
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
public boolean isIdle()
{
try (Lock locked = _locker.lock())
synchronized(this)
{
return _state==State.IDLE;
}
Expand All @@ -310,7 +306,7 @@ public void reset()

public String toString()
{
try (Lock locked = _locker.lock())
synchronized(this)
{
return toStringLocked();
}
Expand Down Expand Up @@ -339,5 +335,14 @@ private void getState(StringBuilder builder)
builder.append(_state);
builder.append('/');
builder.append(_producers);
builder.append("[nb=");
builder.append(getNonBlockingTasksConsumed());
builder.append(",c=");
builder.append(getBlockingTasksConsumed());
builder.append(",e=");
builder.append(getBlockingTasksExecuted());
builder.append("]");
builder.append("@");
builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
}
}

0 comments on commit 0c021f2

Please sign in to comment.