Skip to content

Latest commit

 

History

History
683 lines (552 loc) · 43.9 KB

09.Soul-数据同步机制-http长轮询同步.md

File metadata and controls

683 lines (552 loc) · 43.9 KB

Soul-数据同步机制-http长轮询同步


本节概述

  • 熟悉 soul 的数据同步机制中的 http 长轮询同步

主要内容

关于什么是数据同步,以及同步策略的原理分析,请阅读Soul-数据同步机制-websocket同步。首先,我们结合官方文档,了解一下 http 长轮询的原理。 http 同步相比 zookeeperwebsocket 要复杂一些。Soul 借鉴了 ApolloNacos 的设计思想,取其精华,自己实现了 http 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

http-long-polling

http 长轮询机制如上所示,soul-web 网关请求 admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。

http 请求到达 soul-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更。因为即便是没有配置变更,也得让网关知道,总不能让其干等吧,而且网关请求配置服务时,也有 90s 的超时时间。

如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,就知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。有人会问,为什么不是直接将变更的数据写出?我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

soul-web 网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务,如此反复循环。

什么是 http长轮询

长轮询是一种即时通讯技术,常见的即时通讯技术还有轮询、 websocketSSE(Server-Sent Events) 。具体介绍请查看本节知识点拓展。在了解什么是长轮询之后,我们通过下文的配置和源码追踪来进一步了解 http 长轮询同步策略。

http长轮询同步策略-配置

soul-bootstrap 配置

soul-bootstrap 项目的 pom.xml 文件中引入了 soul-spring-boot-starter-sync-data-http 这个 starter

<!--soul data sync start use http-->
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
    <version>${project.version}</version>
</dependency>

所以在 soul-bootstrap 启动后,就会去寻找 soul-spring-boot-starter-sync-data-httpresources/META-INF/spring.factories 文件,然后根据文件中的配置去加载指定模块。spring.factories 的文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.soul.spring.boot.starter.sync.data.http.HttpSyncDataConfiguration

soul-bootstrap 项目的 application-local.yml 文件中,配置了

soul:
  sync:
    http:
      url : http://localhost:9095

启动 soul-bootstrap 项目后,控制台打印内容如下:

