Skip to content

Commit

Permalink
refactor: 使用新的协议加载逻辑. (#491)
Browse files Browse the repository at this point in the history
使用eventbus来传递协议变更事件
  • Loading branch information
zhou-hao committed Apr 7, 2024
1 parent cf2c781 commit b61b0e3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,96 +1,37 @@
package org.jetlinks.community.protocol;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.supports.protocol.StaticProtocolSupports;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.supports.protocol.management.DefaultProtocolSupportManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

@Slf4j
@Getter
@Setter
@Order(Ordered.HIGHEST_PRECEDENCE)
public class LazyInitManagementProtocolSupports extends StaticProtocolSupports implements CommandLineRunner {

private ProtocolSupportManager manager;

private ProtocolSupportLoader loader;

private ClusterManager clusterManager;

@Setter(AccessLevel.PRIVATE)
private Map<String, String> configProtocolIdMapping = new ConcurrentHashMap<>();

private Duration loadTimeOut = Duration.ofSeconds(30);

public void init() {

clusterManager.<ProtocolSupportDefinition>getTopic("_protocol_changed")
.subscribe()
.subscribe(protocol -> this.init(protocol).subscribe());

try {
manager
.loadAll()
.filter(de -> de.getState() == 1)
.flatMap(this::init)
.blockLast(loadTimeOut);
} catch (Throwable e) {
log.error("load protocol error", e);
}
public class LazyInitManagementProtocolSupports extends DefaultProtocolSupportManager implements CommandLineRunner {

public LazyInitManagementProtocolSupports(EventBus eventBus,
ClusterManager clusterManager,
ProtocolSupportLoader loader) {
super(eventBus, clusterManager.getCache("__protocol_supports"), loader);
}

public Mono<Void> init(ProtocolSupportDefinition definition) {
try {
if (definition.getState() != 1) {
String protocol = configProtocolIdMapping.get(definition.getId());
if (protocol != null) {
log.debug("uninstall protocol:{}", definition);
unRegister(protocol);
return Mono.empty();
}
}
String operation = definition.getState() != 1 ? "uninstall" : "install";
Consumer<ProtocolSupport> consumer = definition.getState() != 1 ? this::unRegister : this::register;

log.debug("{} protocol:{}", operation, definition);

return loader
.load(definition)
.doOnNext(e -> {
e.init(definition.getConfiguration());
log.debug("{} protocol[{}] success: {}", operation, definition.getId(), e);
configProtocolIdMapping.put(definition.getId(), e.getId());
consumer.accept(e);
})
.onErrorResume((e) -> {
log.error("{} protocol[{}] error: {}", operation, definition.getId(), e.getLocalizedMessage());
return Mono.empty();
})
.then();
} catch (Throwable err) {
log.error("init protocol error", err);
}
return Mono.empty();
public void init() {
super.init();
}

@Override
public void run(String... args) {
init();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,21 @@
@AutoConfigureBefore(DeviceClusterConfiguration.class)
public class ProtocolAutoConfiguration {

@Bean
public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) {
return new ClusterProtocolSupportManager(clusterManager);
}
// @Bean
// public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) {
// return new ClusterProtocolSupportManager(clusterManager);
// }

@Bean
public ServiceContext serviceContext(ApplicationContext applicationContext) {
return new SpringServiceContext(applicationContext);
}

@Bean
public LazyInitManagementProtocolSupports managementProtocolSupports(ProtocolSupportManager supportManager,
ProtocolSupportLoader loader,
ClusterManager clusterManager) {
LazyInitManagementProtocolSupports supports = new LazyInitManagementProtocolSupports();
supports.setClusterManager(clusterManager);
supports.setManager(supportManager);
supports.setLoader(loader);
return supports;
public LazyInitManagementProtocolSupports managementProtocolSupports(EventBus eventBus,
ClusterManager clusterManager,
ProtocolSupportLoader loader) {
return new LazyInitManagementProtocolSupports(eventBus, clusterManager, loader);
}

@Bean
Expand All @@ -57,7 +53,7 @@ public LazyProtocolSupports protocolSupports() {

@Bean
public AutoDownloadJarProtocolSupportLoader autoDownloadJarProtocolSupportLoader(WebClient.Builder builder, FileManager fileManager) {
return new AutoDownloadJarProtocolSupportLoader(builder,fileManager);
return new AutoDownloadJarProtocolSupportLoader(builder, fileManager);
}

@Bean
Expand Down

0 comments on commit b61b0e3

Please sign in to comment.