Skip to content

Commit

Permalink
Allows extension of DubboEventMulticaster
Browse files Browse the repository at this point in the history
  • Loading branch information
walklown committed Jun 6, 2024
1 parent 320a564 commit 19db740
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.event;

import java.util.List;

public class CompositeDubboEventMulticaster implements DubboEventMulticaster {

private final List<DubboEventMulticaster> multicasterList;

public CompositeDubboEventMulticaster(List<DubboEventMulticaster> multicasterList) {
this.multicasterList = multicasterList;
}

@Override
public void addListener(DubboListener<?> listener) {
for (DubboEventMulticaster dubboEventMulticaster : multicasterList) {
dubboEventMulticaster.addListener(listener);
}
}

@Override
public void removeListener(DubboListener<?> listener) {
for (DubboEventMulticaster dubboEventMulticaster : multicasterList) {
dubboEventMulticaster.removeListener(listener);
}
}

@Override
public void publishEvent(DubboEvent event) {
for (DubboEventMulticaster dubboEventMulticaster : multicasterList) {
dubboEventMulticaster.publishEvent(event);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.event;

import java.util.List;

public class CompositeDubboLifecycleEventMulticaster implements DubboLifecycleEventMulticaster {

private final List<DubboLifecycleEventMulticaster> multicasterList;

public CompositeDubboLifecycleEventMulticaster(List<DubboLifecycleEventMulticaster> multicasterList) {
this.multicasterList = multicasterList;
}

@Override
public void publishBeforeEvent(DubboEvent event) {
for (DubboLifecycleEventMulticaster multicaster : multicasterList) {
multicaster.publishBeforeEvent(event);
}
}

@Override
public void publishErrorEvent(DubboEvent event) {
for (DubboLifecycleEventMulticaster multicaster : multicasterList) {
multicaster.publishErrorEvent(event);
}
}

@Override
public void addListener(DubboListener<?> listener) {
for (DubboLifecycleEventMulticaster multicaster : multicasterList) {
multicaster.addListener(listener);
}
}

@Override
public void removeListener(DubboListener<?> listener) {
for (DubboLifecycleEventMulticaster multicaster : multicasterList) {
multicaster.removeListener(listener);
}
}

@Override
public void publishEvent(DubboEvent event) {
for (DubboLifecycleEventMulticaster multicaster : multicasterList) {
multicaster.publishEvent(event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.dubbo.common.event;

import org.apache.dubbo.common.extension.Activate;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class DefaultDubboEventMulticaster implements DubboLifecycleEventMulticaster {
@Activate
public class DefaultDubboLifecycleEventMulticaster implements DubboLifecycleEventMulticaster {

/**
* All registered subscribers, indexed by event type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ public class DubboEventBus {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DubboEventBus.class);

private static final ConcurrentHashMap<ApplicationModel, DubboLifecycleEventMulticaster> cachedMulticasterMap =
private static final ConcurrentHashMap<ApplicationModel, DubboEventMulticaster> cachedMulticasterMap =
new ConcurrentHashMap<>();

private static final ConcurrentHashMap<ApplicationModel, DubboLifecycleEventMulticaster>
cachedLifecycleMulticasterMap = new ConcurrentHashMap<>();

private DubboEventBus() {}

/**
Expand All @@ -50,6 +53,9 @@ private DubboEventBus() {}
* @param listener object whose subscriber methods should be registered.
*/
public static void addListener(ApplicationModel applicationModel, DubboListener<?> listener) {
if (listener instanceof DubboLifecycleListener) {
getLifecycleMulticaster(applicationModel).addListener(listener);
}
getMulticaster(applicationModel).addListener(listener);
}

Expand All @@ -60,6 +66,9 @@ public static void addListener(ApplicationModel applicationModel, DubboListener<
* @throws IllegalArgumentException if the object was not previously registered.
*/
public static void removeListener(ApplicationModel applicationModel, DubboListener<?> listener) {
if (listener instanceof DubboLifecycleListener) {
getLifecycleMulticaster(applicationModel).removeListener(listener);
}
getMulticaster(applicationModel).removeListener(listener);
}

Expand Down Expand Up @@ -131,7 +140,7 @@ private static void tryInvoke(Runnable runnable) {
* eventSaveRunner saves the event, so that the calculation rt is introverted
*/
public static void before(DubboEvent event) {
tryInvoke(() -> getMulticaster(event.getApplicationModel()).publishBeforeEvent(event));
tryInvoke(() -> getLifecycleMulticaster(event.getApplicationModel()).publishBeforeEvent(event));
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand All @@ -140,18 +149,25 @@ public static void after(DubboEvent event, Object result) {
if (event instanceof CustomAfterPost) {
((CustomAfterPost) event).customAfterPost(result);
}
getMulticaster(event.getApplicationModel()).publishEvent(event);
getLifecycleMulticaster(event.getApplicationModel()).publishEvent(event);
});
}

public static void error(DubboEvent event) {
tryInvoke(() -> getMulticaster(event.getApplicationModel()).publishErrorEvent(event));
tryInvoke(() -> getLifecycleMulticaster(event.getApplicationModel()).publishErrorEvent(event));
}

private static DubboLifecycleEventMulticaster getMulticaster(ApplicationModel applicationModel) {
private static DubboEventMulticaster getMulticaster(ApplicationModel applicationModel) {
return cachedMulticasterMap.computeIfAbsent(applicationModel, t -> {
ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
return beanFactory.getBean(DubboLifecycleEventMulticaster.class);
return beanFactory.getBean(CompositeDubboEventMulticaster.class);
});
}

private static DubboLifecycleEventMulticaster getLifecycleMulticaster(ApplicationModel applicationModel) {
return cachedLifecycleMulticasterMap.computeIfAbsent(applicationModel, t -> {
ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
return beanFactory.getBean(CompositeDubboLifecycleEventMulticaster.class);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.dubbo.common.event;

import org.apache.dubbo.common.extension.SPI;

/**
* Dispatches events to listeners, and provides ways for listeners to register themselves.
*
* @since 3.3.0
* @see DubboEvent
* @see DubboListener
*/
@SPI
public interface DubboEventMulticaster {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,50 @@
package org.apache.dubbo.common.event;

import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ScopeModelInitializer;

import java.util.ArrayList;
import java.util.List;

/**
* Initialize {@link DubboLifecycleEventMulticaster} for {@link ApplicationModel}
*
* @see DubboLifecycleEventMulticaster
* @since 3.3.0
*/
public class DubboApplicationMulticasterRegistry implements ScopeModelInitializer {
public class DubboMulticasterScopeModelInitializer implements ScopeModelInitializer {

@Override
public void initializeFrameworkModel(FrameworkModel frameworkModel) {}

@Override
public void initializeApplicationModel(ApplicationModel applicationModel) {
ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
beanFactory.registerBean(DefaultDubboEventMulticaster.class);

List<DubboLifecycleEventMulticaster> lifecycleEventMulticasters = new ArrayList<>();
CompositeDubboLifecycleEventMulticaster lifecycleEventMulticaster =
new CompositeDubboLifecycleEventMulticaster(lifecycleEventMulticasters);

List<DubboEventMulticaster> eventMulticasters = new ArrayList<>();
CompositeDubboEventMulticaster dubboEventMulticaster = new CompositeDubboEventMulticaster(eventMulticasters);

ExtensionLoader<DubboEventMulticaster> extensionLoader =
applicationModel.getExtensionLoader(DubboEventMulticaster.class);
if (extensionLoader != null) {
for (DubboEventMulticaster eventMulticaster : extensionLoader.getActivateExtensions()) {
eventMulticasters.add(eventMulticaster);
if (eventMulticaster instanceof DubboLifecycleEventMulticaster) {
lifecycleEventMulticasters.add((DubboLifecycleEventMulticaster) eventMulticaster);
}
}
}

beanFactory.registerBean(lifecycleEventMulticaster);
beanFactory.registerBean(dubboEventMulticaster);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default=org.apache.dubbo.common.event.DefaultDubboLifecycleEventMulticaster
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
dubbo-common=org.apache.dubbo.common.CommonScopeModelInitializer
application-multicaster-registry=org.apache.dubbo.common.event.DubboApplicationMulticasterRegistry
dubbo-multicaster-init=org.apache.dubbo.common.event.DubboMulticasterScopeModelInitializer
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package org.apache.dubbo.metrics;

import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.common.event.DefaultDubboEventMulticaster;
import org.apache.dubbo.common.event.DubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.CompositeDubboEventMulticaster;
import org.apache.dubbo.common.event.CompositeDubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.DefaultDubboLifecycleEventMulticaster;
import org.apache.dubbo.metrics.collector.MetricsCollector;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
Expand Down Expand Up @@ -54,7 +55,12 @@ public void setUp() {
when(applicationModel.getBeanFactory()).thenReturn(beanFactory);
when(beanFactory.getBeansOfType(MetricsCollector.class))
.thenReturn(Collections.singletonList(metricsCollector));
when(beanFactory.getBean(DubboLifecycleEventMulticaster.class)).thenReturn(new DefaultDubboEventMulticaster());
when(beanFactory.getBean(CompositeDubboEventMulticaster.class))
.thenReturn(new CompositeDubboEventMulticaster(
Collections.singletonList(new DefaultDubboLifecycleEventMulticaster())));
when(beanFactory.getBean(CompositeDubboLifecycleEventMulticaster.class))
.thenReturn(new CompositeDubboLifecycleEventMulticaster(
Collections.singletonList(new DefaultDubboLifecycleEventMulticaster())));

defaultMetricsService = new DefaultMetricsService(applicationModel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.event.DefaultDubboEventMulticaster;
import org.apache.dubbo.common.event.CompositeDubboEventMulticaster;
import org.apache.dubbo.common.event.CompositeDubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.DefaultDubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.DubboEventBus;
import org.apache.dubbo.common.event.DubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.DubboListener;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ReflectionUtils;
Expand Down Expand Up @@ -212,7 +213,12 @@ public void testQPS() {

when(applicationModel.getApplicationConfigManager()).thenReturn(configManager);
when(applicationModel.getBeanFactory()).thenReturn(beanFactory);
when(beanFactory.getBean(DubboLifecycleEventMulticaster.class)).thenReturn(new DefaultDubboEventMulticaster());
when(beanFactory.getBean(CompositeDubboEventMulticaster.class))
.thenReturn(new CompositeDubboEventMulticaster(
Collections.singletonList(new DefaultDubboLifecycleEventMulticaster())));
when(beanFactory.getBean(CompositeDubboLifecycleEventMulticaster.class))
.thenReturn(new CompositeDubboLifecycleEventMulticaster(
Collections.singletonList(new DefaultDubboLifecycleEventMulticaster())));
DefaultMetricsCollector defaultMetricsCollector = new DefaultMetricsCollector(applicationModel);
when(beanFactory.getBean(DefaultMetricsCollector.class)).thenReturn(defaultMetricsCollector);
when(configManager.getMetrics()).thenReturn(Optional.of(metricsConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.dubbo.metrics.collector.sample;

import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.common.event.DefaultDubboEventMulticaster;
import org.apache.dubbo.common.event.DubboApplicationMulticasterRegistry;
import org.apache.dubbo.common.event.DubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.CompositeDubboEventMulticaster;
import org.apache.dubbo.common.event.CompositeDubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.DefaultDubboLifecycleEventMulticaster;
import org.apache.dubbo.common.event.DubboMulticasterScopeModelInitializer;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
Expand All @@ -30,6 +31,7 @@
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -52,7 +54,7 @@ public class ThreadPoolMetricsSamplerTest {

ThreadPoolMetricsSampler sampler;

private DubboApplicationMulticasterRegistry multicasterRegistry = new DubboApplicationMulticasterRegistry();
private DubboMulticasterScopeModelInitializer multicasterInitializer = new DubboMulticasterScopeModelInitializer();

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -142,8 +144,12 @@ public void setUp2() {
MockitoAnnotations.openMocks(this);

when(applicationModel.getBeanFactory()).thenReturn(scopeBeanFactory);
when(scopeBeanFactory.getBean(DubboLifecycleEventMulticaster.class))
.thenReturn(new DefaultDubboEventMulticaster());
when(scopeBeanFactory.getBean(CompositeDubboEventMulticaster.class))
.thenReturn(new CompositeDubboEventMulticaster(
Collections.singletonList(new DefaultDubboLifecycleEventMulticaster())));
when(scopeBeanFactory.getBean(CompositeDubboLifecycleEventMulticaster.class))
.thenReturn(new CompositeDubboLifecycleEventMulticaster(
Collections.singletonList(new DefaultDubboLifecycleEventMulticaster())));

collector = new DefaultMetricsCollector(applicationModel);
sampler2 = new ThreadPoolMetricsSampler(collector);
Expand Down
Loading

0 comments on commit 19db740

Please sign in to comment.