2021-01-23 06:33:22.280  INFO 23738 --- [           main] .s.s.b.s.s.d.h.HttpSyncDataConfiguration : you use http long pull sync soul data
2021-01-23 06:33:32.540  INFO 23738 --- [           main] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA]
2021-01-23 06:33:32.773  INFO 23738 --- [onPool-worker-2] o.d.s.s.d.h.refresh.AppAuthDataRefresh   : clear all appAuth data cache
2021-01-23 06:33:32.783  INFO 23738 --- [           main] o.d.s.s.data.http.HttpSyncDataService    : get latest configs: [{"code":200,"message":"success","data":{"META_DATA":{"md5":"c762aeb5273a4894a102178ecbad99ed","lastModifyTime":1611354789796,"data":[{"id":"1350470997790859264","appName":"sofa","contextPath":null,"path":"/sofa/insert","rpcType":"sofa","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboTestService","methodName":"insert","parameterTypes":"org.dromara.soul.examples.dubbo.api.entity.DubboTest","rpcExt":"{\"loadbalance\":\"hash\",\"retries\":3,\"timeout\":-1}","enabled":true},{"id":"1350470998256427008","appName":"sofa","contextPath":null,"path":"/sofa/findById","rpcType":"sofa","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboTestService","methodName":"findById","parameterTypes":"java.lang.String","rpcExt":"{\"loadbalance\":\"hash\",\"retries\":3,\"timeout\":-1}","enabled":true},{"id":"1350470998378061824","appName":"sofa","contextPath":null,"path":"/sofa/findAll","rpcType":"sofa","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboTestService","methodName":"findAll","parameterTypes":null,"rpcExt":"{\"loadbalance\":\"hash\",\"retries\":3,\"timeout\":-1}","enabled":true},{"id":"1350470998533251072","appName":"dubbo","contextPath":null,"path":"/dubbo/findByIdsAndName","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"findByIdsAndName","parameterTypes":"java.util.List,java.lang.String","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350470998923321344","appName":"dubbo","contextPath":null,"path":"/dubbo/findByArrayIdsAndName","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"findByArrayIdsAndName","parameterTypes":"[Ljava.lang.Integer;,java.lang.String","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350470999418249216","appName":"dubbo","contextPath":null,"path":"/dubbo/findByStringArray","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"findByStringArray","parameterTypes":"[Ljava.lang.String;","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350470999657324544","appName":"dubbo","contextPath":null,"path":"/dubbo/findByListId","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"findByListId","parameterTypes":"java.util.List","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350470999812513792","appName":"dubbo","contextPath":null,"path":"/dubbo/batchSave","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"batchSave","parameterTypes":"java.util.List","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350470999988674560","appName":"dubbo","contextPath":null,"path":"/dubbo/batchSaveAndNameAndId","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"batchSaveAndNameAndId","parameterTypes":"java.util.List,java.lang.String,java.lang.String","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350471000139669504","appName":"dubbo","contextPath":null,"path":"/dubbo/saveComplexBeanTest","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"saveComplexBeanTest","parameterTypes":"org.dromara.soul.examples.dubbo.api.entity.ComplexBeanTest","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1350471000273887232","appName":"dubbo","contextPath":null,"path":"/dubbo/saveComplexBeanTestAndName","rpcType":"dubbo","serviceName":"org.dromara.soul.examples.dubbo.api.service.DubboMultiParamService","methodName":"saveComplexBeanTestAndName","parameterTypes":"org.dromara.soul.examples.dubbo.api.entity.ComplexBeanTest,java.lang.String","rpcExt":"{\"group\":\"\",\"version\":\"\",\"loadbalance\":\"random\",\"retries\":2,\"timeout\":10000,\"url\":\"\"}","enabled":true},{"id":"1351596549541466112","appName":"springCloud-test","contextPath":null,"path":"/springcloud/**","rpcType":"springCloud","serviceName":"springCloud-test","methodName":"/springcloud","parameterTypes":null,"rpcExt":null,"enabled":true}]},"SELECTOR":{"md5":"f22658100dbe6d2733b1088e34715af2","lastModifyTime":1611354789783,"data":[{"id":"1349785409260158976","pluginId":"5","pluginName":"divide","name":"/http","matchMode":0,"type":1,"sort":1,"enabled":true,"loged":true,"continued":true,"handle":null,"conditionList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/**"}]},{"id":"1350470997925076992","pluginId":"6","pluginName":"dubbo","name":"/dubbo","matchMode":0,"type":1,"sort":1,"enabled":true,"loged":true,"continued":true,"handle":null,"conditionList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/dubbo/**"}]},{"id":"1351193221011312640","pluginId":"11","pluginName":"sofa","name":"/sofa","matchMode":0,"type":1,"sort":1,"enabled":true,"loged":true,"continued":true,"handle":"sofa","conditionList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/sofa/**"}]},{"id":"1351596549763764224","pluginId":"8","pluginName":"springCloud","name":"/springcloud","matchMode":0,"type":1,"sort":1,"enabled":true,"loged":true,"continued":true,"handle":"{\"serviceId\":\"springCloud-test\"}","conditionList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/springcloud/**"}]}]},"PLUGIN":{"md5":"0298afdf3cc5338833c99f44fb88f1e9","lastModifyTime":1611354789657,"data":[{"id":"1","name":"sign","config":null,"role":1,"enabled":false},{"id":"10","name":"sentinel","config":null,"role":1,"enabled":false},{"id":"11","name":"sofa","config":"{\"protocol\":\"zookeeper\",\"register\":\"127.0.0.1:2181\"}","role":0,"enabled":false},{"id":"12","name":"resilience4j","config":null,"role":1,"enabled":false},{"id":"13","name":"tars","config":null,"role":1,"enabled":false},{"id":"14","name":"context_path","config":null,"role":1,"enabled":false},{"id":"2","name":"waf","config":"{\"model\":\"black\"}","role":1,"enabled":false},{"id":"3","name":"rewrite","config":null,"role":1,"enabled":false},{"id":"4","name":"rate_limiter","config":"{\"master\":\"mymaster\",\"mode\":\"standalone\",\"url\":\"192.168.1.1:6379\",\"password\":\"abc\"}","role":1,"enabled":false},{"id":"5","name":"divide","config":null,"role":0,"enabled":true},{"id":"6","name":"dubbo","config":"{\"register\":\"zookeeper://localhost:2181\"}","role":1,"enabled":false},{"id":"7","name":"monitor","config":"{\"metricsName\":\"prometheus\",\"host\":\"localhost\",\"port\":\"9190\",\"async\":\"true\"}","role":1,"enabled":false},{"id":"8","name":"springCloud","config":null,"role":1,"enabled":false},{"id":"9","name":"hystrix","config":null,"role":0,"enabled":false}]},"APP_AUTH":{"md5":"d751713988987e9331980363e24189ce","lastModifyTime":1611354789644,"data":[]},"RULE":{"md5":"2bc7fe208cfdd0b5f6a146bd40759d16","lastModifyTime":1611354789768,"data":[{"id":"1349785409511817216","name":"/http/test/**","pluginName":"divide","selectorId":"1349785409260158976","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"loadBalance\":\"random\",\"retry\":0,\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/test/**"}]},{"id":"1349785409687977984","name":"/http/order/save","pluginName":"divide","selectorId":"1349785409260158976","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"loadBalance\":\"random\",\"retry\":0,\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/http/order/save"}]},{"id":"1349785409784446976","name":"/http/order/path/**","pluginName":"divide","selectorId":"1349785409260158976","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"loadBalance\":\"random\",\"retry\":0,\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/order/path/**"}]},{"id":"1349785409889304576","name":"/http/order/path/**/name","pluginName":"divide","selectorId":"1349785409260158976","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"loadBalance\":\"random\",\"retry\":0,\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/order/path/**/name"}]},{"id":"1349785409989967872","name":"/http/order/findById","pluginName":"divide","selectorId":"1349785409260158976","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"loadBalance\":\"random\",\"retry\":0,\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/http/order/findById"}]},{"id":"1350470998059294720","name":"/dubbo/insert","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/insert"}]},{"id":"1350470998294175744","name":"/dubbo/findById","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/findById"}]},{"id":"1350470998432587776","name":"/dubbo/findAll","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/findAll"}]},{"id":"1350470998604554240","name":"/dubbo/findByIdsAndName","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/findByIdsAndName"}]},{"id":"1350470999133036544","name":"/dubbo/findByArrayIdsAndName","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/findByArrayIdsAndName"}]},{"id":"1350470999569244160","name":"/dubbo/findByStringArray","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/findByStringArray"}]},{"id":"1350470999707656192","name":"/dubbo/findByListId","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/findByListId"}]},{"id":"1350470999883816960","name":"/dubbo/batchSave","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/batchSave"}]},{"id":"1350471000034811904","name":"/dubbo/batchSaveAndNameAndId","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/batchSaveAndNameAndId"}]},{"id":"1350471000177418240","name":"/dubbo/saveComplexBeanTest","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/saveComplexBeanTest"}]},{"id":"1350471000332607488","name":"/dubbo/saveComplexBeanTestAndName","pluginName":"dubbo","selectorId":"1350470997925076992","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/dubbo/saveComplexBeanTestAndName"}]},{"id":"1351193221082615808","name":"/sofa/insert","pluginName":"sofa","selectorId":"1351193221011312640","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/sofa/insert"}]},{"id":"1351193221216833536","name":"/sofa/findById","pluginName":"sofa","selectorId":"1351193221011312640","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/sofa/findById"}]},{"id":"1351193221275553792","name":"/sofa/findAll","pluginName":"sofa","selectorId":"1351193221011312640","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"retries\":0,\"loadBalance\":\"random\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/sofa/findAll"}]},{"id":"1351596549977673728","name":"/springcloud/order/save","pluginName":"springCloud","selectorId":"1351596549763764224","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"path\":\"/springcloud/order/save\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/springcloud/order/save"}]},{"id":"1351596550241914880","name":"/springcloud/order/findById","pluginName":"springCloud","selectorId":"1351596549763764224","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"path\":\"/springcloud/order/findById\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"=","paramName":"/","paramValue":"/springcloud/order/findById"}]},{"id":"1351596550334189568","name":"/springcloud/order/path/**","pluginName":"springCloud","selectorId":"1351596549763764224","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"path\":\"/springcloud/order/path/**\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/springcloud/order/path/**"}]},{"id":"1351596550418075648","name":"/springcloud/order/path/**/name","pluginName":"springCloud","selectorId":"1351596549763764224","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"path\":\"/springcloud/order/path/**/name\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/springcloud/order/path/**/name"}]},{"id":"1351596550510350336","name":"/springcloud/test/**","pluginName":"springCloud","selectorId":"1351596549763764224","matchMode":0,"sort":1,"enabled":true,"loged":true,"handle":"{\"path\":\"/springcloud/test/**\",\"timeout\":3000}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/springcloud/test/**"}]}]}}}]

