Skip to content

Commit

Permalink
work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Mar 16, 2017
1 parent d52a09a commit 5d18c65
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 52 deletions.
Expand Up @@ -226,6 +226,12 @@ public void setStopTimeout(long stopTimeout)
this._stopTimeout = stopTimeout;
}

@Override
public String toString()
{
return String.format("%s@%x",getClass().getSimpleName(),hashCode());
}

public static abstract class AbstractLifeCycleListener implements LifeCycle.Listener
{
@Override public void lifeCycleFailure(LifeCycle event, Throwable cause) {}
Expand Down
Expand Up @@ -157,7 +157,7 @@ public static Runnable asPreferred(Runnable task, InvocationType preferredInvoca
break;

case EITHER:
if (getInvocationType(task) == InvocationType.EITHER && preferredInvocationType == InvocationType.NON_BLOCKING)
if (preferredInvocationType == InvocationType.NON_BLOCKING)
return () -> invokeNonBlocking(task);
break;
}
Expand Down
Expand Up @@ -58,6 +58,7 @@ enum State { IDLE, PRODUCING, REPRODUCING };
private int _pendingProducersMax;
private int _pendingProducers;
private int _pendingProducersDispatched;
private int _pendingProducersSignalled;
private Condition _produce = _locker.newCondition();

public EatWhatYouKill(Producer producer, Executor executor)
Expand Down Expand Up @@ -158,21 +159,27 @@ private boolean pendingProducerWait()
{
try
{
_pendingProducers++;
_pendingProducers++;

_produce.await();
if (_state == State.IDLE)
if (_pendingProducersSignalled==0)
{
// spurious wakeup!
_pendingProducers--;
}
else if (_state == State.IDLE)
{
_pendingProducersSignalled--;
_state = State.PRODUCING;
return true;
}
}
catch (InterruptedException e)
{
LOG.debug(e);
// probably spurious, but we are not pending anymore
_pendingProducers--;
}
}
}
return false;
}

Expand All @@ -193,8 +200,8 @@ private void produceConsume()
LOG.debug("{} produced {}", this, task);

boolean may_block_caller = !Invocable.isNonBlockingInvocation();
boolean dispatch_new_producer = false;
boolean eat_it;
boolean dispatch_new_producer;
boolean run_task_ourselves;
boolean keep_producing;

try (Lock locked = _locker.lock())
Expand All @@ -212,72 +219,71 @@ private void produceConsume()

// ... and no additional calls to execute, so we are idle
_state = State.IDLE;
keep_producing = false;
break producing;
}

// Will we eat our own kill?
if (may_block_caller && Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
{
eat_it = true;
// ProduceConsume
run_task_ourselves = true;
keep_producing = true;
dispatch_new_producer = false;
}
else if (_pendingProducers==0 && _pendingProducersMax>0)
else if (may_block_caller && (_pendingProducers>0 || _pendingProducersMax==0))
{
keep_producing = true;
eat_it = false;
if ((_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax)
{
_pendingProducersDispatched++;
dispatch_new_producer = true;
}
}
else
{
eat_it = true;
// ExecuteProduceConsume (eat what we kill!)
run_task_ourselves = true;
keep_producing = false;
dispatch_new_producer = true;
_pendingProducersDispatched++;
_state = State.IDLE;
_pendingProducers--;
_pendingProducersSignalled++;
_produce.signal();
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} mbc={} dnp={} ei={} kp={}", this,may_block_caller,dispatch_new_producer,eat_it,keep_producing);

// Run or execute the task.
if (task != null)
{;
if (eat_it)
_executor.invoke(task);
else
_executor.execute(task);
{
// ProduceExecuteConsume
keep_producing = true;
run_task_ourselves = false;
dispatch_new_producer = (_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax;
if (dispatch_new_producer)
_pendingProducersDispatched++;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} mbc={} dnp={} ei={} kp={}", this,may_block_caller,dispatch_new_producer,run_task_ourselves,keep_producing);

// If we need more producers
if (dispatch_new_producer)
{
// Spawn a new thread to continue production by running the produce loop.
_executor.execute(this);
}


// Run or execute the task.
if (run_task_ourselves)
_executor.invoke(task);
else
_executor.execute(task);

// Once we have run the task, we can try producing again.
if (keep_producing)
continue producing;

try (Lock locked = _locker.lock())
if (may_block_caller)
{
switch(_state)
try (Lock locked = _locker.lock())
{
case IDLE:
_state = State.PRODUCING;
continue producing;

default:
// Perhaps we can be a pending Producer?
if (pendingProducerWait())
switch(_state)
{
case IDLE:
_state = State.PRODUCING;
continue producing;

default:
// Perhaps we can be a pending Producer?
if (pendingProducerWait())
continue producing;
}
}
}

Expand Down Expand Up @@ -310,16 +316,16 @@ public String toString()
StringBuilder builder = new StringBuilder();
builder.append(super.toString());
builder.append('/');
builder.append(_producer);
builder.append('/');
try (Lock locked = _locker.lock())
{
builder.append(_state);
builder.append('/');
builder.append(_pendingProducers);
builder.append('/');
builder.append(_pendingProducersMax);
builder.append('/');
}
builder.append(_producer);
return builder.toString();
}

Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.eclipse.jetty.util.thread.Invocable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -141,8 +142,11 @@ public void run()

newExecutionStrategy(producer,threads);

for (int p=0; latch.getCount()>0 && p<TASKS; p++)
_strategy.produce();
Invocable.invokeNonBlocking(()->
{
for (int p=0; latch.getCount()>0 && p<TASKS; p++)
_strategy.produce();
});

assertTrue(latch.await(10,TimeUnit.SECONDS));
}
Expand Down Expand Up @@ -212,10 +216,11 @@ public void run()
});


for (int p=0; latch.getCount()>0 && p<TASKS; p++)
Invocable.invokeNonBlocking(()->
{
_strategy.produce();
}
for (int p=0; latch.getCount()>0 && p<TASKS; p++)
_strategy.produce();
});

assertTrue(latch.await(10,TimeUnit.SECONDS));

Expand Down

0 comments on commit 5d18c65

Please sign in to comment.