From 852c864d0826e1183099c7076b505b5717cf23fa Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Mon, 25 Sep 2023 22:17:36 +0800 Subject: [PATCH 1/5] =?UTF-8?q?refactor:=E4=BC=98=E5=8C=96registry?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/registry/PolarisRegistry.java | 206 +++--------------- .../dubbox/registry/PolarisRegistry.java | 177 ++------------- 2 files changed, 48 insertions(+), 335 deletions(-) diff --git a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java index 474cb26..cd7588b 100644 --- a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java +++ b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java @@ -19,16 +19,22 @@ import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.listener.ServiceListener; import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.common.registry.BaseBootConfigHandler; import com.tencent.polaris.common.registry.BootConfigHandler; import com.tencent.polaris.common.registry.Consts; import com.tencent.polaris.common.registry.ConvertUtils; import com.tencent.polaris.common.registry.PolarisOperator; import com.tencent.polaris.common.registry.PolarisOperators; -import com.tencent.polaris.common.utils.ExtensionConsts; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; +import org.apache.dubbo.rpc.cluster.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -37,28 +43,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.constants.RegistryConstants; -import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.registry.NotifyListener; -import org.apache.dubbo.registry.support.FailbackRegistry; -import org.apache.dubbo.rpc.Filter; -import org.apache.dubbo.rpc.cluster.Constants; -import org.apache.dubbo.rpc.cluster.RouterFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PolarisRegistry extends FailbackRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(PolarisRegistry.class); - private static final TaskScheduler taskScheduler = new TaskScheduler(); - private final Set registeredInstances = new ConcurrentHashSet<>(); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -67,10 +57,6 @@ public class PolarisRegistry extends FailbackRegistry { private final PolarisOperator polarisOperator; - private final boolean hasCircuitBreaker; - - private final boolean hasRouter; - public PolarisRegistry(URL url) { this(url, new BaseBootConfigHandler()); } @@ -80,33 +66,6 @@ public PolarisRegistry(URL url, BootConfigHandler... handlers) { super(url); polarisOperator = new PolarisOperator(url.getHost(), url.getPort(), url.getParameters(), handlers); PolarisOperators.INSTANCE.addPolarisOperator(polarisOperator); - ExtensionLoader routerExtensionLoader = ExtensionLoader.getExtensionLoader(RouterFactory.class); - hasRouter = routerExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_ROUTER_NAME); - ExtensionLoader filterExtensionLoader = ExtensionLoader.getExtensionLoader(Filter.class); - hasCircuitBreaker = filterExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_CIRCUITBREAKER_NAME); - } - - private URL buildRouterURL(URL consumerUrl) { - URL routerURL = null; - if (hasRouter) { - URL registryURL = getUrl(); - routerURL = new URL(RegistryConstants.ROUTE_PROTOCOL, registryURL.getHost(), registryURL.getPort()); - routerURL = routerURL.setServiceInterface(CommonConstants.ANY_VALUE); - routerURL = routerURL.addParameter(Constants.ROUTER_KEY, ExtensionConsts.PLUGIN_ROUTER_NAME); - String consumerGroup = consumerUrl.getParameter(CommonConstants.GROUP_KEY); - String consumerVersion = consumerUrl.getParameter(CommonConstants.VERSION_KEY); - String consumerClassifier = consumerUrl.getParameter(CommonConstants.CLASSIFIER_KEY); - if (null != consumerGroup) { - routerURL = routerURL.addParameter(CommonConstants.GROUP_KEY, consumerGroup); - } - if (null != consumerVersion) { - routerURL = routerURL.addParameter(CommonConstants.VERSION_KEY, consumerVersion); - } - if (null != consumerClassifier) { - routerURL = routerURL.addParameter(CommonConstants.CLASSIFIER_KEY, consumerClassifier); - } - } - return routerURL; } @Override @@ -114,7 +73,7 @@ public void doRegister(URL url) { if (!shouldRegister(url)) { return; } - LOGGER.info("[POLARIS] register service to polaris: {}", url.toString()); + LOGGER.info("[POLARIS] register service to polaris: {}", url); Map metadata = new HashMap<>(url.getParameters()); metadata.put(CommonConstants.PATH_KEY, url.getPath()); int port = url.getPort(); @@ -138,7 +97,7 @@ public void doUnregister(URL url) { if (!shouldRegister(url)) { return; } - LOGGER.info("[POLARIS] unregister service from polaris: {}", url.toString()); + LOGGER.info("[POLARIS] unregister service from polaris: {}", url); int port = url.getPort(); if (port > 0) { polarisOperator.deregister(url.getServiceInterface(), url.getHost(), url.getPort()); @@ -155,79 +114,27 @@ public void destroy() { doUnregister(url); } polarisOperator.destroy(); - taskScheduler.destroy(); } } @Override public void doSubscribe(URL url, NotifyListener listener) { String service = url.getServiceInterface(); - Instance[] instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker); + Instance[] instances = polarisOperator.getAvailableInstances(service, true); onInstances(url, listener, instances); LOGGER.info("[POLARIS] submit watch task for service {}", service); - taskScheduler.submitWatchTask(new WatchTask(url, listener, service)); - } - - private class WatchTask implements Runnable { - - private final String service; - - private final ServiceListener serviceListener; - - private final NotifyListener listener; - - private final FetchTask fetchTask; - - public WatchTask(URL url, NotifyListener listener, - String service) { - this.service = service; - this.listener = listener; - fetchTask = new FetchTask(url, listener); - serviceListener = new ServiceListener() { - @Override - public void onEvent(ServiceChangeEvent event) { - PolarisRegistry.taskScheduler.submitFetchTask(fetchTask); + serviceListeners.computeIfAbsent(listener, notifyListener -> { + ServiceListener serviceListener = event -> { + try { + Instance[] curInstances = polarisOperator.getAvailableInstances(service, true); + onInstances(url, listener, curInstances); + } catch (PolarisException e) { + LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); } }; - } - - @Override - public void run() { - boolean result = polarisOperator.watchService(service, serviceListener); - if (result) { - serviceListeners.put(listener, serviceListener); - PolarisRegistry.taskScheduler.submitFetchTask(fetchTask); - return; - } - PolarisRegistry.taskScheduler.submitWatchTask(this); - } - } - - private class FetchTask implements Runnable { - - private final String service; - - private final URL url; - - private final NotifyListener listener; - - public FetchTask(URL url, NotifyListener listener) { - this.service = url.getServiceInterface(); - this.url = url; - this.listener = listener; - } - - @Override - public void run() { - Instance[] instances; - try { - instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker); - } catch (PolarisException e) { - LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); - return; - } - onInstances(url, listener, instances); - } + polarisOperator.watchService(service, serviceListener); + return serviceListener; + }); } private void onInstances(URL url, NotifyListener listener, Instance[] instances) { @@ -240,11 +147,7 @@ private void onInstances(URL url, NotifyListener listener, Instance[] instances) urls.add(instanceToURL(requireInterface, instance)); } } - URL routerURL = buildRouterURL(url); - if (null != routerURL) { - urls.add(routerURL); - } - PolarisRegistry.this.notify(url, listener, urls); + notify(url, listener, urls); } private static URL instanceToURL(String requireInterface, Instance instance) { @@ -275,68 +178,13 @@ private static URL instanceToURL(String requireInterface, Instance instance) { newMetadata); } - private static class TaskScheduler { - - private final ExecutorService fetchExecutor = Executors - .newSingleThreadExecutor(new NamedThreadFactory("agent-fetch")); - - private final ExecutorService watchExecutor = Executors - .newSingleThreadExecutor(new NamedThreadFactory("agent-retry-watch")); - - private final AtomicBoolean executorDestroyed = new AtomicBoolean(false); - - private final Object lock = new Object(); - - void submitFetchTask(Runnable fetchTask) { - if (executorDestroyed.get()) { - return; - } - synchronized (lock) { - if (executorDestroyed.get()) { - return; - } - fetchExecutor.submit(fetchTask); - } - } - - void submitWatchTask(Runnable watchTask) { - if (executorDestroyed.get()) { - return; - } - synchronized (lock) { - if (executorDestroyed.get()) { - return; - } - watchExecutor.submit(watchTask); - } - } - - boolean isDestroyed() { - return executorDestroyed.get(); - } - - void destroy() { - synchronized (lock) { - if (executorDestroyed.compareAndSet(false, true)) { - fetchExecutor.shutdown(); - watchExecutor.shutdown(); - } - } - } - } - @Override public void doUnsubscribe(URL url, NotifyListener listener) { LOGGER.info("[polaris] unsubscribe service: {}", url.toString()); - taskScheduler.submitWatchTask(new Runnable() { - @Override - public void run() { - ServiceListener serviceListener = serviceListeners.get(listener); - if (null != serviceListener) { - polarisOperator.unwatchService(url.getServiceInterface(), serviceListener); - } - } - }); + ServiceListener serviceListener = serviceListeners.get(listener); + if (null != serviceListener) { + polarisOperator.unwatchService(url.getServiceInterface(), serviceListener); + } } @Override diff --git a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java index 76c768a..e61041b 100644 --- a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java +++ b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java @@ -16,8 +16,6 @@ package com.tencent.polaris.dubbox.registry; -import static com.alibaba.dubbo.common.Constants.PATH_KEY; - import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.ExtensionLoader; @@ -29,14 +27,15 @@ import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.listener.ServiceListener; import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.common.registry.Consts; import com.tencent.polaris.common.registry.ConvertUtils; import com.tencent.polaris.common.registry.PolarisOperator; import com.tencent.polaris.common.registry.PolarisOperators; import com.tencent.polaris.common.utils.ExtensionConsts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -45,18 +44,14 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.alibaba.dubbo.common.Constants.PATH_KEY; public class PolarisRegistry extends FailbackRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(PolarisRegistry.class); - private static final TaskScheduler taskScheduler = new TaskScheduler(); - private final Set registeredInstances = new ConcurrentHashSet<>(); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -65,31 +60,10 @@ public class PolarisRegistry extends FailbackRegistry { private final PolarisOperator polarisOperator; - private final URL routerURL; - - private final boolean hasCircuitBreaker; - - private final boolean hasRouter; - public PolarisRegistry(URL url) { super(url); polarisOperator = new PolarisOperator(url.getHost(), url.getPort(), url.getParameters()); PolarisOperators.INSTANCE.addPolarisOperator(polarisOperator); - this.routerURL = buildRouterURL(url.getHost(), url.getPort()); - ExtensionLoader routerExtensionLoader = ExtensionLoader.getExtensionLoader(RouterFactory.class); - hasRouter = routerExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_ROUTER_NAME); - ExtensionLoader filterExtensionLoader = ExtensionLoader.getExtensionLoader(Filter.class); - hasCircuitBreaker = filterExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_CIRCUITBREAKER_NAME); - } - - private URL buildRouterURL(String host, int port) { - URL routerURL = null; - if (hasRouter) { - routerURL = new URL(Constants.ROUTE_PROTOCOL, host, port); - routerURL = routerURL.setServiceInterface(Constants.ANY_VALUE); - routerURL = routerURL.addParameter(Constants.ROUTER_KEY, ExtensionConsts.PLUGIN_ROUTER_NAME); - } - return routerURL; } @Override @@ -138,79 +112,28 @@ public void destroy() { doUnregister(url); } polarisOperator.destroy(); - taskScheduler.destroy(); } } @Override public void doSubscribe(URL url, NotifyListener listener) { String service = url.getServiceInterface(); - Instance[] instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker); + Instance[] instances = polarisOperator.getAvailableInstances(service, true); onInstances(url, listener, instances); LOGGER.info("[POLARIS] submit watch task for service {}", service); - taskScheduler.submitWatchTask(new WatchTask(url, listener, service)); - } - private class WatchTask implements Runnable { - - private final String service; - - private final ServiceListener serviceListener; - - private final NotifyListener listener; - - private final FetchTask fetchTask; - - public WatchTask(URL url, NotifyListener listener, - String service) { - this.service = service; - this.listener = listener; - fetchTask = new FetchTask(url, listener); - serviceListener = new ServiceListener() { - @Override - public void onEvent(ServiceChangeEvent event) { - PolarisRegistry.taskScheduler.submitFetchTask(fetchTask); + serviceListeners.computeIfAbsent(listener, notifyListener -> { + ServiceListener serviceListener = event -> { + try { + Instance[] curInstances = polarisOperator.getAvailableInstances(service, true); + onInstances(url, listener, curInstances); + } catch (PolarisException e) { + LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); } }; - } - - @Override - public void run() { - boolean result = polarisOperator.watchService(service, serviceListener); - if (result) { - serviceListeners.put(listener, serviceListener); - PolarisRegistry.taskScheduler.submitFetchTask(fetchTask); - return; - } - PolarisRegistry.taskScheduler.submitWatchTask(this); - } - } - - private class FetchTask implements Runnable { - - private final String service; - - private final URL url; - - private final NotifyListener listener; - - public FetchTask(URL url, NotifyListener listener) { - this.service = url.getServiceInterface(); - this.url = url; - this.listener = listener; - } - - @Override - public void run() { - Instance[] instances; - try { - instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker); - } catch (PolarisException e) { - LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); - return; - } - onInstances(url, listener, instances); - } + polarisOperator.watchService(service, serviceListener); + return serviceListener; + }); } private void onInstances(URL url, NotifyListener listener, Instance[] instances) { @@ -222,10 +145,7 @@ private void onInstances(URL url, NotifyListener listener, Instance[] instances) urls.add(instanceToURL(instance)); } } - if (null != routerURL) { - urls.add(routerURL); - } - PolarisRegistry.this.notify(url, listener, urls); + notify(url, listener, urls); } private static URL instanceToURL(Instance instance) { @@ -255,68 +175,13 @@ private static URL instanceToURL(Instance instance) { newMetadata); } - private static class TaskScheduler { - - private final ExecutorService fetchExecutor = Executors - .newSingleThreadExecutor(new NamedThreadFactory("agent-fetch")); - - private final ExecutorService watchExecutor = Executors - .newSingleThreadExecutor(new NamedThreadFactory("agent-retry-watch")); - - private final AtomicBoolean executorDestroyed = new AtomicBoolean(false); - - private final Object lock = new Object(); - - void submitFetchTask(Runnable fetchTask) { - if (executorDestroyed.get()) { - return; - } - synchronized (lock) { - if (executorDestroyed.get()) { - return; - } - fetchExecutor.submit(fetchTask); - } - } - - void submitWatchTask(Runnable watchTask) { - if (executorDestroyed.get()) { - return; - } - synchronized (lock) { - if (executorDestroyed.get()) { - return; - } - watchExecutor.submit(watchTask); - } - } - - boolean isDestroyed() { - return executorDestroyed.get(); - } - - void destroy() { - synchronized (lock) { - if (executorDestroyed.compareAndSet(false, true)) { - fetchExecutor.shutdown(); - watchExecutor.shutdown(); - } - } - } - } - @Override public void doUnsubscribe(URL url, NotifyListener listener) { LOGGER.info("[polaris] unsubscribe service: {}", url.toString()); - taskScheduler.submitWatchTask(new Runnable() { - @Override - public void run() { - ServiceListener serviceListener = serviceListeners.get(listener); - if (null != serviceListener) { - polarisOperator.unwatchService(url.getServiceInterface(), serviceListener); - } - } - }); + ServiceListener serviceListener = serviceListeners.get(listener); + if (serviceListener != null) { + polarisOperator.unwatchService(url.getServiceInterface(), serviceListener); + } } @Override From 73d2f59f3c61fb5841d80dacf3af916fa386e759 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 20 Oct 2023 16:16:56 +0800 Subject: [PATCH 2/5] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E5=9B=A0dubbo?= =?UTF-8?q?=E6=9C=AC=E8=BA=AB=E6=8E=A8=E7=A9=BA=E4=BF=9D=E6=8A=A4=E7=9A=84?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=E8=80=8C=E6=97=A0=E6=B3=95=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=AE=9E=E4=BE=8B=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/registry/PolarisRegistry.java | 20 ++++++++++++++++++- .../dubbox/registry/PolarisRegistry.java | 17 +++++++++++++++- .../common/registry/PolarisOperators.java | 4 ++++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java index cd7588b..937d4bb 100644 --- a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java +++ b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java @@ -27,7 +27,9 @@ import com.tencent.polaris.common.registry.PolarisOperator; import com.tencent.polaris.common.registry.PolarisOperators; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.URLBuilder; import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.support.FailbackRegistry; @@ -45,6 +47,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; +import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY; +import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; + public class PolarisRegistry extends FailbackRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(PolarisRegistry.class); @@ -147,7 +153,7 @@ private void onInstances(URL url, NotifyListener listener, Instance[] instances) urls.add(instanceToURL(requireInterface, instance)); } } - notify(url, listener, urls); + notify(url, listener, toUrlWithEmpty(url, urls)); } private static URL instanceToURL(String requireInterface, Instance instance) { @@ -178,6 +184,18 @@ private static URL instanceToURL(String requireInterface, Instance instance) { newMetadata); } + private List toUrlWithEmpty(URL providerUrl, List urls) { + if (CollectionUtils.isEmpty(urls)) { + LOGGER.warn("[POLARIS] received empty url address list, will clear current available addresses"); + URL empty = URLBuilder.from(providerUrl) + .setProtocol(EMPTY_PROTOCOL) + .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY) + .build(); + urls.add(empty); + } + return urls; + } + @Override public void doUnsubscribe(URL url, NotifyListener listener) { LOGGER.info("[polaris] unsubscribe service: {}", url.toString()); diff --git a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java index e61041b..6398dd2 100644 --- a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java +++ b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java @@ -19,6 +19,7 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.ExtensionLoader; +import com.alibaba.dubbo.common.utils.CollectionUtils; import com.alibaba.dubbo.common.utils.ConcurrentHashSet; import com.alibaba.dubbo.registry.NotifyListener; import com.alibaba.dubbo.registry.support.FailbackRegistry; @@ -46,6 +47,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import static com.alibaba.dubbo.common.Constants.CATEGORY_KEY; +import static com.alibaba.dubbo.common.Constants.DEFAULT_CATEGORY; +import static com.alibaba.dubbo.common.Constants.EMPTY_PROTOCOL; import static com.alibaba.dubbo.common.Constants.PATH_KEY; public class PolarisRegistry extends FailbackRegistry { @@ -145,7 +149,7 @@ private void onInstances(URL url, NotifyListener listener, Instance[] instances) urls.add(instanceToURL(instance)); } } - notify(url, listener, urls); + notify(url, listener, toUrlWithEmpty(url, urls)); } private static URL instanceToURL(Instance instance) { @@ -175,6 +179,17 @@ private static URL instanceToURL(Instance instance) { newMetadata); } + private List toUrlWithEmpty(URL providerUrl, List urls) { + if (CollectionUtils.isEmpty(urls)) { + LOGGER.warn("[POLARIS] received empty url address list, will clear current available addresses"); + URL empty = providerUrl + .setProtocol(EMPTY_PROTOCOL) + .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY); + urls.add(empty); + } + return urls; + } + @Override public void doUnsubscribe(URL url, NotifyListener listener) { LOGGER.info("[polaris] unsubscribe service: {}", url.toString()); diff --git a/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java b/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java index a7e52af..bf9909d 100644 --- a/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java +++ b/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java @@ -29,6 +29,10 @@ private PolarisOperators() { public static final PolarisOperators INSTANCE = new PolarisOperators(); + public PolarisOperator loadOrStore(String host, int port, Map parameters, BootConfigHandler... handlers) { + return null; + } + public void addPolarisOperator(PolarisOperator polarisOperator) { polarisOperatorMap.put(polarisOperator.getPolarisConfig().getRegistryAddress(), polarisOperator); } From 1f9ae9da0afecef5c9219240c76bf18c776c6c25 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 20 Oct 2023 16:17:23 +0800 Subject: [PATCH 3/5] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E5=9B=A0dubbo?= =?UTF-8?q?=E6=9C=AC=E8=BA=AB=E6=8E=A8=E7=A9=BA=E4=BF=9D=E6=8A=A4=E7=9A=84?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=E8=80=8C=E6=97=A0=E6=B3=95=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=AE=9E=E4=BE=8B=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cddf8bb..8944d27 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ - 0.2.2 + 0.2.3 ${maven.build.timestamp} yyyy-MM-dd HH:mm UTF-8 From 7a048de80df94f19a5e53c6d4ffff2b6dc21983d Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 20 Oct 2023 16:25:22 +0800 Subject: [PATCH 4/5] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E5=9B=A0dubbo?= =?UTF-8?q?=E6=9C=AC=E8=BA=AB=E6=8E=A8=E7=A9=BA=E4=BF=9D=E6=8A=A4=E7=9A=84?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=E8=80=8C=E6=97=A0=E6=B3=95=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=AE=9E=E4=BE=8B=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../polaris/dubbo/registry/PolarisRegistry.java | 3 +-- .../polaris/dubbox/registry/PolarisRegistry.java | 3 +-- .../polaris/common/registry/PolarisOperator.java | 7 ------- .../polaris/common/registry/PolarisOperators.java | 15 ++++++++++----- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java index 937d4bb..1a9d8cd 100644 --- a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java +++ b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java @@ -70,8 +70,7 @@ public PolarisRegistry(URL url) { // for test public PolarisRegistry(URL url, BootConfigHandler... handlers) { super(url); - polarisOperator = new PolarisOperator(url.getHost(), url.getPort(), url.getParameters(), handlers); - PolarisOperators.INSTANCE.addPolarisOperator(polarisOperator); + polarisOperator = PolarisOperators.INSTANCE.loadOrStore(url.getHost(), url.getPort(), url.getParameters(), handlers); } @Override diff --git a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java index 6398dd2..963b959 100644 --- a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java +++ b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java @@ -66,8 +66,7 @@ public class PolarisRegistry extends FailbackRegistry { public PolarisRegistry(URL url) { super(url); - polarisOperator = new PolarisOperator(url.getHost(), url.getPort(), url.getParameters()); - PolarisOperators.INSTANCE.addPolarisOperator(polarisOperator); + polarisOperator = PolarisOperators.INSTANCE.loadOrStore(url.getHost(), url.getPort(), url.getParameters()); } @Override diff --git a/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperator.java b/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperator.java index 985841d..18394b7 100644 --- a/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperator.java +++ b/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperator.java @@ -88,12 +88,6 @@ public class PolarisOperator { private CircuitBreakAPI circuitBreakAPI; - private ConfigFileService configFileService; - - private final Object lock = new Object(); - - private final Map> messageCache = new ConcurrentHashMap<>(); - public PolarisOperator(String host, int port, Map parameters, BootConfigHandler... handlers) { polarisConfig = new PolarisConfig(host, port, parameters); init(parameters, handlers); @@ -117,7 +111,6 @@ private void init(Map parameters, BootConfigHandler... handlers) limitAPI = LimitAPIFactory.createLimitAPIByContext(sdkContext); routerAPI = RouterAPIFactory.createRouterAPIByContext(sdkContext); circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByContext(sdkContext); - configFileService = ConfigFileServiceFactory.createConfigFileService(sdkContext); } public void destroy() { diff --git a/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java b/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java index bf9909d..91e3047 100644 --- a/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java +++ b/polaris-adapter-dubbo/src/main/java/com/tencent/polaris/common/registry/PolarisOperators.java @@ -17,8 +17,14 @@ package com.tencent.polaris.common.registry; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; public class PolarisOperators { @@ -30,11 +36,10 @@ private PolarisOperators() { public static final PolarisOperators INSTANCE = new PolarisOperators(); public PolarisOperator loadOrStore(String host, int port, Map parameters, BootConfigHandler... handlers) { - return null; - } - - public void addPolarisOperator(PolarisOperator polarisOperator) { - polarisOperatorMap.put(polarisOperator.getPolarisConfig().getRegistryAddress(), polarisOperator); + Map params = Optional.ofNullable(parameters).orElse(Collections.EMPTY_MAP); + String key = host + ":" + port + "|hash_code:" + params.hashCode(); + PolarisOperator saveVal = polarisOperatorMap.computeIfAbsent(key, s -> new PolarisOperator(host, port, parameters, handlers)); + return saveVal; } public PolarisOperator getPolarisOperator(String host, int port) { From 5794126eaa4010ae272200818ad2391ebaf14c8a Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 20 Oct 2023 16:57:18 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E5=9B=A0dubbo?= =?UTF-8?q?=E6=9C=AC=E8=BA=AB=E6=8E=A8=E7=A9=BA=E4=BF=9D=E6=8A=A4=E7=9A=84?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=E8=80=8C=E6=97=A0=E6=B3=95=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=AE=9E=E4=BE=8B=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8944d27..71f5f63 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ - 0.2.3 + 0.2.3-SNAPSHOT ${maven.build.timestamp} yyyy-MM-dd HH:mm UTF-8