soul-admin 配置

soul-admin 项目中的 application.yml 文件中配置 zookeeper 的相关配置如下:

soul:
  sync:
    http:
      enabled: true

启动 soul-admin 项目后,控制台打印内容如下:

2021-01-23 06:33:09.797  INFO 23731 --- [           main] a.l.h.HttpLongPollingDataChangedListener : http sync strategy refresh interval: 300000ms

大家可以先对这些配置有个基础印象,下面我们根据这些配置来追踪源码,学习 soul 是如何实现 http 长轮询同步策略的。

http长轮询同步策略-源码追踪

从 soul-bootstrap 开始追踪

通过搜索 soul.sync.http 可以得知,是 HttpSyncDataConfiguration 这个类加载了 soul.sync.zookeeper 这段配置。该类创建了 httpSyncDataService

@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    log.info("you use http long pull sync soul data");
    return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
                                   metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}

通过项目启动日志 you use http long pull sync soul data 可以验证我们这一步的追踪没有问题。

HttpSyncDataConfiguration 中创建 HttpSyncDataService

public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                           final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    this.httpConfig = httpConfig;
    this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
    this.httpClient = createRestTemplate();
    this.start(); // 调用 start 方法去 soul-admin 拉取数据
}

调用 start 方法

private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

private void start() {
    // It could be initialized multiple times, so you need to control that.
    // 判断当前状态是否处于运行状态,保证每次只有一个客户端在轮询
    if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        // 获取所有的数据,初始化缓存
        this.fetchGroupConfig(ConfigGroupEnum.values());
        int threadSize = serverList.size();
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("http-long-polling", true));
        // start long polling, each server creates a thread to listen for changes.
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {
        log.info("soul http long polling was started, executor=[{}]", executor);
    }
}

