Skip to content

Commit

Permalink
[ISSUE apache#3290]Refactor ConsumerManager
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Feb 27, 2023
1 parent 8a014d4 commit 2792801
Showing 1 changed file with 48 additions and 159 deletions.
Expand Up @@ -26,8 +26,6 @@

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


import com.google.common.eventbus.Subscribe;
Expand All @@ -37,20 +35,15 @@
@Slf4j
public class ConsumerManager {

private EventMeshHTTPServer eventMeshHTTPServer;
private final EventMeshHTTPServer eventMeshHTTPServer;

/**
* consumerGroup to ConsumerGroupManager.
*/
private ConcurrentHashMap<String, ConsumerGroupManager> consumerTable =
new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ConsumerGroupManager> consumerTable = new ConcurrentHashMap<>();

private static final int DEFAULT_UPDATE_TIME = 3 * 30 * 1000;

private ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();

public ConsumerManager(EventMeshHTTPServer eventMeshHTTPServer) {
public ConsumerManager(final EventMeshHTTPServer eventMeshHTTPServer) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
}

Expand All @@ -61,114 +54,15 @@ public void init() throws Exception {

public void start() throws Exception {
log.info("consumerManager started......");

// scheduledExecutorService.scheduleAtFixedRate(() -> {
// logger.info("clientInfo check start.....");
// synchronized (eventMeshHTTPServer.localClientInfoMapping) {
// Map<String, List<Client>> clientInfoMap =
// eventMeshHTTPServer.localClientInfoMapping;
// if (clientInfoMap.size() > 0) {
// for (String key : clientInfoMap.keySet()) {
// String consumerGroup = key.split("@")[0];
// String topic = key.split("@")[1];
// List<Client> clientList = clientInfoMap.get(key);
// Iterator<Client> clientIterator = clientList.iterator();
// boolean isChange = false;
// while (clientIterator.hasNext()) {
// Client client = clientIterator.next();
// //The time difference is greater than 3 heartbeat cycles
// if (System.currentTimeMillis() - client.lastUpTime.getTime()
// > DEFAULT_UPDATE_TIME) {
// logger.warn(
// "client {} lastUpdate time {} over three heartbeat cycles",
// JsonUtils.serialize(client), client.lastUpTime);
// clientIterator.remove();
// isChange = true;
// }
// }
// if (isChange) {
// if (clientList.size() > 0) {
// //change url
// logger.info("consumerGroup {} client info changing",
// consumerGroup);
// Map<String, List<String>> idcUrls = new HashMap<>();
// Set<String> clientUrls = new HashSet<>();
// for (Client client : clientList) {
// clientUrls.add(client.url);
// if (idcUrls.containsKey(client.idc)) {
// idcUrls.get(client.idc)
// .add(StringUtils.deleteWhitespace(client.url));
// } else {
// List<String> urls = new ArrayList<>();
// urls.add(client.url);
// idcUrls.put(client.idc, urls);
// }
// }
// synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
// ConsumerGroupConf consumerGroupConf =
// eventMeshHTTPServer.localConsumerGroupMapping
// .get(consumerGroup);
// Map<String, ConsumerGroupTopicConf> map =
// consumerGroupConf.getConsumerGroupTopicConf();
// for (String topicKey : map.keySet()) {
// if (StringUtils.equals(topic, topicKey)) {
// ConsumerGroupTopicConf latestTopicConf =
// new ConsumerGroupTopicConf();
// latestTopicConf.setConsumerGroup(consumerGroup);
// latestTopicConf.setTopic(topic);
// latestTopicConf.setSubscriptionItem(
// map.get(topicKey).getSubscriptionItem());
// latestTopicConf.setUrls(clientUrls);
//
// latestTopicConf.setIdcUrls(idcUrls);
//
// map.put(topic, latestTopicConf);
// }
// }
// eventMeshHTTPServer.localConsumerGroupMapping
// .put(consumerGroup, consumerGroupConf);
// logger.info(
// "consumerGroup {} client info changed, "
// + "consumerGroupConf {}", consumerGroup,
// JsonUtils.serialize(consumerGroupConf));
//
// try {
// notifyConsumerManager(consumerGroup, consumerGroupConf);
// } catch (Exception e) {
// logger.error("notifyConsumerManager error", e);
// }
// }
//
// } else {
// logger.info("consumerGroup {} client info removed",
// consumerGroup);
// //remove
// try {
// notifyConsumerManager(consumerGroup, null);
// } catch (Exception e) {
// logger.error("notifyConsumerManager error", e);
// }
//
// eventMeshHTTPServer.localConsumerGroupMapping.keySet()
// .removeIf(s -> StringUtils.equals(consumerGroup, s));
// }
// }
//
// }
// }
// }
// }, 10000, 10000, TimeUnit.MILLISECONDS);
//TODO: update the subscription periodically from registry
}

/**
* notify ConsumerManager groupLevel
*/
public void notifyConsumerManager(String consumerGroup,
ConsumerGroupConf latestConsumerGroupConfig)
throws Exception {
ConsumerGroupManager cgm =
eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig) throws Exception {

ConsumerGroupManager cgm = eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);

if (latestConsumerGroupConfig == null) {
ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
Expand Down Expand Up @@ -219,10 +113,8 @@ public boolean contains(String consumerGroup) {
* @param consumerGroupConfig
* @throws Exception
*/
public synchronized void addConsumer(String consumerGroup,
ConsumerGroupConf consumerGroupConfig) throws Exception {
ConsumerGroupManager cgm =
new ConsumerGroupManager(eventMeshHTTPServer, consumerGroupConfig);
public synchronized void addConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
ConsumerGroupManager cgm = new ConsumerGroupManager(eventMeshHTTPServer, consumerGroupConfig);
cgm.init();
cgm.start();
consumerTable.put(consumerGroup, cgm);
Expand Down Expand Up @@ -256,70 +148,67 @@ public synchronized void delConsumer(String consumerGroup) throws Exception {
log.info("start delConsumer with consumerGroup {}", consumerGroup);
if (consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
log.info("start unsubscribe topic with consumer group manager {}",
JsonUtils.toJSONString(cgm));
log.info("start unsubscribe topic with consumer group manager {}", JsonUtils.toJSONString(cgm));
cgm.unsubscribe(consumerGroup);
cgm.shutdown();
}
log.info("end delConsumer with consumerGroup {}", consumerGroup);
}

@Subscribe
public void onChange(ConsumerGroupTopicConfChangeEvent event) {
public void handleConsumerGroupTopicConfChangeEvent(ConsumerGroupTopicConfChangeEvent event) {
try {
log.info("onChange event:{}", event);
if (event.action
== ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.NEW) {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
switch (event.action) {
case NEW: {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
}
manager.getConsumerGroupConfig().getConsumerGroupTopicConf().put(event.topic, event.newTopicConf);
break;
}
manager.getConsumerGroupConfig().getConsumerGroupTopicConf()
.put(event.topic, event.newTopicConf);
return;
}

if (event.action
== ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.CHANGE) {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
case CHANGE: {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
}
manager.getConsumerGroupConfig().getConsumerGroupTopicConf().replace(event.topic, event.newTopicConf);
break;
}
manager.getConsumerGroupConfig().getConsumerGroupTopicConf()
.replace(event.topic, event.newTopicConf);
return;
}

if (event.action
== ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.DELETE) {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
case DELETE: {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
}
manager.getConsumerGroupConfig().getConsumerGroupTopicConf().remove(event.topic);
break;
}
manager.getConsumerGroupConfig().getConsumerGroupTopicConf().remove(event.topic);
default:
//do nothing
}
} catch (Exception ex) {
log.error("onChange event:{} err", event, ex);
}
}

@Subscribe
public void onChange(ConsumerGroupStateEvent event) {
public void handleConsumerGroupStateEvent(ConsumerGroupStateEvent event) {
try {
log.info("onChange event:{}", event);
if (event.action == ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW) {
addConsumer(event.consumerGroup, event.consumerGroupConfig);
return;
}

if (event.action == ConsumerGroupStateEvent.ConsumerGroupStateAction.CHANGE) {
restartConsumer(event.consumerGroup, event.consumerGroupConfig);
return;
}

if (event.action == ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE) {
delConsumer(event.consumerGroup);
return;
switch (event.action) {
case NEW:
addConsumer(event.consumerGroup, event.consumerGroupConfig);
break;
case CHANGE:
restartConsumer(event.consumerGroup, event.consumerGroupConfig);
break;
case DELETE:
delConsumer(event.consumerGroup);
break;
default:
//do nothing
}
} catch (Exception ex) {
log.error("onChange event:{} err", event, ex);
Expand Down

0 comments on commit 2792801

Please sign in to comment.