Skip to content

Commit

Permalink
Add support for websockets.
Browse files Browse the repository at this point in the history
Reuses some of the Spring Framework websockets classes and
a custom Proxy WebSocketHandler.

fixes gh-6
  • Loading branch information
spencergibb committed Aug 10, 2017
1 parent 1d59ab5 commit 924c484
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.springframework.cloud.gateway.config;

import java.util.List;
import java.util.function.Consumer;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.endpoint.Endpoint;
Expand All @@ -32,6 +33,7 @@
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyRoutingFilter;
import org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter;
import org.springframework.cloud.gateway.filter.WebsocketRoutingFilter;
import org.springframework.cloud.gateway.filter.WriteResponseFilter;
import org.springframework.cloud.gateway.filter.factory.AddRequestHeaderWebFilterFactory;
import org.springframework.cloud.gateway.filter.factory.AddRequestParameterWebFilterFactory;
Expand Down Expand Up @@ -87,8 +89,13 @@

import com.netflix.hystrix.HystrixObservableCommand;

import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientOptions;
import reactor.ipc.netty.resources.PoolResources;
import rx.RxReactiveStreams;

Expand All @@ -107,17 +114,27 @@ public class GatewayAutoConfiguration {
protected static class NettyConfiguration {
@Bean
@ConditionalOnMissingBean
public HttpClient httpClient() {
return HttpClient.create(opts -> {
public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
return HttpClient.create();
}

@Bean
public Consumer<? super HttpClientOptions.Builder> nettyClientOptions() {
return opts -> {
opts.poolResources(PoolResources.elastic("proxy"));
// opts.disablePool(); //TODO: why do I need this again?
});
};
}

@Bean
public NettyRoutingFilter routingFilter(HttpClient httpClient) {
return new NettyRoutingFilter(httpClient);
}

@Bean
public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
return new ReactorNettyWebSocketClient(options);
}
}

@Bean
Expand Down Expand Up @@ -181,6 +198,16 @@ public RouteToRequestUrlFilter routeToRequestUrlFilter() {
return new RouteToRequestUrlFilter();
}

@Bean
public WebSocketService webSocketService() {
return new HandshakeWebSocketService();
}

@Bean
public WebsocketRoutingFilter websocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService) {
return new WebsocketRoutingFilter(webSocketClient, webSocketService);
}

@Bean
public WriteResponseFilter writeResponseFilter() {
return new WriteResponseFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
public class NettyRoutingFilter implements GlobalFilter, Ordered {

//TODO: investigate using WebClient
//The WebSocketRoutingFilter worked out so well without a hard dependency on netty
private final HttpClient httpClient;

public NettyRoutingFilter(HttpClient httpClient) {
Expand All @@ -62,6 +64,11 @@ public int getOrder() {
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

String scheme = requestUrl.getScheme();
if (!scheme.equals("http") && !scheme.equals("https)")) {
return chain.filter(exchange);
}

ServerHttpRequest request = exchange.getRequest();

final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
Expand All @@ -70,24 +77,6 @@ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
request.getHeaders().forEach(httpHeaders::set);

if ("WebSocket".equalsIgnoreCase(request.getHeaders().getUpgrade())) {
return this.httpClient.ws(url)
// .flatMap(res -> {
.doOnNext(res -> {
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

response.getHeaders().putAll(headers);
response.setStatusCode(HttpStatus.valueOf(res.status().code()));

// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write response later WriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
}).then(chain.filter(exchange));
}

return this.httpClient.request(method, url, req -> {
final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
.failOnClientError(false)
Expand All @@ -103,8 +92,6 @@ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
}
}).then())
.then(chain.filter(exchange));
} else if ("WebSocket".equalsIgnoreCase(request.getHeaders().getUpgrade())) {
return proxyRequest.sendWebsocket();
}

return proxyRequest.sendHeaders() //I shouldn't need this
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.springframework.cloud.gateway.filter;

import java.net.URI;
import java.util.logging.Level;

import org.springframework.core.Ordered;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;

import reactor.core.publisher.Mono;

/**
* @author Spencer Gibb
*/
public class WebsocketRoutingFilter implements GlobalFilter, Ordered {
private final WebSocketClient webSocketClient;
private final WebSocketService webSocketService;

public WebsocketRoutingFilter(WebSocketClient webSocketClient) {
this(webSocketClient, new HandshakeWebSocketService());
}

public WebsocketRoutingFilter(WebSocketClient webSocketClient,
WebSocketService webSocketService) {
this.webSocketClient = webSocketClient;
this.webSocketService = webSocketService;
}

@Override
public int getOrder() {
return 2000000;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

String scheme = requestUrl.getScheme();
if (!scheme.equals("ws") && !scheme.equals("wss")) {
return chain.filter(exchange);
}

return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient));
}

private static class ProxyWebSocketHandler implements WebSocketHandler {

private final WebSocketClient client;
private final URI url;

public ProxyWebSocketHandler(URI url, WebSocketClient client) {
this.client = client;
this.url = url;
}

@Override
public Mono<Void> handle(WebSocketSession session) {
return client.execute(url, proxySession -> {
// Use retain() for Reactor Netty
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain))
.log("proxySessionSend", Level.FINE);
Mono<Void> serverSessionSend = session
.send(proxySession.receive().doOnNext(WebSocketMessage::retain))
.log("sessionSend", Level.FINE);
return Mono.when(proxySessionSend, serverSessionSend).then();
});
}
}
}
10 changes: 10 additions & 0 deletions spring-cloud-gateway-sample/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ spring:

routes:
# =====================================
# to run server
# $ wscat --listen 9000
# to run client
# $ wscat --connect ws://localhost:8080/echo
- id: websocket_test
uri: ws://localhost:9000
order: 9000
predicates:
- Path=/echo
# =====================================
- id: default_path_to_httpbin
uri: ${test.uri}
order: 10000
Expand Down

0 comments on commit 924c484

Please sign in to comment.