进入 fetchGroupConfig 方法

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
    for (int index = 0; index < this.serverList.size(); index++) {
        String server = serverList.get(index);
        try {
            this.doFetchGroupConfig(server, groups);
            break;
        } catch (SoulException e) {
            // no available server, throw exception.
            if (index >= serverList.size() - 1) {
                throw e;
            }
            log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
        }
    }
}

new HttpLongPollingTask(server)

private final int retryTimes = 3; // 重试次数

@Override
public void run() {
    while (RUNNING.get()) {
        for (int time = 1; time <= retryTimes; time++) {
            try {
                doLongPolling(server);
            } catch (Exception e) {
                // print warnning log.
                // 轮询失败,进行重试
                if (time < retryTimes) {
                    log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                             time, retryTimes - time, e.getMessage());
                    ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                    continue;
                }
                // print error, then suspended for a while.
                // 重试3次后,睡5min
                log.error("Long polling failed, try again after 5 minutes!", e);
                ThreadUtils.sleep(TimeUnit.MINUTES, 5);
            }
        }
    }
    log.warn("Stop http long polling.");
}

查看 doLongPolling(server) 方法:

@SuppressWarnings("unchecked")
private void doLongPolling(final String server) {
    MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
    // 按 ConfigGroupEnum 分组,app_auth、plugin、rule、selector、meta_data
    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        ConfigData<?> cacheConfig = factory.cacheConfigData(group);
        // 将轮询数据 MD5 后,和最后修改时间,用","拼接,作为请求参数,用于请求服务端
        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
        params.put(group.name(), Lists.newArrayList(value));
    }
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
    HttpEntity httpEntity = new HttpEntity(params, headers);
    String listenerUrl = server + "/configs/listener";
    log.debug("request listener configs: [{}]", listenerUrl);
    JsonArray groupJson = null;
    try {
        // 发送 http 请求,调用 soul-admin 的 /configs/listener 接口
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        // 解析接口响应数据 json
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {
        String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
        throw new SoulException(message, e);
    }
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        // 判断 changedGroups 更新组是否为空,不为空,证明有数据更新,调用 doFetchGroupConfig()
        if (ArrayUtils.isNotEmpty(changedGroups)) {
            log.info("Group config changed: {}", Arrays.toString(changedGroups));
            this.doFetchGroupConfig(server, changedGroups);
        }
    }
}

