STOMP heartbeat: no action or handling after the connection to the server is lost #39

Closed
fanchw opened this issue Sep 16, 2021 · 4 comments
Labels
question Further information is requested

Comments

@fanchw

fanchw commented Sep 16, 2021

com.ejlchina.okhttps.internal.WebSocketTask

	private void schedulePing() {
		if (!isConnected()) {
			return;
		}
		int delay = (int) (pingSeconds + lastPingSecs - nowSeconds());
		httpClient.executor.requireScheduler().schedule(() -> {
			if (!isConnected()) {
				return;
			}
			WebSocket ws = webSocket;
			if (nowSeconds() - lastPingSecs >= pingSeconds && ws != null) {
				ByteString ping = pingSupplier != null ? pingSupplier.getPing() : ByteString.EMPTY;
				ws.send(ping);
				lastPingSecs = nowSeconds();
			}
			schedulePing();
		}, delay, TimeUnit.SECONDS);
	}

	/**
	 * Checks for the server's heartbeat response.
	 */
	private void schedulePong() {
		if (!isConnected()) {
			return;
		}
		int delay = (int) (pongSeconds + lastPongSecs - nowSeconds());
		httpClient.executor.requireScheduler().schedule(() -> {
			if (!isConnected()) {
				return;
			}
			long noPongSeconds = nowSeconds() - lastPongSecs;
			if (noPongSeconds > 3L * pongSeconds) {
				WebSocketImpl ws = webSocket;
				if (ws != null) {
					Exception e = new SocketTimeoutException("Server didn't pong heart-beat on time. Last received at " + noPongSeconds + " seconds ago.");
					((RealWebSocket) ws.webSocket).failWebSocket(e, null);
				}
			} else {
				schedulePong();
			}
		}, delay, TimeUnit.SECONDS);
	}

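As the code shows, both heartbeat checks simply return once `!isConnected()` indicates that the connection to the server has been lost. Shouldn't an exception be thrown, or the onDisconnected callback be triggered, at this point?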
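To make the timing in the two quoted methods easier to follow, here is a minimal sketch of the same arithmetic with an example interval of 5 seconds. The class `HeartbeatTiming` and its method names are hypothetical and not part of OkHttps; only the two expressions are taken from the code above.

```java
// Hypothetical helper, not part of OkHttps: it only mirrors the arithmetic
// used by schedulePing() and schedulePong() in the snippet above.
final class HeartbeatTiming {

    // Same expression as in schedulePing(): seconds until the next ping is due.
    static long nextPingDelay(long pingSeconds, long lastPingSecs, long nowSecs) {
        return pingSeconds + lastPingSecs - nowSecs;
    }

    // Same condition as in schedulePong(): the socket is failed only after
    // more than three pong intervals have passed without a pong.
    static boolean pongTimedOut(long pongSeconds, long lastPongSecs, long nowSecs) {
        long noPongSeconds = nowSecs - lastPongSecs;
        return noPongSeconds > 3L * pongSeconds;
    }

    public static void main(String[] args) {
        System.out.println(nextPingDelay(5, 100, 102)); // 3
        System.out.println(pongTimedOut(5, 100, 115));  // false
        System.out.println(pongTimedOut(5, 100, 116));  // true
    }
}
```

Under these assumptions, a 5-second pong interval means the check fails the socket only once more than 15 seconds have passed since the last pong, so a timeout would not be reported immediately after the server goes away.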

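@fanchw fanchw changed the title from "STOMP: no action or handling after the connection to the server is lost" to "STOMP heartbeat: no action or handling after the connection to the server is lost" Sep 16, 2021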
@troyzhxu
Owner

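At that point the connection has already been closed, and the onDisconnected callback has already been, is being, or is about to be executed on another thread.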
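The pattern described here can be sketched roughly as follows. This is an illustration of the general idea only, not the OkHttps implementation: `DisconnectOwnership`, `markDisconnected`, and `heartbeatTick` are hypothetical names, and the counter stands in for the onDisconnected callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the code path that flips the "connected" flag owns the
// disconnect callback, so the heartbeat task can return silently.
final class DisconnectOwnership {
    private final AtomicBoolean connected = new AtomicBoolean(true);
    private final AtomicInteger disconnectedCalls = new AtomicInteger();

    // Called by whichever thread observes the close or failure.
    // compareAndSet ensures the callback stand-in fires at most once.
    void markDisconnected() {
        if (connected.compareAndSet(true, false)) {
            disconnectedCalls.incrementAndGet(); // stands in for onDisconnected
        }
    }

    // Heartbeat tick: if the connection is already gone, do nothing.
    // Returns true when a ping would have been sent.
    boolean heartbeatTick() {
        if (!connected.get()) {
            return false;
        }
        return true; // a real task would send a ping and reschedule here
    }

    int disconnectedCalls() {
        return disconnectedCalls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        DisconnectOwnership conn = new DisconnectOwnership();
        Thread listener = new Thread(conn::markDisconnected); // e.g. the socket listener thread
        listener.start();
        listener.join();
        System.out.println(conn.heartbeatTick());      // false
        System.out.println(conn.disconnectedCalls());  // 1
    }
}
```

In this arrangement the heartbeat task does not need to raise anything itself, because the thread that detected the disconnect is the one responsible for the callback.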

@troyzhxu troyzhxu added the question Further information is requested label Sep 16, 2021
@fanchw
Author

fanchw commented Sep 16, 2021

After configuring the heartbeat, I shut down the server and no callback was triggered.

Server configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    public static final String END_POINT = "stomp-kafka";

    public static final String BROKER_PREFIX = "/kafka";

    private static final long HEAR_BEAT_MILLS = 5000L;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(END_POINT).setAllowedOrigins("*").withSockJS();
        registry.addEndpoint("ok-https");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        final ThreadPoolTaskScheduler heartBeatPool = new ThreadPoolTaskScheduler();
        heartBeatPool.setPoolSize(1);
        heartBeatPool.setThreadNamePrefix("stomp-heart-beat-");
        heartBeatPool.initialize();
        registry.enableSimpleBroker(BROKER_PREFIX)
                .setHeartbeatValue(new long[]{HEAR_BEAT_MILLS, HEAR_BEAT_MILLS})
                .setTaskScheduler(heartBeatPool)
        ;
        registry.setApplicationDestinationPrefixes("/server");
    }
}

Client configuration:

    @Bean
    public Stomp okHttps(StompConfigEntity stompConfigEntity) {
        final Stomp stomp = Stomp.over(OkHttps.webSocket(stompConfigEntity.getUrl()).heatbeat(5, 5));
        stomp.setOnConnected(data -> {
                    log.info("Stomp Connected {}", data);
                    subscribe(stompConfigEntity.getTopics(), data);
                })
                .setOnDisconnected(data -> {
                    log.info("Stomp Disconnected {}", data);
                    didSubscribe = false;
                    stomp.connect();
                })
                .setOnError(data -> log.info("Error : {}", data))
                .setOnException(data -> {
                    log.warn("Exception!", data);
                    didSubscribe = false;
                    stomp.connect();
                });
        stomp.connect();
        return stomp;
    }

    private void subscribe(List<String> topics, Stomp stomp) {
        if (this.didSubscribe) {
            return;
        }
        this.didSubscribe = true;
        for (String topic : topics) {
            stomp.subscribe(topic, null, message -> log.info("Subscribe Handler : {}", message.getPayload()));
        }
    }

Is this heartbeat configuration correct for the server and the client?
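For reference, STOMP 1.2 negotiates heart-beats per direction from the `heart-beat` headers of the CONNECT and CONNECTED frames: if either side's value is 0 there are no heart-beats in that direction, otherwise the interval is the larger of the two values. The sketch below applies that rule to the values above, assuming the client's 5-second settings are advertised as 5000 ms (the thread does not confirm this). `StompHeartbeat` and `negotiate` are hypothetical names.

```java
// Hypothetical helper illustrating the STOMP 1.2 heart-beat negotiation rule.
final class StompHeartbeat {

    // senderCanSendMs: the smallest interval the sender guarantees between heart-beats.
    // receiverWantsMs: the interval the receiver would like to see.
    // Returns 0 when no heart-beats flow in this direction.
    static long negotiate(long senderCanSendMs, long receiverWantsMs) {
        if (senderCanSendMs == 0 || receiverWantsMs == 0) {
            return 0;
        }
        return Math.max(senderCanSendMs, receiverWantsMs);
    }

    public static void main(String[] args) {
        long cx = 5000, cy = 5000; // client values (assumed: 5 s advertised as 5000 ms)
        long sx = 5000, sy = 5000; // server values from setHeartbeatValue, in milliseconds
        System.out.println("client -> server every " + negotiate(cx, sy) + " ms");
        System.out.println("server -> client every " + negotiate(sx, cy) + " ms");
    }
}
```

Note the unit difference between the two sides: Spring's `setHeartbeatValue` takes milliseconds, while the client-side fields in the quoted `WebSocketTask` code are in seconds.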

@troyzhxu
Owner

Was the OnConnected callback executed? If not, the client never actually connected to this server; try removing withSockJS.

@fanchw
Author

fanchw commented Sep 16, 2021

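> Was the OnConnected callback executed? If not, the client never actually connected to this server; try removing withSockJS.

Yes, the callback was executed. I exposed two endpoints: one with SockJS (withSockJS) and one without.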
