From adaa520cc9d01fa8ef8db056c54c8a86cc7375b6 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Wed, 18 Feb 2015 23:00:12 +0100
Subject: [PATCH] Rewrite of ManagedSelector to handle connection creation as
an ExecutionStrategy task.
Now the creation of a connection, and the Connection.onOpen() call
happen as a Runnable that is run by the ExecutionStrategy.
This allows onOpen() to block or otherwise perform tasks that are not
suitable to be run by a selector thread, since the ExecutionStrategy
will guarantee that another thread will take over the selector duties.
---
.../org/eclipse/jetty/io/ManagedSelector.java | 362 ++++++++++++++----
.../jetty/io/SelectChannelEndPoint.java | 20 +-
.../org/eclipse/jetty/io/SelectorManager.java | 51 +--
3 files changed, 305 insertions(+), 128 deletions(-)
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 9ba31ec7d5d2..0b3c5a348ddc 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -27,24 +27,21 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.eclipse.jetty.io.SelectorManager.SelectableEndPoint;
+import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
-import org.eclipse.jetty.util.component.ContainerLifeCycle;
-import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
+import org.eclipse.jetty.util.thread.SpinLock;
/**
*
{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.
@@ -52,24 +49,23 @@
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.
*/
-public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable, ExecutionStrategy.Producer
+public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Dumpable TODO
{
- protected static final Logger LOG = Log.getLogger(ManagedSelector.class);
- private final ExecutionStrategy _strategy;
+ private static final Logger LOG = Log.getLogger(ManagedSelector.class);
+
+ private final SpinLock _lock = new SpinLock();
+ private final Queue _actions = new ConcurrentArrayQueue<>();
private final SelectorManager _selectorManager;
- private final AtomicReference _state = new AtomicReference<>(State.PROCESSING);
private final int _id;
- private List _runChanges = new ArrayList<>();
- private List _addChanges = new ArrayList<>();
- private Iterator _selections = Collections.emptyIterator();
- private Set _selectedKeys = Collections.emptySet();
+ private final ExecutionStrategy _strategy;
+ private State _state = State.PROCESSING;
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
{
_selectorManager = selectorManager;
- _strategy = ExecutionStrategy.Factory.instanceFor(this, selectorManager.getExecutor());
_id = id;
+ _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
@@ -78,7 +74,6 @@ protected void doStart() throws Exception
{
super.doStart();
_selector = newSelector();
- _state.set(State.PROCESSING);
}
protected Selector newSelector() throws IOException
@@ -89,7 +84,7 @@ protected Selector newSelector() throws IOException
public int size()
{
Selector s = _selector;
- if (s==null)
+ if (s == null)
return 0;
return s.keys().size();
}
@@ -106,81 +101,247 @@ protected void doStop() throws Exception
LOG.debug("Stopped {}", this);
}
+ public void submit(Runnable change)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queued change {}", change);
+
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ _actions.offer(change);
+ if (_state == State.SELECTING)
+ {
+ _selector.wakeup();
+ // Move to PROCESSING now, so other submit()
+ // calls will avoid the extra select wakeup.
+ _state = State.PROCESSING;
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ _strategy.execute();
+ }
/**
- *