调用 doFetchGroupConfig(server, changedGroups) 拉取 groups 的配置数据:

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    StringBuilder params = new StringBuilder();
    for (ConfigGroupEnum groupKey : groups) {
        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
    }
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    log.info("request configs: [{}]", url);
    String json = null;
    try {
        // 调用 soul-admin 的 /configs/fetch 接口
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
        log.warn(message);
        throw new SoulException(message, e);
    }
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    if (updated) {
        log.info("get latest configs: [{}]", json);
        return;
    }
    // not updated. it is likely that the current config server has not been updated yet. wait a moment.
    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
    // 根据 updated 判断,请求数据是否变化,若没有发生变化,那么下一次请求将延长30秒
    ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}

进入 updateCacheWithJson 更新本地缓存

private boolean updateCacheWithJson(final String json) {
    JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
    JsonObject data = jsonObject.getAsJsonObject("data");
    // if the config cache will be updated?
    return factory.executor(data);
}

进入 DataRefreshFactoryexecutor

public boolean executor(final JsonObject data) {
    final boolean[] success = {false};
    ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
    return success[0];
}

进入 refresh 的具体实现

@Override
public Boolean refresh(final JsonObject data) {
    boolean updated = false;
    JsonObject jsonObject = convert(data);
    if (null != jsonObject) {
        ConfigData<T> result = fromJson(jsonObject);
        if (this.updateCacheIfNeed(result)) {
            updated = true;
            // updateCacheIfNeed(result) 为 true,进行更新
            refresh(result.getData());
        }
    }
    return updated;
}

此处我们追一下 updateCacheIfNeed

protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
    // first init cache
    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
        return true;
    }
    ResultHolder holder = new ResultHolder(false);
    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
        // must compare the last update time
        // 判断 MD5是否相同 和 最后一次修改时间是否大于本地配置时间
        if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {
            log.info("update {} config: {}", groupEnum, newVal);
            holder.result = true;
            return newVal;
        }
        log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
        return oldVal;
    });
    return holder.result;
}

从 soul-admin 开始追踪

通过搜索 soul.sync.http.enabled 可以得知,是 DataSyncConfiguration 这个类加载了 soul.sync.http.enabled 这段配置。该类创建了 HttpLongPollingDataChangedListener

/**
 * http long polling(default strategy).
 */
@Configuration
@ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {

    @Bean
    @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
    public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        return new HttpLongPollingDataChangedListener(httpSyncProperties);
    }

}

