Skip to content

Commit

Permalink
Issue #1970 - ManagedSelector loses selector thread.
Browse files Browse the repository at this point in the history
Remove broken data structure ConcurrentStack (ABA problem).

Made ReservedThreadExecutor use a ConcurrentLinkedDeque
instead of ConcurrentStack.

Removed "waiting" ReservedThread count, since it was
reporting the same as the "size".

Removed instrumentation code from previous commit.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 14, 2017
1 parent 6ce2a67 commit d196db7
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ public ManagedSelector(SelectorManager selectorManager, int id)
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
Scheduler scheduler = selectorManager.getScheduler();
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class),scheduler);
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
addBean(_strategy,true);
setStopTimeout(5000);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.eclipse.jetty.util.thread;

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;

import org.eclipse.jetty.util.ConcurrentStack;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
Expand All @@ -49,7 +49,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
@Override
public void run()
{}
{
}

@Override
public String toString()
Expand All @@ -60,10 +61,9 @@ public String toString()

private final Executor _executor;
private final int _capacity;
private final ConcurrentStack.NodeStack<ReservedThread> _stack;
private final ConcurrentLinkedDeque<ReservedThread> _stack;
private final AtomicInteger _size = new AtomicInteger();
private final AtomicInteger _pending = new AtomicInteger();
private final AtomicInteger _waiting = new AtomicInteger();

private ThreadPoolBudget.Lease _lease;
private Object _owner;
Expand Down Expand Up @@ -96,7 +96,7 @@ public ReservedThreadExecutor(Executor executor,int capacity, Object owner)
{
_executor = executor;
_capacity = reservedThreads(executor,capacity);
_stack = new ConcurrentStack.NodeStack<>();
_stack = new ConcurrentLinkedDeque<>();
_owner = owner;

LOG.debug("{}",this);
Expand Down Expand Up @@ -145,12 +145,6 @@ public int getPending()
return _pending.get();
}

@ManagedAttribute(value = "waiting reserved threads", readonly = true)
public int getWaiting()
{
return _waiting.get();
}

@ManagedAttribute(value = "idletimeout in MS", readonly = true)
public long getIdleTimeoutMs()
{
Expand Down Expand Up @@ -186,17 +180,13 @@ public void doStop() throws Exception
_lease.close();
while(true)
{
ReservedThread thread = _stack.pop();
if (thread==null)
{
super.doStop();
return;
}

ReservedThread thread = _stack.pollFirst();
if (thread == null)
break;
_size.decrementAndGet();

thread.stop();
}
super.doStop();
}

@Override
Expand All @@ -218,7 +208,7 @@ public boolean tryExecute(Runnable task)
if (task==null)
return false;

ReservedThread thread = _stack.pop();
ReservedThread thread = _stack.pollFirst();
if (thread==null)
{
if (task!=STOP)
Expand Down Expand Up @@ -263,16 +253,16 @@ private void startReservedThread()
@Override
public String toString()
{
return String.format("%s@%s{s=%d/%d,p=%d,w=%d}",
return String.format("%s@%x{s=%d/%d,p=%d}@%s",
getClass().getSimpleName(),
_owner != null ? _owner : Integer.toHexString(hashCode()),
hashCode(),
_size.get(),
_capacity,
_pending.get(),
_waiting.get());
_owner);
}

private class ReservedThread extends ConcurrentStack.Node<ReservedThread> implements Runnable
private class ReservedThread implements Runnable
{
private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
Expand Down Expand Up @@ -302,7 +292,7 @@ private Runnable reservedWait()
LOG.debug("{} waiting", this);

Runnable task = null;
while (isRunning() && task==null)
while (task==null)
{
boolean idle = false;

Expand All @@ -312,7 +302,6 @@ private Runnable reservedWait()
{
try
{
_waiting.incrementAndGet();
if (_idleTime == 0)
_wakeup.await();
else
Expand All @@ -322,10 +311,6 @@ private Runnable reservedWait()
{
LOG.ignore(e);
}
finally
{
_waiting.decrementAndGet();
}
}
task = _task;
_task = null;
Expand All @@ -347,16 +332,14 @@ private Runnable reservedWait()
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);

return task==null?STOP:task;
return task;
}

@Override
public void run()
{
while (isRunning())
{
Runnable task = null;

// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
while(true)
Expand Down Expand Up @@ -384,10 +367,10 @@ public void run()

// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.push(this);
_stack.offerFirst(this);

// Wait for a task
task = reservedWait();
Runnable task = reservedWait();

if (task==STOP)
// return on STOP poison pill
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -35,9 +34,7 @@
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;

/**
* <p>A strategy where the thread that produces will run the resulting task if it
Expand Down Expand Up @@ -78,8 +75,6 @@ private enum State { IDLE, PRODUCING, REPRODUCING }
private final Producer _producer;
private final Executor _executor;
private final ReservedThreadExecutor _producers;
private final Scheduler _scheduler;
private final long _nonBlockingTaskTimeout = Long.getLong("org.eclipse.jetty.nonBlockingTaskTimeout", 2000);
private State _state = State.IDLE;

public EatWhatYouKill(Producer producer, Executor executor)
Expand All @@ -93,16 +88,10 @@ public EatWhatYouKill(Producer producer, Executor executor, int maxReserved)
}

public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
{
this(producer, executor, producers, null);
}

public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers, Scheduler scheduler)
{
_producer = producer;
_executor = executor;
_producers = producers;
_scheduler = scheduler;
addBean(_producer);
if (LOG.isDebugEnabled())
LOG.debug("{} created", this);
Expand Down Expand Up @@ -255,7 +244,7 @@ public boolean doProduce()
try
{
if (consume)
runTask(task);
task.run();
else
_executor.execute(task);
}
Expand Down Expand Up @@ -284,33 +273,6 @@ public boolean doProduce()
return producing;
}

protected void runTask(Runnable task)
{
Scheduler.Task scheduled = null;
if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING && _scheduler != null && _nonBlockingTaskTimeout > 0)
{
scheduled = _scheduler.schedule(() ->
{
LOG.warn("Non-blocking task {} is taking more than {} ms", task, _nonBlockingTaskTimeout);
if (_executor instanceof QueuedThreadPool)
{
QueuedThreadPool threadPool = (QueuedThreadPool)_executor;
threadPool.setDetailedDump(true);
LOG.warn("Non-blocking task {} - executor dump{}{}", System.lineSeparator(), threadPool.dump());
}
}, _nonBlockingTaskTimeout, TimeUnit.MILLISECONDS);
}
try
{
task.run();
}
finally
{
if (scheduled != null)
scheduled.cancel();
}
}

@ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
public long getNonBlockingTasksConsumed()
{
Expand Down

0 comments on commit d196db7

Please sign in to comment.