Skip to content

Commit

Permalink
0.51
Browse files Browse the repository at this point in the history
  • Loading branch information
noear committed Oct 13, 2023
1 parent 8d1ba3d commit a4b9105
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 18 deletions.
6 changes: 3 additions & 3 deletions dami/src/main/java/org/noear/dami/bus/DamiBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public boolean send(final String topic, final C content) {

Payload<C, R> payload = factory.create(topic, content, null);

dispatcher.handle(payload, router);
dispatcher.dispatch(payload, router);

return payload.getHandled();
}
Expand All @@ -151,7 +151,7 @@ public R sendAndRequest(final String topic, final C content) {
CompletableFuture<R> future = new CompletableFuture<>();
Payload<C, R> payload = factory.create(topic, content, new AcceptorRequest<>(future));

dispatcher.handle(payload, router);
dispatcher.dispatch(payload, router);

if (payload.getHandled()) {
try {
Expand All @@ -178,7 +178,7 @@ public boolean sendAndSubscribe(final String topic, final C content, final Consu

Payload<C, R> payload = factory.create(topic, content, new AcceptorSubscribe<>(consumer));

dispatcher.handle(payload, router);
dispatcher.dispatch(payload, router);

return payload.getHandled();
}
Expand Down
6 changes: 3 additions & 3 deletions dami/src/main/java/org/noear/dami/bus/TopicDispatcher.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.noear.dami.bus;

/**
* 主题调度器
* 主题派发器
*
* @author noear
* @since 1.0
Expand All @@ -13,7 +13,7 @@ public interface TopicDispatcher<C,R> {
void addInterceptor(int index, Interceptor interceptor);

/**
* 调度处理
* 派发
*/
void handle(Payload<C, R> payload, TopicRouter<C, R> router);
void dispatch(Payload<C, R> payload, TopicRouter<C, R> router);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.List;

/**
* 主题派发器默认实现
*
* @author noear
* @since 1.0
*/
Expand Down Expand Up @@ -48,17 +50,11 @@ public synchronized void addInterceptor(int index, Interceptor interceptor) {
}
}

@Override
public void handle(Payload<C, R> payload, TopicRouter<C, R> router) {
AssertUtil.assertTopic(payload.getTopic());

MDC.put("dami-guid", payload.getGuid());

List<TopicListenerHolder<C, R>> targets = router.matching(payload.getTopic());

new InterceptorChain<>(interceptors, targets).doIntercept(payload);
}

/**
* 执行拦截
* */
@Override
public void doIntercept(Payload<C, R> payload, InterceptorChain<C, R> chain) {
if (log.isTraceEnabled()) {
Expand All @@ -69,7 +65,7 @@ public void doIntercept(Payload<C, R> payload, InterceptorChain<C, R> chain) {

if (targets != null && targets.size() > 0) {
try {
doExchange(payload, chain.getTargets());
doDispatch(payload, chain.getTargets());
payload.setHandled();
} catch (InvocationTargetException e) {
throw new DamiException(e.getTargetException());
Expand All @@ -85,7 +81,26 @@ public void doIntercept(Payload<C, R> payload, InterceptorChain<C, R> chain) {
}
}

protected void doExchange(Payload<C, R> payload, List<TopicListenerHolder<C, R>> targets) throws Throwable {
/**
* 派发
* */
@Override
public void dispatch(Payload<C, R> payload, TopicRouter<C, R> router) {
AssertUtil.assertTopic(payload.getTopic());

MDC.put("dami-guid", payload.getGuid());

//获取路由匹配结果
List<TopicListenerHolder<C, R>> targets = router.matching(payload.getTopic());

//转成拦截链处理
new InterceptorChain<>(interceptors, targets).doIntercept(payload);
}

/**
* 执行派发
* */
protected void doDispatch(Payload<C, R> payload, List<TopicListenerHolder<C, R>> targets) throws Throwable {
//用 i,可以避免遍历时添加监听的异常
for (int i = 0; i < targets.size(); i++) {
targets.get(i).getListener().onEvent(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
public class TopicDispatcherMonitor<C,R> extends TopicDispatcherDefault<C,R> {
@Override
protected void doExchange(Payload<C, R> payload, List<TopicListenerHolder<C, R>> targets) throws Throwable {
protected void doDispatch(Payload<C, R> payload, List<TopicListenerHolder<C, R>> targets) throws Throwable {
//开始监视...
System.out.println("开始监视...");

Expand Down

0 comments on commit a4b9105

Please sign in to comment.