From 1789f14cd0f816e64bbe21ec418fa560c8bfc66d Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Fri, 19 Jan 2024 11:45:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96dubbo=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E7=BA=A7=E6=B3=A8=E5=86=8C=E5=8F=91=E7=8E=B0=E7=9A=84=E5=86=85?= =?UTF-8?q?=E5=AD=98=E4=BD=BF=E7=94=A8=20(#40)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/registry/PolarisRegistry.java | 47 +++++++++++++++---- .../dubbox/registry/PolarisRegistry.java | 47 +++++++++++++++---- dubbox/pom.xml | 1 - 3 files changed, 74 insertions(+), 21 deletions(-) diff --git a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java index 1a9d8cd..1839f3a 100644 --- a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java +++ b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java @@ -19,6 +19,7 @@ import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.listener.ServiceListener; import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.common.registry.BaseBootConfigHandler; import com.tencent.polaris.common.registry.BootConfigHandler; @@ -59,8 +60,9 @@ public class PolarisRegistry extends FailbackRegistry { private final AtomicBoolean destroyed = new AtomicBoolean(false); - private final Map serviceListeners = new ConcurrentHashMap<>(); + private final Map> dubboListeners = new ConcurrentHashMap<>(); + private final Map serviceListeners = new ConcurrentHashMap<>(); private final PolarisOperator polarisOperator; public PolarisRegistry(URL url) { @@ -128,15 +130,12 @@ public void doSubscribe(URL url, NotifyListener listener) { Instance[] instances = polarisOperator.getAvailableInstances(service, true); onInstances(url, listener, instances); LOGGER.info("[POLARIS] submit watch task for service {}", service); - serviceListeners.computeIfAbsent(listener, notifyListener -> { - ServiceListener serviceListener = event -> { - try { - Instance[] curInstances = polarisOperator.getAvailableInstances(service, true); - onInstances(url, listener, curInstances); - } catch (PolarisException e) { - LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); - } - }; + + dubboListeners.computeIfAbsent(url, s -> new ConcurrentHashSet<>()); + dubboListeners.get(url).add(listener); + + serviceListeners.computeIfAbsent(url, dubboUrl -> { + ServiceListener serviceListener = new DubboServiceListener(url, this); polarisOperator.watchService(service, serviceListener); return serviceListener; }); @@ -208,4 +207,32 @@ public void doUnsubscribe(URL url, NotifyListener listener) { public boolean isAvailable() { return true; } + + private static class DubboServiceListener implements ServiceListener { + + private final URL url; + + private final String service; + + private final PolarisRegistry registry; + + private DubboServiceListener(URL url, PolarisRegistry registry) { + this.url = url; + this.service = url.getServiceInterface(); + this.registry = registry; + } + + @Override + public void onEvent(ServiceChangeEvent serviceChangeEvent) { + try { + Set listeners = registry.dubboListeners.getOrDefault(url, Collections.emptySet()); + Instance[] curInstances = registry.polarisOperator.getAvailableInstances(service, true); + for (NotifyListener listener : listeners) { + registry.onInstances(url, listener, curInstances); + } + } catch (PolarisException e) { + LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); + } + } + } } diff --git a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java index 963b959..5995ce2 100644 --- a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java +++ b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java @@ -28,6 +28,7 @@ import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.listener.ServiceListener; import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.common.registry.Consts; import com.tencent.polaris.common.registry.ConvertUtils; @@ -60,7 +61,9 @@ public class PolarisRegistry extends FailbackRegistry { private final AtomicBoolean destroyed = new AtomicBoolean(false); - private final Map serviceListeners = new ConcurrentHashMap<>(); + private final Map> dubboListeners = new ConcurrentHashMap<>(); + + private final Map serviceListeners = new ConcurrentHashMap<>(); private final PolarisOperator polarisOperator; @@ -125,15 +128,11 @@ public void doSubscribe(URL url, NotifyListener listener) { onInstances(url, listener, instances); LOGGER.info("[POLARIS] submit watch task for service {}", service); - serviceListeners.computeIfAbsent(listener, notifyListener -> { - ServiceListener serviceListener = event -> { - try { - Instance[] curInstances = polarisOperator.getAvailableInstances(service, true); - onInstances(url, listener, curInstances); - } catch (PolarisException e) { - LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); - } - }; + dubboListeners.computeIfAbsent(url, s -> new ConcurrentHashSet<>()); + dubboListeners.get(url).add(listener); + + serviceListeners.computeIfAbsent(url, dubboUrl -> { + ServiceListener serviceListener = new DubboServiceListener(url, this); polarisOperator.watchService(service, serviceListener); return serviceListener; }); @@ -202,4 +201,32 @@ public void doUnsubscribe(URL url, NotifyListener listener) { public boolean isAvailable() { return true; } + + private static class DubboServiceListener implements ServiceListener { + + private final URL url; + + private final String service; + + private final PolarisRegistry registry; + + private DubboServiceListener(URL url, PolarisRegistry registry) { + this.url = url; + this.service = url.getServiceInterface(); + this.registry = registry; + } + + @Override + public void onEvent(ServiceChangeEvent serviceChangeEvent) { + try { + Set listeners = registry.dubboListeners.getOrDefault(url, Collections.emptySet()); + Instance[] curInstances = registry.polarisOperator.getAvailableInstances(service, true); + for (NotifyListener listener : listeners) { + registry.onInstances(url, listener, curInstances); + } + } catch (PolarisException e) { + LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); + } + } + } } diff --git a/dubbox/pom.xml b/dubbox/pom.xml index e40b80b..d184e22 100644 --- a/dubbox/pom.xml +++ b/dubbox/pom.xml @@ -11,7 +11,6 @@ 4.0.0 dubbox - pom