下面我们进入 HttpLongPollingDataChangedListener 类中一探究竟

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
    
    private final BlockingQueue<LongPollingClient> clients;

    private final ScheduledExecutorService scheduler;

    private final HttpSyncProperties httpSyncProperties;
    
	/**
     * Instantiates a new Http long polling data changed listener.
     * @param httpSyncProperties the HttpSyncProperties
     */
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024); // 长轮询阻塞队列
        this.scheduler = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("long-polling", true)); // 调度线程池
        this.httpSyncProperties = httpSyncProperties; // http同步配置
    }

    // Bean 初始化后执行
    @Override
    protected void afterInitialize() {
        // 刷新间隔为5min,定义在 HttpSyncProperties, private Duration refreshInterval = Duration.ofMinutes(5);
        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
        // Periodically check the data for changes and update the cache
        scheduler.scheduleWithFixedDelay(() -> {
            log.info("http sync strategy refresh config start.");
            try {
                this.refreshLocalCache();
                log.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                log.error("http sync strategy refresh config error!", e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        log.info("http sync strategy refresh interval: {}ms", syncInterval);
    }

    private void refreshLocalCache() {
        this.updateAppAuthCache();
        this.updatePluginCache();
        this.updateRuleCache();
        this.updateSelectorCache();
        this.updateMetaDataCache();
    }

    /**
     * If the configuration data changes, the group information for the change is immediately responded.
     * Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached.
     * 
     * 如果配置数据更改,则会立即响应更改的组信息。
     * 否则,客户端的请求线程将被阻塞,直到任何数据更改或达到指定的超时时间。
     * @param request  the request
     * @param response the response
     */
    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

        // compare group md5
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);

        // response immediately.
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }

        // listen for configuration changed.
        final AsyncContext asyncContext = request.startAsync();

        // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
        asyncContext.setTimeout(0L);

        // block client's thread.
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }
    
    
}

我们进入 compareChangedGroup 方法中

private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
    List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        // md5,lastModifyTime
        String[] params = StringUtils.split(request.getParameter(group.name()), ',');
        if (params == null || params.length != 2) {
            throw new SoulException("group param invalid:" + request.getParameter(group.name()));
        }
        String clientMd5 = params[0];
        long clientModifyTime = NumberUtils.toLong(params[1]);
        ConfigDataCache serverCache = CACHE.get(group.name());
        // do check.
        if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
            changedGroup.add(group);
        }
    }
    return changedGroup;
}

// 校验 MD5是否相等,客户端数据修改时间和最后一次修改时间的前后
private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {

    // is the same, doesn't need to be updated
    if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
        return false;
    }

    // if the md5 value is different, it is necessary to compare lastModifyTime.
    long lastModifyTime = serverCache.getLastModifyTime();
    if (lastModifyTime >= clientModifyTime) {
        // the client's config is out of date.
        return true;
    }

    // the lastModifyTime before client, then the local cache needs to be updated.
    // Considering the concurrency problem, admin must lock,
    // otherwise it may cause the request from soul-web to update the cache concurrently, causing excessive db pressure
    boolean locked = false;
    try {
        locked = LOCK.tryLock(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return true;
    }
    if (locked) {
        try {
            ConfigDataCache latest = CACHE.get(serverCache.getGroup());
            if (latest != serverCache) {
                // the cache of admin was updated. if the md5 value is the same, there's no need to update.
                return !StringUtils.equals(clientMd5, latest.getMd5());
            }
            // load cache from db.
            this.refreshLocalCache();
            latest = CACHE.get(serverCache.getGroup());
            return !StringUtils.equals(clientMd5, latest.getMd5());
        } finally {
            LOCK.unlock();
        }
    }

    // not locked, the client need to be updated.
    return true;

}

通过 getRemoteIp 方法获取真实的 IP

private static String getRemoteIp(final HttpServletRequest request) {
    String xForwardedFor = request.getHeader(X_FORWARDED_FOR);
    if (!StringUtils.isBlank(xForwardedFor)) {
        return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();
    }
    String header = request.getHeader(X_REAL_IP);
    return StringUtils.isBlank(header) ? request.getRemoteAddr() : header;
}

compareChangedGroup 方法返回的 List<ConfigGroupEnum> 非空时,调用 generateResponse

/**
 * Send response datagram.
 *
 * @param response      the response
 * @param changedGroups the changed groups
 */
private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {
    try {
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setContentType(MediaType.APPLICATION_JSON_VALUE);
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));
    } catch (IOException ex) {
        log.error("Sending response failed.", ex);
    }
}

以选择器 selector 为例,当数据发生变化时,调用 afterSelectorChanged 方法,其他 appAuthmetaDatapluginrule 类似。

