Skip to content

Commit

Permalink
Issue #1973 - Implement minimum response data rate (#2012)
Browse files Browse the repository at this point in the history
* Code cleanups.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Improved test case handler.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Improved exception message.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Issue #1973 - Implement minimum response data rate.

Implemented response content data rate control in HttpOutput.

Introduced a WriteFlusher.Listener interface that produces events
for every flush(). These events are forwarded to the Connection
and from there to the HttpOutput so that the data rate control can
be enforced.

Both HTTP/1.1 and HTTP/2 are implemented.
Data rate control for HTTP/1.1 is approximate because it will count
also headers bytes and the chunk bytes, while for HTTP/2 is precise.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Issue #1973 - Implement minimum response data rate.

Addressed review comments.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored and gregw committed Dec 27, 2017
1 parent 696b60a commit e86e8a7
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -37,7 +38,7 @@
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;

public class HTTP2Connection extends AbstractConnection
public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);

Expand Down Expand Up @@ -176,6 +177,13 @@ private Runnable pollTask()
}
}

@Override
public void onFlushed(long bytes) throws IOException
{
// TODO: add method to ISession ?
((HTTP2Session)session).onFlushed(bytes);
}

protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
Expand Down Expand Up @@ -175,7 +176,7 @@ protected Action process() throws Throwable
{
if (entry.generate(lease))
{
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
entries.offer(entry);
}
else
Expand Down Expand Up @@ -207,6 +208,31 @@ protected Action process() throws Throwable
return Action.SCHEDULED;
}

void onFlushed(long bytes) throws IOException
{
// For the given flushed bytes, we want to only
// forward those that belong to data frame content.
for (Entry entry : actives)
{
int frameBytesLeft = entry.getFrameBytesRemaining();
if (frameBytesLeft > 0)
{
int update = (int)Math.min(bytes, frameBytesLeft);
entry.onFrameBytesFlushed(update);
bytes -= update;
IStream stream = entry.stream;
if (stream != null && !entry.isControl())
{
Object channel = stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel instanceof WriteFlusher.Listener)
((WriteFlusher.Listener)channel).onFlushed(update - Frame.HEADER_LENGTH);
}
if (bytes == 0)
break;
}
}
}

@Override
public void succeeded()
{
Expand Down Expand Up @@ -234,13 +260,13 @@ private void complete()
for (int i = index; i < actives.size(); ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
append(entry);
}
for (int i = 0; i < index; ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
append(entry);
}
stalled = null;
Expand Down Expand Up @@ -333,7 +359,11 @@ protected Entry(Frame frame, IStream stream, Callback callback)
this.stream = stream;
}

public int dataRemaining()
public abstract int getFrameBytesRemaining();

public abstract void onFrameBytesFlushed(int bytesFlushed);

public int getDataBytesRemaining()
{
return 0;
}
Expand Down Expand Up @@ -387,6 +417,17 @@ private boolean isProtocol()
}
}

private boolean isControl()
{
switch (frame.getType())
{
case DATA:
return false;
default:
return true;
}
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,11 @@ protected void onStreamClosed(IStream stream)
{
}

void onFlushed(long bytes) throws IOException
{
flusher.onFlushed(bytes);
}

public void disconnect()
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -1132,15 +1137,28 @@ public String toString()
private class ControlEntry extends HTTP2Flusher.Entry
{
private int bytes;
private int frameBytes;

private ControlEntry(Frame frame, IStream stream, Callback callback)
{
super(frame, stream, callback);
}

@Override
public int getFrameBytesRemaining()
{
return frameBytes;
}

@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}

protected boolean generate(ByteBufferPool.Lease lease)
{
bytes = generator.control(lease, frame);
bytes = frameBytes = generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
prepare();
Expand Down Expand Up @@ -1238,7 +1256,8 @@ public void succeeded()
private class DataEntry extends HTTP2Flusher.Entry
{
private int bytes;
private int dataRemaining;
private int frameBytes;
private int dataBytes;
private int dataWritten;

private DataEntry(DataFrame frame, IStream stream, Callback callback)
Expand All @@ -1249,35 +1268,47 @@ private DataEntry(DataFrame frame, IStream stream, Callback callback)
// of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case
// we would have to count the padding only once.
dataRemaining = frame.remaining();
dataBytes = frame.remaining();
}

@Override
public int getFrameBytesRemaining()
{
return frameBytes;
}

@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}

@Override
public int dataRemaining()
public int getDataBytesRemaining()
{
return dataRemaining;
return dataBytes;
}

protected boolean generate(ByteBufferPool.Lease lease)
{
int dataRemaining = dataRemaining();
int dataBytes = getDataBytesRemaining();

int sessionSendWindow = getSendWindow();
int streamSendWindow = stream.updateSendWindow(0);
int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && dataRemaining > 0)
if (window <= 0 && dataBytes > 0)
return false;

int length = Math.min(dataRemaining, window);
int length = Math.min(dataBytes, window);

// Only one DATA frame is generated.
bytes = generator.data(lease, (DataFrame)frame, length);
bytes = frameBytes = generator.data(lease, (DataFrame)frame, length);
int written = bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining);
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes);

this.dataWritten = written;
this.dataRemaining -= written;
this.dataBytes -= written;

flowControl.onDataSending(stream, written);

Expand All @@ -1292,7 +1323,7 @@ public void succeeded()

// Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame;
if (dataRemaining() == 0)
if (getDataBytesRemaining() == 0)
{
// Only now we can update the close state
// and eventually remove the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public HttpClientTransportOverHTTP2(HTTP2Client client)
});
}

public HTTP2Client getHTTP2Client()
{
return client;
}

@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
Expand All @@ -48,7 +49,7 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, WriteFlusher.Listener
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
Expand Down Expand Up @@ -85,6 +86,12 @@ public long getIdleTimeout()
return getStream().getIdleTimeout();
}

@Override
public void onFlushed(long bytes) throws IOException
{
getResponse().getHttpOutput().onFlushed(bytes);
}

public Runnable onRequest(HeadersFrame frame)
{
try
Expand Down
Loading

0 comments on commit e86e8a7

Please sign in to comment.