Skip to content

Commit

Permalink
Issue #1637 Thread per HTTP/2 Connection
Browse files Browse the repository at this point in the history
This fix simplifies the EWYK scheduler by factoring out the preallocated producer into a
ReservedThreadExecutor class.   A shared ReservedThreadExecutor can then be used by multiple
EWYK instances to avoid over allocation of threads.

Squashed commit of the following:

commit c435dc2
Merge: 58a5a9a 90e5b56
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Jun 21 10:48:22 2017 +0200

    Merge branch 'jetty-9.4.x' into jetty-9.4.x-ewyk3

commit 58a5a9a
Author: Simone Bordet <simone.bordet@gmail.com>
Date:   Wed Jun 14 15:56:43 2017 +0200

    Code cleanups.

commit 4e52962
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Jun 14 07:34:58 2017 +0200

    refixed Producing to Reproducing

commit a1f8682
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Jun 14 07:26:29 2017 +0200

    fixed Producing to Reproducing

commit 9468932
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 16:33:44 2017 +0200

    fixed javadoc

commit 9d4941e
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 16:05:27 2017 +0200

    Renamed Preallocated to ReservedThread

commit 6d3379a
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 12:28:52 2017 +0200

    Added configuration in modules

commit 1bd1ade
Merge: 83418a9 6702248
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 10:09:29 2017 +0200

    Merge branch 'jetty-9.4.x' into jetty-9.4.x-ewyk3

commit 83418a9
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 10:08:35 2017 +0200

    javadoc

commit 62918fd
Author: Greg Wilkins <gregw@webtide.com>
Date:   Sat Jun 10 00:04:06 2017 +0200

    Improved EatWhatYouKill implementation

    Simplified by abstracting out PreallocatedExecutor
    Removed invocation execution
    HTTP2 now uses a shared PreallocationExcecutor between connection
  • Loading branch information
gregw authored and joakime committed Aug 8, 2017
1 parent c9a1395 commit a105be9
Show file tree
Hide file tree
Showing 26 changed files with 798 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;

public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
{
public static final String CLIENT_CONTEXT_KEY = "http2.client";
public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool";
public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor";
public static final String PREALLOCATED_EXECUTOR_CONTEXT_KEY = "http2.client.preallocatedExecutor";
public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler";
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
Expand All @@ -58,6 +60,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY);
ReservedThreadExecutor preallocatedExecutor = (ReservedThreadExecutor)context.get(PREALLOCATED_EXECUTOR_CONTEXT_KEY);
Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY);
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Expand All @@ -67,7 +70,33 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,

