Skip to content

Commit

Permalink
better spruious wakeup handling and other simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Mar 20, 2017
1 parent c01a910 commit 1a92015
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 54 deletions.
Expand Up @@ -420,11 +420,11 @@ public static int append(ByteBuffer to, ByteBuffer b)
/* ------------------------------------------------------------ */
/**
* Like append, but does not throw {@link BufferOverflowException}
* @param to Buffer is flush mode
* @param b bytes to fill
* @param off offset into byte
* @param to Buffer The buffer to fill to. The buffer will be flipped to fill mode and then flipped back to flush mode.
* @param b bytes The bytes to fill
* @param off offset into bytes
* @param len length to fill
* @return The position of the valid data before the flipped position.
* @return the number of bytes taken from the buffer.
*/
public static int fill(ByteBuffer to, byte[] b, int off, int len)
{
Expand Down
Expand Up @@ -147,75 +147,72 @@ public void run()
LOG.debug("{} run", this);
if (!isRunning())
return;
boolean producing;
boolean producing = false;
try (Lock locked = _locker.lock())
{
_pendingProducersDispatched--;
producing = pendingProducerWait();
}
_pendingProducers++;

if (producing)
produceConsume();
}

private boolean pendingProducerWait()
{
if (_pendingProducers<_pendingProducersMax)
{
try
loop: while (isRunning())
{
_pendingProducers++;

_produce.await();
if (_pendingProducersSignalled==0)
{
// spurious wakeup!
if (LOG.isDebugEnabled() && isRunning())
LOG.debug("{} SPURIOUS WAKEUP",this);
_pendingProducers--;
}
else
try
{
_produce.await();

if (_pendingProducersSignalled==0)
{
// spurious wakeup!
continue loop;
}

_pendingProducersSignalled--;
if (_state == State.IDLE)
{
_state = State.PRODUCING;
return true;
producing = true;
}
}
}
catch (InterruptedException e)
{
LOG.debug(e);
_pendingProducers--;
}
}
return false;
catch (InterruptedException e)
{
LOG.debug(e);
_pendingProducers--;
}

break loop;
}
}

if (producing)
produceConsume();
}

private void produceConsume()
{
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
if (LOG.isDebugEnabled())
LOG.debug("{} produce enter", this);
LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");

producing: while (isRunning())
{
// If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled())
LOG.debug("{} producing", this);

Runnable task = _producer.produce();

if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task);

boolean may_block_caller = !Invocable.isNonBlockingInvocation();
boolean new_pending_producer;
boolean run_task_ourselves;
boolean keep_producing;

StringBuilder state = null;

try (Lock locked = _locker.lock())
{
if (LOG.isDebugEnabled())
{
state = new StringBuilder();
getString(state);
getState(state);
state.append("->");
}

// Did we produced a task?
if (task == null)
{
Expand Down Expand Up @@ -261,9 +258,19 @@ else if (may_block_caller && (_pendingProducers>0 || _pendingProducersMax==0))
if (new_pending_producer)
_pendingProducersDispatched++;
}

if (LOG.isDebugEnabled())
getState(state);

}

if (LOG.isDebugEnabled())
LOG.debug("{} mbc={} dnp={} run={} kp={}", this,may_block_caller,new_pending_producer,run_task_ourselves,keep_producing);
{
LOG.debug("{} {} {}",
state,
run_task_ourselves?(new_pending_producer?"EPC!":"PC"):"PEC",
task);
}

if (new_pending_producer)
// Spawn a new thread to continue production by running the produce loop.
Expand Down Expand Up @@ -316,21 +323,31 @@ protected void doStop() throws Exception
public String toString()
{
StringBuilder builder = new StringBuilder();
getString(builder);
try (Lock locked = _locker.lock())
{
getState(builder);
}
return builder.toString();
}

private void getString(StringBuilder builder)
{
builder.append(getClass().getSimpleName());
builder.append('@');
builder.append(Integer.toHexString(hashCode()));
builder.append('/');
builder.append(_producer);
builder.append('/');
try (Lock locked = _locker.lock())
{
builder.append(_state);
builder.append('/');
builder.append(_pendingProducers);
builder.append('/');
builder.append(_pendingProducersMax);
}
return builder.toString();
}

private void getState(StringBuilder builder)
{
builder.append(_state);
builder.append('/');
builder.append(_pendingProducers);
builder.append('/');
builder.append(_pendingProducersMax);
}

private class RunProduce implements Runnable
Expand Down

0 comments on commit 1a92015

Please sign in to comment.