Skip to content

Commit

Permalink
feat(broker,proxy): init event tracker (apache#11)
Browse files Browse the repository at this point in the history
* feat(broker,proxy): init event tracker

Signed-off-by: lyx <1419360299@qq.com>

* fix(event): improve naming&structure

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): use listener to record event

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): fix start init

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): add switch

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): add event picker switch

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): fix

Signed-off-by: lyx <1419360299@qq.com>

* fix(event): adjust naming

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): handle cluster/local mode

Signed-off-by: lyx <1419360299@qq.com>

* feat(event): reuse subevent

Signed-off-by: lyx <1419360299@qq.com>

* fix(event): rename eventtracker eventType

Signed-off-by: lyx <1419360299@qq.com>

* fix(event): rename to SubscriptionGroupConfig

Signed-off-by: lyx <1419360299@qq.com>

---------

Signed-off-by: lyx <1419360299@qq.com>

feat(event): rename eventType -> event (apache#12)

rename eventType -> event

Signed-off-by: lyx <1419360299@qq.com>

fix(event): npe & use hook (apache#13)

* fix(event): npe & use hook

Signed-off-by: lyx <1419360299@qq.com>

* fix(event): use same hook

Signed-off-by: lyx <1419360299@qq.com>

---------

Signed-off-by: lyx <1419360299@qq.com>
  • Loading branch information
lyx2000 committed May 6, 2023
1 parent 82fe333 commit ef3ccdd
Show file tree
Hide file tree
Showing 50 changed files with 1,482 additions and 41 deletions.
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -44,6 +45,7 @@
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
Expand All @@ -52,6 +54,14 @@
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.eventtrack.EventTrackerUtil;
import org.apache.rocketmq.broker.eventtrack.EventTrackerManager;
import org.apache.rocketmq.broker.eventtrack.EventType;
import org.apache.rocketmq.broker.eventtrack.listener.ConsumerEventTrackListener;
import org.apache.rocketmq.broker.eventtrack.listener.GroupConfigEventTrackListener;
import org.apache.rocketmq.broker.eventtrack.listener.ProducerEventTrackListener;
import org.apache.rocketmq.broker.eventtrack.listener.SubscriptionEventTrackListener;
import org.apache.rocketmq.broker.eventtrack.listener.TopicEventTrackListener;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
Expand Down Expand Up @@ -157,6 +167,8 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -395,6 +407,32 @@ public boolean online(String instanceId, String group, String topic) {
if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
}

if (this.brokerConfig.isEnableEventTrack()) {
EventTrackerManager.initEventContext(brokerConfig.getBrokerName(), NODE_TYPE_BROKER);
Set<EventType> enableEventTypeSet = EventTrackerUtil.decodeEventList(this.brokerConfig.getEnabledTrackerList());
if (EventTrackerUtil.checkEvent(EventType.CONSUMER_GROUP_CLIENT_EVENT, enableEventTypeSet)) {
consumerManager.appendConsumerIdsChangeListener(new ConsumerEventTrackListener());
}
if (EventTrackerUtil.checkEvent(EventType.CONSUMER_GROUP_EVENT, enableEventTypeSet)) {
subscriptionGroupManager.appendConsumerGroupChangeListener(new GroupConfigEventTrackListener());
}
if (EventTrackerUtil.checkEvent(EventType.PRODUCER_CLIENT_EVENT, enableEventTypeSet)) {
producerManager.appendProducerChangeListener(new ProducerEventTrackListener());
}
if (EventTrackerUtil.checkEvent(EventType.SUBSCRIPTION_EVENT, enableEventTypeSet)) {
ConsumerGroupInfo.appendSubscriptionChangeListener(new SubscriptionEventTrackListener());
}
if (EventTrackerUtil.checkEvent(EventType.TOPIC_EVENT, enableEventTypeSet)) {
topicConfigManager.appendTopicChangeListener(new TopicEventTrackListener());
}
if (EventTrackerUtil.checkEvent(EventType.STARTUP_EVENT, enableEventTypeSet)) {
EventTrackerManager.trackEvent(EventType.STARTUP_EVENT);
}
if (EventTrackerUtil.checkEvent(EventType.SHUTDOWN_EVENT, enableEventTypeSet)) {
this.setShutdownHook(controller -> EventTrackerManager.trackEvent(EventType.SHUTDOWN_EVENT));
}
}
}

public BrokerConfig getBrokerConfig() {
Expand Down Expand Up @@ -906,16 +944,16 @@ private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}",
TransactionalMessageServiceImpl.class.getSimpleName());
TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
AbstractTransactionalMessageCheckListener.class);
AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}",
DefaultTransactionalMessageCheckListener.class.getSimpleName());
DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
Expand Down
Expand Up @@ -76,6 +76,7 @@ public static BrokerController start(BrokerController controller) {
public static void shutdown(final BrokerController controller) {
if (null != controller) {
controller.shutdown();

}
}

Expand Down
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.client;

public enum ClientOfflineType {
CHANNEL_CLOSE,
UNREGISTER,
SCAN_NOT_ACTIVE
}
Expand Up @@ -30,6 +30,10 @@ public enum ConsumerGroupEvent {
* The group of consumer is registered.
*/
REGISTER,
/**
* The group of consumer is registered for the first time.
*/
GROUP_REGISTER,
/**
* The client of this consumer is new registered.
*/
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand All @@ -34,6 +35,7 @@

public class ConsumerGroupInfo {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final List<SubscriptionChangeListener> SUBSCRIPTION_CHANGE_LISTENERS = new CopyOnWriteArrayList<>();
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -125,9 +127,9 @@ public ClientChannelInfo doChannelCloseEvent(final String remoteAddr, final Chan
/**
* Update {@link #channelInfoTable} in {@link ConsumerGroupInfo}
*
* @param infoNew Channel info of new client.
* @param consumeType consume type of new client.
* @param messageModel message consuming model (CLUSTERING/BROADCASTING) of new client.
* @param infoNew Channel info of new client.
* @param consumeType consume type of new client.
* @param messageModel message consuming model (CLUSTERING/BROADCASTING) of new client.
* @param consumeFromWhere indicate the position when the client consume message firstly.
* @return the result that if new connector is connected or not.
*/
Expand Down Expand Up @@ -179,11 +181,16 @@ public boolean updateSubscription(final Set<SubscriptionData> subList) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
callSubscriptionChangeListener(this.groupName, SubscriptionEvent.SUBSCRIPTION_CREATE, null, sub);
log.info("subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) {
// if update track event
if (!sub.isSame(old)) {
callSubscriptionChangeListener(this.groupName, SubscriptionEvent.SUBSCRIPTION_UPDATE, old, sub);
}
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
Expand Down Expand Up @@ -215,7 +222,7 @@ public boolean updateSubscription(final Set<SubscriptionData> subList) {
oldTopic,
next.getValue().toString()
);

callSubscriptionChangeListener(this.groupName, SubscriptionEvent.SUBSCRIPTION_REMOVE, next.getValue(), null);
it.remove();
updated = true;
}
Expand Down Expand Up @@ -269,4 +276,19 @@ public ConsumeFromWhere getConsumeFromWhere() {
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}

public static void appendSubscriptionChangeListener(SubscriptionChangeListener listener) {
SUBSCRIPTION_CHANGE_LISTENERS.add(listener);
}

protected static void callSubscriptionChangeListener(String groupName,
SubscriptionEvent type, SubscriptionData originSubscription, SubscriptionData newSubscription) {
for (SubscriptionChangeListener listener : SUBSCRIPTION_CHANGE_LISTENERS) {
try {
listener.handle(groupName, type, originSubscription, newSubscription);
} catch (Throwable t) {
log.error("err when call subscriptionChangeListener", t);
}
}
}
}
Expand Up @@ -136,7 +136,7 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
ConsumerGroupInfo info = next.getValue();
ClientChannelInfo clientChannelInfo = info.doChannelCloseEvent(remoteAddr, channel);
if (clientChannelInfo != null) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics());
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics(), ClientOfflineType.CHANNEL_CLOSE);
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
Expand Down Expand Up @@ -178,8 +178,7 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
long start = System.currentTimeMillis();
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo,
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
callConsumerIdsChangeListener(ConsumerGroupEvent.GROUP_REGISTER, group);
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
Expand All @@ -188,6 +187,10 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
if (r1) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo,
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
}
boolean r2 = false;
if (updateSubscription) {
r2 = consumerGroupInfo.updateSubscription(subList);
Expand All @@ -201,7 +204,6 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
if (null != this.brokerStatsManager) {
this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start));
}

callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo);

return r1 || r2;
Expand All @@ -213,13 +215,12 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
if (null != consumerGroupInfo) {
boolean removed = consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (removed) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics());
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics(), ClientOfflineType.UNREGISTER);
}
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);

callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
}
}
Expand Down Expand Up @@ -270,7 +271,7 @@ public void scanNotActiveChannel() {
LOGGER.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics());
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics(), ClientOfflineType.SCAN_NOT_ACTIVE);
RemotingHelper.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
Expand All @@ -281,6 +282,7 @@ public void scanNotActiveChannel() {
"SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
group);
it.remove();
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
}
}
removeExpireConsumerGroupInfo();
Expand Down
Expand Up @@ -37,10 +37,10 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
private final BrokerController brokerController;
private final int cacheSize = 8096;

private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));

private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);
private ConcurrentHashMap<String, List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);

public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
this.brokerController = brokerController;
Expand Down Expand Up @@ -89,6 +89,7 @@ public void handle(ConsumerGroupEvent event, String group, Object... args) {
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
case GROUP_REGISTER:
case CLIENT_REGISTER:
case CLIENT_UNREGISTER:
break;
Expand Down
Expand Up @@ -23,5 +23,6 @@
*/
public interface ProducerChangeListener {

void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo);
void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo,
ClientOfflineType offlineType);
}
Expand Up @@ -24,5 +24,9 @@ public enum ProducerGroupEvent {
/**
* The client of this producer is unregistered.
*/
CLIENT_UNREGISTER
CLIENT_UNREGISTER,
/**
* The client of this producer is unregistered.
*/
CLIENT_REGISTER
}

0 comments on commit ef3ccdd

Please sign in to comment.