Skip to content
This repository has been archived by the owner on Feb 17, 2020. It is now read-only.

Commit

Permalink
优化服务监控
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Jun 5, 2019
1 parent 56873dc commit e45503a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;


/**
Expand All @@ -26,5 +27,7 @@ public interface GatewayServerMonitor {

long getDeviceCount(String serverId);

void onServerDown(Consumer<String> listener);

long getDeviceCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.message.codec.Transport;
import org.redisson.api.RMap;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -34,6 +37,8 @@ public class RedissonGatewayServerMonitor implements GatewayServerMonitor {

private ScheduledExecutorService executorService;

private List<Consumer<String>> downListener = new CopyOnWriteArrayList<>();

private static final String transport_hosts = "transport_hosts";
private static final String transport_all_support = "transport_supports";
private static final String transport_connection_total = "transport_conn_total";
Expand All @@ -46,12 +51,15 @@ public String getRedisKey(String... key) {

private GatewayServerInfo current;

private RQueue<String> serverDownQueue;

public RedissonGatewayServerMonitor(String currentServerId, RedissonClient redissonClient, ScheduledExecutorService executorService) {
this.allServerId = redissonClient.getMap(getRedisKey("server:all"));
this.client = redissonClient;
this.currentServerId = currentServerId;
this.executorService = executorService;
current=newGatewayServerInfo(currentServerId);
current = newGatewayServerInfo(currentServerId);
serverDownQueue = client.getQueue("device-gateway-server-down");
}

private GatewayServerInfo newGatewayServerInfo(String serverId) {
Expand Down Expand Up @@ -122,7 +130,12 @@ public List<GatewayServerInfo> getAllServerInfo() {
@Override
public void serverOffline(String serverId) {
log.debug("device gateway server [{}] offline ", serverId);
allServerId.fastRemove(serverId);
long number = allServerId.fastRemove(serverId);
if (number > 0) {
for (Consumer<String> consumer : downListener) {
consumer.accept(serverId);
}
}
}

@Override
Expand All @@ -140,6 +153,11 @@ public void reportDeviceCount(Transport transport, long count) {
.set(count);
}

@Override
public void onServerDown(Consumer<String> listener) {
downListener.add(listener);
}

@Override
public long getDeviceCount(String serverId) {
return client.<Transport>getSet(getRedisKey(transport_all_support, serverId))
Expand Down Expand Up @@ -169,7 +187,8 @@ protected void clean() {

@PreDestroy
public void shutdown() {
allServerId.remove(currentServerId);
allServerId.fastRemove(currentServerId);
serverDownQueue.add(currentServerId);
clean();
}

Expand All @@ -179,7 +198,7 @@ protected synchronized void doStartup() {
}
startup = true;
allServerId.put(currentServerId, System.currentTimeMillis());

serverDownQueue.remove(currentServerId);
executorService.scheduleAtFixedRate(() -> {
log.debug("device gateway server [{}] keepalive", currentServerId);
allServerId.put(currentServerId, System.currentTimeMillis());
Expand All @@ -190,7 +209,14 @@ protected synchronized void doStartup() {
.map(Map.Entry::getKey)
.filter(Objects::nonNull)
.forEach(this::serverOffline);

//触发服务下线事件
for (String offlineServer = serverDownQueue.poll()
; offlineServer != null && (!currentServerId.equals(offlineServer))
; offlineServer = serverDownQueue.poll()) {
for (Consumer<String> listener : downListener) {
listener.accept(offlineServer);
}
}
}, 1, Math.max(1, timeToLive - 1), TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.Test;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class RedissonGatewayServerMonitorTest {

Expand All @@ -29,11 +30,16 @@ public void after() {
@Test
public void testOnlineOffline() {
monitor.registerTransport(Transport.MQTT);
AtomicReference<String> reference=new AtomicReference<>();

monitor.onServerDown(reference::set);

GatewayServerInfo info = monitor.getServerInfo("test").orElse(null);
Assert.assertNotNull(info);

Assert.assertEquals(info.getId(), "test");
monitor.serverOffline("test");
Assert.assertNotNull(reference.get());
info = monitor.getServerInfo("test").orElse(null);
Assert.assertNull(info);
}
Expand Down

0 comments on commit e45503a

Please sign in to comment.