diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 9ba31ec7d5d2..0b3c5a348ddc 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -27,24 +27,21 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.eclipse.jetty.io.SelectorManager.SelectableEndPoint;
+import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
-import org.eclipse.jetty.util.component.ContainerLifeCycle;
-import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
+import org.eclipse.jetty.util.thread.SpinLock;
/**
*
{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.
@@ -52,24 +49,23 @@
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.
*/
-public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable, ExecutionStrategy.Producer
+public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Dumpable TODO
{
- protected static final Logger LOG = Log.getLogger(ManagedSelector.class);
- private final ExecutionStrategy _strategy;
+ private static final Logger LOG = Log.getLogger(ManagedSelector.class);
+
+ private final SpinLock _lock = new SpinLock();
+ private final Queue _actions = new ConcurrentArrayQueue<>();
private final SelectorManager _selectorManager;
- private final AtomicReference _state = new AtomicReference<>(State.PROCESSING);
private final int _id;
- private List _runChanges = new ArrayList<>();
- private List _addChanges = new ArrayList<>();
- private Iterator _selections = Collections.emptyIterator();
- private Set _selectedKeys = Collections.emptySet();
+ private final ExecutionStrategy _strategy;
+ private State _state = State.PROCESSING;
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
{
_selectorManager = selectorManager;
- _strategy = ExecutionStrategy.Factory.instanceFor(this, selectorManager.getExecutor());
_id = id;
+ _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
@@ -78,7 +74,6 @@ protected void doStart() throws Exception
{
super.doStart();
_selector = newSelector();
- _state.set(State.PROCESSING);
}
protected Selector newSelector() throws IOException
@@ -89,7 +84,7 @@ protected Selector newSelector() throws IOException
public int size()
{
Selector s = _selector;
- if (s==null)
+ if (s == null)
return 0;
return s.keys().size();
}
@@ -106,81 +101,247 @@ protected void doStop() throws Exception
LOG.debug("Stopped {}", this);
}
+ public void submit(Runnable change)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queued change {}", change);
+
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ _actions.offer(change);
+ if (_state == State.SELECTING)
+ {
+ _selector.wakeup();
+ // Move to PROCESSING now, so other submit()
+ // calls will avoid the extra select wakeup.
+ _state = State.PROCESSING;
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ _strategy.execute();
+ }
/**
- * Submits a change to be executed in the selector thread.
- * Changes may be submitted from any thread, and the selector thread woken up
- * (if necessary) to execute the change.
- *
- * @param change the change to submit
+ * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
+ * notified of non-blocking events by the {@link ManagedSelector}.
*/
- public void submit(Runnable change)
+ public interface SelectableEndPoint extends EndPoint
{
- // This method may be called from the selector thread, and therefore
- // we could directly run the change without queueing, but this may
- // lead to stack overflows on a busy server, so we always offer the
- // change to the queue and process the state.
+ /**
+ * Callback method invoked when a read or write events has been
+ * detected by the {@link ManagedSelector} for this endpoint.
+ *
+ * @return a job that may block or null
+ */
+ Runnable onSelected();
+
+ /**
+ * Callback method invoked when all the keys selected by the
+ * {@link ManagedSelector} for this endpoint have been processed.
+ */
+ void updateKey();
+ }
- if (LOG.isDebugEnabled())
- LOG.debug("Queued change {}", change);
+ private class SelectorProducer implements ExecutionStrategy.Producer
+ {
+ private Set _keys = Collections.emptySet();
+ private Iterator _cursor = Collections.emptyIterator();
- out:
- while (true)
+ @Override
+ public Runnable produce()
{
- State state = _state.get();
- switch (state)
+ boolean looping = false;
+ while (true)
{
- case PROCESSING:
- // If we are processing
- if (!_state.compareAndSet(State.PROCESSING, State.LOCKED))
- continue;
- // we can just lock and add the change
- _addChanges.add(change);
- _state.set(State.PROCESSING);
- break out;
+ if (looping)
+ {
+ Runnable task = runActions();
+ if (task != null)
+ return task;
- case SELECTING:
- // If we are processing
- if (!_state.compareAndSet(State.SELECTING, State.LOCKED))
- continue;
- // we must lock, add the change and wakeup the selector
- _addChanges.add(change);
- _selector.wakeup();
- // we move to processing state now, because the selector will
- // not block and this avoids extra calls to wakeup()
- _state.set(State.PROCESSING);
- break out;
+ if (!select())
+ return null;
+ }
- case LOCKED:
- Thread.yield();
- continue;
+ Runnable task = processSelected();
+ if (task != null)
+ return task;
- default:
- throw new IllegalStateException();
+ update();
+
+ looping = true;
}
}
- }
- protected void runChange(Runnable change)
- {
- try
+ public Runnable produce2()
{
- if (LOG.isDebugEnabled())
- LOG.debug("Running change {}", change);
- change.run();
+ while (true)
+ {
+ Runnable task = processSelected();
+ if (task != null)
+ return task;
+
+ Runnable action = runActions();
+ if (action != null)
+ return action;
+
+ update();
+
+ if (!select())
+ return null;
+ }
}
- catch (Throwable x)
+
+ private Runnable runActions()
+ {
+ while (true)
+ {
+ Runnable action;
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ action = _actions.poll();
+ if (action == null)
+ {
+ _state = State.SELECTING;
+ return null;
+ }
+ }
+
+ if (action instanceof Product)
+ return action;
+
+ // Running the change may queue another action.
+ runChange(action);
+ }
+ }
+
+ private void runChange(Runnable change)
+ {
+ try
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Running change {}", change);
+ change.run();
+ }
+ catch (Throwable x)
+ {
+ LOG.debug("Could not run change " + change, x);
+ }
+ }
+
+ private boolean select()
+ {
+ try
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop waiting on select");
+ int selected = _selector.select();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
+
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ _state = State.PROCESSING;
+ }
+
+ _keys = _selector.selectedKeys();
+ _cursor = _keys.iterator();
+
+ return true;
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(_selector);
+ if (isRunning())
+ LOG.warn(x);
+ else
+ LOG.debug(x);
+ return false;
+ }
+ }
+
+ private Runnable processSelected()
+ {
+ while (_cursor.hasNext())
+ {
+ SelectionKey key = _cursor.next();
+ if (key.isValid())
+ {
+ Object attachment = key.attachment();
+ try
+ {
+ if (attachment instanceof SelectableEndPoint)
+ {
+ // Try to produce a task
+ SelectableEndPoint selectable = (SelectableEndPoint)attachment;
+ Runnable task = selectable.onSelected();
+ if (task != null)
+ return task;
+ }
+ else if (key.isConnectable())
+ {
+ Runnable task = processConnect(key, (Connect)attachment);
+ if (task != null)
+ return task;
+ }
+ else if (key.isAcceptable())
+ {
+ processAccept(key);
+ }
+ else
+ {
+ throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
+ }
+ }
+ catch (CancelledKeyException x)
+ {
+ LOG.debug("Ignoring cancelled key for channel {}", key.channel());
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ catch (Throwable x)
+ {
+ LOG.warn("Could not process key for channel " + key.channel(), x);
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
+ Object attachment = key.attachment();
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+ return null;
+ }
+
+ private void update()
+ {
+ for (SelectionKey key : _keys)
+ updateKey(key);
+ _keys.clear();
+ }
+
+ private void updateKey(SelectionKey key)
{
- LOG.debug("Could not run change " + change, x);
+ Object attachment = key.attachment();
+ if (attachment instanceof SelectableEndPoint)
+ ((SelectableEndPoint)attachment).updateKey();
}
}
- @Override
- public void run()
+ private interface Product extends Runnable
{
- _strategy.execute();
}
+/*
@Override
public Runnable produce()
{
@@ -362,8 +523,8 @@ private void updateKey(SelectionKey key)
if (attachment instanceof SelectableEndPoint)
((SelectableEndPoint)attachment).updateKey();
}
-
- private void processConnect(SelectionKey key, Connect connect)
+*/
+ private Runnable processConnect(SelectionKey key, final Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
try
@@ -374,8 +535,15 @@ private void processConnect(SelectionKey key, Connect connect)
{
connect.timeout.cancel();
key.interestOps(0);
- EndPoint endpoint = createEndPoint(channel, key);
- key.attach(endpoint);
+ return new CreateEndPoint(channel, key)
+ {
+ @Override
+ protected void failed(Throwable failure)
+ {
+ super.failed(failure);
+ connect.failed(failure);
+ }
+ };
}
else
{
@@ -385,6 +553,7 @@ private void processConnect(SelectionKey key, Connect connect)
catch (Throwable x)
{
connect.failed(x);
+ return null;
}
}
@@ -433,6 +602,7 @@ private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey
public void destroyEndPoint(EndPoint endPoint)
{
+ // TODO: perhaps this code should be wrapped and submitted as a task.
if (LOG.isDebugEnabled())
LOG.debug("Destroyed {}", endPoint);
Connection connection = endPoint.getConnection();
@@ -440,7 +610,7 @@ public void destroyEndPoint(EndPoint endPoint)
_selectorManager.connectionClosed(connection);
_selectorManager.endPointClosed(endPoint);
}
-
+/*
@Override
public String dump()
{
@@ -479,7 +649,6 @@ public void dumpKeysState(List