Skip to content

Commit

Permalink
fix(网络组件): 修复关闭mqtt网关禁用逻辑错误 (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyouji committed May 24, 2024
1 parent 2b8f571 commit 53a5d0c
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void setClient(io.vertx.mqtt.MqttClient client) {
private void reSubscribe() {
subscriber
.getAllSubscriber()
.filter(topic -> topic.getSubscribers().size() > 0)
.filter(topic -> !topic.getSubscribers().isEmpty())
.collectMap(topic -> getCompleteTopic(convertMqttTopic(topic.getSubscribers().iterator().next().getT1())),
topic -> topic.getSubscribers().iterator().next().getT3())
.filter(MapUtils::isNotEmpty)
Expand Down Expand Up @@ -170,10 +170,10 @@ public Flux<MqttMessage> subscribe(List<String> topics, int qos) {

Tuple3<String, FluxSink<MqttMessage>, Integer> topicQos = Tuples.of(topic, sink, qos);

boolean first = sinkTopic.getSubscribers().size() == 0;
boolean first = sinkTopic.getSubscribers().isEmpty();
sinkTopic.subscribe(topicQos);
composite.add(() -> {
if (sinkTopic.unsubscribe(topicQos).size() > 0 && isAlive()) {
if (!sinkTopic.unsubscribe(topicQos).isEmpty() && isAlive() && sinkTopic.getSubscribers().isEmpty()) {
client.unsubscribe(convertMqttTopic(completeTopic), result -> {
if (result.succeeded()) {
log.debug("unsubscribe mqtt topic {}", completeTopic);
Expand Down

0 comments on commit 53a5d0c

Please sign in to comment.