From a4b910522e04202bab405430e5f41836596f5426 Mon Sep 17 00:00:00 2001 From: noear Date: Fri, 13 Oct 2023 15:12:06 +0800 Subject: [PATCH] 0.51 --- .../java/org/noear/dami/bus/DamiBusImpl.java | 6 +-- .../org/noear/dami/bus/TopicDispatcher.java | 6 +-- .../dami/bus/impl/TopicDispatcherDefault.java | 37 +++++++++++++------ .../TopicDispatcherMonitor.java | 2 +- 4 files changed, 33 insertions(+), 18 deletions(-) diff --git a/dami/src/main/java/org/noear/dami/bus/DamiBusImpl.java b/dami/src/main/java/org/noear/dami/bus/DamiBusImpl.java index 974614d..be7037b 100644 --- a/dami/src/main/java/org/noear/dami/bus/DamiBusImpl.java +++ b/dami/src/main/java/org/noear/dami/bus/DamiBusImpl.java @@ -132,7 +132,7 @@ public boolean send(final String topic, final C content) { Payload payload = factory.create(topic, content, null); - dispatcher.handle(payload, router); + dispatcher.dispatch(payload, router); return payload.getHandled(); } @@ -151,7 +151,7 @@ public R sendAndRequest(final String topic, final C content) { CompletableFuture future = new CompletableFuture<>(); Payload payload = factory.create(topic, content, new AcceptorRequest<>(future)); - dispatcher.handle(payload, router); + dispatcher.dispatch(payload, router); if (payload.getHandled()) { try { @@ -178,7 +178,7 @@ public boolean sendAndSubscribe(final String topic, final C content, final Consu Payload payload = factory.create(topic, content, new AcceptorSubscribe<>(consumer)); - dispatcher.handle(payload, router); + dispatcher.dispatch(payload, router); return payload.getHandled(); } diff --git a/dami/src/main/java/org/noear/dami/bus/TopicDispatcher.java b/dami/src/main/java/org/noear/dami/bus/TopicDispatcher.java index d159385..45d62ef 100644 --- a/dami/src/main/java/org/noear/dami/bus/TopicDispatcher.java +++ b/dami/src/main/java/org/noear/dami/bus/TopicDispatcher.java @@ -1,7 +1,7 @@ package org.noear.dami.bus; /** - * 主题调度器 + * 主题派发器 * * @author noear * @since 1.0 @@ -13,7 +13,7 @@ public interface TopicDispatcher { void addInterceptor(int index, Interceptor interceptor); /** - * 调度处理 + * 派发 */ - void handle(Payload payload, TopicRouter router); + void dispatch(Payload payload, TopicRouter router); } diff --git a/dami/src/main/java/org/noear/dami/bus/impl/TopicDispatcherDefault.java b/dami/src/main/java/org/noear/dami/bus/impl/TopicDispatcherDefault.java index bf75a6a..1f28496 100644 --- a/dami/src/main/java/org/noear/dami/bus/impl/TopicDispatcherDefault.java +++ b/dami/src/main/java/org/noear/dami/bus/impl/TopicDispatcherDefault.java @@ -13,6 +13,8 @@ import java.util.List; /** + * 主题派发器默认实现 + * * @author noear * @since 1.0 */ @@ -48,17 +50,11 @@ public synchronized void addInterceptor(int index, Interceptor interceptor) { } } - @Override - public void handle(Payload payload, TopicRouter router) { - AssertUtil.assertTopic(payload.getTopic()); - - MDC.put("dami-guid", payload.getGuid()); - List> targets = router.matching(payload.getTopic()); - - new InterceptorChain<>(interceptors, targets).doIntercept(payload); - } + /** + * 执行拦截 + * */ @Override public void doIntercept(Payload payload, InterceptorChain chain) { if (log.isTraceEnabled()) { @@ -69,7 +65,7 @@ public void doIntercept(Payload payload, InterceptorChain chain) { if (targets != null && targets.size() > 0) { try { - doExchange(payload, chain.getTargets()); + doDispatch(payload, chain.getTargets()); payload.setHandled(); } catch (InvocationTargetException e) { throw new DamiException(e.getTargetException()); @@ -85,7 +81,26 @@ public void doIntercept(Payload payload, InterceptorChain chain) { } } - protected void doExchange(Payload payload, List> targets) throws Throwable { + /** + * 派发 + * */ + @Override + public void dispatch(Payload payload, TopicRouter router) { + AssertUtil.assertTopic(payload.getTopic()); + + MDC.put("dami-guid", payload.getGuid()); + + //获取路由匹配结果 + List> targets = router.matching(payload.getTopic()); + + //转成拦截链处理 + new InterceptorChain<>(interceptors, targets).doIntercept(payload); + } + + /** + * 执行派发 + * */ + protected void doDispatch(Payload payload, List> targets) throws Throwable { //用 i,可以避免遍历时添加监听的异常 for (int i = 0; i < targets.size(); i++) { targets.get(i).getListener().onEvent(payload); diff --git a/dami/src/test/java/features/demo16_monitor/TopicDispatcherMonitor.java b/dami/src/test/java/features/demo16_monitor/TopicDispatcherMonitor.java index ff21a47..0a00dcd 100644 --- a/dami/src/test/java/features/demo16_monitor/TopicDispatcherMonitor.java +++ b/dami/src/test/java/features/demo16_monitor/TopicDispatcherMonitor.java @@ -12,7 +12,7 @@ */ public class TopicDispatcherMonitor extends TopicDispatcherDefault { @Override - protected void doExchange(Payload payload, List> targets) throws Throwable { + protected void doDispatch(Payload payload, List> targets) throws Throwable { //开始监视... System.out.println("开始监视...");