diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/EndpointOverHttp2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/EndpointOverHttp2.java new file mode 100644 index 000000000000..96130c0fc498 --- /dev/null +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/EndpointOverHttp2.java @@ -0,0 +1,169 @@ +package org.eclipse.jetty.http2.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http2.IStream; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; + +public class EndpointOverHttp2 extends AbstractEndPoint +{ + private static final int BUFFER_SIZE = 1024; // TODO: configured from session + + private HttpTransportOverHTTP2 transport; + private IStream http2Stream; + + private InetSocketAddress local; + private InetSocketAddress remote; + + private ByteBuffer writeBuffer; + + private DataFrame frame; + private Callback callback; + private AtomicReference writeReady = new AtomicReference<>(State.IDLE); + + private enum State + { + IDLE, + FLUSHING, + PENDING + } + + public EndpointOverHttp2(HttpTransportOverHTTP2 transport, IStream stream, InetSocketAddress local, InetSocketAddress remote) + { + super(new ScheduledExecutorScheduler()); + this.transport = transport; + + this.http2Stream = stream; + this.local = local; + this.remote = remote; + + this.writeBuffer = ByteBuffer.allocate(BUFFER_SIZE); // TODO: use the ByteBufferPool + + stream.setListener(new Stream.Listener.Adapter() + { + // TODO: should we override other methods on this (maybe for failures) + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + EndpointOverHttp2.this.frame = frame; + EndpointOverHttp2.this.callback = callback; + } + }); + } + + @Override + public InetSocketAddress getLocalAddress() + { + return local; + } + + @Override + public InetSocketAddress getRemoteAddress() + { + return remote; + } + + @Override + public Object getTransport() + { + return transport; + } + + @Override + protected void onIncompleteFlush() + { + while (true) + { + switch (writeReady.get()) + { + case IDLE: + getWriteFlusher().completeWrite(); + break; + + case FLUSHING: + if (!writeReady.compareAndSet(State.FLUSHING, State.PENDING)) + continue; + break; + + case PENDING: + throw new IllegalStateException(); + } + + break; + } + } + + @Override + protected void needsFillInterest() throws IOException + { + //getFillInterest().fillable(); + + Callback succeedFrame = callback; + frame = null; + callback = null; + succeedFrame.succeeded(); + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + return BufferUtil.put(frame.getData(), buffer); + } + + @Override + public boolean flush(ByteBuffer... buffer) throws IOException + { + if (!writeReady.compareAndSet(State.IDLE, State.FLUSHING)) + return false; + + boolean incomplete = false; + BufferUtil.clearToFill(writeBuffer); + for (ByteBuffer bb : buffer) + { + int filled = BufferUtil.put(bb, writeBuffer); + if (filled == 0) + { + incomplete = true; + break; + } + } + BufferUtil.flipToFlush(writeBuffer, 0); + + DataFrame frame = new DataFrame(http2Stream.getId(), writeBuffer, false); + http2Stream.data(frame, Callback.from(()-> + { + while(true) + { + switch (writeReady.get()) + { + case IDLE: + throw new IllegalStateException(); + + case FLUSHING: + if (!writeReady.compareAndSet(State.FLUSHING, State.IDLE)) + continue; + break; + + case PENDING: + if (!writeReady.compareAndSet(State.PENDING, State.IDLE)) + continue; + getWriteFlusher().completeWrite(); + break; + } + + break; + } + }, t->close(t))); + + return incomplete; + } +} \ No newline at end of file diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketNegotiationTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketNegotiationTest.java deleted file mode 100644 index 9d64da695eb7..000000000000 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketNegotiationTest.java +++ /dev/null @@ -1,171 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.tests; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.charset.StandardCharsets; - -import org.eclipse.jetty.http.HttpFields; -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.util.B64Code; -import org.eclipse.jetty.websocket.client.WebSocketClient; -import org.eclipse.jetty.websocket.servlet.WebSocketServlet; -import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.startsWith; - -public class WebSocketNegotiationTest -{ - public static class EchoServlet extends WebSocketServlet - { - @Override - public void configure(WebSocketServletFactory factory) - { - factory.register(EchoSocket.class); - } - } - - private Server server; - private ServerConnector connector; - private WebSocketClient client; - - @BeforeEach - public void start() throws Exception - { - server = new Server(); - connector = new ServerConnector(server); - connector.setPort(0); - server.addConnector(connector); - - ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); - contextHandler.setContextPath("/"); - contextHandler.addServlet(EchoServlet.class, "/"); - server.setHandler(contextHandler); - - client = new WebSocketClient(); - - server.start(); - client.start(); - } - - @AfterEach - public void stop() throws Exception - { - client.stop(); - server.stop(); - } - - @Test - public void testValidUpgradeRequest() throws Exception - { - Socket client = new Socket(); - client.connect(new InetSocketAddress("127.0.0.1", connector.getLocalPort())); - - HttpFields httpFields = newUpgradeRequest(null); - httpFields.remove(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL); - httpFields.add(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, "testInvalidUpgradeRequest"); - String upgradeRequest = "GET / HTTP/1.1\r\n" + httpFields; - client.getOutputStream().write(upgradeRequest.getBytes(StandardCharsets.ISO_8859_1)); - String response = getUpgradeResponse(client.getInputStream()); - - assertThat(response, startsWith("HTTP/1.1 101 Switching Protocols")); - assertThat(response, containsString("Sec-WebSocket-Accept: +WahVcVmeMLKQUMm0fvPrjSjwzI=")); - } - - @Test - public void testInvalidUpgradeRequestNoKey() throws Exception - { - Socket client = new Socket(); - client.connect(new InetSocketAddress("127.0.0.1", connector.getLocalPort())); - - HttpFields httpFields = newUpgradeRequest(null); - httpFields.remove(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL); - httpFields.add(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, "testInvalidUpgradeRequest"); - httpFields.remove(HttpHeader.SEC_WEBSOCKET_KEY); - - String upgradeRequest = "GET / HTTP/1.1\r\n" + httpFields; - client.getOutputStream().write(upgradeRequest.getBytes(StandardCharsets.ISO_8859_1)); - String response = getUpgradeResponse(client.getInputStream()); - - assertThat(response, containsString("400 Missing request header 'Sec-WebSocket-Key'")); - } - - - protected static HttpFields newUpgradeRequest(String extensions) - { - HttpFields fields = new HttpFields(); - fields.add(HttpHeader.HOST, "127.0.0.1"); - fields.add(HttpHeader.UPGRADE, "websocket"); - fields.add(HttpHeader.CONNECTION, "Upgrade"); - fields.add(HttpHeader.SEC_WEBSOCKET_KEY, new String(B64Code.encode("0123456701234567".getBytes()))); - fields.add(HttpHeader.SEC_WEBSOCKET_VERSION, "13"); - fields.add(HttpHeader.PRAGMA, "no-cache"); - fields.add(HttpHeader.CACHE_CONTROL, "no-cache"); - fields.add(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, "test"); - if (extensions != null) - fields.add(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, extensions); - - return fields; - } - - protected static String getUpgradeResponse(InputStream in) throws IOException - { - int state = 0; - StringBuilder buffer = new StringBuilder(); - while (state < 4) - { - int i = in.read(); - if (i < 0) - throw new EOFException(); - int b = (byte)(i & 0xff); - buffer.append((char)b); - switch (state) - { - case 0: - state = (b == '\r')?1:0; - break; - case 1: - state = (b == '\n')?2:0; - break; - case 2: - state = (b == '\r')?3:0; - break; - case 3: - state = (b == '\n')?4:0; - break; - default: - state = 0; - } - } - - return buffer.toString(); - } -} \ No newline at end of file diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java index b6f9c55dfe84..1b3238de002f 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java @@ -112,6 +112,8 @@ public ClientUpgradeRequest(WebSocketCoreClient webSocketClient, URI requestURI) this.wsClient = webSocketClient; this.futureCoreSession = new CompletableFuture<>(); + + // TODO: this is invalid for HTTP/2 requests method(HttpMethod.GET); version(HttpVersion.HTTP_1_1); @@ -180,6 +182,7 @@ public void setSubProtocols(List protocols) @Override public void send(final Response.CompleteListener listener) { + // TODO: this adds only the HTTP/1.1 headers, (if HTTP/2 send CONNECT request) initWebSocketHeaders(); super.send(listener); } @@ -232,11 +235,12 @@ public void onComplete(Result result) } } + // TODO: this will be a 200 response for HTTP/2 success if (responseStatusCode != HttpStatus.SWITCHING_PROTOCOLS_101) { // Failed to upgrade (other reason) - handleException( - new UpgradeException(requestURI, responseStatusCode, "Failed to upgrade to websocket: Unexpected HTTP Response Status Code: " + responseLine)); + handleException( new UpgradeException(requestURI, responseStatusCode, + "Failed to upgrade to websocket: Unexpected HTTP Response Status Code: " + responseLine)); } } @@ -249,9 +253,11 @@ protected void handleException(Throwable failure) @Override public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection) { + // TODO: http2 upgrade does not use upgrade header if (!this.getHeaders().get(HttpHeader.UPGRADE).equalsIgnoreCase("websocket")) throw new HttpResponseException("Not a WebSocket Upgrade", response); + // TODO: http2 upgrade does not use SEC_WEBSOCKET_KEY or SEC_WEBSOCKET_ACCEPT // Check the Accept hash String reqKey = this.getHeaders().get(HttpHeader.SEC_WEBSOCKET_KEY); String expectedHash = WebSocketCore.hashKey(reqKey); @@ -324,7 +330,6 @@ else if (values.length == 1) // We can upgrade EndPoint endp = httpConnection.getEndPoint(); customize(endp); - FrameHandler frameHandler = getFrameHandler(wsClient, response); if (frameHandler == null) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Handshaker.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Handshaker.java index 5334aad9b212..255bcadf8686 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Handshaker.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Handshaker.java @@ -18,18 +18,21 @@ package org.eclipse.jetty.websocket.core.server; -import org.eclipse.jetty.websocket.core.FrameHandler; -import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; +import java.io.IOException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.io.IOException; + +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.server.internal.HandshakerSelector; +import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; +import org.eclipse.jetty.websocket.core.server.internal.RFC8441Handshaker; public interface Handshaker { - static Handshaker newInstance() + static Handshaker newInstance() { - return new RFC6455Handshaker(); + return new HandshakerSelector(new RFC6455Handshaker(), new RFC8441Handshaker()); } boolean upgradeRequest( diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Negotiation.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Negotiation.java index f139a376850a..c92d9ad4fc67 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Negotiation.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/Negotiation.java @@ -41,21 +41,21 @@ public class Negotiation { - private final Request baseRequest; - private final HttpServletRequest request; - private final HttpServletResponse response; - private final List offeredExtensions; - private final List offeredSubprotocols; - private final WebSocketExtensionRegistry registry; - private final DecoratedObjectFactory objectFactory; - private final ByteBufferPool bufferPool; - private final String version; - private final Boolean upgrade; - private final String key; - - private List negotiatedExtensions; - private String subprotocol; - private ExtensionStack extensionStack; + protected final Request baseRequest; + protected final HttpServletRequest request; + protected final HttpServletResponse response; + protected final List offeredExtensions; + protected final List offeredSubprotocols; + protected final WebSocketExtensionRegistry registry; + protected final DecoratedObjectFactory objectFactory; + protected final ByteBufferPool bufferPool; + protected final String version; + protected final Boolean upgrade; + protected final String key; + + protected List negotiatedExtensions; + protected String subprotocol; + protected ExtensionStack extensionStack; /** * @throws BadMessageException if there is any errors parsing the upgrade request @@ -75,7 +75,7 @@ public Negotiation( this.objectFactory = objectFactory; this.bufferPool = bufferPool; - Boolean upgrade = null; + Boolean upgradeHeader = null; String key = null; String version = null; QuotedCSV connectionCSVs = null; @@ -91,8 +91,8 @@ public Negotiation( switch (field.getHeader()) { case UPGRADE: - if (upgrade == null && "websocket".equalsIgnoreCase(field.getValue())) - upgrade = Boolean.TRUE; + if (upgradeHeader == null && "websocket".equalsIgnoreCase(field.getValue())) + upgradeHeader = Boolean.TRUE; break; case CONNECTION: @@ -130,7 +130,8 @@ public Negotiation( this.version = version; this.key = key; - this.upgrade = upgrade != null && connectionCSVs != null && connectionCSVs.getValues().stream().anyMatch(s -> s.equalsIgnoreCase("Upgrade")); + + this.upgrade = upgradeHeader != null && connectionCSVs != null && connectionCSVs.getValues().stream().anyMatch(s -> s.equalsIgnoreCase("Upgrade")); Set available = registry.getAvailableExtensionNames(); offeredExtensions = extensions == null @@ -251,5 +252,4 @@ public String toString() getOfferedExtensions(), getOfferedSubprotocols()); } - } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/HandshakerSelector.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/HandshakerSelector.java new file mode 100644 index 000000000000..1f0beb8fa758 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/HandshakerSelector.java @@ -0,0 +1,43 @@ +package org.eclipse.jetty.websocket.core.server.internal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.server.Handshaker; +import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; + +public class HandshakerSelector implements Handshaker +{ + private List handshakers = new ArrayList<>(); + + // todo remove + public HandshakerSelector(Handshaker ...handshakers) + { + for (Handshaker handshaker : handshakers) + { + this.handshakers.add(handshaker); + } + } + + @Override + public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException + { + // TODO: optimise (do pre checks and avoid iterating through handshakers) + // TODO: minimum simplest thing to do to return false + for (Handshaker handshaker : handshakers) + { + if (handshaker.upgradeRequest(negotiator, request, response, defaultCustomizer)) + return true; + + if (response.isCommitted()) + return false; + } + + return false; + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java index 2b79084e1935..b6723daa761d 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java @@ -64,17 +64,17 @@ public final class RFC6455Handshaker implements Handshaker private static final HttpField CONNECTION_UPGRADE = new PreEncodedHttpField(HttpHeader.CONNECTION, HttpHeader.UPGRADE.asString()); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); - public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, - FrameHandler.Customizer defaultCustomizer) throws IOException + @Override + public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException { Request baseRequest = Request.getBaseRequest(request); HttpChannel httpChannel = baseRequest.getHttpChannel(); Connector connector = httpChannel.getConnector(); - if (negotiator == null) + if (!HttpVersion.HTTP_1_1.equals(baseRequest.getHttpVersion())) { if (LOG.isDebugEnabled()) - LOG.debug("not upgraded: no WebSocketNegotiator {}", baseRequest); + LOG.debug("not upgraded HttpVersion!=1.1 {}", baseRequest); return false; } @@ -85,10 +85,10 @@ public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest return false; } - if (!HttpVersion.HTTP_1_1.equals(baseRequest.getHttpVersion())) + if (negotiator == null) { if (LOG.isDebugEnabled()) - LOG.debug("not upgraded version!=1.1 {}", baseRequest); + LOG.debug("not upgraded: no WebSocketNegotiator {}", baseRequest); return false; } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java new file mode 100644 index 000000000000..a1d98dcae232 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java @@ -0,0 +1,231 @@ +package org.eclipse.jetty.websocket.core.server.internal; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.PreEncodedHttpField; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnection; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.core.Behavior; +import org.eclipse.jetty.websocket.core.ExtensionConfig; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.WebSocketConstants; +import org.eclipse.jetty.websocket.core.WebSocketException; +import org.eclipse.jetty.websocket.core.internal.ExtensionStack; +import org.eclipse.jetty.websocket.core.internal.Negotiated; +import org.eclipse.jetty.websocket.core.internal.WebSocketChannel; +import org.eclipse.jetty.websocket.core.internal.WebSocketConnection; +import org.eclipse.jetty.websocket.core.server.Handshaker; +import org.eclipse.jetty.websocket.core.server.Negotiation; +import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; + +public class RFC8441Handshaker implements Handshaker +{ + static final Logger LOG = Log.getLogger(RFC8441Handshaker.class); + private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); + + @Override + public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException + { + Request baseRequest = Request.getBaseRequest(request); + HttpChannel httpChannel = baseRequest.getHttpChannel(); + Connector connector = httpChannel.getConnector(); + + if (!HttpVersion.HTTP_2.equals(baseRequest.getHttpVersion())) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded HttpVersion!=2 {}", baseRequest); + return false; + } + + if (!HttpMethod.CONNECT.is(request.getMethod())) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded method!=GET {}", baseRequest); + return false; + } + + if (negotiator == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded: no WebSocketNegotiator {}", baseRequest); + return false; + } + + ByteBufferPool pool = negotiator.getByteBufferPool(); + if (pool == null) + pool = baseRequest.getHttpChannel().getConnector().getByteBufferPool(); + + Negotiation negotiation = new RFC8441Negotiation(baseRequest, request, response, + negotiator.getExtensionRegistry(), negotiator.getObjectFactory(), pool); + if (LOG.isDebugEnabled()) + LOG.debug("negotiation {}", negotiation); + + if (!negotiation.isUpgrade()) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded: no upgrade header or connection upgrade", baseRequest); + return false; + } + + if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion())) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), baseRequest); + return false; + } + + // Negotiate the FrameHandler + FrameHandler handler = negotiator.negotiate(negotiation); + if (LOG.isDebugEnabled()) + LOG.debug("negotiated handler {}", handler); + + // Handle error responses + if (response.isCommitted()) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded: response committed {}", baseRequest); + baseRequest.setHandled(true); + return false; + } + if (response.getStatus() > 200) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded: error sent {} {}", response.getStatus(), baseRequest); + response.flushBuffer(); + baseRequest.setHandled(true); + return false; + } + + // Check for handler + if (handler == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("not upgraded: no frame handler provided {}", baseRequest); + return false; + } + + // validate negotiated subprotocol + String subprotocol = negotiation.getSubprotocol(); + if (subprotocol != null) + { + if (!negotiation.getOfferedSubprotocols().contains(subprotocol)) + throw new WebSocketException("not upgraded: selected a subprotocol not present in offered subprotocols"); + } + else + { + if (!negotiation.getOfferedSubprotocols().isEmpty()) + throw new WebSocketException("not upgraded: no subprotocol selected from offered subprotocols"); + } + + // validate negotiated extensions + for (ExtensionConfig config : negotiation.getNegotiatedExtensions()) + { + if (config.getName().startsWith("@")) + continue; + + long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count(); + if (matches < 1) + throw new WebSocketException("Upgrade failed: negotiated extension not requested"); + + matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count(); + if (matches > 1) + throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name"); + } + + // Create and Negotiate the ExtensionStack + ExtensionStack extensionStack = negotiation.getExtensionStack(); + + Negotiated negotiated = new Negotiated( + baseRequest.getHttpURI().toURI(), + subprotocol, + baseRequest.isSecure(), + extensionStack, + WebSocketConstants.SPEC_VERSION_STRING); + + // Create the Channel + WebSocketChannel channel = newWebSocketChannel(handler, negotiated); + if (defaultCustomizer!=null) + defaultCustomizer.customize(channel); + negotiator.customize(channel); + + if (LOG.isDebugEnabled()) + LOG.debug("channel {}", channel); + + // Create a connection + EndPoint endPoint = baseRequest.getHttpChannel().getTunnellingEndPoint(); + WebSocketConnection connection = newWebSocketConnection(endPoint, connector.getExecutor(), connector.getByteBufferPool(), channel); + if (LOG.isDebugEnabled()) + LOG.debug("connection {}", connection); + if (connection == null) + throw new WebSocketException("not upgraded: no connection"); + + for (Connection.Listener listener : connector.getBeans(Connection.Listener.class)) + connection.addListener(listener); + + channel.setWebSocketConnection(connection); + + // send upgrade response + Response baseResponse = baseRequest.getResponse(); + baseResponse.setStatus(HttpStatus.OK_200); + + // See bugs.eclipse.org/485969 + if (getSendServerVersion(connector)) + baseResponse.getHttpFields().put(SERVER_VERSION); + + baseResponse.flushBuffer(); + baseRequest.setHandled(true); + + // upgrade + if (LOG.isDebugEnabled()) + LOG.debug("upgrade connection={} session={}", connection, channel); + + baseResponse.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); + baseRequest.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, connection); + return true; + } + + protected WebSocketChannel newWebSocketChannel(FrameHandler handler, Negotiated negotiated) + { + return new WebSocketChannel(handler, Behavior.SERVER, negotiated); + } + + protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel) + { + return new WebSocketConnection(endPoint, executor, byteBufferPool, wsChannel); + } + + private boolean getSendServerVersion(Connector connector) + { + ConnectionFactory connFactory = connector.getConnectionFactory(HttpVersion.HTTP_2.asString()); + if (connFactory == null) + return false; + + if (connFactory instanceof HttpConnectionFactory) + { + HttpConfiguration httpConf = ((HttpConnectionFactory)connFactory).getHttpConfiguration(); + if (httpConf != null) + return httpConf.getSendServerVersion(); + } + return false; + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Negotiation.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Negotiation.java new file mode 100644 index 000000000000..43ba9a5d40cf --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Negotiation.java @@ -0,0 +1,29 @@ +package org.eclipse.jetty.websocket.core.server.internal; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.DecoratedObjectFactory; +import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; +import org.eclipse.jetty.websocket.core.server.Negotiation; + +// TODO: remove and inline the check +public class RFC8441Negotiation extends Negotiation +{ + public RFC8441Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketExtensionRegistry registry, DecoratedObjectFactory objectFactory, ByteBufferPool bufferPool) throws BadMessageException + { + super(baseRequest, request, response, registry, objectFactory, bufferPool); + } + + @Override + public boolean isUpgrade() + { + if (!baseRequest.hasMetaData()) + return false; + + return "websocket".equals(baseRequest.getMetaData().getProtocol()); + } +}