New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide example of a WebSocketHandler implementation for WebFlux [SPR-16820] #21360

Closed
spring-issuemaster opened this Issue May 14, 2018 · 11 comments

Comments

Projects
None yet
2 participants
@spring-issuemaster
Copy link
Collaborator

spring-issuemaster commented May 14, 2018

zhw opened SPR-16820 and commented

In HttpServerOperations.sendWebsocket() Method 

if (replace(ops)) {
return FutureMono.from(ops.handshakerResult)
.then(Mono.defer(() -> Mono.from(websocketHandler.apply(ops, ops))))
.doAfterSuccessOrError(ops);
}

Spring will auto close connection after sending handshake info if My WebSocketHandler  return any value except  Mono.never()

 

if My WebSocketHandler return Mono.never()

   the connection will keep alive but Server can't recevie message from Browser(Chrome)

   but I can send message from Server to Browser


Affects: 5.0.6

Referenced from: commits ade2eab, 543f190, b385ff1, c7adf28

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 14, 2018

Rossen Stoyanchev commented

Spring will auto close connection after sending handshake info if My WebSocketHandler return any value except Mono.never()

You couldn't have tried all possible values, so please be more specific in what you have tried. For more context, the Mono<Void> returned from WebSocketHandler is used to understand when handling is complete, so you want to make sure you return a Mono that reflects when handling is actually complete, or Mono.never().

the connection will keep alive but Server can't recevie message from Browser(Chrome)

Please provide sample code that demonstrates the issue.

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 14, 2018

zhw commented

My question is What kind of return value of WebSocketHandler will
keep connection alive?

@Configuration
public class Bootstrap{
  public static class MywsHandler implements WebSocketHandler { 
            @Override
            public Mono<Void> handle(WebSocketSession session) {
               // I would like to create Instant Messenger(IM) Server
               // so I need store every connection. 
               // if return any value(e.g Mono.empty()) 
               // except Mono.never(), spring will auto close connection
               // Because the return value of WebSocketHandler will
               // trigger Mono.doAfterSuccessOrError(ops) (According to HttpServerOptions.sendSocket)
               // If I return Mono.never(), connection will not close, but Server can't receive Any Message From Browser
               // But Browser can receive Message from Server

               new Thread(()->{
                  while(true){
                     Scanner input = new Scanner(System.in);
                     if(input.hasNext()){
                        // omit ...
                        session.send(...).subscribe(); // Message from Server can be received By Browser
                     }
                 }
               }).start();
              return Mono.never();
        }
    }

     public static void main(String[] args){
        ApplicationContext context = new AnnotationConfigApplicationContext(Bootstrap.class);
        
	HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
        
	ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
        
	HttpServer.create(60000).newHandler(adapter).block().channel().closeFuture().sync();
     }

     @Bean 
     public static WebHandler webHandler(){
        return new DispatcherHandler ();
     }
     

    @Bean
    public static HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MywsHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public static WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
<!DOCTYPE HTML>
<html>
   <head>
   <meta charset="utf-8">
   <title>菜鸟教程(runoob.com)</title>
    
      <script type="text/javascript">
        var ws = new WebSocket("ws://127.0.0.1:60000/path");
                
               ws.onopen = function()
               {
                  // Web Socket 已连接上,使用 send() 方法发送数据
                  ws.send("发送数据");
                  alert("数据发送中...");
               };
                
               ws.onmessage = function (evt) 
               { 
                  var received_msg = evt.data;
                  alert("数据已接收...");
               };
                
               ws.onclose = function()
               { 
                  // 关闭 websocket
                  alert("连接已关闭..."); 
               };
	 function sm(){
	   ws.send("data from Browser");
	 }
      </script>
        
   </head>
   <body>
   
      <a href="javascript:sm()"> send Message to Server </a>
      
   </body>
</html>
@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 14, 2018

Rossen Stoyanchev commented

Sorry but this is completely unreadable. Please edit your comment, use proper indentation and the comment editor toolbar to format the snippets as code.

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 14, 2018

zhw commented

HttpServer.newHandler(adapter) will add three ChannelHandler
to each NioSocketChannel
1、HttpserverCodec
2、HttpserverHandler
3、HttpServerOption

HttpserverHandler will delegates DispatcherHandler to process requests and render the appropriate responses
but WebSocketHandlerAdapter will remove HttpserverHandler
so WebSocketHandler will be invoked only once, is Right?

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 14, 2018

zhw commented

I have reformat my comment, would you please take look at my question, thank you very much

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 15, 2018

Rossen Stoyanchev commented

This is a basic, learning question that you need to start with on StackOverflow. That said, given that we don't have an example implementation of a WebSocketHandler in the docs, I'm turning this ticket into a doc improvement.

A few pointers:

