Skip to content

Commit

Permalink
add registry renew
Browse files Browse the repository at this point in the history
  • Loading branch information
ipipman committed Apr 26, 2024
1 parent c1e37fa commit 181a60e
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void start() {
this.instance = InstanceMeta.http(ip, Integer.parseInt(port)).addParams(providerProperties.getMetas());
}
// 启动注册中心连接,开始注册
this.rc.start();
// this.rc.start();
this.skeleton.keySet().forEach(this::registerService);

}
Expand All @@ -90,7 +90,7 @@ public void stop() {
log.info(" ===> zk PreDestroy stop: " + this.skeleton);
// 取消注册,关闭注册中心连接
skeleton.keySet().forEach(this::unregisterService);
rc.stop();
// rc.stop();
}

private void unregisterService(String service) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Description for this class
Expand All @@ -27,30 +30,65 @@ public class IpManRegistryCenter implements RegistryCenter {
@Value("${registry-ipman.servers}")
String server;

private final Map<String, Long> VERSIONS = new HashMap<>();
// 记录注册中心,服务的版本号
Map<String, Long> VERSIONS = new HashMap<>();
// 记录需要通过renew保活的实例与服务
MultiValueMap<InstanceMeta, ServiceMeta> RENEWS = new LinkedMultiValueMap<>();
// 定期检查注册中心服务+实例的版本, 用于订阅
IpManRegistryExecutor versionChecker;
// 定期上报服务+实例的健康状态, 用于保活
IpManRegistryExecutor heathChecker;

@Override
public void start() {
log.info(" ====>>>> [IpMan-Registry] : start with server: {}", server);
// 定期对比与注册中心版本号, 如果版本有变化则触发回调, 用于更新 providers 的 instances, 5s一次
versionChecker = new IpManRegistryExecutor(1_000, 5_000, TimeUnit.MILLISECONDS);
// 定期将服务实例上报给注册中心, 避免被注册中心认为服务已死, 5s一次
heathChecker = new IpManRegistryExecutor(5, 5, TimeUnit.SECONDS);
heathChecker.executor(() -> RENEWS.keySet().forEach(
// 根据所有实例, 找到对应服务, 触发renew进行服务健康状态上报, 做探活
instance -> {
try {
StringBuilder sb = new StringBuilder();
for (ServiceMeta service : RENEWS.get(instance)) {
sb.append(service.toPath()).append(",");
}
String services = sb.toString();
if (services.endsWith(","))
services = services.substring(0, services.length() - 1);

Long timestamp = HttpInvoker.httpPost(
JSON.toJSONString(instance), reNewsPath(services), Long.class);
log.info(" ====>>>> [IpMan-Registry] : renew instance {} for {} at {}", instance, services, timestamp);
} catch (Exception e) {
log.error(" ====>>>> [IpMan-Registry] call registry leader error");
}
}
));
}

@Override
public void stop() {
log.info(" ====>>>> [IpMan-Registry] : stop with server: {}", server);
versionChecker.gracefulShutdown();
heathChecker.gracefulShutdown();
}

@Override
public void register(ServiceMeta service, InstanceMeta instance) {
log.info(" ====>>>> [IpMan-Registry] : register instance {} to {}", instance.toHttpUrl(), service.toPath());
InstanceMeta inst = HttpInvoker.httpPost(JSON.toJSONString(instance), regPath(service), InstanceMeta.class);
log.info(" ====>>>> [IpMan-Registry] : registered {} success", inst);
RENEWS.add(instance, service);
}

@Override
public void unregister(ServiceMeta service, InstanceMeta instance) {
log.info(" ====>>>> [IpMan-Registry] : unregister instance {} to {}", instance.toHttpUrl(), service.toPath());
InstanceMeta inst = HttpInvoker.httpPost(JSON.toJSONString(instance), unRegPath(service), InstanceMeta.class);
log.info(" ====>>>> [IpMan-Registry] : unregistered {} success", inst);
RENEWS.remove(instance, service);
}

@Override
Expand All @@ -62,25 +100,27 @@ public List<InstanceMeta> fetchAll(ServiceMeta service) {
return instances;
}

IpManHeathChecker heathChecker = new IpManHeathChecker();

@Override
public void subscribe(ServiceMeta service, ChangedListener listener) {
// 每隔5s, 去注册中心获取最新版本号,如果版本号大于当前版本, 就从注册中心同步最新实例的信息
heathChecker.check(() -> {
// 获取注册中心, 最新的版本号
String versionPath = versionPath(service);
Long newVersion = HttpInvoker.httpGet(versionPath, Long.class);
Long version = VERSIONS.getOrDefault(service.toPath(), -1L);
log.debug(" ====>>>> [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version);

// 如果版本号大于当前版本, 就从注册中心同步最新实例的信息
if (newVersion > version) {
log.info(" ====>>>> version changed [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version);
List<InstanceMeta> instanceMetas = fetchAll(service);
log.info(" ====>>>> version {} fetch all and fire: {}", newVersion, instanceMetas);
listener.fire(new Event(instanceMetas));
VERSIONS.put(service.toPath(), newVersion);
versionChecker.executor(() -> {
try {
// 获取注册中心, 最新的版本号
Long newVersion = HttpInvoker.httpGet(versionPath(service), Long.class);
Long version = VERSIONS.getOrDefault(service.toPath(), -1L);
log.debug(" ====>>>> [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version);

// 如果版本号大于当前版本, 就从注册中心同步最新实例的信息
if (newVersion > version) {
log.info(" ====>>>> version changed [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version);
List<InstanceMeta> instanceMetas = fetchAll(service);
log.info(" ====>>>> version {} fetch all and fire: {}", newVersion, instanceMetas);
listener.fire(new Event(instanceMetas));
VERSIONS.put(service.toPath(), newVersion);
}
} catch (Exception e) {
log.error(" ====>>>> [IpMan-Registry] call registry leader error");
}
});
}
Expand All @@ -105,4 +145,8 @@ private String findAllPath(ServiceMeta service) {
private String versionPath(ServiceMeta service) {
return server + "/version?service=" + service.toPath();
}

private String reNewsPath(String services) {
return server + "/renews?services=" + services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,47 @@
* @Date 2024/4/21 20:12
*/
@Slf4j
public class IpManHeathChecker {
public class IpManRegistryExecutor {

// 注册中心探活间隔, 5s
final int interval = 5_000;
int initialDelay;
int delay;
TimeUnit unit;

final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

public void check(Callback callback) {

public IpManRegistryExecutor(int initialDelay, int delay, TimeUnit unit) {
this.initialDelay = initialDelay;
this.delay = delay;
this.unit = unit;
}

public void gracefulShutdown() {
executor.shutdown();
try {
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
if (!executor.isTerminated()) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
// ignore
}
}

public void executor(Callback callback) {
executor.scheduleWithFixedDelay(() -> {
log.debug(" schedule to check ipman registry ... [{}]", DTF.format(LocalDateTime.now()));
try {
callback.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, interval, interval, TimeUnit.MILLISECONDS);
}, initialDelay, delay, unit);
}


@FunctionalInterface
public interface Callback {
void call() throws Exception;
Expand Down
6 changes: 3 additions & 3 deletions rpcman-demo-consumer/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ server:
rpcman:
#providers: http://localhost:8080,http://localhost:8081,http://localhost:8082
zk:
enabled: true
enabled: false
zkServer: localhost:2181
zkRoot: rpcman

Expand Down Expand Up @@ -39,10 +39,10 @@ apollo:

# 作者手写的注册中心:https://github.com/ipipman/registry-man
registry-ipman:
enabled: false
enabled: true
servers: http://localhost:8484

logging:
level:
root: error
cn.ipman.rpc: info
cn.ipman.rpc: debug
4 changes: 2 additions & 2 deletions rpcman-demo-provider/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ server:

rpcman:
zk:
enabled: true
enabled: false
zkServer: localhost:2181
zkRoot: rpcman

Expand Down Expand Up @@ -38,7 +38,7 @@ apollo:

# 作者手写的注册中心:https://github.com/ipipman/registry-man
registry-ipman:
enabled: false
enabled: true
servers: http://localhost:8484

logging:
Expand Down

0 comments on commit 181a60e

Please sign in to comment.