if (preallocatedExecutor==null)
{
// TODO move this to non lazy construction
preallocatedExecutor=client.getBean(ReservedThreadExecutor.class);
if (preallocatedExecutor==null)
{
synchronized (this)
{
if (preallocatedExecutor==null)
{
try
{
preallocatedExecutor = new ReservedThreadExecutor(executor,1); // TODO configure size
preallocatedExecutor.start();
client.addBean(preallocatedExecutor,true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}
}

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, preallocatedExecutor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener);
return customize(connection, context);
Expand All @@ -79,7 +108,7 @@ private class HTTP2ClientConnection extends HTTP2Connection implements Callback
private final Promise<Session> promise;
private final Session.Listener listener;

private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;

public class HTTP2Connection extends AbstractConnection
Expand All @@ -50,14 +51,14 @@ public class HTTP2Connection extends AbstractConnection
private final int bufferSize;
private final ExecutionStrategy strategy;

public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
super(endPoint, executor.getExecutor());
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.strategy = new EatWhatYouKill(producer, executor, 0);
this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor);

LifeCycle.start(strategy);
}
Expand Down Expand Up @@ -147,7 +148,10 @@ public boolean onIdleExpired()
protected void offerTask(Runnable task, boolean dispatch)
{
offerTask(task);
strategy.dispatch();
if (dispatch)
strategy.dispatch();
else
strategy.produce();
}

@Override
Expand Down Expand Up @@ -180,7 +184,7 @@ protected class HTTP2Producer implements ExecutionStrategy.Producer
private ByteBuffer buffer;

@Override
public synchronized Runnable produce()
public Runnable produce()
{
Runnable task = pollTask();
if (LOG.isDebugEnabled())
Expand Down
6 changes: 4 additions & 2 deletions jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
<Arg>
<New class="org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory">
<Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="1024"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="65535"/></Set>
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="128"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="524288"/></Set>
<Set name="initialSessionRecvWindow"><Property name="jetty.http2.initialSessionRecvWindow" default="1048576"/></Set>
<Set name="reservedThreads"><Property name="jetty.http2.reservedThreads" default="-1"/></Set>
</New>
</Arg>
</Call>
Expand Down
11 changes: 9 additions & 2 deletions jetty-http2/http2-server/src/main/config/modules/http2.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ etc/jetty-http2.xml
[ini-template]
## Max number of concurrent streams per connection
# jetty.http2.maxConcurrentStreams=1024
# jetty.http2.maxConcurrentStreams=128
## Initial stream receive window (client to server)
# jetty.http2.initialStreamRecvWindow=65535
# jetty.http2.initialStreamRecvWindow=524288
## Initial session receive window (client to server)
# jetty.http2.initialSessionRecvWindow=1048576
## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads)
# jetty.http2.reservedThreads=-1
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;

@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
Expand All @@ -48,6 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private int reservedThreads = -1;

public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
Expand Down Expand Up @@ -108,6 +110,7 @@ public void setMaxConcurrentStreams(int maxConcurrentStreams)
this.maxConcurrentStreams = maxConcurrentStreams;
}

@ManagedAttribute("The max header block fragment")
public int getMaxHeaderBlockFragment()
{
return maxHeaderBlockFragment;
Expand Down Expand Up @@ -139,6 +142,21 @@ public void setStreamIdleTimeout(long streamIdleTimeout)
this.streamIdleTimeout = streamIdleTimeout;
}

/**
* @see ReservedThreadExecutor
* @return The number of reserved threads
*/
@ManagedAttribute("The number of threads reserved for high priority tasks")
public int getReservedThreads()
{
return reservedThreads;
}

public void setReservedThreads(int threads)
{
this.reservedThreads = threads;
}

public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
Expand All @@ -163,9 +181,32 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());


ReservedThreadExecutor executor = connector.getBean(ReservedThreadExecutor.class);
if (executor==null)
{
synchronized (this)
{
executor = connector.getBean(ReservedThreadExecutor.class);
if (executor==null)
{

try
{
executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads());
executor.start();
connector.addBean(executor,true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}

ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor,
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;

public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
Expand Down Expand Up @@ -91,7 +92,7 @@ public static boolean isSupportedProtocol(String protocol)
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;

public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.eclipse.jetty.http2.server;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;

Expand Down Expand Up @@ -46,7 +47,7 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class HttpChannelOverHTTP2 extends HttpChannel
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
Expand Down Expand Up @@ -377,6 +378,12 @@ public void continue100(int available) throws IOException
}
}

@Override
public void close()
{
abort(new IOException("Unexpected close"));
}

@Override
public String toString()
{
Expand Down
22 changes: 11 additions & 11 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
Expand All @@ -47,10 +46,9 @@
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;

/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
Expand All @@ -76,8 +74,8 @@ public ManagedSelector(SelectorManager selectorManager, int id)
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer,executor);
addBean(_strategy);
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
addBean(_strategy,true);
setStopTimeout(5000);
}

Expand Down Expand Up @@ -446,24 +444,26 @@ public void destroyEndPoint(final EndPoint endPoint)
@Override
public String dump()
{
super.dump();
return ContainerLifeCycle.dump(this);
}

@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());

Selector selector = _selector;
if (selector != null && selector.isOpen())
if (selector == null || !selector.isOpen())
dumpBeans(out, indent);
else
{
final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);

DumpKeys dumpKeys = new DumpKeys(dump);
submit(dumpKeys);
dumpKeys.await(5, TimeUnit.SECONDS);

ContainerLifeCycle.dump(out, indent, dump);
if (dump.isEmpty())
dumpBeans(out, indent);
else
dumpBeans(out, indent, dump);
}
}

Expand Down

0 comments on commit a105be9

Please sign in to comment.