Skip to content

Commit

Permalink
Issue jetty#3537 - new test for websocket over http2 and minor fixes
Browse files Browse the repository at this point in the history
Signed-off-by: lachan-roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed May 6, 2019
1 parent c6b838e commit 1a66598
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void succeeded()
}
else
{
if (hasContent || (lastContent && !isTunnel(request, response)))
if (hasContent || (lastContent && !isTunnel(request, metaData)))
{
if (lastContent)
{
Expand Down
10 changes: 10 additions & 0 deletions jetty-websocket/jetty-websocket-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
<artifactId>jetty-websocket-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.tests</groupId>
<artifactId>jetty-http-tools</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package org.eclipse.jetty.websocket.tests.http2;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.eclipse.jetty.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.websocket.tests.EchoSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class WebSocketOverHttp2ServerTest
{
public static class MyWebSocketServlet extends JettyWebSocketServlet
{
@Override
public void configure(JettyWebSocketServletFactory factory)
{
factory.addMapping("/",(req, resp)->new EchoSocket());
}
}

private static final Logger LOG = Log.getLogger(WebSocketOverHttp2ServerTest.class);

private Server server;
private ServerConnector connector;
private HTTP2Client http2Client;
private ByteBufferPool bufferPool = new MappedByteBufferPool();
private Generator generator = new Generator(bufferPool);
private Parser parser = new Parser(bufferPool);

@BeforeEach
public void before() throws Exception
{
server = new Server();
HTTP2CServerConnectionFactory factory = new HTTP2CServerConnectionFactory(new HttpConfiguration());
factory.setExtendedConnectSupported(true);
connector = new ServerConnector(server, 1, 1, factory);
server.addConnector(connector);

ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
contextHandler.addServlet(MyWebSocketServlet.class, "/");
JettyWebSocketServletContainerInitializer.configureContext(contextHandler);

server.start();

http2Client = new HTTP2Client();
http2Client.start();
}

@AfterEach
public void after() throws Exception
{
http2Client.stop();
server.stop();
}

@Test
public void test() throws Exception
{
FuturePromise<Session> sessionPromise = new FuturePromise<>();
http2Client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), sessionPromise);

Session session = sessionPromise.get(5, TimeUnit.SECONDS);
HttpFields fields = new HttpFields();
fields.add(HttpHeader.SEC_WEBSOCKET_VERSION, "13");
MetaData.Request connectMetaData = new MetaData.Request(HttpMethod.CONNECT.asString(), HttpScheme.HTTP,
new HostPortHttpField("localhost:"+connector.getLocalPort()), "/", HttpVersion.HTTP_2, fields);
connectMetaData.setProtocol("websocket");
HeadersFrame connect = new HeadersFrame(connectMetaData, null, false);

BlockingArrayQueue<ByteBuffer> dataQueue = new BlockingArrayQueue<>();
CountDownLatch headersLatch = new CountDownLatch(1);

FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(connect, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
LOG.info("onHeaders(): " + frame);
MetaData.Response metaData = (MetaData.Response)frame.getMetaData();
if (metaData.getStatus() == HttpStatus.OK_200)
headersLatch.countDown();
}

@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
LOG.info("onData(): " + frame);
dataQueue.offer(frame.getData());
callback.succeeded();
}
});

ByteBuffer generatedFrame = generator.generateWholeFrame(new Frame(OpCode.TEXT, "hello world").setMask(new byte[]{0,0,0,0}));
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
assertTrue(headersLatch.await(5, TimeUnit.SECONDS));

stream.data(new DataFrame(stream.getId(), generatedFrame, false), Callback.NOOP);
ByteBuffer receivedData = dataQueue.poll(666, TimeUnit.SECONDS);
Parser.ParsedFrame parsedFrame = parser.parse(receivedData);

LOG.info("receivedFrame: " + parsedFrame);
assertThat(parsedFrame.getOpCode(), is(OpCode.TEXT));
assertThat(parsedFrame.getPayloadAsUTF8(), is("hello world"));

generator.getBufferPool().release(generatedFrame);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.http2.LEVEL=DEBUG
# org.eclipse.jetty.util.log.stderr.LONG=true
# org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.Frame;

import java.nio.ByteBuffer;

/**
* Generating a frame in WebSocket land.
*
Expand Down Expand Up @@ -237,6 +237,36 @@ public void generateWholeFrame(Frame frame, ByteBuffer buf)
}
}

/**
* Generate the whole frame (header + payload copy) into a single ByteBuffer.
* <p>
* Note: This is slow, moves lots of memory around. Only use this if you must (such as in unit testing).
*
* @param frame the frame to generate
* @return the ByteBuffer the frame was generated to
*/
public ByteBuffer generateWholeFrame(Frame frame)
{
ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH + frame.getPayloadLength(), false);
BufferUtil.clearToFill(buffer);
generateHeaderBytes(frame, buffer);

if (frame.hasPayload())
{
if (readOnly)
{
buffer.put(frame.getPayload().slice());
}
else
{
buffer.put(frame.getPayload());
}
}

BufferUtil.flipToFlush(buffer, 0);
return buffer;
}

public ByteBufferPool getBufferPool()
{
return bufferPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -192,15 +192,13 @@ public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest
if (getSendServerVersion(connector))
baseResponse.getHttpFields().put(SERVER_VERSION);

baseResponse.flushBuffer();
baseRequest.setHandled(true);

// upgrade
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={}", connection, channel);

baseResponse.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
baseRequest.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, connection);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
return true;
}

Expand Down

0 comments on commit 1a66598

Please sign in to comment.