// 移除所有的长链接请求
@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}

当组的数据发生更改时,将创建线程以异步通知客户端。

class DataChangeTask implements Runnable {

    /**
         * The Group where the data has changed.
         */
    private final ConfigGroupEnum groupKey;

    /**
         * The Change time.
         */
    private final long changeTime = System.currentTimeMillis();

    /**
         * Instantiates a new Data change task.
         *
         * @param groupKey the group key
         */
    DataChangeTask(final ConfigGroupEnum groupKey) {
        this.groupKey = groupKey;
    }

    @Override
    public void run() {
        for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
            LongPollingClient client = iter.next();
            iter.remove(); //移除长连接请求
            client.sendResponse(Collections.singletonList(groupKey));
            log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
        }
    }
}

关于 LongPollingClient ,如果超过60秒,仍然没有数据更改,则返回空数据。如果数据在这个时间范围内发生变化,DataChangeTask 会取消这个定时任务,并对改变的组数据作出响应。

class LongPollingClient implements Runnable {

    /**
         * The Async context.
         */
    private final AsyncContext asyncContext;

    /**
         * The Ip.
         */
    private final String ip;

    /**
         * The Timeout time.
         */
    private final long timeoutTime;

    /**
         * The Async timeout future.
         */
    private Future<?> asyncTimeoutFuture;

    /**
         * Instantiates a new Long polling client.
         *
         * @param ac          the ac
         * @param ip          the ip
         * @param timeoutTime the timeout time
         */
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
        this.asyncContext = ac;
        this.ip = ip;
        this.timeoutTime = timeoutTime;
    }

    @Override
    public void run() {
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            clients.remove(LongPollingClient.this);
            List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);
        clients.add(this);
    }

    /**
         * Send response.
         *
         * @param changedGroups the changed groups
         */
    void sendResponse(final List<ConfigGroupEnum> changedGroups) {
        // cancel scheduler
        if (null != asyncTimeoutFuture) {
            asyncTimeoutFuture.cancel(false);
        }
        generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
        asyncContext.complete();
    }
}

问题记录

知识拓展

相关文章推荐

知识点拓展

【常见的即时通讯技术】

轮询:客户端每隔几秒钟向服务端发送 http 请求,服务端在收到请求后,不论是否有数据更新,都直接进行响应。在服务端响应完成,就会关闭这个 TCP 连接。这种方式实现非常简单,兼容性也比较好,只要支持 http 协议就可以用这种方式实现。缺点就是非常消耗资源,会占用较多的内存和带宽。

长轮询:客户端发送请求后服务器端不会立即返回数据,服务器端会阻塞,请求连接挂起,直到服务端有数据更新或者是连接超时才返回,客户端才再次发出请求新建连接、如此反复从而获取最新数据。相比轮询,长轮询减少了很多不必要的 http 请求次数,相比之下节约了资源。

websocket:参考 Soul-数据同步机制-websocket同步 的知识点拓展模块

SSE(Server-Sent Events)Server-SentHTML5提出一个标准。由客户端发起与服务器之间创建TCP连接,然后并维持这个连接直到客户端或服务器中的任何一方断开,ServerSent使用的是“问”+“答”的机制,连接创建后浏览器会周期性地发送消息至服务器询问,是否有自己的消息。其实现原理类似于我们在基于 iframe 的长连接模式。 HTTP 响应内容有一种特殊的content-type —— text/event-stream,该响应头标识了响应内容为事件流,客户端不会关闭连接,而是等待服务端不断的发送响应结果。 SSE规范比较简单,主要分为两个部分:浏览器中的 EventSource 对象,以及服务器端与浏览器端之间的通讯协议。SSE的响应内容可以看成是一个事件流,由不同的事件所组成。这些事件会触发前端EventSource对象上的方法。同时,SSE 支持自定义事件,默认事件通过监听 message 来获取数据。其优点在于客户端只需连接一次,Server 就定时推送,除非其中一端断开连接。并且 SSE 会在连接意外断开时自动重连。缺点就是需要学习新的语法。