Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Expand WebFlux docs with WebSocketHandler examples
Issue: SPR-16820
  • Loading branch information
rstoyanchev committed May 17, 2018
1 parent c555fef commit c7adf28
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 16 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,69 @@
import java.util.Collections;
import java.util.List;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
* Handler for a WebSocket session.
*
* <p>Use {@link WebSocketSession#receive()} to compose on the stream of
* inbound messages and {@link WebSocketSession#send(Publisher)} to write the
* stream of outbound messages.
*
* <p>You can handle inbound and outbound messages as independent streams, and
* then join them:
*
* <pre class="code">
* class ExampleHandler implements WebSocketHandler {
* &#064;Override
* public Mono&lt;Void&gt; handle(WebSocketSession session) {
*
* Mono&lt;Void&gt; input = session.receive()
* .doOnNext(message -> {
* // ...
* })
* .concatMap(message -> {
* // ...
* })
* .then();
*
* Flux&lt;String&gt; source = ... ;
* Mono&lt;Void&gt; output = session.send(source.map(session::textMessage));
*
* return Mono.zip(input, output).then();
* }
* }
* </pre>
*
* <p>You can also create a single flow including inbound and outbound messages:
* <pre class="code">
* class ExampleHandler implements WebSocketHandler {
* &#064;Override
* public Mono&lt;Void&gt; handle(WebSocketSession session) {
*
* Flux&lt;WebSocketMessage&gt; input = session.receive()
* .doOnNext(message -> {
* // ...
* })
* .concatMap(message -> {
* // ...
* })
* .map(value -> session.textMessage("Echo " + value));
*
* return session.send(output);
* }
* }
* </pre>
*
* <p>When the connection is closed, the inbound stream will receive a
* completion/error signal, while the outbound stream will get a cancellation
* signal. The above flows are composed in such a way that the
* {@code Mono<Void>} returned from the {@code WebSocketHandler} won't complete
* until the connection is closed.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
Expand All @@ -39,6 +97,9 @@ default List<String> getSubProtocols() {

/**
* Handle the WebSocket session.
*
*
*
* @param session the session to handle
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling.
Expand Down
119 changes: 104 additions & 15 deletions src/docs/asciidoc/web/webflux-websocket.adoc
Expand Up @@ -20,10 +20,10 @@ server side applications that handle WebSocket messages.


[[webflux-websocket-server-handler]]
=== WebSocketHandler
=== Server
[.small]#<<web.adoc#websocket-server-handler,Same in Servlet stack>>#

Creating a WebSocket server is as simple as implementing `WebSocketHandler`:
To create a WebSocket server, first create a `WebSocketHandler`:

[source,java,indent=0]
[subs="verbatim,quotes"]
Expand All @@ -40,10 +40,7 @@ Creating a WebSocket server is as simple as implementing `WebSocketHandler`:
}
----

Spring WebFlux provides a `WebSocketHandlerAdapter` that can adapt WebSocket
requests and use the above handler to handle the resulting WebSocket session. After the
adapter is registered as a bean, you can map requests to your handler, for example using
`SimpleUrlHandlerMapping`. This is shown below:
Then map it to a URL and add a `WebSocketHandlerAdapter`:

[source,java,indent=0]
[subs="verbatim,quotes"]
Expand Down Expand Up @@ -71,17 +68,109 @@ adapter is registered as a bean, you can map requests to your handler, for examp



[[webflux-websockethandler]]
=== WebSocketHandler

The most basic implementation of a handler is one that handles inbound messages:

[source,java,indent=0]
[subs="verbatim,quotes"]
----
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() <1>
.doOnNext(message -> {
// ... <2>
})
.concatMap(message -> {
// ... <3>
})
.then(); <4>
}
}
----
<1> Access stream of inbound messages.
<2> Do something with each message.
<3> Perform nested async operation using message content.
<4> Return `Mono<Void>` that doesn't complete while we continue to receive.

[NOTE]
====
If performing a nested, asynchronous operation, you'll need to call
`message.retain()` if the underlying server uses pooled data buffers (e.g. Netty), or
otherwise the data buffer may be released before you've had a chance to read the data.
For more on this see <<core.adoc#databuffers,Data Buffers and Codecs>>.
====

A handler can work with inbound and outbound messages as independent streams:

[source,java,indent=0]
[subs="verbatim,quotes"]
----
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() <1>
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); <2>
return Mono.zip(input, output).then(); <3>
}
}
----
<1> Handle inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return `Mono<Void>` that completes when _either_ stream ends.

A handler can compose a connected flow of inbound and outbound messages:
4
[source,java,indent=0]
[subs="verbatim,quotes"]
----
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() <1>
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); <2>
return session.send(output); <3>
}
}
----
<1> Handle inbound message stream.
<2> Create outbound message, producing a combined flow.
<3> Return `Mono<Void>` that doesn't complete while we continue to receive.



[[webflux-websocket-server-handshake]]
=== WebSocket Handshake
=== Handshake
[.small]#<<web.adoc#websocket-server-handshake,Same in Servlet stack>>#

`WebSocketHandlerAdapter` does not perform WebSocket handshakes itself. Instead it
delegates to an instance of `WebSocketService`. The default `WebSocketService`
implementation is `HandshakeWebSocketService`.

The `HandshakeWebSocketService` performs basic checks on the WebSocket request and
delegates to a server-specific `RequestUpgradeStrategy`. At present upgrade strategies
exist for Reactor Netty, Tomcat, Jetty, and Undertow.
`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default that's an instance
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
then uses `RequestUpgradeStrategy` for the server in use. Currently there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow.



Expand Down Expand Up @@ -132,7 +221,7 @@ specify CORS settings by URL pattern. If both are specified they're combined via


[[webflux-websocket-client]]
== WebSocketClient
=== Client

Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (i.e. JSR-356).
Expand Down

0 comments on commit c7adf28

Please sign in to comment.