  1. You cannot return "anything" especially if by anything you mean Mono.empty(). The return value has a meaning. Returning Mono.empty() indicates to the framework that you're done handling messages.
  2. To receive messages you actually have to try via  session.receive().
  3. You should learn more about Reactor and composing flows. You handler implementation needs to be composed as a connected flow of sending and receiving messages. For example typically you wouldn't subscribe explicitly.
  4. Starting a thread like that is not idiomatic when using a reactive library.

 

 

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 15, 2018

zhw commented

Please forgive my poor English. I didn't express my problem clearly。
1、I want to create an IM server based on Spring5 over Websocket。
2、I kown the fact that
"To receive messages you actually have to try via session.receive()"
My so-called server could not receive the browser's message that the program did not enter the breakpoint I set. Where the breakpoint I set is NioEventLoop.processSelectedKeysPlain

private void More ...processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
416     
419        if (selectedKeys.isEmpty()) {
420            return;
421        }
422       //  When my WebsocketHandler finishes running, 
            // no matter what message the client sends, it cannot enter the breakpoint I set
423        Iterator<SelectionKey> i = selectedKeys.iterator(); // #### Here is My breakpoint ### 
424        for (;;) {
425            final SelectionKey k = i.next();
426            final Object a = k.attachment();
427            i.remove();
428
429            if (a instanceof AbstractNioChannel) {
430                processSelectedKey(k, (AbstractNioChannel) a);
431            } else {
432                @SuppressWarnings("unchecked")
433                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
434                processSelectedKey(k, task);
435            }
436
437            if (!i.hasNext()) {
438                break;
439            }
440
441            if (needsToSelectAgain) {
442                selectAgain();
443                selectedKeys = selector.selectedKeys();
444
445                // Create the iterator again to avoid ConcurrentModificationException
446                if (selectedKeys.isEmpty()) {
447                    break;
448                } else {
449                    i = selectedKeys.iterator();
450                }
451            }
452        }
453    }

3、
The last and most important thing is that the code in my comment above is just sample code and they are not rigorous。Where do I open a thread just to facilitate debugging。

Enables a thread to receive input from the console to prove that the client can receive messages from the server

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 15, 2018

Rossen Stoyanchev commented

I think I understand what you're trying to do, but you're not pointing out an actual issue. You're learning how to work with Reactor and WebFlux, and this is not the place for that.

I will do my best to improve the docs on creating a WebSocketHandler, but you will need to do some reading first, and use StackOverflow for questions.

Once I've updated the docs, you can provide feedback on what's missing or not clear.

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 16, 2018

zhw commented

First of all thank you for your patience。:D

1.The issue is that Spring automatically disconnects the server from the client after the WebsocketHandler call ends. If I don't want to disconnect, I must let WebsocketHandler return Mono.never(), but this causes the server to fail to receive the client's message ( But the client can receive the message from the server).

2.By looking at Spring's source code, I discovered that Spring will send a CloseFrame to the client after the WebsocketHandler has finished running, resulting in a broken connection. I don't understand why you design like this

The code below comes from HttpServerOperations

final Mono<Void> withWebsocketSupport(String url,
			String protocols,
			BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
		Objects.requireNonNull(websocketHandler, "websocketHandler");
		if (markSentHeaders()) {
			HttpServerWSOperations ops = new HttpServerWSOperations(url, protocols, this);

			if (replace(ops)) {
				return FutureMono.from(ops.handshakerResult)
				                 .then(Mono.defer(() -> Mono.from(websocketHandler.apply(ops, ops))))
//Here(doAfterSuccessOrError(ops)) will be based on the return value of WebsocketHandler to decide whether to trigger Spring to disconnect. From the source code, if I do not want Spring to disconnect I can only let WebsocketHandler return Mono.never(), because Mono.never() will not send any signal and will not trigger the call of this method
				                 .doAfterSuccessOrError(ops); 

			}
		}
		else {
			log.error("Cannot enable websocket if headers have already been sent");
		}
		return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
	}

The code below comes from HttpServerWSOperations

      // FutureMono.from(ops.handshakerResult)
	//			                 .then(Mono.defer(() -> //Mono.from(websocketHandler.apply(ops, ops))))
//				                 .doAfterSuccessOrError(ops);
//The above code will eventually cause the method to be called, and the method //determines whether to close the connection (by sending a CloseFrame) or handle an //exception based on the return value of the WebsocketHandler

	public void accept(Void aVoid, Throwable throwable) {
		if (throwable == null) {
			if (channel().isActive()) {
				sendClose(null, f -> onHandlerTerminate());
			}
		}
		else {
			onOutboundError(throwable);
		}
	}

3、I don't understand why you want to design this way. We use the Websocket protocol to maintain a long connection. From the above two sources, I know that my WebsocketHandle has only one chance to be called after the server successfully sends a handshake frame, because the connection will be interrupted after this

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 17, 2018

Rossen Stoyanchev commented

The documentation has been updated to include samples. Here is a link to the build snapshot.

@spring-issuemaster

This comment has been minimized.

Copy link
Collaborator Author

spring-issuemaster commented May 17, 2018

Rossen Stoyanchev commented

One more round of updates. Please refresh, same link.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment